1pub use crate::counter::*;
2use crate::labels::MetricLabel;
3use crate::registry::StreamingCollector;
4use async_stream::stream;
5use futures::stream::BoxStream;
6use futures::StreamExt;
7use parking_lot::{RwLock, RwLockUpgradableReadGuard};
8pub use paste;
9use std::borrow::Borrow;
10use std::collections::HashMap;
11use std::hash::Hash;
12use std::sync::Arc;
13
14mod counter;
15pub mod counter_bundle;
16
17#[macro_use]
18pub mod labels;
19pub mod parser;
20pub mod registry;
21
22struct CounterRegistryInner<K, V: AtomicCounterEntry> {
23 map: RwLock<HashMap<K, V>>,
24 name: &'static str,
25 help: &'static str,
26 is_gauge: bool,
27}
28
29const CHUNK_SIZE: usize = 4 * 1024;
32
33impl<K: Clone + MetricLabel + Send + Sync, V: AtomicCounterEntry> StreamingCollector
34 for CounterRegistryInner<K, V>
35{
36 fn stream_text(&self, prefix: &Option<String>) -> BoxStream<String> {
37 let mut buffer = String::with_capacity(CHUNK_SIZE);
45 buffer.push_str("# HELP ");
46 let prefix = prefix.as_deref().unwrap_or("");
47 buffer.push_str(prefix);
48 buffer.push_str(self.name);
49 buffer.push(' ');
50 buffer.push_str(self.help);
51 buffer.push_str("\n# TYPE ");
52 buffer.push_str(prefix);
53 buffer.push_str(self.name);
54 buffer.push(' ');
55 buffer.push_str(if self.is_gauge { "gauge" } else { "counter" });
56 buffer.push('\n');
57
58 let mut buffer = Some(buffer);
59
60 let counters = {
61 let map = self.map.read();
62 let mut pairs = Vec::with_capacity(map.len());
63 for (key, weak) in map.iter() {
64 if let Some(strong) = weak.resolve() {
65 pairs.push((key.clone(), strong));
66 }
67 }
68 pairs
69 };
70
71 stream! {
72 for (key, counter) in counters {
73 let Some(buf) = buffer.as_mut() else {break;};
74
75 buf.push_str(self.name);
76 key.emit_text_value(buf, &counter.get().to_string());
77 buf.push('\n');
78
79 let need_flush = buf.len() >= CHUNK_SIZE;
80
81 if need_flush {
82 yield buffer.take().expect("always have buffer");
83 buffer.replace(String::with_capacity(CHUNK_SIZE));
84 }
85 }
86
87 if let Some(buf) = buffer.take() {
88 if !buf.is_empty(){
89 yield buf;
90 }
91 }
92
93 }
94 .boxed()
95 }
96
97 fn stream_json(&self) -> BoxStream<String> {
98 let mut target = String::with_capacity(CHUNK_SIZE);
99 target.push_str(",\n\"");
100 target.push_str(self.name);
101 target.push_str("\":{");
102 if !self.help.is_empty() {
103 target.push_str("\"help\":\"");
104 target.push_str(self.help);
105 target.push_str("\",");
106 }
107 target.push_str("\"type\":\"");
108 target.push_str(if self.is_gauge { "gauge" } else { "counter" });
109 target.push_str("\",\"value\":");
110
111 let counters = {
112 let map = self.map.read();
113 let mut pairs = Vec::with_capacity(map.len());
114 for (key, weak) in map.iter() {
115 if let Some(strong) = weak.resolve() {
116 pairs.push((key.clone(), strong));
117 }
118 }
119 pairs
120 };
121
122 stream! {
123 if counters.is_empty() {
124 target.push_str("null}");
125 yield target;
126 return;
127 }
128
129 let labels = K::label_names();
130
131 if labels.len() == 1 {
132 target.push_str("{\"");
133 target.push_str(labels[0]);
134 target.push_str("\":{");
135 } else {
136 target.push('[');
137 }
138
139 let mut buffer = Some(target);
140
141 for (i, (key, counter)) in counters.iter().enumerate() {
142 let Some(target) = buffer.as_mut() else {break;};
143 if i > 0 {
144 target.push_str(",\n");
145 }
146
147 let value = counter.get().to_string();
148 key.emit_json_value(target,&value);
149
150 let need_flush = target.len() >= CHUNK_SIZE;
151
152 if need_flush {
153 yield buffer.take().expect("always have buffer");
154 buffer.replace(String::with_capacity(CHUNK_SIZE));
155 }
156 }
157
158 let Some(mut target) = buffer.take() else {return;};
159 if labels.len() == 1 {
160 target.push_str("}}}");
161 } else {
162 target.push_str("]}");
163 }
164
165 yield target;
166 }
167 .boxed()
168 }
169
170 fn prune(&self) {
171 if !V::needs_pruning() {
172 return;
173 }
174
175 let mut map = self.map.write();
176 map.retain(|_key, entry| entry.resolve().is_some());
177 }
178}
179
180pub struct CounterRegistry<K, V: AtomicCounterEntry = AtomicCounter> {
194 inner: Arc<CounterRegistryInner<K, V>>,
195}
196
197pub type PruningCounterRegistry<K> = CounterRegistry<K, WeakAtomicCounter>;
198
199impl<K, V: AtomicCounterEntry> Clone for CounterRegistry<K, V> {
200 fn clone(&self) -> Self {
201 Self {
202 inner: Arc::clone(&self.inner),
203 }
204 }
205}
206
207impl<K: Clone + Send + Sync + MetricLabel + 'static, V: AtomicCounterEntry + 'static>
208 CounterRegistry<K, V>
209{
210 pub fn register(name: &'static str, help: &'static str) -> Self {
213 Self::register_impl(name, help, false)
214 }
215
216 pub fn register_gauge(name: &'static str, help: &'static str) -> Self {
218 Self::register_impl(name, help, true)
219 }
220
221 fn register_impl(name: &'static str, help: &'static str, is_gauge: bool) -> Self {
222 let me = Self {
223 inner: Arc::new(CounterRegistryInner {
224 map: Default::default(),
225 name,
226 help,
227 is_gauge,
228 }),
229 };
230
231 crate::registry::Registry::register(me.inner.clone());
232
233 me
234 }
235}
236
237impl<K, V> CounterRegistry<K, V>
238where
239 V: AtomicCounterEntry,
240 K: Eq + Hash + MetricLabel,
241{
242 pub fn get<Q: ?Sized>(&self, key: &Q) -> Option<AtomicCounter>
245 where
246 K: Borrow<Q>,
247 Q: Hash + Eq,
248 {
249 let map = self.inner.map.read();
250 map.get(key).and_then(|weak| weak.resolve())
251 }
252
253 pub fn get_or_create<'a, Q: ?Sized>(&self, key: &'a Q) -> AtomicCounter
256 where
257 K: Borrow<Q> + From<&'a Q>,
258 Q: Hash + Eq,
259 {
260 let map = self.inner.map.upgradable_read();
261 if let Some(weak) = map.get(key) {
262 if let Some(strong) = weak.resolve() {
263 return strong;
264 }
265 }
266
267 let mut map = RwLockUpgradableReadGuard::upgrade(map);
268
269 if let Some(weak) = map.get(key) {
271 if let Some(strong) = weak.resolve() {
272 return strong;
273 }
274 }
275
276 let result = AtomicCounter::new();
277 map.insert(key.into(), V::make_storable(&result));
278
279 result
280 }
281}