spool/
rocks.rs

1use crate::{
2    Spool, SpoolBackpressureTimeout, SpoolCallerDeadlineExceeded, SpoolEntry, SpoolId,
3    SpoolUnhealthyError,
4};
5use async_trait::async_trait;
6use chrono::{DateTime, Utc};
7use flume::Sender;
8use kumo_prometheus::declare_metric;
9use rocksdb::perf::get_memory_usage_stats;
10use rocksdb::properties::{
11    ACTUAL_DELAYED_WRITE_RATE, BACKGROUND_ERRORS, COMPACTION_PENDING,
12    ESTIMATE_PENDING_COMPACTION_BYTES, IS_WRITE_STOPPED, NUM_RUNNING_COMPACTIONS,
13};
14use rocksdb::{
15    BottommostLevelCompaction, CompactOptions, DBCompressionType, ErrorKind, IteratorMode,
16    LogLevel, Options, WaitForCompactOptions, WriteBatch, WriteOptions, DB,
17};
18use serde::{Deserialize, Serialize};
19use std::path::{Path, PathBuf};
20use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
21use std::sync::{Arc, Weak};
22use std::time::{Duration, Instant};
23use tokio::runtime::Handle;
24use tokio::sync::Semaphore;
25use tokio::time::{sleep, timeout_at};
26
27#[derive(Serialize, Deserialize, Debug)]
28pub struct RocksSpoolParams {
29    pub increase_parallelism: Option<i32>,
30
31    pub optimize_level_style_compaction: Option<usize>,
32    pub optimize_universal_style_compaction: Option<usize>,
33    #[serde(default)]
34    pub paranoid_checks: bool,
35    #[serde(default)]
36    pub compression_type: DBCompressionTypeDef,
37
38    /// If non-zero, we perform bigger reads when doing compaction. If you’re running RocksDB on
39    /// spinning disks, you should set this to at least 2MB. That way RocksDB’s compaction is doing
40    /// sequential instead of random reads
41    pub compaction_readahead_size: Option<usize>,
42
43    #[serde(default)]
44    pub level_compaction_dynamic_level_bytes: bool,
45
46    #[serde(default)]
47    pub max_open_files: Option<usize>,
48
49    /// Size in bytes of the rocksdb memtable that buffers writes before
50    /// being flushed to disk as a new SST file.
51    ///
52    /// Smaller values produce smaller, more frequent SST files and
53    /// trigger compactions sooner -- useful in test setups that need
54    /// to force the storage through its full write/compact lifecycle
55    /// quickly.  Larger values amortize compaction overhead but
56    /// increase memory use and recovery time after restart.  Leave
57    /// unset to use the rocksdb default.
58    #[serde(default)]
59    pub write_buffer_size: Option<usize>,
60
61    /// Number of level-0 SST files at which rocksdb will stop
62    /// accepting writes.  Lower values transition the database into
63    /// the write-stopped state more quickly when background
64    /// compaction cannot keep up, which is useful for tests that
65    /// need to deterministically observe that condition.  Leave
66    /// unset to use the rocksdb default.
67    #[serde(default)]
68    pub level0_stop_writes_trigger: Option<i32>,
69
70    #[serde(default)]
71    pub log_level: LogLevelDef,
72
73    /// See:
74    /// <https://docs.rs/rocksdb/latest/rocksdb/struct.Options.html#method.set_memtable_huge_page_size>
75    #[serde(default)]
76    pub memtable_huge_page_size: Option<usize>,
77
78    #[serde(
79        with = "duration_serde",
80        default = "RocksSpoolParams::default_log_file_time_to_roll"
81    )]
82    pub log_file_time_to_roll: Duration,
83
84    #[serde(
85        with = "duration_serde",
86        default = "RocksSpoolParams::default_obsolete_files_period"
87    )]
88    pub obsolete_files_period: Duration,
89
90    #[serde(default)]
91    pub limit_concurrent_stores: Option<usize>,
92    #[serde(default)]
93    pub limit_concurrent_loads: Option<usize>,
94    #[serde(default)]
95    pub limit_concurrent_removes: Option<usize>,
96
97    /// Upper bound on the wait that `store()` and `remove()` will
98    /// tolerate when rocksdb is applying backpressure.  Callers may
99    /// provide a shorter deadline (typically derived from an SMTP
100    /// client's idle timeout); the effective deadline is the minimum
101    /// of the two.  Going longer than the caller-provided value risks
102    /// the client timing out and retrying, which would produce
103    /// duplicate deliveries -- this option therefore only narrows the
104    /// effective deadline, it never extends it.
105    #[serde(
106        with = "duration_serde",
107        default = "RocksSpoolParams::default_store_deadline"
108    )]
109    pub store_deadline: Duration,
110
111    /// How long the composite "this database is wedged" signal must
112    /// hold continuously before the load-shedding gate latches.
113    /// The signal goes high whenever the rocksdb
114    /// `background-errors` counter has grown above the value
115    /// observed at process start, or any foreground spool operation
116    /// has returned a rocksdb error since process start.  Brief
117    /// blips that recover within this window do not latch the gate.
118    #[serde(
119        with = "duration_serde",
120        default = "RocksSpoolParams::default_error_latch_duration"
121    )]
122    pub error_latch_duration: Duration,
123
124    /// How long the healthy state must hold continuously before the
125    /// load-shedding gate auto-unlatches.  Only consulted when
126    /// `allow_error_unlatch` is true.  A relatively long value
127    /// (minutes) gives operators time to inspect the database after a
128    /// brief failure window before the daemon starts accepting writes
129    /// again on its own.
130    #[serde(
131        with = "duration_serde",
132        default = "RocksSpoolParams::default_error_unlatch_duration"
133    )]
134    pub error_unlatch_duration: Duration,
135
136    /// When true (the default), the load-shedding gate clears itself
137    /// after `error_unlatch_duration` of observed recovery.  Set to
138    /// false to require an operator restart to clear the gate, which
139    /// is appropriate when you want a human to confirm the underlying
140    /// cause is resolved before accepting traffic again.
141    #[serde(default = "RocksSpoolParams::default_allow_error_unlatch")]
142    pub allow_error_unlatch: bool,
143}
144
145impl Default for RocksSpoolParams {
146    fn default() -> Self {
147        Self {
148            increase_parallelism: None,
149            optimize_level_style_compaction: None,
150            optimize_universal_style_compaction: None,
151            paranoid_checks: false,
152            compression_type: DBCompressionTypeDef::default(),
153            compaction_readahead_size: None,
154            level_compaction_dynamic_level_bytes: false,
155            max_open_files: None,
156            write_buffer_size: None,
157            level0_stop_writes_trigger: None,
158            log_level: LogLevelDef::default(),
159            memtable_huge_page_size: None,
160            log_file_time_to_roll: Self::default_log_file_time_to_roll(),
161            obsolete_files_period: Self::default_obsolete_files_period(),
162            limit_concurrent_stores: None,
163            limit_concurrent_loads: None,
164            limit_concurrent_removes: None,
165            store_deadline: Self::default_store_deadline(),
166            error_latch_duration: Self::default_error_latch_duration(),
167            error_unlatch_duration: Self::default_error_unlatch_duration(),
168            allow_error_unlatch: Self::default_allow_error_unlatch(),
169        }
170    }
171}
172
173impl RocksSpoolParams {
174    fn default_log_file_time_to_roll() -> Duration {
175        Duration::from_secs(86400)
176    }
177
178    fn default_obsolete_files_period() -> Duration {
179        Duration::from_secs(6 * 60 * 60)
180    }
181
182    fn default_store_deadline() -> Duration {
183        Duration::from_secs(30)
184    }
185
186    fn default_error_latch_duration() -> Duration {
187        Duration::from_secs(15)
188    }
189
190    fn default_error_unlatch_duration() -> Duration {
191        Duration::from_secs(5 * 60)
192    }
193
194    fn default_allow_error_unlatch() -> bool {
195        true
196    }
197}
198
199#[derive(Serialize, Deserialize, Debug)]
200pub enum DBCompressionTypeDef {
201    None,
202    Snappy,
203    Zlib,
204    Bz2,
205    Lz4,
206    Lz4hc,
207    Zstd,
208}
209
210impl From<DBCompressionTypeDef> for DBCompressionType {
211    fn from(val: DBCompressionTypeDef) -> Self {
212        match val {
213            DBCompressionTypeDef::None => DBCompressionType::None,
214            DBCompressionTypeDef::Snappy => DBCompressionType::Snappy,
215            DBCompressionTypeDef::Zlib => DBCompressionType::Zlib,
216            DBCompressionTypeDef::Bz2 => DBCompressionType::Bz2,
217            DBCompressionTypeDef::Lz4 => DBCompressionType::Lz4,
218            DBCompressionTypeDef::Lz4hc => DBCompressionType::Lz4hc,
219            DBCompressionTypeDef::Zstd => DBCompressionType::Zstd,
220        }
221    }
222}
223
224impl Default for DBCompressionTypeDef {
225    fn default() -> Self {
226        Self::Snappy
227    }
228}
229
230#[derive(Serialize, Deserialize, Debug)]
231pub enum LogLevelDef {
232    Debug,
233    Info,
234    Warn,
235    Error,
236    Fatal,
237    Header,
238}
239
240impl Default for LogLevelDef {
241    fn default() -> Self {
242        Self::Info
243    }
244}
245
246impl From<LogLevelDef> for LogLevel {
247    fn from(val: LogLevelDef) -> Self {
248        match val {
249            LogLevelDef::Debug => LogLevel::Debug,
250            LogLevelDef::Info => LogLevel::Info,
251            LogLevelDef::Warn => LogLevel::Warn,
252            LogLevelDef::Error => LogLevel::Error,
253            LogLevelDef::Fatal => LogLevel::Fatal,
254            LogLevelDef::Header => LogLevel::Header,
255        }
256    }
257}
258
259pub struct RocksSpool {
260    db: Arc<DB>,
261    runtime: Handle,
262    limit_concurrent_stores: Option<Arc<Semaphore>>,
263    limit_concurrent_loads: Option<Arc<Semaphore>>,
264    limit_concurrent_removes: Option<Arc<Semaphore>>,
265    /// Latched load-shedding gate driven by `metrics_monitor`.  When
266    /// set, foreground `store()`/`remove()` calls return an error
267    /// instead of waiting on rocksdb backpressure, and the ingress
268    /// paths refuse new traffic.  See `metrics_monitor` for the
269    /// composite signal that drives latch/unlatch transitions.
270    load_shed_active: Arc<AtomicBool>,
271    /// Count of foreground spool operations (load, enumerate,
272    /// store, remove) that have failed with a rocksdb error
273    /// since the last auto-unlatch (or since process start if no
274    /// auto-unlatch has happened).  `metrics_monitor` reads this
275    /// to detect failure modes that do not surface as background
276    /// errors -- notably a missing SST discovered during a
277    /// `get()`, which the C++ side reports to the caller but does
278    /// not feed into `rocksdb.background-errors`.  Reset to 0 by
279    /// the auto-unlatch path's compare-exchange so a subsequent
280    /// blip can be observed as fresh growth.
281    foreground_errors: Arc<AtomicU64>,
282    store_deadline: Duration,
283}
284
/// First sleep of the `store`/`remove` backpressure retry loop (half a
/// millisecond).  Small enough that brief memtable backpressure costs
/// almost no extra latency on the slow path.
const BACKOFF_INITIAL: Duration = Duration::from_nanos(500_000);
/// Ceiling for the doubling backoff (fifty milliseconds).  Bounding the
/// sleep keeps a latched load-shedding gate noticed promptly even while
/// the database stays wedged.
const BACKOFF_MAX: Duration = Duration::from_micros(50_000);
293
294impl RocksSpool {
295    /// Driver shared by `store()` and `remove()`.  Issues `write_opt`
296    /// with `no_slowdown=true` repeatedly with exponential backoff
297    /// until one of: the write succeeds, the effective deadline is
298    /// reached, the load-shedding gate latches, or rocksdb returns a
299    /// non-`Incomplete` error.
300    ///
301    /// This replaces an earlier design that used `spawn_blocking` with
302    /// `no_slowdown=false`.  That approach could not be cancelled, held
303    /// a blocking-pool worker per stalled call (risking pool
304    /// exhaustion during a wedge), and could not observe the latched
305    /// load-shedding gate.  The polling design preserves write
306    /// atomicity -- each iteration is a single rocksdb batch write,
307    /// which is atomic by construction -- while restoring
308    /// cancellation, gate observability, and bounded resource use.
309    async fn write_with_backpressure(
310        &self,
311        opts: WriteOptions,
312        caller_deadline: Option<Instant>,
313        permits: Option<Arc<Semaphore>>,
314        apply: impl Fn(&mut WriteBatch),
315    ) -> anyhow::Result<()> {
316        // Gate at the top so that the load-shedding mirror affects
317        // every store, not just those that happen to hit backpressure.
318        // A relaxed atomic load is essentially free compared to the
319        // rocksdb FFI write below; this preserves the healthy hot
320        // path's latency profile while giving the gate consistent
321        // semantics across the in-flight call sites that aren't
322        // covered by the per-connection ingress checks (notably,
323        // already-established SMTP connections doing new
324        // transactions).
325        if self.load_shed_active.load(Ordering::Relaxed) {
326            return Err(SpoolUnhealthyError.into());
327        }
328
329        let mut batch = WriteBatch::default();
330        apply(&mut batch);
331        match self.db.write_opt(batch, &opts) {
332            Ok(()) => return Ok(()),
333            Err(err) if err.kind() == ErrorKind::Incomplete => {}
334            Err(err) => {
335                record_foreground_error(
336                    &self.foreground_errors,
337                    &self.load_shed_active,
338                    self.db.path(),
339                    &err,
340                );
341                return Err(err.into());
342            }
343        }
344
345        let spool_deadline = Instant::now() + self.store_deadline;
346        // Decide upfront which side's deadline wins, so both the
347        // semaphore-acquisition timeout and the backpressure-loop
348        // timeout can surface the matching typed error.  Without
349        // this, the SMTP layer cannot tell a caller-provided
350        // `data_processing_timeout` from the spool's own
351        // `store_deadline` and would mis-label the wire response.
352        let (effective_deadline, caller_wins) = match caller_deadline {
353            Some(c) if c < spool_deadline => (c, true),
354            _ => (spool_deadline, false),
355        };
356        let timeout_err = || -> anyhow::Error {
357            if caller_wins {
358                SpoolCallerDeadlineExceeded.into()
359            } else {
360                SpoolBackpressureTimeout {
361                    deadline: self.store_deadline,
362                }
363                .into()
364            }
365        };
366
367        let _permit = match permits {
368            Some(s) => match timeout_at(effective_deadline.into(), s.acquire_owned()).await {
369                Ok(r) => Some(r?),
370                Err(_) => return Err(timeout_err()),
371            },
372            None => None,
373        };
374
375        let mut backoff = BACKOFF_INITIAL;
376        loop {
377            if self.load_shed_active.load(Ordering::Relaxed) {
378                return Err(SpoolUnhealthyError.into());
379            }
380            if Instant::now() >= effective_deadline {
381                // Sustained backpressure for the full deadline is
382                // itself a useful signal that the spool may be
383                // unhealthy.  Feed it into the foreground error
384                // machinery so the debounced latch path can react if
385                // we see this repeatedly; an occasional one-off
386                // (e.g. a brief load spike) gets washed out by the
387                // `error_latch_duration` window.  We do not
388                // immediate-latch because no rocksdb error has been
389                // returned -- the inability to make progress is
390                // ambiguous, not definitively bad.
391                self.foreground_errors.fetch_add(1, Ordering::Relaxed);
392                return Err(timeout_err());
393            }
394            sleep(backoff).await;
395            backoff = (backoff * 2).min(BACKOFF_MAX);
396
397            let mut batch = WriteBatch::default();
398            apply(&mut batch);
399            match self.db.write_opt(batch, &opts) {
400                Ok(()) => return Ok(()),
401                Err(err) if err.kind() == ErrorKind::Incomplete => continue,
402                Err(err) => {
403                    record_foreground_error(
404                        &self.foreground_errors,
405                        &self.load_shed_active,
406                        self.db.path(),
407                        &err,
408                    );
409                    return Err(err.into());
410                }
411            }
412        }
413    }
414
415    pub fn new(
416        path: &Path,
417        flush: bool,
418        params: Option<RocksSpoolParams>,
419        runtime: Handle,
420    ) -> anyhow::Result<Self> {
421        let mut opts = Options::default();
422        opts.set_use_fsync(flush);
423        opts.create_if_missing(true);
424        // The default is 1000, which is a bit high
425        opts.set_keep_log_file_num(10);
426
427        let p = params.unwrap_or_default();
428        if let Some(i) = p.increase_parallelism {
429            opts.increase_parallelism(i);
430        }
431        if let Some(i) = p.optimize_level_style_compaction {
432            opts.optimize_level_style_compaction(i);
433        }
434        if let Some(i) = p.optimize_universal_style_compaction {
435            opts.optimize_universal_style_compaction(i);
436        }
437        if let Some(i) = p.compaction_readahead_size {
438            opts.set_compaction_readahead_size(i);
439        }
440        if let Some(i) = p.max_open_files {
441            opts.set_max_open_files(i as _);
442        }
443        if let Some(i) = p.write_buffer_size {
444            opts.set_write_buffer_size(i);
445        }
446        if let Some(i) = p.level0_stop_writes_trigger {
447            opts.set_level_zero_stop_writes_trigger(i);
448        }
449        if let Some(i) = p.memtable_huge_page_size {
450            opts.set_memtable_huge_page_size(i);
451        }
452        opts.set_paranoid_checks(p.paranoid_checks);
453        opts.set_level_compaction_dynamic_level_bytes(p.level_compaction_dynamic_level_bytes);
454        opts.set_compression_type(p.compression_type.into());
455        opts.set_log_level(p.log_level.into());
456        opts.set_log_file_time_to_roll(p.log_file_time_to_roll.as_secs() as usize);
457        opts.set_delete_obsolete_files_period_micros(p.obsolete_files_period.as_micros() as u64);
458
459        let limit_concurrent_stores = p
460            .limit_concurrent_stores
461            .map(|n| Arc::new(Semaphore::new(n)));
462        let limit_concurrent_loads = p
463            .limit_concurrent_loads
464            .map(|n| Arc::new(Semaphore::new(n)));
465        let limit_concurrent_removes = p
466            .limit_concurrent_removes
467            .map(|n| Arc::new(Semaphore::new(n)));
468
469        let db = Arc::new(DB::open(&opts, path)?);
470        let load_shed_active = Arc::new(AtomicBool::new(false));
471        let foreground_errors = Arc::new(AtomicU64::new(0));
472        let store_deadline = p.store_deadline;
473
474        {
475            let weak_db = Arc::downgrade(&db);
476            let weak_mirror = Arc::downgrade(&load_shed_active);
477            let weak_fg_errors = Arc::downgrade(&foreground_errors);
478            tokio::spawn(metrics_monitor(
479                weak_db,
480                weak_mirror,
481                weak_fg_errors,
482                format!("{}", path.display()),
483                p.error_latch_duration,
484                p.error_unlatch_duration,
485                p.allow_error_unlatch,
486            ));
487        }
488
489        Ok(Self {
490            db,
491            runtime,
492            limit_concurrent_stores,
493            limit_concurrent_loads,
494            limit_concurrent_removes,
495            load_shed_active,
496            foreground_errors,
497            store_deadline,
498        })
499    }
500}
501
502#[async_trait]
503impl Spool for RocksSpool {
504    async fn load(&self, id: SpoolId) -> anyhow::Result<Vec<u8>> {
505        let permit = match self.limit_concurrent_loads.clone() {
506            Some(s) => Some(s.acquire_owned().await?),
507            None => None,
508        };
509        let db = self.db.clone();
510        let fg_errors = self.foreground_errors.clone();
511        let load_shed = self.load_shed_active.clone();
512        let db_path: PathBuf = self.db.path().to_owned();
513        tokio::task::Builder::new()
514            .name("rocksdb load")
515            .spawn_blocking_on(
516                move || {
517                    let result = match db.get(id.as_bytes()) {
518                        Ok(Some(v)) => v,
519                        Ok(None) => {
520                            drop(permit);
521                            anyhow::bail!("no such key {id}");
522                        }
523                        Err(err) => {
524                            // Rocksdb get errors (e.g. a missing SST
525                            // file discovered during the read) do not
526                            // increment rocksdb.background-errors.
527                            // Record them so the load-shedding gate
528                            // can react -- immediately for fatal
529                            // classes, or after debounce otherwise.
530                            record_foreground_error(&fg_errors, &load_shed, &db_path, &err);
531                            drop(permit);
532                            return Err(err.into());
533                        }
534                    };
535                    drop(permit);
536                    Ok(result)
537                },
538                &self.runtime,
539            )?
540            .await?
541    }
542
543    async fn store(
544        &self,
545        id: SpoolId,
546        data: Arc<Box<[u8]>>,
547        force_sync: bool,
548        deadline: Option<Instant>,
549    ) -> anyhow::Result<()> {
550        let mut opts = WriteOptions::default();
551        opts.set_sync(force_sync);
552        opts.set_no_slowdown(true);
553
554        self.write_with_backpressure(
555            opts,
556            deadline,
557            self.limit_concurrent_stores.clone(),
558            |batch| batch.put(id.as_bytes(), &*data),
559        )
560        .await
561    }
562
563    async fn remove(&self, id: SpoolId) -> anyhow::Result<()> {
564        let mut opts = WriteOptions::default();
565        opts.set_no_slowdown(true);
566
567        self.write_with_backpressure(opts, None, self.limit_concurrent_removes.clone(), |batch| {
568            batch.delete(id.as_bytes())
569        })
570        .await
571    }
572
573    async fn cleanup(&self) -> anyhow::Result<()> {
574        Ok(())
575    }
576
577    async fn compact(&self) -> anyhow::Result<()> {
578        let db = self.db.clone();
579        tokio::task::spawn_blocking(move || -> anyhow::Result<()> {
580            db.flush()?;
581            // Force bottommost-level compaction so the entire keyspace
582            // is rewritten; without this, single-level layouts cause
583            // the call to be a no-op even when there are missing files
584            // that we'd want to surface as errors.
585            let mut opts = CompactOptions::default();
586            opts.set_bottommost_level_compaction(BottommostLevelCompaction::Force);
587            opts.set_exclusive_manual_compaction(true);
588            db.compact_range_opt::<&[u8], &[u8]>(None, None, &opts);
589            // compact_range itself does not return errors -- wait_for_compact
590            // does, and is what surfaces background failures (e.g. a
591            // missing SST encountered during compaction) to the caller.
592            let wait_opts = WaitForCompactOptions::default();
593            db.wait_for_compact(&wait_opts)?;
594            Ok(())
595        })
596        .await?
597    }
598
599    async fn shutdown(&self) -> anyhow::Result<()> {
600        let db = self.db.clone();
601        tokio::task::spawn_blocking(move || db.cancel_all_background_work(true)).await?;
602        Ok(())
603    }
604
605    fn unhealthy_reason(&self) -> Option<&'static str> {
606        if self.load_shed_active.load(Ordering::Relaxed) {
607            Some("the spool is not accepting writes")
608        } else {
609            None
610        }
611    }
612
613    async fn advise_low_memory(&self) -> anyhow::Result<isize> {
614        let db = self.db.clone();
615        tokio::task::spawn_blocking(move || {
616            let usage_before = match get_memory_usage_stats(Some(&[&db]), None) {
617                Ok(stats) => {
618                    let stats: Stats = stats.into();
619                    tracing::debug!("pre-flush: {stats:#?}");
620                    stats.total()
621                }
622                Err(err) => {
623                    tracing::error!("error getting stats: {err:#}");
624                    0
625                }
626            };
627
628            if let Err(err) = db.flush() {
629                tracing::error!("error flushing memory: {err:#}");
630            }
631
632            let usage_after = match get_memory_usage_stats(Some(&[&db]), None) {
633                Ok(stats) => {
634                    let stats: Stats = stats.into();
635                    tracing::debug!("post-flush: {stats:#?}");
636                    stats.total()
637                }
638                Err(err) => {
639                    tracing::error!("error getting stats: {err:#}");
640                    0
641                }
642            };
643
644            Ok(usage_before - usage_after)
645        })
646        .await?
647    }
648
649    fn enumerate(
650        &self,
651        sender: Sender<SpoolEntry>,
652        start_time: DateTime<Utc>,
653    ) -> anyhow::Result<()> {
654        let db = Arc::clone(&self.db);
655        let fg_errors = self.foreground_errors.clone();
656        let load_shed = self.load_shed_active.clone();
657        let db_path: PathBuf = self.db.path().to_owned();
658        tokio::task::Builder::new()
659            .name("rocksdb enumerate")
660            .spawn_blocking_on(
661                move || {
662                    let iter = db.iterator(IteratorMode::Start);
663                    for entry in iter {
664                        let (key, value) = match entry {
665                            Ok(e) => e,
666                            Err(err) => {
667                                // Iterator errors typically indicate a
668                                // missing or corrupt SST file
669                                // discovered while walking the
670                                // keyspace.  Feed into the foreground
671                                // error machinery so the gate latches
672                                // (immediately for IOError /
673                                // Corruption) and abort the
674                                // enumeration.
675                                record_foreground_error(&fg_errors, &load_shed, &db_path, &err);
676                                return Err(err.into());
677                            }
678                        };
679                        let id = SpoolId::from_slice(&key)
680                            .ok_or_else(|| anyhow::anyhow!("invalid spool id {key:?}"))?;
681
682                        if id.created() >= start_time {
683                            // Entries created since we started must have
684                            // landed there after we started and are thus
685                            // not eligible for discovery via enumeration
686                            continue;
687                        }
688
689                        sender
690                            .send(SpoolEntry::Item {
691                                id,
692                                data: value.to_vec(),
693                            })
694                            .map_err(|err| {
695                                anyhow::anyhow!("failed to send SpoolEntry for {id}: {err:#}")
696                            })?;
697                    }
698                    Ok::<(), anyhow::Error>(())
699                },
700                &self.runtime,
701            )?;
702        Ok(())
703    }
704}
705
#[cfg(test)]
mod test {
    use super::*;

