kumo_prometheus/
registry.rs

1use async_stream::stream;
2use futures::prelude::*;
3use futures::stream::BoxStream;
4use parking_lot::Mutex;
5use prometheus::proto::{Metric, MetricFamily};
6use std::sync::{Arc, LazyLock};
7
8pub trait StreamingCollector {
9    /// Stream chunks of text in prometheus text exposition format
10    fn stream_text(&self, prefix: &Option<String>) -> BoxStream<String>;
11    /// Stream chunks in our json format, as chunks of text
12    fn stream_json(&self) -> BoxStream<String>;
13    /// Prune any stale entries from this collector
14    fn prune(&self);
15}
16
17/// Keeps track of all streaming collector instances
18pub struct Registry {
19    collectors: Mutex<Arc<Vec<Arc<dyn StreamingCollector + Send + Sync>>>>,
20}
21
22impl Registry {
23    /// Get the Registry singleton, and spawn the pruning task if it
24    /// hasn't already been launched.
25    pub fn get() -> &'static Self {
26        static REG: LazyLock<Registry> = LazyLock::new(|| {
27            tokio::spawn(Registry::pruner());
28            Registry {
29                collectors: Mutex::new(Arc::new(vec![])),
30            }
31        });
32        &REG
33    }
34
35    /// Periodically maintain the hashmaps, removing any pruned entries
36    async fn pruner() {
37        loop {
38            tokio::time::sleep(tokio::time::Duration::from_secs(60)).await;
39            let collectors = Self::get_collectors();
40            for c in collectors.iter() {
41                c.prune();
42            }
43        }
44    }
45
46    /// Register a new collector
47    pub fn register(collector: Arc<dyn StreamingCollector + Send + Sync>) {
48        let reg = Self::get();
49        let mut collectors = reg.collectors.lock();
50        let mut new_set: Vec<_> = collectors.iter().map(Arc::clone).collect();
51        new_set.push(collector);
52        *collectors = Arc::new(new_set);
53    }
54
55    fn get_collectors() -> Arc<Vec<Arc<dyn StreamingCollector + Send + Sync>>> {
56        Self::get().collectors.lock().clone()
57    }
58
59    /// Produce a stream of String chunks that represent all known metrics
60    /// in the Prometheus exposition format.
61    ///
62    /// This will include the MetricFamily's that have been registered with
63    /// the prometheus crate and then supplement the output with our own
64    /// set of registered streaming collectors.
65    ///
66    /// The optional prefix parameter is used to "namespace" the returned
67    /// metric names.
68    pub fn stream_text(prefix: Option<String>) -> BoxStream<'static, String> {
69        let collectors = Self::get_collectors();
70
71        stream! {
72            let mut metrics = prometheus::default_registry().gather();
73            if let Some(prefix) = &prefix {
74                metrics.iter_mut().for_each(|metric| {
75                    let name = format!("{prefix}{}", metric.get_name());
76                    metric.set_name(name);
77                });
78            }
79            if let Ok(report) = prometheus::TextEncoder::new().encode_to_string(&metrics) {
80                yield report;
81            }
82
83            for c in collectors.iter() {
84                let mut text_stream = c.stream_text(&prefix);
85                while let Some(chunk) = text_stream.next().await {
86                    yield chunk;
87                }
88            }
89        }
90        .boxed()
91    }
92
93    /// Produce a stream of String chunks that represent all known metrics
94    /// in the informal kumomta json format.
95    ///
96    /// This will include the MetricFamily's that have been registered with
97    /// the prometheus crate and then supplement the output with our own
98    /// set of registered streaming collectors.
99    pub fn stream_json() -> BoxStream<'static, String> {
100        let collectors = Self::get_collectors();
101
102        stream! {
103            let mut buf = "{".to_string();
104            let metrics = prometheus::default_registry().gather();
105            metrics_to_partial_json(&metrics, &mut buf);
106            yield buf;
107
108            for c in collectors.iter() {
109                let mut text_stream = c.stream_json();
110                while let Some(chunk) = text_stream.next().await {
111                    yield chunk;
112                }
113            }
114
115            yield "}".to_string();
116        }
117        .boxed()
118    }
119}
120
121/// This function emits prometheus crate MetricFamily's into the informal
122/// kumomta json representation.  The json text fragment is emitted into
123/// the provided target String.
124/// The result is not a complete JSON document, it is just the keys and values
125/// produced from the provided list of metrics.
126fn metrics_to_partial_json(metrics: &[MetricFamily], target: &mut String) {
127    use prometheus::proto::MetricType;
128
129    for (midx, mf) in metrics.iter().enumerate() {
130        if midx > 0 {
131            target.push(',');
132        }
133        let name = mf.get_name();
134        let help = mf.get_help();
135
136        target.push('"');
137        target.push_str(name);
138        target.push_str("\":{");
139        if !help.is_empty() {
140            target.push_str("\"help\":\"");
141            target.push_str(help);
142            target.push_str("\",");
143        }
144
145        let metric_type = mf.get_field_type();
146
147        target.push_str("\"type\":\"");
148        target.push_str(&format!("{metric_type:?}").to_lowercase());
149        target.push_str("\",\"value\":");
150
151        let metric_values = mf.get_metric();
152        if metric_values.is_empty() {
153            target.push_str("null}");
154            continue;
155        }
156
157        let first_label = metric_values[0].get_label();
158        if first_label.len() == 1 {
159            target.push_str("{\"");
160            target.push_str(first_label[0].get_name());
161            target.push_str("\":{");
162        } else if first_label.len() > 1 {
163            target.push('[');
164        }
165
166        for (i, metric) in metric_values.iter().enumerate() {
167            let label = metric.get_label();
168
169            if i > 0 {
170                target.push(',');
171            }
172
173            fn emit_value(metric_type: MetricType, metric: &Metric, target: &mut String) {
174                match metric_type {
175                    MetricType::COUNTER | MetricType::GAUGE => {
176                        let value = if metric_type == MetricType::COUNTER {
177                            metric.get_counter().get_value()
178                        } else {
179                            metric.get_gauge().get_value()
180                        };
181                        target.push_str(&value.to_string());
182                    }
183                    MetricType::HISTOGRAM => {
184                        let hist = metric.get_histogram();
185
186                        let count = hist.get_sample_count();
187                        let sum = hist.get_sample_sum();
188                        let avg = if count != 0 { sum / count as f64 } else { 0. };
189
190                        let mut bucket = vec![];
191                        for b in hist.get_bucket() {
192                            bucket.push(vec![b.get_upper_bound(), b.get_cumulative_count() as f64]);
193                        }
194
195                        let hist_value = serde_json::json!({
196                            "count": count,
197                            "sum": sum,
198                            "avg": avg,
199                            "bucket": bucket,
200                        });
201
202                        if let Ok(s) = serde_json::to_string(&hist_value) {
203                            target.push_str(&s);
204                        }
205                    }
206                    _ => {
207                        // Other types are currently not implemented
208                        // as we don't currently export any other type
209                        target.push_str("null");
210                    }
211                }
212            }
213
214            if label.is_empty() {
215                emit_value(metric_type, metric, target);
216                break;
217            }
218
219            if label.len() == 1 {
220                target.push('"');
221                target.push_str(label[0].get_value());
222                target.push_str("\":");
223                emit_value(metric_type, metric, target);
224                continue;
225            }
226
227            target.push('{');
228            for pair in label {
229                target.push('"');
230                target.push_str(pair.get_name());
231                target.push_str("\":\"");
232                target.push_str(pair.get_value());
233                target.push_str("\",");
234            }
235            target.push_str("\"@\":");
236            emit_value(metric_type, metric, target);
237            target.push('}');
238        }
239
240        if first_label.len() == 1 {
241            target.push_str("}}}");
242        } else if first_label.len() > 1 {
243            target.push_str("]}");
244        } else {
245            target.push('}');
246        }
247    }
248}
249
#[cfg(test)]
mod test {
    use super::*;
    use prometheus::proto::{Counter, LabelPair, Metric, MetricFamily, MetricType};

