1use crate::{Spool, SpoolEntry, SpoolId};
2use anyhow::Context;
3use async_trait::async_trait;
4use chrono::{DateTime, Utc};
5use flume::Sender;
6use std::fs::File;
7use std::io::Write;
8use std::os::fd::AsRawFd;
9use std::path::{Path, PathBuf};
10use std::sync::Arc;
11use std::time::Instant;
12use tempfile::NamedTempFile;
13use tokio::runtime::Handle;
14
/// A spool that keeps its entries as files beneath a directory on the local filesystem.
///
/// Under `path` the implementation uses a `lock` file, a `new` directory in
/// which temporary files are created, and a `data` directory that holds the
/// stored entries.
pub struct LocalDiskSpool {
    /// Root directory of the spool.
    path: PathBuf,
    /// When true, `store` syncs file data to disk before moving the file
    /// into place even if the caller did not pass `force_sync`.
    flush: bool,
    /// Open handle on the `lock` file as returned by `lock_pid_file`. It is
    /// never read; it is only kept alive alongside the spool.
    /// NOTE(review): presumably the `flock` taken on it is released when this
    /// handle is closed - confirm.
    _pid_file: File,
    /// Runtime handle on which blocking filesystem work is spawned.
    runtime: Handle,
}
21
22impl LocalDiskSpool {
23 pub fn new(path: &Path, flush: bool, runtime: Handle) -> anyhow::Result<Self> {
24 let pid_file_path = path.join("lock");
25 let _pid_file = lock_pid_file(pid_file_path)?;
26
27 Self::create_dir_structure(path)?;
28
29 Ok(Self {
30 path: path.to_path_buf(),
31 flush,
32 _pid_file,
33 runtime,
34 })
35 }
36
37 fn create_dir_structure(path: &Path) -> anyhow::Result<()> {
38 std::fs::create_dir_all(path.join("new"))?;
39 std::fs::create_dir_all(path.join("data"))?;
40 Ok(())
41 }
42
43 fn compute_path(&self, id: SpoolId) -> PathBuf {
44 id.compute_path(&self.path.join("data"))
45 }
46
47 fn cleanup_dirs(path: &Path) {
48 let new_dir = path.join("new");
49 for entry in jwalk::WalkDir::new(new_dir) {
50 if let Ok(entry) = entry {
51 if !entry.file_type().is_file() {
52 continue;
53 }
54 let path = entry.path();
55 if let Err(err) = std::fs::remove_file(&path) {
56 eprintln!("Failed to remove {path:?}: {err:#}");
57 }
58 }
59 }
60
61 let data_dir = path.join("data");
62 Self::cleanup_data(&data_dir);
63 }
64
65 fn cleanup_data(data_dir: &Path) {
66 for entry in jwalk::WalkDir::new(data_dir) {
67 if let Ok(entry) = entry {
68 if !entry.file_type().is_dir() {
69 continue;
70 }
71 let path = entry.path();
72 std::fs::remove_dir(&path).ok();
76 }
77 }
78 }
79}
80
81#[async_trait]
82impl Spool for LocalDiskSpool {
83 async fn load(&self, id: SpoolId) -> anyhow::Result<Vec<u8>> {
84 let path = self.compute_path(id);
85 tokio::fs::read(&path)
86 .await
87 .with_context(|| format!("failed to load {id} from {path:?}"))
88 }
89
90 async fn remove(&self, id: SpoolId) -> anyhow::Result<()> {
91 let path = self.compute_path(id);
92 tokio::fs::remove_file(&path)
93 .await
94 .with_context(|| format!("failed to remove {id} from {path:?}"))
95 }
96
97 async fn store(
98 &self,
99 id: SpoolId,
100 data: Arc<Box<[u8]>>,
101 force_sync: bool,
102 _deadline: Option<Instant>,
103 ) -> anyhow::Result<()> {
104 let path = self.compute_path(id);
105 let new_dir = self.path.join("new");
106 let flush = force_sync || self.flush;
107 tokio::task::Builder::new()
108 .name("LocalDiskSpool store")
109 .spawn_blocking_on(
110 move || {
111 let mut temp = NamedTempFile::new_in(new_dir).with_context(|| {
112 format!("failed to create a temporary file to store {id}")
113 })?;
114
115 temp.write_all(&data)
116 .with_context(|| format!("failed to write data for {id}"))?;
117
118 if flush {
119 temp.as_file_mut()
120 .sync_data()
121 .with_context(|| format!("failed to sync data for {id}"))?;
122 }
123
124 std::fs::create_dir_all(path.parent().unwrap()).with_context(|| {
125 format!("failed to create dir structure for {id} {path:?}")
126 })?;
127
128 temp.persist(&path).with_context(|| {
129 format!("failed to move temp file for {id} to {path:?}")
130 })?;
131 Ok(())
132 },
133 &self.runtime,
134 )?
135 .await?
136 }
137
138 fn enumerate(
139 &self,
140 sender: Sender<SpoolEntry>,
141 start_time: DateTime<Utc>,
142 ) -> anyhow::Result<()> {
143 let path = self.path.clone();
144 tokio::task::Builder::new()
145 .name("LocalDiskSpool enumerate")
146 .spawn_blocking_on(
147 move || -> anyhow::Result<()> {
148 Self::cleanup_dirs(&path);
149
150 for entry in jwalk::WalkDir::new(path.join("data")) {
151 if let Ok(entry) = entry {
152 if !entry.file_type().is_file() {
153 continue;
154 }
155 let path = entry.path();
156 if let Some(id) = SpoolId::from_path(&path) {
157 if id.created() >= start_time {
158 continue;
162 }
163 match std::fs::read(&path) {
164 Ok(data) => sender
165 .send(SpoolEntry::Item { id, data })
166 .map_err(|err| {
167 anyhow::anyhow!("failed to send data for {id}: {err:#}")
168 })?,
169 Err(err) => sender
170 .send(SpoolEntry::Corrupt {
171 id,
172 error: format!("{err:#}"),
173 })
174 .map_err(|err| {
175 anyhow::anyhow!(
176 "failed to send SpoolEntry for {id}: {err:#}"
177 )
178 })?,
179 };
180 } else {
181 eprintln!("{} is not a spool id", path.display());
182 }
183 }
184 }
185 anyhow::Result::Ok(())
186 },
187 &self.runtime,
188 )?;
189 Ok(())
190 }
191
192 async fn cleanup(&self) -> anyhow::Result<()> {
193 let data_dir = self.path.join("data");
194 Ok(tokio::task::Builder::new()
195 .name("LocalDiskSpool cleanup")
196 .spawn_blocking_on(
197 move || {
198 Self::cleanup_data(&data_dir);
199 },
200 &self.runtime,
201 )?
202 .await?)
203 }
204
205 async fn shutdown(&self) -> anyhow::Result<()> {
206 Ok(())
207 }
208
209 async fn advise_low_memory(&self) -> anyhow::Result<isize> {
210 Ok(0)
211 }
212}
213
214pub fn set_sticky_bit(path: &Path) {
217 #[cfg(unix)]
218 {
219 use std::os::unix::fs::PermissionsExt;
220 if let Ok(metadata) = path.metadata() {
221 let mut perms = metadata.permissions();
222 let mode = perms.mode();
223 perms.set_mode(mode | libc::S_ISVTX);
224 let _ = std::fs::set_permissions(path, perms);
225 }
226 }
227
228 #[cfg(windows)]
229 {
230 let _ = path;
231 }
232}
233
234fn lock_pid_file(pid_file: PathBuf) -> anyhow::Result<std::fs::File> {
235 let pid_file_dir = pid_file
236 .parent()
237 .ok_or_else(|| anyhow::anyhow!("{} has no parent?", pid_file.display()))?;
238 std::fs::create_dir_all(pid_file_dir).with_context(|| {
239 format!(
240 "while creating directory structure: {}",
241 pid_file_dir.display()
242 )
243 })?;
244
245 #[allow(clippy::suspicious_open_options)]
246 let mut file = std::fs::OpenOptions::new()
247 .create(true)
248 .write(true)
249 .open(&pid_file)
250 .with_context(|| format!("opening pid file {}", pid_file.display()))?;
251 set_sticky_bit(&pid_file);
252 let res = unsafe { libc::flock(file.as_raw_fd(), libc::LOCK_EX | libc::LOCK_NB) };
253 if res != 0 {
254 let err = std::io::Error::last_os_error();
255
256 let owner = match std::fs::read_to_string(&pid_file) {
257 Ok(pid) => format!(". Owned by pid {}.", pid.trim()),
258 Err(_) => "".to_string(),
259 };
260
261 anyhow::bail!(
262 "unable to lock pid file {}: {}{owner}",
263 pid_file.display(),
264 err
265 );
266 }
267
268 unsafe { libc::ftruncate(file.as_raw_fd(), 0) };
269 writeln!(file, "{}", unsafe { libc::getpid() }).ok();
270
271 Ok(file)
272}
273
#[cfg(test)]
mod test {
    use super::*;

