1use crate::Message;
2use rfc5321::parser::EnvelopeAddress;
3use serde::{Deserialize, Serialize};
4use spool::SpoolId;
5
6#[derive(Debug, Serialize, Deserialize, Clone)]
11struct MetaDataV1 {
12 id: SpoolId,
13 sender: EnvelopeAddress,
14 recipient: Vec<EnvelopeAddress>,
15 meta: serde_json::Value,
16}
17
18impl Message {
19 pub async fn serialize_for_xfer(
20 &self,
21 additional_meta: serde_json::Value,
22 ) -> anyhow::Result<Vec<u8>> {
23 let id = *self.id();
24 let data = self.data().await?;
25 let mut meta = self.clone_meta_data().await?;
26
27 if let serde_json::Value::Object(src) = additional_meta {
28 if let Some(obj) = meta.meta.as_object_mut() {
29 for (k, v) in src {
30 obj.insert(k, v);
31 }
32 }
33 }
34
35 let meta = MetaDataV1 {
36 id,
37 sender: meta.sender,
38 recipient: meta.recipient,
39 meta: meta.meta,
40 };
41
42 let serialized_meta = serde_json::to_string(&meta)?;
43
44 let mut result: Vec<u8> = serialized_meta.into();
45 result.push(b'\n');
46 result.extend_from_slice(&data);
47
48 Ok(result)
49 }
50
51 pub fn deserialize_from_xfer(serialized: &[u8]) -> anyhow::Result<Self> {
52 let newline = memchr::memchr(b'\n', serialized)
53 .ok_or_else(|| anyhow::anyhow!("invalid xfer payload"))?;
54
55 let (meta_json, data) = serialized.split_at(newline);
56
57 let meta: MetaDataV1 = serde_json::from_slice(&meta_json)?;
58 let payload: Box<[u8]> = data[1..].to_vec().into_boxed_slice();
60
61 let metadata = crate::message::MetaData {
62 sender: meta.sender,
63 recipient: meta.recipient,
64 meta: meta.meta,
65 schedule: None,
66 };
67
68 let id = meta.id.derive_new_with_cloned_timestamp();
79
80 Ok(Self::new_from_parts(id, metadata, payload.into()))
81 }
82}
83
#[cfg(test)]
mod test {
    use super::*;
    use crate::message::test::new_msg_body;
    use serde_json::json;

    /// A message serialized for transfer and then deserialized keeps its
    /// metadata (including the extra entries supplied at serialization
    /// time) and its headers, but ends up with a different id.
    #[tokio::test]
    async fn xfer_serialization() {
        let original = new_msg_body("Subject: simple message\r\n\r\nHello\r\n");
        original.set_meta("canary", true).await.unwrap();

        let extra_meta = json!({"additional": "meta"});
        let wire = original.serialize_for_xfer(extra_meta).await.unwrap();
        eprintln!("serialized as: {}", String::from_utf8_lossy(&wire));

        let restored = Message::deserialize_from_xfer(&wire).unwrap();

        // Both the pre-existing and the merged-in metadata must survive.
        let canary = restored.get_meta("canary").await.unwrap();
        assert_eq!(canary, true);
        let additional = restored.get_meta("additional").await.unwrap();
        assert_eq!(additional, "meta");

        let restored_data = restored.data().await.unwrap();
        eprintln!(
            "deserialized message:\n{}",
            String::from_utf8_lossy(&restored_data)
        );

        // The restored message must not share the id of its source.
        eprintln!("id ={}", original.id());
        eprintln!("round_trip id={}", restored.id());
        assert_ne!(original.id(), restored.id());

        let subject = restored
            .get_first_named_header_value("subject")
            .await
            .unwrap()
            .unwrap();
        assert_eq!(subject, "simple message");
    }
}