1use anyhow::Context;
2use futures::{Stream, StreamExt};
3use kumo_api_types::rebind::{RebindV1Request, RebindV1Response};
4use kumo_api_types::xfer::*;
5use kumo_api_types::*;
6use kumo_prometheus::parser::Metric;
7use std::time::Duration;
8
9pub use reqwest::Url;
10
11const DEFAULT_TIMEOUT: Duration = Duration::from_secs(60);
12
13pub struct KumoApiClient {
14 endpoint: Url,
15 timeout: Duration,
16}
17
18macro_rules! method {
19 ($func_name:ident, POST, $path:literal, $request_ty:ty, $response_ty:ty) => {
20 pub async fn $func_name(&self, params: &$request_ty) -> anyhow::Result<$response_ty> {
21 self.request_with_json_response(
22 reqwest::Method::POST,
23 self.endpoint.join($path)?,
24 params,
25 )
26 .await
27 }
28 };
29
30 ($func_name:ident, TEXT, DELETE, $path:literal, $request_ty:ty) => {
31 pub async fn $func_name(&self, params: &$request_ty) -> anyhow::Result<String> {
32 self.request_with_text_response(
33 reqwest::Method::DELETE,
34 self.endpoint.join($path)?,
35 params,
36 )
37 .await
38 }
39 };
40
41 ($func_name:ident, TEXT, POST, $path:literal, $request_ty:ty) => {
42 pub async fn $func_name(&self, params: &$request_ty) -> anyhow::Result<String> {
43 self.request_with_text_response(
44 reqwest::Method::POST,
45 self.endpoint.join($path)?,
46 params,
47 )
48 .await
49 }
50 };
51
52 ($func_name:ident, GET, $path:literal, $request_ty:ty, $response_ty:ty) => {
53 pub async fn $func_name(&self, get_params: &$request_ty) -> anyhow::Result<$response_ty> {
54 let mut url = self.endpoint.join($path)?;
55 get_params.apply_to_url(&mut url);
56
57 self.request_with_json_response(reqwest::Method::GET, url, &())
58 .await
59 }
60 };
61
62 ($func_name:ident, GET, $path:literal, $response_ty:ty) => {
63 pub async fn $func_name(&self) -> anyhow::Result<$response_ty> {
64 self.request_with_json_response(reqwest::Method::GET, self.endpoint.join($path)?, &())
65 .await
66 }
67 };
68}
69
70impl KumoApiClient {
71 pub fn new(endpoint: Url) -> Self {
72 Self {
73 endpoint,
74 timeout: DEFAULT_TIMEOUT,
75 }
76 }
77
78 fn client_builder(&self) -> reqwest::ClientBuilder {
79 reqwest::Client::builder().timeout(self.timeout)
80 }
81
82 method!(
83 admin_bounce_v1,
84 POST,
85 "/api/admin/bounce/v1",
86 BounceV1Request,
87 BounceV1Response
88 );
89
90 method!(machine_info, GET, "/api/machine-info", MachineInfoV1);
91
92 method!(
93 admin_bounce_list_v1,
94 GET,
95 "/api/admin/bounce/v1",
96 Vec<BounceV1ListEntry>
97 );
98
99 method!(
100 admin_bounce_cancel_v1,
101 TEXT,
102 DELETE,
103 "/api/admin/bounce/v1",
104 BounceV1CancelRequest
105 );
106
107 method!(
108 admin_inspect_sched_q_v1,
109 GET,
110 "/api/admin/inspect-sched-q/v1",
111 InspectQueueV1Request,
112 InspectQueueV1Response
113 );
114
115 method!(
116 admin_inspect_message_v1,
117 GET,
118 "/api/admin/inspect-message/v1",
119 InspectMessageV1Request,
120 InspectMessageV1Response
121 );
122
123 method!(
124 admin_xfer_v1,
125 POST,
126 "/api/admin/xfer/v1",
127 XferV1Request,
128 XferV1Response
129 );
130
131 method!(
132 admin_suspend_list_v1,
133 GET,
134 "/api/admin/suspend/v1",
135 Vec<SuspendV1ListEntry>
136 );
137
138 method!(
139 admin_suspend_ready_q_list_v1,
140 GET,
141 "/api/admin/suspend-ready-q/v1",
142 Vec<SuspendReadyQueueV1ListEntry>
143 );
144
145 method!(
146 admin_xfer_cancel_v1,
147 POST,
148 "/api/admin/xfer/cancel/v1",
149 XferCancelV1Request,
150 XferCancelV1Response
151 );
152
153 method!(
154 admin_rebind_v1,
155 POST,
156 "/api/admin/rebind/v1",
157 RebindV1Request,
158 RebindV1Response
159 );
160
161 method!(
162 admin_suspend_ready_q_v1,
163 POST,
164 "/api/admin/suspend-ready-q/v1",
165 SuspendReadyQueueV1Request,
166 SuspendV1Response
167 );
168
169 method!(
170 admin_suspend_ready_q_cancel_v1,
171 TEXT,
172 DELETE,
173 "/api/admin/suspend-ready-q/v1",
174 SuspendV1CancelRequest
175 );
176
177 method!(
178 admin_suspend_v1,
179 POST,
180 "/api/admin/suspend/v1",
181 SuspendV1Request,
182 SuspendV1Response
183 );
184
185 method!(
186 admin_suspend_cancel_v1,
187 TEXT,
188 DELETE,
189 "/api/admin/suspend/v1",
190 SuspendV1CancelRequest
191 );
192
193 method!(
194 admin_ready_q_states_v1,
195 GET,
196 "/api/admin/ready-q-states/v1",
197 ReadyQueueStateRequest,
198 ReadyQueueStateResponse
199 );
200
201 method!(
202 admin_set_diagnostic_log_filter_v1,
203 TEXT,
204 POST,
205 "/api/admin/set_diagnostic_log_filter/v1",
206 SetDiagnosticFilterRequest
207 );
208
209 pub async fn request_with_text_response<T: reqwest::IntoUrl, B: serde::Serialize>(
210 &self,
211 method: reqwest::Method,
212 url: T,
213 body: &B,
214 ) -> anyhow::Result<String> {
215 let response = self
216 .client_builder()
217 .build()?
218 .request(method, url)
219 .json(body)
220 .send()
221 .await?;
222
223 let status = response.status();
224 let body_bytes = response.bytes().await.with_context(|| {
225 format!(
226 "request status {}: {}, and failed to read response body",
227 status.as_u16(),
228 status.canonical_reason().unwrap_or("")
229 )
230 })?;
231 let body_text = String::from_utf8_lossy(&body_bytes);
232 if !status.is_success() {
233 anyhow::bail!(
234 "request status {}: {}. Response body: {body_text}",
235 status.as_u16(),
236 status.canonical_reason().unwrap_or(""),
237 );
238 }
239
240 Ok(body_text.to_string())
241 }
242
243 pub async fn request_with_streaming_text_response<T: reqwest::IntoUrl, B: serde::Serialize>(
244 &self,
245 method: reqwest::Method,
246 url: T,
247 body: &B,
248 ) -> anyhow::Result<impl Stream<Item = reqwest::Result<bytes::Bytes>>> {
249 let response = self
250 .client_builder()
251 .build()?
252 .request(method, url)
253 .json(body)
254 .send()
255 .await?;
256
257 let status = response.status();
258 if !status.is_success() {
259 let body_bytes = response.bytes().await.with_context(|| {
260 format!(
261 "request status {}: {}, and failed to read response body",
262 status.as_u16(),
263 status.canonical_reason().unwrap_or("")
264 )
265 })?;
266 let body_text = String::from_utf8_lossy(&body_bytes);
267 anyhow::bail!(
268 "request status {}: {}. Response body: {body_text}",
269 status.as_u16(),
270 status.canonical_reason().unwrap_or(""),
271 );
272 }
273
274 Ok(response.bytes_stream())
275 }
276
277 pub async fn request_with_json_response<
278 T: reqwest::IntoUrl,
279 B: serde::Serialize,
280 R: serde::de::DeserializeOwned,
281 >(
282 &self,
283 method: reqwest::Method,
284 url: T,
285 body: &B,
286 ) -> anyhow::Result<R> {
287 let response = self
288 .client_builder()
289 .build()?
290 .request(method, url)
291 .json(body)
292 .send()
293 .await?;
294
295 let status = response.status();
296 if !status.is_success() {
297 let body_bytes = response.bytes().await.with_context(|| {
298 format!(
299 "request status {}: {}, and failed to read response body",
300 status.as_u16(),
301 status.canonical_reason().unwrap_or("")
302 )
303 })?;
304 anyhow::bail!(
305 "request status {}: {}. Response body: {}",
306 status.as_u16(),
307 status.canonical_reason().unwrap_or(""),
308 String::from_utf8_lossy(&body_bytes)
309 );
310 }
311 json_body(response).await.with_context(|| {
312 format!(
313 "request status {}: {}",
314 status.as_u16(),
315 status.canonical_reason().unwrap_or("")
316 )
317 })
318 }
319
320 pub async fn get_metrics<T, F: FnMut(&Metric) -> Option<T>>(
321 &self,
322 mut filter_map: F,
323 ) -> anyhow::Result<Vec<T>> {
324 let mut parser = kumo_prometheus::parser::Parser::new();
325 let mut stream = self
326 .request_with_streaming_text_response(
327 reqwest::Method::GET,
328 self.endpoint.join("/metrics")?,
329 &(),
330 )
331 .await?;
332
333 let mut result = vec![];
334 while let Some(item) = stream.next().await {
335 let bytes = item?;
336 parser.push_bytes(bytes, false, |m| {
337 if let Some(r) = (filter_map)(&m) {
338 result.push(r);
339 }
340 })?;
341 }
342
343 Ok(result)
344 }
345}
346
347pub async fn json_body<T: serde::de::DeserializeOwned>(
348 response: reqwest::Response,
349) -> anyhow::Result<T> {
350 let data = response.bytes().await.context("ready response body")?;
351 serde_json::from_slice(&data).with_context(|| {
352 format!(
353 "parsing response as json: {}",
354 String::from_utf8_lossy(&data)
355 )
356 })
357}