1pub use crate::counter::*;
2use crate::labels::MetricLabel;
3use crate::registry::StreamingCollector;
4use async_stream::stream;
5use futures::stream::BoxStream;
6use futures::StreamExt;
7use parking_lot::{RwLock, RwLockUpgradableReadGuard};
8pub use pastey as paste;
9use std::borrow::Borrow;
10use std::collections::HashMap;
11use std::hash::Hash;
12use std::sync::Arc;
13
14mod counter;
15pub mod counter_bundle;
16
17#[macro_use]
18pub mod labels;
19pub mod parser;
20pub mod registry;
21
22struct CounterRegistryInner<K, V: AtomicCounterEntry> {
23 map: RwLock<HashMap<K, V>>,
24 name: &'static str,
25 help: &'static str,
26 is_gauge: bool,
27}
28
/// Size, in bytes (4 KiB), used both as the initial capacity of each output
/// buffer and as the length threshold at which a buffer is yielded as a chunk.
const CHUNK_SIZE: usize = 4096;
32
33impl<K: Clone + MetricLabel + Send + Sync, V: AtomicCounterEntry> StreamingCollector
34 for CounterRegistryInner<K, V>
35{
36 fn stream_text(&'_ self, prefix: &Option<String>) -> BoxStream<'_, String> {
37 let mut buffer = String::with_capacity(CHUNK_SIZE);
45 buffer.push_str("# HELP ");
46 let prefix = prefix.as_deref().unwrap_or("").to_string();
47 buffer.push_str(&prefix);
48 buffer.push_str(self.name);
49 buffer.push(' ');
50 buffer.push_str(self.help);
51 buffer.push_str("\n# TYPE ");
52 buffer.push_str(&prefix);
53 buffer.push_str(self.name);
54 buffer.push(' ');
55 buffer.push_str(if self.is_gauge { "gauge" } else { "counter" });
56 buffer.push('\n');
57
58 let mut buffer = Some(buffer);
59
60 let counters = {
61 let map = self.map.read();
62 let mut pairs = Vec::with_capacity(map.len());
63 for (key, weak) in map.iter() {
64 if let Some(strong) = weak.resolve() {
65 pairs.push((key.clone(), strong));
66 }
67 }
68 pairs
69 };
70
71 stream! {
72 for (key, counter) in counters {
73 let Some(buf) = buffer.as_mut() else {break;};
74
75 buf.push_str(&prefix);
76 buf.push_str(self.name);
77 key.emit_text_value(buf, &counter.get().to_string());
78 buf.push('\n');
79
80 let need_flush = buf.len() >= CHUNK_SIZE;
81
82 if need_flush {
83 yield buffer.take().expect("always have buffer");
84 buffer.replace(String::with_capacity(CHUNK_SIZE));
85 }
86 }
87
88 if let Some(buf) = buffer.take() {
89 if !buf.is_empty(){
90 yield buf;
91 }
92 }
93
94 }
95 .boxed()
96 }
97
98 fn stream_json(&'_ self) -> BoxStream<'_, String> {
99 let mut target = String::with_capacity(CHUNK_SIZE);
100 target.push_str(",\n\"");
101 target.push_str(self.name);
102 target.push_str("\":{");
103 if !self.help.is_empty() {
104 target.push_str("\"help\":\"");
105 target.push_str(self.help);
106 target.push_str("\",");
107 }
108 target.push_str("\"type\":\"");
109 target.push_str(if self.is_gauge { "gauge" } else { "counter" });
110 target.push_str("\",\"value\":");
111
112 let counters = {
113 let map = self.map.read();
114 let mut pairs = Vec::with_capacity(map.len());
115 for (key, weak) in map.iter() {
116 if let Some(strong) = weak.resolve() {
117 pairs.push((key.clone(), strong));
118 }
119 }
120 pairs
121 };
122
123 stream! {
124 if counters.is_empty() {
125 target.push_str("null}");
126 yield target;
127 return;
128 }
129
130 let labels = K::label_names();
131
132 if labels.len() == 1 {
133 target.push_str("{\"");
134 target.push_str(labels[0]);
135 target.push_str("\":{");
136 } else {
137 target.push('[');
138 }
139
140 let mut buffer = Some(target);
141
142 for (i, (key, counter)) in counters.iter().enumerate() {
143 let Some(target) = buffer.as_mut() else {break;};
144 if i > 0 {
145 target.push_str(",\n");
146 }
147
148 let value = counter.get().to_string();
149 key.emit_json_value(target,&value);
150
151 let need_flush = target.len() >= CHUNK_SIZE;
152
153 if need_flush {
154 yield buffer.take().expect("always have buffer");
155 buffer.replace(String::with_capacity(CHUNK_SIZE));
156 }
157 }
158
159 let Some(mut target) = buffer.take() else {return;};
160 if labels.len() == 1 {
161 target.push_str("}}}");
162 } else {
163 target.push_str("]}");
164 }
165
166 yield target;
167 }
168 .boxed()
169 }
170
171 fn prune(&self) {
172 if !V::needs_pruning() {
173 return;
174 }
175
176 let mut map = self.map.write();
177 map.retain(|_key, entry| entry.resolve().is_some());
178 }
179}
180
181pub struct CounterRegistry<K, V: AtomicCounterEntry = AtomicCounter> {
195 inner: Arc<CounterRegistryInner<K, V>>,
196}
197
198pub type PruningCounterRegistry<K> = CounterRegistry<K, WeakAtomicCounter>;
199
200impl<K, V: AtomicCounterEntry> Clone for CounterRegistry<K, V> {
201 fn clone(&self) -> Self {
202 Self {
203 inner: Arc::clone(&self.inner),
204 }
205 }
206}
207
208impl<K: Clone + Send + Sync + MetricLabel + 'static, V: AtomicCounterEntry + 'static>
209 CounterRegistry<K, V>
210{
211 pub fn register(name: &'static str, help: &'static str) -> Self {
214 Self::register_impl(name, help, false)
215 }
216
217 pub fn register_gauge(name: &'static str, help: &'static str) -> Self {
219 Self::register_impl(name, help, true)
220 }
221
222 fn register_impl(name: &'static str, help: &'static str, is_gauge: bool) -> Self {
223 let me = Self {
224 inner: Arc::new(CounterRegistryInner {
225 map: Default::default(),
226 name,
227 help,
228 is_gauge,
229 }),
230 };
231
232 crate::registry::Registry::register(me.inner.clone());
233
234 me
235 }
236}
237
238impl<K, V> CounterRegistry<K, V>
239where
240 V: AtomicCounterEntry,
241 K: Eq + Hash + MetricLabel,
242{
243 pub fn get<Q: ?Sized>(&self, key: &Q) -> Option<AtomicCounter>
246 where
247 K: Borrow<Q>,
248 Q: Hash + Eq,
249 {
250 let map = self.inner.map.read();
251 map.get(key).and_then(|weak| weak.resolve())
252 }
253
254 pub fn get_or_create<'a, Q: ?Sized>(&self, key: &'a Q) -> AtomicCounter
257 where
258 K: Borrow<Q> + From<&'a Q>,
259 Q: Hash + Eq,
260 {
261 let map = self.inner.map.upgradable_read();
262 if let Some(weak) = map.get(key) {
263 if let Some(strong) = weak.resolve() {
264 return strong;
265 }
266 }
267
268 let mut map = RwLockUpgradableReadGuard::upgrade(map);
269
270 if let Some(weak) = map.get(key) {
272 if let Some(strong) = weak.resolve() {
273 return strong;
274 }
275 }
276
277 let result = AtomicCounter::new();
278 map.insert(key.into(), V::make_storable(&result));
279
280 result
281 }
282}