1use anyhow::Context;
2use futures::{Stream, StreamExt};
3use kumo_api_types::rebind::{RebindV1Request, RebindV1Response};
4use kumo_api_types::xfer::*;
5use kumo_api_types::*;
6pub use kumo_prometheus::parser::Metric;
7use std::time::Duration;
8
9pub use reqwest::Url;
10
11const DEFAULT_TIMEOUT: Duration = Duration::from_secs(60);
12
13pub struct KumoApiClient {
14 endpoint: Url,
15 timeout: Duration,
16}
17
/// Generates one `pub async fn` per API endpoint inside `impl KumoApiClient`.
///
/// Supported invocation forms:
///
/// * `name, POST, "/path", Request, Response` — sends `&Request` as the JSON
///   body of a `POST` and decodes the JSON reply into `Response`.
/// * `name, TEXT, VERB, "/path", Request` — sends `&Request` as the JSON body
///   using `reqwest::Method::VERB` (for example `POST` or `DELETE`) and
///   returns the reply body as a `String`.
/// * `name, GET, "/path", Request, Response` — lets `Request` modify the URL
///   through its `apply_to_url` method, issues a `GET`, and decodes the JSON
///   reply into `Response`.
/// * `name, GET, "/path", Response` — issues a parameterless `GET` and decodes
///   the JSON reply into `Response`.
///
/// In every form `"/path"` is joined onto `self.endpoint`; a failure to join
/// is returned as an error from the generated method.
macro_rules! method {
    // JSON request body, JSON response.
    ($func_name:ident, POST, $path:literal, $request_ty:ty, $response_ty:ty) => {
        pub async fn $func_name(&self, params: &$request_ty) -> anyhow::Result<$response_ty> {
            self.request_with_json_response(
                reqwest::Method::POST,
                self.endpoint.join($path)?,
                params,
            )
            .await
        }
    };

    // JSON request body, plain-text response. The verb is captured as an
    // identifier so one arm serves `TEXT, POST`, `TEXT, DELETE` and any other
    // `reqwest::Method` associated constant.
    ($func_name:ident, TEXT, $verb:ident, $path:literal, $request_ty:ty) => {
        pub async fn $func_name(&self, params: &$request_ty) -> anyhow::Result<String> {
            self.request_with_text_response(
                reqwest::Method::$verb,
                self.endpoint.join($path)?,
                params,
            )
            .await
        }
    };

    // Parameters carried by the URL, JSON response.
    ($func_name:ident, GET, $path:literal, $request_ty:ty, $response_ty:ty) => {
        pub async fn $func_name(&self, get_params: &$request_ty) -> anyhow::Result<$response_ty> {
            let mut url = self.endpoint.join($path)?;
            // The request type is responsible for writing itself into the URL.
            get_params.apply_to_url(&mut url);

            // The request helper always takes a body argument; `()` is passed
            // because this form has nothing to send in the body.
            self.request_with_json_response(reqwest::Method::GET, url, &())
                .await
        }
    };

    // No parameters, JSON response.
    ($func_name:ident, GET, $path:literal, $response_ty:ty) => {
        pub async fn $func_name(&self) -> anyhow::Result<$response_ty> {
            self.request_with_json_response(reqwest::Method::GET, self.endpoint.join($path)?, &())
                .await
        }
    };
}
69
70impl KumoApiClient {
71 pub fn new(endpoint: Url) -> Self {
72 Self {
73 endpoint,
74 timeout: DEFAULT_TIMEOUT,
75 }
76 }
77
78 fn client_builder(&self) -> reqwest::ClientBuilder {
79 reqwest::Client::builder().timeout(self.timeout)
80 }
81
82 method!(
83 admin_bounce_v1,
84 POST,
85 "/api/admin/bounce/v1",
86 BounceV1Request,
87 BounceV1Response
88 );
89
90 method!(machine_info, GET, "/api/machine-info", MachineInfoV1);
91
92 method!(
93 admin_bounce_list_v1,
94 GET,
95 "/api/admin/bounce/v1",
96 Vec<BounceV1ListEntry>
97 );
98
99 method!(
100 admin_bounce_cancel_v1,
101 TEXT,
102 DELETE,
103 "/api/admin/bounce/v1",
104 BounceV1CancelRequest
105 );
106
107 method!(
108 admin_spool_compact_v1,
109 TEXT,
110 POST,
111 "/api/admin/spool-compact/v1",
112 SpoolCompactV1Request
113 );
114
115 method!(
116 admin_inspect_sched_q_v1,
117 GET,
118 "/api/admin/inspect-sched-q/v1",
119 InspectQueueV1Request,
120 InspectQueueV1Response
121 );
122
123 method!(
124 admin_inspect_message_v1,
125 GET,
126 "/api/admin/inspect-message/v1",
127 InspectMessageV1Request,
128 InspectMessageV1Response
129 );
130
131 method!(
132 admin_xfer_v1,
133 POST,
134 "/api/admin/xfer/v1",
135 XferV1Request,
136 XferV1Response
137 );
138
139 method!(
140 admin_suspend_list_v1,
141 GET,
142 "/api/admin/suspend/v1",
143 Vec<SuspendV1ListEntry>
144 );
145
146 method!(
147 admin_suspend_ready_q_list_v1,
148 GET,
149 "/api/admin/suspend-ready-q/v1",
150 Vec<SuspendReadyQueueV1ListEntry>
151 );
152
153 method!(
154 admin_xfer_cancel_v1,
155 POST,
156 "/api/admin/xfer/cancel/v1",
157 XferCancelV1Request,
158 XferCancelV1Response
159 );
160
161 method!(
162 admin_rebind_v1,
163 POST,
164 "/api/admin/rebind/v1",
165 RebindV1Request,
166 RebindV1Response
167 );
168
169 method!(
170 admin_suspend_ready_q_v1,
171 POST,
172 "/api/admin/suspend-ready-q/v1",
173 SuspendReadyQueueV1Request,
174 SuspendV1Response
175 );
176
177 method!(
178 admin_suspend_ready_q_cancel_v1,
179 TEXT,
180 DELETE,
181 "/api/admin/suspend-ready-q/v1",
182 SuspendV1CancelRequest
183 );
184
185 method!(
186 admin_suspend_v1,
187 POST,
188 "/api/admin/suspend/v1",
189 SuspendV1Request,
190 SuspendV1Response
191 );
192
193 method!(
194 admin_suspend_cancel_v1,
195 TEXT,
196 DELETE,
197 "/api/admin/suspend/v1",
198 SuspendV1CancelRequest
199 );
200
201 method!(
202 admin_ready_q_states_v1,
203 GET,
204 "/api/admin/ready-q-states/v1",
205 ReadyQueueStateRequest,
206 ReadyQueueStateResponse
207 );
208
209 method!(
210 admin_inspect_ready_q_v1,
211 GET,
212 "/api/admin/inspect-ready-q/v1",
213 InspectReadyQV1Request,
214 InspectReadyQV1Response
215 );
216
217 method!(
218 admin_resolve_egress_path_v1,
219 GET,
220 "/api/admin/resolve-egress-path/v1",
221 ResolveEgressPathV1Request,
222 ResolveEgressPathV1Response
223 );
224
225 method!(
226 admin_abort_ready_q_conn_v1,
227 TEXT,
228 POST,
229 "/api/admin/abort-ready-q-conn/v1",
230 AbortReadyQConnV1Request
231 );
232
233 method!(
234 admin_set_diagnostic_log_filter_v1,
235 TEXT,
236 POST,
237 "/api/admin/set_diagnostic_log_filter/v1",
238 SetDiagnosticFilterRequest
239 );
240
241 pub async fn request_with_text_response<T: reqwest::IntoUrl, B: serde::Serialize>(
242 &self,
243 method: reqwest::Method,
244 url: T,
245 body: &B,
246 ) -> anyhow::Result<String> {
247 let response = self
248 .client_builder()
249 .build()?
250 .request(method, url)
251 .json(body)
252 .send()
253 .await?;
254
255 let status = response.status();
256 let body_bytes = response.bytes().await.with_context(|| {
257 format!(
258 "request status {}: {}, and failed to read response body",
259 status.as_u16(),
260 status.canonical_reason().unwrap_or("")
261 )
262 })?;
263 let body_text = String::from_utf8_lossy(&body_bytes);
264 if !status.is_success() {
265 anyhow::bail!(
266 "request status {}: {}. Response body: {body_text}",
267 status.as_u16(),
268 status.canonical_reason().unwrap_or(""),
269 );
270 }
271
272 Ok(body_text.to_string())
273 }
274
275 pub async fn admin_bump_config_epoch(&self) -> anyhow::Result<String> {
276 self.request_with_text_response(
277 reqwest::Method::POST,
278 self.endpoint.join("/api/admin/bump-config-epoch")?,
279 &(),
280 )
281 .await
282 }
283
284 pub async fn inject_v1(
287 &self,
288 body: &impl serde::Serialize,
289 ) -> anyhow::Result<InjectV1Response> {
290 self.request_with_json_response(
291 reqwest::Method::POST,
292 self.endpoint.join("/api/inject/v1")?,
293 body,
294 )
295 .await
296 }
297
298 pub async fn request_with_streaming_text_response<T: reqwest::IntoUrl, B: serde::Serialize>(
299 &self,
300 method: reqwest::Method,
301 url: T,
302 body: &B,
303 ) -> anyhow::Result<impl Stream<Item = reqwest::Result<bytes::Bytes>>> {
304 let response = self
305 .client_builder()
306 .build()?
307 .request(method, url)
308 .json(body)
309 .send()
310 .await?;
311
312 let status = response.status();
313 if !status.is_success() {
314 let body_bytes = response.bytes().await.with_context(|| {
315 format!(
316 "request status {}: {}, and failed to read response body",
317 status.as_u16(),
318 status.canonical_reason().unwrap_or("")
319 )
320 })?;
321 let body_text = String::from_utf8_lossy(&body_bytes);
322 anyhow::bail!(
323 "request status {}: {}. Response body: {body_text}",
324 status.as_u16(),
325 status.canonical_reason().unwrap_or(""),
326 );
327 }
328
329 Ok(response.bytes_stream())
330 }
331
332 pub async fn request_with_json_response<
333 T: reqwest::IntoUrl,
334 B: serde::Serialize,
335 R: serde::de::DeserializeOwned,
336 >(
337 &self,
338 method: reqwest::Method,
339 url: T,
340 body: &B,
341 ) -> anyhow::Result<R> {
342 let response = self
343 .client_builder()
344 .build()?
345 .request(method, url)
346 .json(body)
347 .send()
348 .await?;
349
350 let status = response.status();
351 if !status.is_success() {
352 let body_bytes = response.bytes().await.with_context(|| {
353 format!(
354 "request status {}: {}, and failed to read response body",
355 status.as_u16(),
356 status.canonical_reason().unwrap_or("")
357 )
358 })?;
359 anyhow::bail!(
360 "request status {}: {}. Response body: {}",
361 status.as_u16(),
362 status.canonical_reason().unwrap_or(""),
363 String::from_utf8_lossy(&body_bytes)
364 );
365 }
366 json_body(response).await.with_context(|| {
367 format!(
368 "request status {}: {}",
369 status.as_u16(),
370 status.canonical_reason().unwrap_or("")
371 )
372 })
373 }
374
375 pub async fn get_metrics<T, F: FnMut(&Metric) -> Option<T>>(
376 &self,
377 mut filter_map: F,
378 ) -> anyhow::Result<Vec<T>> {
379 let mut parser = kumo_prometheus::parser::Parser::new();
380 let mut stream = self
381 .request_with_streaming_text_response(
382 reqwest::Method::GET,
383 self.endpoint.join("/metrics")?,
384 &(),
385 )
386 .await?;
387
388 let mut result = vec![];
389 while let Some(item) = stream.next().await {
390 let bytes = item?;
391 parser.push_bytes(bytes, false, |m| {
392 if let Some(r) = (filter_map)(&m) {
393 result.push(r);
394 }
395 })?;
396 }
397
398 Ok(result)
399 }
400}
401
402pub async fn json_body<T: serde::de::DeserializeOwned>(
403 response: reqwest::Response,
404) -> anyhow::Result<T> {
405 let data = response.bytes().await.context("ready response body")?;
406 serde_json::from_slice(&data).with_context(|| {
407 format!(
408 "parsing response as json: {}",
409 String::from_utf8_lossy(&data)
410 )
411 })
412}