timeq/
lib.rs

1use hierarchical_hash_wheel_timer::wheels::quad_wheel::no_prune;
2use hierarchical_hash_wheel_timer::wheels::Skip;
3pub use hierarchical_hash_wheel_timer::TimerError;
4use prometheus::Histogram;
5use std::sync::LazyLock;
6use std::time::{Duration, Instant};
7
8pub use hierarchical_hash_wheel_timer::wheels::quad_wheel::QuadWheelWithOverflow;
9pub use hierarchical_hash_wheel_timer::wheels::TimerEntryWithDelay;
10
11/// A TimeQ is a queue datastructure where the contained items are time
12/// ordered.
13/// The underlying storage is a hashed hierarchical timer wheel, which
14/// allows for relatively cheap insertion and popping of ready items.
15/// It is also possible to cancel an entry given its id.
16pub struct TimeQ<EntryType: TimerEntryWithDelay> {
17    wheel: QuadWheelWithOverflow<EntryType>,
18    last_check: Instant,
19    len: usize,
20}
21
22static POP_INTERVAL: LazyLock<Histogram> = LazyLock::new(|| {
23    prometheus::register_histogram!(
24        "timeq_pop_interval",
25        "The amount of time that passes between calls to TimeQ::pop",
26        // Since we generally start at 3s, let's use a custom set
27        // of buckets that will let us catch more "dynamic range"
28        // in longer intervals
29        vec![3.0, 4.0, 5.0, 8.0, 10.0, 12.0, 15.0, 20.0, 25.0, 30.0]
30    )
31    .unwrap()
32});
33
/// The outcome of polling a `TimeQ` for ready entries.
///
/// Marked `#[must_use]` because ignoring the result would silently drop
/// any ready items that were removed from the queue.
///
/// `Debug` is derived (and is available whenever `EntryType: Debug`) so
/// that an unexpected result can be printed in logs and test failures.
#[must_use]
#[derive(Debug)]
pub enum PopResult<EntryType> {
    /// These items are ready for immediate action
    Items(Vec<EntryType>),
    /// No items will be ready for the specified duration
    Sleep(Duration),
    /// The queue is empty
    Empty,
}
43
44impl<EntryType: TimerEntryWithDelay> Default for TimeQ<EntryType> {
45    fn default() -> Self {
46        Self::new()
47    }
48}
49
50impl<EntryType: TimerEntryWithDelay> TimeQ<EntryType> {
51    pub fn new() -> Self {
52        Self {
53            wheel: QuadWheelWithOverflow::new(no_prune),
54            last_check: Instant::now(),
55            len: 0,
56        }
57    }
58
59    /// Returns true if the wheel is empty
60    pub fn is_empty(&self) -> bool {
61        matches!(self.wheel.can_skip(), Skip::Empty)
62    }
63
64    pub fn len(&self) -> usize {
65        self.len
66    }
67
68    /// Insert a new entry
69    pub fn insert(&mut self, entry: EntryType) -> Result<(), TimerError<EntryType>> {
70        self.wheel.insert(entry)?;
71        self.len += 1;
72        Ok(())
73    }
74
75    /// Returns the set of items that need immediate action
76    pub fn pop(&mut self) -> PopResult<EntryType> {
77        let now = Instant::now();
78        let elapsed = now - self.last_check;
79        self.last_check = now;
80        let mut elapsed_ms = elapsed.as_millis() as u32;
81        POP_INTERVAL.observe(elapsed.as_secs_f64());
82
83        let mut items = vec![];
84
85        while elapsed_ms > 0 {
86            match self.wheel.can_skip() {
87                Skip::Empty => break,
88                Skip::None => {
89                    items.append(&mut self.wheel.tick());
90                    elapsed_ms -= 1;
91                }
92                Skip::Millis(m) => {
93                    let amount = m.min(elapsed_ms);
94                    self.wheel.skip(amount);
95                    elapsed_ms -= amount;
96                }
97            }
98        }
99
100        if !items.is_empty() {
101            self.len -= items.len();
102            return PopResult::Items(items);
103        }
104
105        match self.wheel.can_skip() {
106            Skip::None => PopResult::Sleep(Duration::from_millis(1)),
107            Skip::Empty => PopResult::Empty,
108            Skip::Millis(ms) => PopResult::Sleep(Duration::from_millis(ms.into())),
109        }
110    }
111
112    /// Drains the entire contents of the queue, returning all of the
113    /// contained items
114    pub fn drain(&mut self) -> Vec<EntryType> {
115        let mut items = vec![];
116        loop {
117            match self.wheel.can_skip() {
118                Skip::Empty => {
119                    self.last_check = Instant::now();
120                    self.len = 0;
121                    return items;
122                }
123                Skip::None => {
124                    items.append(&mut self.wheel.tick());
125                }
126                Skip::Millis(m) => {
127                    self.wheel.skip(m);
128                }
129            }
130        }
131    }
132}
133
#[cfg(test)]
mod tests {
    use super::*;

