kumo_api_client/
lib.rs

1use anyhow::Context;
2use futures::{Stream, StreamExt};
3use kumo_api_types::rebind::{RebindV1Request, RebindV1Response};
4use kumo_api_types::xfer::*;
5use kumo_api_types::*;
6pub use kumo_prometheus::parser::Metric;
7use std::time::Duration;
8
9pub use reqwest::Url;
10
11const DEFAULT_TIMEOUT: Duration = Duration::from_secs(60);
12
13pub struct KumoApiClient {
14    endpoint: Url,
15    timeout: Duration,
16}
17
18macro_rules! method {
19    ($func_name:ident, POST, $path:literal, $request_ty:ty, $response_ty:ty) => {
20        pub async fn $func_name(&self, params: &$request_ty) -> anyhow::Result<$response_ty> {
21            self.request_with_json_response(
22                reqwest::Method::POST,
23                self.endpoint.join($path)?,
24                params,
25            )
26            .await
27        }
28    };
29
30    ($func_name:ident, TEXT, DELETE, $path:literal, $request_ty:ty) => {
31        pub async fn $func_name(&self, params: &$request_ty) -> anyhow::Result<String> {
32            self.request_with_text_response(
33                reqwest::Method::DELETE,
34                self.endpoint.join($path)?,
35                params,
36            )
37            .await
38        }
39    };
40
41    ($func_name:ident, TEXT, POST, $path:literal, $request_ty:ty) => {
42        pub async fn $func_name(&self, params: &$request_ty) -> anyhow::Result<String> {
43            self.request_with_text_response(
44                reqwest::Method::POST,
45                self.endpoint.join($path)?,
46                params,
47            )
48            .await
49        }
50    };
51
52    ($func_name:ident, GET, $path:literal, $request_ty:ty, $response_ty:ty) => {
53        pub async fn $func_name(&self, get_params: &$request_ty) -> anyhow::Result<$response_ty> {
54            let mut url = self.endpoint.join($path)?;
55            get_params.apply_to_url(&mut url);
56
57            self.request_with_json_response(reqwest::Method::GET, url, &())
58                .await
59        }
60    };
61
62    ($func_name:ident, GET, $path:literal, $response_ty:ty) => {
63        pub async fn $func_name(&self) -> anyhow::Result<$response_ty> {
64            self.request_with_json_response(reqwest::Method::GET, self.endpoint.join($path)?, &())
65                .await
66        }
67    };
68}
69
70impl KumoApiClient {
71    pub fn new(endpoint: Url) -> Self {
72        Self {
73            endpoint,
74            timeout: DEFAULT_TIMEOUT,
75        }
76    }
77
78    fn client_builder(&self) -> reqwest::ClientBuilder {
79        reqwest::Client::builder().timeout(self.timeout)
80    }
81
82    method!(
83        admin_bounce_v1,
84        POST,
85        "/api/admin/bounce/v1",
86        BounceV1Request,
87        BounceV1Response
88    );
89
90    method!(machine_info, GET, "/api/machine-info", MachineInfoV1);
91
92    method!(
93        admin_bounce_list_v1,
94        GET,
95        "/api/admin/bounce/v1",
96        Vec<BounceV1ListEntry>
97    );
98
99    method!(
100        admin_bounce_cancel_v1,
101        TEXT,
102        DELETE,
103        "/api/admin/bounce/v1",
104        BounceV1CancelRequest
105    );
106
107    method!(
108        admin_spool_compact_v1,
109        TEXT,
110        POST,
111        "/api/admin/spool-compact/v1",
112        SpoolCompactV1Request
113    );
114
115    method!(
116        admin_inspect_sched_q_v1,
117        GET,
118        "/api/admin/inspect-sched-q/v1",
119        InspectQueueV1Request,
120        InspectQueueV1Response
121    );
122
123    method!(
124        admin_inspect_message_v1,
125        GET,
126        "/api/admin/inspect-message/v1",
127        InspectMessageV1Request,
128        InspectMessageV1Response
129    );
130
131    method!(
132        admin_xfer_v1,
133        POST,
134        "/api/admin/xfer/v1",
135        XferV1Request,
136        XferV1Response
137    );
138
139    method!(
140        admin_suspend_list_v1,
141        GET,
142        "/api/admin/suspend/v1",
143        Vec<SuspendV1ListEntry>
144    );
145
146    method!(
147        admin_suspend_ready_q_list_v1,
148        GET,
149        "/api/admin/suspend-ready-q/v1",
150        Vec<SuspendReadyQueueV1ListEntry>
151    );
152
153    method!(
154        admin_xfer_cancel_v1,
155        POST,
156        "/api/admin/xfer/cancel/v1",
157        XferCancelV1Request,
158        XferCancelV1Response
159    );
160
161    method!(
162        admin_rebind_v1,
163        POST,
164        "/api/admin/rebind/v1",
165        RebindV1Request,
166        RebindV1Response
167    );
168
169    method!(
170        admin_suspend_ready_q_v1,
171        POST,
172        "/api/admin/suspend-ready-q/v1",
173        SuspendReadyQueueV1Request,
174        SuspendV1Response
175    );
176
177    method!(
178        admin_suspend_ready_q_cancel_v1,
179        TEXT,
180        DELETE,
181        "/api/admin/suspend-ready-q/v1",
182        SuspendV1CancelRequest
183    );
184
185    method!(
186        admin_suspend_v1,
187        POST,
188        "/api/admin/suspend/v1",
189        SuspendV1Request,
190        SuspendV1Response
191    );
192
193    method!(
194        admin_suspend_cancel_v1,
195        TEXT,
196        DELETE,
197        "/api/admin/suspend/v1",
198        SuspendV1CancelRequest
199    );
200
201    method!(
202        admin_ready_q_states_v1,
203        GET,
204        "/api/admin/ready-q-states/v1",
205        ReadyQueueStateRequest,
206        ReadyQueueStateResponse
207    );
208
209    method!(
210        admin_inspect_ready_q_v1,
211        GET,
212        "/api/admin/inspect-ready-q/v1",
213        InspectReadyQV1Request,
214        InspectReadyQV1Response
215    );
216
217    method!(
218        admin_resolve_egress_path_v1,
219        GET,
220        "/api/admin/resolve-egress-path/v1",
221        ResolveEgressPathV1Request,
222        ResolveEgressPathV1Response
223    );
224
225    method!(
226        admin_abort_ready_q_conn_v1,
227        TEXT,
228        POST,
229        "/api/admin/abort-ready-q-conn/v1",
230        AbortReadyQConnV1Request
231    );
232
233    method!(
234        admin_set_diagnostic_log_filter_v1,
235        TEXT,
236        POST,
237        "/api/admin/set_diagnostic_log_filter/v1",
238        SetDiagnosticFilterRequest
239    );
240
241    pub async fn request_with_text_response<T: reqwest::IntoUrl, B: serde::Serialize>(
242        &self,
243        method: reqwest::Method,
244        url: T,
245        body: &B,
246    ) -> anyhow::Result<String> {
247        let response = self
248            .client_builder()
249            .build()?
250            .request(method, url)
251            .json(body)
252            .send()
253            .await?;
254
255        let status = response.status();
256        let body_bytes = response.bytes().await.with_context(|| {
257            format!(
258                "request status {}: {}, and failed to read response body",
259                status.as_u16(),
260                status.canonical_reason().unwrap_or("")
261            )
262        })?;
263        let body_text = String::from_utf8_lossy(&body_bytes);
264        if !status.is_success() {
265            anyhow::bail!(
266                "request status {}: {}. Response body: {body_text}",
267                status.as_u16(),
268                status.canonical_reason().unwrap_or(""),
269            );
270        }
271
272        Ok(body_text.to_string())
273    }
274
275    pub async fn admin_bump_config_epoch(&self) -> anyhow::Result<String> {
276        self.request_with_text_response(
277            reqwest::Method::POST,
278            self.endpoint.join("/api/admin/bump-config-epoch")?,
279            &(),
280        )
281        .await
282    }
283
284    /// Inject a message via the HTTP injection API.
285    /// The body is the JSON payload conforming to the InjectV1Request schema.
286    pub async fn inject_v1(
287        &self,
288        body: &impl serde::Serialize,
289    ) -> anyhow::Result<InjectV1Response> {
290        self.request_with_json_response(
291            reqwest::Method::POST,
292            self.endpoint.join("/api/inject/v1")?,
293            body,
294        )
295        .await
296    }
297
298    pub async fn request_with_streaming_text_response<T: reqwest::IntoUrl, B: serde::Serialize>(
299        &self,
300        method: reqwest::Method,
301        url: T,
302        body: &B,
303    ) -> anyhow::Result<impl Stream<Item = reqwest::Result<bytes::Bytes>>> {
304        let response = self
305            .client_builder()
306            .build()?
307            .request(method, url)
308            .json(body)
309            .send()
310            .await?;
311
312        let status = response.status();
313        if !status.is_success() {
314            let body_bytes = response.bytes().await.with_context(|| {
315                format!(
316                    "request status {}: {}, and failed to read response body",
317                    status.as_u16(),
318                    status.canonical_reason().unwrap_or("")
319                )
320            })?;
321            let body_text = String::from_utf8_lossy(&body_bytes);
322            anyhow::bail!(
323                "request status {}: {}. Response body: {body_text}",
324                status.as_u16(),
325                status.canonical_reason().unwrap_or(""),
326            );
327        }
328
329        Ok(response.bytes_stream())
330    }
331
332    pub async fn request_with_json_response<
333        T: reqwest::IntoUrl,
334        B: serde::Serialize,
335        R: serde::de::DeserializeOwned,
336    >(
337        &self,
338        method: reqwest::Method,
339        url: T,
340        body: &B,
341    ) -> anyhow::Result<R> {
342        let response = self
343            .client_builder()
344            .build()?
345            .request(method, url)
346            .json(body)
347            .send()
348            .await?;
349
350        let status = response.status();
351        if !status.is_success() {
352            let body_bytes = response.bytes().await.with_context(|| {
353                format!(
354                    "request status {}: {}, and failed to read response body",
355                    status.as_u16(),
356                    status.canonical_reason().unwrap_or("")
357                )
358            })?;
359            anyhow::bail!(
360                "request status {}: {}. Response body: {}",
361                status.as_u16(),
362                status.canonical_reason().unwrap_or(""),
363                String::from_utf8_lossy(&body_bytes)
364            );
365        }
366        json_body(response).await.with_context(|| {
367            format!(
368                "request status {}: {}",
369                status.as_u16(),
370                status.canonical_reason().unwrap_or("")
371            )
372        })
373    }
374
375    pub async fn get_metrics<T, F: FnMut(&Metric) -> Option<T>>(
376        &self,
377        mut filter_map: F,
378    ) -> anyhow::Result<Vec<T>> {
379        let mut parser = kumo_prometheus::parser::Parser::new();
380        let mut stream = self
381            .request_with_streaming_text_response(
382                reqwest::Method::GET,
383                self.endpoint.join("/metrics")?,
384                &(),
385            )
386            .await?;
387
388        let mut result = vec![];
389        while let Some(item) = stream.next().await {
390            let bytes = item?;
391            parser.push_bytes(bytes, false, |m| {
392                if let Some(r) = (filter_map)(&m) {
393                    result.push(r);
394                }
395            })?;
396        }
397
398        Ok(result)
399    }
400}
401
402pub async fn json_body<T: serde::de::DeserializeOwned>(
403    response: reqwest::Response,
404) -> anyhow::Result<T> {
405    let data = response.bytes().await.context("ready response body")?;
406    serde_json::from_slice(&data).with_context(|| {
407        format!(
408            "parsing response as json: {}",
409            String::from_utf8_lossy(&data)
410        )
411    })
412}