spool/
lib.rs

1use async_trait::async_trait;
2use chrono::{DateTime, Utc};
3use flume::Sender;
4use std::sync::{Arc, OnceLock};
5use std::time::{Duration, Instant};
6
7pub mod local_disk;
8#[cfg(feature = "rocksdb")]
9pub mod rocks;
10pub mod spool_id;
11
12pub use spool_id::SpoolId;
13
14/// The spool's load-shedding gate is currently latched.  Callers can
15/// recognize this via `anyhow::Error::root_cause` (e.g. in the SMTP
16/// server) to produce a peer-safe, actionable response instead of a
17/// generic internal-error message.  The `Display` impl intentionally
18/// matches the bland summary returned by `Spool::unhealthy_reason()`
19/// so that the wire-facing text is identical regardless of which
20/// layer observes the condition.
21#[derive(thiserror::Error, Debug)]
22#[error("the spool is not accepting writes")]
23pub struct SpoolUnhealthyError;
24
25/// The deadline supplied by the caller (e.g. an SMTP
26/// `data_processing_timeout`) elapsed before the spool accepted the
27/// write.  Distinguished from [`SpoolBackpressureTimeout`] so that
28/// the SMTP server can surface a peer response that accurately
29/// blames the caller-side deadline.
30#[derive(thiserror::Error, Debug)]
31#[error("the caller-provided deadline was reached before the spool accepted the write")]
32pub struct SpoolCallerDeadlineExceeded;
33
34/// The spool's own internal backpressure deadline (see
35/// `RocksSpoolParams::store_deadline`) elapsed before the write was
36/// accepted.  Indicates that the spool itself is the slow component,
37/// independently of any caller-supplied deadline; distinguished from
38/// [`SpoolCallerDeadlineExceeded`] so the SMTP server can produce a
39/// peer response that points the operator at spool health rather
40/// than at their own timeout configuration.
41#[derive(thiserror::Error, Debug)]
42#[error("the spool did not accept the write within {deadline:?}")]
43pub struct SpoolBackpressureTimeout {
44    pub deadline: Duration,
45}
46
47#[derive(Debug)]
48pub enum SpoolEntry {
49    Item { id: SpoolId, data: Vec<u8> },
50    Corrupt { id: SpoolId, error: String },
51}
52
53#[async_trait]
54pub trait Spool: Send + Sync {
55    /// Load the data corresponding to the provided Id
56    async fn load(&self, id: SpoolId) -> anyhow::Result<Vec<u8>>;
57
58    /// Remove the data associated with the provided Id
59    async fn remove(&self, id: SpoolId) -> anyhow::Result<()>;
60
61    /// Write/Replace the data associated with the provided Id
62    async fn store(
63        &self,
64        id: SpoolId,
65        data: Arc<Box<[u8]>>,
66        force_sync: bool,
67        deadline: Option<Instant>,
68    ) -> anyhow::Result<()>;
69
70    /// Scan the contents of the spool, and emit a SpoolEntry for each item
71    /// to the provided channel sender.
72    /// The items are enumerated in an unspecified order.
73    /// It is recommended that you use a bounded channel.
74    ///
75    /// The results are undefined if you enumerate concurrently with
76    /// load/remove/store operations.
77    fn enumerate(
78        &self,
79        sender: Sender<SpoolEntry>,
80        start_time: DateTime<Utc>,
81    ) -> anyhow::Result<()>;
82
83    /// Perform some periodic cleanup/maintenance
84    async fn cleanup(&self) -> anyhow::Result<()>;
85
86    /// Shutdown the store
87    async fn shutdown(&self) -> anyhow::Result<()>;
88
89    /// Called when system memory is low.
90    /// The spool module should flush and drop caches.
91    /// Returns the number of bytes that were saved,
92    /// which might be negative if the flush actually
93    /// increased the total.
94    async fn advise_low_memory(&self) -> anyhow::Result<isize>;
95
96    /// Synchronously flush in-memory buffers and run a full compaction
97    /// of the underlying storage.
98    ///
99    /// Intended for operational diagnostics and for tests that need to
100    /// drive the storage into a deterministic state.  Storage backends
101    /// that do not have any concept of compaction may leave the default
102    /// no-op implementation in place.
103    ///
104    /// Errors are propagated to the caller.  In particular, for rocksdb,
105    /// a missing or corrupt SST file encountered during the operation
106    /// surfaces as an `Err`.
107    async fn compact(&self) -> anyhow::Result<()> {
108        Ok(())
109    }
110
111    /// Returns `None` when the spool is healthy.
112    /// Returns `Some(reason)` when the spool is in a state that should
113    /// cause ingress paths to shed load.
114    ///
115    /// This is called from hot load-shedding paths and must be cheap:
116    /// no I/O, no awaits, no allocation.  The returned reason is the
117    /// externally visible explanation and should be intentionally
118    /// bland; it must not leak implementation details.
119    fn unhealthy_reason(&self) -> Option<&'static str> {
120        None
121    }
122}
123
124static DATA: OnceLock<Arc<dyn Spool + Send + Sync>> = OnceLock::new();
125static META: OnceLock<Arc<dyn Spool + Send + Sync>> = OnceLock::new();
126
127pub fn get_meta_spool() -> &'static Arc<dyn Spool + Send + Sync> {
128    META.get().expect("set_meta_spool has not been called")
129}
130
131pub fn get_data_spool() -> &'static Arc<dyn Spool + Send + Sync> {
132    DATA.get().expect("set_data_spool has not been called")
133}
134
135pub fn set_meta_spool(meta: Arc<dyn Spool + Send + Sync>) {
136    META.set(meta)
137        .map_err(|_| "set_meta_spool has already been called")
138        .unwrap();
139}
140
141pub fn set_data_spool(data: Arc<dyn Spool + Send + Sync>) {
142    DATA.set(data)
143        .map_err(|_| "set_data_spool has already been called")
144        .unwrap();
145}