message/
xfer.rs

1use crate::{EnvelopeAddress, Message};
2use serde::{Deserialize, Serialize};
3use spool::SpoolId;
4
5/// This is a wire format, so changes need to
6/// be appropriately backwards/forwards compatible
7/// or placed into a separate struct with runtime
8/// handling for version mismatches.
9#[derive(Debug, Serialize, Deserialize, Clone)]
10struct MetaDataV1 {
11    id: SpoolId,
12    sender: EnvelopeAddress,
13    recipient: Vec<EnvelopeAddress>,
14    meta: serde_json::Value,
15}
16
17impl Message {
18    pub async fn serialize_for_xfer(
19        &self,
20        additional_meta: serde_json::Value,
21    ) -> anyhow::Result<Vec<u8>> {
22        self.load_data_if_needed().await?;
23        let id = *self.id();
24        let data = self.get_data();
25        let mut meta = self.clone_meta_data().await?;
26
27        if let serde_json::Value::Object(src) = additional_meta {
28            if let Some(obj) = meta.meta.as_object_mut() {
29                for (k, v) in src {
30                    obj.insert(k, v);
31                }
32            }
33        }
34
35        let meta = MetaDataV1 {
36            id,
37            sender: meta.sender,
38            recipient: meta.recipient,
39            meta: meta.meta,
40        };
41
42        let serialized_meta = serde_json::to_string(&meta)?;
43
44        let mut result: Vec<u8> = serialized_meta.into();
45        result.push(b'\n');
46        result.extend_from_slice(&data);
47
48        Ok(result)
49    }
50
51    pub fn deserialize_from_xfer(serialized: &[u8]) -> anyhow::Result<Self> {
52        let newline = memchr::memchr(b'\n', serialized)
53            .ok_or_else(|| anyhow::anyhow!("invalid xfer payload"))?;
54
55        let (meta_json, data) = serialized.split_at(newline);
56
57        let meta: MetaDataV1 = serde_json::from_slice(&meta_json)?;
58        // 1... because split_at includes the newline at the startgg
59        let payload: Box<[u8]> = data[1..].to_vec().into_boxed_slice();
60
61        let metadata = crate::message::MetaData {
62            sender: meta.sender,
63            recipient: meta.recipient,
64            meta: meta.meta,
65            schedule: None,
66        };
67
68        // Create a new id with *this* nodes mac but the source
69        // node's timestamp.  This should reduce the chances
70        // of a conflict leading to multiple messages alive in
71        // the same process with the same id, while preserving
72        // the timestamp of the source message.
73        // Ideally we would check that the ids are not the same
74        // here and raise an error, but for the sake of testing,
75        // we allow whatever value is produced to be used and
76        // we check that in the tests, and we SHOULD also
77        // check this in the xfer logic and have it
78        let id = meta.id.derive_new_with_cloned_timestamp();
79
80        Ok(Self::new_from_parts(id, metadata, payload.into()))
81    }
82}
83
#[cfg(test)]
mod test {
    use super::*;
    use crate::message::test::new_msg_body;
    use serde_json::json;

    /// Serializes a message for xfer, deserializes it again, and checks
    /// that the metadata (both pre-existing and additional), the
    /// subject header and the id behave as expected.
    #[tokio::test]
    async fn xfer_serialization() {
        let original = new_msg_body("Subject: simple message\r\n\r\nHello\r\n");
        original.set_meta("canary", true).unwrap();

        let extra_meta = json!({"additional": "meta"});
        let wire = original.serialize_for_xfer(extra_meta).await.unwrap();
        eprintln!("serialized as: {}", String::from_utf8_lossy(&wire));

        let restored = Message::deserialize_from_xfer(&wire).unwrap();

        // Metadata set on the message and metadata supplied at
        // serialization time must both survive the round trip.
        assert_eq!(restored.get_meta("canary").unwrap(), true);
        assert_eq!(restored.get_meta("additional").unwrap(), "meta");

        let restored_data = restored.get_data();
        eprintln!(
            "deserialized message:\n{}",
            String::from_utf8_lossy(&restored_data)
        );

        // The deserialized message must not share the id of the source.
        eprintln!("id           ={}", original.id());
        eprintln!("round_trip id={}", restored.id());
        assert_ne!(original.id(), restored.id());

        let subject = restored
            .get_first_named_header_value("subject")
            .unwrap()
            .unwrap();
        assert_eq!(subject, "simple message");
    }
}