timeq/
lib.rs

1use hierarchical_hash_wheel_timer::wheels::quad_wheel::no_prune;
2use hierarchical_hash_wheel_timer::wheels::Skip;
3pub use hierarchical_hash_wheel_timer::TimerError;
4use kumo_prometheus::declare_metric;
5use std::time::{Duration, Instant};
6
7pub use hierarchical_hash_wheel_timer::wheels::quad_wheel::QuadWheelWithOverflow;
8pub use hierarchical_hash_wheel_timer::wheels::TimerEntryWithDelay;
9
10/// A TimeQ is a queue datastructure where the contained items are time
11/// ordered.
12/// The underlying storage is a hashed hierarchical timer wheel, which
13/// allows for relatively cheap insertion and popping of ready items.
14/// It is also possible to cancel an entry given its id.
15pub struct TimeQ<EntryType: TimerEntryWithDelay> {
16    wheel: QuadWheelWithOverflow<EntryType>,
17    last_check: Instant,
18    len: usize,
19}
20
// Histogram fed by `TimeQ::pop`, which records the time since its previous
// check via `as_secs_f64()`; the bucket boundaries below are therefore
// expressed in seconds.
// NOTE(review): the `///` lines inside the macro are presumably consumed by
// `declare_metric!` as the exported help text — confirm before rewording them.
declare_metric! {
/// The amount of time that passes between calls to `TimeQ::pop`.
///
/// This metric is not generally interesting and does not typically
/// need to be charted in a dashboard.
static POP_INTERVAL: Histogram(
        "timeq_pop_interval",
        // Since we generally start at 3s, let's use a custom set
        // of buckets that will let us catch more "dynamic range"
        // in longer intervals
        vec![3.0, 4.0, 5.0, 8.0, 10.0, 12.0, 15.0, 20.0, 25.0, 30.0]
);
}
34
/// The outcome of asking a queue for the entries that are currently due.
///
/// The type is `#[must_use]`: dropping an `Items` value would silently
/// discard entries that have already been removed from the queue.
#[must_use]
#[derive(Debug)]
pub enum PopResult<EntryType> {
    /// These items are ready for immediate action
    Items(Vec<EntryType>),
    /// No items will be ready for the specified duration
    Sleep(Duration),
    /// The queue is empty
    Empty,
}
44
45impl<EntryType: TimerEntryWithDelay> Default for TimeQ<EntryType> {
46    fn default() -> Self {
47        Self::new()
48    }
49}
50
51impl<EntryType: TimerEntryWithDelay> TimeQ<EntryType> {
52    pub fn new() -> Self {
53        Self {
54            wheel: QuadWheelWithOverflow::new(no_prune),
55            last_check: Instant::now(),
56            len: 0,
57        }
58    }
59
60    /// Returns true if the wheel is empty
61    pub fn is_empty(&self) -> bool {
62        matches!(self.wheel.can_skip(), Skip::Empty)
63    }
64
65    pub fn len(&self) -> usize {
66        self.len
67    }
68
69    /// Insert a new entry
70    pub fn insert(&mut self, entry: EntryType) -> Result<(), TimerError<EntryType>> {
71        self.wheel.insert(entry)?;
72        self.len += 1;
73        Ok(())
74    }
75
76    /// Returns the set of items that need immediate action
77    pub fn pop(&mut self) -> PopResult<EntryType> {
78        let now = Instant::now();
79        let elapsed = now - self.last_check;
80        self.last_check = now;
81        let mut elapsed_ms = elapsed.as_millis() as u32;
82        POP_INTERVAL.observe(elapsed.as_secs_f64());
83
84        let mut items = vec![];
85
86        while elapsed_ms > 0 {
87            match self.wheel.can_skip() {
88                Skip::Empty => break,
89                Skip::None => {
90                    items.append(&mut self.wheel.tick());
91                    elapsed_ms -= 1;
92                }
93                Skip::Millis(m) => {
94                    let amount = m.min(elapsed_ms);
95                    self.wheel.skip(amount);
96                    elapsed_ms -= amount;
97                }
98            }
99        }
100
101        if !items.is_empty() {
102            self.len -= items.len();
103            return PopResult::Items(items);
104        }
105
106        match self.wheel.can_skip() {
107            Skip::None => PopResult::Sleep(Duration::from_millis(1)),
108            Skip::Empty => PopResult::Empty,
109            Skip::Millis(ms) => PopResult::Sleep(Duration::from_millis(ms.into())),
110        }
111    }
112
113    /// Drains the entire contents of the queue, returning all of the
114    /// contained items
115    pub fn drain(&mut self) -> Vec<EntryType> {
116        let mut items = vec![];
117        loop {
118            match self.wheel.can_skip() {
119                Skip::Empty => {
120                    self.last_check = Instant::now();
121                    self.len = 0;
122                    return items;
123                }
124                Skip::None => {
125                    items.append(&mut self.wheel.tick());
126                }
127                Skip::Millis(m) => {
128                    self.wheel.skip(m);
129                }
130            }
131        }
132    }
133}
134
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal timer entry used to exercise the queue.
    #[derive(Debug, PartialEq, Clone)]
    struct Entry {
        id: u64,
        value: &'static str,
        delay: Duration,
    }

    // The queue stores borrowed entries so the tests can compare what comes
    // out against the originals.
    impl TimerEntryWithDelay for &Entry {
        fn delay(&self) -> Duration {
            self.delay
        }
    }

    /// Shorthand constructor for a test entry.
    fn entry(id: u64, value: &'static str, delay: Duration) -> Entry {
        Entry { id, value, delay }
    }

    /// Draining hands back every entry, ordered by delay, and leaves the
    /// queue empty.
    #[test]
    fn draining() {
        let foo = entry(1, "foo", Duration::from_millis(1));
        let bar = entry(2, "bar", Duration::from_millis(10));
        let baz = entry(3, "baz", Duration::from_millis(5));

        let mut q = TimeQ::new();
        for item in [&foo, &bar, &baz] {
            q.insert(item).unwrap();
        }

        let drained = q.drain();
        assert_eq!(q.len(), 0);
        assert!(q.is_empty());
        assert_eq!(drained, vec![&foo, &baz, &bar]);
    }

    /// Entries become available from `pop` only once their delay has
    /// passed in real time.
    #[test]
    fn basic_queue() {
        let mut q = TimeQ::new();

        let foo = entry(1, "foo", Duration::from_millis(1));
        let bar = entry(2, "bar", Duration::from_secs(1));
        let baz = entry(3, "baz", Duration::from_millis(100));

        for item in [&foo, &bar, &baz] {
            q.insert(item).unwrap();
        }

        assert_eq!(q.len(), 3);
        assert!(!q.is_empty());

        std::thread::sleep(Duration::from_millis(2));

        // Only the 1ms entry is due at this point.
        match q.pop() {
            PopResult::Items(ready) => assert_eq!(ready, vec![&foo]),
            _ => unreachable!(),
        }

        assert_eq!(q.len(), 2);
        assert!(!q.is_empty());

        std::thread::sleep(Duration::from_millis(2));

        // Nothing else is due yet, so the queue tells us how long to wait.
        match q.pop() {
            PopResult::Sleep(wait) => std::thread::sleep(wait),
            _ => unreachable!(),
        }

        // The PopResult::Sleep is approximate and often doesn't
        // quite get us there, so sleep slightly longer
        std::thread::sleep(Duration::from_millis(100));

        match q.pop() {
            PopResult::Items(ready) => assert_eq!(ready, vec![&baz]),
            PopResult::Sleep(ms) => unreachable!("still have {ms:?} to go"),
            _ => unreachable!(),
        }

        assert_eq!(q.len(), 1);
        assert!(!q.is_empty());

        std::thread::sleep(Duration::from_secs(1));
        match q.pop() {
            PopResult::Items(ready) => assert_eq!(ready, vec![&bar]),
            _ => unreachable!(),
        }

        assert_eq!(q.len(), 0);
        assert!(q.is_empty());
    }
}