spool/
lib.rs

1use async_trait::async_trait;
2use chrono::{DateTime, Utc};
3use flume::Sender;
4use std::sync::{Arc, OnceLock};
5use std::time::Instant;
6
7pub mod local_disk;
8#[cfg(feature = "rocksdb")]
9pub mod rocks;
10pub mod spool_id;
11
12pub use spool_id::SpoolId;
13
14#[derive(Debug)]
15pub enum SpoolEntry {
16    Item { id: SpoolId, data: Vec<u8> },
17    Corrupt { id: SpoolId, error: String },
18}
19
20#[async_trait]
21pub trait Spool: Send + Sync {
22    /// Load the data corresponding to the provided Id
23    async fn load(&self, id: SpoolId) -> anyhow::Result<Vec<u8>>;
24
25    /// Remove the data associated with the provided Id
26    async fn remove(&self, id: SpoolId) -> anyhow::Result<()>;
27
28    /// Write/Replace the data associated with the provided Id
29    async fn store(
30        &self,
31        id: SpoolId,
32        data: Arc<Box<[u8]>>,
33        force_sync: bool,
34        deadline: Option<Instant>,
35    ) -> anyhow::Result<()>;
36
37    /// Scan the contents of the spool, and emit a SpoolEntry for each item
38    /// to the provided channel sender.
39    /// The items are enumerated in an unspecified order.
40    /// It is recommended that you use a bounded channel.
41    ///
42    /// The results are undefined if you enumerate concurrently with
43    /// load/remove/store operations.
44    fn enumerate(
45        &self,
46        sender: Sender<SpoolEntry>,
47        start_time: DateTime<Utc>,
48    ) -> anyhow::Result<()>;
49
50    /// Perform some periodic cleanup/maintenance
51    async fn cleanup(&self) -> anyhow::Result<()>;
52
53    /// Shutdown the store
54    async fn shutdown(&self) -> anyhow::Result<()>;
55
56    /// Called when system memory is low.
57    /// The spool module should flush and drop caches.
58    /// Returns the number of bytes that were saved,
59    /// which might be negative if the flush actually
60    /// increased the total.
61    async fn advise_low_memory(&self) -> anyhow::Result<isize>;
62}
63
64static DATA: OnceLock<Arc<dyn Spool + Send + Sync>> = OnceLock::new();
65static META: OnceLock<Arc<dyn Spool + Send + Sync>> = OnceLock::new();
66
67pub fn get_meta_spool() -> &'static Arc<dyn Spool + Send + Sync> {
68    META.get().expect("set_meta_spool has not been called")
69}
70
71pub fn get_data_spool() -> &'static Arc<dyn Spool + Send + Sync> {
72    DATA.get().expect("set_data_spool has not been called")
73}
74
75pub fn set_meta_spool(meta: Arc<dyn Spool + Send + Sync>) {
76    META.set(meta)
77        .map_err(|_| "set_meta_spool has already been called")
78        .unwrap();
79}
80
81pub fn set_data_spool(data: Arc<dyn Spool + Send + Sync>) {
82    DATA.set(data)
83        .map_err(|_| "set_data_spool has already been called")
84        .unwrap();
85}