kumo_api_client/
lib.rs

1use anyhow::Context;
2use futures::{Stream, StreamExt};
3use kumo_api_types::rebind::{RebindV1Request, RebindV1Response};
4use kumo_api_types::xfer::*;
5use kumo_api_types::*;
6use kumo_prometheus::parser::Metric;
7use std::time::Duration;
8
9pub use reqwest::Url;
10
11const DEFAULT_TIMEOUT: Duration = Duration::from_secs(60);
12
13pub struct KumoApiClient {
14    endpoint: Url,
15    timeout: Duration,
16}
17
18macro_rules! method {
19    ($func_name:ident, POST, $path:literal, $request_ty:ty, $response_ty:ty) => {
20        pub async fn $func_name(&self, params: &$request_ty) -> anyhow::Result<$response_ty> {
21            self.request_with_json_response(
22                reqwest::Method::POST,
23                self.endpoint.join($path)?,
24                params,
25            )
26            .await
27        }
28    };
29
30    ($func_name:ident, TEXT, DELETE, $path:literal, $request_ty:ty) => {
31        pub async fn $func_name(&self, params: &$request_ty) -> anyhow::Result<String> {
32            self.request_with_text_response(
33                reqwest::Method::DELETE,
34                self.endpoint.join($path)?,
35                params,
36            )
37            .await
38        }
39    };
40
41    ($func_name:ident, TEXT, POST, $path:literal, $request_ty:ty) => {
42        pub async fn $func_name(&self, params: &$request_ty) -> anyhow::Result<String> {
43            self.request_with_text_response(
44                reqwest::Method::POST,
45                self.endpoint.join($path)?,
46                params,
47            )
48            .await
49        }
50    };
51
52    ($func_name:ident, GET, $path:literal, $request_ty:ty, $response_ty:ty) => {
53        pub async fn $func_name(&self, get_params: &$request_ty) -> anyhow::Result<$response_ty> {
54            let mut url = self.endpoint.join($path)?;
55            get_params.apply_to_url(&mut url);
56
57            self.request_with_json_response(reqwest::Method::GET, url, &())
58                .await
59        }
60    };
61
62    ($func_name:ident, GET, $path:literal, $response_ty:ty) => {
63        pub async fn $func_name(&self) -> anyhow::Result<$response_ty> {
64            self.request_with_json_response(reqwest::Method::GET, self.endpoint.join($path)?, &())
65                .await
66        }
67    };
68}
69
70impl KumoApiClient {
71    pub fn new(endpoint: Url) -> Self {
72        Self {
73            endpoint,
74            timeout: DEFAULT_TIMEOUT,
75        }
76    }
77
78    fn client_builder(&self) -> reqwest::ClientBuilder {
79        reqwest::Client::builder().timeout(self.timeout)
80    }
81
82    method!(
83        admin_bounce_v1,
84        POST,
85        "/api/admin/bounce/v1",
86        BounceV1Request,
87        BounceV1Response
88    );
89
90    method!(machine_info, GET, "/api/machine-info", MachineInfoV1);
91
92    method!(
93        admin_bounce_list_v1,
94        GET,
95        "/api/admin/bounce/v1",
96        Vec<BounceV1ListEntry>
97    );
98
99    method!(
100        admin_bounce_cancel_v1,
101        TEXT,
102        DELETE,
103        "/api/admin/bounce/v1",
104        BounceV1CancelRequest
105    );
106
107    method!(
108        admin_inspect_sched_q_v1,
109        GET,
110        "/api/admin/inspect-sched-q/v1",
111        InspectQueueV1Request,
112        InspectQueueV1Response
113    );
114
115    method!(
116        admin_inspect_message_v1,
117        GET,
118        "/api/admin/inspect-message/v1",
119        InspectMessageV1Request,
120        InspectMessageV1Response
121    );
122
123    method!(
124        admin_xfer_v1,
125        POST,
126        "/api/admin/xfer/v1",
127        XferV1Request,
128        XferV1Response
129    );
130
131    method!(
132        admin_suspend_list_v1,
133        GET,
134        "/api/admin/suspend/v1",
135        Vec<SuspendV1ListEntry>
136    );
137
138    method!(
139        admin_suspend_ready_q_list_v1,
140        GET,
141        "/api/admin/suspend-ready-q/v1",
142        Vec<SuspendReadyQueueV1ListEntry>
143    );
144
145    method!(
146        admin_xfer_cancel_v1,
147        POST,
148        "/api/admin/xfer/cancel/v1",
149        XferCancelV1Request,
150        XferCancelV1Response
151    );
152
153    method!(
154        admin_rebind_v1,
155        POST,
156        "/api/admin/rebind/v1",
157        RebindV1Request,
158        RebindV1Response
159    );
160
161    method!(
162        admin_suspend_ready_q_v1,
163        POST,
164        "/api/admin/suspend-ready-q/v1",
165        SuspendReadyQueueV1Request,
166        SuspendV1Response
167    );
168
169    method!(
170        admin_suspend_ready_q_cancel_v1,
171        TEXT,
172        DELETE,
173        "/api/admin/suspend-ready-q/v1",
174        SuspendV1CancelRequest
175    );
176
177    method!(
178        admin_suspend_v1,
179        POST,
180        "/api/admin/suspend/v1",
181        SuspendV1Request,
182        SuspendV1Response
183    );
184
185    method!(
186        admin_suspend_cancel_v1,
187        TEXT,
188        DELETE,
189        "/api/admin/suspend/v1",
190        SuspendV1CancelRequest
191    );
192
193    method!(
194        admin_ready_q_states_v1,
195        GET,
196        "/api/admin/ready-q-states/v1",
197        ReadyQueueStateRequest,
198        ReadyQueueStateResponse
199    );
200
201    method!(
202        admin_set_diagnostic_log_filter_v1,
203        TEXT,
204        POST,
205        "/api/admin/set_diagnostic_log_filter/v1",
206        SetDiagnosticFilterRequest
207    );
208
209    pub async fn request_with_text_response<T: reqwest::IntoUrl, B: serde::Serialize>(
210        &self,
211        method: reqwest::Method,
212        url: T,
213        body: &B,
214    ) -> anyhow::Result<String> {
215        let response = self
216            .client_builder()
217            .build()?
218            .request(method, url)
219            .json(body)
220            .send()
221            .await?;
222
223        let status = response.status();
224        let body_bytes = response.bytes().await.with_context(|| {
225            format!(
226                "request status {}: {}, and failed to read response body",
227                status.as_u16(),
228                status.canonical_reason().unwrap_or("")
229            )
230        })?;
231        let body_text = String::from_utf8_lossy(&body_bytes);
232        if !status.is_success() {
233            anyhow::bail!(
234                "request status {}: {}. Response body: {body_text}",
235                status.as_u16(),
236                status.canonical_reason().unwrap_or(""),
237            );
238        }
239
240        Ok(body_text.to_string())
241    }
242
243    pub async fn request_with_streaming_text_response<T: reqwest::IntoUrl, B: serde::Serialize>(
244        &self,
245        method: reqwest::Method,
246        url: T,
247        body: &B,
248    ) -> anyhow::Result<impl Stream<Item = reqwest::Result<bytes::Bytes>>> {
249        let response = self
250            .client_builder()
251            .build()?
252            .request(method, url)
253            .json(body)
254            .send()
255            .await?;
256
257        let status = response.status();
258        if !status.is_success() {
259            let body_bytes = response.bytes().await.with_context(|| {
260                format!(
261                    "request status {}: {}, and failed to read response body",
262                    status.as_u16(),
263                    status.canonical_reason().unwrap_or("")
264                )
265            })?;
266            let body_text = String::from_utf8_lossy(&body_bytes);
267            anyhow::bail!(
268                "request status {}: {}. Response body: {body_text}",
269                status.as_u16(),
270                status.canonical_reason().unwrap_or(""),
271            );
272        }
273
274        Ok(response.bytes_stream())
275    }
276
277    pub async fn request_with_json_response<
278        T: reqwest::IntoUrl,
279        B: serde::Serialize,
280        R: serde::de::DeserializeOwned,
281    >(
282        &self,
283        method: reqwest::Method,
284        url: T,
285        body: &B,
286    ) -> anyhow::Result<R> {
287        let response = self
288            .client_builder()
289            .build()?
290            .request(method, url)
291            .json(body)
292            .send()
293            .await?;
294
295        let status = response.status();
296        if !status.is_success() {
297            let body_bytes = response.bytes().await.with_context(|| {
298                format!(
299                    "request status {}: {}, and failed to read response body",
300                    status.as_u16(),
301                    status.canonical_reason().unwrap_or("")
302                )
303            })?;
304            anyhow::bail!(
305                "request status {}: {}. Response body: {}",
306                status.as_u16(),
307                status.canonical_reason().unwrap_or(""),
308                String::from_utf8_lossy(&body_bytes)
309            );
310        }
311        json_body(response).await.with_context(|| {
312            format!(
313                "request status {}: {}",
314                status.as_u16(),
315                status.canonical_reason().unwrap_or("")
316            )
317        })
318    }
319
320    pub async fn get_metrics<T, F: FnMut(&Metric) -> Option<T>>(
321        &self,
322        mut filter_map: F,
323    ) -> anyhow::Result<Vec<T>> {
324        let mut parser = kumo_prometheus::parser::Parser::new();
325        let mut stream = self
326            .request_with_streaming_text_response(
327                reqwest::Method::GET,
328                self.endpoint.join("/metrics")?,
329                &(),
330            )
331            .await?;
332
333        let mut result = vec![];
334        while let Some(item) = stream.next().await {
335            let bytes = item?;
336            parser.push_bytes(bytes, false, |m| {
337                if let Some(r) = (filter_map)(&m) {
338                    result.push(r);
339                }
340            })?;
341        }
342
343        Ok(result)
344    }
345}
346
347pub async fn json_body<T: serde::de::DeserializeOwned>(
348    response: reqwest::Response,
349) -> anyhow::Result<T> {
350    let data = response.bytes().await.context("ready response body")?;
351    serde_json::from_slice(&data).with_context(|| {
352        format!(
353            "parsing response as json: {}",
354            String::from_utf8_lossy(&data)
355        )
356    })
357}