1use tokio::time::{Duration, Instant};
2
3pub struct CounterSeriesConfig {
4 pub num_buckets: u8,
6 pub bucket_size: u64,
9}
10
11pub struct CounterSeries {
34 buckets: Vec<u64>,
36 bucket_size: u64,
39 curr_bucket: u8,
41 updated: Instant,
43}
44
45impl CounterSeries {
46 pub fn with_config(config: CounterSeriesConfig) -> Self {
49 Self::with_initial_value(config, 0)
50 }
51
52 pub fn with_initial_value(config: CounterSeriesConfig, value: u64) -> Self {
56 let mut buckets = vec![0u64; config.num_buckets as usize];
57
58 buckets[0] = value;
59
60 Self {
61 buckets,
62 bucket_size: config.bucket_size,
63 curr_bucket: 0,
64 updated: Instant::now(),
65 }
66 }
67
68 fn rotate_and_get_current_bucket(&mut self) -> usize {
75 let num_buckets = self.buckets.len() as u64;
76 let elapsed_seconds = self.updated.elapsed().as_secs();
77 let elapsed_slots = elapsed_seconds / self.bucket_size;
78
79 if elapsed_slots > 0 {
80 let num_prune = elapsed_slots.min(num_buckets) as isize;
81 self.curr_bucket = ((elapsed_slots + self.curr_bucket as u64) % num_buckets) as u8;
82 self.updated = Instant::now();
84
85 for prune in 0..num_prune {
86 let mut idx = (self.curr_bucket as isize) - prune;
87 if idx < 0 {
88 idx += num_buckets as isize;
89 }
90 self.buckets[idx as usize] = 0;
91 }
92 }
93
94 self.curr_bucket as usize
95 }
96
97 pub fn increment(&mut self, to_add: u64) {
100 let idx = self.rotate_and_get_current_bucket();
101 self.buckets[idx] = self.buckets[idx].saturating_add(to_add);
102 }
103
104 pub fn delta(&mut self, delta: i64) {
106 let idx = self.rotate_and_get_current_bucket();
107 if delta > 0 {
108 self.buckets[idx] = self.buckets[idx].saturating_add(delta as u64);
109 } else {
110 self.buckets[idx] = self.buckets[idx].saturating_sub((-delta) as u64);
111 }
112 }
113
114 pub fn observe(&mut self, current_value: u64) {
116 let idx = self.rotate_and_get_current_bucket();
117 self.buckets[idx] = current_value;
118 }
119
120 pub fn sum(&mut self) -> u64 {
122 let _idx = self.rotate_and_get_current_bucket();
123 self.buckets.iter().sum()
124 }
125
126 pub fn sum_over(&mut self, duration: Duration) -> u64 {
130 let idx = self.rotate_and_get_current_bucket() as isize;
131 let buckets_to_sum = (duration.as_secs().div_ceil(self.bucket_size))
132 .min(self.buckets.len() as u64)
133 .max(1) as isize;
134
135 let mut result = 0;
136 for i in 0..buckets_to_sum {
137 let mut i = idx - i;
138 if i < 0 {
139 i += self.buckets.len() as isize;
140 }
141 result += self.buckets[i as usize];
142 }
143
144 result
145 }
146}
147
148#[cfg(test)]
149mod test {
150 use super::*;
151
152 #[derive(Debug, PartialEq)]
153 #[allow(dead_code)] struct Delta<'a> {
155 buckets: &'a [u64],
156 curr: u8,
157 elapsed: Duration,
158 }
159
160 fn delta(series: &CounterSeries) -> Delta<'_> {
161 Delta {
162 buckets: &series.buckets,
163 curr: series.curr_bucket,
164 elapsed: series.updated.elapsed(),
165 }
166 }
167
168 #[tokio::test]
169 async fn test_delta_observe() {
170 let mut series = CounterSeries::with_config(CounterSeriesConfig {
171 num_buckets: 5,
172 bucket_size: 2,
173 });
174
175 series.delta(3);
176 series.delta(-2);
177 k9::assert_equal!(series.sum(), 1);
178 series.observe(42);
179 k9::assert_equal!(series.sum(), 42);
180 }
181
182 #[tokio::test]
183 async fn test_rotation() {
184 tokio::time::pause();
185
186 let mut series = CounterSeries::with_config(CounterSeriesConfig {
187 num_buckets: 5,
188 bucket_size: 2,
189 });
190
191 k9::assert_equal!(
192 delta(&series),
193 Delta {
194 buckets: &[0, 0, 0, 0, 0],
195 curr: 0,
196 elapsed: Duration::ZERO
197 }
198 );
199
200 k9::assert_equal!(series.sum(), 0);
201
202 series.increment(1);
203
204 k9::assert_equal!(
205 delta(&series),
206 Delta {
207 buckets: &[1, 0, 0, 0, 0],
208 curr: 0,
209 elapsed: Duration::ZERO
210 }
211 );
212 k9::assert_equal!(series.sum(), 1);
213
214 tokio::time::advance(Duration::from_secs(1)).await;
215
216 series.increment(1);
217
218 k9::assert_equal!(
219 delta(&series),
220 Delta {
221 buckets: &[2, 0, 0, 0, 0],
222 curr: 0,
223 elapsed: Duration::from_secs(1)
224 }
225 );
226 k9::assert_equal!(series.sum(), 2);
227
228 tokio::time::advance(Duration::from_secs(1)).await;
229 series.increment(1);
230
231 k9::assert_equal!(
232 delta(&series),
233 Delta {
234 buckets: &[2, 1, 0, 0, 0],
235 curr: 1,
236 elapsed: Duration::ZERO
237 }
238 );
239 k9::assert_equal!(series.sum(), 3);
240
241 tokio::time::advance(Duration::from_secs(2)).await;
242 series.increment(3);
243
244 k9::assert_equal!(
245 delta(&series),
246 Delta {
247 buckets: &[2, 1, 3, 0, 0],
248 curr: 2,
249 elapsed: Duration::ZERO
250 }
251 );
252 k9::assert_equal!(series.sum(), 6);
253
254 tokio::time::advance(Duration::from_secs(2)).await;
255 series.increment(4);
256
257 k9::assert_equal!(
258 delta(&series),
259 Delta {
260 buckets: &[2, 1, 3, 4, 0],
261 curr: 3,
262 elapsed: Duration::ZERO
263 }
264 );
265 k9::assert_equal!(series.sum(), 10);
266
267 tokio::time::advance(Duration::from_secs(2)).await;
268 series.increment(5);
269
270 k9::assert_equal!(
271 delta(&series),
272 Delta {
273 buckets: &[2, 1, 3, 4, 5],
274 curr: 4,
275 elapsed: Duration::ZERO
276 }
277 );
278 k9::assert_equal!(series.sum(), 15);
279
280 tokio::time::advance(Duration::from_secs(2)).await;
281 series.increment(6);
282
283 k9::assert_equal!(
284 delta(&series),
285 Delta {
286 buckets: &[6, 1, 3, 4, 5],
287 curr: 0,
288 elapsed: Duration::ZERO
289 }
290 );
291 k9::assert_equal!(series.sum(), 19);
292
293 tokio::time::advance(Duration::from_secs(4)).await;
295 series.increment(7);
296
297 k9::assert_equal!(
298 delta(&series),
299 Delta {
300 buckets: &[6, 0, 7, 4, 5],
301 curr: 2,
302 elapsed: Duration::ZERO
303 }
304 );
305 k9::assert_equal!(series.sum(), 22);
306 k9::assert_equal!(series.sum_over(Duration::ZERO), 7);
307 k9::assert_equal!(series.sum_over(Duration::from_secs(1)), 7);
308 k9::assert_equal!(series.sum_over(Duration::from_secs(2)), 7);
309 k9::assert_equal!(series.sum_over(Duration::from_secs(3)), 7);
310 k9::assert_equal!(series.sum_over(Duration::from_secs(4)), 7);
311 k9::assert_equal!(series.sum_over(Duration::from_secs(5)), 13);
312 k9::assert_equal!(series.sum_over(Duration::from_secs(6)), 13);
313 k9::assert_equal!(series.sum_over(Duration::from_secs(7)), 18);
314 k9::assert_equal!(series.sum_over(Duration::from_secs(8)), 18);
315 k9::assert_equal!(series.sum_over(Duration::from_secs(9)), 22);
316 k9::assert_equal!(series.sum_over(Duration::from_secs(10)), 22);
317 k9::assert_equal!(series.sum_over(Duration::from_secs(60)), 22);
318
319 tokio::time::advance(Duration::from_secs(12)).await;
321 series.increment(8);
322
323 k9::assert_equal!(
324 delta(&series),
325 Delta {
326 buckets: &[0, 0, 0, 8, 0],
327 curr: 3,
328 elapsed: Duration::ZERO
329 }
330 );
331 k9::assert_equal!(series.sum(), 8);
332
333 for i in 1..=4 {
334 tokio::time::advance(Duration::from_secs(2)).await;
335 series.increment(i);
336 }
337
338 k9::assert_equal!(
339 delta(&series),
340 Delta {
341 buckets: &[2, 3, 4, 8, 1],
342 curr: 2,
343 elapsed: Duration::ZERO
344 }
345 );
346 k9::assert_equal!(series.sum(), 18);
347
348 tokio::time::advance(Duration::from_secs(60)).await;
349 series.observe(0);
350
351 k9::assert_equal!(
352 delta(&series),
353 Delta {
354 buckets: &[0, 0, 0, 0, 0],
355 curr: 2,
356 elapsed: Duration::ZERO
357 }
358 );
359 k9::assert_equal!(series.sum(), 0);
360 }
361}