1use map_vec::Map;
2use memchr::memchr_iter;
3use std::borrow::Borrow;
4use std::collections::HashSet;
5use std::sync::Arc;
6
/// The metric kind that applies to the sample lines currently being parsed.
///
/// The parser updates this each time it encounters a `# TYPE` line.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
enum MetricType {
    /// No `# TYPE` line has been seen yet; this is the default.
    #[default]
    Unknown,
    /// Samples are reported as counters.
    Counter,
    /// Samples are reported as gauges.
    Gauge,
    /// Samples are the `_bucket`/`_sum`/`_count` series of a histogram.
    Histogram,
}
15
/// A cheaply clonable, immutable string backed by an `Arc<String>`.
///
/// Cloning only bumps the reference count, so many metrics can share the
/// same name or label key without copying the text.
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct InternString(Arc<String>);

impl InternString {
    /// Allocates a new shared string holding a copy of `s`.
    pub fn new(s: &str) -> Self {
        InternString(Arc::new(String::from(s)))
    }

    /// Borrows the underlying text.
    pub fn as_str(&self) -> &str {
        // Deref coercion walks &Arc<String> -> &String -> &str.
        &self.0
    }
}

impl std::ops::Deref for InternString {
    type Target = str;

    fn deref(&self) -> &str {
        self.as_str()
    }
}

impl AsRef<str> for InternString {
    fn as_ref(&self) -> &str {
        self.as_str()
    }
}

/// Allows hashed collections keyed by `InternString` to be queried with a
/// plain `&str`; the derived `Hash`/`Eq` agree with those of `str`.
impl Borrow<str> for InternString {
    fn borrow(&self) -> &str {
        self.as_str()
    }
}

/// Direct comparison against a string slice, e.g. `name == "foo"`.
impl PartialEq<&str> for InternString {
    fn eq(&self, other: &&str) -> bool {
        self.as_str() == *other
    }
}

/// Formats exactly like the underlying `str` (no quoting).
impl std::fmt::Display for InternString {
    fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
        std::fmt::Display::fmt(self.as_str(), fmt)
    }
}

