spool/rocks.rs
1use crate::{
2 Spool, SpoolBackpressureTimeout, SpoolCallerDeadlineExceeded, SpoolEntry, SpoolId,
3 SpoolUnhealthyError,
4};
5use async_trait::async_trait;
6use chrono::{DateTime, Utc};
7use flume::Sender;
8use kumo_prometheus::declare_metric;
9use rocksdb::perf::get_memory_usage_stats;
10use rocksdb::properties::{
11 ACTUAL_DELAYED_WRITE_RATE, BACKGROUND_ERRORS, COMPACTION_PENDING,
12 ESTIMATE_PENDING_COMPACTION_BYTES, IS_WRITE_STOPPED, NUM_RUNNING_COMPACTIONS,
13};
14use rocksdb::{
15 BottommostLevelCompaction, CompactOptions, DBCompressionType, ErrorKind, IteratorMode,
16 LogLevel, Options, WaitForCompactOptions, WriteBatch, WriteOptions, DB,
17};
18use serde::{Deserialize, Serialize};
19use std::path::{Path, PathBuf};
20use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
21use std::sync::{Arc, Weak};
22use std::time::{Duration, Instant};
23use tokio::runtime::Handle;
24use tokio::sync::Semaphore;
25use tokio::time::{sleep, timeout_at};
26
/// Tuning parameters for a [`RocksSpool`].
///
/// `RocksSpool::new` applies these to the rocksdb `Options` before the
/// database is opened; when no parameters are supplied it falls back to
/// `RocksSpoolParams::default()`.
#[derive(Serialize, Deserialize, Debug)]
pub struct RocksSpoolParams {
    /// When set, forwarded to rocksdb's `Options::increase_parallelism`.
    pub increase_parallelism: Option<i32>,

    /// When set, forwarded to rocksdb's
    /// `Options::optimize_level_style_compaction`.
    pub optimize_level_style_compaction: Option<usize>,
    /// When set, forwarded to rocksdb's
    /// `Options::optimize_universal_style_compaction`.
    pub optimize_universal_style_compaction: Option<usize>,
    /// Forwarded to rocksdb's `Options::set_paranoid_checks`.
    /// Defaults to `false`.
    #[serde(default)]
    pub paranoid_checks: bool,
    /// Compression type handed to `Options::set_compression_type`.
    /// Defaults to Snappy.
    #[serde(default)]
    pub compression_type: DBCompressionTypeDef,

    /// If non-zero, we perform bigger reads when doing compaction. If you're running RocksDB on
    /// spinning disks, you should set this to at least 2MB. That way RocksDB's compaction is doing
    /// sequential instead of random reads
    pub compaction_readahead_size: Option<usize>,

    /// Forwarded to rocksdb's
    /// `Options::set_level_compaction_dynamic_level_bytes`.
    /// Defaults to `false`.
    #[serde(default)]
    pub level_compaction_dynamic_level_bytes: bool,

    /// When set, forwarded to rocksdb's `Options::set_max_open_files`
    /// (after a cast to the integer type that method expects).
    #[serde(default)]
    pub max_open_files: Option<usize>,

    /// Size in bytes of the rocksdb memtable that buffers writes before
    /// being flushed to disk as a new SST file.
    ///
    /// Smaller values produce smaller, more frequent SST files and
    /// trigger compactions sooner -- useful in test setups that need
    /// to force the storage through its full write/compact lifecycle
    /// quickly. Larger values amortize compaction overhead but
    /// increase memory use and recovery time after restart. Leave
    /// unset to use the rocksdb default.
    #[serde(default)]
    pub write_buffer_size: Option<usize>,

    /// Number of level-0 SST files at which rocksdb will stop
    /// accepting writes. Lower values transition the database into
    /// the write-stopped state more quickly when background
    /// compaction cannot keep up, which is useful for tests that
    /// need to deterministically observe that condition. Leave
    /// unset to use the rocksdb default.
    #[serde(default)]
    pub level0_stop_writes_trigger: Option<i32>,

    /// Verbosity handed to rocksdb's `Options::set_log_level`.
    /// Defaults to `Info`.
    #[serde(default)]
    pub log_level: LogLevelDef,

    /// See:
    /// <https://docs.rs/rocksdb/latest/rocksdb/struct.Options.html#method.set_memtable_huge_page_size>
    #[serde(default)]
    pub memtable_huge_page_size: Option<usize>,

