1use anyhow::Context;
2use futures::{Stream, StreamExt};
3use kumo_api_types::rebind::{RebindV1Request, RebindV1Response};
4use kumo_api_types::xfer::*;
5use kumo_api_types::*;
6use kumo_prometheus::parser::Metric;
7use std::time::Duration;
8
9pub use reqwest::Url;
10
11const DEFAULT_TIMEOUT: Duration = Duration::from_secs(60);
12
13pub struct KumoApiClient {
14 endpoint: Url,
15 timeout: Duration,
16}
17
18macro_rules! method {
19 ($func_name:ident, POST, $path:literal, $request_ty:ty, $response_ty:ty) => {
20 pub async fn $func_name(&self, params: &$request_ty) -> anyhow::Result<$response_ty> {
21 self.request_with_json_response(
22 reqwest::Method::POST,
23 self.endpoint.join($path)?,
24 params,
25 )
26 .await
27 }
28 };
29
30 ($func_name:ident, TEXT, DELETE, $path:literal, $request_ty:ty) => {
31 pub async fn $func_name(&self, params: &$request_ty) -> anyhow::Result<String> {
32 self.request_with_text_response(
33 reqwest::Method::DELETE,
34 self.endpoint.join($path)?,
35 params,
36 )
37 .await
38 }
39 };
40
41 ($func_name:ident, TEXT, POST, $path:literal, $request_ty:ty) => {
42 pub async fn $func_name(&self, params: &$request_ty) -> anyhow::Result<String> {
43 self.request_with_text_response(
44 reqwest::Method::POST,
45 self.endpoint.join($path)?,
46 params,
47 )
48 .await
49 }
50 };
51
52 ($func_name:ident, GET, $path:literal, $request_ty:ty, $response_ty:ty) => {
53 pub async fn $func_name(&self, get_params: &$request_ty) -> anyhow::Result<$response_ty> {
54 let mut url = self.endpoint.join($path)?;
55 get_params.apply_to_url(&mut url);
56
57 self.request_with_json_response(reqwest::Method::GET, url, &())
58 .await
59 }
60 };
61
62 ($func_name:ident, GET, $path:literal, $response_ty:ty) => {
63 pub async fn $func_name(&self) -> anyhow::Result<$response_ty> {
64 self.request_with_json_response(reqwest::Method::GET, self.endpoint.join($path)?, &())
65 .await
66 }
67 };
68}
69
70impl KumoApiClient {
71 pub fn new(endpoint: Url) -> Self {
72 Self {
73 endpoint,
74 timeout: DEFAULT_TIMEOUT,
75 }
76 }
77
78 fn client_builder(&self) -> reqwest::ClientBuilder {
79 reqwest::Client::builder().timeout(self.timeout)
80 }
81
82 method!(
83 admin_bounce_v1,
84 POST,
85 "/api/admin/bounce/v1",
86 BounceV1Request,
87 BounceV1Response
88 );
89
90 method!(machine_info, GET, "/api/machine-info", MachineInfoV1);
91
92 method!(
93 admin_bounce_list_v1,
94 GET,
95 "/api/admin/bounce/v1",
96 Vec<BounceV1ListEntry>
97 );
98
99 method!(
100 admin_bounce_cancel_v1,
101 TEXT,
102 DELETE,
103 "/api/admin/bounce/v1",
104 BounceV1CancelRequest
105 );
106
107 method!(
108 admin_inspect_sched_q_v1,
109 GET,
110 "/api/admin/inspect-sched-q/v1",
111 InspectQueueV1Request,
112 InspectQueueV1Response
113 );
114
115 method!(
116 admin_inspect_message_v1,
117 GET,
118 "/api/admin/inspect-message/v1",
119 InspectMessageV1Request,
120 InspectMessageV1Response
121 );
122
123 method!(
124 admin_xfer_v1,
125 POST,
126 "/api/admin/xfer/v1",
127 XferV1Request,
128 XferV1Response
129 );
130
131 method!(
132 admin_suspend_list_v1,
133 GET,
134 "/api/admin/suspend/v1",
135 Vec<SuspendV1ListEntry>
136 );
137
138 method!(
139 admin_suspend_ready_q_list_v1,
140 GET,
141 "/api/admin/suspend-ready-q/v1",
142 Vec<SuspendReadyQueueV1ListEntry>
143 );
144
145 method!(
146 admin_xfer_cancel_v1,
147 POST,
148 "/api/admin/xfer/cancel/v1",
149 XferCancelV1Request,
150 XferCancelV1Response
151 );
152
153 method!(
154 admin_rebind_v1,
155 POST,
156 "/api/admin/rebind/v1",
157 RebindV1Request,
158 RebindV1Response
159 );
160
161 method!(
162 admin_suspend_ready_q_v1,
163 POST,
164 "/api/admin/suspend-ready-q/v1",
165 SuspendReadyQueueV1Request,
166 SuspendV1Response
167 );
168
169 method!(
170 admin_suspend_ready_q_cancel_v1,
171 TEXT,
172 DELETE,
173 "/api/admin/suspend-ready-q/v1",
174 SuspendV1CancelRequest
175 );
176
177 method!(
178 admin_suspend_v1,
179 POST,
180 "/api/admin/suspend/v1",
181 SuspendV1Request,
182 SuspendV1Response
183 );
184
185 method!(
186 admin_suspend_cancel_v1,
187 TEXT,
188 DELETE,
189 "/api/admin/suspend/v1",
190 SuspendV1CancelRequest
191 );
192
193 method!(
194 admin_ready_q_states_v1,
195 GET,
196 "/api/admin/ready-q-states/v1",
197 ReadyQueueStateRequest,
198 ReadyQueueStateResponse
199 );
200
201 method!(
202 admin_set_diagnostic_log_filter_v1,
203 TEXT,
204 POST,
205 "/api/admin/set_diagnostic_log_filter/v1",
206 SetDiagnosticFilterRequest
207 );
208
209 pub async fn request_with_text_response<T: reqwest::IntoUrl, B: serde::Serialize>(
210 &self,
211 method: reqwest::Method,
212 url: T,
213 body: &B,
214 ) -> anyhow::Result<String> {
215 let response = self
216 .client_builder()
217 .build()?
218 .request(method, url)
219 .json(body)
220 .send()
221 .await?;
222
223 let status = response.status();
224 let body_bytes = response.bytes().await.with_context(|| {
225 format!(
226 "request status {}: {}, and failed to read response body",
227 status.as_u16(),
228 status.canonical_reason().unwrap_or("")
229 )
230 })?;
231 let body_text = String::from_utf8_lossy(&body_bytes);
232 if !status.is_success() {
233 anyhow::bail!(
234 "request status {}: {}. Response body: {body_text}",
235 status.as_u16(),
236 status.canonical_reason().unwrap_or(""),
237 );
238 }
239
240 Ok(body_text.to_string())
241 }
242
243 pub async fn admin_bump_config_epoch(&self) -> anyhow::Result<String> {
244 self.request_with_text_response(
245 reqwest::Method::POST,
246 self.endpoint.join("/api/admin/bump-config-epoch")?,
247 &(),
248 )
249 .await
250 }
251
252 pub async fn request_with_streaming_text_response<T: reqwest::IntoUrl, B: serde::Serialize>(
253 &self,
254 method: reqwest::Method,
255 url: T,
256 body: &B,
257 ) -> anyhow::Result<impl Stream<Item = reqwest::Result<bytes::Bytes>>> {
258 let response = self
259 .client_builder()
260 .build()?
261 .request(method, url)
262 .json(body)
263 .send()
264 .await?;
265
266 let status = response.status();
267 if !status.is_success() {
268 let body_bytes = response.bytes().await.with_context(|| {
269 format!(
270 "request status {}: {}, and failed to read response body",
271 status.as_u16(),
272 status.canonical_reason().unwrap_or("")
273 )
274 })?;
275 let body_text = String::from_utf8_lossy(&body_bytes);
276 anyhow::bail!(
277 "request status {}: {}. Response body: {body_text}",
278 status.as_u16(),
279 status.canonical_reason().unwrap_or(""),
280 );
281 }
282
283 Ok(response.bytes_stream())
284 }
285
286 pub async fn request_with_json_response<
287 T: reqwest::IntoUrl,
288 B: serde::Serialize,
289 R: serde::de::DeserializeOwned,
290 >(
291 &self,
292 method: reqwest::Method,
293 url: T,
294 body: &B,
295 ) -> anyhow::Result<R> {
296 let response = self
297 .client_builder()
298 .build()?
299 .request(method, url)
300 .json(body)
301 .send()
302 .await?;
303
304 let status = response.status();
305 if !status.is_success() {
306 let body_bytes = response.bytes().await.with_context(|| {
307 format!(
308 "request status {}: {}, and failed to read response body",
309 status.as_u16(),
310 status.canonical_reason().unwrap_or("")
311 )
312 })?;
313 anyhow::bail!(
314 "request status {}: {}. Response body: {}",
315 status.as_u16(),
316 status.canonical_reason().unwrap_or(""),
317 String::from_utf8_lossy(&body_bytes)
318 );
319 }
320 json_body(response).await.with_context(|| {
321 format!(
322 "request status {}: {}",
323 status.as_u16(),
324 status.canonical_reason().unwrap_or("")
325 )
326 })
327 }
328
329 pub async fn get_metrics<T, F: FnMut(&Metric) -> Option<T>>(
330 &self,
331 mut filter_map: F,
332 ) -> anyhow::Result<Vec<T>> {
333 let mut parser = kumo_prometheus::parser::Parser::new();
334 let mut stream = self
335 .request_with_streaming_text_response(
336 reqwest::Method::GET,
337 self.endpoint.join("/metrics")?,
338 &(),
339 )
340 .await?;
341
342 let mut result = vec![];
343 while let Some(item) = stream.next().await {
344 let bytes = item?;
345 parser.push_bytes(bytes, false, |m| {
346 if let Some(r) = (filter_map)(&m) {
347 result.push(r);
348 }
349 })?;
350 }
351
352 Ok(result)
353 }
354}
355
356pub async fn json_body<T: serde::de::DeserializeOwned>(
357 response: reqwest::Response,
358) -> anyhow::Result<T> {
359 let data = response.bytes().await.context("ready response body")?;
360 serde_json::from_slice(&data).with_context(|| {
361 format!(
362 "parsing response as json: {}",
363 String::from_utf8_lossy(&data)
364 )
365 })
366}