kumo_counter_series/
lib.rs

1use tokio::time::{Duration, Instant};
2
/// Configuration describing the shape of a counter series:
/// how many buckets it holds and how much time each bucket spans.
///
/// The total time window covered by a series is
/// `num_buckets * bucket_size` seconds.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CounterSeriesConfig {
    /// How many buckets should be maintained.
    pub num_buckets: u8,
    /// How long a time interval each bucket represents,
    /// expressed in seconds.
    pub bucket_size: u64,
}
10
/// CounterSeries implements a time series stored in a
/// sequence of a fixed number of buckets each with a fixed
/// (and equal) duration.
///
/// The buckets are implemented as a ring buffer held in memory.
/// The counter can be incremented or updated to a new value,
/// but only for the bucket representing the current point
/// in time.
///
/// As time elapses, the current bucket changes based on
/// the bucket duration, with older buckets being zeroed
/// out.  No background maintenance tasks are required
/// to manage this rotation, as the counter series maintains
/// bookkeeping to fix up the structure prior to accessing
/// the buckets.
///
/// The value tracked in each bucket is a u64, meaning
/// that we cannot track negative numbers.  If you try
/// to delta outside the valid range, the resulting
/// value is saturated to the bounds of a u64; it will
/// never be less than zero and never wrap around due
/// to overflow.
#[derive(Debug, Clone)]
pub struct CounterSeries {
    /// The time series data itself; used as a ring buffer
    /// that is indexed by `curr_bucket`.
    buckets: Vec<u64>,
    /// How long a time interval each bucket represents,
    /// expressed in seconds
    bucket_size: u64,
    /// Which slot corresponds to the current time interval.
    /// Always an index into `buckets`.
    curr_bucket: u8,
    /// When we last changed curr_bucket
    updated: Instant,
}
44
45impl CounterSeries {
46    /// Create a new instance. All buckets will be initialized
47    /// to zero.
48    pub fn with_config(config: CounterSeriesConfig) -> Self {
49        Self::with_initial_value(config, 0)
50    }
51
52    /// Create a new instance with a pre-set initial value.
53    /// Useful when setting up the initial state for observation
54    /// based tracking
55    pub fn with_initial_value(config: CounterSeriesConfig, value: u64) -> Self {
56        let mut buckets = vec![0u64; config.num_buckets as usize];
57
58        buckets[0] = value;
59
60        Self {
61            buckets,
62            bucket_size: config.bucket_size,
63            curr_bucket: 0,
64            updated: Instant::now(),
65        }
66    }
67
68    /// Manage aging out of older bucket values as time elapses.
69    /// The strategy here is: figure out how many bucket slots
70    /// we need to advance since the prior operation and zero them
71    /// out.  We clip that count to the number of buckets so that
72    /// we don't do excess iterations if it has been a very long time
73    /// since we last touched this structure.
74    fn rotate_and_get_current_bucket(&mut self) -> usize {
75        let num_buckets = self.buckets.len() as u64;
76        let elapsed_seconds = self.updated.elapsed().as_secs();
77        let elapsed_slots = elapsed_seconds / self.bucket_size;
78
79        if elapsed_slots > 0 {
80            let num_prune = elapsed_slots.min(num_buckets) as isize;
81            self.curr_bucket = ((elapsed_slots + self.curr_bucket as u64) % num_buckets) as u8;
82            // we updated curr_bucket, so revise the updated time
83            self.updated = Instant::now();
84
85            for prune in 0..num_prune {
86                let mut idx = (self.curr_bucket as isize) - prune;
87                if idx < 0 {
88                    idx += num_buckets as isize;
89                }
90                self.buckets[idx as usize] = 0;
91            }
92        }
93
94        self.curr_bucket as usize
95    }
96
97    /// Increment the counter for the current time window by
98    /// the specified value.
99    pub fn increment(&mut self, to_add: u64) {
100        let idx = self.rotate_and_get_current_bucket();
101        self.buckets[idx] = self.buckets[idx].saturating_add(to_add);
102    }
103
104    /// Adjust the counter for the current time window by the specified value
105    pub fn delta(&mut self, delta: i64) {
106        let idx = self.rotate_and_get_current_bucket();
107        if delta > 0 {
108            self.buckets[idx] = self.buckets[idx].saturating_add(delta as u64);
109        } else {
110            self.buckets[idx] = self.buckets[idx].saturating_sub((-delta) as u64);
111        }
112    }
113
114    /// Record an observation; assigns current_value to the current bucket
115    pub fn observe(&mut self, current_value: u64) {
116        let idx = self.rotate_and_get_current_bucket();
117        self.buckets[idx] = current_value;
118    }
119
120    /// Returns the total tracked over the entire series duration
121    pub fn sum(&mut self) -> u64 {
122        let _idx = self.rotate_and_get_current_bucket();
123        self.buckets.iter().sum()
124    }
125
126    /// Returns the total tracked over a specific time duration.
127    /// Rounds up to the next bucket for spans smaller than
128    /// the bucket size.
129    pub fn sum_over(&mut self, duration: Duration) -> u64 {
130        let idx = self.rotate_and_get_current_bucket() as isize;
131        let buckets_to_sum = (duration.as_secs().div_ceil(self.bucket_size))
132            .min(self.buckets.len() as u64)
133            .max(1) as isize;
134
135        let mut result = 0;
136        for i in 0..buckets_to_sum {
137            let mut i = idx - i;
138            if i < 0 {
139                i += self.buckets.len() as isize;
140            }
141            result += self.buckets[i as usize];
142        }
143
144        result
145    }
146}
147
#[cfg(test)]
mod test {
    use super::*;

