1use crate::metrics::*;
2use dashmap::DashMap;
3use kumo_server_memory::subscribe_to_memory_status_changes_async;
4use parking_lot::Mutex;
5use prometheus::{IntCounter, IntGauge};
6use scopeguard::defer;
7use std::borrow::Borrow;
8use std::collections::HashSet;
9use std::fmt::Debug;
10use std::future::Future;
11use std::hash::Hash;
12use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
13use std::sync::{Arc, LazyLock, Weak};
14use tokio::sync::Semaphore;
15use tokio::time::{timeout_at, Duration, Instant};
16pub use {linkme, paste};
17
18mod metrics;
19
20static CACHES: LazyLock<Mutex<Vec<Weak<dyn CachePurger + Send + Sync>>>> =
21 LazyLock::new(Mutex::default);
22
23struct Inner<K: Clone + Hash + Eq + Debug, V: Clone + Send + Sync + Debug> {
24 name: String,
25 tick: AtomicUsize,
26 capacity: AtomicUsize,
27 allow_stale_reads: AtomicBool,
28 cache: DashMap<K, Item<V>>,
29 lru_samples: AtomicUsize,
30 sema_timeout_milliseconds: AtomicUsize,
31 lookup_counter: IntCounter,
32 evict_counter: IntCounter,
33 expire_counter: IntCounter,
34 hit_counter: IntCounter,
35 miss_counter: IntCounter,
36 populate_counter: IntCounter,
37 insert_counter: IntCounter,
38 stale_counter: IntCounter,
39 error_counter: IntCounter,
40 wait_gauge: IntGauge,
41 size_gauge: IntGauge,
42}
43
44impl<
45 K: Clone + Debug + Send + Sync + Hash + Eq + 'static,
46 V: Clone + Debug + Send + Sync + 'static,
47 > Inner<K, V>
48{
49 pub fn clear(&self) -> usize {
50 let num_entries = self.cache.len();
51
52 self.cache.retain(|_k, item| {
57 if let ItemState::Pending(sema) = &item.item {
58 sema.close();
60 }
61 false
62 });
63
64 self.size_gauge.set(self.cache.len() as i64);
65 num_entries
66 }
67
68 pub fn evict_some(&self, target: usize) -> usize {
87 let now = Instant::now();
88
89 let cache_size = self.cache.len();
92 let num_samples = self.lru_samples.load(Ordering::Relaxed).min(cache_size);
94
95 let mut expired_keys = vec![];
97 let mut samples = vec![];
99
100 {
115 let mut rng = rand::thread_rng();
116 let mut indices =
117 rand::seq::index::sample(&mut rng, cache_size, num_samples).into_vec();
118 indices.sort();
119 let mut iter = self.cache.iter();
120 let mut current_idx = 0;
121
122 fn advance_by(iter: &mut impl Iterator, skip_amount: usize) {
127 for _ in 0..skip_amount {
128 if iter.next().is_none() {
129 return;
130 }
131 }
132 }
133
134 for idx in indices {
135 let skip_amount = idx - current_idx;
139 advance_by(&mut iter, skip_amount);
140
141 match iter.next() {
142 Some(map_entry) => {
143 current_idx = idx + 1;
144 let item = map_entry.value();
145 match &item.item {
146 ItemState::Pending(_) | ItemState::Refreshing { .. } => {
147 }
149 ItemState::Present(_) | ItemState::Failed(_) => {
150 if now >= item.expiration {
151 expired_keys.push(map_entry.key().clone());
152 } else {
153 let last_tick = item.last_tick.load(Ordering::Relaxed);
154 samples.push((map_entry.key().clone(), last_tick));
155 }
156 }
157 }
158 }
159 None => {
160 break;
161 }
162 }
163 }
164 }
165
166 let mut num_removed = 0;
167 for key in expired_keys {
168 let removed = self
172 .cache
173 .remove_if(&key, |_k, entry| now >= entry.expiration)
174 .is_some();
175 if removed {
176 tracing::trace!("{} expired {key:?}", self.name);
177 num_removed += 1;
178 self.expire_counter.inc();
179 }
180 }
181
182 let target = target.min(samples.len() / 2).max(1);
189
190 if num_removed >= target {
192 self.size_gauge.set(self.cache.len() as i64);
193 tracing::trace!("{} expired {num_removed} of target {target}", self.name);
194 return num_removed;
195 }
196
197 samples.sort_by(|(_ka, tick_a), (_kb, tick_b)| tick_a.cmp(tick_b));
200
201 for (key, tick) in samples {
202 if self
206 .cache
207 .remove_if(&key, |_k, item| {
208 item.last_tick.load(Ordering::Relaxed) == tick
209 })
210 .is_some()
211 {
212 tracing::debug!("{} evicted {key:?}", self.name);
213 num_removed += 1;
214 self.evict_counter.inc();
215 self.size_gauge.set(self.cache.len() as i64);
216 if num_removed >= target {
217 return num_removed;
218 }
219 }
220 }
221
222 if num_removed == 0 {
223 tracing::debug!(
224 "{} did not find anything to evict, target was {target}",
225 self.name
226 );
227 }
228
229 tracing::trace!("{} removed {num_removed} of target {target}", self.name);
230
231 num_removed
232 }
233
234 pub fn maybe_evict(&self) -> usize {
237 let cache_size = self.cache.len();
238 let capacity = self.capacity.load(Ordering::Relaxed);
239 if cache_size > capacity {
240 self.evict_some(cache_size - capacity)
241 } else {
242 0
243 }
244 }
245}
246
/// Type-erased maintenance interface for a cache, so that caches with
/// different key and value types can be kept in one list and managed
/// uniformly.
trait CachePurger {
    /// Returns the name of the cache.
    fn name(&self) -> &str;
    /// Removes all entries from the cache, returning how many there were.
    fn purge(&self) -> usize;
    /// Removes expired entries and trims the cache towards its capacity,
    /// returning the number of entries removed.
    fn process_expirations(&self) -> usize;
    /// Changes the capacity of the cache.
    fn update_capacity(&self, capacity: usize);
}
253
254impl<
255 K: Clone + Debug + Send + Sync + Hash + Eq + 'static,
256 V: Clone + Debug + Send + Sync + 'static,
257 > CachePurger for Inner<K, V>
258{
259 fn name(&self) -> &str {
260 &self.name
261 }
262 fn purge(&self) -> usize {
263 self.clear()
264 }
265 fn process_expirations(&self) -> usize {
266 let now = Instant::now();
267 let mut expired_keys = vec![];
268 for map_entry in self.cache.iter() {
269 let item = map_entry.value();
270 match &item.item {
271 ItemState::Pending(_) | ItemState::Refreshing { .. } => {
272 }
274 ItemState::Present(_) | ItemState::Failed(_) => {
275 if now >= item.expiration {
276 expired_keys.push(map_entry.key().clone());
277 }
278 }
279 }
280 }
281
282 let mut num_removed = 0;
283 for key in expired_keys {
284 let removed = self
288 .cache
289 .remove_if(&key, |_k, entry| now >= entry.expiration)
290 .is_some();
291 if removed {
292 num_removed += 1;
293 self.expire_counter.inc();
294 self.size_gauge.set(self.cache.len() as i64);
295 }
296 }
297
298 num_removed + self.maybe_evict()
299 }
300
301 fn update_capacity(&self, capacity: usize) {
302 self.capacity.store(capacity, Ordering::Relaxed);
303 self.process_expirations();
308 }
309}
310
311fn all_caches() -> Vec<Arc<dyn CachePurger + Send + Sync>> {
312 let mut result = vec![];
313 let mut caches = CACHES.lock();
314 caches.retain(|entry| match entry.upgrade() {
315 Some(purger) => {
316 result.push(purger);
317 true
318 }
319 None => false,
320 });
321 result
322}
323
324pub fn purge_all_caches() {
325 let purgers = all_caches();
326
327 tracing::error!("purging {} caches", purgers.len());
328 for purger in purgers {
329 let name = purger.name();
330 let num_entries = purger.purge();
331 tracing::error!("cleared {num_entries} entries from cache {name}");
332 }
333}
334
335async fn prune_expired_caches() {
336 loop {
337 tokio::time::sleep(tokio::time::Duration::from_secs(30)).await;
338 let purgers = all_caches();
339
340 for p in purgers {
341 let n = p.process_expirations();
342 if n > 0 {
343 tracing::debug!("expired {n} entries from cache {}", p.name());
344 }
345 }
346 }
347}
348
/// Link-time collected list of "vivify" functions, one per cache declared
/// with `declare_cache!`. Calling a function forces the corresponding lazy
/// cache static and returns that cache's name.
#[linkme::distributed_slice]
pub static LRUTTL_VIVIFY: [fn() -> &'static str];
351
/// Declares a lazily-initialized `LruCacheWithTtl` static and registers it
/// in `LRUTTL_VIVIFY` so that it counts as a pre-defined cache.
///
/// Usage: `declare_cache! { [vis] static NAME: LruCacheWithTtl<K, V>::new("name", capacity); }`
///
/// NOTE(review): the generated attribute is spelled `linkme::distributed_slice`
/// rather than `$crate::linkme::...`, so the crate invoking this macro
/// presumably needs `linkme` as a direct dependency — confirm.
#[macro_export]
macro_rules! declare_cache {
    ($vis:vis
     static $sym:ident:
     LruCacheWithTtl<$key:ty, $value:ty>::new($name:expr, $capacity:expr);
    ) => {
        // The cache itself: constructed on first use.
        $vis static $sym: ::std::sync::LazyLock<$crate::LruCacheWithTtl<$key, $value>> =
            ::std::sync::LazyLock::new(
                || $crate::LruCacheWithTtl::new($name, $capacity));

        // A function, collected into LRUTTL_VIVIFY, that forces the static
        // above and reports the cache's name.
        $crate::paste::paste! {
            #[linkme::distributed_slice($crate::LRUTTL_VIVIFY)]
            static [<VIVIFY_ $sym>]: fn() -> &'static str = || {
                ::std::sync::LazyLock::force(&$sym);
                $name
            };
        }
    };
}
378
379fn vivify() {
382 LazyLock::force(&PREDEFINED_NAMES);
383}
384
385fn vivify_impl() -> HashSet<&'static str> {
386 let mut set = HashSet::new();
387
388 for vivify_func in LRUTTL_VIVIFY {
389 let name = vivify_func();
390 assert!(!set.contains(name), "duplicate cache name {name}");
391 set.insert(name);
392 }
393
394 set
395}
396
/// Names of all caches declared through `declare_cache!`; computed (and the
/// caches instantiated) by `vivify_impl` on first access.
static PREDEFINED_NAMES: LazyLock<HashSet<&'static str>> = LazyLock::new(vivify_impl);
398
399pub fn is_name_available(name: &str) -> bool {
400 !PREDEFINED_NAMES.contains(name)
401}
402
403pub fn set_cache_capacity(name: &str, capacity: usize) -> bool {
405 if !PREDEFINED_NAMES.contains(name) {
406 return false;
407 }
408 let caches = all_caches();
409 match caches.iter().find(|p| p.name() == name) {
410 Some(p) => {
411 p.update_capacity(capacity);
412 true
413 }
414 None => false,
415 }
416}
417
418pub fn spawn_memory_monitor() {
419 vivify();
420 tokio::spawn(purge_caches_on_memory_shortage());
421 tokio::spawn(prune_expired_caches());
422}
423
424async fn purge_caches_on_memory_shortage() {
425 tracing::debug!("starting memory monitor");
426 let mut memory_status = subscribe_to_memory_status_changes_async().await;
427 while let Ok(()) = memory_status.changed().await {
428 if kumo_server_memory::get_headroom() == 0 {
429 purge_all_caches();
430
431 tokio::time::sleep(tokio::time::Duration::from_secs(30)).await;
435 }
436 }
437}
438
439#[derive(Debug, Clone)]
440enum ItemState<V>
441where
442 V: Send,
443 V: Sync,
444{
445 Present(V),
446 Pending(Arc<Semaphore>),
447 Failed(Arc<anyhow::Error>),
448 Refreshing {
449 stale_value: V,
450 pending: Arc<Semaphore>,
451 },
452}
453
454#[derive(Debug)]
455struct Item<V>
456where
457 V: Send,
458 V: Sync,
459{
460 item: ItemState<V>,
461 expiration: Instant,
462 last_tick: AtomicUsize,
463}
464
465impl<V: Clone + Send + Sync> Clone for Item<V> {
466 fn clone(&self) -> Self {
467 Self {
468 item: self.item.clone(),
469 expiration: self.expiration,
470 last_tick: self.last_tick.load(Ordering::Relaxed).into(),
471 }
472 }
473}
474
475#[derive(Debug)]
476pub struct ItemLookup<V: Debug> {
477 pub item: V,
479 pub is_fresh: bool,
482 pub expiration: Instant,
484}
485
486pub struct LruCacheWithTtl<K: Clone + Debug + Hash + Eq, V: Clone + Debug + Send + Sync> {
487 inner: Arc<Inner<K, V>>,
488}
489
490impl<
491 K: Clone + Debug + Hash + Eq + Send + Sync + std::fmt::Debug + 'static,
492 V: Clone + Debug + Send + Sync + 'static,
493 > LruCacheWithTtl<K, V>
494{
495 pub fn new<S: Into<String>>(name: S, capacity: usize) -> Self {
496 let name = name.into();
497 let cache = DashMap::new();
498
499 let lookup_counter = CACHE_LOOKUP
500 .get_metric_with_label_values(&[&name])
501 .expect("failed to get counter");
502 let hit_counter = CACHE_HIT
503 .get_metric_with_label_values(&[&name])
504 .expect("failed to get counter");
505 let stale_counter = CACHE_STALE
506 .get_metric_with_label_values(&[&name])
507 .expect("failed to get counter");
508 let evict_counter = CACHE_EVICT
509 .get_metric_with_label_values(&[&name])
510 .expect("failed to get counter");
511 let expire_counter = CACHE_EXPIRE
512 .get_metric_with_label_values(&[&name])
513 .expect("failed to get counter");
514 let miss_counter = CACHE_MISS
515 .get_metric_with_label_values(&[&name])
516 .expect("failed to get counter");
517 let populate_counter = CACHE_POPULATED
518 .get_metric_with_label_values(&[&name])
519 .expect("failed to get counter");
520 let insert_counter = CACHE_INSERT
521 .get_metric_with_label_values(&[&name])
522 .expect("failed to get counter");
523 let error_counter = CACHE_ERROR
524 .get_metric_with_label_values(&[&name])
525 .expect("failed to get counter");
526 let wait_gauge = CACHE_WAIT
527 .get_metric_with_label_values(&[&name])
528 .expect("failed to get counter");
529 let size_gauge = CACHE_SIZE
530 .get_metric_with_label_values(&[&name])
531 .expect("failed to get counter");
532
533 let inner = Arc::new(Inner {
534 name,
535 cache,
536 tick: AtomicUsize::new(0),
537 allow_stale_reads: AtomicBool::new(false),
538 capacity: AtomicUsize::new(capacity),
539 lru_samples: AtomicUsize::new(10),
540 sema_timeout_milliseconds: AtomicUsize::new(120_000),
541 lookup_counter,
542 evict_counter,
543 expire_counter,
544 hit_counter,
545 miss_counter,
546 populate_counter,
547 error_counter,
548 wait_gauge,
549 insert_counter,
550 stale_counter,
551 size_gauge,
552 });
553
554 {
558 let generic: Arc<dyn CachePurger + Send + Sync> = inner.clone();
559 CACHES.lock().push(Arc::downgrade(&generic));
560 tracing::debug!(
561 "registered cache {} with capacity {capacity}",
562 generic.name()
563 );
564 }
565
566 Self { inner }
567 }
568
569 fn allow_stale_reads(&self) -> bool {
570 self.inner.allow_stale_reads.load(Ordering::Relaxed)
571 }
572
573 pub fn set_allow_stale_reads(&self, value: bool) {
574 self.inner.allow_stale_reads.store(value, Ordering::Relaxed);
575 }
576
577 pub fn set_sema_timeout(&self, duration: Duration) {
578 self.inner
579 .sema_timeout_milliseconds
580 .store(duration.as_millis() as usize, Ordering::Relaxed);
581 }
582
583 pub fn clear(&self) -> usize {
584 self.inner.clear()
585 }
586
587 fn inc_tick(&self) -> usize {
588 self.inner.tick.fetch_add(1, Ordering::Relaxed) + 1
589 }
590
591 fn update_tick(&self, item: &Item<V>) {
592 let v = self.inc_tick();
593 item.last_tick.store(v, Ordering::Relaxed);
594 }
595
596 pub fn lookup<Q: ?Sized>(&self, name: &Q) -> Option<ItemLookup<V>>
597 where
598 K: Borrow<Q>,
599 Q: Hash + Eq,
600 {
601 self.inner.lookup_counter.inc();
602 match self.inner.cache.get_mut(name) {
603 None => {
604 self.inner.miss_counter.inc();
605 None
606 }
607 Some(entry) => {
608 match &entry.item {
609 ItemState::Present(item) => {
610 let now = Instant::now();
611 if now >= entry.expiration {
612 if self.allow_stale_reads() {
614 self.inner.miss_counter.inc();
621 return None;
622 }
623
624 drop(entry);
628 if self
629 .inner
630 .cache
631 .remove_if(name, |_k, entry| now >= entry.expiration)
632 .is_some()
633 {
634 self.inner.expire_counter.inc();
635 self.inner.size_gauge.set(self.inner.cache.len() as i64);
636 }
637 self.inner.miss_counter.inc();
638 return None;
639 }
640 self.inner.hit_counter.inc();
641 self.update_tick(&entry);
642 Some(ItemLookup {
643 item: item.clone(),
644 expiration: entry.expiration,
645 is_fresh: false,
646 })
647 }
648 ItemState::Refreshing { .. } | ItemState::Pending(_) | ItemState::Failed(_) => {
649 self.inner.miss_counter.inc();
650 None
651 }
652 }
653 }
654 }
655 }
656
657 pub fn get<Q: ?Sized>(&self, name: &Q) -> Option<V>
658 where
659 K: Borrow<Q>,
660 Q: Hash + Eq,
661 {
662 self.lookup(name).map(|lookup| lookup.item)
663 }
664
665 pub async fn insert(&self, name: K, item: V, expiration: Instant) -> V {
666 self.inner.cache.insert(
667 name,
668 Item {
669 item: ItemState::Present(item.clone()),
670 expiration,
671 last_tick: self.inc_tick().into(),
672 },
673 );
674
675 self.inner.insert_counter.inc();
676 self.inner.size_gauge.set(self.inner.cache.len() as i64);
677 self.inner.maybe_evict();
678
679 item
680 }
681
682 fn clone_item_state(&self, name: &K) -> (ItemState<V>, Instant) {
683 let mut is_new = false;
684 let mut entry = self.inner.cache.entry(name.clone()).or_insert_with(|| {
685 is_new = true;
686 Item {
687 item: ItemState::Pending(Arc::new(Semaphore::new(1))),
688 expiration: Instant::now() + Duration::from_secs(60),
689 last_tick: self.inc_tick().into(),
690 }
691 });
692
693 match &entry.value().item {
694 ItemState::Pending(sema) => {
695 if sema.is_closed() {
696 entry.value_mut().item = ItemState::Pending(Arc::new(Semaphore::new(1)));
697 }
698 }
699 ItemState::Refreshing {
700 stale_value,
701 pending,
702 } => {
703 if pending.is_closed() {
704 entry.value_mut().item = ItemState::Refreshing {
705 stale_value: stale_value.clone(),
706 pending: Arc::new(Semaphore::new(1)),
707 };
708 }
709 }
710 ItemState::Present(item) => {
711 let now = Instant::now();
712 if now >= entry.expiration {
713 let pending = Arc::new(Semaphore::new(1));
715 if self.allow_stale_reads() {
716 entry.value_mut().item = ItemState::Refreshing {
717 stale_value: item.clone(),
718 pending,
719 };
720 } else {
721 entry.value_mut().item = ItemState::Pending(pending);
722 }
723 }
724 }
725 ItemState::Failed(_) => {
726 let now = Instant::now();
727 if now >= entry.expiration {
728 entry.value_mut().item = ItemState::Pending(Arc::new(Semaphore::new(1)));
730 }
731 }
732 }
733
734 self.update_tick(&entry);
735 let item = entry.value();
736 let result = (item.item.clone(), entry.expiration);
737 drop(entry);
738
739 if is_new {
740 self.inner.size_gauge.set(self.inner.cache.len() as i64);
741 self.inner.maybe_evict();
742 }
743
744 result
745 }
746
747 pub async fn get_or_try_insert<E: Into<anyhow::Error>, TTL: FnOnce(&V) -> Duration>(
753 &self,
754 name: &K,
755 ttl_func: TTL,
756 fut: impl Future<Output = Result<V, E>>,
757 ) -> Result<ItemLookup<V>, Arc<anyhow::Error>> {
758 if let Some(entry) = self.lookup(name) {
760 return Ok(entry);
761 }
762
763 let timeout_duration = Duration::from_millis(
764 self.inner.sema_timeout_milliseconds.load(Ordering::Relaxed) as u64,
765 );
766 let start = Instant::now();
767 let deadline = start + timeout_duration;
768
769 'retry: loop {
771 let (stale_value, sema) = match self.clone_item_state(name) {
772 (ItemState::Present(item), expiration) => {
773 return Ok(ItemLookup {
774 item,
775 expiration,
776 is_fresh: false,
777 });
778 }
779 (ItemState::Failed(error), _) => {
780 return Err(error);
781 }
782 (
783 ItemState::Refreshing {
784 stale_value,
785 pending,
786 },
787 expiration,
788 ) => (Some((stale_value, expiration)), pending),
789 (ItemState::Pending(sema), _) => (None, sema),
790 };
791
792 let wait_result = {
793 self.inner.wait_gauge.inc();
794 defer! {
795 self.inner.wait_gauge.dec();
796 }
797
798 match timeout_at(deadline, sema.acquire_owned()).await {
799 Err(_) => {
800 if let Some((item, expiration)) = stale_value {
801 tracing::debug!(
802 "{} semaphore acquire for {name:?} timed out after \
803 {timeout_duration:?}, allowing stale value to satisfy the lookup",
804 self.inner.name
805 );
806 self.inner.stale_counter.inc();
807 return Ok(ItemLookup {
808 item,
809 expiration,
810 is_fresh: false,
811 });
812 }
813 tracing::debug!(
814 "{} semaphore acquire for {name:?} timed out after \
815 {timeout_duration:?}, returning error",
816 self.inner.name
817 );
818
819 self.inner.error_counter.inc();
820 return Err(Arc::new(anyhow::anyhow!(
821 "{} lookup for {name:?} \
822 timed out after {timeout_duration:?} \
823 on semaphore acquire while waiting for cache to populate",
824 self.inner.name
825 )));
826 }
827 Ok(r) => r,
828 }
829 };
830
831 let current_sema = match self.clone_item_state(name) {
834 (ItemState::Present(item), expiration) => {
835 return Ok(ItemLookup {
836 item,
837 expiration,
838 is_fresh: false,
839 });
840 }
841 (ItemState::Failed(error), _) => {
842 self.inner.hit_counter.inc();
843 return Err(error);
844 }
845 (
846 ItemState::Refreshing {
847 stale_value: _,
848 pending,
849 },
850 _,
851 ) => pending,
852 (ItemState::Pending(current_sema), _) => current_sema,
853 };
854
855 match wait_result {
857 Ok(permit) => {
858 defer! {
866 permit.semaphore().close();
867 }
868
869 if !Arc::ptr_eq(¤t_sema, permit.semaphore()) {
870 self.inner.error_counter.inc();
871 tracing::warn!(
872 "{} mismatched semaphores for {name:?}, \
873 will restart cache resolve.",
874 self.inner.name
875 );
876 continue 'retry;
877 }
878
879 self.inner.populate_counter.inc();
880 let mut ttl = Duration::from_secs(60);
881 let future_result = fut.await;
882 let now = Instant::now();
883
884 let (item_result, return_value) = match future_result {
885 Ok(item) => {
886 ttl = ttl_func(&item);
887 (
888 ItemState::Present(item.clone()),
889 Ok(ItemLookup {
890 item,
891 expiration: now + ttl,
892 is_fresh: true,
893 }),
894 )
895 }
896 Err(err) => {
897 self.inner.error_counter.inc();
898 let err = Arc::new(err.into());
899 (ItemState::Failed(err.clone()), Err(err))
900 }
901 };
902
903 self.inner.cache.insert(
904 name.clone(),
905 Item {
906 item: item_result,
907 expiration: Instant::now() + ttl,
908 last_tick: self.inc_tick().into(),
909 },
910 );
911 self.inner.maybe_evict();
912 return return_value;
913 }
914 Err(_) => {
915 self.inner.error_counter.inc();
916
917 tracing::debug!(
920 "{} lookup for {name:?} woke up semaphores \
921 but is still marked pending, \
922 will restart cache lookup",
923 self.inner.name
924 );
925 continue 'retry;
926 }
927 }
928 }
929 }
930}
931
#[cfg(test)]
mod test {
    use super::*;
    use test_log::test;

