kumo_prometheus/
lib.rs

1pub use crate::counter::*;
2use crate::labels::MetricLabel;
3use crate::registry::StreamingCollector;
4use async_stream::stream;
5use futures::stream::BoxStream;
6use futures::StreamExt;
7use parking_lot::{RwLock, RwLockUpgradableReadGuard};
8pub use paste;
9use std::borrow::Borrow;
10use std::collections::HashMap;
11use std::hash::Hash;
12use std::sync::Arc;
13
14mod counter;
15pub mod counter_bundle;
16
17#[macro_use]
18pub mod labels;
19pub mod parser;
20pub mod registry;
21
22struct CounterRegistryInner<K, V: AtomicCounterEntry> {
23    map: RwLock<HashMap<K, V>>,
24    name: &'static str,
25    help: &'static str,
26    is_gauge: bool,
27}
28
/// Flush threshold (4 KiB) for the buffer that accumulates serialized
/// text or json counter values while streaming: once the pending buffer
/// reaches this many bytes it is yielded and a new one is started.
const CHUNK_SIZE: usize = 4 * 1024;
32
33impl<K: Clone + MetricLabel + Send + Sync, V: AtomicCounterEntry> StreamingCollector
34    for CounterRegistryInner<K, V>
35{
36    fn stream_text(&self, prefix: &Option<String>) -> BoxStream<String> {
37        /*
38        # HELP tokio_total_overflow_count The number of times worker threads saturated their local queues.
39        # TYPE tokio_total_overflow_count counter
40        tokio_total_overflow_count 0
41        total_connection_count{service="smtp_client:source2->loopback.dummy-mx.wezfurlong.org@smtp_client"} 25
42        */
43
44        let mut buffer = String::with_capacity(CHUNK_SIZE);
45        buffer.push_str("# HELP ");
46        let prefix = prefix.as_deref().unwrap_or("");
47        buffer.push_str(prefix);
48        buffer.push_str(self.name);
49        buffer.push(' ');
50        buffer.push_str(self.help);
51        buffer.push_str("\n# TYPE ");
52        buffer.push_str(prefix);
53        buffer.push_str(self.name);
54        buffer.push(' ');
55        buffer.push_str(if self.is_gauge { "gauge" } else { "counter" });
56        buffer.push('\n');
57
58        let mut buffer = Some(buffer);
59
60        let counters = {
61            let map = self.map.read();
62            let mut pairs = Vec::with_capacity(map.len());
63            for (key, weak) in map.iter() {
64                if let Some(strong) = weak.resolve() {
65                    pairs.push((key.clone(), strong));
66                }
67            }
68            pairs
69        };
70
71        stream! {
72            for (key, counter) in counters {
73                let Some(buf) = buffer.as_mut() else {break;};
74
75                buf.push_str(self.name);
76                key.emit_text_value(buf, &counter.get().to_string());
77                buf.push('\n');
78
79                let need_flush = buf.len() >= CHUNK_SIZE;
80
81                if need_flush {
82                    yield buffer.take().expect("always have buffer");
83                    buffer.replace(String::with_capacity(CHUNK_SIZE));
84                }
85            }
86
87            if let Some(buf) = buffer.take() {
88                if !buf.is_empty(){
89                    yield buf;
90                }
91            }
92
93        }
94        .boxed()
95    }
96
97    fn stream_json(&self) -> BoxStream<String> {
98        let mut target = String::with_capacity(CHUNK_SIZE);
99        target.push_str(",\n\"");
100        target.push_str(self.name);
101        target.push_str("\":{");
102        if !self.help.is_empty() {
103            target.push_str("\"help\":\"");
104            target.push_str(self.help);
105            target.push_str("\",");
106        }
107        target.push_str("\"type\":\"");
108        target.push_str(if self.is_gauge { "gauge" } else { "counter" });
109        target.push_str("\",\"value\":");
110
111        let counters = {
112            let map = self.map.read();
113            let mut pairs = Vec::with_capacity(map.len());
114            for (key, weak) in map.iter() {
115                if let Some(strong) = weak.resolve() {
116                    pairs.push((key.clone(), strong));
117                }
118            }
119            pairs
120        };
121
122        stream! {
123            if counters.is_empty() {
124                target.push_str("null}");
125                yield target;
126                return;
127            }
128
129            let labels = K::label_names();
130
131            if labels.len() == 1 {
132                target.push_str("{\"");
133                target.push_str(labels[0]);
134                target.push_str("\":{");
135            } else {
136                target.push('[');
137            }
138
139            let mut buffer = Some(target);
140
141            for (i, (key, counter)) in counters.iter().enumerate() {
142                let Some(target) = buffer.as_mut() else {break;};
143                if i > 0 {
144                    target.push_str(",\n");
145                }
146
147                let value = counter.get().to_string();
148                key.emit_json_value(target,&value);
149
150                let need_flush = target.len() >= CHUNK_SIZE;
151
152                if need_flush {
153                    yield buffer.take().expect("always have buffer");
154                    buffer.replace(String::with_capacity(CHUNK_SIZE));
155                }
156            }
157
158            let Some(mut target) = buffer.take() else {return;};
159            if labels.len() == 1 {
160                target.push_str("}}}");
161            } else {
162                target.push_str("]}");
163            }
164
165            yield target;
166        }
167        .boxed()
168    }
169
170    fn prune(&self) {
171        if !V::needs_pruning() {
172            return;
173        }
174
175        let mut map = self.map.write();
176        map.retain(|_key, entry| entry.resolve().is_some());
177    }
178}
179
180/// Either a Counter or Gauge with a specific name, where there can
181/// be multiple labelled counter instances.
182///
183/// CounterRegistry has a PruningCounterRegistry variant which will
184/// drop unreferenced counter instances when they fall out of scope.
185///
186/// The key type K must be created via the label_key! macro provided
187/// by this crate. It allows making type-safe keys and resolving
188/// counter instances without making extraneous copies of the keys.
189///
190/// CounterRegistry implements the StreamingCollector trait which
191/// allows for efficient streaming serialization of its set of
192/// counters in either text or json format.
193pub struct CounterRegistry<K, V: AtomicCounterEntry = AtomicCounter> {
194    inner: Arc<CounterRegistryInner<K, V>>,
195}
196
197pub type PruningCounterRegistry<K> = CounterRegistry<K, WeakAtomicCounter>;
198
199impl<K, V: AtomicCounterEntry> Clone for CounterRegistry<K, V> {
200    fn clone(&self) -> Self {
201        Self {
202            inner: Arc::clone(&self.inner),
203        }
204    }
205}
206
207impl<K: Clone + Send + Sync + MetricLabel + 'static, V: AtomicCounterEntry + 'static>
208    CounterRegistry<K, V>
209{
210    /// Register a set of Counters, values that are only allowed
211    /// to increment.
212    pub fn register(name: &'static str, help: &'static str) -> Self {
213        Self::register_impl(name, help, false)
214    }
215
216    /// Register a set of Gauges, values that are allowed to increase and decrease.
217    pub fn register_gauge(name: &'static str, help: &'static str) -> Self {
218        Self::register_impl(name, help, true)
219    }
220
221    fn register_impl(name: &'static str, help: &'static str, is_gauge: bool) -> Self {
222        let me = Self {
223            inner: Arc::new(CounterRegistryInner {
224                map: Default::default(),
225                name,
226                help,
227                is_gauge,
228            }),
229        };
230
231        crate::registry::Registry::register(me.inner.clone());
232
233        me
234    }
235}
236
237impl<K, V> CounterRegistry<K, V>
238where
239    V: AtomicCounterEntry,
240    K: Eq + Hash + MetricLabel,
241{
242    /// Resolve an already-existing counter for the given key, or None
243    /// if there either has never been such a value, or if it was pruned.
244    pub fn get<Q: ?Sized>(&self, key: &Q) -> Option<AtomicCounter>
245    where
246        K: Borrow<Q>,
247        Q: Hash + Eq,
248    {
249        let map = self.inner.map.read();
250        map.get(key).and_then(|weak| weak.resolve())
251    }
252
253    /// Resolve an already-existing counter for the given key, creating
254    /// a new one if it didn't already exist, or was previously pruned.
255    pub fn get_or_create<'a, Q: ?Sized>(&self, key: &'a Q) -> AtomicCounter
256    where
257        K: Borrow<Q> + From<&'a Q>,
258        Q: Hash + Eq,
259    {
260        let map = self.inner.map.upgradable_read();
261        if let Some(weak) = map.get(key) {
262            if let Some(strong) = weak.resolve() {
263                return strong;
264            }
265        }
266
267        let mut map = RwLockUpgradableReadGuard::upgrade(map);
268
269        // Check again, as we may have lost a race
270        if let Some(weak) = map.get(key) {
271            if let Some(strong) = weak.resolve() {
272                return strong;
273            }
274        }
275
276        let result = AtomicCounter::new();
277        map.insert(key.into(), V::make_storable(&result));
278
279        result
280    }
281}