    /// Round-trip test: store, load, enumerate, remove, then confirm
    /// that a fresh enumeration finds nothing.
    #[tokio::test]
    async fn rocks_spool() -> anyhow::Result<()> {
        let location = tempfile::tempdir()?;
        let spool = RocksSpool::new(location.path(), false, None, Handle::current())?;

        {
            let id1 = SpoolId::new();

            // Can't load an entry that doesn't exist
            assert_eq!(
                format!("{:#}", spool.load(id1).await.unwrap_err()),
                format!("no such key {id1}")
            );
        }

        // Insert some entries
        let mut ids = vec![];
        for i in 0..100 {
            let id = SpoolId::new();
            spool
                .store(
                    id,
                    Arc::new(format!("I am {i}").as_bytes().to_vec().into_boxed_slice()),
                    false,
                    None,
                )
                .await?;
            ids.push(id);
        }

        // Verify that we can load those entries
        for (i, &id) in ids.iter().enumerate() {
            let data = spool.load(id).await?;
            let text = String::from_utf8(data)?;
            assert_eq!(text, format!("I am {i}"));
        }

        {
            // Verify that we can enumerate them, removing each one as it
            // is seen (deletes interleaved with a live iterator).
            let (tx, rx) = flume::bounded(32);
            spool.enumerate(tx, Utc::now())?;
            let mut count = 0;

            while let Ok(item) = rx.recv_async().await {
                match item {
                    SpoolEntry::Item { id, data } => {
                        let i = ids
                            .iter()
                            .position(|&item| item == id)
                            .ok_or_else(|| anyhow::anyhow!("{id} not found in ids!"))?;

                        let text = String::from_utf8(data)?;
                        assert_eq!(text, format!("I am {i}"));

                        spool.remove(id).await?;
                        // Can't load an entry that we just removed
                        assert_eq!(
                            format!("{:#}", spool.load(id).await.unwrap_err()),
                            format!("no such key {id}")
                        );
                        count += 1;
                    }
                    SpoolEntry::Corrupt { id, error } => {
                        anyhow::bail!("Corrupt: {id}: {error}");
                    }
                }
            }

            assert_eq!(count, 100);
        }

        // Now that every entry has been removed, enumerate again and
        // expect nothing.  Repeat it to confirm that a completed
        // enumeration leaves the database in a state where later
        // enumerations still work and removed entries stay removed.
        for _ in 0..2 {
            // Expect an empty enumeration
            let (tx, rx) = flume::bounded(32);
            spool.enumerate(tx, Utc::now())?;
            let mut unexpected = vec![];

            while let Ok(item) = rx.recv_async().await {
                match item {
                    SpoolEntry::Item { id, .. } | SpoolEntry::Corrupt { id, .. } => {
                        unexpected.push(id)
                    }
                }
            }

            assert_eq!(unexpected.len(), 0);
        }

        Ok(())
    }
}
806
/// Snapshot of rocksdb's memory usage figures.
///
/// The rocksdb type doesn't impl Debug, so we mirror it here to get one.
// NOTE(review): `#[allow(unused)]` is presumably here because some fields
// are only read through the derived `Debug`, which the dead-code lint
// does not count as a use.
#[allow(unused)]
#[derive(Debug)]
struct Stats {
    /// Approximate memory usage (bytes) of all the mem-tables.
    pub mem_table_total: u64,
    /// Approximate memory usage (bytes) of un-flushed mem-tables.
    pub mem_table_unflushed: u64,
    /// Approximate memory usage (bytes) of all the table readers.
    pub mem_table_readers_total: u64,
    /// Approximate memory usage (bytes) by cache.
    pub cache_total: u64,
}

