kumo_prometheus/
registry.rs1use async_stream::stream;
2use futures::prelude::*;
3use futures::stream::BoxStream;
4use parking_lot::Mutex;
5use prometheus::proto::{Metric, MetricFamily};
6use std::sync::{Arc, LazyLock};
7
8pub trait StreamingCollector {
9 fn stream_text(&self, prefix: &Option<String>) -> BoxStream<String>;
11 fn stream_json(&self) -> BoxStream<String>;
13 fn prune(&self);
15}
16
/// Holds the set of registered [`StreamingCollector`]s for the process.
pub struct Registry {
    /// The current collector list.
    ///
    /// The mutex guards only the outer `Arc`: `register` builds a new vector
    /// and swaps it in, while readers clone the `Arc` and then iterate over
    /// their snapshot without holding the lock.
    collectors: Mutex<Arc<Vec<Arc<dyn StreamingCollector + Send + Sync>>>>,
}
21
22impl Registry {
23 pub fn get() -> &'static Self {
26 static REG: LazyLock<Registry> = LazyLock::new(|| {
27 tokio::spawn(Registry::pruner());
28 Registry {
29 collectors: Mutex::new(Arc::new(vec![])),
30 }
31 });
32 ®
33 }
34
35 async fn pruner() {
37 loop {
38 tokio::time::sleep(tokio::time::Duration::from_secs(60)).await;
39 let collectors = Self::get_collectors();
40 for c in collectors.iter() {
41 c.prune();
42 }
43 }
44 }
45
46 pub fn register(collector: Arc<dyn StreamingCollector + Send + Sync>) {
48 let reg = Self::get();
49 let mut collectors = reg.collectors.lock();
50 let mut new_set: Vec<_> = collectors.iter().map(Arc::clone).collect();
51 new_set.push(collector);
52 *collectors = Arc::new(new_set);
53 }
54
55 fn get_collectors() -> Arc<Vec<Arc<dyn StreamingCollector + Send + Sync>>> {
56 Self::get().collectors.lock().clone()
57 }
58
59 pub fn stream_text(prefix: Option<String>) -> BoxStream<'static, String> {
69 let collectors = Self::get_collectors();
70
71 stream! {
72 let mut metrics = prometheus::default_registry().gather();
73 if let Some(prefix) = &prefix {
74 metrics.iter_mut().for_each(|metric| {
75 let name = format!("{prefix}{}", metric.get_name());
76 metric.set_name(name);
77 });
78 }
79 if let Ok(report) = prometheus::TextEncoder::new().encode_to_string(&metrics) {
80 yield report;
81 }
82
83 for c in collectors.iter() {
84 let mut text_stream = c.stream_text(&prefix);
85 while let Some(chunk) = text_stream.next().await {
86 yield chunk;
87 }
88 }
89 }
90 .boxed()
91 }
92
93 pub fn stream_json() -> BoxStream<'static, String> {
100 let collectors = Self::get_collectors();
101
102 stream! {
103 let mut buf = "{".to_string();
104 let metrics = prometheus::default_registry().gather();
105 metrics_to_partial_json(&metrics, &mut buf);
106 yield buf;
107
108 for c in collectors.iter() {
109 let mut text_stream = c.stream_json();
110 while let Some(chunk) = text_stream.next().await {
111 yield chunk;
112 }
113 }
114
115 yield "}".to_string();
116 }
117 .boxed()
118 }
119}
120
121fn metrics_to_partial_json(metrics: &[MetricFamily], target: &mut String) {
127 use prometheus::proto::MetricType;
128
129 for (midx, mf) in metrics.iter().enumerate() {
130 if midx > 0 {
131 target.push(',');
132 }
133 let name = mf.get_name();
134 let help = mf.get_help();
135
136 target.push('"');
137 target.push_str(name);
138 target.push_str("\":{");
139 if !help.is_empty() {
140 target.push_str("\"help\":\"");
141 target.push_str(help);
142 target.push_str("\",");
143 }
144
145 let metric_type = mf.get_field_type();
146
147 target.push_str("\"type\":\"");
148 target.push_str(&format!("{metric_type:?}").to_lowercase());
149 target.push_str("\",\"value\":");
150
151 let metric_values = mf.get_metric();
152 if metric_values.is_empty() {
153 target.push_str("null}");
154 continue;
155 }
156
157 let first_label = metric_values[0].get_label();
158 if first_label.len() == 1 {
159 target.push_str("{\"");
160 target.push_str(first_label[0].get_name());
161 target.push_str("\":{");
162 } else if first_label.len() > 1 {
163 target.push('[');
164 }
165
166 for (i, metric) in metric_values.iter().enumerate() {
167 let label = metric.get_label();
168
169 if i > 0 {
170 target.push(',');
171 }
172
173 fn emit_value(metric_type: MetricType, metric: &Metric, target: &mut String) {
174 match metric_type {
175 MetricType::COUNTER | MetricType::GAUGE => {
176 let value = if metric_type == MetricType::COUNTER {
177 metric.get_counter().get_value()
178 } else {
179 metric.get_gauge().get_value()
180 };
181 target.push_str(&value.to_string());
182 }
183 MetricType::HISTOGRAM => {
184 let hist = metric.get_histogram();
185
186 let count = hist.get_sample_count();
187 let sum = hist.get_sample_sum();
188 let avg = if count != 0 { sum / count as f64 } else { 0. };
189
190 let mut bucket = vec![];
191 for b in hist.get_bucket() {
192 bucket.push(vec![b.get_upper_bound(), b.get_cumulative_count() as f64]);
193 }
194
195 let hist_value = serde_json::json!({
196 "count": count,
197 "sum": sum,
198 "avg": avg,
199 "bucket": bucket,
200 });
201
202 if let Ok(s) = serde_json::to_string(&hist_value) {
203 target.push_str(&s);
204 }
205 }
206 _ => {
207 target.push_str("null");
210 }
211 }
212 }
213
214 if label.is_empty() {
215 emit_value(metric_type, metric, target);
216 break;
217 }
218
219 if label.len() == 1 {
220 target.push('"');
221 target.push_str(label[0].get_value());
222 target.push_str("\":");
223 emit_value(metric_type, metric, target);
224 continue;
225 }
226
227 target.push('{');
228 for pair in label {
229 target.push('"');
230 target.push_str(pair.get_name());
231 target.push_str("\":\"");
232 target.push_str(pair.get_value());
233 target.push_str("\",");
234 }
235 target.push_str("\"@\":");
236 emit_value(metric_type, metric, target);
237 target.push('}');
238 }
239
240 if first_label.len() == 1 {
241 target.push_str("}}}");
242 } else if first_label.len() > 1 {
243 target.push_str("]}");
244 } else {
245 target.push('}');
246 }
247 }
248}
249
#[cfg(test)]
mod test {
    use super::*;
    use prometheus::proto::{Counter, LabelPair, Metric, MetricFamily, MetricType};