    /// Simple payload used to exercise the queue; entries are queued by
    /// reference so that the tests can compare against the originals.
    #[derive(Debug, PartialEq, Clone)]
    struct Entry {
        id: u64,
        value: &'static str,
        delay: Duration,
    }

    impl TimerEntryWithDelay for &Entry {
        fn delay(&self) -> Duration {
            self.delay
        }
    }

    /// Shorthand constructor that keeps the test bodies compact.
    fn entry(id: u64, value: &'static str, delay: Duration) -> Entry {
        Entry { id, value, delay }
    }

    /// Draining must empty the queue and hand the entries back ordered
    /// by their delay rather than by insertion order.
    #[test]
    fn draining() {
        let item1 = entry(1, "foo", Duration::from_millis(1));
        let item2 = entry(2, "bar", Duration::from_millis(10));
        let item3 = entry(3, "baz", Duration::from_millis(5));

        let mut queue = TimeQ::new();
        for item in [&item1, &item2, &item3] {
            queue.insert(item).unwrap();
        }

        let drained = queue.drain();
        assert_eq!(queue.len(), 0);
        assert!(queue.is_empty());
        assert_eq!(drained, vec![&item1, &item3, &item2]);
    }

    /// Walks a queue of three entries through real time, checking that
    /// each one is popped only once its delay has passed.
    #[test]
    fn basic_queue() {
        let mut queue = TimeQ::new();

        let item1 = entry(1, "foo", Duration::from_millis(1));
        let item2 = entry(2, "bar", Duration::from_secs(1));
        let item3 = entry(3, "baz", Duration::from_millis(100));

        queue.insert(&item1).unwrap();
        queue.insert(&item2).unwrap();
        queue.insert(&item3).unwrap();

        assert_eq!(queue.len(), 3);
        assert!(!queue.is_empty());

        // Give the 1ms entry time to become due
        std::thread::sleep(Duration::from_millis(2));

        let PopResult::Items(ready) = queue.pop() else {
            unreachable!()
        };
        assert_eq!(ready, vec![&item1]);

        assert_eq!(queue.len(), 2);
        assert!(!queue.is_empty());

        std::thread::sleep(Duration::from_millis(2));

        // Nothing else is due yet, so the queue should ask us to wait
        let PopResult::Sleep(pause) = queue.pop() else {
            unreachable!()
        };
        std::thread::sleep(pause);

        // The PopResult::Sleep is approximate and often doesn't
        // quite get us there, so sleep slightly longer
        std::thread::sleep(Duration::from_millis(100));

        match queue.pop() {
            PopResult::Items(ready) => assert_eq!(ready, vec![&item3]),
            PopResult::Sleep(ms) => unreachable!("still have {ms:?} to go"),
            _ => unreachable!(),
        }

        assert_eq!(queue.len(), 1);
        assert!(!queue.is_empty());

        // Wait out the final, 1 second entry
        std::thread::sleep(Duration::from_secs(1));
        let PopResult::Items(ready) = queue.pop() else {
            unreachable!()
        };
        assert_eq!(ready, vec![&item2]);

        assert_eq!(queue.len(), 0);
        assert!(queue.is_empty());
    }
}