    /// Encodes `families` with `metrics_to_partial_json`, wraps the
    /// fragment in braces and parses the result, printing both the raw
    /// text and the parsed value to help diagnose failures.
    fn encode(families: &[MetricFamily]) -> serde_json::Value {
        let mut buf = String::from("{");
        metrics_to_partial_json(families, &mut buf);
        buf.push('}');

        println!("{buf}");
        let value: serde_json::Value = serde_json::from_str(&buf).unwrap();
        println!("{value:?}");
        value
    }

    /// Builds a counter metric with a default (zero) counter and the
    /// given `(name, value)` label pairs; no labels are set when the
    /// list is empty.
    fn counter_metric(labels: &[(&str, &str)]) -> Metric {
        let mut metric = Metric::new();
        metric.set_counter(Counter::new());
        if !labels.is_empty() {
            let pairs: Vec<LabelPair> = labels
                .iter()
                .map(|&(name, value)| {
                    let mut pair = LabelPair::new();
                    pair.set_name(name.into());
                    pair.set_value(value.into());
                    pair
                })
                .collect();
            metric.set_label(pairs.into());
        }
        metric
    }

    /// Builds a counter family named `family_name` that holds `metrics`,
    /// setting the help text only when one is provided.
    fn counter_family(help: Option<&str>, metrics: Vec<Metric>) -> MetricFamily {
        let mut family = MetricFamily::new();
        family.set_name("family_name".into());
        if let Some(help) = help {
            family.set_help(help.into());
        }
        family.set_field_type(MetricType::COUNTER);
        family.set_metric(metrics.into());
        family
    }