impl Stats {
    /// Total of mem-table, table-reader and cache memory, in bytes.
    ///
    /// `mem_table_unflushed` is deliberately not added: presumably it is
    /// already included in `mem_table_total` -- TODO confirm.
    ///
    /// The sum saturates, and the result is clamped to `isize::MAX`.
    /// Without this, a large value would overflow (a panic in debug
    /// builds) or wrap to a negative number through an `as` cast.
    fn total(&self) -> isize {
        let sum = self
            .mem_table_total
            .saturating_add(self.mem_table_readers_total)
            .saturating_add(self.cache_total);
        isize::try_from(sum).unwrap_or(isize::MAX)
    }
}
822
823impl From<rocksdb::perf::MemoryUsageStats> for Stats {
824    fn from(s: rocksdb::perf::MemoryUsageStats) -> Self {
825        Self {
826            mem_table_total: s.mem_table_total,
827            mem_table_unflushed: s.mem_table_unflushed,
828            mem_table_readers_total: s.mem_table_readers_total,
829            cache_total: s.cache_total,
830        }
831    }
832}
833
834/// Read an integer-valued rocksdb property, returning 0 if the property
835/// is missing or cannot be parsed.  Used for hot-path checks and metrics
836/// gathering; callers that want to distinguish "missing" from "zero"
837/// should call `property_int_value` directly.
838fn property_u64(db: &DB, name: &rocksdb::properties::PropName) -> u64 {
839    db.property_int_value(name).ok().flatten().unwrap_or(0)
840}
841
842/// Classify a rocksdb error returned from a foreground spool
843/// operation.  `Corruption` and `IOError` are returned by rocksdb
844/// when the underlying database state is observably wrong (a missing
845/// or corrupt SST file, a checksum mismatch, etc.) -- conditions
846/// that do not have any transient interpretation.  We use this to
847/// latch the load-shedding gate immediately rather than waiting out
848/// the normal debounce window.
849fn is_definitively_bad(err: &rocksdb::Error) -> bool {
850    matches!(err.kind(), ErrorKind::Corruption | ErrorKind::IOError)
851}
852
853/// Record a foreground spool error: always increment the counter so
854/// the metrics monitor can observe sustained-failure patterns, and
855/// for errors classified as definitively bad, latch the gate now.
856/// Logs once on the false-to-true transition of the gate.
857fn record_foreground_error(
858    fg_errors: &AtomicU64,
859    load_shed: &AtomicBool,
860    path: &Path,
861    err: &rocksdb::Error,
862) {
863    fg_errors.fetch_add(1, Ordering::Relaxed);
864    if is_definitively_bad(err) && !load_shed.swap(true, Ordering::Relaxed) {
865        tracing::error!(
866            "rocksdb at {}: fatal foreground error ({:?}); load-shedding \
867             gate latched immediately. Underlying error: {}",
868            path.display(),
869            err.kind(),
870            err.as_ref(),
871        );
872    }
873}
874
// Memory gauges.  All four are refreshed together on each
// `metrics_monitor` tick from rocksdb's `get_memory_usage_stats`.  Each
// carries a single `path` label: the spool path that `metrics_monitor`
// was started with.
declare_metric! {
/// Approximate memory usage (bytes) of all the mem-tables.
///
/// This may be useful when understanding the memory usage of
/// the system.
static MEM_TABLE_TOTAL: IntGaugeVec(
        "rocks_spool_mem_table_total",
        &["path"]
    );
}