    /// Forwarded, in whole seconds, to rocksdb's
    /// `Options::set_log_file_time_to_roll`. Defaults to 86400
    /// seconds (one day).
    #[serde(
        with = "duration_serde",
        default = "RocksSpoolParams::default_log_file_time_to_roll"
    )]
    pub log_file_time_to_roll: Duration,

    /// Forwarded, in microseconds, to rocksdb's
    /// `Options::set_delete_obsolete_files_period_micros`. Defaults
    /// to six hours.
    #[serde(
        with = "duration_serde",
        default = "RocksSpoolParams::default_obsolete_files_period"
    )]
    pub obsolete_files_period: Duration,

    /// When set, the size of the permit pool used by `store()`. A
    /// permit is only requested after the first write attempt has
    /// been refused because of rocksdb backpressure, so this bounds
    /// the number of stores that are retrying, not the number of
    /// stores on the unobstructed fast path.
    #[serde(default)]
    pub limit_concurrent_stores: Option<usize>,
    /// When set, the size of the permit pool used by `load()`; each
    /// load holds a permit for the duration of its blocking read.
    #[serde(default)]
    pub limit_concurrent_loads: Option<usize>,
    /// When set, the size of the permit pool used by `remove()`. As
    /// with stores, a permit is only requested once a removal has hit
    /// rocksdb backpressure.
    #[serde(default)]
    pub limit_concurrent_removes: Option<usize>,

    /// Upper bound on the wait that `store()` and `remove()` will
    /// tolerate when rocksdb is applying backpressure. Callers may
    /// provide a shorter deadline (typically derived from an SMTP
    /// client's idle timeout); the effective deadline is the minimum
    /// of the two. Going longer than the caller-provided value risks
    /// the client timing out and retrying, which would produce
    /// duplicate deliveries -- this option therefore only narrows the
    /// effective deadline, it never extends it.
    ///
    /// Defaults to 30 seconds.
    #[serde(
        with = "duration_serde",
        default = "RocksSpoolParams::default_store_deadline"
    )]
    pub store_deadline: Duration,

    /// How long the composite "this database is wedged" signal must
    /// hold continuously before the load-shedding gate latches.
    /// The signal goes high whenever the rocksdb
    /// `background-errors` counter has grown above the value
    /// observed at process start, or any foreground spool operation
    /// has returned a rocksdb error since process start. Brief
    /// blips that recover within this window do not latch the gate.
    ///
    /// Defaults to 15 seconds.
    #[serde(
        with = "duration_serde",
        default = "RocksSpoolParams::default_error_latch_duration"
    )]
    pub error_latch_duration: Duration,

    /// How long the healthy state must hold continuously before the
    /// load-shedding gate auto-unlatches. Only consulted when
    /// `allow_error_unlatch` is true. A relatively long value
    /// (minutes) gives operators time to inspect the database after a
    /// brief failure window before the daemon starts accepting writes
    /// again on its own.
    ///
    /// Defaults to 5 minutes.
    #[serde(
        with = "duration_serde",
        default = "RocksSpoolParams::default_error_unlatch_duration"
    )]
    pub error_unlatch_duration: Duration,

    /// When true (the default), the load-shedding gate clears itself
    /// after `error_unlatch_duration` of observed recovery. Set to
    /// false to require an operator restart to clear the gate, which
    /// is appropriate when you want a human to confirm the underlying
    /// cause is resolved before accepting traffic again.
    #[serde(default = "RocksSpoolParams::default_allow_error_unlatch")]
    pub allow_error_unlatch: bool,
}
144
145impl Default for RocksSpoolParams {
146 fn default() -> Self {
147 Self {
148 increase_parallelism: None,
149 optimize_level_style_compaction: None,
150 optimize_universal_style_compaction: None,
151 paranoid_checks: false,
152 compression_type: DBCompressionTypeDef::default(),
153 compaction_readahead_size: None,
154 level_compaction_dynamic_level_bytes: false,
155 max_open_files: None,
156 write_buffer_size: None,
157 level0_stop_writes_trigger: None,
158 log_level: LogLevelDef::default(),
159 memtable_huge_page_size: None,
160 log_file_time_to_roll: Self::default_log_file_time_to_roll(),
161 obsolete_files_period: Self::default_obsolete_files_period(),
162 limit_concurrent_stores: None,
163 limit_concurrent_loads: None,
164 limit_concurrent_removes: None,
165 store_deadline: Self::default_store_deadline(),
166 error_latch_duration: Self::default_error_latch_duration(),
167 error_unlatch_duration: Self::default_error_unlatch_duration(),
168 allow_error_unlatch: Self::default_allow_error_unlatch(),
169 }
170 }
171}
172
173impl RocksSpoolParams {
174 fn default_log_file_time_to_roll() -> Duration {
175 Duration::from_secs(86400)
176 }
177
178 fn default_obsolete_files_period() -> Duration {
179 Duration::from_secs(6 * 60 * 60)
180 }
181
182 fn default_store_deadline() -> Duration {
183 Duration::from_secs(30)
184 }
185
186 fn default_error_latch_duration() -> Duration {
187 Duration::from_secs(15)
188 }
189
190 fn default_error_unlatch_duration() -> Duration {
191 Duration::from_secs(5 * 60)
192 }
193
194 fn default_allow_error_unlatch() -> bool {
195 true
196 }
197}
198
/// Serializable stand-in for rocksdb's `DBCompressionType`, used by
/// `RocksSpoolParams::compression_type`.
///
/// Each variant converts to the rocksdb variant of the same name via
/// the `From` impl below. The `Default` impl selects `Snappy`.
#[derive(Serialize, Deserialize, Debug)]
pub enum DBCompressionTypeDef {
    None,
    Snappy,
    Zlib,
    Bz2,
    Lz4,
    Lz4hc,
    Zstd,
}
209
210impl From<DBCompressionTypeDef> for DBCompressionType {
211 fn from(val: DBCompressionTypeDef) -> Self {
212 match val {
213 DBCompressionTypeDef::None => DBCompressionType::None,
214 DBCompressionTypeDef::Snappy => DBCompressionType::Snappy,
215 DBCompressionTypeDef::Zlib => DBCompressionType::Zlib,
216 DBCompressionTypeDef::Bz2 => DBCompressionType::Bz2,
217 DBCompressionTypeDef::Lz4 => DBCompressionType::Lz4,
218 DBCompressionTypeDef::Lz4hc => DBCompressionType::Lz4hc,
219 DBCompressionTypeDef::Zstd => DBCompressionType::Zstd,
220 }
221 }
222}
223
224impl Default for DBCompressionTypeDef {
225 fn default() -> Self {
226 Self::Snappy
227 }
228}
229
/// Serializable stand-in for rocksdb's `LogLevel`, used by
/// `RocksSpoolParams::log_level`.
///
/// Each variant converts to the rocksdb variant of the same name via
/// the `From` impl below. The `Default` impl selects `Info`.
#[derive(Serialize, Deserialize, Debug)]
pub enum LogLevelDef {
    Debug,
    Info,
    Warn,
    Error,
    Fatal,
    Header,
}
239
240impl Default for LogLevelDef {
241 fn default() -> Self {
242 Self::Info
243 }
244}
245
246impl From<LogLevelDef> for LogLevel {
247 fn from(val: LogLevelDef) -> Self {
248 match val {
249 LogLevelDef::Debug => LogLevel::Debug,
250 LogLevelDef::Info => LogLevel::Info,
251 LogLevelDef::Warn => LogLevel::Warn,
252 LogLevelDef::Error => LogLevel::Error,
253 LogLevelDef::Fatal => LogLevel::Fatal,
254 LogLevelDef::Header => LogLevel::Header,
255 }
256 }
257}
258
/// Spool implementation backed by a rocksdb database.
pub struct RocksSpool {
    /// Handle to the open database; cloned into the blocking tasks
    /// that perform reads, enumeration, compaction and shutdown.
    db: Arc<DB>,
    /// Runtime on which `load()` and `enumerate()` spawn their
    /// blocking rocksdb work.
    runtime: Handle,
    /// Optional permit pool for `store()`. A permit is requested only
    /// after a write attempt has been refused because of rocksdb
    /// backpressure.
    limit_concurrent_stores: Option<Arc<Semaphore>>,
    /// Optional permit pool for `load()`; each load holds a permit
    /// while its blocking read runs.
    limit_concurrent_loads: Option<Arc<Semaphore>>,
    /// Optional permit pool for `remove()`, used the same way as the
    /// store pool.
    limit_concurrent_removes: Option<Arc<Semaphore>>,
    /// Latched load-shedding gate driven by `metrics_monitor`. When
    /// set, foreground `store()`/`remove()` calls return an error
    /// instead of waiting on rocksdb backpressure, and the ingress
    /// paths refuse new traffic. See `metrics_monitor` for the
    /// composite signal that drives latch/unlatch transitions.
    load_shed_active: Arc<AtomicBool>,
    /// Count of foreground spool operations (load, enumerate,
    /// store, remove) that have failed with a rocksdb error
    /// since the last auto-unlatch (or since process start if no
    /// auto-unlatch has happened). `metrics_monitor` reads this
    /// to detect failure modes that do not surface as background
    /// errors -- notably a missing SST discovered during a
    /// `get()`, which the C++ side reports to the caller but does
    /// not feed into `rocksdb.background-errors`. Reset to 0 by
    /// the auto-unlatch path's compare-exchange so a subsequent
    /// blip can be observed as fresh growth.
    foreground_errors: Arc<AtomicU64>,
    /// Spool-side upper bound on how long `store()`/`remove()` keep
    /// retrying under backpressure; copied from
    /// `RocksSpoolParams::store_deadline` at construction.
    store_deadline: Duration,
}
284
/// Initial sleep interval for the `store`/`remove` backpressure loop.
/// Chosen low enough that brief, sub-millisecond memtable backpressure
/// is caught with negligible added latency on the slow path. The
/// interval doubles after every sleep until it reaches `BACKOFF_MAX`.
const BACKOFF_INITIAL: Duration = Duration::from_micros(500);
/// Upper bound on the backpressure loop sleep interval. Keeps the
/// load-shedding gate observable within a bounded window even during
/// a long wedge, because the gate is re-checked before every sleep.
const BACKOFF_MAX: Duration = Duration::from_millis(50);
293
294impl RocksSpool {
295 /// Driver shared by `store()` and `remove()`. Issues `write_opt`
296 /// with `no_slowdown=true` repeatedly with exponential backoff
297 /// until one of: the write succeeds, the effective deadline is
298 /// reached, the load-shedding gate latches, or rocksdb returns a
299 /// non-`Incomplete` error.
300 ///
301 /// This replaces an earlier design that used `spawn_blocking` with
302 /// `no_slowdown=false`. That approach could not be cancelled, held
303 /// a blocking-pool worker per stalled call (risking pool
304 /// exhaustion during a wedge), and could not observe the latched
305 /// load-shedding gate. The polling design preserves write
306 /// atomicity -- each iteration is a single rocksdb batch write,
307 /// which is atomic by construction -- while restoring
308 /// cancellation, gate observability, and bounded resource use.
309 async fn write_with_backpressure(
310 &self,
311 opts: WriteOptions,
312 caller_deadline: Option<Instant>,
313 permits: Option<Arc<Semaphore>>,
314 apply: impl Fn(&mut WriteBatch),
315 ) -> anyhow::Result<()> {
316 // Gate at the top so that the load-shedding mirror affects
317 // every store, not just those that happen to hit backpressure.
318 // A relaxed atomic load is essentially free compared to the
319 // rocksdb FFI write below; this preserves the healthy hot
320 // path's latency profile while giving the gate consistent
321 // semantics across the in-flight call sites that aren't
322 // covered by the per-connection ingress checks (notably,
323 // already-established SMTP connections doing new
324 // transactions).
325 if self.load_shed_active.load(Ordering::Relaxed) {
326 return Err(SpoolUnhealthyError.into());
327 }
328
329 let mut batch = WriteBatch::default();
330 apply(&mut batch);
331 match self.db.write_opt(batch, &opts) {
332 Ok(()) => return Ok(()),
333 Err(err) if err.kind() == ErrorKind::Incomplete => {}
334 Err(err) => {
335 record_foreground_error(
336 &self.foreground_errors,
337 &self.load_shed_active,
338 self.db.path(),
339 &err,
340 );
341 return Err(err.into());
342 }
343 }
344
345 let spool_deadline = Instant::now() + self.store_deadline;
346 // Decide upfront which side's deadline wins, so both the
347 // semaphore-acquisition timeout and the backpressure-loop
348 // timeout can surface the matching typed error. Without
349 // this, the SMTP layer cannot tell a caller-provided
350 // `data_processing_timeout` from the spool's own
351 // `store_deadline` and would mis-label the wire response.
352 let (effective_deadline, caller_wins) = match caller_deadline {
353 Some(c) if c < spool_deadline => (c, true),
354 _ => (spool_deadline, false),
355 };
356 let timeout_err = || -> anyhow::Error {
357 if caller_wins {
358 SpoolCallerDeadlineExceeded.into()
359 } else {
360 SpoolBackpressureTimeout {
361 deadline: self.store_deadline,
362 }
363 .into()
364 }
365 };
366
367 let _permit = match permits {
368 Some(s) => match timeout_at(effective_deadline.into(), s.acquire_owned()).await {
369 Ok(r) => Some(r?),
370 Err(_) => return Err(timeout_err()),
371 },
372 None => None,
373 };
374
375 let mut backoff = BACKOFF_INITIAL;
376 loop {
377 if self.load_shed_active.load(Ordering::Relaxed) {
378 return Err(SpoolUnhealthyError.into());
379 }
380 if Instant::now() >= effective_deadline {
381 // Sustained backpressure for the full deadline is
382 // itself a useful signal that the spool may be
383 // unhealthy. Feed it into the foreground error
384 // machinery so the debounced latch path can react if
385 // we see this repeatedly; an occasional one-off
386 // (e.g. a brief load spike) gets washed out by the
387 // `error_latch_duration` window. We do not
388 // immediate-latch because no rocksdb error has been
389 // returned -- the inability to make progress is
390 // ambiguous, not definitively bad.
391 self.foreground_errors.fetch_add(1, Ordering::Relaxed);
392 return Err(timeout_err());
393 }
394 sleep(backoff).await;
395 backoff = (backoff * 2).min(BACKOFF_MAX);
396
397 let mut batch = WriteBatch::default();
398 apply(&mut batch);
399 match self.db.write_opt(batch, &opts) {
400 Ok(()) => return Ok(()),
401 Err(err) if err.kind() == ErrorKind::Incomplete => continue,
402 Err(err) => {
403 record_foreground_error(
404 &self.foreground_errors,
405 &self.load_shed_active,
406 self.db.path(),
407 &err,
408 );
409 return Err(err.into());
410 }
411 }
412 }
413 }
414
415 pub fn new(
416 path: &Path,
417 flush: bool,
418 params: Option<RocksSpoolParams>,
419 runtime: Handle,
420 ) -> anyhow::Result<Self> {
421 let mut opts = Options::default();
422 opts.set_use_fsync(flush);
423 opts.create_if_missing(true);
424 // The default is 1000, which is a bit high
425 opts.set_keep_log_file_num(10);
426
427 let p = params.unwrap_or_default();
428 if let Some(i) = p.increase_parallelism {
429 opts.increase_parallelism(i);
430 }
431 if let Some(i) = p.optimize_level_style_compaction {
432 opts.optimize_level_style_compaction(i);
433 }
434 if let Some(i) = p.optimize_universal_style_compaction {
435 opts.optimize_universal_style_compaction(i);
436 }
437 if let Some(i) = p.compaction_readahead_size {
438 opts.set_compaction_readahead_size(i);
439 }
440 if let Some(i) = p.max_open_files {
441 opts.set_max_open_files(i as _);
442 }
443 if let Some(i) = p.write_buffer_size {
444 opts.set_write_buffer_size(i);
445 }
446 if let Some(i) = p.level0_stop_writes_trigger {
447 opts.set_level_zero_stop_writes_trigger(i);
448 }
449 if let Some(i) = p.memtable_huge_page_size {
450 opts.set_memtable_huge_page_size(i);
451 }
452 opts.set_paranoid_checks(p.paranoid_checks);
453 opts.set_level_compaction_dynamic_level_bytes(p.level_compaction_dynamic_level_bytes);
454 opts.set_compression_type(p.compression_type.into());
455 opts.set_log_level(p.log_level.into());
456 opts.set_log_file_time_to_roll(p.log_file_time_to_roll.as_secs() as usize);
457 opts.set_delete_obsolete_files_period_micros(p.obsolete_files_period.as_micros() as u64);
458
459 let limit_concurrent_stores = p
460 .limit_concurrent_stores
461 .map(|n| Arc::new(Semaphore::new(n)));
462 let limit_concurrent_loads = p
463 .limit_concurrent_loads
464 .map(|n| Arc::new(Semaphore::new(n)));
465 let limit_concurrent_removes = p
466 .limit_concurrent_removes
467 .map(|n| Arc::new(Semaphore::new(n)));
468
469 let db = Arc::new(DB::open(&opts, path)?);
470 let load_shed_active = Arc::new(AtomicBool::new(false));
471 let foreground_errors = Arc::new(AtomicU64::new(0));
472 let store_deadline = p.store_deadline;
473
474 {
475 let weak_db = Arc::downgrade(&db);
476 let weak_mirror = Arc::downgrade(&load_shed_active);
477 let weak_fg_errors = Arc::downgrade(&foreground_errors);
478 tokio::spawn(metrics_monitor(
479 weak_db,
480 weak_mirror,
481 weak_fg_errors,
482 format!("{}", path.display()),
483 p.error_latch_duration,
484 p.error_unlatch_duration,
485 p.allow_error_unlatch,
486 ));
487 }
488
489 Ok(Self {
490 db,
491 runtime,
492 limit_concurrent_stores,
493 limit_concurrent_loads,
494 limit_concurrent_removes,
495 load_shed_active,
496 foreground_errors,
497 store_deadline,
498 })
499 }
500}
501
502#[async_trait]
503impl Spool for RocksSpool {
504 async fn load(&self, id: SpoolId) -> anyhow::Result<Vec<u8>> {
505 let permit = match self.limit_concurrent_loads.clone() {
506 Some(s) => Some(s.acquire_owned().await?),
507 None => None,
508 };
509 let db = self.db.clone();
510 let fg_errors = self.foreground_errors.clone();
511 let load_shed = self.load_shed_active.clone();
512 let db_path: PathBuf = self.db.path().to_owned();
513 tokio::task::Builder::new()
514 .name("rocksdb load")
515 .spawn_blocking_on(
516 move || {
517 let result = match db.get(id.as_bytes()) {
518 Ok(Some(v)) => v,
519 Ok(None) => {
520 drop(permit);
521 anyhow::bail!("no such key {id}");
522 }
523 Err(err) => {
524 // Rocksdb get errors (e.g. a missing SST
525 // file discovered during the read) do not
526 // increment rocksdb.background-errors.
527 // Record them so the load-shedding gate
528 // can react -- immediately for fatal
529 // classes, or after debounce otherwise.
530 record_foreground_error(&fg_errors, &load_shed, &db_path, &err);
531 drop(permit);
532 return Err(err.into());
533 }
534 };
535 drop(permit);
536 Ok(result)
537 },
538 &self.runtime,
539 )?
540 .await?
541 }
542
543 async fn store(
544 &self,
545 id: SpoolId,
546 data: Arc<Box<[u8]>>,
547 force_sync: bool,
548 deadline: Option<Instant>,
549 ) -> anyhow::Result<()> {
550 let mut opts = WriteOptions::default();
551 opts.set_sync(force_sync);
552 opts.set_no_slowdown(true);
553
554 self.write_with_backpressure(
555 opts,
556 deadline,
557 self.limit_concurrent_stores.clone(),
558 |batch| batch.put(id.as_bytes(), &*data),
559 )
560 .await
561 }
562
563 async fn remove(&self, id: SpoolId) -> anyhow::Result<()> {
564 let mut opts = WriteOptions::default();
565 opts.set_no_slowdown(true);
566
567 self.write_with_backpressure(opts, None, self.limit_concurrent_removes.clone(), |batch| {
568 batch.delete(id.as_bytes())
569 })
570 .await
571 }
572
/// No-op for the rocksdb spool: always returns `Ok(())` without
/// touching the database.
async fn cleanup(&self) -> anyhow::Result<()> {
    Ok(())
}
576
577 async fn compact(&self) -> anyhow::Result<()> {
578 let db = self.db.clone();
579 tokio::task::spawn_blocking(move || -> anyhow::Result<()> {
580 db.flush()?;
581 // Force bottommost-level compaction so the entire keyspace
582 // is rewritten; without this, single-level layouts cause
583 // the call to be a no-op even when there are missing files
584 // that we'd want to surface as errors.
585 let mut opts = CompactOptions::default();
586 opts.set_bottommost_level_compaction(BottommostLevelCompaction::Force);
587 opts.set_exclusive_manual_compaction(true);
588 db.compact_range_opt::<&[u8], &[u8]>(None, None, &opts);
589 // compact_range itself does not return errors -- wait_for_compact
590 // does, and is what surfaces background failures (e.g. a
591 // missing SST encountered during compaction) to the caller.
592 let wait_opts = WaitForCompactOptions::default();
593 db.wait_for_compact(&wait_opts)?;
594 Ok(())
595 })
596 .await?
597 }
598
599 async fn shutdown(&self) -> anyhow::Result<()> {
600 let db = self.db.clone();
601 tokio::task::spawn_blocking(move || db.cancel_all_background_work(true)).await?;
602 Ok(())
603 }
604
605 fn unhealthy_reason(&self) -> Option<&'static str> {
606 if self.load_shed_active.load(Ordering::Relaxed) {
607 Some("the spool is not accepting writes")
608 } else {
609 None
610 }
611 }
612
613 async fn advise_low_memory(&self) -> anyhow::Result<isize> {
614 let db = self.db.clone();
615 tokio::task::spawn_blocking(move || {
616 let usage_before = match get_memory_usage_stats(Some(&[&db]), None) {
617 Ok(stats) => {
618 let stats: Stats = stats.into();
619 tracing::debug!("pre-flush: {stats:#?}");
620 stats.total()
621 }
622 Err(err) => {
623 tracing::error!("error getting stats: {err:#}");
624 0
625 }
626 };
627
628 if let Err(err) = db.flush() {
629 tracing::error!("error flushing memory: {err:#}");
630 }
631
632 let usage_after = match get_memory_usage_stats(Some(&[&db]), None) {
633 Ok(stats) => {
634 let stats: Stats = stats.into();
635 tracing::debug!("post-flush: {stats:#?}");
636 stats.total()
637 }
638 Err(err) => {
639 tracing::error!("error getting stats: {err:#}");
640 0
641 }
642 };
643
644 Ok(usage_before - usage_after)
645 })
646 .await?
647 }
648
649 fn enumerate(
650 &self,
651 sender: Sender<SpoolEntry>,
652 start_time: DateTime<Utc>,
653 ) -> anyhow::Result<()> {
654 let db = Arc::clone(&self.db);
655 let fg_errors = self.foreground_errors.clone();
656 let load_shed = self.load_shed_active.clone();
657 let db_path: PathBuf = self.db.path().to_owned();
658 tokio::task::Builder::new()
659 .name("rocksdb enumerate")
660 .spawn_blocking_on(
661 move || {
662 let iter = db.iterator(IteratorMode::Start);
663 for entry in iter {
664 let (key, value) = match entry {
665 Ok(e) => e,
666 Err(err) => {
667 // Iterator errors typically indicate a
668 // missing or corrupt SST file
669 // discovered while walking the
670 // keyspace. Feed into the foreground
671 // error machinery so the gate latches
672 // (immediately for IOError /
673 // Corruption) and abort the
674 // enumeration.
675 record_foreground_error(&fg_errors, &load_shed, &db_path, &err);
676 return Err(err.into());
677 }
678 };
679 let id = SpoolId::from_slice(&key)
680 .ok_or_else(|| anyhow::anyhow!("invalid spool id {key:?}"))?;
681
682 if id.created() >= start_time {
683 // Entries created since we started must have
684 // landed there after we started and are thus
685 // not eligible for discovery via enumeration
686 continue;
687 }
688
689 sender
690 .send(SpoolEntry::Item {
691 id,
692 data: value.to_vec(),
693 })
694 .map_err(|err| {
695 anyhow::anyhow!("failed to send SpoolEntry for {id}: {err:#}")
696 })?;
697 }
698 Ok::<(), anyhow::Error>(())
699 },
700 &self.runtime,
701 )?;
702 Ok(())
703 }
704}
705
#[cfg(test)]
mod test {
    use super::*;