    /// Snapshot of the internal state of a `CounterSeries`, used to
    /// compare the series against an expected state in assertions.
    #[derive(Debug, PartialEq)]
    #[allow(dead_code)] // we inspect via Debug, so it is not dead!
    struct Delta<'a> {
        /// The raw contents of the ring buffer
        buckets: &'a [u64],
        /// The index of the current bucket
        curr: u8,
        /// How long ago the series last changed its current bucket
        elapsed: Duration,
    }

    /// Capture the state of `series` by reading its fields directly.
    /// This takes a shared reference and does not call any of the
    /// rotating accessors, so the series itself is left unchanged.
    fn delta(series: &CounterSeries) -> Delta<'_> {
        Delta {
            buckets: &series.buckets,
            curr: series.curr_bucket,
            elapsed: series.updated.elapsed(),
        }
    }

    /// Signed adjustments accumulate in the current bucket, and
    /// an observation replaces the value of the current bucket.
    /// Note that the clock is not paused in this test.
    #[tokio::test]
    async fn test_delta_observe() {
        let mut series = CounterSeries::with_config(CounterSeriesConfig {
            num_buckets: 5,
            bucket_size: 2,
        });

        series.delta(3);
        series.delta(-2);
        k9::assert_equal!(series.sum(), 1);
        series.observe(42);
        k9::assert_equal!(series.sum(), 42);
    }

    /// Walks a series of 5 buckets of 2 seconds each through
    /// single-slot rotation, wrap around, skipped slots and gaps
    /// longer than the entire series, checking the bucket contents
    /// at each step.
    #[tokio::test]
    async fn test_rotation() {
        // Time is driven explicitly via tokio::time::advance below
        tokio::time::pause();

        let mut series = CounterSeries::with_config(CounterSeriesConfig {
            num_buckets: 5,
            bucket_size: 2,
        });

        // A freshly created series is all zeroes
        k9::assert_equal!(
            delta(&series),
            Delta {
                buckets: &[0, 0, 0, 0, 0],
                curr: 0,
                elapsed: Duration::ZERO
            }
        );

        k9::assert_equal!(series.sum(), 0);

        series.increment(1);

        k9::assert_equal!(
            delta(&series),
            Delta {
                buckets: &[1, 0, 0, 0, 0],
                curr: 0,
                elapsed: Duration::ZERO
            }
        );
        k9::assert_equal!(series.sum(), 1);

        // 1 second is less than the bucket size, so we remain
        // in the same bucket and `updated` is left alone
        tokio::time::advance(Duration::from_secs(1)).await;

        series.increment(1);

        k9::assert_equal!(
            delta(&series),
            Delta {
                buckets: &[2, 0, 0, 0, 0],
                curr: 0,
                elapsed: Duration::from_secs(1)
            }
        );
        k9::assert_equal!(series.sum(), 2);

        // A full bucket interval has now elapsed, so we move
        // to the next slot and the elapsed time resets
        tokio::time::advance(Duration::from_secs(1)).await;
        series.increment(1);

        k9::assert_equal!(
            delta(&series),
            Delta {
                buckets: &[2, 1, 0, 0, 0],
                curr: 1,
                elapsed: Duration::ZERO
            }
        );
        k9::assert_equal!(series.sum(), 3);

        // Advance one slot at a time until every slot is populated
        tokio::time::advance(Duration::from_secs(2)).await;
        series.increment(3);

        k9::assert_equal!(
            delta(&series),
            Delta {
                buckets: &[2, 1, 3, 0, 0],
                curr: 2,
                elapsed: Duration::ZERO
            }
        );
        k9::assert_equal!(series.sum(), 6);

        tokio::time::advance(Duration::from_secs(2)).await;
        series.increment(4);

        k9::assert_equal!(
            delta(&series),
            Delta {
                buckets: &[2, 1, 3, 4, 0],
                curr: 3,
                elapsed: Duration::ZERO
            }
        );
        k9::assert_equal!(series.sum(), 10);

        tokio::time::advance(Duration::from_secs(2)).await;
        series.increment(5);

        k9::assert_equal!(
            delta(&series),
            Delta {
                buckets: &[2, 1, 3, 4, 5],
                curr: 4,
                elapsed: Duration::ZERO
            }
        );
        k9::assert_equal!(series.sum(), 15);

        // Wrap around to slot 0; its old value is zeroed
        // out before the new increment is applied
        tokio::time::advance(Duration::from_secs(2)).await;
        series.increment(6);

        k9::assert_equal!(
            delta(&series),
            Delta {
                buckets: &[6, 1, 3, 4, 5],
                curr: 0,
                elapsed: Duration::ZERO
            }
        );
        k9::assert_equal!(series.sum(), 19);

        // Now skip a slot
        tokio::time::advance(Duration::from_secs(4)).await;
        series.increment(7);

        // The skipped slot (1) is zeroed along with the new current slot (2)
        k9::assert_equal!(
            delta(&series),
            Delta {
                buckets: &[6, 0, 7, 4, 5],
                curr: 2,
                elapsed: Duration::ZERO
            }
        );
        k9::assert_equal!(series.sum(), 22);
        // sum_over walks backwards from the current slot (2), so the
        // buckets are visited in the order 7, 0, 6, 5, 4.  The number
        // of buckets included is the duration divided by the bucket
        // size, rounded up, with a minimum of one bucket and a
        // maximum of all of the buckets.
        k9::assert_equal!(series.sum_over(Duration::ZERO), 7);
        k9::assert_equal!(series.sum_over(Duration::from_secs(1)), 7);
        k9::assert_equal!(series.sum_over(Duration::from_secs(2)), 7);
        k9::assert_equal!(series.sum_over(Duration::from_secs(3)), 7);
        k9::assert_equal!(series.sum_over(Duration::from_secs(4)), 7);
        k9::assert_equal!(series.sum_over(Duration::from_secs(5)), 13);
        k9::assert_equal!(series.sum_over(Duration::from_secs(6)), 13);
        k9::assert_equal!(series.sum_over(Duration::from_secs(7)), 18);
        k9::assert_equal!(series.sum_over(Duration::from_secs(8)), 18);
        k9::assert_equal!(series.sum_over(Duration::from_secs(9)), 22);
        k9::assert_equal!(series.sum_over(Duration::from_secs(10)), 22);
        k9::assert_equal!(series.sum_over(Duration::from_secs(60)), 22);

        // Now skip 6 slots
        tokio::time::advance(Duration::from_secs(12)).await;
        series.increment(8);

        // That is more slots than the series holds, so everything is
        // zeroed and the current slot lands on (2 + 6) % 5 == 3
        k9::assert_equal!(
            delta(&series),
            Delta {
                buckets: &[0, 0, 0, 8, 0],
                curr: 3,
                elapsed: Duration::ZERO
            }
        );
        k9::assert_equal!(series.sum(), 8);

        // Advance one slot at a time, filling slots 4, 0, 1 and 2
        for i in 1..=4 {
            tokio::time::advance(Duration::from_secs(2)).await;
            series.increment(i);
        }

        k9::assert_equal!(
            delta(&series),
            Delta {
                buckets: &[2, 3, 4, 8, 1],
                curr: 2,
                elapsed: Duration::ZERO
            }
        );
        k9::assert_equal!(series.sum(), 18);

        // 60 seconds is 30 slots, which is a whole number of trips
        // around the ring: the current slot is unchanged but all
        // of the data has aged out
        tokio::time::advance(Duration::from_secs(60)).await;
        series.observe(0);

        k9::assert_equal!(
            delta(&series),
            Delta {
                buckets: &[0, 0, 0, 0, 0],
                curr: 2,
                elapsed: Duration::ZERO
            }
        );
        k9::assert_equal!(series.sum(), 0);
    }
}
361}