    /// Inserting more entries than the capacity leaves exactly `capacity`
    /// entries behind.
    #[test(tokio::test)]
    async fn test_capacity() {
        let cache = LruCacheWithTtl::new("test_capacity", 40);

        let expiration = Instant::now() + Duration::from_secs(60);
        for i in 0..100 {
            cache.insert(i, i, expiration).await;
        }

        assert_eq!(cache.inner.cache.len(), 40, "capacity is respected");
    }

    /// An entry is returned before its expiration and not after.
    #[test(tokio::test)]
    async fn test_expiration() {
        let cache = LruCacheWithTtl::new("test_expiration", 1);

        // Paused time lets the test move the clock explicitly.
        tokio::time::pause();
        let expiration = Instant::now() + Duration::from_secs(1);
        cache.insert(0, 0, expiration).await;

        cache.get(&0).expect("still in cache");
        tokio::time::advance(Duration::from_secs(2)).await;
        assert!(cache.get(&0).is_none(), "evicted due to ttl");
    }

    /// A capacity-1 cache with a slow lookup in flight for one key must
    /// still serve other keys, and a second caller for the slow key must
    /// take over once the first one is aborted.
    #[test(tokio::test)]
    async fn test_over_capacity_slow_resolve() {
        let cache = Arc::new(LruCacheWithTtl::<String, u64>::new(
            "test_over_capacity_slow_resolve",
            1,
        ));

        // Two tasks ask for "foo"; the getter of task 0 sleeps for a long
        // time, the getter of task 1 completes immediately.
        let mut foos = vec![];
        for idx in 0..2 {
            let cache = cache.clone();
            foos.push(tokio::spawn(async move {
                eprintln!("spawned task {idx} is running");
                cache
                    .get_or_try_insert(&"foo".to_string(), |_| Duration::from_secs(86400), async {
                        if idx == 0 {
                            eprintln!("foo {idx} getter sleeping");
                            tokio::time::sleep(Duration::from_secs(300)).await;
                        }
                        eprintln!("foo {idx} getter done");
                        Ok::<_, anyhow::Error>(idx)
                    })
                    .await
            }));
        }

        // Give the spawned tasks a chance to run.
        tokio::task::yield_now().await;

        eprintln!("calling again with immediate getter");
        let result = cache
            .get_or_try_insert(&"bar".to_string(), |_| Duration::from_secs(60), async {
                eprintln!("bar immediate getter running");
                Ok::<_, anyhow::Error>(42)
            })
            .await
            .unwrap();

        assert_eq!(result.item, 42);
        assert_eq!(cache.inner.cache.len(), 1);

        eprintln!("aborting first one");
        foos.remove(0).abort();

        eprintln!("try new key");
        let result = cache
            .get_or_try_insert(&"baz".to_string(), |_| Duration::from_secs(60), async {
                eprintln!("baz immediate getter running");
                Ok::<_, anyhow::Error>(32)
            })
            .await
            .unwrap();
        assert_eq!(result.item, 32);
        assert_eq!(cache.inner.cache.len(), 1);

        // The remaining "foo" task is the one with idx 1, so the value it
        // reports is 1.
        eprintln!("waiting second one");
        assert_eq!(1, foos.pop().unwrap().await.unwrap().unwrap().item);

        assert_eq!(cache.inner.cache.len(), 1);
    }
}