// NOTE(review): `Stats::total` does not add the un-flushed figure on top
// of the mem-table total.  That suggests this gauge is a subset of
// `rocks_spool_mem_table_total` rather than additional memory.  Confirm
// before summing the two gauges in dashboards.
declare_metric! {
/// Approximate memory usage (bytes) of un-flushed mem-tables.
///
/// This may be useful when understanding the memory usage of
/// the system.
static MEM_TABLE_UNFLUSHED: IntGaugeVec(
        "rocks_spool_mem_table_unflushed",
        &["path"]
    );
}

declare_metric! {
/// Approximate memory usage (bytes) of all the table readers.
///
/// This may be useful when understanding the memory usage of
/// the system.
static MEM_TABLE_READERS_TOTAL: IntGaugeVec(
        "rocks_spool_mem_table_readers_total",
        &["path"]
    );
}

declare_metric! {
/// Approximate memory (bytes) usage by cache.
///
/// This may be useful when understanding the memory usage of
/// the system.
static CACHE_TOTAL: IntGaugeVec(
        "rocks_spool_cache_total",
        &["path"]
    );
}
918
// Health gauges.  `metrics_monitor` overwrites each of these on every
// tick with the current value of the corresponding rocksdb property, or
// with its own latch state.  They are gauges rather than counters
// because the values are copied verbatim, not accumulated here.
declare_metric! {
/// Accumulated count of background errors encountered by the rocksdb
/// instance (failed flushes or compactions, typically caused by I/O
/// errors such as missing or corrupt SST files, ENOSPC, or permission
/// problems).
///
/// {{since('dev')}}
///
/// This value is **monotonic** for the lifetime of the process: it
/// does not decrease when rocksdb auto-resumes from transient errors
/// such as a brief ENOSPC.  A non-zero value therefore does not
/// necessarily mean the database is currently wedged; it means at
/// least one background error has occurred since the process started.
/// It is exported as a gauge that mirrors rocksdb's own
/// `background-errors` property.
///
/// For SRE monitoring, alert on the **rate of change** (e.g.
/// `increase(rocks_spool_background_errors[5m]) > 0`) to catch new
/// occurrences.  For the actionable "the database is wedged right
/// now and we are shedding load" signal, page on
/// `rocks_spool_load_shed_active` instead.  That gauge combines this
/// count, foreground read/write errors, and the rocksdb error kind
/// (`Corruption` / `IOError`) into a single latched indicator.
static BACKGROUND_ERRORS_METRIC: IntGaugeVec(
        "rocks_spool_background_errors",
        &["path"]
    );
}

