message/
xfer.rs

1use crate::{EnvelopeAddress, Message};
2use serde::{Deserialize, Serialize};
3use spool::SpoolId;
4
5/// This is a wire format, so changes need to
6/// be appropriately backwards/forwards compatible
7/// or placed into a separate struct with runtime
8/// handling for version mismatches.
9#[derive(Debug, Serialize, Deserialize, Clone)]
10struct MetaDataV1 {
11    id: SpoolId,
12    sender: EnvelopeAddress,
13    recipient: Vec<EnvelopeAddress>,
14    meta: serde_json::Value,
15}
16
17impl Message {
18    pub async fn serialize_for_xfer(
19        &self,
20        additional_meta: serde_json::Value,
21    ) -> anyhow::Result<Vec<u8>> {
22        let id = *self.id();
23        let data = self.data().await?;
24        let mut meta = self.clone_meta_data().await?;
25
26        if let serde_json::Value::Object(src) = additional_meta {
27            if let Some(obj) = meta.meta.as_object_mut() {
28                for (k, v) in src {
29                    obj.insert(k, v);
30                }
31            }
32        }
33
34        let meta = MetaDataV1 {
35            id,
36            sender: meta.sender,
37            recipient: meta.recipient,
38            meta: meta.meta,
39        };
40
41        let serialized_meta = serde_json::to_string(&meta)?;
42
43        let mut result: Vec<u8> = serialized_meta.into();
44        result.push(b'\n');
45        result.extend_from_slice(&data);
46
47        Ok(result)
48    }
49
50    pub fn deserialize_from_xfer(serialized: &[u8]) -> anyhow::Result<Self> {
51        let newline = memchr::memchr(b'\n', serialized)
52            .ok_or_else(|| anyhow::anyhow!("invalid xfer payload"))?;
53
54        let (meta_json, data) = serialized.split_at(newline);
55
56        let meta: MetaDataV1 = serde_json::from_slice(&meta_json)?;
57        // 1... because split_at includes the newline at the startgg
58        let payload: Box<[u8]> = data[1..].to_vec().into_boxed_slice();
59
60        let metadata = crate::message::MetaData {
61            sender: meta.sender,
62            recipient: meta.recipient,
63            meta: meta.meta,
64            schedule: None,
65        };
66
67        // Create a new id with *this* nodes mac but the source
68        // node's timestamp.  This should reduce the chances
69        // of a conflict leading to multiple messages alive in
70        // the same process with the same id, while preserving
71        // the timestamp of the source message.
72        // Ideally we would check that the ids are not the same
73        // here and raise an error, but for the sake of testing,
74        // we allow whatever value is produced to be used and
75        // we check that in the tests, and we SHOULD also
76        // check this in the xfer logic and have it
77        let id = meta.id.derive_new_with_cloned_timestamp();
78
79        Ok(Self::new_from_parts(id, metadata, payload.into()))
80    }
81}
82
83#[cfg(test)]
84mod test {
85    use super::*;
86    use crate::message::test::new_msg_body;
87    use serde_json::json;
88
89    #[tokio::test]
90    async fn xfer_serialization() {
91        let msg = new_msg_body("Subject: simple message\r\n\r\nHello\r\n");
92        msg.set_meta("canary", true).await.unwrap();
93        let serialized = msg
94            .serialize_for_xfer(json!({"additional": "meta"}))
95            .await
96            .unwrap();
97        eprintln!("serialized as: {}", String::from_utf8_lossy(&serialized));
98
99        let round_trip = Message::deserialize_from_xfer(&serialized).unwrap();
100        assert_eq!(round_trip.get_meta("canary").await.unwrap(), true);
101        assert_eq!(round_trip.get_meta("additional").await.unwrap(), "meta");
102        eprintln!(
103            "deserialized message:\n{}",
104            String::from_utf8_lossy(&round_trip.data().await.unwrap())
105        );
106
107        eprintln!("id           ={}", msg.id());
108        eprintln!("round_trip id={}", round_trip.id());
109        assert_ne!(msg.id(), round_trip.id());
110
111        assert_eq!(
112            round_trip
113                .get_first_named_header_value("subject")
114                .await
115                .unwrap()
116                .unwrap(),
117            "simple message"
118        );
119    }
120}