spool/
local_disk.rs

1use crate::{Spool, SpoolEntry, SpoolId};
2use anyhow::Context;
3use async_trait::async_trait;
4use chrono::{DateTime, Utc};
5use flume::Sender;
6use std::fs::File;
7use std::io::Write;
8use std::os::fd::AsRawFd;
9use std::path::{Path, PathBuf};
10use std::sync::Arc;
11use std::time::Instant;
12use tempfile::NamedTempFile;
13use tokio::runtime::Handle;
14
15pub struct LocalDiskSpool {
16    path: PathBuf,
17    flush: bool,
18    _pid_file: File,
19    runtime: Handle,
20}
21
22impl LocalDiskSpool {
23    pub fn new(path: &Path, flush: bool, runtime: Handle) -> anyhow::Result<Self> {
24        let pid_file_path = path.join("lock");
25        let _pid_file = lock_pid_file(pid_file_path)?;
26
27        Self::create_dir_structure(path)?;
28
29        Ok(Self {
30            path: path.to_path_buf(),
31            flush,
32            _pid_file,
33            runtime,
34        })
35    }
36
37    fn create_dir_structure(path: &Path) -> anyhow::Result<()> {
38        std::fs::create_dir_all(path.join("new"))?;
39        std::fs::create_dir_all(path.join("data"))?;
40        Ok(())
41    }
42
43    fn compute_path(&self, id: SpoolId) -> PathBuf {
44        id.compute_path(&self.path.join("data"))
45    }
46
47    fn cleanup_dirs(path: &Path) {
48        let new_dir = path.join("new");
49        for entry in jwalk::WalkDir::new(new_dir) {
50            if let Ok(entry) = entry {
51                if !entry.file_type().is_file() {
52                    continue;
53                }
54                let path = entry.path();
55                if let Err(err) = std::fs::remove_file(&path) {
56                    eprintln!("Failed to remove {path:?}: {err:#}");
57                }
58            }
59        }
60
61        let data_dir = path.join("data");
62        Self::cleanup_data(&data_dir);
63    }
64
65    fn cleanup_data(data_dir: &Path) {
66        for entry in jwalk::WalkDir::new(data_dir) {
67            if let Ok(entry) = entry {
68                if !entry.file_type().is_dir() {
69                    continue;
70                }
71                let path = entry.path();
72                // Speculatively try removing the directory; it will
73                // only succeed if it is empty. We don't need to check
74                // for that first, and we don't care if it fails.
75                std::fs::remove_dir(&path).ok();
76            }
77        }
78    }
79}
80
81#[async_trait]
82impl Spool for LocalDiskSpool {
83    async fn load(&self, id: SpoolId) -> anyhow::Result<Vec<u8>> {
84        let path = self.compute_path(id);
85        tokio::fs::read(&path)
86            .await
87            .with_context(|| format!("failed to load {id} from {path:?}"))
88    }
89
90    async fn remove(&self, id: SpoolId) -> anyhow::Result<()> {
91        let path = self.compute_path(id);
92        tokio::fs::remove_file(&path)
93            .await
94            .with_context(|| format!("failed to remove {id} from {path:?}"))
95    }
96
97    async fn store(
98        &self,
99        id: SpoolId,
100        data: Arc<Box<[u8]>>,
101        force_sync: bool,
102        _deadline: Option<Instant>,
103    ) -> anyhow::Result<()> {
104        let path = self.compute_path(id);
105        let new_dir = self.path.join("new");
106        let flush = force_sync || self.flush;
107        tokio::task::Builder::new()
108            .name("LocalDiskSpool store")
109            .spawn_blocking_on(
110                move || {
111                    let mut temp = NamedTempFile::new_in(new_dir).with_context(|| {
112                        format!("failed to create a temporary file to store {id}")
113                    })?;
114
115                    temp.write_all(&data)
116                        .with_context(|| format!("failed to write data for {id}"))?;
117
118                    if flush {
119                        temp.as_file_mut()
120                            .sync_data()
121                            .with_context(|| format!("failed to sync data for {id}"))?;
122                    }
123
124                    std::fs::create_dir_all(path.parent().unwrap()).with_context(|| {
125                        format!("failed to create dir structure for {id} {path:?}")
126                    })?;
127
128                    temp.persist(&path).with_context(|| {
129                        format!("failed to move temp file for {id} to {path:?}")
130                    })?;
131                    Ok(())
132                },
133                &self.runtime,
134            )?
135            .await?
136    }
137
138    fn enumerate(
139        &self,
140        sender: Sender<SpoolEntry>,
141        start_time: DateTime<Utc>,
142    ) -> anyhow::Result<()> {
143        let path = self.path.clone();
144        tokio::task::Builder::new()
145            .name("LocalDiskSpool enumerate")
146            .spawn_blocking_on(
147                move || -> anyhow::Result<()> {
148                    Self::cleanup_dirs(&path);
149
150                    for entry in jwalk::WalkDir::new(path.join("data")) {
151                        if let Ok(entry) = entry {
152                            if !entry.file_type().is_file() {
153                                continue;
154                            }
155                            let path = entry.path();
156                            if let Some(id) = SpoolId::from_path(&path) {
157                                if id.created() >= start_time {
158                                    // Entries created since we started must have
159                                    // landed there after we started and are thus
160                                    // not eligible for discovery via enumeration
161                                    continue;
162                                }
163                                match std::fs::read(&path) {
164                                    Ok(data) => sender
165                                        .send(SpoolEntry::Item { id, data })
166                                        .map_err(|err| {
167                                            anyhow::anyhow!("failed to send data for {id}: {err:#}")
168                                        })?,
169                                    Err(err) => sender
170                                        .send(SpoolEntry::Corrupt {
171                                            id,
172                                            error: format!("{err:#}"),
173                                        })
174                                        .map_err(|err| {
175                                            anyhow::anyhow!(
176                                                "failed to send SpoolEntry for {id}: {err:#}"
177                                            )
178                                        })?,
179                                };
180                            } else {
181                                eprintln!("{} is not a spool id", path.display());
182                            }
183                        }
184                    }
185                    anyhow::Result::Ok(())
186                },
187                &self.runtime,
188            )?;
189        Ok(())
190    }
191
192    async fn cleanup(&self) -> anyhow::Result<()> {
193        let data_dir = self.path.join("data");
194        Ok(tokio::task::Builder::new()
195            .name("LocalDiskSpool cleanup")
196            .spawn_blocking_on(
197                move || {
198                    Self::cleanup_data(&data_dir);
199                },
200                &self.runtime,
201            )?
202            .await?)
203    }
204
205    async fn shutdown(&self) -> anyhow::Result<()> {
206        Ok(())
207    }
208
209    async fn advise_low_memory(&self) -> anyhow::Result<isize> {
210        Ok(0)
211    }
212}
213
214/// Set the sticky bit on path.
215/// This prevents tmpwatch from removing the lock file.
216pub fn set_sticky_bit(path: &Path) {
217    #[cfg(unix)]
218    {
219        use std::os::unix::fs::PermissionsExt;
220        if let Ok(metadata) = path.metadata() {
221            let mut perms = metadata.permissions();
222            let mode = perms.mode();
223            perms.set_mode(mode | libc::S_ISVTX);
224            let _ = std::fs::set_permissions(path, perms);
225        }
226    }
227
228    #[cfg(windows)]
229    {
230        let _ = path;
231    }
232}
233
234fn lock_pid_file(pid_file: PathBuf) -> anyhow::Result<std::fs::File> {
235    let pid_file_dir = pid_file
236        .parent()
237        .ok_or_else(|| anyhow::anyhow!("{} has no parent?", pid_file.display()))?;
238    std::fs::create_dir_all(pid_file_dir).with_context(|| {
239        format!(
240            "while creating directory structure: {}",
241            pid_file_dir.display()
242        )
243    })?;
244
245    #[allow(clippy::suspicious_open_options)]
246    let mut file = std::fs::OpenOptions::new()
247        .create(true)
248        .write(true)
249        .open(&pid_file)
250        .with_context(|| format!("opening pid file {}", pid_file.display()))?;
251    set_sticky_bit(&pid_file);
252    let res = unsafe { libc::flock(file.as_raw_fd(), libc::LOCK_EX | libc::LOCK_NB) };
253    if res != 0 {
254        let err = std::io::Error::last_os_error();
255
256        let owner = match std::fs::read_to_string(&pid_file) {
257            Ok(pid) => format!(". Owned by pid {}.", pid.trim()),
258            Err(_) => "".to_string(),
259        };
260
261        anyhow::bail!(
262            "unable to lock pid file {}: {}{owner}",
263            pid_file.display(),
264            err
265        );
266    }
267
268    unsafe { libc::ftruncate(file.as_raw_fd(), 0) };
269    writeln!(file, "{}", unsafe { libc::getpid() }).ok();
270
271    Ok(file)
272}
273
#[cfg(test)]
mod test {
    use super::*;