declare_metric! {
/// Set to 1 when the rocksdb instance is currently refusing writes
/// at the WriteController layer (memtable count or L0 file count
/// reached the stop threshold), 0 otherwise.
///
/// {{since('dev')}}
///
/// This reflects rocksdb's own `is-write-stopped` property and
/// indicates backpressure rather than a fatal background error.
/// Healthy databases under bursty load may briefly report 1 here.
/// For the "the database is wedged due to a background error"
/// signal, see `rocks_spool_load_shed_active` instead.
static WRITE_STOPPED: IntGaugeVec(
        "rocks_spool_write_stopped",
        &["path"]
    );
}

declare_metric! {
/// Set to 1 when this spool's load-shedding gate is latched, 0
/// otherwise.  When set, ingress paths (SMTP, HTTP inject) reject
/// traffic and foreground store/remove operations fail fast rather
/// than stall.
///
/// {{since('dev')}}
///
/// The gate latches in either of two ways:
///
/// * **Immediate**: a foreground spool operation (load, store,
///   remove) returns a rocksdb error classified as definitively
///   bad (`Corruption` or `IOError` -- e.g. a missing or corrupt
///   SST file discovered during a read).  These conditions have
///   no transient interpretation, so the gate latches on the
///   first such observation.
/// * **Debounced**: less specific failure signals apply here.
///   Either `background-errors` has grown since this process started,
///   or foreground operations have returned errors not classified as
///   definitively bad.  The gate latches once that condition has held
///   for the configured `error_latch_duration` (default 15s).  Both
///   signals are accumulated counts that do not go back down until
///   the gate next unlatches.  Once any such error has been seen, the
///   window therefore delays latching rather than suppressing it.
///
/// If `allow_error_unlatch` is enabled (the default), the gate
/// auto-clears after `error_unlatch_duration` of observed recovery
/// (default 5 minutes) with no new errors of either class.
/// Otherwise it stays set until the process is restarted.
///
/// SREs should treat any sustained non-zero value as an
/// operator-actionable incident; pair this metric with
/// `rocks_spool_background_errors` to understand why.
static LOAD_SHED_ACTIVE: IntGaugeVec(
        "rocks_spool_load_shed_active",
        &["path"]
    );
}
1000
// Compaction and write-throttling gauges.  Each is overwritten on every
// `metrics_monitor` tick with the raw value of the rocksdb property of
// the same name.
declare_metric! {
/// Number of background compactions currently running for this
/// rocksdb instance.
///
/// {{since('dev')}}
///
/// In a healthy, actively-written spool this is typically non-zero
/// in bursts.  A value persistently stuck at 0 while
/// `rocks_spool_compaction_pending` or
/// `rocks_spool_estimate_pending_compaction_bytes` is growing is a
/// strong indicator that the background worker is wedged --
/// cross-reference `rocks_spool_write_stopped` and
/// `rocks_spool_background_errors`.
///
/// Note that a background error can stop the database (for example
/// with `paranoid_checks` enabled).  rocksdb then stops scheduling
/// background work, and both this gauge and
/// `rocks_spool_compaction_pending` can read 0 even though the database
/// is wedged.  Use `rocks_spool_load_shed_active` and
/// `rocks_spool_background_errors` to detect that case.
static NUM_RUNNING_COMPACTIONS_METRIC: IntGaugeVec(
        "rocks_spool_num_running_compactions",
        &["path"]
    );
}