/// Formats like the `Debug` output of the underlying `str` (quoted and
/// escaped), hiding the `Arc` wrapper.
impl std::fmt::Debug for InternString {
    fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
        std::fmt::Debug::fmt(self.as_str(), fmt)
    }
}
65
66pub struct Parser {
67 strings: HashSet<InternString>,
68 buffer: Vec<u8>,
69 current_type: MetricType,
70 histogram: Option<HistogramMetric>,
71}
72
73impl Default for Parser {
74 fn default() -> Self {
75 Self::new()
76 }
77}
78
79impl Parser {
80 pub fn new() -> Self {
81 Parser {
82 strings: HashSet::new(),
83 buffer: vec![],
84 current_type: MetricType::Unknown,
85 histogram: None,
86 }
87 }
88
89 fn intern_string(&mut self, s: &str) -> InternString {
90 match self.strings.get(s) {
91 Some(k) => k.clone(),
92 None => {
93 let v = InternString::new(s);
94 self.strings.insert(v.clone());
95 v
96 }
97 }
98 }
99
100 fn flush_histogram<F: FnMut(Metric)>(&mut self, func: &mut F) {
101 if let Some(histogram) = self.histogram.take() {
102 (func)(Metric::Histogram(histogram));
103 }
104 }
105
106 pub fn push_bytes<F: FnMut(Metric), S: AsRef<[u8]>>(
107 &mut self,
108 data: S,
109 is_final: bool,
110 mut func: F,
111 ) -> anyhow::Result<()> {
112 let data = data.as_ref();
113
114 if !self.buffer.is_empty() {
115 if let Some(nl) = memchr::memchr(b'\n', data) {
116 self.buffer.extend_from_slice(&data[0..=nl]);
117 let buffer = std::mem::take(&mut self.buffer);
118 self.push_bytes_sol(&buffer, false, &mut func)?;
119 self.push_bytes_sol(&data[nl + 1..], is_final, &mut func)
120 } else {
121 self.buffer.extend_from_slice(data);
122 Ok(())
123 }
124 } else {
125 self.push_bytes_sol(data, is_final, &mut func)
126 }
127 }
128
129 fn push_bytes_sol<F: FnMut(Metric)>(
130 &mut self,
131 buffer: &[u8],
132 is_final: bool,
133 func: &mut F,
134 ) -> anyhow::Result<()> {
135 let mut start_of_line = 0;
136 for nl in memchr_iter(b'\n', buffer) {
137 let line = &buffer[start_of_line..nl];
138 start_of_line = nl + 1;
139 if line.is_empty() {
140 continue;
141 }
142 let line = std::str::from_utf8(line)?;
143
144 if line.starts_with("# TYPE ") {
145 self.flush_histogram(func);
146 match line.rsplit(' ').next() {
147 Some("counter") => self.current_type = MetricType::Counter,
148 Some("gauge") => self.current_type = MetricType::Gauge,
149 Some("histogram") => self.current_type = MetricType::Histogram,
150 Some(unknown) => anyhow::bail!("unknown metric type '{unknown}'"),
151 None => anyhow::bail!("invalid TYPE line '{line}'"),
152 }
153
154 continue;
155 }
156
157 if line.starts_with("#") {
158 continue;
159 }
160
161 let Some((name_info, value)) = line.rsplit_once(' ') else {
162 anyhow::bail!("invalid line {line}");
163 };
164 let value = match value.parse::<f64>() {
165 Ok(v) => v,
166 Err(err) => match value {
167 "+Inf" => f64::INFINITY,
168 "-Inf" => f64::NEG_INFINITY,
169 _ => anyhow::bail!("Error parsing value from {line}: {err:#}"),
170 },
171 };
172
173 let mut labels = Map::new();
174
175 let name = if let Some((name, rest)) = name_info.split_once('{') {
176 let Some(mut label_text) = rest.strip_suffix("}") else {
177 anyhow::bail!("invalid name in {line}");
178 };
179
180 while !label_text.is_empty() {
181 let Some((label_name, rest)) = label_text.split_once("=\"") else {
182 anyhow::bail!("invalid labels in {line}");
183 };
184
185 let Some((label_value, rest)) = rest.split_once("\"") else {
186 anyhow::bail!("invalid labels in {line}");
187 };
188
189 let rest = rest.strip_prefix(",").unwrap_or(rest);
190 let rest = rest.strip_prefix(" ").unwrap_or(rest);
191 label_text = rest;
192
193 labels.insert(
194 self.intern_string(label_name),
195 InternString::new(label_value),
202 );
203 }
204
205 self.intern_string(name)
206 } else {
207 self.intern_string(name_info)
208 };
209
210 match self.current_type {
211 MetricType::Counter => {
212 (func)(Metric::Counter(CounterMetric {
213 name,
214 labels,
215 value,
216 }));
217 }
218 MetricType::Gauge => {
219 (func)(Metric::Gauge(GaugeMetric {
220 name,
221 labels,
222 value,
223 }));
224 }
225 MetricType::Histogram => {
226 let Some(hist_name) = name
227 .strip_suffix("_bucket")
228 .or_else(|| name.strip_suffix("_count"))
229 .or_else(|| name.strip_suffix("_sum"))
230 else {
231 anyhow::bail!("unexpected histogram counter name in {line}");
232 };
233
234 let labels_less_le = {
235 let mut l = labels.clone();
236 l.remove("le");
237 l
238 };
239
240 let need_flush = self
241 .histogram
242 .as_ref()
243 .map(|hist| hist.name != hist_name || hist.labels != labels_less_le)
244 .unwrap_or(true);
245 if need_flush {
246 self.flush_histogram(func);
247 let histogram = HistogramMetric {
248 name: self.intern_string(hist_name),
249 labels: labels_less_le.clone(),
250 sum: 0.,
251 count: 0.,
252 bucket: vec![],
253 };
254 self.histogram.replace(histogram);
255 }
256
257 let Some(hist) = self.histogram.as_mut() else {
258 anyhow::bail!("histogram isn't set? impossible!");
259 };
260
261 if name.ends_with("_bucket") {
262 let Some(le) = labels.get("le").and_then(|le| le.parse::<f64>().ok())
263 else {
264 anyhow::bail!("failed to parse le as float in {line}");
265 };
266 hist.bucket.push((le, value));
267 } else if name.ends_with("_count") {
268 hist.count = value;
269 } else if name.ends_with("_sum") {
270 hist.sum = value;
271 } else {
272 anyhow::bail!("unexpected histogram case {line}");
273 }
274 }
275 MetricType::Unknown => {
276 anyhow::bail!("unknown metric type for {name} {value}");
277 }
278 }
279 }
280 let remainder = &buffer[start_of_line..];
281 if remainder.is_empty() {
282 self.buffer.clear();
283 } else {
284 self.buffer = remainder.to_vec();
285 }
286
287 if is_final {
288 self.flush_histogram(func);
289 }
290
291 if is_final && !self.buffer.is_empty() {
292 anyhow::bail!(
293 "final chunk received and we still have buffered data:\n{}",
294 String::from_utf8_lossy(&self.buffer)
295 );
296 }
297
298 Ok(())
299 }
300
301 pub fn parse<S: AsRef<[u8]>>(&mut self, data: S) -> anyhow::Result<Vec<Metric>> {
302 let mut metrics = vec![];
303 self.push_bytes(data, true, |metric| metrics.push(metric))?;
304 Ok(metrics)
305 }
306}
307
308#[derive(Debug, PartialEq)]
309pub enum Metric {
310 Counter(CounterMetric),
311 Gauge(GaugeMetric),
312 Histogram(HistogramMetric),
313}
314
315impl Metric {
316 pub fn name(&self) -> &InternString {
317 match self {
318 Self::Counter(c) => &c.name,
319 Self::Gauge(g) => &g.name,
320 Self::Histogram(h) => &h.name,
321 }
322 }
323
324 pub fn label_is(&self, key: &str, value: &str) -> bool {
325 self.labels()
326 .get(key)
327 .map(|v| v.as_str() == value)
328 .unwrap_or(false)
329 }
330
331 pub fn labels(&self) -> &Map<InternString, InternString> {
332 match self {
333 Self::Counter(c) => &c.labels,
334 Self::Gauge(g) => &g.labels,
335 Self::Histogram(h) => &h.labels,
336 }
337 }
338
339 pub fn value(&self) -> f64 {
340 match self {
341 Self::Counter(c) => c.value,
342 Self::Gauge(g) => g.value,
343 Self::Histogram(h) => h.sum / h.count,
344 }
345 }
346
347 pub fn key(&self) -> Vec<InternString> {
348 let mut key = vec![self.name().clone()];
349 for (k, v) in self.labels().iter() {
350 key.push(k.clone());
351 key.push(v.clone());
352 }
353 key
354 }
355
356 pub fn is_histogram(&self) -> bool {
357 matches!(self, Self::Histogram(_))
358 }
359
360 pub fn as_histogram(&self) -> &HistogramMetric {
361 match self {
362 Self::Histogram(h) => h,
363 _ => panic!("as_histogram called on non-histogram"),
364 }
365 }
366}
367
368#[derive(Debug, PartialEq)]
369pub struct CounterMetric {
370 pub name: InternString,
371 pub labels: Map<InternString, InternString>,
372 pub value: f64,
373}
374
375#[derive(Debug, PartialEq)]
376pub struct GaugeMetric {
377 pub name: InternString,
378 pub labels: Map<InternString, InternString>,
379 pub value: f64,
380}
381
382#[derive(Debug, PartialEq)]
383pub struct HistogramMetric {
384 pub name: InternString,
385 pub labels: Map<InternString, InternString>,
386 pub sum: f64,
387 pub count: f64,
388 pub bucket: Vec<(f64, f64)>,
389}
390
391impl HistogramMetric {
392 pub fn quantile(&self, q: f64) -> f64 {
400 if q < 0.0 {
401 return f64::NEG_INFINITY;
402 }
403 if q > 1.0 {
404 return f64::INFINITY;
405 }
406
407 if self.count == 0.0 || q.is_nan() {
408 return f64::NAN;
409 }
410
411 #[derive(Debug, Clone, Copy, Default)]
412 struct Bucket {
413 lower_bound: f64,
414 upper_bound: f64,
415 count: f64,
416 }
417
418 let mut buckets = vec![];
419
420 let mut lower_bound = 0.0;
421 for &(upper_bound, cumulative_count) in &self.bucket {
422 buckets.push(Bucket {
423 lower_bound,
424 upper_bound,
425 count: cumulative_count,
426 });
427 lower_bound = upper_bound;
428 }
429
430 {
434 let mut iter = buckets.iter_mut().rev().peekable();
435 while let Some(b) = iter.next() {
436 if let Some(prev) = iter.peek() {
437 b.count -= prev.count;
438 }
439 }
440 }
441
442 fn bucket_iter<'a>(
443 buckets: &'a [Bucket],
444 forward: bool,
445 ) -> Box<dyn Iterator<Item = &'a Bucket> + 'a> {
446 if forward {
447 Box::new(buckets.iter())
448 } else {
449 Box::new(buckets.iter().rev())
450 }
451 }
452
453 let forwards = self.sum.is_nan() || q < 0.5;
454
455 let (mut rank, iter) = if forwards {
456 (q * self.count, bucket_iter(&buckets, true))
457 } else {
458 ((1.0 - q) * self.count, bucket_iter(&buckets, false))
459 };
460
461 let mut count = 0.0;
462 let mut bucket = None;
463 for b in iter {
464 bucket.replace(b);
465 if b.count == 0.0 {
466 continue;
467 }
468 count += b.count;
469 if count >= rank {
470 break;
471 }
472 }
473
474 let Some(bucket) = bucket else {
475 return f64::NEG_INFINITY;
476 };
477
478 count = count.min(self.count);
479 if count < rank {
480 return bucket.upper_bound;
481 }
482 if forwards {
483 rank -= count - bucket.count;
484 } else {
485 rank = count - rank;
486 }
487
488 bucket.lower_bound + (bucket.upper_bound - bucket.lower_bound) * (rank / bucket.count)
489 }
490}
491
492#[cfg(test)]
493mod test {
494 use super::*;
495
496 #[test]
523 fn parse_counter() {
524 let sample = r#"# HELP tokio_total_overflow_count The number of times worker threads saturated their local queues.
525# TYPE tokio_total_overflow_count counter
526tokio_total_overflow_count 0
527"#;
528
529 let mut parser = Parser::new();
530 let metrics = parser.parse(sample).unwrap();
531 assert_eq!(
532 metrics,
533 vec![Metric::Counter(CounterMetric {
534 name: InternString::new("tokio_total_overflow_count"),
535 labels: Map::new(),
536 value: 0.0
537 })]
538 );
539 }
540
541 #[test]
542 fn parse_gauge() {
543 let sample = r#"# HELP lua_count the number of lua contexts currently alive
544# TYPE lua_count gauge
545lua_count 1
546"#;
547
548 let mut parser = Parser::new();
549 let metrics = parser.parse(sample).unwrap();
550 assert_eq!(
551 metrics,
552 vec![Metric::Gauge(GaugeMetric {
553 name: InternString::new("lua_count"),
554 labels: Map::new(),
555 value: 1.0
556 })]
557 );
558 }
559
560 #[test]
561 fn parse_histogram() {
562 let sample = r#"# HELP deliver_message_latency_rollup how long a deliver_message call takes for a given protocol
563# TYPE deliver_message_latency_rollup histogram
564deliver_message_latency_rollup_bucket{service="smtp_client",le="0.005"} 0
565deliver_message_latency_rollup_bucket{service="smtp_client",le="0.01"} 0
566deliver_message_latency_rollup_bucket{service="smtp_client",le="0.025"} 0
567deliver_message_latency_rollup_bucket{service="smtp_client",le="0.05"} 0
568deliver_message_latency_rollup_bucket{service="smtp_client",le="0.1"} 0
569deliver_message_latency_rollup_bucket{service="smtp_client",le="0.25"} 0
570deliver_message_latency_rollup_bucket{service="smtp_client",le="0.5"} 0
571deliver_message_latency_rollup_bucket{service="smtp_client",le="1"} 0
572deliver_message_latency_rollup_bucket{service="smtp_client",le="2.5"} 0
573deliver_message_latency_rollup_bucket{service="smtp_client",le="5"} 0
574deliver_message_latency_rollup_bucket{service="smtp_client",le="10"} 0
575deliver_message_latency_rollup_bucket{service="smtp_client",le="+Inf"} 0
576deliver_message_latency_rollup_sum{service="smtp_client"} 0
577deliver_message_latency_rollup_count{service="smtp_client"} 0
578# HELP lua_event_latency how long a given lua event callback took
579# TYPE lua_event_latency histogram
580lua_event_latency_bucket{event="context-creation",le="0.005"} 5226
581lua_event_latency_bucket{event="context-creation",le="0.01"} 5226
582lua_event_latency_bucket{event="context-creation",le="0.025"} 5226
583lua_event_latency_bucket{event="context-creation",le="0.05"} 5226
584lua_event_latency_bucket{event="context-creation",le="0.1"} 5226
585lua_event_latency_bucket{event="context-creation",le="0.25"} 5226
586lua_event_latency_bucket{event="context-creation",le="0.5"} 5226
587lua_event_latency_bucket{event="context-creation",le="1"} 5226
588lua_event_latency_bucket{event="context-creation",le="2.5"} 5226
589lua_event_latency_bucket{event="context-creation",le="5"} 5226
590lua_event_latency_bucket{event="context-creation",le="10"} 5226
591lua_event_latency_bucket{event="context-creation",le="+Inf"} 5226
592lua_event_latency_sum{event="context-creation"} 7.057928427000033
593lua_event_latency_count{event="context-creation"} 5226
594lua_event_latency_bucket{event="get_egress_path_config",le="0.005"} 10
595lua_event_latency_bucket{event="get_egress_path_config",le="0.01"} 10
596lua_event_latency_bucket{event="get_egress_path_config",le="0.025"} 10
597lua_event_latency_bucket{event="get_egress_path_config",le="0.05"} 10
598lua_event_latency_bucket{event="get_egress_path_config",le="0.1"} 10
599lua_event_latency_bucket{event="get_egress_path_config",le="0.25"} 10
600lua_event_latency_bucket{event="get_egress_path_config",le="0.5"} 10
601lua_event_latency_bucket{event="get_egress_path_config",le="1"} 10
602lua_event_latency_bucket{event="get_egress_path_config",le="2.5"} 10
603lua_event_latency_bucket{event="get_egress_path_config",le="5"} 10
604lua_event_latency_bucket{event="get_egress_path_config",le="10"} 10
605lua_event_latency_bucket{event="get_egress_path_config",le="+Inf"} 10
606lua_event_latency_sum{event="get_egress_path_config"} 0.000493053
607lua_event_latency_count{event="get_egress_path_config"} 10
608"#;
609 let mut parser = Parser::new();
610 let metrics = parser.parse(sample).unwrap();
611 assert_eq!(
612 metrics,
613 vec![
614 Metric::Histogram(HistogramMetric {
615 name: InternString::new("deliver_message_latency_rollup"),
616 labels: [(
617 InternString::new("service"),
618 InternString::new("smtp_client")
619 )]
620 .into_iter()
621 .collect(),
622 sum: 0.0,
623 count: 0.0,
624 bucket: vec![
625 (0.005, 0.0),
626 (0.01, 0.0),
627 (0.025, 0.0),
628 (0.05, 0.0),
629 (0.1, 0.0),
630 (0.25, 0.0),
631 (0.5, 0.0),
632 (1.0, 0.0),
633 (2.5, 0.0),
634 (5.0, 0.0),
635 (10.0, 0.0),
636 (f64::INFINITY, 0.0)
637 ]
638 }),
639 Metric::Histogram(HistogramMetric {
640 name: InternString::new("lua_event_latency"),
641 labels: [(
642 InternString::new("event"),
643 InternString::new("context-creation")
644 )]
645 .into_iter()
646 .collect(),
647 sum: 7.057928427000033,
648 count: 5226.0,
649 bucket: vec![
650 (0.005, 5226.0),
651 (0.01, 5226.0),
652 (0.025, 5226.0),
653 (0.05, 5226.0),
654 (0.1, 5226.0),
655 (0.25, 5226.0),
656 (0.5, 5226.0),
657 (1.0, 5226.0),
658 (2.5, 5226.0),
659 (5.0, 5226.0),
660 (10.0, 5226.0),
661 (f64::INFINITY, 5226.0)
662 ],
663 }),
664 Metric::Histogram(HistogramMetric {
665 name: InternString::new("lua_event_latency"),
666 labels: [(
667 InternString::new("event"),
668 InternString::new("get_egress_path_config")
669 )]
670 .into_iter()
671 .collect(),
672 sum: 0.000493053,
673 count: 10.0,
674 bucket: vec![
675 (0.005, 10.0),
676 (0.01, 10.0),
677 (0.025, 10.0),
678 (0.05, 10.0),
679 (0.1, 10.0),
680 (0.25, 10.0),
681 (0.5, 10.0),
682 (1.0, 10.0),
683 (2.5, 10.0),
684 (5.0, 10.0),
685 (10.0, 10.0),
686 (f64::INFINITY, 10.0)
687 ],
688 })
689 ]
690 );
691 }
692
693 #[test]
694 fn parse_label_gauge() {
695 let sample = r#"# HELP disk_free_bytes number of available bytes in a monitored location
696# TYPE disk_free_bytes gauge
697disk_free_bytes{name="data spool"} 1540683988992
698disk_free_bytes{name="log dir /var/tmp/kumo-logs"} 1540683988992
699disk_free_bytes{name="meta spool"} 1540683988992
700"#;
701 let mut parser = Parser::new();
702 let metrics = parser.parse(sample).unwrap();
703 assert_eq!(
704 metrics,
705 vec![
706 Metric::Gauge(GaugeMetric {
707 name: InternString::new("disk_free_bytes"),
708 labels: [(InternString::new("name"), InternString::new("data spool"))]
709 .into_iter()
710 .collect(),
711 value: 1540683988992.0
712 }),
713 Metric::Gauge(GaugeMetric {
714 name: InternString::new("disk_free_bytes"),
715 labels: [(
716 InternString::new("name"),
717 InternString::new("log dir /var/tmp/kumo-logs")
718 )]
719 .into_iter()
720 .collect(),
721 value: 1540683988992.0
722 }),
723 Metric::Gauge(GaugeMetric {
724 name: InternString::new("disk_free_bytes"),
725 labels: [(InternString::new("name"), InternString::new("meta spool"))]
726 .into_iter()
727 .collect(),
728 value: 1540683988992.0
729 }),
730 ]
731 );
732 }
733
734 #[test]
735 fn histogram_quantile() {
736 let entry = HistogramMetric {
737 name: InternString::new("boop"),
738 labels: Map::new(),
739 bucket: vec![
740 (0.005, 148571.),
741 (0.01, 149185.),
742 (0.025, 201435.),
743 (0.05, 505005.),
744 (0.1, 611944.),
745 (0.25, 643205.),
746 (0.5, 643876.),
747 (1., 645492.),
748 (2.5, 646039.),
749 (5., 646039.),
750 (10., 646039.),
751 ],
752 count: 646039.0,
753 sum: 25087.76664952455,
754 };
755
756 assert_eq!(entry.quantile(1.0), 2.5);
757 assert_eq!(entry.quantile(0.99), 0.23259945299254658);
758 assert_eq!(entry.quantile(0.95), 0.10860361152874175);
759 assert_eq!(entry.quantile(0.9), 0.08573537250208063);
760 assert_eq!(entry.quantile(0.75), 0.04831375382942979);
761 assert_eq!(entry.quantile(0.5), 0.03501288829594493);
762 }
763}