    /// End-to-end exercise of store / load / enumerate / remove against
    /// a spool in a temporary directory.
    #[tokio::test]
    async fn basic_spool() -> anyhow::Result<()> {
        let location = tempfile::tempdir()?;
        let spool = LocalDiskSpool::new(location.path(), false, Handle::current())?;
        let data_dir = location.path().join("data");

        // The error text `load` is expected to produce for an entry
        // that has no file on disk.
        let missing_entry_error = |id: SpoolId| -> String {
            let id_path = id.compute_path(&data_dir).display().to_string();
            format!(
                "failed to load {id} from \"{id_path}\": \
                No such file or directory (os error 2)"
            )
        };

        // An id that was never stored cannot be loaded.
        {
            let never_stored = SpoolId::new();
            assert_eq!(
                format!("{:#}", spool.load(never_stored).await.unwrap_err()),
                missing_entry_error(never_stored)
            );
        }

        // Populate the spool.
        let mut ids = Vec::with_capacity(100);
        for i in 0..100 {
            let id = SpoolId::new();
            let payload = format!("I am {i}").into_bytes().into_boxed_slice();
            spool.store(id, Arc::new(payload), false, None).await?;
            ids.push(id);
        }

        // Every stored entry reads back with the payload it was given.
        for (i, &id) in ids.iter().enumerate() {
            let text = String::from_utf8(spool.load(id).await?)?;
            assert_eq!(text, format!("I am {i}"));
        }

        // Enumeration reports every entry exactly once; remove each one
        // as it is seen.
        {
            let (tx, rx) = flume::bounded(32);
            spool.enumerate(tx, Utc::now())?;
            let mut count = 0;

            while let Ok(item) = rx.recv_async().await {
                let (id, data) = match item {
                    SpoolEntry::Item { id, data } => (id, data),
                    SpoolEntry::Corrupt { id, error } => {
                        anyhow::bail!("Corrupt: {id}: {error}");
                    }
                };

                let Some(i) = ids.iter().position(|&candidate| candidate == id) else {
                    anyhow::bail!("{id} not found in ids!");
                };

                let text = String::from_utf8(data)?;
                assert_eq!(text, format!("I am {i}"));

                spool.remove(id).await?;
                // Once removed, the entry can no longer be loaded.
                assert_eq!(
                    format!("{:#}", spool.load(id).await.unwrap_err()),
                    missing_entry_error(id)
                );
                count += 1;
            }

            assert_eq!(count, 100);
        }

        // Everything has been removed, so further enumerations must
        // come back empty. Run it more than once to check that the
        // cleanup performed by enumerate leaves the directory
        // structure usable.
        for _ in 0..2 {
            let (tx, rx) = flume::bounded(32);
            spool.enumerate(tx, Utc::now())?;
            let mut unexpected = vec![];

            while let Ok(item) = rx.recv_async().await {
                let id = match item {
                    SpoolEntry::Item { id, .. } | SpoolEntry::Corrupt { id, .. } => id,
                };
                unexpected.push(id);
            }

            assert_eq!(unexpected.len(), 0);
        }

        Ok(())
    }
}
382}