spool/
spool_id.rs

1use chrono::{DateTime, Duration, TimeZone, Utc};
2use serde::{Deserialize, Serialize};
3use std::path::{Path, PathBuf};
4use std::sync::LazyLock;
5use uuid::{ClockSequence, Context, Timestamp, Uuid};
6
7/// Identifies a message within the spool of its host node.
8#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
9#[serde(into = "String", try_from = "String")]
10#[derive(utoipa::ToSchema)]
11#[schema(value_type=String, example="d7ef132b5d7711eea8c8000c29c33806")]
12pub struct SpoolId(Uuid);
13
14impl std::fmt::Display for SpoolId {
15    fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
16        self.0.simple().fmt(fmt)
17    }
18}
19
20impl From<Uuid> for SpoolId {
21    fn from(uuid: Uuid) -> Self {
22        Self(uuid)
23    }
24}
25
26impl From<SpoolId> for String {
27    fn from(id: SpoolId) -> String {
28        id.to_string()
29    }
30}
31
32impl TryFrom<String> for SpoolId {
33    type Error = uuid::Error;
34
35    fn try_from(s: String) -> Result<Self, Self::Error> {
36        let uuid = Uuid::parse_str(&s)?;
37        Ok(Self(uuid))
38    }
39}
40
41impl Default for SpoolId {
42    fn default() -> Self {
43        Self::new()
44    }
45}
46
47impl SpoolId {
48    pub fn new() -> Self {
49        // We're using v1, but we should be able to seamlessly upgrade to v7
50        // once that feature stabilizes in the uuid crate
51        Self(uuid_helper::now_v1())
52    }
53
54    pub fn compute_path(&self, in_dir: &Path) -> PathBuf {
55        let (a, b, c, [d, e, f, g, h, i, j, k]) = self.0.as_fields();
56        // Note that in a v1 UUID, a,b,c holds the timestamp components
57        // from least-significant up to most significant.
58        let [a1, a2, a3, a4] = a.to_be_bytes();
59        let name = format!(
60            "{a1:02x}/{a2:02x}/{a3:02x}/{a4:02x}/{b:04x}{c:04x}{d:02x}{e:02x}{f:02x}{g:02x}{h:02x}{i:02x}{j:02x}{k:02x}"
61        );
62        in_dir.join(name)
63    }
64
65    pub fn as_bytes(&self) -> &[u8; 16] {
66        self.0.as_bytes()
67    }
68
69    pub fn from_slice(s: &[u8]) -> Option<Self> {
70        let uuid = Uuid::from_slice(s).ok()?;
71        Some(Self(uuid))
72    }
73
74    pub fn from_ascii_bytes(s: &[u8]) -> Option<Self> {
75        let uuid = Uuid::try_parse_ascii(s).ok()?;
76        Some(Self(uuid))
77    }
78
79    #[allow(clippy::should_implement_trait)]
80    pub fn from_str(s: &str) -> Option<Self> {
81        let uuid = Uuid::parse_str(s).ok()?;
82        Some(Self(uuid))
83    }
84
85    pub fn from_path(mut path: &Path) -> Option<Self> {
86        let mut components = vec![];
87
88        for _ in 0..5 {
89            components.push(path.file_name()?.to_str()?);
90            path = path.parent()?;
91        }
92
93        components.reverse();
94        Some(Self(Uuid::parse_str(&components.join("")).ok()?))
95    }
96
97    /// Returns time elapsed since the id was created,
98    /// given the current timestamp
99    pub fn age(&self, now: DateTime<Utc>) -> Duration {
100        let created = self.created();
101        now - created
102    }
103
104    pub fn created(&self) -> DateTime<Utc> {
105        let (seconds, nanos) = self.0.get_timestamp().unwrap().to_unix();
106        Utc.timestamp_opt(seconds.try_into().unwrap(), nanos)
107            .unwrap()
108    }
109
110    /// Assuming that self is a SpoolId received from some other node,
111    /// this method produces a new SpoolId with the information
112    /// from the local node, but with the timestamp from the source
113    /// spool id.
114    /// The intent is to reduces the chances of having multiple
115    /// messages with the same spool id live on a system in the
116    /// case of a misconfiguration that produces a loop.
117    pub fn derive_new_with_cloned_timestamp(&self) -> Self {
118        let ts = self.0.get_timestamp().unwrap();
119
120        let candidate = Self(uuid_helper::new_v1(ts));
121
122        if candidate != *self {
123            return candidate;
124        }
125
126        // There's a conflict; try to avoid it by working
127        // through a sequence that increments a shared, initially
128        // randomized, counter.
129        // If we do have a routing loop then at least
130        // we stand some chance of avoiding re-using
131        // the same spoolid, but it isn't totally foolproof.
132
133        // Note: Context is only suitable for V1 uuids,
134        // which is what we're using here.
135        static CONTEXT: LazyLock<Context> = LazyLock::new(Context::new_random);
136
137        let (mut seconds, mut subsec_nanos) = ts.to_gregorian();
138        loop {
139            let (counter, secs, nanos) =
140                CONTEXT.generate_timestamp_sequence(seconds, subsec_nanos.into());
141            seconds = secs;
142            subsec_nanos = nanos as u16;
143
144            let ts = Timestamp::from_unix_time(
145                seconds,
146                subsec_nanos.into(),
147                counter.into(),
148                CONTEXT.usable_bits() as u8,
149            );
150
151            let candidate = Self(uuid_helper::new_v1(ts));
152
153            if candidate != *self {
154                return candidate;
155            }
156        }
157    }
158}
159
#[cfg(test)]
mod test {
    use super::*;

    /// An id must survive the trip through its on-disk path.
    #[test]
    fn roundtrip_path() {
        let original = SpoolId::new();
        eprintln!("{original}");

        let on_disk = original.compute_path(Path::new("."));
        let recovered = SpoolId::from_path(&on_disk).unwrap();

        assert_eq!(original, recovered);
    }

    /// An id must survive the trip through its raw byte representation.
    #[test]
    fn roundtrip_bytes() {
        let original = SpoolId::new();
        eprintln!("{original}");

        let raw = original.as_bytes();
        let recovered = SpoolId::from_slice(&raw[..]).unwrap();

        assert_eq!(original, recovered);
    }
}
181}