kumo_prometheus/
lib.rs

1pub use crate::counter::*;
2use crate::labels::MetricLabel;
3use crate::registry::StreamingCollector;
4use async_stream::stream;
5use futures::stream::BoxStream;
6use futures::StreamExt;
7use parking_lot::{RwLock, RwLockUpgradableReadGuard};
8pub use pastey as paste;
9use std::borrow::Borrow;
10use std::collections::HashMap;
11use std::hash::Hash;
12use std::sync::Arc;
13
14mod counter;
15pub mod counter_bundle;
16
17#[macro_use]
18pub mod labels;
19pub mod parser;
20pub mod registry;
21
22struct CounterRegistryInner<K, V: AtomicCounterEntry> {
23    map: RwLock<HashMap<K, V>>,
24    name: &'static str,
25    help: &'static str,
26    is_gauge: bool,
27}
28
/// Flush threshold, in bytes, used while streaming serialized counter
/// values as text or json: the pending buffer is handed to the consumer
/// as soon as it has grown to at least this size, so a chunk may exceed
/// it by at most one serialized entry.
const CHUNK_SIZE: usize = 4 * 1024;
32
33impl<K: Clone + MetricLabel + Send + Sync, V: AtomicCounterEntry> StreamingCollector
34    for CounterRegistryInner<K, V>
35{
36    fn stream_text(&'_ self, prefix: &Option<String>) -> BoxStream<'_, String> {
37        /*
38        # HELP tokio_total_overflow_count The number of times worker threads saturated their local queues.
39        # TYPE tokio_total_overflow_count counter
40        tokio_total_overflow_count 0
41        total_connection_count{service="smtp_client:source2->loopback.dummy-mx.wezfurlong.org@smtp_client"} 25
42        */
43
44        let mut buffer = String::with_capacity(CHUNK_SIZE);
45        buffer.push_str("# HELP ");
46        let prefix = prefix.as_deref().unwrap_or("").to_string();
47        buffer.push_str(&prefix);
48        buffer.push_str(self.name);
49        buffer.push(' ');
50        buffer.push_str(self.help);
51        buffer.push_str("\n# TYPE ");
52        buffer.push_str(&prefix);
53        buffer.push_str(self.name);
54        buffer.push(' ');
55        buffer.push_str(if self.is_gauge { "gauge" } else { "counter" });
56        buffer.push('\n');
57
58        let mut buffer = Some(buffer);
59
60        let counters = {
61            let map = self.map.read();
62            let mut pairs = Vec::with_capacity(map.len());
63            for (key, weak) in map.iter() {
64                if let Some(strong) = weak.resolve() {
65                    pairs.push((key.clone(), strong));
66                }
67            }
68            pairs
69        };
70
71        stream! {
72            for (key, counter) in counters {
73                let Some(buf) = buffer.as_mut() else {break;};
74
75                buf.push_str(&prefix);
76                buf.push_str(self.name);
77                key.emit_text_value(buf, &counter.get().to_string());
78                buf.push('\n');
79
80                let need_flush = buf.len() >= CHUNK_SIZE;
81
82                if need_flush {
83                    yield buffer.take().expect("always have buffer");
84                    buffer.replace(String::with_capacity(CHUNK_SIZE));
85                }
86            }
87
88            if let Some(buf) = buffer.take() {
89                if !buf.is_empty(){
90                    yield buf;
91                }
92            }
93
94        }
95        .boxed()
96    }
97
98    fn stream_json(&'_ self) -> BoxStream<'_, String> {
99        let mut target = String::with_capacity(CHUNK_SIZE);
100        target.push_str(",\n\"");
101        target.push_str(self.name);
102        target.push_str("\":{");
103        if !self.help.is_empty() {
104            target.push_str("\"help\":\"");
105            target.push_str(self.help);
106            target.push_str("\",");
107        }
108        target.push_str("\"type\":\"");
109        target.push_str(if self.is_gauge { "gauge" } else { "counter" });
110        target.push_str("\",\"value\":");
111
112        let counters = {
113            let map = self.map.read();
114            let mut pairs = Vec::with_capacity(map.len());
115            for (key, weak) in map.iter() {
116                if let Some(strong) = weak.resolve() {
117                    pairs.push((key.clone(), strong));
118                }
119            }
120            pairs
121        };
122
123        stream! {
124            if counters.is_empty() {
125                target.push_str("null}");
126                yield target;
127                return;
128            }
129
130            let labels = K::label_names();
131
132            if labels.len() == 1 {
133                target.push_str("{\"");
134                target.push_str(labels[0]);
135                target.push_str("\":{");
136            } else {
137                target.push('[');
138            }
139
140            let mut buffer = Some(target);
141
142            for (i, (key, counter)) in counters.iter().enumerate() {
143                let Some(target) = buffer.as_mut() else {break;};
144                if i > 0 {
145                    target.push_str(",\n");
146                }
147
148                let value = counter.get().to_string();
149                key.emit_json_value(target,&value);
150
151                let need_flush = target.len() >= CHUNK_SIZE;
152
153                if need_flush {
154                    yield buffer.take().expect("always have buffer");
155                    buffer.replace(String::with_capacity(CHUNK_SIZE));
156                }
157            }
158
159            let Some(mut target) = buffer.take() else {return;};
160            if labels.len() == 1 {
161                target.push_str("}}}");
162            } else {
163                target.push_str("]}");
164            }
165
166            yield target;
167        }
168        .boxed()
169    }
170
171    fn prune(&self) {
172        if !V::needs_pruning() {
173            return;
174        }
175
176        let mut map = self.map.write();
177        map.retain(|_key, entry| entry.resolve().is_some());
178    }
179}
180
181/// Either a Counter or Gauge with a specific name, where there can
182/// be multiple labelled counter instances.
183///
184/// CounterRegistry has a PruningCounterRegistry variant which will
185/// drop unreferenced counter instances when they fall out of scope.
186///
187/// The key type K must be created via the label_key! macro provided
188/// by this crate. It allows making type-safe keys and resolving
189/// counter instances without making extraneous copies of the keys.
190///
191/// CounterRegistry implements the StreamingCollector trait which
192/// allows for efficient streaming serialization of its set of
193/// counters in either text or json format.
194pub struct CounterRegistry<K, V: AtomicCounterEntry = AtomicCounter> {
195    inner: Arc<CounterRegistryInner<K, V>>,
196}
197
198pub type PruningCounterRegistry<K> = CounterRegistry<K, WeakAtomicCounter>;
199
200impl<K, V: AtomicCounterEntry> Clone for CounterRegistry<K, V> {
201    fn clone(&self) -> Self {
202        Self {
203            inner: Arc::clone(&self.inner),
204        }
205    }
206}
207
208impl<K: Clone + Send + Sync + MetricLabel + 'static, V: AtomicCounterEntry + 'static>
209    CounterRegistry<K, V>
210{
211    /// Register a set of Counters, values that are only allowed
212    /// to increment.
213    pub fn register(name: &'static str, help: &'static str) -> Self {
214        Self::register_impl(name, help, false)
215    }
216
217    /// Register a set of Gauges, values that are allowed to increase and decrease.
218    pub fn register_gauge(name: &'static str, help: &'static str) -> Self {
219        Self::register_impl(name, help, true)
220    }
221
222    fn register_impl(name: &'static str, help: &'static str, is_gauge: bool) -> Self {
223        let me = Self {
224            inner: Arc::new(CounterRegistryInner {
225                map: Default::default(),
226                name,
227                help,
228                is_gauge,
229            }),
230        };
231
232        crate::registry::Registry::register(me.inner.clone());
233
234        me
235    }
236}
237
238impl<K, V> CounterRegistry<K, V>
239where
240    V: AtomicCounterEntry,
241    K: Eq + Hash + MetricLabel,
242{
243    /// Resolve an already-existing counter for the given key, or None
244    /// if there either has never been such a value, or if it was pruned.
245    pub fn get<Q: ?Sized>(&self, key: &Q) -> Option<AtomicCounter>
246    where
247        K: Borrow<Q>,
248        Q: Hash + Eq,
249    {
250        let map = self.inner.map.read();
251        map.get(key).and_then(|weak| weak.resolve())
252    }
253
254    /// Resolve an already-existing counter for the given key, creating
255    /// a new one if it didn't already exist, or was previously pruned.
256    pub fn get_or_create<'a, Q: ?Sized>(&self, key: &'a Q) -> AtomicCounter
257    where
258        K: Borrow<Q> + From<&'a Q>,
259        Q: Hash + Eq,
260    {
261        let map = self.inner.map.upgradable_read();
262        if let Some(weak) = map.get(key) {
263            if let Some(strong) = weak.resolve() {
264                return strong;
265            }
266        }
267
268        let mut map = RwLockUpgradableReadGuard::upgrade(map);
269
270        // Check again, as we may have lost a race
271        if let Some(weak) = map.get(key) {
272            if let Some(strong) = weak.resolve() {
273                return strong;
274            }
275        }
276
277        let result = AtomicCounter::new();
278        map.insert(key.into(), V::make_storable(&result));
279
280        result
281    }
282}