lruttl/
lib.rs

use crate::metrics::*;
use dashmap::DashMap;
use kumo_server_memory::subscribe_to_memory_status_changes_async;
use parking_lot::Mutex;
use prometheus::{IntCounter, IntGauge};
use scopeguard::defer;
use std::borrow::Borrow;
use std::collections::HashSet;
use std::fmt::Debug;
use std::future::Future;
use std::hash::Hash;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, LazyLock, Weak};
use tokio::sync::Semaphore;
use tokio::time::{timeout_at, Duration, Instant};
pub use {linkme, paste};

mod metrics;

static CACHES: LazyLock<Mutex<Vec<Weak<dyn CachePurger + Send + Sync>>>> =
    LazyLock::new(Mutex::default);

struct Inner<K: Clone + Hash + Eq + Debug, V: Clone + Send + Sync + Debug> {
    name: String,
    tick: AtomicUsize,
    capacity: AtomicUsize,
    allow_stale_reads: AtomicBool,
    cache: DashMap<K, Item<V>>,
    lru_samples: AtomicUsize,
    sema_timeout_milliseconds: AtomicUsize,
    lookup_counter: IntCounter,
    evict_counter: IntCounter,
    expire_counter: IntCounter,
    hit_counter: IntCounter,
    miss_counter: IntCounter,
    populate_counter: IntCounter,
    insert_counter: IntCounter,
    stale_counter: IntCounter,
    error_counter: IntCounter,
    wait_gauge: IntGauge,
    size_gauge: IntGauge,
}

impl<
        K: Clone + Debug + Send + Sync + Hash + Eq + 'static,
        V: Clone + Debug + Send + Sync + 'static,
    > Inner<K, V>
{
    pub fn clear(&self) -> usize {
        let num_entries = self.cache.len();

        // We don't simply clear all elements here, as any pending
        // items will be trying to wait to coordinate; we need
        // to aggressively close the semaphore and wake them all up
        // before we remove those entries.
        self.cache.retain(|_k, item| {
            if let ItemState::Pending(sema) = &item.item {
                // Force everyone to wake up and error out
                sema.close();
            }
            false
        });

        self.size_gauge.set(self.cache.len() as i64);
        num_entries
    }

    /// Evict up to `target` entries.
    ///
    /// We use a probabilistic approach to the LRU, because
    /// it is challenging to safely thread the classic doubly-linked-list
    /// through dashmap.
    ///
    /// `target` is bounded to half of the number of selected samples, in
    /// order to ensure that we don't randomly pick the newest element
    /// from the set when under pressure.
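    ///
    /// As a worked illustration of that bound (assuming the default of
    /// 10 samples): if 10 usable samples are collected and the caller
    /// asks to evict 8 entries, the effective target becomes
    /// `8.min(10 / 2).max(1) == 5`, so at most 5 entries are evicted
    /// in this pass.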
    ///
    /// Redis uses a similar technique for its LRU as described
    /// in <https://redis.io/docs/latest/develop/reference/eviction/#apx-lru>
    /// which suggests that sampling 10 keys at random and then comparing
    /// their recency yields a reasonably close approximation to the
    /// 100% precise LRU.
    ///
    /// Since we also support TTLs, we'll just go ahead and remove
    /// any expired keys that show up in the sampled set.
    pub fn evict_some(&self, target: usize) -> usize {
        let now = Instant::now();

        // Approximate (since it could change immediately after reading)
        // cache size
        let cache_size = self.cache.len();
        // How many keys to sample
        let num_samples = self.lru_samples.load(Ordering::Relaxed).min(cache_size);

        // a list of keys which have expired
        let mut expired_keys = vec![];
        // a random selection of up to num_samples (key, tick) tuples
        let mut samples = vec![];

        // Pick some random keys.
        // The rand crate has some helpers for working with iterators,
        // but they appear to copy many elements into an internal buffer
        // in order to make a selection, and we want to avoid directly
        // considering every possible element because some users have
        // very large capacity caches.
        //
        // The approach taken here is to produce a random list of iterator
        // offsets so that we can skim across the iterator in a single
        // pass and pull out a random selection of elements.
        // The sample function provides a randomized list of indices that
        // we can use for this; we need to sort it first, but the cost
        // should be reasonably low as num_samples should be ~10 or so
        // in the most common configuration.
        {
            let mut rng = rand::thread_rng();
            let mut indices =
                rand::seq::index::sample(&mut rng, cache_size, num_samples).into_vec();
            indices.sort();
            let mut iter = self.cache.iter();
            let mut current_idx = 0;

            /// Advance an iterator by skip_amount.
            /// Ideally we'd use Iterator::advance_by for this, but at the
            /// time of writing that method is nightly only.
            /// Note that it also uses next() internally anyway
            fn advance_by(iter: &mut impl Iterator, skip_amount: usize) {
                for _ in 0..skip_amount {
                    if iter.next().is_none() {
                        return;
                    }
                }
            }

            for idx in indices {
                // idx is the index we want to be on; we'll need to skip ahead
                // by some number of slots based on the current one. skip_amount
                // is that number.
                let skip_amount = idx - current_idx;
                advance_by(&mut iter, skip_amount);

                match iter.next() {
                    Some(map_entry) => {
                        current_idx = idx + 1;
                        let item = map_entry.value();
                        match &item.item {
                            ItemState::Pending(_) | ItemState::Refreshing { .. } => {
                                // Cannot evict a pending lookup
                            }
                            ItemState::Present(_) | ItemState::Failed(_) => {
                                if now >= item.expiration {
                                    expired_keys.push(map_entry.key().clone());
                                } else {
                                    let last_tick = item.last_tick.load(Ordering::Relaxed);
                                    samples.push((map_entry.key().clone(), last_tick));
                                }
                            }
                        }
                    }
                    None => {
                        break;
                    }
                }
            }
        }

        let mut num_removed = 0;
        for key in expired_keys {
            // Sanity check that it is still expired before removing it,
            // because it would be a shame to remove it if another actor
            // has just updated it
            let removed = self
                .cache
                .remove_if(&key, |_k, entry| now >= entry.expiration)
                .is_some();
            if removed {
                tracing::trace!("{} expired {key:?}", self.name);
                num_removed += 1;
                self.expire_counter.inc();
            }
        }

        // Since we're picking random elements, we want to ensure that
        // we never pick the newest element from the set to evict because
        // that is likely the wrong choice. We need enough samples to
        // know that the lowest number we picked is representative
        // of the eldest element in the map overall.
        // We limit ourselves to half of the number of selected samples.
        let target = target.min(samples.len() / 2).max(1);

        // If we met our target, skip the extra work below
        if num_removed >= target {
            self.size_gauge.set(self.cache.len() as i64);
            tracing::trace!("{} expired {num_removed} of target {target}", self.name);
            return num_removed;
        }

        // Sort by ascending tick, which is equivalent to having the
        // LRU within that set towards the front of the vec
        samples.sort_by(|(_ka, tick_a), (_kb, tick_b)| tick_a.cmp(tick_b));

        for (key, tick) in samples {
            // Sanity check that the tick value is the same as we expect.
            // If it has changed since we sampled it, then that element
            // is no longer a good candidate for LRU eviction.
            if self
                .cache
                .remove_if(&key, |_k, item| {
                    item.last_tick.load(Ordering::Relaxed) == tick
                })
                .is_some()
            {
                tracing::debug!("{} evicted {key:?}", self.name);
                num_removed += 1;
                self.evict_counter.inc();
                self.size_gauge.set(self.cache.len() as i64);
                if num_removed >= target {
                    return num_removed;
                }
            }
        }

        if num_removed == 0 {
            tracing::debug!(
                "{} did not find anything to evict, target was {target}",
                self.name
            );
        }

        tracing::trace!("{} removed {num_removed} of target {target}", self.name);

        num_removed
    }

    /// Potentially make some progress to get back under
    /// budget on the cache capacity
    pub fn maybe_evict(&self) -> usize {
        let cache_size = self.cache.len();
        let capacity = self.capacity.load(Ordering::Relaxed);
        if cache_size > capacity {
            self.evict_some(cache_size - capacity)
        } else {
            0
        }
    }
}

trait CachePurger {
    fn name(&self) -> &str;
    fn purge(&self) -> usize;
    fn process_expirations(&self) -> usize;
    fn update_capacity(&self, capacity: usize);
}

impl<
        K: Clone + Debug + Send + Sync + Hash + Eq + 'static,
        V: Clone + Debug + Send + Sync + 'static,
    > CachePurger for Inner<K, V>
{
    fn name(&self) -> &str {
        &self.name
    }
    fn purge(&self) -> usize {
        self.clear()
    }
    fn process_expirations(&self) -> usize {
        let now = Instant::now();
        let mut expired_keys = vec![];
        for map_entry in self.cache.iter() {
            let item = map_entry.value();
            match &item.item {
                ItemState::Pending(_) | ItemState::Refreshing { .. } => {
                    // Cannot evict a pending lookup
                }
                ItemState::Present(_) | ItemState::Failed(_) => {
                    if now >= item.expiration {
                        expired_keys.push(map_entry.key().clone());
                    }
                }
            }
        }

        let mut num_removed = 0;
        for key in expired_keys {
            // Sanity check that it is still expired before removing it,
            // because it would be a shame to remove it if another actor
            // has just updated it
            let removed = self
                .cache
                .remove_if(&key, |_k, entry| now >= entry.expiration)
                .is_some();
            if removed {
                num_removed += 1;
                self.expire_counter.inc();
                self.size_gauge.set(self.cache.len() as i64);
            }
        }

        num_removed + self.maybe_evict()
    }

    fn update_capacity(&self, capacity: usize) {
        self.capacity.store(capacity, Ordering::Relaxed);
        // Bring it within capacity.
        // At the time of writing this is a bit half-hearted,
        // but we'll eventually trim down via ongoing process_expirations()
        // calls
        self.process_expirations();
    }
}

fn all_caches() -> Vec<Arc<dyn CachePurger + Send + Sync>> {
    let mut result = vec![];
    let mut caches = CACHES.lock();
    caches.retain(|entry| match entry.upgrade() {
        Some(purger) => {
            result.push(purger);
            true
        }
        None => false,
    });
    result
}

pub fn purge_all_caches() {
    let purgers = all_caches();

    tracing::error!("purging {} caches", purgers.len());
    for purger in purgers {
        let name = purger.name();
        let num_entries = purger.purge();
        tracing::error!("cleared {num_entries} entries from cache {name}");
    }
}

async fn prune_expired_caches() {
    loop {
        tokio::time::sleep(tokio::time::Duration::from_secs(30)).await;
        let purgers = all_caches();

        for p in purgers {
            let n = p.process_expirations();
            if n > 0 {
                tracing::debug!("expired {n} entries from cache {}", p.name());
            }
        }
    }
}

#[linkme::distributed_slice]
pub static LRUTTL_VIVIFY: [fn() -> &'static str];

/// Declare a cache as a static, and link it into the list of possible
/// pre-defined caches.
///
/// Due to an implementation limitation, you must also add
/// `linkme.workspace = true` to the manifest of the crate where you
/// use this macro.
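///
/// # Example
///
/// A minimal usage sketch; the cache name, key/value types and
/// capacity here are illustrative:
///
/// ```ignore
/// lruttl::declare_cache! {
///     static MY_CACHE: LruCacheWithTtl<String, u64>::new("my_cache", 1024);
/// }
/// ```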
#[macro_export]
macro_rules! declare_cache {
    ($vis:vis
        static $sym:ident:
        LruCacheWithTtl<$key:ty, $value:ty>::new($name:expr, $capacity:expr);
    ) => {
        $vis static $sym: ::std::sync::LazyLock<$crate::LruCacheWithTtl<$key, $value>> =
            ::std::sync::LazyLock::new(
                || $crate::LruCacheWithTtl::new($name, $capacity));

        // Link into LRUTTL_VIVIFY
        $crate::paste::paste! {
            #[linkme::distributed_slice($crate::LRUTTL_VIVIFY)]
            static [<VIVIFY_ $sym>]: fn() -> &'static str = || {
                ::std::sync::LazyLock::force(&$sym);
                $name
            };
        }
    };
}

/// Ensure that all caches declared via declare_cache!
/// have been instantiated.
fn vivify() {
    LazyLock::force(&PREDEFINED_NAMES);
}

/// Instantiate all caches declared via declare_cache! and return
/// the set of their names, asserting that each name is unique.
fn vivify_impl() -> HashSet<&'static str> {
    let mut set = HashSet::new();

    for vivify_func in LRUTTL_VIVIFY {
        let name = vivify_func();
        assert!(!set.contains(name), "duplicate cache name {name}");
        set.insert(name);
    }

    set
}

static PREDEFINED_NAMES: LazyLock<HashSet<&'static str>> = LazyLock::new(vivify_impl);

/// Returns true if `name` is not one of the cache names declared
/// via declare_cache!.
pub fn is_name_available(name: &str) -> bool {
    !PREDEFINED_NAMES.contains(name)
}

/// Update the capacity value for a pre-defined cache
pub fn set_cache_capacity(name: &str, capacity: usize) -> bool {
    if !PREDEFINED_NAMES.contains(name) {
        return false;
    }
    let caches = all_caches();
    match caches.iter().find(|p| p.name() == name) {
        Some(p) => {
            p.update_capacity(capacity);
            true
        }
        None => false,
    }
}

/// Instantiate all pre-defined caches, then spawn the background tasks
/// that purge caches on memory shortage and prune expired cache entries.
pub fn spawn_memory_monitor() {
    vivify();
    tokio::spawn(purge_caches_on_memory_shortage());
    tokio::spawn(prune_expired_caches());
}

async fn purge_caches_on_memory_shortage() {
    tracing::debug!("starting memory monitor");
    let mut memory_status = subscribe_to_memory_status_changes_async().await;
    while let Ok(()) = memory_status.changed().await {
        if kumo_server_memory::get_headroom() == 0 {
            purge_all_caches();

            // Wait a little bit so that we can debounce
            // in the case where we're riding the cusp of
            // the limit and would thrash the caches
            tokio::time::sleep(tokio::time::Duration::from_secs(30)).await;
        }
    }
}

#[derive(Debug, Clone)]
enum ItemState<V>
where
    V: Send,
    V: Sync,
{
    /// The value has been resolved and cached
    Present(V),
    /// A lookup is in flight; waiters block on the semaphore
    /// until the populating task closes it
    Pending(Arc<Semaphore>),
    /// The lookup failed; the error is cached until expiration
    Failed(Arc<anyhow::Error>),
    /// The entry expired while stale reads are allowed; the stale
    /// value remains available while a refresh is in flight
    Refreshing {
        stale_value: V,
        pending: Arc<Semaphore>,
    },
}

#[derive(Debug)]
struct Item<V>
where
    V: Send,
    V: Sync,
{
    item: ItemState<V>,
    expiration: Instant,
    last_tick: AtomicUsize,
}

impl<V: Clone + Send + Sync> Clone for Item<V> {
    fn clone(&self) -> Self {
        Self {
            item: self.item.clone(),
            expiration: self.expiration,
            last_tick: self.last_tick.load(Ordering::Relaxed).into(),
        }
    }
}

#[derive(Debug)]
pub struct ItemLookup<V: Debug> {
    /// A copy of the item
    pub item: V,
    /// If true, the get_or_try_insert operation populated the entry;
    /// the operation was a cache miss
    pub is_fresh: bool,
    /// The instant at which this entry will expire
    pub expiration: Instant,
}

pub struct LruCacheWithTtl<K: Clone + Debug + Hash + Eq, V: Clone + Debug + Send + Sync> {
    inner: Arc<Inner<K, V>>,
}

impl<
        K: Clone + Debug + Hash + Eq + Send + Sync + 'static,
        V: Clone + Debug + Send + Sync + 'static,
    > LruCacheWithTtl<K, V>
{
    pub fn new<S: Into<String>>(name: S, capacity: usize) -> Self {
        let name = name.into();
        let cache = DashMap::new();

        let lookup_counter = CACHE_LOOKUP
            .get_metric_with_label_values(&[&name])
            .expect("failed to get counter");
        let hit_counter = CACHE_HIT
            .get_metric_with_label_values(&[&name])
            .expect("failed to get counter");
        let stale_counter = CACHE_STALE
            .get_metric_with_label_values(&[&name])
            .expect("failed to get counter");
        let evict_counter = CACHE_EVICT
            .get_metric_with_label_values(&[&name])
            .expect("failed to get counter");
        let expire_counter = CACHE_EXPIRE
            .get_metric_with_label_values(&[&name])
            .expect("failed to get counter");
        let miss_counter = CACHE_MISS
            .get_metric_with_label_values(&[&name])
            .expect("failed to get counter");
        let populate_counter = CACHE_POPULATED
            .get_metric_with_label_values(&[&name])
            .expect("failed to get counter");
        let insert_counter = CACHE_INSERT
            .get_metric_with_label_values(&[&name])
            .expect("failed to get counter");
        let error_counter = CACHE_ERROR
            .get_metric_with_label_values(&[&name])
            .expect("failed to get counter");
        let wait_gauge = CACHE_WAIT
            .get_metric_with_label_values(&[&name])
            .expect("failed to get gauge");
        let size_gauge = CACHE_SIZE
            .get_metric_with_label_values(&[&name])
            .expect("failed to get gauge");

        let inner = Arc::new(Inner {
            name,
            cache,
            tick: AtomicUsize::new(0),
            allow_stale_reads: AtomicBool::new(false),
            capacity: AtomicUsize::new(capacity),
            lru_samples: AtomicUsize::new(10),
            sema_timeout_milliseconds: AtomicUsize::new(120_000),
            lookup_counter,
            evict_counter,
            expire_counter,
            hit_counter,
            miss_counter,
            populate_counter,
            error_counter,
            wait_gauge,
            insert_counter,
            stale_counter,
            size_gauge,
        });

        // Register with the global list of caches using a weak reference.
        // We need to "erase" the K/V types in order to do that, so we
        // use the CachePurger trait for this purpose.
        {
            let generic: Arc<dyn CachePurger + Send + Sync> = inner.clone();
            CACHES.lock().push(Arc::downgrade(&generic));
            tracing::debug!(
                "registered cache {} with capacity {capacity}",
                generic.name()
            );
        }

        Self { inner }
    }

    fn allow_stale_reads(&self) -> bool {
        self.inner.allow_stale_reads.load(Ordering::Relaxed)
    }

    /// When enabled, an expired entry is retained as a stale value while
    /// it is being refreshed; if the refresh does not complete within the
    /// semaphore timeout, get_or_try_insert returns the stale value
    /// instead of an error.
    pub fn set_allow_stale_reads(&self, value: bool) {
        self.inner.allow_stale_reads.store(value, Ordering::Relaxed);
    }

    /// Set how long get_or_try_insert will wait for another task to
    /// finish populating an entry before giving up.
    pub fn set_sema_timeout(&self, duration: Duration) {
        self.inner
            .sema_timeout_milliseconds
            .store(duration.as_millis() as usize, Ordering::Relaxed);
    }

    pub fn clear(&self) -> usize {
        self.inner.clear()
    }

    fn inc_tick(&self) -> usize {
        self.inner.tick.fetch_add(1, Ordering::Relaxed) + 1
    }

    fn update_tick(&self, item: &Item<V>) {
        let v = self.inc_tick();
        item.last_tick.store(v, Ordering::Relaxed);
    }

    pub fn lookup<Q: ?Sized>(&self, name: &Q) -> Option<ItemLookup<V>>
    where
        K: Borrow<Q>,
        Q: Hash + Eq,
    {
        self.inner.lookup_counter.inc();
        match self.inner.cache.get_mut(name) {
            None => {
                self.inner.miss_counter.inc();
                None
            }
            Some(entry) => {
                match &entry.item {
                    ItemState::Present(item) => {
                        let now = Instant::now();
                        if now >= entry.expiration {
                            // Expired
                            if self.allow_stale_reads() {
                                // We don't furnish a result directly, but we
                                // also do not want to remove it from the map
                                // at this stage.
                                // We're assuming that lookup() is called only
                                // via get_or_try_insert when allow_stale_reads
                                // is enabled.
                                self.inner.miss_counter.inc();
                                return None;
                            }

                            // otherwise: remove it from the map.
                            // Take care to drop our ref first so that we don't
                            // self-deadlock
                            drop(entry);
                            if self
                                .inner
                                .cache
                                .remove_if(name, |_k, entry| now >= entry.expiration)
                                .is_some()
                            {
                                self.inner.expire_counter.inc();
                                self.inner.size_gauge.set(self.inner.cache.len() as i64);
                            }
                            self.inner.miss_counter.inc();
                            return None;
                        }
                        self.inner.hit_counter.inc();
                        self.update_tick(&entry);
                        Some(ItemLookup {
                            item: item.clone(),
                            expiration: entry.expiration,
                            is_fresh: false,
                        })
                    }
                    ItemState::Refreshing { .. } | ItemState::Pending(_) | ItemState::Failed(_) => {
                        self.inner.miss_counter.inc();
                        None
                    }
                }
            }
        }
    }

    pub fn get<Q: ?Sized>(&self, name: &Q) -> Option<V>
    where
        K: Borrow<Q>,
        Q: Hash + Eq,
    {
        self.lookup(name).map(|lookup| lookup.item)
    }

    pub async fn insert(&self, name: K, item: V, expiration: Instant) -> V {
        self.inner.cache.insert(
            name,
            Item {
                item: ItemState::Present(item.clone()),
                expiration,
                last_tick: self.inc_tick().into(),
            },
        );

        self.inner.insert_counter.inc();
        self.inner.size_gauge.set(self.inner.cache.len() as i64);
        self.inner.maybe_evict();

        item
    }

    fn clone_item_state(&self, name: &K) -> (ItemState<V>, Instant) {
        let mut is_new = false;
        let mut entry = self.inner.cache.entry(name.clone()).or_insert_with(|| {
            is_new = true;
            Item {
                item: ItemState::Pending(Arc::new(Semaphore::new(1))),
                expiration: Instant::now() + Duration::from_secs(60),
                last_tick: self.inc_tick().into(),
            }
        });

        match &entry.value().item {
            ItemState::Pending(sema) => {
                if sema.is_closed() {
                    entry.value_mut().item = ItemState::Pending(Arc::new(Semaphore::new(1)));
                }
            }
            ItemState::Refreshing {
                stale_value,
                pending,
            } => {
                if pending.is_closed() {
                    entry.value_mut().item = ItemState::Refreshing {
                        stale_value: stale_value.clone(),
                        pending: Arc::new(Semaphore::new(1)),
                    };
                }
            }
            ItemState::Present(item) => {
                let now = Instant::now();
                if now >= entry.expiration {
                    // Expired; we will need to fetch it
                    let pending = Arc::new(Semaphore::new(1));
                    if self.allow_stale_reads() {
                        entry.value_mut().item = ItemState::Refreshing {
                            stale_value: item.clone(),
                            pending,
                        };
                    } else {
                        entry.value_mut().item = ItemState::Pending(pending);
                    }
                }
            }
            ItemState::Failed(_) => {
                let now = Instant::now();
                if now >= entry.expiration {
                    // Expired; we will need to fetch it
                    entry.value_mut().item = ItemState::Pending(Arc::new(Semaphore::new(1)));
                }
            }
        }

        self.update_tick(&entry);
        let item = entry.value();
        let result = (item.item.clone(), entry.expiration);
        drop(entry);

        if is_new {
            self.inner.size_gauge.set(self.inner.cache.len() as i64);
            self.inner.maybe_evict();
        }

        result
    }

    /// Get an existing item, but if that item doesn't already exist,
    /// execute the future `fut` to provide a value that will be inserted and then
    /// returned.  This is done atomically wrt. other callers.
    /// The `ttl_func` parameter is a function that can extract the TTL from the value type,
    /// or just return a constant TTL.
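    ///
    /// # Example
    ///
    /// A minimal sketch; the key, TTL and fetch future shown here are
    /// illustrative:
    ///
    /// ```ignore
    /// let lookup = CACHE
    ///     .get_or_try_insert(
    ///         &"example.com".to_string(),
    ///         |_value| Duration::from_secs(300),
    ///         async { Ok::<_, anyhow::Error>(42u64) },
    ///     )
    ///     .await
    ///     .expect("lookup failed");
    /// if lookup.is_fresh {
    ///     // this call ran the future and populated the cache entry
    /// }
    /// ```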
    pub async fn get_or_try_insert<E: Into<anyhow::Error>, TTL: FnOnce(&V) -> Duration>(
        &self,
        name: &K,
        ttl_func: TTL,
        fut: impl Future<Output = Result<V, E>>,
    ) -> Result<ItemLookup<V>, Arc<anyhow::Error>> {
        // Fast path avoids cloning the key
        if let Some(entry) = self.lookup(name) {
            return Ok(entry);
        }

        let timeout_duration = Duration::from_millis(
            self.inner.sema_timeout_milliseconds.load(Ordering::Relaxed) as u64,
        );
        let start = Instant::now();
        let deadline = start + timeout_duration;

        // Note: the lookup call increments lookup_counter and miss_counter
        'retry: loop {
            let (stale_value, sema) = match self.clone_item_state(name) {
                (ItemState::Present(item), expiration) => {
                    return Ok(ItemLookup {
                        item,
                        expiration,
                        is_fresh: false,
                    });
                }
                (ItemState::Failed(error), _) => {
                    return Err(error);
                }
                (
                    ItemState::Refreshing {
                        stale_value,
                        pending,
                    },
                    expiration,
                ) => (Some((stale_value, expiration)), pending),
                (ItemState::Pending(sema), _) => (None, sema),
            };

            let wait_result = {
                self.inner.wait_gauge.inc();
                defer! {
                    self.inner.wait_gauge.dec();
                }

                match timeout_at(deadline, sema.acquire_owned()).await {
                    Err(_) => {
                        if let Some((item, expiration)) = stale_value {
                            tracing::debug!(
                                "{} semaphore acquire for {name:?} timed out after \
                                {timeout_duration:?}, allowing stale value to satisfy the lookup",
                                self.inner.name
                            );
                            self.inner.stale_counter.inc();
                            return Ok(ItemLookup {
                                item,
                                expiration,
                                is_fresh: false,
                            });
                        }
                        tracing::debug!(
                            "{} semaphore acquire for {name:?} timed out after \
                                {timeout_duration:?}, returning error",
                            self.inner.name
                        );

                        self.inner.error_counter.inc();
                        return Err(Arc::new(anyhow::anyhow!(
                            "{} lookup for {name:?} \
                            timed out after {timeout_duration:?} \
                            on semaphore acquire while waiting for cache to populate",
                            self.inner.name
                        )));
                    }
                    Ok(r) => r,
                }
            };

            // While we slept, someone else may have satisfied
            // the lookup; check it
            let current_sema = match self.clone_item_state(name) {
                (ItemState::Present(item), expiration) => {
                    return Ok(ItemLookup {
                        item,
                        expiration,
                        is_fresh: false,
                    });
                }
                (ItemState::Failed(error), _) => {
                    self.inner.hit_counter.inc();
                    return Err(error);
                }
                (
                    ItemState::Refreshing {
                        stale_value: _,
                        pending,
                    },
                    _,
                ) => pending,
                (ItemState::Pending(current_sema), _) => current_sema,
            };

            // It's still outstanding
            match wait_result {
                Ok(permit) => {
                    // We're responsible for resolving it.
                    // We will always close the semaphore when
                    // we're done with this logic (and when we unwind
                    // or are cancelled) so that we can wake up any
                    // waiters.
                    // We use defer! for this so that if we are cancelled
                    // at the await point below, others are still woken up.
                    defer! {
                        permit.semaphore().close();
                    }

                    if !Arc::ptr_eq(&current_sema, permit.semaphore()) {
                        self.inner.error_counter.inc();
                        tracing::warn!(
                            "{} mismatched semaphores for {name:?}, \
                                    will restart cache resolve.",
                            self.inner.name
                        );
                        continue 'retry;
                    }

                    self.inner.populate_counter.inc();
                    let mut ttl = Duration::from_secs(60);
                    let future_result = fut.await;
                    let now = Instant::now();

                    let (item_result, return_value) = match future_result {
                        Ok(item) => {
                            ttl = ttl_func(&item);
                            (
                                ItemState::Present(item.clone()),
                                Ok(ItemLookup {
                                    item,
                                    expiration: now + ttl,
                                    is_fresh: true,
                                }),
                            )
                        }
                        Err(err) => {
                            self.inner.error_counter.inc();
                            let err = Arc::new(err.into());
                            (ItemState::Failed(err.clone()), Err(err))
                        }
                    };

                    self.inner.cache.insert(
                        name.clone(),
                        Item {
                            item: item_result,
                            expiration: Instant::now() + ttl,
                            last_tick: self.inc_tick().into(),
                        },
                    );
                    self.inner.maybe_evict();
                    return return_value;
                }
                Err(_) => {
                    self.inner.error_counter.inc();

                    // semaphore was closed, but the status is
                    // still somehow pending
                    tracing::debug!(
                        "{} lookup for {name:?} woke up semaphores \
                                but is still marked pending, \
                                will restart cache lookup",
                        self.inner.name
                    );
                    continue 'retry;
                }
            }
        }
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use test_log::test; // run with RUST_LOG=lruttl=trace to trace

    #[test(tokio::test)]
    async fn test_capacity() {
        let cache = LruCacheWithTtl::new("test_capacity", 40);

        let expiration = Instant::now() + Duration::from_secs(60);
        for i in 0..100 {
            cache.insert(i, i, expiration).await;
        }

        assert_eq!(cache.inner.cache.len(), 40, "capacity is respected");
    }

    #[test(tokio::test)]
    async fn test_expiration() {
        let cache = LruCacheWithTtl::new("test_expiration", 1);

        tokio::time::pause();
        let expiration = Instant::now() + Duration::from_secs(1);
        cache.insert(0, 0, expiration).await;

        cache.get(&0).expect("still in cache");
        tokio::time::advance(Duration::from_secs(2)).await;
        assert!(cache.get(&0).is_none(), "evicted due to ttl");
    }

    #[test(tokio::test)]
    async fn test_over_capacity_slow_resolve() {
        let cache = Arc::new(LruCacheWithTtl::<String, u64>::new(
            "test_over_capacity_slow_resolve",
            1,
        ));

        let mut foos = vec![];
        for idx in 0..2 {
            let cache = cache.clone();
            foos.push(tokio::spawn(async move {
                eprintln!("spawned task {idx} is running");
                cache
                    .get_or_try_insert(&"foo".to_string(), |_| Duration::from_secs(86400), async {
                        if idx == 0 {
                            eprintln!("foo {idx} getter sleeping");
                            tokio::time::sleep(Duration::from_secs(300)).await;
                        }
                        eprintln!("foo {idx} getter done");
                        Ok::<_, anyhow::Error>(idx)
                    })
                    .await
            }));
        }

        tokio::task::yield_now().await;

        eprintln!("calling again with immediate getter");
        let result = cache
            .get_or_try_insert(&"bar".to_string(), |_| Duration::from_secs(60), async {
                eprintln!("bar immediate getter running");
                Ok::<_, anyhow::Error>(42)
            })
            .await
            .unwrap();

        assert_eq!(result.item, 42);
        assert_eq!(cache.inner.cache.len(), 1);

        eprintln!("aborting first one");
        foos.remove(0).abort();

        eprintln!("try new key");
        let result = cache
            .get_or_try_insert(&"baz".to_string(), |_| Duration::from_secs(60), async {
                eprintln!("baz immediate getter running");
                Ok::<_, anyhow::Error>(32)
            })
            .await
            .unwrap();
        assert_eq!(result.item, 32);
        assert_eq!(cache.inner.cache.len(), 1);

        eprintln!("waiting second one");
        assert_eq!(1, foos.pop().unwrap().await.unwrap().unwrap().item);

        assert_eq!(cache.inner.cache.len(), 1);
    }
}