declare_metric! {
/// Set to 1 when at least one compaction is pending for this rocksdb
/// instance, 0 otherwise.
///
/// {{since('dev')}}
///
/// Brief flapping is normal under write load.  A value of 1 that
/// persists alongside `rocks_spool_num_running_compactions == 0` is
/// suspicious and suggests the compaction worker is not making
/// progress.  The converse does not hold: a database stopped by a
/// background error can report 0 here, so a 0 is not proof of health.
static COMPACTION_PENDING_METRIC: IntGaugeVec(
        "rocks_spool_compaction_pending",
        &["path"]
    );
}

declare_metric! {
/// Estimated total bytes that compaction needs to rewrite to bring
/// all levels back under their target sizes.
///
/// {{since('dev')}}
///
/// This is a backlog indicator.  Steady-state values depend heavily
/// on write rate, compression, and the configured compaction style,
/// so absolute thresholds should be derived from each deployment's
/// baseline.  Unbounded growth over a multi-hour window indicates
/// that compaction cannot keep up with the write rate, which
/// eventually leads to write slowdown
/// (`rocks_spool_actual_delayed_write_rate` becomes non-zero) and
/// then to write stop (`rocks_spool_write_stopped` becomes 1).
///
/// Only meaningful for level-style compaction.
static ESTIMATE_PENDING_COMPACTION_BYTES_METRIC: IntGaugeVec(
        "rocks_spool_estimate_pending_compaction_bytes",
        &["path"]
    );
}

