1use hierarchical_hash_wheel_timer::wheels::quad_wheel::no_prune;
2use hierarchical_hash_wheel_timer::wheels::Skip;
3pub use hierarchical_hash_wheel_timer::TimerError;
4use prometheus::Histogram;
5use std::sync::LazyLock;
6use std::time::{Duration, Instant};
7
8pub use hierarchical_hash_wheel_timer::wheels::quad_wheel::QuadWheelWithOverflow;
9pub use hierarchical_hash_wheel_timer::wheels::TimerEntryWithDelay;
10
11pub struct TimeQ<EntryType: TimerEntryWithDelay> {
17 wheel: QuadWheelWithOverflow<EntryType>,
18 last_check: Instant,
19 len: usize,
20}
21
22static POP_INTERVAL: LazyLock<Histogram> = LazyLock::new(|| {
23 prometheus::register_histogram!(
24 "timeq_pop_interval",
25 "The amount of time that passes between calls to TimeQ::pop",
26 vec![3.0, 4.0, 5.0, 8.0, 10.0, 12.0, 15.0, 20.0, 25.0, 30.0]
30 )
31 .unwrap()
32});
33
34#[must_use]
35pub enum PopResult<EntryType> {
36 Items(Vec<EntryType>),
38 Sleep(Duration),
40 Empty,
42}
43
44impl<EntryType: TimerEntryWithDelay> Default for TimeQ<EntryType> {
45 fn default() -> Self {
46 Self::new()
47 }
48}
49
50impl<EntryType: TimerEntryWithDelay> TimeQ<EntryType> {
51 pub fn new() -> Self {
52 Self {
53 wheel: QuadWheelWithOverflow::new(no_prune),
54 last_check: Instant::now(),
55 len: 0,
56 }
57 }
58
59 pub fn is_empty(&self) -> bool {
61 matches!(self.wheel.can_skip(), Skip::Empty)
62 }
63
64 pub fn len(&self) -> usize {
65 self.len
66 }
67
68 pub fn insert(&mut self, entry: EntryType) -> Result<(), TimerError<EntryType>> {
70 self.wheel.insert(entry)?;
71 self.len += 1;
72 Ok(())
73 }
74
75 pub fn pop(&mut self) -> PopResult<EntryType> {
77 let now = Instant::now();
78 let elapsed = now - self.last_check;
79 self.last_check = now;
80 let mut elapsed_ms = elapsed.as_millis() as u32;
81 POP_INTERVAL.observe(elapsed.as_secs_f64());
82
83 let mut items = vec![];
84
85 while elapsed_ms > 0 {
86 match self.wheel.can_skip() {
87 Skip::Empty => break,
88 Skip::None => {
89 items.append(&mut self.wheel.tick());
90 elapsed_ms -= 1;
91 }
92 Skip::Millis(m) => {
93 let amount = m.min(elapsed_ms);
94 self.wheel.skip(amount);
95 elapsed_ms -= amount;
96 }
97 }
98 }
99
100 if !items.is_empty() {
101 self.len -= items.len();
102 return PopResult::Items(items);
103 }
104
105 match self.wheel.can_skip() {
106 Skip::None => PopResult::Sleep(Duration::from_millis(1)),
107 Skip::Empty => PopResult::Empty,
108 Skip::Millis(ms) => PopResult::Sleep(Duration::from_millis(ms.into())),
109 }
110 }
111
112 pub fn drain(&mut self) -> Vec<EntryType> {
115 let mut items = vec![];
116 loop {
117 match self.wheel.can_skip() {
118 Skip::Empty => {
119 self.last_check = Instant::now();
120 self.len = 0;
121 return items;
122 }
123 Skip::None => {
124 items.append(&mut self.wheel.tick());
125 }
126 Skip::Millis(m) => {
127 self.wheel.skip(m);
128 }
129 }
130 }
131 }
132}
133
134#[cfg(test)]
135mod tests {
136 use super::*;
137
138 #[derive(Debug, PartialEq, Clone)]
139 struct Entry {
140 id: u64,
141 value: &'static str,
142 delay: Duration,
143 }
144
145 impl TimerEntryWithDelay for &Entry {
146 fn delay(&self) -> Duration {
147 self.delay
148 }
149 }
150
151 #[test]
152 fn draining() {
153 let item1 = Entry {
154 id: 1,
155 value: "foo",
156 delay: Duration::from_millis(1),
157 };
158 let item2 = Entry {
159 id: 2,
160 value: "bar",
161 delay: Duration::from_millis(10),
162 };
163 let item3 = Entry {
164 id: 3,
165 value: "baz",
166 delay: Duration::from_millis(5),
167 };
168
169 let mut queue = TimeQ::new();
170 queue.insert(&item1).unwrap();
171 queue.insert(&item2).unwrap();
172 queue.insert(&item3).unwrap();
173
174 let items = queue.drain();
175 assert_eq!(queue.len(), 0);
176 assert!(queue.is_empty());
177 assert_eq!(items, vec![&item1, &item3, &item2]);
178 }
179
180 #[test]
181 fn basic_queue() {
182 let mut queue = TimeQ::new();
183
184 let item1 = Entry {
185 id: 1,
186 value: "foo",
187 delay: Duration::from_millis(1),
188 };
189 let item2 = Entry {
190 id: 2,
191 value: "bar",
192 delay: Duration::from_secs(1),
193 };
194 let item3 = Entry {
195 id: 3,
196 value: "baz",
197 delay: Duration::from_millis(100),
198 };
199
200 queue.insert(&item1).unwrap();
201 queue.insert(&item2).unwrap();
202 queue.insert(&item3).unwrap();
203
204 assert_eq!(queue.len(), 3);
205 assert!(!queue.is_empty());
206
207 std::thread::sleep(Duration::from_millis(2));
208
209 match queue.pop() {
210 PopResult::Items(items) => assert_eq!(items, vec![&item1]),
211 _ => unreachable!(),
212 }
213
214 assert_eq!(queue.len(), 2);
215 assert!(!queue.is_empty());
216
217 std::thread::sleep(Duration::from_millis(2));
218
219 match queue.pop() {
220 PopResult::Sleep(ms) => std::thread::sleep(ms),
221 _ => unreachable!(),
222 }
223
224 std::thread::sleep(Duration::from_millis(100));
227
228 match queue.pop() {
229 PopResult::Items(items) => assert_eq!(items, vec![&item3]),
230 PopResult::Sleep(ms) => unreachable!("still have {ms:?} to go"),
231 _ => unreachable!(),
232 }
233
234 assert_eq!(queue.len(), 1);
235 assert!(!queue.is_empty());
236
237 std::thread::sleep(Duration::from_secs(1));
238 match queue.pop() {
239 PopResult::Items(items) => assert_eq!(items, vec![&item2]),
240 _ => unreachable!(),
241 }
242
243 assert_eq!(queue.len(), 0);
244 assert!(queue.is_empty());
245 }
246}