kumo_api_client/
lib.rs

1use anyhow::Context;
2use futures::{Stream, StreamExt};
3use kumo_api_types::rebind::{RebindV1Request, RebindV1Response};
4use kumo_api_types::xfer::*;
5use kumo_api_types::*;
6use kumo_prometheus::parser::Metric;
7use std::time::Duration;
8
9pub use reqwest::Url;
10
11const DEFAULT_TIMEOUT: Duration = Duration::from_secs(60);
12
13pub struct KumoApiClient {
14    endpoint: Url,
15    timeout: Duration,
16}
17
18macro_rules! method {
19    ($func_name:ident, POST, $path:literal, $request_ty:ty, $response_ty:ty) => {
20        pub async fn $func_name(&self, params: &$request_ty) -> anyhow::Result<$response_ty> {
21            self.request_with_json_response(
22                reqwest::Method::POST,
23                self.endpoint.join($path)?,
24                params,
25            )
26            .await
27        }
28    };
29
30    ($func_name:ident, TEXT, DELETE, $path:literal, $request_ty:ty) => {
31        pub async fn $func_name(&self, params: &$request_ty) -> anyhow::Result<String> {
32            self.request_with_text_response(
33                reqwest::Method::DELETE,
34                self.endpoint.join($path)?,
35                params,
36            )
37            .await
38        }
39    };
40
41    ($func_name:ident, TEXT, POST, $path:literal, $request_ty:ty) => {
42        pub async fn $func_name(&self, params: &$request_ty) -> anyhow::Result<String> {
43            self.request_with_text_response(
44                reqwest::Method::POST,
45                self.endpoint.join($path)?,
46                params,
47            )
48            .await
49        }
50    };
51
52    ($func_name:ident, GET, $path:literal, $request_ty:ty, $response_ty:ty) => {
53        pub async fn $func_name(&self, get_params: &$request_ty) -> anyhow::Result<$response_ty> {
54            let mut url = self.endpoint.join($path)?;
55            get_params.apply_to_url(&mut url);
56
57            self.request_with_json_response(reqwest::Method::GET, url, &())
58                .await
59        }
60    };
61
62    ($func_name:ident, GET, $path:literal, $response_ty:ty) => {
63        pub async fn $func_name(&self) -> anyhow::Result<$response_ty> {
64            self.request_with_json_response(reqwest::Method::GET, self.endpoint.join($path)?, &())
65                .await
66        }
67    };
68}
69
70impl KumoApiClient {
71    pub fn new(endpoint: Url) -> Self {
72        Self {
73            endpoint,
74            timeout: DEFAULT_TIMEOUT,
75        }
76    }
77
78    fn client_builder(&self) -> reqwest::ClientBuilder {
79        reqwest::Client::builder().timeout(self.timeout)
80    }
81
82    method!(
83        admin_bounce_v1,
84        POST,
85        "/api/admin/bounce/v1",
86        BounceV1Request,
87        BounceV1Response
88    );
89
90    method!(machine_info, GET, "/api/machine-info", MachineInfoV1);
91
92    method!(
93        admin_bounce_list_v1,
94        GET,
95        "/api/admin/bounce/v1",
96        Vec<BounceV1ListEntry>
97    );
98
99    method!(
100        admin_bounce_cancel_v1,
101        TEXT,
102        DELETE,
103        "/api/admin/bounce/v1",
104        BounceV1CancelRequest
105    );
106
107    method!(
108        admin_inspect_sched_q_v1,
109        GET,
110        "/api/admin/inspect-sched-q/v1",
111        InspectQueueV1Request,
112        InspectQueueV1Response
113    );
114
115    method!(
116        admin_inspect_message_v1,
117        GET,
118        "/api/admin/inspect-message/v1",
119        InspectMessageV1Request,
120        InspectMessageV1Response
121    );
122
123    method!(
124        admin_xfer_v1,
125        POST,
126        "/api/admin/xfer/v1",
127        XferV1Request,
128        XferV1Response
129    );
130
131    method!(
132        admin_suspend_list_v1,
133        GET,
134        "/api/admin/suspend/v1",
135        Vec<SuspendV1ListEntry>
136    );
137
138    method!(
139        admin_suspend_ready_q_list_v1,
140        GET,
141        "/api/admin/suspend-ready-q/v1",
142        Vec<SuspendReadyQueueV1ListEntry>
143    );
144
145    method!(
146        admin_xfer_cancel_v1,
147        POST,
148        "/api/admin/xfer/cancel/v1",
149        XferCancelV1Request,
150        XferCancelV1Response
151    );
152
153    method!(
154        admin_rebind_v1,
155        POST,
156        "/api/admin/rebind/v1",
157        RebindV1Request,
158        RebindV1Response
159    );
160
161    method!(
162        admin_suspend_ready_q_v1,
163        POST,
164        "/api/admin/suspend-ready-q/v1",
165        SuspendReadyQueueV1Request,
166        SuspendV1Response
167    );
168
169    method!(
170        admin_suspend_ready_q_cancel_v1,
171        TEXT,
172        DELETE,
173        "/api/admin/suspend-ready-q/v1",
174        SuspendV1CancelRequest
175    );
176
177    method!(
178        admin_suspend_v1,
179        POST,
180        "/api/admin/suspend/v1",
181        SuspendV1Request,
182        SuspendV1Response
183    );
184
185    method!(
186        admin_suspend_cancel_v1,
187        TEXT,
188        DELETE,
189        "/api/admin/suspend/v1",
190        SuspendV1CancelRequest
191    );
192
193    method!(
194        admin_ready_q_states_v1,
195        GET,
196        "/api/admin/ready-q-states/v1",
197        ReadyQueueStateRequest,
198        ReadyQueueStateResponse
199    );
200
201    method!(
202        admin_set_diagnostic_log_filter_v1,
203        TEXT,
204        POST,
205        "/api/admin/set_diagnostic_log_filter/v1",
206        SetDiagnosticFilterRequest
207    );
208
209    pub async fn request_with_text_response<T: reqwest::IntoUrl, B: serde::Serialize>(
210        &self,
211        method: reqwest::Method,
212        url: T,
213        body: &B,
214    ) -> anyhow::Result<String> {
215        let response = self
216            .client_builder()
217            .build()?
218            .request(method, url)
219            .json(body)
220            .send()
221            .await?;
222
223        let status = response.status();
224        let body_bytes = response.bytes().await.with_context(|| {
225            format!(
226                "request status {}: {}, and failed to read response body",
227                status.as_u16(),
228                status.canonical_reason().unwrap_or("")
229            )
230        })?;
231        let body_text = String::from_utf8_lossy(&body_bytes);
232        if !status.is_success() {
233            anyhow::bail!(
234                "request status {}: {}. Response body: {body_text}",
235                status.as_u16(),
236                status.canonical_reason().unwrap_or(""),
237            );
238        }
239
240        Ok(body_text.to_string())
241    }
242
243    pub async fn admin_bump_config_epoch(&self) -> anyhow::Result<String> {
244        self.request_with_text_response(
245            reqwest::Method::POST,
246            self.endpoint.join("/api/admin/bump-config-epoch")?,
247            &(),
248        )
249        .await
250    }
251
252    pub async fn request_with_streaming_text_response<T: reqwest::IntoUrl, B: serde::Serialize>(
253        &self,
254        method: reqwest::Method,
255        url: T,
256        body: &B,
257    ) -> anyhow::Result<impl Stream<Item = reqwest::Result<bytes::Bytes>>> {
258        let response = self
259            .client_builder()
260            .build()?
261            .request(method, url)
262            .json(body)
263            .send()
264            .await?;
265
266        let status = response.status();
267        if !status.is_success() {
268            let body_bytes = response.bytes().await.with_context(|| {
269                format!(
270                    "request status {}: {}, and failed to read response body",
271                    status.as_u16(),
272                    status.canonical_reason().unwrap_or("")
273                )
274            })?;
275            let body_text = String::from_utf8_lossy(&body_bytes);
276            anyhow::bail!(
277                "request status {}: {}. Response body: {body_text}",
278                status.as_u16(),
279                status.canonical_reason().unwrap_or(""),
280            );
281        }
282
283        Ok(response.bytes_stream())
284    }
285
286    pub async fn request_with_json_response<
287        T: reqwest::IntoUrl,
288        B: serde::Serialize,
289        R: serde::de::DeserializeOwned,
290    >(
291        &self,
292        method: reqwest::Method,
293        url: T,
294        body: &B,
295    ) -> anyhow::Result<R> {
296        let response = self
297            .client_builder()
298            .build()?
299            .request(method, url)
300            .json(body)
301            .send()
302            .await?;
303
304        let status = response.status();
305        if !status.is_success() {
306            let body_bytes = response.bytes().await.with_context(|| {
307                format!(
308                    "request status {}: {}, and failed to read response body",
309                    status.as_u16(),
310                    status.canonical_reason().unwrap_or("")
311                )
312            })?;
313            anyhow::bail!(
314                "request status {}: {}. Response body: {}",
315                status.as_u16(),
316                status.canonical_reason().unwrap_or(""),
317                String::from_utf8_lossy(&body_bytes)
318            );
319        }
320        json_body(response).await.with_context(|| {
321            format!(
322                "request status {}: {}",
323                status.as_u16(),
324                status.canonical_reason().unwrap_or("")
325            )
326        })
327    }
328
329    pub async fn get_metrics<T, F: FnMut(&Metric) -> Option<T>>(
330        &self,
331        mut filter_map: F,
332    ) -> anyhow::Result<Vec<T>> {
333        let mut parser = kumo_prometheus::parser::Parser::new();
334        let mut stream = self
335            .request_with_streaming_text_response(
336                reqwest::Method::GET,
337                self.endpoint.join("/metrics")?,
338                &(),
339            )
340            .await?;
341
342        let mut result = vec![];
343        while let Some(item) = stream.next().await {
344            let bytes = item?;
345            parser.push_bytes(bytes, false, |m| {
346                if let Some(r) = (filter_map)(&m) {
347                    result.push(r);
348                }
349            })?;
350        }
351
352        Ok(result)
353    }
354}
355
356pub async fn json_body<T: serde::de::DeserializeOwned>(
357    response: reqwest::Response,
358) -> anyhow::Result<T> {
359    let data = response.bytes().await.context("ready response body")?;
360    serde_json::from_slice(&data).with_context(|| {
361        format!(
362            "parsing response as json: {}",
363            String::from_utf8_lossy(&data)
364        )
365    })
366}