declare_metric! {
/// Current delayed write rate (bytes/second) applied by rocksdb to
/// throttle foreground writers.  0 means no slowdown is in effect.
///
/// {{since('dev')}}
///
/// A non-zero value means rocksdb is intentionally slowing writers
/// down because compaction or flush is falling behind.  This is the
/// early-warning signal that precedes a full write stop: if this
/// remains non-zero for an extended period, investigate the
/// compaction backlog
/// (`rocks_spool_estimate_pending_compaction_bytes`) and underlying
/// disk throughput before the database transitions to
/// `rocks_spool_write_stopped == 1`.
static ACTUAL_DELAYED_WRITE_RATE_METRIC: IntGaugeVec(
        "rocks_spool_actual_delayed_write_rate",
        &["path"]
    );
}
1077
/// Bookkeeping for one spool's load-shedding latch.
///
/// A single instance is owned by `metrics_monitor` and updated once per
/// monitor tick; see that function for the state transitions.
struct HealthState {
    /// Whether the monitor currently considers the gate latched.
    latched: bool,

    /// `background-errors` value treated as the healthy baseline.  It is
    /// the count read on the first monitor tick, and is re-anchored to
    /// the current count when the gate auto-unlatches.  Only growth above
    /// it counts toward latching, so a count that was already non-zero
    /// when monitoring began does not latch by itself.
    initial_bg_errors: u64,
    /// `background-errors` value from the previous tick.  Growth over
    /// it is logged once and stamped into `last_bg_growth_at`.
    prev_bg_errors: u64,
    /// Foreground error count from the previous tick.  The counter
    /// starts at zero in each process, so no separate baseline is kept:
    /// any non-zero value reflects errors in the current run.
    prev_fg_errors: u64,