    /// An unlabelled counter without help text yields a bare value and
    /// no "help" key.
    #[test]
    fn test_json_encode_counter_no_help() {
        let family = counter_family(None, vec![counter_metric(&[])]);

        assert_eq!(
            encode(&[family]),
            serde_json::json!({
                "family_name": {
                    "type": "counter",
                    "value": 0
                }
            })
        );
    }

    /// A histogram is rendered as an object holding avg/bucket/count/sum.
    /// Note that the histogram built here carries no labels.
    #[test]
    fn test_json_encode_histogram_one_label() {
        use prometheus::core::Collector;
        let opts = prometheus::HistogramOpts::new("hist_name", "hist_help");
        let hist = prometheus::Histogram::with_opts(opts).unwrap();
        let families = hist.collect();

        assert_eq!(
            encode(&families),
            serde_json::json!({
            "hist_name": {
                "help": "hist_help",
                "type": "histogram",
                "value": {
                    "avg": 0.0,
                    "bucket": [
                        [0.005, 0.0],
                        [0.01, 0.0],
                        [0.025, 0.0],
                        [0.05, 0.0],
                        [0.1, 0.0],
                        [0.25, 0.0],
                        [0.5, 0.0],
                        [1.0, 0.0],
                        [2.5, 0.0],
                        [5.0, 0.0],
                        [10.0, 0.0]
                    ],
                    "count":0,
                    "sum":0.0
                }}})
        );
    }

    /// A single label nests the value as label name -> label value -> value.
    #[test]
    fn test_json_encode_counter_one_label() {
        let metric = counter_metric(&[("label_name", "label_value")]);
        let family = counter_family(None, vec![metric]);

        assert_eq!(
            encode(&[family]),
            serde_json::json!({
                "family_name": {
                    "type": "counter",
                    "value": {
                        "label_name": {
                            "label_value": 0
                        }
                    }
                }
            })
        );
    }

    /// Several metrics that share one label name are grouped under that
    /// name, keyed by their label values.
    #[test]
    fn test_json_encode_counter_one_label_two_values() {
        let first = counter_metric(&[("label_name", "label_value")]);
        let second = counter_metric(&[("label_name", "2nd")]);
        let family = counter_family(None, vec![first, second]);

        assert_eq!(
            encode(&[family]),
            serde_json::json!({
                "family_name": {
                    "type": "counter",
                    "value": {
                        "label_name": {
                            "label_value": 0,
                            "2nd": 0
                        }
                    }
                }
            })
        );
    }

    /// With more than one label the value becomes a list of objects that
    /// carry the labels plus the value under the "@" key.
    #[test]
    fn test_json_encode_counter_two_labels() {
        let metric = counter_metric(&[
            ("first_label_name", "first_label_value"),
            ("2nd_label_name", "2nd_label_value"),
        ]);
        let family = counter_family(None, vec![metric]);

        assert_eq!(
            encode(&[family]),
            serde_json::json!({
                "family_name": {
                    "type": "counter",
                    "value": [{
                        "first_label_name": "first_label_value",
                        "2nd_label_name": "2nd_label_value",
                        "@": 0
                        }
                    ]
                }
            })
        );
    }

    /// Non-empty help text is reported under the "help" key.
    #[test]
    fn test_json_encode_counter_with_help() {
        let family = counter_family(Some("me"), vec![counter_metric(&[])]);

        assert_eq!(
            encode(&[family]),
            serde_json::json!({
                "family_name": {
                    "type": "counter",
                    "help": "me",
                    "value": 0
                }
            })
        );
    }
}