kumo_prometheus/
lib.rs

1pub use crate::counter::*;
2use crate::labels::MetricLabel;
3use crate::registry::StreamingCollector;
4use async_stream::stream;
5use futures::stream::BoxStream;
6use futures::StreamExt;
7use parking_lot::{RwLock, RwLockUpgradableReadGuard};
8pub use pastey as paste;
9use serde::Serialize;
10use std::borrow::Borrow;
11use std::collections::HashMap;
12use std::hash::Hash;
13use std::sync::{Arc, LazyLock};
14
15mod counter;
16pub mod counter_bundle;
17
18#[macro_use]
19pub mod labels;
20pub mod parser;
21pub mod registry;
22pub use prometheus;
23
24struct CounterRegistryInner<K, V: AtomicCounterEntry> {
25    map: RwLock<HashMap<K, V>>,
26    name: &'static str,
27    help: String,
28    metric_type: MetricType,
29}
30
31/// Keep up to 4k at a time of pending text or json data
32/// when streaming out the serialized counter values
33const CHUNK_SIZE: usize = 4 * 1024;
34
35impl<K: Clone + MetricLabel + Send + Sync, V: AtomicCounterEntry> StreamingCollector
36    for CounterRegistryInner<K, V>
37{
38    fn stream_text(&'_ self, prefix: &Option<String>) -> BoxStream<'_, String> {
39        /*
40        # HELP tokio_total_overflow_count The number of times worker threads saturated their local queues.
41        # TYPE tokio_total_overflow_count counter
42        tokio_total_overflow_count 0
43        total_connection_count{service="smtp_client:source2->loopback.dummy-mx.wezfurlong.org@smtp_client"} 25
44        */
45
46        let mut buffer = String::with_capacity(CHUNK_SIZE);
47        buffer.push_str("# HELP ");
48        let prefix = prefix.as_deref().unwrap_or("").to_string();
49        buffer.push_str(&prefix);
50        buffer.push_str(self.name);
51        buffer.push(' ');
52        buffer.push_str(&self.help);
53        buffer.push_str("\n# TYPE ");
54        buffer.push_str(&prefix);
55        buffer.push_str(self.name);
56        buffer.push(' ');
57        buffer.push_str(self.metric_type.label());
58        buffer.push('\n');
59
60        let mut buffer = Some(buffer);
61
62        let counters = {
63            let map = self.map.read();
64            let mut pairs = Vec::with_capacity(map.len());
65            for (key, weak) in map.iter() {
66                if let Some(strong) = weak.resolve() {
67                    pairs.push((key.clone(), strong));
68                }
69            }
70            pairs
71        };
72
73        stream! {
74            for (key, counter) in counters {
75                let Some(buf) = buffer.as_mut() else {break;};
76
77                buf.push_str(&prefix);
78                buf.push_str(self.name);
79                key.emit_text_value(buf, &counter.get().to_string());
80                buf.push('\n');
81
82                let need_flush = buf.len() >= CHUNK_SIZE;
83
84                if need_flush {
85                    yield buffer.take().expect("always have buffer");
86                    buffer.replace(String::with_capacity(CHUNK_SIZE));
87                }
88            }
89
90            if let Some(buf) = buffer.take() {
91                if !buf.is_empty(){
92                    yield buf;
93                }
94            }
95
96        }
97        .boxed()
98    }
99
100    fn stream_json(&'_ self) -> BoxStream<'_, String> {
101        let mut target = String::with_capacity(CHUNK_SIZE);
102        target.push_str(",\n\"");
103        target.push_str(self.name);
104        target.push_str("\":{");
105        if !self.help.is_empty() {
106            target.push_str("\"help\":\"");
107            target.push_str(&self.help);
108            target.push_str("\",");
109        }
110        target.push_str("\"type\":\"");
111        target.push_str(self.metric_type.label());
112        target.push_str("\",\"value\":");
113
114        let counters = {
115            let map = self.map.read();
116            let mut pairs = Vec::with_capacity(map.len());
117            for (key, weak) in map.iter() {
118                if let Some(strong) = weak.resolve() {
119                    pairs.push((key.clone(), strong));
120                }
121            }
122            pairs
123        };
124
125        stream! {
126            if counters.is_empty() {
127                target.push_str("null}");
128                yield target;
129                return;
130            }
131
132            let labels = K::label_names();
133
134            if labels.len() == 1 {
135                target.push_str("{\"");
136                target.push_str(labels[0]);
137                target.push_str("\":{");
138            } else {
139                target.push('[');
140            }
141
142            let mut buffer = Some(target);
143
144            for (i, (key, counter)) in counters.iter().enumerate() {
145                let Some(target) = buffer.as_mut() else {break;};
146                if i > 0 {
147                    target.push_str(",\n");
148                }
149
150                let value = counter.get().to_string();
151                key.emit_json_value(target,&value);
152
153                let need_flush = target.len() >= CHUNK_SIZE;
154
155                if need_flush {
156                    yield buffer.take().expect("always have buffer");
157                    buffer.replace(String::with_capacity(CHUNK_SIZE));
158                }
159            }
160
161            let Some(mut target) = buffer.take() else {return;};
162            if labels.len() == 1 {
163                target.push_str("}}}");
164            } else {
165                target.push_str("]}");
166            }
167
168            yield target;
169        }
170        .boxed()
171    }
172
173    fn prune(&self) {
174        if !V::needs_pruning() {
175            return;
176        }
177
178        let mut map = self.map.write();
179        map.retain(|_key, entry| entry.resolve().is_some());
180    }
181}
182
183/// Either a Counter or Gauge with a specific name, where there can
184/// be multiple labelled counter instances.
185///
186/// CounterRegistry has a PruningCounterRegistry variant which will
187/// drop unreferenced counter instances when they fall out of scope.
188///
189/// The key type K must be created via the label_key! macro provided
190/// by this crate. It allows making type-safe keys and resolving
191/// counter instances without making extraneous copies of the keys.
192///
193/// CounterRegistry implements the StreamingCollector trait which
194/// allows for efficient streaming serialization of its set of
195/// counters in either text or json format.
196pub struct CounterRegistry<K, V: AtomicCounterEntry = AtomicCounter> {
197    inner: Arc<CounterRegistryInner<K, V>>,
198}
199
200impl<K, V: AtomicCounterEntry> CounterRegistry<K, V> {
201    pub fn metric_type(&self) -> MetricType {
202        self.inner.metric_type
203    }
204}
205
206pub type PruningCounterRegistry<K> = CounterRegistry<K, WeakAtomicCounter>;
207
208#[derive(Serialize, Clone, Copy)]
209pub enum MetricType {
210    Counter,
211    Gauge,
212    Histogram,
213}
214
215impl MetricType {
216    pub fn label(&self) -> &'static str {
217        match self {
218            Self::Counter => "counter",
219            Self::Gauge => "gauge",
220            Self::Histogram => "histogram",
221        }
222    }
223}
224
225#[derive(Serialize, Clone, Copy)]
226pub enum MetricPrune {
227    Pruning,
228    NonPruning,
229}
230
231#[derive(Serialize, Clone)]
232pub struct CounterDescription {
233    /// The name of the counter, as it appears in the metric export
234    pub name: String,
235    /// one-line help description that is included in the metric export
236    pub help: String,
237    /// If multi-line comments are present, this will hold the comments
238    /// after the first help line.
239    pub doc: Option<String>,
240    /// What sort of metric this is
241    pub metric_type: MetricType,
242    /// If the metric has labels, this lists them out
243    pub label_names: Vec<String>,
244    /// If the metric is a histogram, this holds the bucket thresholds
245    pub buckets: Vec<f64>,
246    /// True if the metric is subject to pruning
247    pub pruning: MetricPrune,
248}
249
250/// Accumulates metric metadata
251#[linkme::distributed_slice]
252pub static COUNTER_METADATA: [fn() -> CounterDescription];
253
254fn compute_metadata() -> Vec<CounterDescription> {
255    let mut metadata = vec![];
256
257    for func in COUNTER_METADATA.iter() {
258        let desc = (func)();
259        metadata.push(desc);
260    }
261
262    metadata.sort_by(|a, b| a.name.cmp(&b.name));
263    metadata
264}
265
266static METADATA: LazyLock<Vec<CounterDescription>> = LazyLock::new(compute_metadata);
267
268pub fn export_metadata() -> Vec<CounterDescription> {
269    // We're generally called early in a process lifecycle, and Registry
270    // requires a running tokio environment otherwise it will panic,
271    // so we make a little one just for this call.
272    let rt = tokio::runtime::Builder::new_current_thread()
273        .build()
274        .expect("failed to make single thread runtime for export_metadata call");
275    let _guard = rt.enter();
276    METADATA.clone()
277}
278
279/// This macro matches a series of doc comment attributes.
280/// Multi-line doc comments appear as a sequence of doc
281/// comment attributes, so we need to be able to match
282/// both the individual case and the sequence, and map
283/// them back to a single string.
284///
285/// While this macro accepts the no-doc-comment case,
286/// we require that every metric have a doc comment,
287/// so we will emit a compile error if none were present.
288#[macro_export]
289macro_rules! mandatory_doc {
290    ($doc:expr) => {
291        $doc
292    };
293    ($($doc:expr)+) => {
294        // Join the sequence into a multi-line string
295        concat!($($doc, "\n",)+)
296    };
297    () => {
298        compile_error!("doc comments are mandatory")
299    };
300}
301
302/// Utility function for dealing with doc comment metadata.
303/// Look for two successive line breaks; if they are present
304/// they denote the break between the short first-logical-line
305/// and a longer descriptive exposition.  Returns that first
306/// logical line and the optional exposition.
307pub fn split_help(help: &str) -> (String, Option<String>) {
308    // The input will be a series of lines each with a space
309    // at the start because "/// something" -> " something".
310    // Normalize those away.
311    let normalized = help.trim().replace("\n ", "\n");
312
313    fn one_line(s: &str) -> String {
314        s.replace("\n", " ").trim().to_string()
315    }
316
317    match normalized.split_once("\n\n") {
318        Some((a, b)) => (one_line(a), Some(b.to_string())),
319        None => (one_line(help), None),
320    }
321}
322
323#[doc(hidden)]
324#[macro_export]
325macro_rules! __histogram_buckets {
326    () => {
327        $crate::prometheus::DEFAULT_BUCKETS.to_vec()
328    };
329    ($buckets:expr) => {
330        $buckets
331    };
332}
333
334#[doc(hidden)]
335#[macro_export]
336macro_rules! __register_metric {
337    (
338        $sym:ident,
339        $name:expr,
340        $($doc:expr)+,
341        $metric:expr,
342        $labels:expr,
343        $pruning:expr
344    ) => {
345        $crate::__register_metric!($sym, $name, $($doc)+, $metric, $labels, $pruning, vec![]);
346    };
347
348    (
349        $sym:ident,
350        $name:expr,
351        $($doc:expr)+,
352        $metric:expr,
353        $labels:expr,
354        $pruning:expr,
355        $buckets:expr
356    ) => {
357        // Link into COUNTER_METADATA
358        $crate::paste::paste! {
359            #[linkme::distributed_slice($crate::COUNTER_METADATA)]
360            static [<VIVIFY_ $sym>]: fn() -> $crate::CounterDescription = || {
361                ::std::sync::LazyLock::force(&$sym);
362                let (help, doc) = $crate::split_help($crate::mandatory_doc!($($doc)*));
363                let labels : &[&str] = $labels;
364                $crate::CounterDescription {
365                    name: $name.to_string(),
366                    help,
367                    doc,
368                    metric_type: $metric,
369                    label_names: labels.iter().map(|s| s.to_string()).collect(),
370                    buckets: $buckets,
371                    pruning: $pruning,
372                }
373            };
374        }
375    }
376}
377
378/// This macro aids in declaring metrics.  Usage looks like:
379///
380/// ```rust
381/// declare_metric! {
382/// /// The number of active outgoing connections in the system,
383/// /// keyed by the service name.
384/// pub static CONN_GAUGE: PruningGaugeRegistry<ServiceKey>("connection_count");
385/// }
386/// ```
387///
388/// This will wrap the declaration of the global into a LazyLock as
389/// well as capture metadata about the metric in a way that allows
390/// it to be retrieved via the `export_metadata()` function.
391///
392/// Doc comments are required for every metric declared by this
393/// macro.
394///
395/// Different types of metric collector are supported, not just
396/// the `PruningGaugeRegistry` shown above.
397///
398/// `PruningGaugeRegistry` is not actually a real type, it is
399/// some sugar allowed here to enable setting up a Gauge
400/// rather than a Counter.
401#[macro_export]
402macro_rules! declare_metric {
403    (
404        $(#[doc = $doc:expr])*
405        $vis:vis
406        static $sym:ident:
407        CounterRegistry<$key:ty>(
408            $name:expr $(,)?
409        );
410    ) => {
411        $(#[doc = $doc])*
412        $vis static $sym: ::std::sync::LazyLock<$crate::CounterRegistry<$key>> =
413            ::std::sync::LazyLock::new(
414                || {
415                    let (help, _doc) = $crate::split_help($crate::mandatory_doc!($($doc)*));
416                    $crate::CounterRegistry::register($name, help)
417                });
418
419        $crate::__register_metric!($sym, $name, $($doc)*, $crate::MetricType::Counter,
420            <$key as $crate::labels::MetricLabel>::label_names(),
421            $crate::MetricPrune::NonPruning
422        );
423    };
424
425    (
426        $(#[doc = $doc:expr])*
427        $vis:vis
428        static $sym:ident:
429        PruningCounterRegistry<$key:ty>(
430            $name:expr $(,)?
431        );
432    ) => {
433        $(#[doc = $doc])*
434        $vis static $sym: ::std::sync::LazyLock<$crate::PruningCounterRegistry<$key>> =
435            ::std::sync::LazyLock::new(
436                || {
437                    let (help, _doc) = $crate::split_help($crate::mandatory_doc!($($doc)*));
438                    $crate::PruningCounterRegistry::register($name, help)
439                });
440
441        $crate::__register_metric!($sym, $name, $($doc)*, $crate::MetricType::Counter,
442            <$key as $crate::labels::MetricLabel>::label_names(),
443            $crate::MetricPrune::Pruning
444        );
445    };
446
447    (
448        $(#[doc = $doc:expr])*
449        $vis:vis
450        static $sym:ident:
451        PruningGaugeRegistry<$key:ty>(
452            $name:expr $(,)?
453        );
454    ) => {
455        $(#[doc = $doc])*
456        $vis static $sym: ::std::sync::LazyLock<$crate::PruningCounterRegistry<$key>> =
457            ::std::sync::LazyLock::new(
458                || {
459                    let (help, _doc) = $crate::split_help($crate::mandatory_doc!($($doc)*));
460                    $crate::PruningCounterRegistry::register_gauge($name, help)
461                });
462
463        $crate::__register_metric!($sym, $name, $($doc)*, $crate::MetricType::Gauge,
464            <$key as $crate::labels::MetricLabel>::label_names(),
465            $crate::MetricPrune::Pruning
466        );
467    };
468
469
470    (
471        $(#[doc = $doc:expr])*
472        $vis:vis
473        static $sym:ident:
474        IntGaugeVec(
475            $name:expr,
476            $labels:expr $(,)*
477        );
478    ) => {
479        $(#[doc = $doc])*
480        $vis static $sym: ::std::sync::LazyLock<$crate::prometheus::IntGaugeVec> =
481            ::std::sync::LazyLock::new(
482                || {
483                    let (help, _doc) = $crate::split_help($crate::mandatory_doc!($($doc)*));
484
485                    $crate::prometheus::register_int_gauge_vec!(
486                        $name,
487                        help,
488                        $labels
489                    ).unwrap()
490                });
491
492        $crate::__register_metric!($sym, $name, $($doc)*, $crate::MetricType::Gauge,
493            $labels, $crate::MetricPrune::NonPruning);
494    };
495    (
496        $(#[doc = $doc:expr])*
497        $vis:vis
498        static $sym:ident:
499        IntCounterVec(
500            $name:expr,
501            $labels:expr $(,)*
502        );
503    ) => {
504        $(#[doc = $doc])*
505        $vis static $sym: ::std::sync::LazyLock<$crate::prometheus::IntCounterVec> =
506            ::std::sync::LazyLock::new(
507                || {
508                    let (help, _doc) = $crate::split_help($crate::mandatory_doc!($($doc)*));
509
510                    $crate::prometheus::register_int_counter_vec!(
511                        $name,
512                        help,
513                        $labels
514                    ).unwrap()
515                });
516
517        $crate::__register_metric!($sym, $name, $($doc)*, $crate::MetricType::Counter,
518            &$labels[..], $crate::MetricPrune::NonPruning);
519    };
520    (
521        $(#[doc = $doc:expr])*
522        $vis:vis
523        static $sym:ident:
524        CounterVec(
525            $name:expr,
526            $labels:expr $(,)*
527        );
528    ) => {
529        $(#[doc = $doc])*
530        $vis static $sym: ::std::sync::LazyLock<$crate::prometheus::CounterVec> =
531            ::std::sync::LazyLock::new(
532                || {
533                    let (help, _doc) = $crate::split_help($crate::mandatory_doc!($($doc)*));
534
535                    $crate::prometheus::register_counter_vec!(
536                        $name,
537                        help,
538                        $labels
539                    ).unwrap()
540                });
541
542        $crate::__register_metric!($sym, $name, $($doc)*, $crate::MetricType::Counter,
543            &$labels[..], $crate::MetricPrune::NonPruning);
544    };
545    (
546        $(#[doc = $doc:expr])*
547        $vis:vis
548        static $sym:ident:
549        HistogramVec(
550            $name:expr,
551            $labels:expr
552            $(, $buckets:expr)?
553            $(,)*
554        );
555    ) => {
556        $(#[doc = $doc])*
557        $vis static $sym: ::std::sync::LazyLock<$crate::prometheus::HistogramVec> =
558            ::std::sync::LazyLock::new(
559                || {
560                    let (help, _doc) = $crate::split_help($crate::mandatory_doc!($($doc)*));
561
562                    $crate::prometheus::register_histogram_vec!(
563                        $name,
564                        help,
565                        $labels
566                        $(,$buckets)?
567                    ).unwrap()
568                });
569
570        $crate::__register_metric!($sym, $name, $($doc)*, $crate::MetricType::Histogram,
571            &$labels[..],
572            $crate::MetricPrune::NonPruning,
573            $crate::__histogram_buckets!($($buckets)?)
574        );
575    };
576
577    (
578        $(#[doc = $doc:expr])*
579        $vis:vis
580        static $sym:ident:
581        Histogram($name:expr $(, $buckets:expr)?);
582    ) => {
583        $(#[doc = $doc])*
584        $vis static $sym: ::std::sync::LazyLock<$crate::prometheus::Histogram> =
585            ::std::sync::LazyLock::new(
586                || {
587                    let (help, _doc) = $crate::split_help($crate::mandatory_doc!($($doc)*));
588
589                    $crate::prometheus::register_histogram!(
590                        $name,
591                        help,
592                        $($buckets)?
593                    ).unwrap()
594                });
595
596        $crate::__register_metric!($sym, $name, $($doc)*, $crate::MetricType::Histogram, &[],
597            $crate::MetricPrune::NonPruning,
598            $crate::__histogram_buckets!($($buckets)?)
599        );
600    };
601
602    (
603        $(#[doc = $doc:expr])*
604        $vis:vis
605        static $sym:ident:
606        IntCounter($name:expr);
607    ) => {
608        $(#[doc = $doc])*
609        $vis static $sym: ::std::sync::LazyLock<$crate::prometheus::IntCounter> =
610            ::std::sync::LazyLock::new(
611                || {
612                    let (help, _doc) = $crate::split_help($crate::mandatory_doc!($($doc)*));
613
614                    $crate::prometheus::register_int_counter!(
615                        $name,
616                        help,
617                    ).unwrap()
618                });
619
620        $crate::__register_metric!($sym, $name, $($doc)*,
621            $crate::MetricType::Counter, &[], $crate::MetricPrune::NonPruning);
622    };
623
624    (
625        $(#[doc = $doc:expr])*
626        $vis:vis
627        static $sym:ident:
628        IntGauge($name:expr);
629    ) => {
630        $(#[doc = $doc])*
631        $vis static $sym: ::std::sync::LazyLock<$crate::prometheus::IntGauge> =
632            ::std::sync::LazyLock::new(
633                || {
634                    let (help, _doc) = $crate::split_help($crate::mandatory_doc!($($doc)*));
635
636                    $crate::prometheus::register_int_gauge!(
637                        $name,
638                        help,
639                    ).unwrap()
640                });
641
642        $crate::__register_metric!($sym, $name, $($doc)*,
643            $crate::MetricType::Gauge, &[], $crate::MetricPrune::NonPruning);
644    };
645    (
646        $(#[doc = $doc:expr])*
647        $vis:vis
648        static $sym:ident:
649        Gauge($name:expr);
650    ) => {
651        $(#[doc = $doc])*
652        $vis static $sym: ::std::sync::LazyLock<$crate::prometheus::Gauge> =
653            ::std::sync::LazyLock::new(
654                || {
655                    let (help, _doc) = $crate::split_help($crate::mandatory_doc!($($doc)*));
656
657                    $crate::prometheus::register_gauge!(
658                        $name,
659                        help,
660                    ).unwrap()
661                });
662
663        $crate::__register_metric!($sym, $name, $($doc)*,
664            $crate::MetricType::Gauge, &[], $crate::MetricPrune::NonPruning);
665    };
666}
667
668impl<K, V: AtomicCounterEntry> Clone for CounterRegistry<K, V> {
669    fn clone(&self) -> Self {
670        Self {
671            inner: Arc::clone(&self.inner),
672        }
673    }
674}
675
676impl<K: Clone + Send + Sync + MetricLabel + 'static, V: AtomicCounterEntry + 'static>
677    CounterRegistry<K, V>
678{
679    /// Register a set of Counters, values that are only allowed
680    /// to increment.
681    pub fn register(name: &'static str, help: String) -> Self {
682        Self::register_impl(name, help, MetricType::Counter)
683    }
684
685    /// Register a set of Gauges, values that are allowed to increase and decrease.
686    pub fn register_gauge(name: &'static str, help: String) -> Self {
687        Self::register_impl(name, help, MetricType::Gauge)
688    }
689
690    fn register_impl(name: &'static str, help: String, metric_type: MetricType) -> Self {
691        let me = Self {
692            inner: Arc::new(CounterRegistryInner {
693                map: Default::default(),
694                name,
695                help,
696                metric_type,
697            }),
698        };
699
700        crate::registry::Registry::register(me.inner.clone());
701
702        me
703    }
704}
705
706impl<K, V> CounterRegistry<K, V>
707where
708    V: AtomicCounterEntry,
709    K: Eq + Hash + MetricLabel,
710{
711    /// Resolve an already-existing counter for the given key, or None
712    /// if there either has never been such a value, or if it was pruned.
713    pub fn get<Q: ?Sized>(&self, key: &Q) -> Option<AtomicCounter>
714    where
715        K: Borrow<Q>,
716        Q: Hash + Eq,
717    {
718        let map = self.inner.map.read();
719        map.get(key).and_then(|weak| weak.resolve())
720    }
721
722    /// Resolve an already-existing counter for the given key, creating
723    /// a new one if it didn't already exist, or was previously pruned.
724    pub fn get_or_create<'a, Q: ?Sized>(&self, key: &'a Q) -> AtomicCounter
725    where
726        K: Borrow<Q> + From<&'a Q>,
727        Q: Hash + Eq,
728    {
729        let map = self.inner.map.upgradable_read();
730        if let Some(weak) = map.get(key) {
731            if let Some(strong) = weak.resolve() {
732                return strong;
733            }
734        }
735
736        let mut map = RwLockUpgradableReadGuard::upgrade(map);
737
738        // Check again, as we may have lost a race
739        if let Some(weak) = map.get(key) {
740            if let Some(strong) = weak.resolve() {
741                return strong;
742            }
743        }
744
745        let result = AtomicCounter::new();
746        map.insert(key.into(), V::make_storable(&result));
747
748        result
749    }
750}