    /// When the unhealthy condition (background errors above baseline,
    /// or any foreground errors) was first observed in this episode.
    unhealthy_since: Option<Instant>,
    /// The most recent tick at which the background error count grew.
    last_bg_growth_at: Option<Instant>,
    /// The most recent tick at which the foreground error count grew.
    last_fg_growth_at: Option<Instant>,
}
1106
1107async fn metrics_monitor(
1108    db: Weak<DB>,
1109    mirror: Weak<AtomicBool>,
1110    foreground_errors: Weak<AtomicU64>,
1111    path: String,
1112    latch_duration: Duration,
1113    unlatch_duration: Duration,
1114    allow_unlatch: bool,
1115) {
1116    let mem_table_total = MEM_TABLE_TOTAL
1117        .get_metric_with_label_values(&[path.as_str()])
1118        .unwrap();
1119    let mem_table_unflushed = MEM_TABLE_UNFLUSHED
1120        .get_metric_with_label_values(&[path.as_str()])
1121        .unwrap();
1122    let mem_table_readers_total = MEM_TABLE_READERS_TOTAL
1123        .get_metric_with_label_values(&[path.as_str()])
1124        .unwrap();
1125    let cache_total = CACHE_TOTAL
1126        .get_metric_with_label_values(&[path.as_str()])
1127        .unwrap();
1128    let background_errors = BACKGROUND_ERRORS_METRIC
1129        .get_metric_with_label_values(&[path.as_str()])
1130        .unwrap();
1131    let write_stopped = WRITE_STOPPED
1132        .get_metric_with_label_values(&[path.as_str()])
1133        .unwrap();
1134    let load_shed_active = LOAD_SHED_ACTIVE
1135        .get_metric_with_label_values(&[path.as_str()])
1136        .unwrap();
1137    let num_running_compactions = NUM_RUNNING_COMPACTIONS_METRIC
1138        .get_metric_with_label_values(&[path.as_str()])
1139        .unwrap();
1140    let compaction_pending = COMPACTION_PENDING_METRIC
1141        .get_metric_with_label_values(&[path.as_str()])
1142        .unwrap();
1143    let estimate_pending_compaction_bytes = ESTIMATE_PENDING_COMPACTION_BYTES_METRIC
1144        .get_metric_with_label_values(&[path.as_str()])
1145        .unwrap();
1146    let actual_delayed_write_rate = ACTUAL_DELAYED_WRITE_RATE_METRIC
1147        .get_metric_with_label_values(&[path.as_str()])
1148        .unwrap();
1149
1150    // Initial bg_errors observation anchors the latch logic against
1151    // pre-existing accumulated errors from prior process lifetimes,
1152    // so a restart against a DB with a historical count does not
1153    // immediately latch.  If the DB has already been dropped before
1154    // we get here, exit silently -- there is nothing to monitor.
1155    let Some(db_init) = db.upgrade() else {
1156        return;
1157    };
1158    let initial_bg = property_u64(&db_init, BACKGROUND_ERRORS);
1159    drop(db_init);
1160    let mut state = HealthState {
1161        initial_bg_errors: initial_bg,
1162        prev_bg_errors: initial_bg,
1163        prev_fg_errors: 0,
1164        unhealthy_since: None,
1165        last_bg_growth_at: None,
1166        last_fg_growth_at: None,
1167        latched: false,
1168    };
1169
1170    loop {
1171        match db.upgrade() {
1172            Some(db) => {
1173                match get_memory_usage_stats(Some(&[&db]), None) {
1174                    Ok(stats) => {
1175                        mem_table_total.set(stats.mem_table_total as i64);
1176                        mem_table_unflushed.set(stats.mem_table_unflushed as i64);
1177                        mem_table_readers_total.set(stats.mem_table_readers_total as i64);
1178                        cache_total.set(stats.cache_total as i64);
1179                    }
1180                    Err(err) => {
1181                        tracing::error!("error getting stats: {err:#}");
1182                    }
1183                };
1184
1185                let bg = property_u64(&db, BACKGROUND_ERRORS);
1186                let stopped = property_u64(&db, IS_WRITE_STOPPED);
1187                let compaction_pending_now = property_u64(&db, COMPACTION_PENDING);
1188                let num_running = property_u64(&db, NUM_RUNNING_COMPACTIONS);
1189                background_errors.set(bg as i64);
1190                write_stopped.set(stopped as i64);
1191                num_running_compactions.set(num_running as i64);
1192                compaction_pending.set(compaction_pending_now as i64);
1193                estimate_pending_compaction_bytes
1194                    .set(property_u64(&db, ESTIMATE_PENDING_COMPACTION_BYTES) as i64);
1195                actual_delayed_write_rate.set(property_u64(&db, ACTUAL_DELAYED_WRITE_RATE) as i64);
1196
1197                let now = Instant::now();
1198                let fg = foreground_errors
1199                    .upgrade()
1200                    .map(|c| c.load(Ordering::Relaxed))
1201                    .unwrap_or(0);
1202
1203                // The foreground error path may have latched the gate
1204                // directly on a fatal error (Corruption/IOError) since
1205                // our last tick.  Reconcile our internal state so the
1206                // unlatch logic sees the gate as latched without
1207                // duplicating the "gate latched" log.
1208                if let Some(m) = mirror.upgrade() {
1209                    if m.load(Ordering::Relaxed) && !state.latched {
1210                        state.latched = true;
1211                        state.unhealthy_since = Some(now);
1212                    }
1213                }
1214
1215                if bg > state.prev_bg_errors {
1216                    tracing::error!(
1217                        "rocksdb at {path}: background error count \
1218                         increased from {prev} to {bg}; check the LOG \
1219                         file in that directory for details",
1220                        prev = state.prev_bg_errors,
1221                    );
1222                    state.last_bg_growth_at = Some(now);
1223                }
1224                state.prev_bg_errors = bg;
1225
1226                if fg > state.prev_fg_errors {
1227                    tracing::error!(
1228                        "rocksdb at {path}: foreground spool error count \
1229                         increased from {prev} to {fg}; this typically \
1230                         indicates a missing or corrupt SST file \
1231                         discovered during a read",
1232                        prev = state.prev_fg_errors,
1233                    );
1234                    state.last_fg_growth_at = Some(now);
1235                }
1236                state.prev_fg_errors = fg;
1237
1238                // Latch signal: background errors have grown since this
1239                // process started, OR any foreground errors have been
1240                // observed.  We cannot use rocksdb's
1241                // `compaction-pending` or `is-write-stopped` properties
1242                // to refine this -- when paranoid_checks fires,
1243                // rocksdb pauses background scheduling and both
1244                // properties drop to 0 even though the DB is wedged.
1245                // The sustained-for-latch_duration window is what
1246                // filters out brief auto-resumed blips.
1247                let unhealthy_now = bg > state.initial_bg_errors || fg > 0;
1248
1249                if unhealthy_now {
1250                    let since = *state.unhealthy_since.get_or_insert(now);
1251                    if !state.latched && now.duration_since(since) >= latch_duration {
1252                        state.latched = true;
1253                        if let Some(m) = mirror.upgrade() {
1254                            m.store(true, Ordering::Relaxed);
1255                        }
1256                        tracing::error!(
1257                            "rocksdb at {path}: load-shedding gate latched \
1258                             after {latch_duration:?} of sustained background \
1259                             errors (accumulated count: {bg}). Ingress paths \
1260                             will now reject traffic. Inspect the LOG file \
1261                             for the underlying cause.",
1262                        );
1263                    }
1264                } else if !state.latched {
1265                    // Healthy tick before latch: reset the debounce
1266                    // window so a future blip gets its full
1267                    // latch_duration grace, not a stale baseline
1268                    // from an earlier, separate transient.
1269                    state.unhealthy_since = None;
1270                }
1271
1272                // Auto-unlatch: neither bg_errors nor fg_errors have
1273                // grown for `unlatch_duration`.  This catches
1274                // self-healed transients (one blip, then quiet).  It
1275                // does NOT distinguish a self-healed transient from a
1276                // truly wedged DB where compactions have been
1277                // abandoned and simply stopped producing further
1278                // errors.  Operators who require a stronger
1279                // guarantee should set `allow_error_unlatch = false`.
1280                if state.latched && allow_unlatch {
1281                    let bg_stable_since = state.last_bg_growth_at.unwrap_or(now);
1282                    let fg_stable_since = state.last_fg_growth_at.unwrap_or(now);
1283                    let stable_since = bg_stable_since.max(fg_stable_since);
1284                    if now.duration_since(stable_since) >= unlatch_duration {
1285                        // CAS the foreground counter from our tick
1286                        // snapshot to 0.  This is the synchronization
1287                        // point that prevents an auto-unlatch from
1288                        // racing with a concurrent
1289                        // record_foreground_error: if a fresh fatal
1290                        // error landed mid-tick, the CAS fails and we
1291                        // defer the unlatch to the next tick.
1292                        let cleared = match foreground_errors.upgrade() {
1293                            Some(c) => c
1294                                .compare_exchange(fg, 0, Ordering::Relaxed, Ordering::Relaxed)
1295                                .is_ok(),
1296                            // Process is shutting down; skip.
1297                            None => false,
1298                        };
1299                        if cleared {
1300                            state.latched = false;
1301                            // Re-anchor the baselines at the current
1302                            // counts so that we only re-latch on *new*
1303                            // growth above this point; otherwise the
1304                            // static post-transient counts would keep us
1305                            // permanently unhealthy.
1306                            state.initial_bg_errors = bg;
1307                            state.prev_fg_errors = 0;
1308                            state.unhealthy_since = None;
1309                            if let Some(m) = mirror.upgrade() {
1310                                m.store(false, Ordering::Relaxed);
1311                            }
1312                            tracing::info!(
1313                                "rocksdb at {path}: load-shedding gate cleared \
1314                                 after {unlatch_duration:?} without new errors \
1315                                 (bg baseline re-anchored at {bg}); ingress \
1316                                 paths will accept traffic again",
1317                            );
1318                        }
1319                    }
1320                }
1321                load_shed_active.set(if state.latched { 1 } else { 0 });
1322            }
1323            None => {
1324                // Dead
1325                return;
1326            }
1327        }
1328        tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
1329    }
1330}