message/
message.rs

1use crate::address::HeaderAddressList;
2#[cfg(feature = "impl")]
3use crate::dkim::Signer;
4#[cfg(feature = "impl")]
5use crate::dkim::SIGN_POOL;
6pub use crate::queue_name::QueueNameComponents;
7use crate::scheduling::Scheduling;
8use crate::EnvelopeAddress;
9use anyhow::Context;
10use chrono::{DateTime, Utc};
11#[cfg(feature = "impl")]
12use config::{any_err, from_lua_value, serialize_options};
13use futures::FutureExt;
14use intrusive_collections::{intrusive_adapter, LinkedList, LinkedListAtomicLink};
15use kumo_chrono_helper::*;
16use kumo_log_types::rfc3464::Report;
17use kumo_log_types::rfc5965::ARFReport;
18#[cfg(feature = "impl")]
19use mailparsing::{AuthenticationResult, AuthenticationResults, EncodeHeaderValue};
20use mailparsing::{DecodedBody, Header, HeaderParseResult, MessageConformance, MimePart};
21#[cfg(feature = "impl")]
22use mlua::{LuaSerdeExt, UserData, UserDataMethods};
23use parking_lot::Mutex;
24use prometheus::{Histogram, IntGauge};
25use serde::{Deserialize, Serialize};
26use spool::{get_data_spool, get_meta_spool, Spool, SpoolId};
27use std::hash::Hash;
28use std::sync::{Arc, LazyLock, Weak};
29use std::time::{Duration, Instant};
30use timeq::TimerEntryWithDelay;
31
32bitflags::bitflags! {
33    #[derive(Clone, Copy, Debug, PartialEq, Eq)]
34    struct MessageFlags: u8 {
35        /// true if Metadata needs to be saved
36        const META_DIRTY = 1;
37        /// true if Data needs to be saved
38        const DATA_DIRTY = 2;
39        /// true if scheduling restrictions are present in the metadata
40        const SCHEDULED = 4;
41        /// true if high durability writes should always be used
42        const FORCE_SYNC = 8;
43    }
44}
45
46static MESSAGE_COUNT: LazyLock<IntGauge> = LazyLock::new(|| {
47    prometheus::register_int_gauge!("message_count", "total number of Message objects").unwrap()
48});
49static META_COUNT: LazyLock<IntGauge> = LazyLock::new(|| {
50    prometheus::register_int_gauge!(
51        "message_meta_resident_count",
52        "total number of Message objects with metadata loaded"
53    )
54    .unwrap()
55});
56static DATA_COUNT: LazyLock<IntGauge> = LazyLock::new(|| {
57    prometheus::register_int_gauge!(
58        "message_data_resident_count",
59        "total number of Message objects with body data loaded"
60    )
61    .unwrap()
62});
63static NO_DATA: LazyLock<Arc<Box<[u8]>>> = LazyLock::new(|| Arc::new(vec![].into_boxed_slice()));
64static SAVE_HIST: LazyLock<Histogram> = LazyLock::new(|| {
65    prometheus::register_histogram!(
66        "message_save_latency",
67        "how long it takes to save a message to spool"
68    )
69    .unwrap()
70});
71static LOAD_DATA_HIST: LazyLock<Histogram> = LazyLock::new(|| {
72    prometheus::register_histogram!(
73        "message_data_load_latency",
74        "how long it takes to load message data from spool"
75    )
76    .unwrap()
77});
78static LOAD_META_HIST: LazyLock<Histogram> = LazyLock::new(|| {
79    prometheus::register_histogram!(
80        "message_meta_load_latency",
81        "how long it takes to load message metadata from spool"
82    )
83    .unwrap()
84});
85
86#[derive(Debug)]
87struct MessageInner {
88    metadata: Option<Box<MetaData>>,
89    data: Arc<Box<[u8]>>,
90    flags: MessageFlags,
91    num_attempts: u16,
92    due: Option<DateTime<Utc>>,
93}
94
95#[derive(Debug)]
96pub(crate) struct MessageWithId {
97    id: SpoolId,
98    inner: Mutex<MessageInner>,
99    link: LinkedListAtomicLink,
100}
101
102intrusive_adapter!(
103    pub(crate) MessageWithIdAdapter = Arc<MessageWithId>: MessageWithId { link: LinkedListAtomicLink }
104);
105
106/// A list of messages with an O(1) list overhead; no additional
107/// memory per-message is required to track this list.
108/// However, a given Message can only belong to one instance
109/// of such a list at a time.
110pub struct MessageList {
111    list: LinkedList<MessageWithIdAdapter>,
112    len: usize,
113}
114
115impl Default for MessageList {
116    fn default() -> Self {
117        Self::new()
118    }
119}
120
121impl MessageList {
122    /// Create a new MessageList
123    pub fn new() -> Self {
124        Self {
125            list: LinkedList::new(MessageWithIdAdapter::default()),
126            len: 0,
127        }
128    }
129
130    /// Returns the number of elements contained in the list
131    pub fn len(&self) -> usize {
132        self.len
133    }
134
135    pub fn is_empty(&self) -> bool {
136        self.len == 0
137    }
138
139    /// Take all of the elements from this list and return them
140    /// in a new separate instance of MessageList.
141    pub fn take(&mut self) -> Self {
142        let new_list = Self {
143            list: self.list.take(),
144            len: self.len,
145        };
146        self.len = 0;
147        new_list
148    }
149
150    /// Push a message to the back of the list
151    pub fn push_back(&mut self, message: Message) {
152        self.list.push_back(message.msg_and_id);
153        self.len += 1;
154    }
155
156    /// Pop a message from the front of the list
157    pub fn pop_front(&mut self) -> Option<Message> {
158        self.list.pop_front().map(|msg_and_id| {
159            self.len -= 1;
160            Message { msg_and_id }
161        })
162    }
163
164    /// Pop a message from the back of the list
165    pub fn pop_back(&mut self) -> Option<Message> {
166        self.list.pop_back().map(|msg_and_id| {
167            self.len -= 1;
168            Message { msg_and_id }
169        })
170    }
171
172    /// Pop all of the messages from this list into a vector
173    /// of messages.
174    /// Memory usage is O(number-of-messages).
175    pub fn drain(&mut self) -> Vec<Message> {
176        let mut messages = Vec::with_capacity(self.len);
177        while let Some(msg) = self.pop_front() {
178            messages.push(msg);
179        }
180        messages
181    }
182
183    pub fn extend_from_iter<I>(&mut self, iter: I)
184    where
185        I: Iterator<Item = Message>,
186    {
187        for msg in iter {
188            self.push_back(msg)
189        }
190    }
191}
192
193impl IntoIterator for MessageList {
194    type Item = Message;
195    type IntoIter = MessageListIter;
196    fn into_iter(self) -> MessageListIter {
197        MessageListIter { list: self.list }
198    }
199}
200
201pub struct MessageListIter {
202    list: LinkedList<MessageWithIdAdapter>,
203}
204
205impl Iterator for MessageListIter {
206    type Item = Message;
207    fn next(&mut self) -> Option<Message> {
208        self.list
209            .pop_front()
210            .map(|msg_and_id| Message { msg_and_id })
211    }
212}
213
214#[derive(Clone, Debug)]
215#[cfg_attr(feature = "impl", derive(mlua::FromLua))]
216pub struct Message {
217    pub(crate) msg_and_id: Arc<MessageWithId>,
218}
219
220impl PartialEq for Message {
221    fn eq(&self, other: &Self) -> bool {
222        self.id() == other.id()
223    }
224}
225impl Eq for Message {}
226
227impl Hash for Message {
228    fn hash<H>(&self, hasher: &mut H)
229    where
230        H: std::hash::Hasher,
231    {
232        self.id().hash(hasher)
233    }
234}
235
236#[derive(Clone, Debug)]
237pub struct WeakMessage {
238    weak: Weak<MessageWithId>,
239}
240
241impl WeakMessage {
242    pub fn upgrade(&self) -> Option<Message> {
243        Some(Message {
244            msg_and_id: self.weak.upgrade()?,
245        })
246    }
247}
248
249#[derive(Debug, Serialize, Deserialize, Clone)]
250struct MetaData {
251    sender: EnvelopeAddress,
252    recipient: EnvelopeAddress,
253    meta: serde_json::Value,
254    #[serde(default)]
255    schedule: Option<Scheduling>,
256}
257
258impl Drop for MessageInner {
259    fn drop(&mut self) {
260        if self.metadata.is_some() {
261            META_COUNT.dec();
262        }
263        if !self.data.is_empty() {
264            DATA_COUNT.dec();
265        }
266        MESSAGE_COUNT.dec();
267    }
268}
269
270impl Message {
271    /// Create a new message with the supplied data.
272    /// The message meta and data are marked as dirty
273    pub fn new_dirty(
274        id: SpoolId,
275        sender: EnvelopeAddress,
276        recipient: EnvelopeAddress,
277        meta: serde_json::Value,
278        data: Arc<Box<[u8]>>,
279    ) -> anyhow::Result<Self> {
280        anyhow::ensure!(meta.is_object(), "metadata must be a json object");
281        MESSAGE_COUNT.inc();
282        DATA_COUNT.inc();
283        META_COUNT.inc();
284        Ok(Self {
285            msg_and_id: Arc::new(MessageWithId {
286                id,
287                inner: Mutex::new(MessageInner {
288                    metadata: Some(Box::new(MetaData {
289                        sender,
290                        recipient,
291                        meta,
292                        schedule: None,
293                    })),
294                    data,
295                    flags: MessageFlags::META_DIRTY | MessageFlags::DATA_DIRTY,
296                    num_attempts: 0,
297                    due: None,
298                }),
299                link: LinkedListAtomicLink::default(),
300            }),
301        })
302    }
303
304    pub fn weak(&self) -> WeakMessage {
305        WeakMessage {
306            weak: Arc::downgrade(&self.msg_and_id),
307        }
308    }
309
310    /// Helper for creating a message based on spool enumeration.
311    /// Given a spool id and the serialized metadata blob, returns
312    /// a message holding the deserialized version of that metadata.
313    pub fn new_from_spool(id: SpoolId, metadata: Vec<u8>) -> anyhow::Result<Self> {
314        let metadata: MetaData = serde_json::from_slice(&metadata)?;
315        MESSAGE_COUNT.inc();
316        META_COUNT.inc();
317
318        let flags = if metadata.schedule.is_some() {
319            MessageFlags::SCHEDULED
320        } else {
321            MessageFlags::empty()
322        };
323
324        Ok(Self {
325            msg_and_id: Arc::new(MessageWithId {
326                id,
327                inner: Mutex::new(MessageInner {
328                    metadata: Some(Box::new(metadata)),
329                    data: NO_DATA.clone(),
330                    flags,
331                    num_attempts: 0,
332                    due: None,
333                }),
334                link: LinkedListAtomicLink::default(),
335            }),
336        })
337    }
338
339    pub async fn new_with_id(id: SpoolId) -> anyhow::Result<Self> {
340        let meta_spool = get_meta_spool();
341        let data = meta_spool.load(id).await?;
342        Self::new_from_spool(id, data)
343    }
344
345    pub fn get_num_attempts(&self) -> u16 {
346        let inner = self.msg_and_id.inner.lock();
347        inner.num_attempts
348    }
349
350    pub fn set_num_attempts(&self, num_attempts: u16) {
351        let mut inner = self.msg_and_id.inner.lock();
352        inner.num_attempts = num_attempts;
353    }
354
355    pub fn increment_num_attempts(&self) {
356        let mut inner = self.msg_and_id.inner.lock();
357        inner.num_attempts += 1;
358    }
359
360    pub fn set_scheduling(
361        &self,
362        scheduling: Option<Scheduling>,
363    ) -> anyhow::Result<Option<Scheduling>> {
364        let mut inner = self.msg_and_id.inner.lock();
365        match &mut inner.metadata {
366            None => anyhow::bail!("set_scheduling: metadata must be loaded first"),
367            Some(meta) => {
368                meta.schedule = scheduling;
369                inner
370                    .flags
371                    .set(MessageFlags::SCHEDULED, scheduling.is_some());
372                if let Some(sched) = scheduling {
373                    let due = inner.due.unwrap_or_else(Utc::now);
374                    inner.due = Some(sched.adjust_for_schedule(due));
375                }
376                Ok(scheduling)
377            }
378        }
379    }
380
381    pub fn get_scheduling(&self) -> Option<Scheduling> {
382        let inner = self.msg_and_id.inner.lock();
383        inner.metadata.as_ref().and_then(|meta| meta.schedule)
384    }
385
386    pub fn get_due(&self) -> Option<DateTime<Utc>> {
387        let inner = self.msg_and_id.inner.lock();
388        inner.due
389    }
390
391    pub async fn delay_with_jitter(&self, limit: i64) -> anyhow::Result<Option<DateTime<Utc>>> {
392        let scale = rand::random::<f32>();
393        let value = (scale * limit as f32) as i64;
394        self.delay_by(seconds(value)?).await
395    }
396
397    pub async fn delay_by(
398        &self,
399        duration: chrono::Duration,
400    ) -> anyhow::Result<Option<DateTime<Utc>>> {
401        let due = Utc::now() + duration;
402        self.set_due(Some(due)).await
403    }
404
405    /// Delay by requested duration, and add up to 1 minute of jitter
406    pub async fn delay_by_and_jitter(
407        &self,
408        duration: chrono::Duration,
409    ) -> anyhow::Result<Option<DateTime<Utc>>> {
410        let scale = rand::random::<f32>();
411        let value = (scale * 60.) as i64;
412        let due = Utc::now() + duration + seconds(value)?;
413        self.set_due(Some(due)).await
414    }
415
416    pub async fn set_due(
417        &self,
418        due: Option<DateTime<Utc>>,
419    ) -> anyhow::Result<Option<DateTime<Utc>>> {
420        let due = {
421            let mut inner = self.msg_and_id.inner.lock();
422
423            if !inner.flags.contains(MessageFlags::SCHEDULED) {
424                // This is the simple, fast-path, common case
425                inner.due = due;
426                return Ok(inner.due);
427            }
428
429            let due = due.unwrap_or_else(Utc::now);
430
431            if let Some(meta) = &inner.metadata {
432                inner.due = match &meta.schedule {
433                    Some(sched) => Some(sched.adjust_for_schedule(due)),
434                    None => Some(due),
435                };
436                return Ok(inner.due);
437            }
438
439            // We'll need to load the metadata to correctly
440            // update the schedule for this message
441            due
442        };
443
444        self.load_meta().await?;
445
446        {
447            let mut inner = self.msg_and_id.inner.lock();
448            match &inner.metadata {
449                Some(meta) => {
450                    inner.due = match &meta.schedule {
451                        Some(sched) => Some(sched.adjust_for_schedule(due)),
452                        None => Some(due),
453                    };
454                    Ok(inner.due)
455                }
456                None => anyhow::bail!("loaded metadata, but metadata is not set!?"),
457            }
458        }
459    }
460
461    fn get_data_if_dirty(&self) -> Option<Arc<Box<[u8]>>> {
462        let inner = self.msg_and_id.inner.lock();
463        if inner.flags.contains(MessageFlags::DATA_DIRTY) {
464            Some(Arc::clone(&inner.data))
465        } else {
466            None
467        }
468    }
469
470    fn get_meta_if_dirty(&self) -> Option<MetaData> {
471        let inner = self.msg_and_id.inner.lock();
472        if inner.flags.contains(MessageFlags::META_DIRTY) {
473            inner.metadata.as_ref().map(|md| (**md).clone())
474        } else {
475            None
476        }
477    }
478
479    pub fn set_force_sync(&self, force: bool) {
480        let mut inner = self.msg_and_id.inner.lock();
481        inner.flags.set(MessageFlags::FORCE_SYNC, force);
482    }
483
484    pub fn needs_save(&self) -> bool {
485        let inner = self.msg_and_id.inner.lock();
486        inner.flags.contains(MessageFlags::META_DIRTY)
487            || inner.flags.contains(MessageFlags::DATA_DIRTY)
488    }
489
490    pub async fn save(&self, deadline: Option<Instant>) -> anyhow::Result<()> {
491        let _timer = SAVE_HIST.start_timer();
492        self.save_to(&**get_meta_spool(), &**get_data_spool(), deadline)
493            .await
494    }
495
496    pub async fn save_to(
497        &self,
498        meta_spool: &(dyn Spool + Send + Sync),
499        data_spool: &(dyn Spool + Send + Sync),
500        deadline: Option<Instant>,
501    ) -> anyhow::Result<()> {
502        let force_sync = self
503            .msg_and_id
504            .inner
505            .lock()
506            .flags
507            .contains(MessageFlags::FORCE_SYNC);
508
509        let data_fut = if let Some(data) = self.get_data_if_dirty() {
510            anyhow::ensure!(!data.is_empty(), "message data must not be empty");
511            data_spool
512                .store(self.msg_and_id.id, data, force_sync, deadline)
513                .map(|_| true)
514                .boxed()
515        } else {
516            futures::future::ready(false).boxed()
517        };
518        let meta_fut = if let Some(meta) = self.get_meta_if_dirty() {
519            let meta = Arc::new(serde_json::to_vec(&meta)?.into_boxed_slice());
520            meta_spool
521                .store(self.msg_and_id.id, meta, force_sync, deadline)
522                .map(|_| true)
523                .boxed()
524        } else {
525            futures::future::ready(false).boxed()
526        };
527
528        // NOTE: if we have a deadline, it is tempting to want to use
529        // timeout_at here to enforce it, but the underlying spool
530        // futures are not guaranteed to be fully cancel safe, which
531        // is why we pass the deadline down to the save method to allow
532        // them to handle timeouts internally.
533        let (data_res, meta_res) = tokio::join!(data_fut, meta_fut);
534
535        if data_res {
536            self.msg_and_id
537                .inner
538                .lock()
539                .flags
540                .remove(MessageFlags::DATA_DIRTY);
541        }
542        if meta_res {
543            self.msg_and_id
544                .inner
545                .lock()
546                .flags
547                .remove(MessageFlags::META_DIRTY);
548        }
549        Ok(())
550    }
551
552    pub fn id(&self) -> &SpoolId {
553        &self.msg_and_id.id
554    }
555
556    /// Save the data+meta if needed, then release both
557    pub async fn save_and_shrink(&self) -> anyhow::Result<bool> {
558        self.save(None).await?;
559        self.shrink()
560    }
561
562    /// Save the data+meta if needed, then release just the data
563    pub async fn save_and_shrink_data(&self) -> anyhow::Result<bool> {
564        self.save(None).await?;
565        self.shrink_data()
566    }
567
568    pub fn shrink_data(&self) -> anyhow::Result<bool> {
569        let mut inner = self.msg_and_id.inner.lock();
570        let mut did_shrink = false;
571        if inner.flags.contains(MessageFlags::DATA_DIRTY) {
572            anyhow::bail!("Cannot shrink message: DATA_DIRTY");
573        }
574        if !inner.data.is_empty() {
575            DATA_COUNT.dec();
576            did_shrink = true;
577        }
578        if !inner.data.is_empty() {
579            inner.data = NO_DATA.clone();
580            did_shrink = true;
581        }
582        Ok(did_shrink)
583    }
584
585    pub fn shrink(&self) -> anyhow::Result<bool> {
586        let mut inner = self.msg_and_id.inner.lock();
587        let mut did_shrink = false;
588        if inner.flags.contains(MessageFlags::DATA_DIRTY) {
589            anyhow::bail!("Cannot shrink message: DATA_DIRTY");
590        }
591        if inner.flags.contains(MessageFlags::META_DIRTY) {
592            anyhow::bail!("Cannot shrink message: META_DIRTY");
593        }
594        if inner.metadata.take().is_some() {
595            META_COUNT.dec();
596            did_shrink = true;
597        }
598        if !inner.data.is_empty() {
599            DATA_COUNT.dec();
600            did_shrink = true;
601        }
602        if !inner.data.is_empty() {
603            inner.data = NO_DATA.clone();
604            did_shrink = true;
605        }
606        Ok(did_shrink)
607    }
608
609    pub fn sender(&self) -> anyhow::Result<EnvelopeAddress> {
610        let inner = self.msg_and_id.inner.lock();
611        match &inner.metadata {
612            Some(meta) => Ok(meta.sender.clone()),
613            None => anyhow::bail!("metadata is not loaded"),
614        }
615    }
616
617    pub fn set_sender(&self, sender: EnvelopeAddress) -> anyhow::Result<()> {
618        let mut inner = self.msg_and_id.inner.lock();
619        match &mut inner.metadata {
620            Some(meta) => {
621                meta.sender = sender;
622                inner.flags.set(MessageFlags::DATA_DIRTY, true);
623                Ok(())
624            }
625            None => anyhow::bail!("metadata is not loaded"),
626        }
627    }
628
629    pub fn recipient(&self) -> anyhow::Result<EnvelopeAddress> {
630        let inner = self.msg_and_id.inner.lock();
631        match &inner.metadata {
632            Some(meta) => Ok(meta.recipient.clone()),
633            None => anyhow::bail!("metadata is not loaded"),
634        }
635    }
636
637    pub fn set_recipient(&self, recipient: EnvelopeAddress) -> anyhow::Result<()> {
638        let mut inner = self.msg_and_id.inner.lock();
639        match &mut inner.metadata {
640            Some(meta) => {
641                meta.recipient = recipient;
642                inner.flags.set(MessageFlags::DATA_DIRTY, true);
643                Ok(())
644            }
645            None => anyhow::bail!("metadata is not loaded"),
646        }
647    }
648
649    pub fn is_meta_loaded(&self) -> bool {
650        self.msg_and_id.inner.lock().metadata.is_some()
651    }
652
653    pub fn is_data_loaded(&self) -> bool {
654        !self.msg_and_id.inner.lock().data.is_empty()
655    }
656
657    pub async fn load_meta_if_needed(&self) -> anyhow::Result<()> {
658        if self.is_meta_loaded() {
659            return Ok(());
660        }
661        self.load_meta().await
662    }
663
664    pub async fn load_data_if_needed(&self) -> anyhow::Result<()> {
665        if self.is_data_loaded() {
666            return Ok(());
667        }
668        self.load_data().await
669    }
670
671    pub async fn load_meta(&self) -> anyhow::Result<()> {
672        let _timer = LOAD_META_HIST.start_timer();
673        self.load_meta_from(&**get_meta_spool()).await
674    }
675
676    pub async fn load_meta_from(
677        &self,
678        meta_spool: &(dyn Spool + Send + Sync),
679    ) -> anyhow::Result<()> {
680        let id = self.id();
681        let data = meta_spool.load(*id).await?;
682        let mut inner = self.msg_and_id.inner.lock();
683        let was_not_loaded = inner.metadata.is_none();
684        let metadata: MetaData = serde_json::from_slice(&data)?;
685        inner.metadata.replace(Box::new(metadata));
686        if was_not_loaded {
687            META_COUNT.inc();
688        }
689        Ok(())
690    }
691
692    pub async fn load_data(&self) -> anyhow::Result<()> {
693        let _timer = LOAD_DATA_HIST.start_timer();
694        self.load_data_from(&**get_data_spool()).await
695    }
696
697    pub async fn load_data_from(
698        &self,
699        data_spool: &(dyn Spool + Send + Sync),
700    ) -> anyhow::Result<()> {
701        let data = data_spool.load(*self.id()).await?;
702        let mut inner = self.msg_and_id.inner.lock();
703        let was_empty = inner.data.is_empty();
704        inner.data = Arc::new(data.into_boxed_slice());
705        if was_empty {
706            DATA_COUNT.inc();
707        }
708        Ok(())
709    }
710
711    pub fn assign_data(&self, data: Vec<u8>) {
712        let mut inner = self.msg_and_id.inner.lock();
713        let was_empty = inner.data.is_empty();
714        inner.data = Arc::new(data.into_boxed_slice());
715        inner.flags.set(MessageFlags::DATA_DIRTY, true);
716        if was_empty {
717            DATA_COUNT.inc();
718        }
719    }
720
721    pub fn get_data(&self) -> Arc<Box<[u8]>> {
722        let inner = self.msg_and_id.inner.lock();
723        inner.data.clone()
724    }
725
726    pub fn set_meta<S: AsRef<str>, V: Into<serde_json::Value>>(
727        &self,
728        key: S,
729        value: V,
730    ) -> anyhow::Result<()> {
731        let mut inner = self.msg_and_id.inner.lock();
732        match &mut inner.metadata {
733            None => anyhow::bail!("set_meta: metadata must be loaded first"),
734            Some(meta) => {
735                let key = key.as_ref();
736                let value = value.into();
737
738                match &mut meta.meta {
739                    serde_json::Value::Object(map) => {
740                        map.insert(key.to_string(), value);
741                    }
742                    _ => anyhow::bail!("metadata is somehow not a json object"),
743                }
744
745                inner.flags.set(MessageFlags::META_DIRTY, true);
746                Ok(())
747            }
748        }
749    }
750
751    /// Retrieve `key` as a String.
752    pub fn get_meta_string<S: serde_json::value::Index + std::fmt::Display + Copy>(
753        &self,
754        key: S,
755    ) -> anyhow::Result<Option<String>> {
756        match self.get_meta(key) {
757            Ok(serde_json::Value::String(value)) => Ok(Some(value.to_string())),
758            Ok(serde_json::Value::Null) => Ok(None),
759            hmm => {
760                anyhow::bail!("expected '{key}' to be a string value, got {hmm:?}");
761            }
762        }
763    }
764
765    pub fn get_meta_obj(&self) -> anyhow::Result<serde_json::Value> {
766        let inner = self.msg_and_id.inner.lock();
767        match &inner.metadata {
768            None => anyhow::bail!("get_meta_obj: metadata must be loaded first"),
769            Some(meta) => Ok(meta.meta.clone()),
770        }
771    }
772
773    pub fn get_meta<S: serde_json::value::Index>(
774        &self,
775        key: S,
776    ) -> anyhow::Result<serde_json::Value> {
777        let inner = self.msg_and_id.inner.lock();
778        match &inner.metadata {
779            None => anyhow::bail!("get_meta: metadata must be loaded first"),
780            Some(meta) => match meta.meta.get(key) {
781                Some(value) => Ok(value.clone()),
782                None => Ok(serde_json::Value::Null),
783            },
784        }
785    }
786
787    pub fn age(&self, now: DateTime<Utc>) -> chrono::Duration {
788        self.msg_and_id.id.age(now)
789    }
790
791    pub fn get_queue_name(&self) -> anyhow::Result<String> {
792        Ok(match self.get_meta_string("queue")? {
793            Some(name) => name,
794            None => {
795                let name = QueueNameComponents::format(
796                    self.get_meta_string("campaign")?,
797                    self.get_meta_string("tenant")?,
798                    self.recipient()?.domain().to_string().to_lowercase(),
799                    self.get_meta_string("routing_domain")?,
800                );
801                name.to_string()
802            }
803        })
804    }
805
806    #[cfg(feature = "impl")]
807    pub async fn dkim_verify(&self) -> anyhow::Result<Vec<AuthenticationResult>> {
808        let resolver = dns_resolver::get_resolver();
809        let data = self.get_data();
810        let bytes = mailparsing::SharedString::try_from(data.as_ref().as_ref())?;
811
812        let parsed = mailparsing::Header::parse_headers(bytes.clone())?;
813        if parsed
814            .overall_conformance
815            .contains(MessageConformance::NON_CANONICAL_LINE_ENDINGS)
816        {
817            return Ok(vec![AuthenticationResult {
818                method: "dkim".to_string(),
819                method_version: None,
820                result: "permerror".to_string(),
821                reason: Some("message has non-canonical line endings".to_string()),
822                props: Default::default(),
823            }]);
824        }
825        let message = kumo_dkim::ParsedEmail::HeaderOnlyParse { bytes, parsed };
826
827        let from = message
828            .get_headers()
829            .from()
830            .map_err(any_err)?
831            .ok_or_else(|| anyhow::anyhow!("Missing or invalid From header"))?
832            .0;
833        if from.len() != 1 {
834            anyhow::bail!(
835                "From header must have a single sender, found {}",
836                from.len()
837            );
838        }
839        let from_domain = &from[0].address.domain;
840
841        let results =
842            kumo_dkim::verify_email_with_resolver(from_domain, &message, &**resolver).await?;
843        Ok(results)
844    }
845
846    /// Parses the content into an owned MimePart.
847    /// Changes to that MimePart are NOT reflected in the underlying
848    /// message; you must re-assign the message data if you wish to modify
849    /// the message content.
850    pub async fn parse(&self) -> anyhow::Result<MimePart<'static>> {
851        self.load_data_if_needed().await?;
852        let data = self.get_data();
853        let owned_data = String::from_utf8_lossy(data.as_ref().as_ref()).to_string();
854        Ok(MimePart::parse(owned_data)?)
855    }
856
857    pub fn parse_rfc3464(&self) -> anyhow::Result<Option<Report>> {
858        let data = self.get_data();
859        Report::parse(&data)
860    }
861
862    pub fn parse_rfc5965(&self) -> anyhow::Result<Option<ARFReport>> {
863        let data = self.get_data();
864        ARFReport::parse(&data)
865    }
866
867    pub fn prepend_header(&self, name: Option<&str>, value: &str) {
868        let data = self.get_data();
869        let mut new_data = Vec::with_capacity(size_header(name, value) + 2 + data.len());
870        emit_header(&mut new_data, name, value);
871        new_data.extend_from_slice(&data);
872        self.assign_data(new_data);
873    }
874
875    pub fn append_header(&self, name: Option<&str>, value: &str) {
876        let data = self.get_data();
877        let mut new_data = Vec::with_capacity(size_header(name, value) + 2 + data.len());
878        for (idx, window) in data.windows(4).enumerate() {
879            if window == b"\r\n\r\n" {
880                let headers = &data[0..idx + 2];
881                let body = &data[idx + 2..];
882
883                new_data.extend_from_slice(headers);
884                emit_header(&mut new_data, name, value);
885                new_data.extend_from_slice(body);
886                self.assign_data(new_data);
887                return;
888            }
889        }
890    }
891
892    pub fn get_address_header(
893        &self,
894        header_name: &str,
895    ) -> anyhow::Result<Option<HeaderAddressList>> {
896        let data = self.get_data();
897        let HeaderParseResult { headers, .. } =
898            mailparsing::Header::parse_headers(data.as_ref().as_ref())?;
899
900        match headers.get_first(header_name) {
901            Some(hdr) => {
902                let list = hdr.as_address_list()?;
903                let result: HeaderAddressList = list.into();
904                Ok(Some(result))
905            }
906            None => Ok(None),
907        }
908    }
909
910    pub fn get_first_named_header_value(&self, name: &str) -> anyhow::Result<Option<String>> {
911        let data = self.get_data();
912        let HeaderParseResult { headers, .. } = Header::parse_headers(data.as_ref().as_ref())?;
913
914        match headers.get_first(name) {
915            Some(hdr) => Ok(Some(hdr.as_unstructured()?)),
916            None => Ok(None),
917        }
918    }
919
920    pub fn get_all_named_header_values(&self, name: &str) -> anyhow::Result<Vec<String>> {
921        let data = self.get_data();
922        let HeaderParseResult { headers, .. } = Header::parse_headers(data.as_ref().as_ref())?;
923
924        let mut values = vec![];
925        for hdr in headers.iter_named(name) {
926            values.push(hdr.as_unstructured()?);
927        }
928        Ok(values)
929    }
930
931    pub fn get_all_headers(&self) -> anyhow::Result<Vec<(String, String)>> {
932        let data = self.get_data();
933        let HeaderParseResult { headers, .. } = Header::parse_headers(data.as_ref().as_ref())?;
934
935        let mut values = vec![];
936        for hdr in headers.iter() {
937            values.push((hdr.get_name().to_string(), hdr.as_unstructured()?));
938        }
939        Ok(values)
940    }
941
942    pub fn retain_headers<F: FnMut(&Header) -> bool>(&self, mut func: F) -> anyhow::Result<()> {
943        let data = self.get_data();
944        let mut new_data = Vec::with_capacity(data.len());
945        let HeaderParseResult {
946            headers,
947            body_offset,
948            ..
949        } = Header::parse_headers(data.as_ref().as_ref())?;
950        for hdr in headers.iter() {
951            let retain = (func)(hdr);
952            if !retain {
953                continue;
954            }
955            hdr.write_header(&mut new_data)?;
956        }
957        new_data.extend_from_slice(b"\r\n");
958        new_data.extend_from_slice(&data[body_offset..]);
959        self.assign_data(new_data);
960        Ok(())
961    }
962
963    pub fn remove_first_named_header(&self, name: &str) -> anyhow::Result<()> {
964        let mut removed = false;
965        self.retain_headers(|hdr| {
966            if hdr.get_name().eq_ignore_ascii_case(name) && !removed {
967                removed = true;
968                false
969            } else {
970                true
971            }
972        })
973    }
974
975    pub fn import_x_headers(&self, names: Vec<String>) -> anyhow::Result<()> {
976        let data = self.get_data();
977        let HeaderParseResult { headers, .. } = Header::parse_headers(data.as_ref().as_ref())?;
978
979        for hdr in headers.iter() {
980            let do_import = if names.is_empty() {
981                is_x_header(hdr.get_name())
982            } else {
983                is_header_in_names_list(hdr.get_name(), &names)
984            };
985            if do_import {
986                let name = imported_header_name(hdr.get_name());
987                self.set_meta(name, hdr.as_unstructured()?)?;
988            }
989        }
990
991        Ok(())
992    }
993
994    pub fn remove_x_headers(&self, names: Vec<String>) -> anyhow::Result<()> {
995        self.retain_headers(|hdr| {
996            if names.is_empty() {
997                !is_x_header(hdr.get_name())
998            } else {
999                !is_header_in_names_list(hdr.get_name(), &names)
1000            }
1001        })
1002    }
1003
1004    pub fn remove_all_named_headers(&self, name: &str) -> anyhow::Result<()> {
1005        self.retain_headers(|hdr| !hdr.get_name().eq_ignore_ascii_case(name))
1006    }
1007
1008    #[cfg(feature = "impl")]
1009    pub async fn dkim_sign(&self, signer: Signer) -> anyhow::Result<()> {
1010        if let Some(runtime) = SIGN_POOL.get() {
1011            let msg = self.clone();
1012            runtime
1013                .spawn_blocking(move || {
1014                    let data = msg.get_data();
1015                    let header = signer.sign(&data)?;
1016                    msg.prepend_header(None, &header);
1017                    Ok::<(), anyhow::Error>(())
1018                })
1019                .await??;
1020        } else {
1021            let data = self.get_data();
1022            let header = signer.sign(&data)?;
1023            self.prepend_header(None, &header);
1024        }
1025        Ok(())
1026    }
1027
1028    pub fn import_scheduling_header(
1029        &self,
1030        header_name: &str,
1031        remove: bool,
1032    ) -> anyhow::Result<Option<Scheduling>> {
1033        if let Some(value) = self.get_first_named_header_value(header_name)? {
1034            let sched: Scheduling = serde_json::from_str(&value).with_context(|| {
1035                format!("{value} from header {header_name} is not a valid Scheduling header")
1036            })?;
1037            let result = self.set_scheduling(Some(sched))?;
1038
1039            if remove {
1040                self.remove_all_named_headers(header_name)?;
1041            }
1042
1043            Ok(result)
1044        } else {
1045            Ok(None)
1046        }
1047    }
1048
1049    pub fn append_text_plain(&self, content: &str) -> anyhow::Result<bool> {
1050        let data = self.get_data();
1051        let mut msg = MimePart::parse(data.as_ref().as_ref())?;
1052        let parts = msg.simplified_structure_pointers()?;
1053        if let Some(p) = parts.text_part.and_then(|p| msg.resolve_ptr_mut(p)) {
1054            match p.body()? {
1055                DecodedBody::Text(text) => {
1056                    let mut text = text.as_str().to_string();
1057                    text.push_str("\r\n");
1058                    text.push_str(content);
1059                    p.replace_text_body("text/plain", &text)?;
1060
1061                    let new_data = msg.to_message_string();
1062                    self.assign_data(new_data.into_bytes());
1063                    Ok(true)
1064                }
1065                DecodedBody::Binary(_) => {
1066                    anyhow::bail!("expected text/plain part to be text, but it is binary");
1067                }
1068            }
1069        } else {
1070            Ok(false)
1071        }
1072    }
1073
1074    pub fn append_text_html(&self, content: &str) -> anyhow::Result<bool> {
1075        let data = self.get_data();
1076        let mut msg = MimePart::parse(data.as_ref().as_ref())?;
1077        let parts = msg.simplified_structure_pointers()?;
1078        if let Some(p) = parts.html_part.and_then(|p| msg.resolve_ptr_mut(p)) {
1079            match p.body()? {
1080                DecodedBody::Text(text) => {
1081                    let mut text = text.as_str().to_string();
1082
1083                    match text.rfind("</body>").or_else(|| text.rfind("</BODY>")) {
1084                        Some(idx) => {
1085                            text.insert_str(idx, content);
1086                            text.insert_str(idx, "\r\n");
1087                        }
1088                        None => {
1089                            // Just append
1090                            text.push_str("\r\n");
1091                            text.push_str(content);
1092                        }
1093                    }
1094
1095                    p.replace_text_body("text/html", &text)?;
1096
1097                    let new_data = msg.to_message_string();
1098                    self.assign_data(new_data.into_bytes());
1099                    Ok(true)
1100                }
1101                DecodedBody::Binary(_) => {
1102                    anyhow::bail!("expected text/html part to be text, but it is binary");
1103                }
1104            }
1105        } else {
1106            Ok(false)
1107        }
1108    }
1109
1110    pub fn check_fix_conformance(
1111        &self,
1112        check: MessageConformance,
1113        fix: MessageConformance,
1114    ) -> anyhow::Result<()> {
1115        let data = self.get_data();
1116        let mut msg = MimePart::parse(data.as_ref().as_ref())?;
1117
1118        let conformance = msg.conformance();
1119
1120        // Don't raise errors for things that we're going to fix anyway
1121        let check = check - fix;
1122
1123        if check.intersects(conformance) {
1124            let problems = check.intersection(conformance).to_string();
1125            anyhow::bail!("Message has conformance issues: {problems}");
1126        }
1127
1128        if fix.intersects(conformance) {
1129            let to_fix = fix.intersection(conformance);
1130            let problems = to_fix.to_string();
1131
1132            let missing_headers_only = to_fix
1133                .difference(
1134                    MessageConformance::MISSING_DATE_HEADER
1135                        | MessageConformance::MISSING_MIME_VERSION
1136                        | MessageConformance::MISSING_MESSAGE_ID_HEADER,
1137                )
1138                .is_empty();
1139
1140            if !missing_headers_only {
1141                msg = msg.rebuild().with_context(|| {
1142                    format!("Rebuilding message to correct conformance issues: {problems}")
1143                })?;
1144            }
1145
1146            if to_fix.contains(MessageConformance::MISSING_DATE_HEADER) {
1147                msg.headers_mut().set_date(Utc::now())?;
1148            }
1149
1150            if to_fix.contains(MessageConformance::MISSING_MIME_VERSION) {
1151                msg.headers_mut().set_mime_version("1.0")?;
1152            }
1153
1154            if to_fix.contains(MessageConformance::MISSING_MESSAGE_ID_HEADER) {
1155                let sender = self.sender()?;
1156                let domain = sender.domain();
1157                let id = *self.id();
1158                msg.headers_mut()
1159                    .set_message_id(mailparsing::MessageID(format!("{id}@{domain}")))?;
1160            }
1161
1162            let new_data = msg.to_message_string();
1163            self.assign_data(new_data.into_bytes());
1164        }
1165
1166        Ok(())
1167    }
1168}
1169
1170fn is_header_in_names_list(hdr_name: &str, names: &[String]) -> bool {
1171    for name in names {
1172        if hdr_name.eq_ignore_ascii_case(name) {
1173            return true;
1174        }
1175    }
1176    false
1177}
1178
1179fn imported_header_name(name: &str) -> String {
1180    name.chars()
1181        .map(|c| match c.to_ascii_lowercase() {
1182            '-' => '_',
1183            c => c,
1184        })
1185        .collect()
1186}
1187
1188fn is_x_header(name: &str) -> bool {
1189    name.starts_with("X-") || name.starts_with("x-")
1190}
1191
1192fn size_header(name: Option<&str>, value: &str) -> usize {
1193    name.map(|name| name.len() + 2).unwrap_or(0) + value.len()
1194}
1195
1196fn emit_header(dest: &mut Vec<u8>, name: Option<&str>, value: &str) {
1197    if let Some(name) = name {
1198        dest.extend_from_slice(name.as_bytes());
1199        dest.extend_from_slice(b": ");
1200    }
1201    dest.extend_from_slice(value.as_bytes());
1202    if !value.ends_with("\r\n") {
1203        dest.extend_from_slice(b"\r\n");
1204    }
1205}
1206
1207#[cfg(feature = "impl")]
1208impl UserData for Message {
1209    fn add_methods<M: UserDataMethods<Self>>(methods: &mut M) {
1210        methods.add_async_method(
1211            "set_meta",
1212            move |_, this, (name, value): (String, mlua::Value)| async move {
1213                this.load_meta_if_needed().await.map_err(any_err)?;
1214                let value = serde_json::value::to_value(value).map_err(any_err)?;
1215                this.set_meta(name, value).map_err(any_err)?;
1216                Ok(())
1217            },
1218        );
1219        methods.add_async_method("get_meta", move |lua, this, name: String| async move {
1220            this.load_meta_if_needed().await.map_err(any_err)?;
1221            let value = this.get_meta(name).map_err(any_err)?;
1222            Ok(Some(lua.to_value_with(&value, serialize_options())?))
1223        });
1224        methods.add_async_method("get_data", move |lua, this, _: ()| async move {
1225            this.load_data_if_needed().await.map_err(any_err)?;
1226            let data = this.get_data();
1227            lua.create_string(&*data)
1228        });
1229        methods.add_method("set_data", move |_lua, this, data: mlua::String| {
1230            this.assign_data(data.as_bytes().to_vec());
1231            Ok(())
1232        });
1233
1234        methods.add_async_method("parse_mime", move |_lua, this, _: ()| async move {
1235            this.load_data_if_needed().await.map_err(any_err)?;
1236            let data = this.get_data();
1237            let owned_data = String::from_utf8_lossy(data.as_ref().as_ref()).to_string();
1238            let part = MimePart::parse(owned_data).map_err(any_err)?;
1239            Ok(mod_mimepart::PartRef::new(part))
1240        });
1241
1242        methods.add_method("append_text_plain", move |_lua, this, data: String| {
1243            this.append_text_plain(&data).map_err(any_err)
1244        });
1245
1246        methods.add_method("append_text_html", move |_lua, this, data: String| {
1247            this.append_text_html(&data).map_err(any_err)
1248        });
1249
1250        methods.add_method("id", move |_, this, _: ()| Ok(this.id().to_string()));
1251        methods.add_method("sender", move |_, this, _: ()| {
1252            this.sender().map_err(any_err)
1253        });
1254
1255        methods.add_method("num_attempts", move |_, this, _: ()| {
1256            Ok(this.get_num_attempts())
1257        });
1258
1259        methods.add_method("queue_name", move |_, this, _: ()| {
1260            this.get_queue_name().map_err(any_err)
1261        });
1262
1263        methods.add_async_method("set_due", move |lua, this, due: mlua::Value| async move {
1264            let due: Option<DateTime<Utc>> = lua.from_value(due)?;
1265            let revised_due = this.set_due(due).await.map_err(any_err)?;
1266            lua.to_value(&revised_due)
1267        });
1268
1269        methods.add_method("set_sender", move |lua, this, value: mlua::Value| {
1270            let sender = match value {
1271                mlua::Value::String(s) => {
1272                    let s = s.to_str()?;
1273                    EnvelopeAddress::parse(&s).map_err(any_err)?
1274                }
1275                _ => lua.from_value::<EnvelopeAddress>(value.clone())?,
1276            };
1277            this.set_sender(sender).map_err(any_err)
1278        });
1279
1280        methods.add_method("recipient", move |_, this, _: ()| {
1281            this.recipient().map_err(any_err)
1282        });
1283
1284        methods.add_method("set_recipient", move |lua, this, value: mlua::Value| {
1285            let recipient = match value {
1286                mlua::Value::String(s) => {
1287                    let s = s.to_str()?;
1288                    EnvelopeAddress::parse(&s).map_err(any_err)?
1289                }
1290                _ => lua.from_value::<EnvelopeAddress>(value.clone())?,
1291            };
1292            this.set_recipient(recipient).map_err(any_err)
1293        });
1294
1295        #[cfg(feature = "impl")]
1296        methods.add_async_method("dkim_sign", |_, this, signer: Signer| async move {
1297            this.dkim_sign(signer).await.map_err(any_err)
1298        });
1299
1300        methods.add_async_method("shrink", |_, this, _: ()| async move {
1301            if this.needs_save() {
1302                this.save(None).await.map_err(any_err)?;
1303            }
1304            this.shrink().map_err(any_err)
1305        });
1306
1307        methods.add_async_method("shrink_data", |_, this, _: ()| async move {
1308            if this.needs_save() {
1309                this.save(None).await.map_err(any_err)?;
1310            }
1311            this.shrink_data().map_err(any_err)
1312        });
1313
1314        methods.add_method(
1315            "add_authentication_results",
1316            move |lua, this, (serv_id, results): (String, mlua::Value)| {
1317                let results: Vec<AuthenticationResult> = lua.from_value(results)?;
1318                let results = AuthenticationResults {
1319                    serv_id,
1320                    version: None,
1321                    results,
1322                };
1323
1324                this.prepend_header(Some("Authentication-Results"), &results.encode_value());
1325
1326                Ok(())
1327            },
1328        );
1329
1330        #[cfg(feature = "impl")]
1331        methods.add_async_method("dkim_verify", |lua, this, ()| async move {
1332            let results = this.dkim_verify().await.map_err(any_err)?;
1333            lua.to_value_with(&results, serialize_options())
1334        });
1335
1336        methods.add_method(
1337            "prepend_header",
1338            move |_, this, (name, value): (String, String)| {
1339                this.prepend_header(Some(&name), &value);
1340                Ok(())
1341            },
1342        );
1343        methods.add_method(
1344            "append_header",
1345            move |_, this, (name, value): (String, String)| {
1346                this.append_header(Some(&name), &value);
1347                Ok(())
1348            },
1349        );
1350        methods.add_method("get_address_header", move |_, this, name: String| {
1351            this.get_address_header(&name).map_err(any_err)
1352        });
1353        methods.add_method("from_header", move |_, this, ()| {
1354            this.get_address_header("From").map_err(any_err)
1355        });
1356        methods.add_method("to_header", move |_, this, ()| {
1357            this.get_address_header("To").map_err(any_err)
1358        });
1359
1360        methods.add_method(
1361            "get_first_named_header_value",
1362            move |_, this, name: String| this.get_first_named_header_value(&name).map_err(any_err),
1363        );
1364        methods.add_method(
1365            "get_all_named_header_values",
1366            move |_, this, name: String| this.get_all_named_header_values(&name).map_err(any_err),
1367        );
1368        methods.add_method("get_all_headers", move |_, this, _: ()| {
1369            Ok(this
1370                .get_all_headers()
1371                .map_err(any_err)?
1372                .into_iter()
1373                .map(|(name, value)| vec![name, value])
1374                .collect::<Vec<Vec<String>>>())
1375        });
1376        methods.add_method("get_all_headers", move |_, this, _: ()| {
1377            Ok(this
1378                .get_all_headers()
1379                .map_err(any_err)?
1380                .into_iter()
1381                .map(|(name, value)| vec![name, value])
1382                .collect::<Vec<Vec<String>>>())
1383        });
1384        methods.add_method(
1385            "import_x_headers",
1386            move |_, this, names: Option<Vec<String>>| {
1387                this.import_x_headers(names.unwrap_or_default())
1388                    .map_err(any_err)
1389            },
1390        );
1391
1392        methods.add_method(
1393            "remove_x_headers",
1394            move |_, this, names: Option<Vec<String>>| {
1395                this.remove_x_headers(names.unwrap_or_default())
1396                    .map_err(any_err)
1397            },
1398        );
1399        methods.add_method("remove_all_named_headers", move |_, this, name: String| {
1400            this.remove_all_named_headers(&name).map_err(any_err)
1401        });
1402
1403        methods.add_method(
1404            "import_scheduling_header",
1405            move |lua, this, (header_name, remove): (String, bool)| {
1406                let opt_schedule = this
1407                    .import_scheduling_header(&header_name, remove)
1408                    .map_err(any_err)?;
1409                lua.to_value(&opt_schedule)
1410            },
1411        );
1412
1413        methods.add_method("set_scheduling", move |lua, this, params: mlua::Value| {
1414            let sched: Option<Scheduling> = from_lua_value(lua, params)?;
1415            let opt_schedule = this.set_scheduling(sched).map_err(any_err)?;
1416            lua.to_value(&opt_schedule)
1417        });
1418
1419        methods.add_method("parse_rfc3464", move |lua, this, _: ()| {
1420            let report = this.parse_rfc3464().map_err(any_err)?;
1421            match report {
1422                Some(report) => lua.to_value_with(&report, serialize_options()),
1423                None => Ok(mlua::Value::Nil),
1424            }
1425        });
1426
1427        methods.add_method("parse_rfc5965", move |lua, this, _: ()| {
1428            let report = this.parse_rfc5965().map_err(any_err)?;
1429            match report {
1430                Some(report) => lua.to_value_with(&report, serialize_options()),
1431                None => Ok(mlua::Value::Nil),
1432            }
1433        });
1434
1435        methods.add_async_method("save", |_, this, ()| async move {
1436            this.save(None).await.map_err(any_err)
1437        });
1438
1439        methods.add_method("set_force_sync", move |_, this, force: bool| {
1440            this.set_force_sync(force);
1441            Ok(())
1442        });
1443
1444        methods.add_async_method(
1445            "check_fix_conformance",
1446            |_, this, (check, fix): (String, String)| async move {
1447                use std::str::FromStr;
1448                let check = MessageConformance::from_str(&check).map_err(any_err)?;
1449                let fix = MessageConformance::from_str(&fix).map_err(any_err)?;
1450
1451                match this.check_fix_conformance(check, fix) {
1452                    Ok(_) => Ok(None),
1453                    Err(err) => Ok(Some(format!("{err:#}"))),
1454                }
1455            },
1456        );
1457    }
1458}
1459
1460impl TimerEntryWithDelay for WeakMessage {
1461    fn delay(&self) -> Duration {
1462        match self.upgrade() {
1463            None => {
1464                // Dangling/Cancelled. Make it appear due immediately
1465                Duration::from_millis(0)
1466            }
1467            Some(msg) => msg.delay(),
1468        }
1469    }
1470}
1471
1472impl TimerEntryWithDelay for Message {
1473    fn delay(&self) -> Duration {
1474        let inner = self.msg_and_id.inner.lock();
1475        match inner.due {
1476            Some(time) => {
1477                let now = Utc::now();
1478                let delta = time - now;
1479                delta.to_std().unwrap_or(Duration::from_millis(0))
1480            }
1481            None => Duration::from_millis(0),
1482        }
1483    }
1484}
1485
1486#[cfg(test)]
1487mod test {
1488    use super::*;
1489    use serde_json::json;
1490
1491    fn new_msg_body<S: AsRef<str>>(s: S) -> Message {
1492        Message::new_dirty(
1493            SpoolId::new(),
1494            EnvelopeAddress::parse("sender@example.com").unwrap(),
1495            EnvelopeAddress::parse("recip@example.com").unwrap(),
1496            serde_json::json!({}),
1497            Arc::new(s.as_ref().as_bytes().to_vec().into_boxed_slice()),
1498        )
1499        .unwrap()
1500    }
1501
1502    fn data_as_string(msg: &Message) -> String {
1503        String::from_utf8(msg.get_data().to_vec()).unwrap()
1504    }
1505
1506    const X_HDR_CONTENT: &str =
1507        "X-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nFrom :Someone\r\n\r\nBody";
1508
1509    #[test]
1510    fn import_all_x_headers() {
1511        let msg = new_msg_body(X_HDR_CONTENT);
1512
1513        msg.import_x_headers(vec![]).unwrap();
1514        k9::assert_equal!(
1515            msg.get_meta_obj().unwrap(),
1516            json!({
1517                "x_hello": "there",
1518                "x_header": "value",
1519            })
1520        );
1521    }
1522
1523    #[test]
1524    fn meta_and_nil() {
1525        let msg = new_msg_body(X_HDR_CONTENT);
1526        // Ensure that json null round-trips
1527        msg.set_meta("test", serde_json::Value::Null).unwrap();
1528        k9::assert_equal!(msg.get_meta("test").unwrap(), serde_json::Value::Null);
1529
1530        // and that it is exposed to lua as nil
1531        let lua = mlua::Lua::new();
1532        lua.globals().set("msg", msg).unwrap();
1533        lua.load("assert(msg:get_meta('test') == nil)")
1534            .exec()
1535            .unwrap();
1536    }
1537
1538    #[test]
1539    fn import_some_x_headers() {
1540        let msg = new_msg_body(X_HDR_CONTENT);
1541
1542        msg.import_x_headers(vec!["x-hello".to_string()]).unwrap();
1543        k9::assert_equal!(
1544            msg.get_meta_obj().unwrap(),
1545            json!({
1546                "x_hello": "there",
1547            })
1548        );
1549    }
1550
1551    #[test]
1552    fn remove_all_x_headers() {
1553        let msg = new_msg_body(X_HDR_CONTENT);
1554
1555        msg.remove_x_headers(vec![]).unwrap();
1556        k9::assert_equal!(
1557            data_as_string(&msg),
1558            "Subject: Hello\r\nFrom :Someone\r\n\r\nBody"
1559        );
1560    }
1561
1562    #[test]
1563    fn prepend_header_2_params() {
1564        let msg = new_msg_body(X_HDR_CONTENT);
1565
1566        msg.prepend_header(Some("Date"), "Today");
1567        k9::assert_equal!(
1568            data_as_string(&msg),
1569            "Date: Today\r\nX-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nFrom :Someone\r\n\r\nBody"
1570        );
1571    }
1572
1573    #[test]
1574    fn prepend_header_1_params() {
1575        let msg = new_msg_body(X_HDR_CONTENT);
1576
1577        msg.prepend_header(None, "Date: Today");
1578        k9::assert_equal!(
1579            data_as_string(&msg),
1580            "Date: Today\r\nX-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nFrom :Someone\r\n\r\nBody"
1581        );
1582    }
1583
1584    #[test]
1585    fn append_header_2_params() {
1586        let msg = new_msg_body(X_HDR_CONTENT);
1587
1588        msg.append_header(Some("Date"), "Today");
1589        k9::assert_equal!(
1590            data_as_string(&msg),
1591            "X-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nFrom :Someone\r\nDate: Today\r\n\r\nBody"
1592        );
1593    }
1594
1595    #[test]
1596    fn append_header_1_params() {
1597        let msg = new_msg_body(X_HDR_CONTENT);
1598
1599        msg.append_header(None, "Date: Today");
1600        k9::assert_equal!(
1601            data_as_string(&msg),
1602            "X-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nFrom :Someone\r\nDate: Today\r\n\r\nBody"
1603        );
1604    }
1605
1606    const MULTI_HEADER_CONTENT: &str =
1607        "X-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nX-Header: another value\r\nFrom :Someone@somewhere\r\n\r\nBody";
1608
1609    #[test]
1610    fn get_first_header() {
1611        let msg = new_msg_body(MULTI_HEADER_CONTENT);
1612        k9::assert_equal!(
1613            msg.get_first_named_header_value("X-header")
1614                .unwrap()
1615                .unwrap(),
1616            "value"
1617        );
1618    }
1619
1620    #[test]
1621    fn get_all_header() {
1622        let msg = new_msg_body(MULTI_HEADER_CONTENT);
1623        k9::assert_equal!(
1624            msg.get_all_named_header_values("X-header").unwrap(),
1625            vec!["value".to_string(), "another value".to_string()]
1626        );
1627    }
1628
1629    #[test]
1630    fn remove_first() {
1631        let msg = new_msg_body(MULTI_HEADER_CONTENT);
1632        msg.remove_first_named_header("X-header").unwrap();
1633        k9::assert_equal!(
1634            data_as_string(&msg),
1635            "X-Hello: there\r\nSubject: Hello\r\nX-Header: another value\r\nFrom :Someone@somewhere\r\n\r\nBody"
1636        );
1637    }
1638
1639    #[test]
1640    fn remove_all() {
1641        let msg = new_msg_body(MULTI_HEADER_CONTENT);
1642        msg.remove_all_named_headers("X-header").unwrap();
1643        k9::assert_equal!(
1644            data_as_string(&msg),
1645            "X-Hello: there\r\nSubject: Hello\r\nFrom :Someone@somewhere\r\n\r\nBody"
1646        );
1647    }
1648
1649    #[test]
1650    fn append_text_plain() {
1651        let msg = new_msg_body(MULTI_HEADER_CONTENT);
1652        msg.append_text_plain("I am at the bottom").unwrap();
1653        k9::assert_equal!(
1654            data_as_string(&msg),
1655            "X-Hello: there\r\n\
1656             X-Header: value\r\n\
1657             Subject: Hello\r\n\
1658             X-Header: another value\r\n\
1659             From :Someone@somewhere\r\n\
1660             Content-Type: text/plain;\r\n\
1661             \tcharset=\"us-ascii\"\r\n\
1662             \r\n\
1663             Body\r\n\
1664             I am at the bottom\r\n"
1665        );
1666    }
1667
1668    const MIXED_CONTENT: &str = "Content-Type: multipart/mixed;\r\n\
1669\tboundary=\"my-boundary\"\r\n\
1670\r\n\
1671--my-boundary\r\n\
1672Content-Type: text/plain;\r\n\
1673\tcharset=\"us-ascii\"\r\n\
1674\r\n\
1675plain text\r\n\
1676--my-boundary\r\n\
1677Content-Type: text/html;\r\n\
1678\tcharset=\"us-ascii\"\r\n\
1679\r\n\
1680<b>rich</b> text\r\n\
1681--my-boundary\r\n\
1682Content-Type: application/octet-stream\r\n\
1683Content-Transfer-Encoding: base64\r\n\
1684Content-Disposition: attachment;\r\n\
1685\tfilename=\"woot.bin\"\r\n\
1686Content-ID: <woot.id@somewhere>\r\n\
1687\r\n\
1688AAECAw==\r\n\
1689--my-boundary--\r\n\
1690\r\n";
1691
1692    const MIXED_CONTENT_ENCLOSING_BODY: &str = "Content-Type: multipart/mixed;\r\n\
1693\tboundary=\"my-boundary\"\r\n\
1694\r\n\
1695--my-boundary\r\n\
1696Content-Type: text/plain;\r\n\
1697\tcharset=\"us-ascii\"\r\n\
1698\r\n\
1699plain text\r\n\
1700--my-boundary\r\n\
1701Content-Type: text/html;\r\n\
1702\tcharset=\"us-ascii\"\r\n\
1703\r\n\
1704<BODY>\r\n\
1705<b>rich</b> text\r\n\
1706</BODY>\r\n\
1707--my-boundary\r\n\
1708Content-Type: application/octet-stream\r\n\
1709Content-Transfer-Encoding: base64\r\n\
1710Content-Disposition: attachment;\r\n\
1711\tfilename=\"woot.bin\"\r\n\
1712Content-ID: <woot.id>\r\n\
1713\r\n\
1714AAECAw==\r\n\
1715--my-boundary--\r\n\
1716\r\n";
1717
1718    #[test]
1719    fn append_text_html() {
1720        let msg = new_msg_body(MIXED_CONTENT);
1721        msg.append_text_html("bottom html").unwrap();
1722        k9::snapshot!(
1723            data_as_string(&msg),
1724            r#"
1725Content-Type: multipart/mixed;\r
1726\tboundary="my-boundary"\r
1727\r
1728--my-boundary\r
1729Content-Type: text/plain;\r
1730\tcharset="us-ascii"\r
1731\r
1732plain text\r
1733--my-boundary\r
1734Content-Type: text/html;\r
1735\tcharset="us-ascii"\r
1736\r
1737<b>rich</b> text\r
1738\r
1739bottom html\r
1740--my-boundary\r
1741Content-Type: application/octet-stream\r
1742Content-Transfer-Encoding: base64\r
1743Content-Disposition: attachment;\r
1744\tfilename="woot.bin"\r
1745Content-ID: <woot.id@somewhere>\r
1746\r
1747AAECAw==\r
1748--my-boundary--\r
1749\r
1750
1751"#
1752        );
1753
1754        let msg = new_msg_body(MIXED_CONTENT_ENCLOSING_BODY);
1755        msg.append_text_html("bottom html 👻").unwrap();
1756        k9::snapshot!(
1757            data_as_string(&msg),
1758            r#"
1759Content-Type: multipart/mixed;\r
1760\tboundary="my-boundary"\r
1761\r
1762--my-boundary\r
1763Content-Type: text/plain;\r
1764\tcharset="us-ascii"\r
1765\r
1766plain text\r
1767--my-boundary\r
1768Content-Type: text/html;\r
1769\tcharset="utf-8"\r
1770Content-Transfer-Encoding: quoted-printable\r
1771\r
1772<BODY>\r
1773<b>rich</b> text\r
1774\r
1775bottom html =F0=9F=91=BB</BODY>\r
1776--my-boundary\r
1777Content-Type: application/octet-stream\r
1778Content-Transfer-Encoding: base64\r
1779Content-Disposition: attachment;\r
1780\tfilename="woot.bin"\r
1781Content-ID: <woot.id>\r
1782\r
1783AAECAw==\r
1784--my-boundary--\r
1785\r
1786
1787"#
1788        );
1789    }
1790
1791    #[test]
1792    fn append_text_plain_mixed() {
1793        let msg = new_msg_body(MIXED_CONTENT);
1794        msg.append_text_plain("bottom text 👾").unwrap();
1795        k9::snapshot!(
1796            data_as_string(&msg),
1797            r#"
1798Content-Type: multipart/mixed;\r
1799\tboundary="my-boundary"\r
1800\r
1801--my-boundary\r
1802Content-Type: text/plain;\r
1803\tcharset="utf-8"\r
1804Content-Transfer-Encoding: quoted-printable\r
1805\r
1806plain text\r
1807\r
1808bottom text =F0=9F=91=BE\r
1809--my-boundary\r
1810Content-Type: text/html;\r
1811\tcharset="us-ascii"\r
1812\r
1813<b>rich</b> text\r
1814--my-boundary\r
1815Content-Type: application/octet-stream\r
1816Content-Transfer-Encoding: base64\r
1817Content-Disposition: attachment;\r
1818\tfilename="woot.bin"\r
1819Content-ID: <woot.id@somewhere>\r
1820\r
1821AAECAw==\r
1822--my-boundary--\r
1823\r
1824
1825"#
1826        );
1827    }
1828
1829    #[test]
1830    fn check_conformance_angle_msg_id() {
1831        const DOUBLE_ANGLE_ONLY: &str = "Subject: hello\r
1832Message-ID: <<1234@example.com>>\r
1833\r
1834Hello";
1835        let msg = new_msg_body(DOUBLE_ANGLE_ONLY);
1836        k9::snapshot!(
1837            msg.check_fix_conformance(
1838                MessageConformance::MISSING_MESSAGE_ID_HEADER,
1839                MessageConformance::empty(),
1840            )
1841            .unwrap_err(),
1842            "Message has conformance issues: MISSING_MESSAGE_ID_HEADER"
1843        );
1844
1845        msg.check_fix_conformance(
1846            MessageConformance::MISSING_MESSAGE_ID_HEADER,
1847            MessageConformance::MISSING_MESSAGE_ID_HEADER,
1848        )
1849        .unwrap();
1850
1851        // Can't use a snapshot test here because the fixed header
1852        // has a unique random component
1853        /*
1854                k9::snapshot!(
1855                    data_as_string(&msg),
1856                    r#"
1857        Subject: hello\r
1858        Message-ID: <4106566d2ce911ef9dcd0242289ea0df@example.com>\r
1859        \r
1860        Hello
1861        "#
1862                );
1863        */
1864
1865        const DOUBLE_ANGLE_AND_LONG_LINE: &str = "Subject: hello\r
1866Message-ID: <<1234@example.com>>\r
1867\r
1868Hello this is a really long line Hello this is a really long line \
1869Hello this is a really long line Hello this is a really long line \
1870Hello this is a really long line Hello this is a really long line \
1871Hello this is a really long line Hello this is a really long line \
1872Hello this is a really long line Hello this is a really long line \
1873Hello this is a really long line Hello this is a really long line \
1874Hello this is a really long line Hello this is a really long line
1875";
1876        let msg = new_msg_body(DOUBLE_ANGLE_AND_LONG_LINE);
1877        msg.check_fix_conformance(
1878            MessageConformance::MISSING_COLON_VALUE,
1879            MessageConformance::MISSING_MESSAGE_ID_HEADER | MessageConformance::LINE_TOO_LONG,
1880        )
1881        .unwrap();
1882
1883        // Can't use a snapshot test here because the fixed header
1884        // has a random component
1885        /*
1886                k9::snapshot!(
1887                    data_as_string(&msg),
1888                    r#"
1889        Content-Type: text/plain;\r
1890        \tcharset="us-ascii"\r
1891        Content-Transfer-Encoding: quoted-printable\r
1892        Subject: hello\r
1893        Message-ID: <749fc87e2cea11ef96a50242289ea0df@example.com>\r
1894        \r
1895        Hello this is a really long line Hello this is a really long line Hello thi=\r
1896        s is a really long line Hello this is a really long line Hello this is a re=\r
1897        ally long line Hello this is a really long line Hello this is a really long=\r
1898         line Hello this is a really long line Hello this is a really long line Hel=\r
1899        lo this is a really long line Hello this is a really long line Hello this i=\r
1900        s a really long line Hello this is a really long line Hello this is a reall=\r
1901        y long line=0A\r
1902
1903        "#
1904                );
1905        */
1906    }
1907
1908    #[test]
1909    fn check_conformance() {
1910        let msg = new_msg_body(MULTI_HEADER_CONTENT);
1911        msg.check_fix_conformance(
1912            MessageConformance::default(),
1913            MessageConformance::MISSING_MIME_VERSION,
1914        )
1915        .unwrap();
1916        k9::snapshot!(
1917            data_as_string(&msg),
1918            r#"
1919X-Hello: there\r
1920X-Header: value\r
1921Subject: Hello\r
1922X-Header: another value\r
1923From :Someone@somewhere\r
1924Mime-Version: 1.0\r
1925\r
1926Body
1927"#
1928        );
1929
1930        let msg = new_msg_body(MULTI_HEADER_CONTENT);
1931        msg.check_fix_conformance(
1932            MessageConformance::default(),
1933            MessageConformance::MISSING_MIME_VERSION | MessageConformance::NAME_ENDS_WITH_SPACE,
1934        )
1935        .unwrap();
1936        k9::snapshot!(
1937            data_as_string(&msg),
1938            r#"
1939Content-Type: text/plain;\r
1940\tcharset="us-ascii"\r
1941X-Hello: there\r
1942X-Header: value\r
1943Subject: Hello\r
1944X-Header: another value\r
1945From: <Someone@somewhere>\r
1946Mime-Version: 1.0\r
1947\r
1948Body\r
1949
1950"#
1951        );
1952    }
1953
1954    #[test]
1955    fn set_scheduling() -> anyhow::Result<()> {
1956        let msg = new_msg_body(MULTI_HEADER_CONTENT);
1957        assert!(msg.get_due().is_none(), "due is implicitly now");
1958
1959        let now = Utc::now();
1960        let one_day = chrono::Duration::try_days(1).expect("1 day to be valid");
1961
1962        msg.set_scheduling(Some(Scheduling {
1963            restriction: None,
1964            first_attempt: Some((now + one_day).into()),
1965            expires: None,
1966        }))?;
1967
1968        let due = msg.get_due().expect("due to now be set");
1969        assert!(due - now >= one_day, "due time is at least 1 day away");
1970
1971        Ok(())
1972    }
1973
1974    #[cfg(all(test, target_pointer_width = "64"))]
1975    #[test]
1976    fn sizes() {
1977        assert_eq!(std::mem::size_of::<Message>(), 8);
1978        assert_eq!(std::mem::size_of::<MessageInner>(), 32);
1979        assert_eq!(std::mem::size_of::<MessageWithId>(), 72);
1980    }
1981}