    /// End-to-end exercise of the spool against a temporary database:
    /// load of a missing key, store, load, enumerate, remove, and
    /// enumeration of the emptied spool.
    #[tokio::test]
    async fn rocks_spool() -> anyhow::Result<()> {
        let location = tempfile::tempdir()?;
        let spool = RocksSpool::new(location.path(), false, None, Handle::current())?;

        {
            let id1 = SpoolId::new();

            // Can't load an entry that doesn't exist
            assert_eq!(
                format!("{:#}", spool.load(id1).await.unwrap_err()),
                format!("no such key {id1}")
            );
        }

        // Insert some entries
        let mut ids = vec![];
        for i in 0..100 {
            let id = SpoolId::new();
            spool
                .store(
                    id,
                    Arc::new(format!("I am {i}").as_bytes().to_vec().into_boxed_slice()),
                    false,
                    None,
                )
                .await?;
            ids.push(id);
        }

        // Verify that we can load those entries
        for (i, &id) in ids.iter().enumerate() {
            let data = spool.load(id).await?;
            let text = String::from_utf8(data)?;
            assert_eq!(text, format!("I am {i}"));
        }

        {
            // Verify that we can enumerate them. Using "now" as the
            // start time makes every entry stored above eligible.
            let (tx, rx) = flume::bounded(32);
            spool.enumerate(tx, Utc::now())?;
            let mut count = 0;

            // The receive loop ends once the enumeration task has
            // finished and dropped its sender.
            while let Ok(item) = rx.recv_async().await {
                match item {
                    SpoolEntry::Item { id, data } => {
                        // The payload must match the position at which
                        // the id was stored.
                        let i = ids
                            .iter()
                            .position(|&item| item == id)
                            .ok_or_else(|| anyhow::anyhow!("{id} not found in ids!"))?;

                        let text = String::from_utf8(data)?;
                        assert_eq!(text, format!("I am {i}"));

                        spool.remove(id).await?;
                        // Can't load an entry that we just removed
                        assert_eq!(
                            format!("{:#}", spool.load(id).await.unwrap_err()),
                            format!("no such key {id}")
                        );
                        count += 1;
                    }
                    SpoolEntry::Corrupt { id, error } => {
                        anyhow::bail!("Corrupt: {id}: {error}");
                    }
                }
            }

            assert_eq!(count, 100);
        }

        // Now that we've removed every entry, try enumerating again.
        // We expect to receive no entries.
        // Do it a couple of times to verify that enumerating an empty
        // spool does not leave anything behind for the next pass.
        for _ in 0..2 {
            // Enumerate again and collect anything that is reported;
            // nothing should be.
            let (tx, rx) = flume::bounded(32);
            spool.enumerate(tx, Utc::now())?;
            let mut unexpected = vec![];

            while let Ok(item) = rx.recv_async().await {
                match item {
                    SpoolEntry::Item { id, .. } | SpoolEntry::Corrupt { id, .. } => {
                        unexpected.push(id)
                    }
                }
            }

            assert_eq!(unexpected.len(), 0);
        }

        Ok(())
    }
}
806
/// Local copy of rocksdb's `MemoryUsageStats`.
///
/// The rocksdb type doesn't impl `Debug`, so the values are copied
/// into this struct to allow them to be logged with `{:#?}`.
// NOTE(review): `allow(unused)` is presumably here because
// `mem_table_unflushed` is never read outside the derived `Debug`
// impl -- confirm before removing.
#[allow(unused)]
#[derive(Debug)]
struct Stats {
    pub mem_table_total: u64,
    pub mem_table_unflushed: u64,
    pub mem_table_readers_total: u64,
    pub cache_total: u64,
}
816
817impl Stats {
818 fn total(&self) -> isize {
819 (self.mem_table_total + self.mem_table_readers_total + self.cache_total) as isize
820 }
821}
822
823impl From<rocksdb::perf::MemoryUsageStats> for Stats {
824 fn from(s: rocksdb::perf::MemoryUsageStats) -> Self {
825 Self {
826 mem_table_total: s.mem_table_total,
827 mem_table_unflushed: s.mem_table_unflushed,
828 mem_table_readers_total: s.mem_table_readers_total,
829 cache_total: s.cache_total,
830 }
831 }
832}
833
834/// Read an integer-valued rocksdb property, returning 0 if the property
835/// is missing or cannot be parsed. Used for hot-path checks and metrics
836/// gathering; callers that want to distinguish "missing" from "zero"
837/// should call `property_int_value` directly.
838fn property_u64(db: &DB, name: &rocksdb::properties::PropName) -> u64 {
839 db.property_int_value(name).ok().flatten().unwrap_or(0)
840}
841
842/// Classify a rocksdb error returned from a foreground spool
843/// operation. `Corruption` and `IOError` are returned by rocksdb
844/// when the underlying database state is observably wrong (a missing
845/// or corrupt SST file, a checksum mismatch, etc.) -- conditions
846/// that do not have any transient interpretation. We use this to
847/// latch the load-shedding gate immediately rather than waiting out
848/// the normal debounce window.
849fn is_definitively_bad(err: &rocksdb::Error) -> bool {
850 matches!(err.kind(), ErrorKind::Corruption | ErrorKind::IOError)
851}
852
853/// Record a foreground spool error: always increment the counter so
854/// the metrics monitor can observe sustained-failure patterns, and
855/// for errors classified as definitively bad, latch the gate now.
856/// Logs once on the false-to-true transition of the gate.
857fn record_foreground_error(
858 fg_errors: &AtomicU64,
859 load_shed: &AtomicBool,
860 path: &Path,
861 err: &rocksdb::Error,
862) {
863 fg_errors.fetch_add(1, Ordering::Relaxed);
864 if is_definitively_bad(err) && !load_shed.swap(true, Ordering::Relaxed) {
865 tracing::error!(
866 "rocksdb at {}: fatal foreground error ({:?}); load-shedding \
867 gate latched immediately. Underlying error: {}",
868 path.display(),
869 err.kind(),
870 err.as_ref(),
871 );
872 }
873}
874
// Gauge keyed by a single `path` label.
// NOTE(review): presumably filled from `Stats::mem_table_total` by the
// metrics monitor, which is outside this view -- confirm. The `///`
// text below is passed to `declare_metric!` and may be exported as the
// metric's help text, so treat it as user-facing.
declare_metric! {
/// Approximate memory usage (bytes) of all the mem-tables.
///
/// This may be useful when understanding the memory usage of
/// the system.
static MEM_TABLE_TOTAL: IntGaugeVec(
    "rocks_spool_mem_table_total",
    &["path"]
    );
}
885
// Gauge keyed by a single `path` label.
// NOTE(review): presumably filled from `Stats::mem_table_unflushed` by
// the metrics monitor, which is outside this view -- confirm. The
// `///` text below is passed to `declare_metric!` and may be exported
// as the metric's help text, so treat it as user-facing.
declare_metric! {
/// Approximate memory usage (bytes) of un-flushed mem-tables.
///
/// This may be useful when understanding the memory usage of
/// the system.
static MEM_TABLE_UNFLUSHED: IntGaugeVec(
    "rocks_spool_mem_table_unflushed",
    &["path"]
    );
}
896
// Gauge keyed by a single `path` label.
// NOTE(review): presumably filled from
// `Stats::mem_table_readers_total` by the metrics monitor, which is
// outside this view -- confirm. The `///` text below is passed to
// `declare_metric!` and may be exported as the metric's help text, so
// treat it as user-facing.
declare_metric! {
/// Approximate memory usage (bytes) of all the table readers.
///
/// This may be useful when understanding the memory usage of
/// the system.
static MEM_TABLE_READERS_TOTAL: IntGaugeVec(
    "rocks_spool_mem_table_readers_total",
    &["path"]
    );
}
907
// Gauge keyed by a single `path` label.
// NOTE(review): presumably filled from `Stats::cache_total` by the
// metrics monitor, which is outside this view -- confirm. The `///`
// text below is passed to `declare_metric!` and may be exported as the
// metric's help text, so treat it as user-facing.
declare_metric! {
/// Approximate memory (bytes) usage by cache.
///
/// This may be useful when understanding the memory usage of
/// the system.
static CACHE_TOTAL: IntGaugeVec(
    "rocks_spool_cache_total",
    &["path"]
    );
}
918
// Keyed by a single `path` label. Declared as an `IntGaugeVec` even
// though the text below describes a monotonic count.
// NOTE(review): presumably because the value is copied from the
// rocksdb `background-errors` property rather than incremented here --
// confirm against the metrics monitor, which is outside this view.
// The `///` text below is passed to `declare_metric!` and may be
// exported as the metric's help text, so treat it as user-facing.
declare_metric! {
/// Accumulated count of background errors encountered by the rocksdb
/// instance (failed flushes or compactions, typically caused by I/O
/// errors such as missing or corrupt SST files, ENOSPC, or permission
/// problems).
///
/// {{since('dev')}}
///
/// This counter is **monotonic** for the lifetime of the process: it
/// does not decrease when rocksdb auto-resumes from transient errors
/// such as a brief ENOSPC. A non-zero value therefore does not
/// necessarily mean the database is currently wedged; it means at
/// least one background error has occurred since the process started.
///
/// For SRE monitoring, alert on the **rate of change** (e.g.
/// `increase(rocks_spool_background_errors[5m]) > 0`) to catch new
/// occurrences. For the actionable "the database is wedged right
/// now and we are shedding load" signal, page on
/// `rocks_spool_load_shed_active` instead, which combines this
/// counter, foreground read/write errors, and rocksdb error
/// severity into a single latched indicator.
static BACKGROUND_ERRORS_METRIC: IntGaugeVec(
    "rocks_spool_background_errors",
    &["path"]
    );
}
945
// Gauge keyed by a single `path` label.
// NOTE(review): the code that sets this from the rocksdb
// `is-write-stopped` property is outside this view -- confirm. The
// `///` text below is passed to `declare_metric!` and may be exported
// as the metric's help text, so treat it as user-facing.
declare_metric! {
/// Set to 1 when the rocksdb instance is currently refusing writes
/// at the WriteController layer (memtable count or L0 file count
/// reached the stop threshold), 0 otherwise.
///
/// {{since('dev')}}
///
/// This reflects rocksdb's own `is-write-stopped` property and
/// indicates backpressure rather than a fatal background error.
/// Healthy databases under bursty load may briefly report 1 here.
/// For the "the database is wedged due to a background error"
/// signal, see `rocks_spool_load_shed_active` instead.
static WRITE_STOPPED: IntGaugeVec(
    "rocks_spool_write_stopped",
    &["path"]
    );
}
963
// Gauge keyed by a single `path` label.
// NOTE(review): presumably mirrors `RocksSpool::load_shed_active`; the
// code that sets it is outside this view -- confirm. Note also that
// `enumerate()` feeds `record_foreground_error` as well, although the
// text below lists only load, store and remove. The `///` text below
// is passed to `declare_metric!` and may be exported as the metric's
// help text, so treat it as user-facing.
declare_metric! {
/// Set to 1 when this spool's load-shedding gate is latched, 0
/// otherwise. When set, ingress paths (SMTP, HTTP inject) reject
/// traffic and foreground store/remove operations fail fast rather
/// than stall.
///
/// {{since('dev')}}
///
/// The gate latches in either of two ways:
///
/// * **Immediate**: a foreground spool operation (load, store,
///   remove) returns a rocksdb error classified as definitively
///   bad (`Corruption` or `IOError` -- e.g. a missing or corrupt
///   SST file discovered during a read). These conditions have
///   no transient interpretation, so the gate latches on the
///   first such observation.
/// * **Debounced**: less specific failure signals --
///   `background-errors` has grown since this process started, or
///   foreground operations have returned non-fatal errors --
///   sustained continuously for the configured
///   `error_latch_duration` (default 15s). This filters out
///   brief auto-resumed errors.
///
/// If `allow_error_unlatch` is enabled (the default), the gate
/// auto-clears after `error_unlatch_duration` of observed recovery
/// (default 5 minutes) with no new errors of either class.
/// Otherwise it stays set until the process is restarted.
///
/// SREs should treat any sustained non-zero value as an
/// operator-actionable incident; pair this metric with
/// `rocks_spool_background_errors` to understand why.
static LOAD_SHED_ACTIVE: IntGaugeVec(
    "rocks_spool_load_shed_active",
    &["path"]
    );
}
1000
// Per-spool gauge, labelled by the spool `path`. `metrics_monitor`
// refreshes it on every tick from rocksdb's `NUM_RUNNING_COMPACTIONS`
// property (read via `property_u64`).
// NOTE(review): the `///` text below is presumably consumed by
// `declare_metric!` as the exported help text -- confirm before
// rewording it.
declare_metric! {
/// Number of background compactions currently running for this
/// rocksdb instance.
///
/// {{since('dev')}}
///
/// In a healthy, actively-written spool this is typically non-zero
/// in bursts. A value persistently stuck at 0 while
/// `rocks_spool_compaction_pending` or
/// `rocks_spool_estimate_pending_compaction_bytes` is growing is a
/// strong indicator that the background worker is wedged --
/// cross-reference `rocks_spool_write_stopped` and
/// `rocks_spool_background_errors`.
static NUM_RUNNING_COMPACTIONS_METRIC: IntGaugeVec(
    "rocks_spool_num_running_compactions",
    &["path"]
    );
}
1019
// Per-spool gauge, labelled by the spool `path`. `metrics_monitor`
// refreshes it on every tick from rocksdb's `COMPACTION_PENDING`
// property (read via `property_u64`).
// NOTE(review): the `///` text below is presumably consumed by
// `declare_metric!` as the exported help text -- confirm before
// rewording it.
declare_metric! {
/// Set to 1 when at least one compaction is pending for this rocksdb
/// instance, 0 otherwise.
///
/// {{since('dev')}}
///
/// Brief flapping is normal under write load. A value of 1 that
/// persists alongside `rocks_spool_num_running_compactions == 0` is
/// suspicious and suggests the compaction worker is not making
/// progress.
static COMPACTION_PENDING_METRIC: IntGaugeVec(
    "rocks_spool_compaction_pending",
    &["path"]
    );
}
1035
// Per-spool gauge, labelled by the spool `path`. `metrics_monitor`
// refreshes it on every tick from rocksdb's
// `ESTIMATE_PENDING_COMPACTION_BYTES` property (read via
// `property_u64`, then cast to `i64` for the gauge).
// NOTE(review): the `///` text below is presumably consumed by
// `declare_metric!` as the exported help text -- confirm before
// rewording it.
declare_metric! {
/// Estimated total bytes that compaction needs to rewrite to bring
/// all levels back under their target sizes.
///
/// {{since('dev')}}
///
/// This is a backlog indicator. Steady-state values depend heavily
/// on write rate, compression, and the configured compaction style,
/// so absolute thresholds should be derived from each deployment's
/// baseline. Unbounded growth over a multi-hour window indicates
/// that compaction cannot keep up with the write rate, which
/// eventually leads to write slowdown
/// (`rocks_spool_actual_delayed_write_rate` becomes non-zero) and
/// then to write stop (`rocks_spool_write_stopped` becomes 1).
///
/// Only meaningful for level-style compaction.
static ESTIMATE_PENDING_COMPACTION_BYTES_METRIC: IntGaugeVec(
    "rocks_spool_estimate_pending_compaction_bytes",
    &["path"]
    );
}
1057
// Per-spool gauge, labelled by the spool `path`. `metrics_monitor`
// refreshes it on every tick from rocksdb's
// `ACTUAL_DELAYED_WRITE_RATE` property (read via `property_u64`, then
// cast to `i64` for the gauge).
// NOTE(review): the `///` text below is presumably consumed by
// `declare_metric!` as the exported help text -- confirm before
// rewording it.
declare_metric! {
/// Current delayed write rate (bytes/second) applied by rocksdb to
/// throttle foreground writers. 0 means no slowdown is in effect.
///
/// {{since('dev')}}
///
/// A non-zero value means rocksdb is intentionally slowing writers
/// down because compaction or flush is falling behind. This is the
/// early-warning signal that precedes a full write stop: if this
/// remains non-zero for an extended period, investigate the
/// compaction backlog
/// (`rocks_spool_estimate_pending_compaction_bytes`) and underlying
/// disk throughput before the database transitions to
/// `rocks_spool_write_stopped == 1`.
static ACTUAL_DELAYED_WRITE_RATE_METRIC: IntGaugeVec(
    "rocks_spool_actual_delayed_write_rate",
    &["path"]
    );
}
1077
/// Internal state for the load-shedding latch state machine. See the
/// per-tick logic in `metrics_monitor`.
///
/// A single instance is built by `metrics_monitor` before it enters
/// its loop and is then read and mutated on every tick. The `Instant`
/// fields all hold the tick's `Instant::now()` snapshot, so values
/// recorded during the same tick compare equal.
struct HealthState {
    /// Baseline `background-errors` count. Seeded from the property
    /// value read once before the monitor loop starts, and re-anchored
    /// to the current count whenever the gate auto-unlatches.
    /// Only growth above this baseline counts toward latching, so a
    /// daemon restarted against a DB whose accumulated count is
    /// already non-zero does not immediately latch.
    initial_bg_errors: u64,
    /// `background-errors` count from the previous monitor tick.
    /// An increase over this value is logged once and recorded in
    /// `last_bg_growth_at`, which in turn feeds the auto-unlatch
    /// quiet-window check.
    prev_bg_errors: u64,
    /// Foreground spool error count from the previous monitor tick.
    /// Starts at 0 and is reset to 0 when the gate auto-unlatches
    /// (at which point the shared counter itself is cleared), so there
    /// is no separate baseline as there is for `initial_bg_errors`.
    prev_fg_errors: u64,
    /// Instant we first observed an unhealthy signal (bg above
    /// baseline OR any foreground errors). Reset to the current tick
    /// when the monitor discovers the shared mirror flag was latched
    /// behind its back, and cleared on a healthy tick while not
    /// latched or when the gate auto-unlatches.
    unhealthy_since: Option<Instant>,
    /// Instant of the most recent monitor tick where bg_errors
    /// increased over the previous tick. `None` until the first such
    /// tick; never cleared afterwards.
    last_bg_growth_at: Option<Instant>,
    /// Instant of the most recent monitor tick where the
    /// foreground error counter increased over the previous tick.
    /// `None` until the first such tick; never cleared afterwards.
    last_fg_growth_at: Option<Instant>,
    /// Whether the monitor currently regards the load-shedding gate as
    /// latched. Becomes true when the debounce window elapses or when
    /// the shared mirror flag is observed set; becomes false only on
    /// auto-unlatch. Its value is published as the
    /// `rocks_spool_load_shed_active` gauge at the end of each tick.
    latched: bool,
}
1106
1107async fn metrics_monitor(
1108 db: Weak<DB>,
1109 mirror: Weak<AtomicBool>,
1110 foreground_errors: Weak<AtomicU64>,
1111 path: String,
1112 latch_duration: Duration,
1113 unlatch_duration: Duration,
1114 allow_unlatch: bool,
1115) {
1116 let mem_table_total = MEM_TABLE_TOTAL
1117 .get_metric_with_label_values(&[path.as_str()])
1118 .unwrap();
1119 let mem_table_unflushed = MEM_TABLE_UNFLUSHED
1120 .get_metric_with_label_values(&[path.as_str()])
1121 .unwrap();
1122 let mem_table_readers_total = MEM_TABLE_READERS_TOTAL
1123 .get_metric_with_label_values(&[path.as_str()])
1124 .unwrap();
1125 let cache_total = CACHE_TOTAL
1126 .get_metric_with_label_values(&[path.as_str()])
1127 .unwrap();
1128 let background_errors = BACKGROUND_ERRORS_METRIC
1129 .get_metric_with_label_values(&[path.as_str()])
1130 .unwrap();
1131 let write_stopped = WRITE_STOPPED
1132 .get_metric_with_label_values(&[path.as_str()])
1133 .unwrap();
1134 let load_shed_active = LOAD_SHED_ACTIVE
1135 .get_metric_with_label_values(&[path.as_str()])
1136 .unwrap();
1137 let num_running_compactions = NUM_RUNNING_COMPACTIONS_METRIC
1138 .get_metric_with_label_values(&[path.as_str()])
1139 .unwrap();
1140 let compaction_pending = COMPACTION_PENDING_METRIC
1141 .get_metric_with_label_values(&[path.as_str()])
1142 .unwrap();
1143 let estimate_pending_compaction_bytes = ESTIMATE_PENDING_COMPACTION_BYTES_METRIC
1144 .get_metric_with_label_values(&[path.as_str()])
1145 .unwrap();
1146 let actual_delayed_write_rate = ACTUAL_DELAYED_WRITE_RATE_METRIC
1147 .get_metric_with_label_values(&[path.as_str()])
1148 .unwrap();
1149
1150 // Initial bg_errors observation anchors the latch logic against
1151 // pre-existing accumulated errors from prior process lifetimes,
1152 // so a restart against a DB with a historical count does not
1153 // immediately latch. If the DB has already been dropped before
1154 // we get here, exit silently -- there is nothing to monitor.
1155 let Some(db_init) = db.upgrade() else {
1156 return;
1157 };
1158 let initial_bg = property_u64(&db_init, BACKGROUND_ERRORS);
1159 drop(db_init);
1160 let mut state = HealthState {
1161 initial_bg_errors: initial_bg,
1162 prev_bg_errors: initial_bg,
1163 prev_fg_errors: 0,
1164 unhealthy_since: None,
1165 last_bg_growth_at: None,
1166 last_fg_growth_at: None,
1167 latched: false,
1168 };
1169
1170 loop {
1171 match db.upgrade() {
1172 Some(db) => {
1173 match get_memory_usage_stats(Some(&[&db]), None) {
1174 Ok(stats) => {
1175 mem_table_total.set(stats.mem_table_total as i64);
1176 mem_table_unflushed.set(stats.mem_table_unflushed as i64);
1177 mem_table_readers_total.set(stats.mem_table_readers_total as i64);
1178 cache_total.set(stats.cache_total as i64);
1179 }
1180 Err(err) => {
1181 tracing::error!("error getting stats: {err:#}");
1182 }
1183 };
1184
1185 let bg = property_u64(&db, BACKGROUND_ERRORS);
1186 let stopped = property_u64(&db, IS_WRITE_STOPPED);
1187 let compaction_pending_now = property_u64(&db, COMPACTION_PENDING);
1188 let num_running = property_u64(&db, NUM_RUNNING_COMPACTIONS);
1189 background_errors.set(bg as i64);
1190 write_stopped.set(stopped as i64);
1191 num_running_compactions.set(num_running as i64);
1192 compaction_pending.set(compaction_pending_now as i64);
1193 estimate_pending_compaction_bytes
1194 .set(property_u64(&db, ESTIMATE_PENDING_COMPACTION_BYTES) as i64);
1195 actual_delayed_write_rate.set(property_u64(&db, ACTUAL_DELAYED_WRITE_RATE) as i64);
1196
1197 let now = Instant::now();
1198 let fg = foreground_errors
1199 .upgrade()
1200 .map(|c| c.load(Ordering::Relaxed))
1201 .unwrap_or(0);
1202
1203 // The foreground error path may have latched the gate
1204 // directly on a fatal error (Corruption/IOError) since
1205 // our last tick. Reconcile our internal state so the
1206 // unlatch logic sees the gate as latched without
1207 // duplicating the "gate latched" log.
1208 if let Some(m) = mirror.upgrade() {
1209 if m.load(Ordering::Relaxed) && !state.latched {
1210 state.latched = true;
1211 state.unhealthy_since = Some(now);
1212 }
1213 }
1214
1215 if bg > state.prev_bg_errors {
1216 tracing::error!(
1217 "rocksdb at {path}: background error count \
1218 increased from {prev} to {bg}; check the LOG \
1219 file in that directory for details",
1220 prev = state.prev_bg_errors,
1221 );
1222 state.last_bg_growth_at = Some(now);
1223 }
1224 state.prev_bg_errors = bg;
1225
1226 if fg > state.prev_fg_errors {
1227 tracing::error!(
1228 "rocksdb at {path}: foreground spool error count \
1229 increased from {prev} to {fg}; this typically \
1230 indicates a missing or corrupt SST file \
1231 discovered during a read",
1232 prev = state.prev_fg_errors,
1233 );
1234 state.last_fg_growth_at = Some(now);
1235 }
1236 state.prev_fg_errors = fg;
1237
1238 // Latch signal: background errors have grown since this
1239 // process started, OR any foreground errors have been
1240 // observed. We cannot use rocksdb's
1241 // `compaction-pending` or `is-write-stopped` properties
1242 // to refine this -- when paranoid_checks fires,
1243 // rocksdb pauses background scheduling and both
1244 // properties drop to 0 even though the DB is wedged.
1245 // The sustained-for-latch_duration window is what
1246 // filters out brief auto-resumed blips.
1247 let unhealthy_now = bg > state.initial_bg_errors || fg > 0;
1248
1249 if unhealthy_now {
1250 let since = *state.unhealthy_since.get_or_insert(now);
1251 if !state.latched && now.duration_since(since) >= latch_duration {
1252 state.latched = true;
1253 if let Some(m) = mirror.upgrade() {
1254 m.store(true, Ordering::Relaxed);
1255 }
1256 tracing::error!(
1257 "rocksdb at {path}: load-shedding gate latched \
1258 after {latch_duration:?} of sustained background \
1259 errors (accumulated count: {bg}). Ingress paths \
1260 will now reject traffic. Inspect the LOG file \
1261 for the underlying cause.",
1262 );
1263 }
1264 } else if !state.latched {
1265 // Healthy tick before latch: reset the debounce
1266 // window so a future blip gets its full
1267 // latch_duration grace, not a stale baseline
1268 // from an earlier, separate transient.
1269 state.unhealthy_since = None;
1270 }
1271
1272 // Auto-unlatch: neither bg_errors nor fg_errors have
1273 // grown for `unlatch_duration`. This catches
1274 // self-healed transients (one blip, then quiet). It
1275 // does NOT distinguish a self-healed transient from a
1276 // truly wedged DB where compactions have been
1277 // abandoned and simply stopped producing further
1278 // errors. Operators who require a stronger
1279 // guarantee should set `allow_error_unlatch = false`.
1280 if state.latched && allow_unlatch {
1281 let bg_stable_since = state.last_bg_growth_at.unwrap_or(now);
1282 let fg_stable_since = state.last_fg_growth_at.unwrap_or(now);
1283 let stable_since = bg_stable_since.max(fg_stable_since);
1284 if now.duration_since(stable_since) >= unlatch_duration {
1285 // CAS the foreground counter from our tick
1286 // snapshot to 0. This is the synchronization
1287 // point that prevents an auto-unlatch from
1288 // racing with a concurrent
1289 // record_foreground_error: if a fresh fatal
1290 // error landed mid-tick, the CAS fails and we
1291 // defer the unlatch to the next tick.
1292 let cleared = match foreground_errors.upgrade() {
1293 Some(c) => c
1294 .compare_exchange(fg, 0, Ordering::Relaxed, Ordering::Relaxed)
1295 .is_ok(),
1296 // Process is shutting down; skip.
1297 None => false,
1298 };
1299 if cleared {
1300 state.latched = false;
1301 // Re-anchor the baselines at the current
1302 // counts so that we only re-latch on *new*
1303 // growth above this point; otherwise the
1304 // static post-transient counts would keep us
1305 // permanently unhealthy.
1306 state.initial_bg_errors = bg;
1307 state.prev_fg_errors = 0;
1308 state.unhealthy_since = None;
1309 if let Some(m) = mirror.upgrade() {
1310 m.store(false, Ordering::Relaxed);
1311 }
1312 tracing::info!(
1313 "rocksdb at {path}: load-shedding gate cleared \
1314 after {unlatch_duration:?} without new errors \
1315 (bg baseline re-anchored at {bg}); ingress \
1316 paths will accept traffic again",
1317 );
1318 }
1319 }
1320 }
1321 load_shed_active.set(if state.latched { 1 } else { 0 });
1322 }
1323 None => {
1324 // Dead
1325 return;
1326 }
1327 }
1328 tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
1329 }
1330}