1use hierarchical_hash_wheel_timer::wheels::quad_wheel::no_prune;
2use hierarchical_hash_wheel_timer::wheels::Skip;
3pub use hierarchical_hash_wheel_timer::TimerError;
4use kumo_prometheus::declare_metric;
5use std::time::{Duration, Instant};
6
7pub use hierarchical_hash_wheel_timer::wheels::quad_wheel::QuadWheelWithOverflow;
8pub use hierarchical_hash_wheel_timer::wheels::TimerEntryWithDelay;
9
10pub struct TimeQ<EntryType: TimerEntryWithDelay> {
16 wheel: QuadWheelWithOverflow<EntryType>,
17 last_check: Instant,
18 len: usize,
19}
20
// Histogram metric registered under the name "timeq_pop_interval".
// `TimeQ::pop` feeds it the time elapsed since its previous time reference
// via `Duration::as_secs_f64()`, so the observed values are in seconds; the
// vector below is presumably the list of bucket upper bounds, also in seconds.
// NOTE(review): `declare_metric!` may expect or use a `///` doc comment on the
// static (e.g. as the metric help text); none is present here. Confirm against
// the macro definition in `kumo_prometheus`.
declare_metric! {
static POP_INTERVAL: Histogram(
    "timeq_pop_interval",
    vec![3.0, 4.0, 5.0, 8.0, 10.0, 12.0, 15.0, 20.0, 25.0, 30.0]
);
}
34
/// Outcome of a call to `TimeQ::pop`.
///
/// Marked `#[must_use]` because ignoring the result would silently drop any
/// entries that were removed from the queue.
#[must_use]
pub enum PopResult<EntryType> {
    /// One or more entries came off the wheel during this call; they have
    /// been removed from the queue and are now owned by the caller.
    Items(Vec<EntryType>),
    /// Nothing was ready, but the queue is not empty. The duration is taken
    /// from the wheel's own skip hint (1ms when it reports that it cannot
    /// skip), i.e. how long the caller may wait before calling `pop` again.
    Sleep(Duration),
    /// Nothing was ready and the wheel reports that it holds no entries.
    Empty,
}
44
45impl<EntryType: TimerEntryWithDelay> Default for TimeQ<EntryType> {
46 fn default() -> Self {
47 Self::new()
48 }
49}
50
51impl<EntryType: TimerEntryWithDelay> TimeQ<EntryType> {
52 pub fn new() -> Self {
53 Self {
54 wheel: QuadWheelWithOverflow::new(no_prune),
55 last_check: Instant::now(),
56 len: 0,
57 }
58 }
59
60 pub fn is_empty(&self) -> bool {
62 matches!(self.wheel.can_skip(), Skip::Empty)
63 }
64
65 pub fn len(&self) -> usize {
66 self.len
67 }
68
69 pub fn insert(&mut self, entry: EntryType) -> Result<(), TimerError<EntryType>> {
71 self.wheel.insert(entry)?;
72 self.len += 1;
73 Ok(())
74 }
75
76 pub fn pop(&mut self) -> PopResult<EntryType> {
78 let now = Instant::now();
79 let elapsed = now - self.last_check;
80 self.last_check = now;
81 let mut elapsed_ms = elapsed.as_millis() as u32;
82 POP_INTERVAL.observe(elapsed.as_secs_f64());
83
84 let mut items = vec![];
85
86 while elapsed_ms > 0 {
87 match self.wheel.can_skip() {
88 Skip::Empty => break,
89 Skip::None => {
90 items.append(&mut self.wheel.tick());
91 elapsed_ms -= 1;
92 }
93 Skip::Millis(m) => {
94 let amount = m.min(elapsed_ms);
95 self.wheel.skip(amount);
96 elapsed_ms -= amount;
97 }
98 }
99 }
100
101 if !items.is_empty() {
102 self.len -= items.len();
103 return PopResult::Items(items);
104 }
105
106 match self.wheel.can_skip() {
107 Skip::None => PopResult::Sleep(Duration::from_millis(1)),
108 Skip::Empty => PopResult::Empty,
109 Skip::Millis(ms) => PopResult::Sleep(Duration::from_millis(ms.into())),
110 }
111 }
112
113 pub fn drain(&mut self) -> Vec<EntryType> {
116 let mut items = vec![];
117 loop {
118 match self.wheel.can_skip() {
119 Skip::Empty => {
120 self.last_check = Instant::now();
121 self.len = 0;
122 return items;
123 }
124 Skip::None => {
125 items.append(&mut self.wheel.tick());
126 }
127 Skip::Millis(m) => {
128 self.wheel.skip(m);
129 }
130 }
131 }
132 }
133}
134
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread::sleep;

    /// Simple payload used to exercise the queue; the tests insert
    /// references to it.
    #[derive(Debug, PartialEq, Clone)]
    struct Entry {
        id: u64,
        value: &'static str,
        delay: Duration,
    }

    impl TimerEntryWithDelay for &Entry {
        fn delay(&self) -> Duration {
            self.delay
        }
    }

    /// Shorthand constructor for a test entry.
    fn entry(id: u64, value: &'static str, delay: Duration) -> Entry {
        Entry { id, value, delay }
    }

    /// Unwraps the entries of a `PopResult::Items`; any other outcome is a
    /// test failure.
    fn expect_items(result: PopResult<&Entry>) -> Vec<&Entry> {
        match result {
            PopResult::Items(items) => items,
            _ => unreachable!(),
        }
    }

    /// `drain` hands back everything, ordered by delay, and leaves the queue
    /// empty.
    #[test]
    fn draining() {
        let first = entry(1, "foo", Duration::from_millis(1));
        let second = entry(2, "bar", Duration::from_millis(10));
        let third = entry(3, "baz", Duration::from_millis(5));

        let mut queue = TimeQ::new();
        for item in [&first, &second, &third] {
            queue.insert(item).unwrap();
        }

        let drained = queue.drain();
        assert_eq!(queue.len(), 0);
        assert!(queue.is_empty());
        assert_eq!(drained, vec![&first, &third, &second]);
    }

    /// Entries come out of `pop` one at a time as their delays expire.
    #[test]
    fn basic_queue() {
        let mut queue = TimeQ::new();

        let first = entry(1, "foo", Duration::from_millis(1));
        let second = entry(2, "bar", Duration::from_secs(1));
        let third = entry(3, "baz", Duration::from_millis(100));

        queue.insert(&first).unwrap();
        queue.insert(&second).unwrap();
        queue.insert(&third).unwrap();

        assert_eq!(queue.len(), 3);
        assert!(!queue.is_empty());

        // The 1ms entry is due after a 2ms pause.
        sleep(Duration::from_millis(2));
        assert_eq!(expect_items(queue.pop()), vec![&first]);

        assert_eq!(queue.len(), 2);
        assert!(!queue.is_empty());

        // Nothing else is due yet, so the queue asks us to wait.
        sleep(Duration::from_millis(2));
        if let PopResult::Sleep(pause) = queue.pop() {
            sleep(pause);
        } else {
            unreachable!();
        }

        sleep(Duration::from_millis(100));

        match queue.pop() {
            PopResult::Items(items) => assert_eq!(items, vec![&third]),
            PopResult::Sleep(ms) => unreachable!("still have {ms:?} to go"),
            _ => unreachable!(),
        }

        assert_eq!(queue.len(), 1);
        assert!(!queue.is_empty());

        // Finally the 1s entry.
        sleep(Duration::from_secs(1));
        assert_eq!(expect_items(queue.pop()), vec![&second]);

        assert_eq!(queue.len(), 0);
        assert!(queue.is_empty());
    }
}