    /// Encodes `families`, wraps the partial output in braces, prints both
    /// the raw text and the parsed value, and returns the parsed value.
    /// Panics if the output is not valid JSON.
    fn encode(families: &[MetricFamily]) -> serde_json::Value {
        let mut buf = String::from("{");
        metrics_to_partial_json(families, &mut buf);
        buf.push('}');

        println!("{buf}");
        let value: serde_json::Value = serde_json::from_str(&buf).unwrap();
        println!("{value:?}");
        value
    }

    /// Builds a label pair with the given name and value.
    fn label(name: &str, value: &str) -> LabelPair {
        let mut pair = LabelPair::new();
        pair.set_name(name.into());
        pair.set_value(value.into());
        pair
    }

    /// Builds a zero-valued counter metric carrying `labels`.
    fn counter(labels: Vec<LabelPair>) -> Metric {
        let mut metric = Metric::new();
        metric.set_counter(Counter::new());
        if !labels.is_empty() {
            metric.set_label(labels.into());
        }
        metric
    }

    /// Builds a counter family named `family_name` holding `metrics`,
    /// with `help` set only when provided.
    fn counter_family(help: Option<&str>, metrics: Vec<Metric>) -> MetricFamily {
        let mut family = MetricFamily::new();
        family.set_name("family_name".into());
        if let Some(help) = help {
            family.set_help(help.into());
        }
        family.set_field_type(MetricType::COUNTER);
        family.set_metric(metrics.into());
        family
    }

    #[test]
    fn test_json_encode_counter_no_help() {
        let family = counter_family(None, vec![counter(vec![])]);

        assert_eq!(
            encode(&[family]),
            serde_json::json!({
                "family_name": {
                    "type": "counter",
                    "value": 0
                }
            })
        );
    }

    #[test]
    fn test_json_encode_histogram_one_label() {
        use prometheus::core::Collector;

        let opts = prometheus::HistogramOpts::new("hist_name", "hist_help");
        let hist = prometheus::Histogram::with_opts(opts).unwrap();
        let family = hist.collect();

        assert_eq!(
            encode(&family),
            serde_json::json!({
                "hist_name": {
                    "help": "hist_help",
                    "type": "histogram",
                    "value": {
                        "avg": 0.0,
                        "bucket": [
                            [0.005, 0.0],
                            [0.01, 0.0],
                            [0.025, 0.0],
                            [0.05, 0.0],
                            [0.1, 0.0],
                            [0.25, 0.0],
                            [0.5, 0.0],
                            [1.0, 0.0],
                            [2.5, 0.0],
                            [5.0, 0.0],
                            [10.0, 0.0]
                        ],
                        "count": 0,
                        "sum": 0.0
                    }
                }
            })
        );
    }

    #[test]
    fn test_json_encode_counter_one_label() {
        let metric = counter(vec![label("label_name", "label_value")]);
        let family = counter_family(None, vec![metric]);

        assert_eq!(
            encode(&[family]),
            serde_json::json!({
                "family_name": {
                    "type": "counter",
                    "value": {
                        "label_name": {
                            "label_value": 0
                        }
                    }
                }
            })
        );
    }

    #[test]
    fn test_json_encode_counter_one_label_two_values() {
        let first = counter(vec![label("label_name", "label_value")]);
        let second = counter(vec![label("label_name", "2nd")]);
        let family = counter_family(None, vec![first, second]);

        assert_eq!(
            encode(&[family]),
            serde_json::json!({
                "family_name": {
                    "type": "counter",
                    "value": {
                        "label_name": {
                            "label_value": 0,
                            "2nd": 0
                        }
                    }
                }
            })
        );
    }

    #[test]
    fn test_json_encode_counter_two_labels() {
        let metric = counter(vec![
            label("first_label_name", "first_label_value"),
            label("2nd_label_name", "2nd_label_value"),
        ]);
        let family = counter_family(None, vec![metric]);

        assert_eq!(
            encode(&[family]),
            serde_json::json!({
                "family_name": {
                    "type": "counter",
                    "value": [
                        {
                            "first_label_name": "first_label_value",
                            "2nd_label_name": "2nd_label_value",
                            "@": 0
                        }
                    ]
                }
            })
        );
    }

    #[test]
    fn test_json_encode_counter_with_help() {
        let family = counter_family(Some("me"), vec![counter(vec![])]);

        assert_eq!(
            encode(&[family]),
            serde_json::json!({
                "family_name": {
                    "type": "counter",
                    "help": "me",
                    "value": 0
                }
            })
        );
    }
}