spool/lib.rs
1use async_trait::async_trait;
2use chrono::{DateTime, Utc};
3use flume::Sender;
4use std::sync::{Arc, OnceLock};
5use std::time::{Duration, Instant};
6
7pub mod local_disk;
8#[cfg(feature = "rocksdb")]
9pub mod rocks;
10pub mod spool_id;
11
12pub use spool_id::SpoolId;
13
14/// The spool's load-shedding gate is currently latched. Callers can
15/// recognize this via `anyhow::Error::root_cause` (e.g. in the SMTP
16/// server) to produce a peer-safe, actionable response instead of a
17/// generic internal-error message. The `Display` impl intentionally
18/// matches the bland summary returned by `Spool::unhealthy_reason()`
19/// so that the wire-facing text is identical regardless of which
20/// layer observes the condition.
21#[derive(thiserror::Error, Debug)]
22#[error("the spool is not accepting writes")]
23pub struct SpoolUnhealthyError;
24
25/// The deadline supplied by the caller (e.g. an SMTP
26/// `data_processing_timeout`) elapsed before the spool accepted the
27/// write. Distinguished from [`SpoolBackpressureTimeout`] so that
28/// the SMTP server can surface a peer response that accurately
29/// blames the caller-side deadline.
30#[derive(thiserror::Error, Debug)]
31#[error("the caller-provided deadline was reached before the spool accepted the write")]
32pub struct SpoolCallerDeadlineExceeded;
33
34/// The spool's own internal backpressure deadline (see
35/// `RocksSpoolParams::store_deadline`) elapsed before the write was
36/// accepted. Indicates that the spool itself is the slow component,
37/// independently of any caller-supplied deadline; distinguished from
38/// [`SpoolCallerDeadlineExceeded`] so the SMTP server can produce a
39/// peer response that points the operator at spool health rather
40/// than at their own timeout configuration.
41#[derive(thiserror::Error, Debug)]
42#[error("the spool did not accept the write within {deadline:?}")]
43pub struct SpoolBackpressureTimeout {
44 pub deadline: Duration,
45}
46
/// One result emitted to the channel while scanning a spool with
/// [`Spool::enumerate`].
#[derive(Debug)]
pub enum SpoolEntry {
    /// An entry identified by `id`, together with its `data` bytes.
    Item { id: SpoolId, data: Vec<u8> },
    /// An entry identified by `id` that is reported as corrupt;
    /// `error` holds a textual description of the problem.
    Corrupt { id: SpoolId, error: String },
}
52
/// Interface implemented by spool storage backends: load, store and
/// remove byte payloads keyed by [`SpoolId`], plus enumeration,
/// maintenance and health hooks.
#[async_trait]
pub trait Spool: Send + Sync {
    /// Load the data corresponding to the provided Id.
    async fn load(&self, id: SpoolId) -> anyhow::Result<Vec<u8>>;

    /// Remove the data associated with the provided Id.
    async fn remove(&self, id: SpoolId) -> anyhow::Result<()>;

    /// Write/Replace the data associated with the provided Id.
    ///
    /// NOTE(review): `force_sync` presumably asks the backend to make
    /// the write durable before returning, and `deadline` is presumably
    /// the caller-side limit described by
    /// [`SpoolCallerDeadlineExceeded`] (with `None` meaning no caller
    /// limit) — confirm against the implementations.
    async fn store(
        &self,
        id: SpoolId,
        data: Arc<Box<[u8]>>,
        force_sync: bool,
        deadline: Option<Instant>,
    ) -> anyhow::Result<()>;

    /// Scan the contents of the spool, and emit a [`SpoolEntry`] for
    /// each item to the provided channel sender.
    /// The items are enumerated in an unspecified order.
    /// It is recommended that you use a bounded channel.
    ///
    /// The results are undefined if you enumerate concurrently with
    /// load/remove/store operations.
    ///
    /// NOTE(review): the meaning of `start_time` is not visible from
    /// this trait — confirm against the implementations.
    fn enumerate(
        &self,
        sender: Sender<SpoolEntry>,
        start_time: DateTime<Utc>,
    ) -> anyhow::Result<()>;

    /// Perform some periodic cleanup/maintenance.
    async fn cleanup(&self) -> anyhow::Result<()>;

    /// Shut down the store.
    async fn shutdown(&self) -> anyhow::Result<()>;

    /// Called when system memory is low.
    /// The spool module should flush and drop caches.
    /// Returns the number of bytes that were saved,
    /// which might be negative if the flush actually
    /// increased the total.
    async fn advise_low_memory(&self) -> anyhow::Result<isize>;

    /// Synchronously flush in-memory buffers and run a full compaction
    /// of the underlying storage.
    ///
    /// Intended for operational diagnostics and for tests that need to
    /// drive the storage into a deterministic state. Storage backends
    /// that do not have any concept of compaction may leave the default
    /// no-op implementation in place.
    ///
    /// Errors are propagated to the caller. In particular, for rocksdb,
    /// a missing or corrupt SST file encountered during the operation
    /// surfaces as an `Err`.
    async fn compact(&self) -> anyhow::Result<()> {
        // Default: nothing to compact, so report success.
        Ok(())
    }

    /// Returns `None` when the spool is healthy.
    /// Returns `Some(reason)` when the spool is in a state that should
    /// cause ingress paths to shed load.
    ///
    /// This is called from hot load-shedding paths and must be cheap:
    /// no I/O, no awaits, no allocation. The returned reason is the
    /// externally visible explanation and should be intentionally
    /// bland; it must not leak implementation details.
    fn unhealthy_reason(&self) -> Option<&'static str> {
        // Default: backends that do not override this always report healthy.
        None
    }
}
123
/// Global slot for the data spool; filled once by `set_data_spool` and
/// read by `get_data_spool`.
static DATA: OnceLock<Arc<dyn Spool + Send + Sync>> = OnceLock::new();
/// Global slot for the meta spool; filled once by `set_meta_spool` and
/// read by `get_meta_spool`.
static META: OnceLock<Arc<dyn Spool + Send + Sync>> = OnceLock::new();
126
127pub fn get_meta_spool() -> &'static Arc<dyn Spool + Send + Sync> {
128 META.get().expect("set_meta_spool has not been called")
129}
130
131pub fn get_data_spool() -> &'static Arc<dyn Spool + Send + Sync> {
132 DATA.get().expect("set_data_spool has not been called")
133}
134
135pub fn set_meta_spool(meta: Arc<dyn Spool + Send + Sync>) {
136 META.set(meta)
137 .map_err(|_| "set_meta_spool has already been called")
138 .unwrap();
139}
140
141pub fn set_data_spool(data: Arc<dyn Spool + Send + Sync>) {
142 DATA.set(data)
143 .map_err(|_| "set_data_spool has already been called")
144 .unwrap();
145}