    /// Exercises store, load, enumerate and remove against a spool in a
    /// temporary directory.
    #[tokio::test]
    async fn basic_spool() -> anyhow::Result<()> {
        let location = tempfile::tempdir()?;
        let spool = LocalDiskSpool::new(location.path(), false, Handle::current())?;
        let data_dir = location.path().join("data");

        // Error text expected from `load` when the file for `id` is absent.
        let not_found = |id: SpoolId| {
            let id_path = id.compute_path(&data_dir).display().to_string();
            format!(
                "failed to load {id} from \"{id_path}\": \
                 No such file or directory (os error 2)"
            )
        };

        // Loading an id that was never stored must fail.
        let never_stored = SpoolId::new();
        assert_eq!(
            format!("{:#}", spool.load(never_stored).await.unwrap_err()),
            not_found(never_stored)
        );

        // Store 100 distinct payloads, remembering their ids in order.
        let mut ids = Vec::with_capacity(100);
        for i in 0..100 {
            let id = SpoolId::new();
            let payload = format!("I am {i}").into_bytes().into_boxed_slice();
            spool.store(id, Arc::new(payload), false, None).await?;
            ids.push(id);
        }

        // Each id loads back the payload it was stored with.
        for (i, &id) in ids.iter().enumerate() {
            let text = String::from_utf8(spool.load(id).await?)?;
            assert_eq!(text, format!("I am {i}"));
        }

        // Enumeration yields every stored entry; remove each as it arrives.
        {
            let (tx, rx) = flume::bounded(32);
            spool.enumerate(tx, Utc::now())?;
            let mut count = 0;

            while let Ok(item) = rx.recv_async().await {
                let (id, data) = match item {
                    SpoolEntry::Item { id, data } => (id, data),
                    SpoolEntry::Corrupt { id, error } => {
                        anyhow::bail!("Corrupt: {id}: {error}");
                    }
                };

                let i = ids
                    .iter()
                    .position(|&known| known == id)
                    .ok_or_else(|| anyhow::anyhow!("{id} not found in ids!"))?;

                let text = String::from_utf8(data)?;
                assert_eq!(text, format!("I am {i}"));

                spool.remove(id).await?;
                assert_eq!(
                    format!("{:#}", spool.load(id).await.unwrap_err()),
                    not_found(id)
                );
                count += 1;
            }

            assert_eq!(count, 100);
        }

        // With everything removed, repeated enumeration yields nothing.
        for _ in 0..2 {
            let (tx, rx) = flume::bounded(32);
            spool.enumerate(tx, Utc::now())?;
            let mut leftovers = vec![];

            while let Ok(item) = rx.recv_async().await {
                match item {
                    SpoolEntry::Item { id, .. } | SpoolEntry::Corrupt { id, .. } => {
                        leftovers.push(id)
                    }
                }
            }

            assert_eq!(leftovers.len(), 0);
        }

        Ok(())
    }
}