message/
message.rs

1use crate::address::HeaderAddressList;
2#[cfg(feature = "impl")]
3use crate::dkim::Signer;
4#[cfg(feature = "impl")]
5use crate::dkim::SIGN_POOL;
6pub use crate::queue_name::QueueNameComponents;
7use crate::scheduling::Scheduling;
8use crate::EnvelopeAddress;
9use anyhow::Context;
10use chrono::{DateTime, Utc};
11#[cfg(feature = "impl")]
12use config::{any_err, from_lua_value, serialize_options};
13use futures::FutureExt;
14use intrusive_collections::{intrusive_adapter, LinkedList, LinkedListAtomicLink};
15use kumo_chrono_helper::*;
16use kumo_log_types::rfc3464::Report;
17use kumo_log_types::rfc5965::ARFReport;
18#[cfg(feature = "impl")]
19use mailparsing::{AuthenticationResult, AuthenticationResults, EncodeHeaderValue};
20use mailparsing::{DecodedBody, Header, HeaderParseResult, MessageConformance, MimePart};
21#[cfg(feature = "impl")]
22use mlua::{LuaSerdeExt, UserData, UserDataMethods};
23use parking_lot::Mutex;
24use prometheus::{Histogram, IntGauge};
25use serde::{Deserialize, Serialize};
26use spool::{get_data_spool, get_meta_spool, Spool, SpoolId};
27use std::hash::Hash;
28use std::sync::{Arc, LazyLock, Weak};
29use std::time::{Duration, Instant};
30use timeq::TimerEntryWithDelay;
31
32bitflags::bitflags! {
33    #[derive(Clone, Copy, Debug, PartialEq, Eq)]
34    struct MessageFlags: u8 {
35        /// true if Metadata needs to be saved
36        const META_DIRTY = 1;
37        /// true if Data needs to be saved
38        const DATA_DIRTY = 2;
39        /// true if scheduling restrictions are present in the metadata
40        const SCHEDULED = 4;
41        /// true if high durability writes should always be used
42        const FORCE_SYNC = 8;
43    }
44}
45
46static MESSAGE_COUNT: LazyLock<IntGauge> = LazyLock::new(|| {
47    prometheus::register_int_gauge!("message_count", "total number of Message objects").unwrap()
48});
49static META_COUNT: LazyLock<IntGauge> = LazyLock::new(|| {
50    prometheus::register_int_gauge!(
51        "message_meta_resident_count",
52        "total number of Message objects with metadata loaded"
53    )
54    .unwrap()
55});
56static DATA_COUNT: LazyLock<IntGauge> = LazyLock::new(|| {
57    prometheus::register_int_gauge!(
58        "message_data_resident_count",
59        "total number of Message objects with body data loaded"
60    )
61    .unwrap()
62});
63static NO_DATA: LazyLock<Arc<Box<[u8]>>> = LazyLock::new(|| Arc::new(vec![].into_boxed_slice()));
64static SAVE_HIST: LazyLock<Histogram> = LazyLock::new(|| {
65    prometheus::register_histogram!(
66        "message_save_latency",
67        "how long it takes to save a message to spool"
68    )
69    .unwrap()
70});
71static LOAD_DATA_HIST: LazyLock<Histogram> = LazyLock::new(|| {
72    prometheus::register_histogram!(
73        "message_data_load_latency",
74        "how long it takes to load message data from spool"
75    )
76    .unwrap()
77});
78static LOAD_META_HIST: LazyLock<Histogram> = LazyLock::new(|| {
79    prometheus::register_histogram!(
80        "message_meta_load_latency",
81        "how long it takes to load message metadata from spool"
82    )
83    .unwrap()
84});
85
86#[derive(Debug)]
87struct MessageInner {
88    metadata: Option<Box<MetaData>>,
89    data: Arc<Box<[u8]>>,
90    flags: MessageFlags,
91    num_attempts: u16,
92    due: Option<DateTime<Utc>>,
93}
94
95#[derive(Debug)]
96pub(crate) struct MessageWithId {
97    id: SpoolId,
98    inner: Mutex<MessageInner>,
99    link: LinkedListAtomicLink,
100}
101
102intrusive_adapter!(
103    pub(crate) MessageWithIdAdapter = Arc<MessageWithId>: MessageWithId { link: LinkedListAtomicLink }
104);
105
106/// A list of messages with an O(1) list overhead; no additional
107/// memory per-message is required to track this list.
108/// However, a given Message can only belong to one instance
109/// of such a list at a time.
110pub struct MessageList {
111    list: LinkedList<MessageWithIdAdapter>,
112    len: usize,
113}
114
115impl Default for MessageList {
116    fn default() -> Self {
117        Self::new()
118    }
119}
120
121impl MessageList {
122    /// Create a new MessageList
123    pub fn new() -> Self {
124        Self {
125            list: LinkedList::new(MessageWithIdAdapter::default()),
126            len: 0,
127        }
128    }
129
130    /// Returns the number of elements contained in the list
131    pub fn len(&self) -> usize {
132        self.len
133    }
134
135    pub fn is_empty(&self) -> bool {
136        self.len == 0
137    }
138
139    /// Take all of the elements from this list and return them
140    /// in a new separate instance of MessageList.
141    pub fn take(&mut self) -> Self {
142        let new_list = Self {
143            list: self.list.take(),
144            len: self.len,
145        };
146        self.len = 0;
147        new_list
148    }
149
150    /// Push a message to the back of the list
151    pub fn push_back(&mut self, message: Message) {
152        self.list.push_back(message.msg_and_id);
153        self.len += 1;
154    }
155
156    /// Pop a message from the front of the list
157    pub fn pop_front(&mut self) -> Option<Message> {
158        self.list.pop_front().map(|msg_and_id| {
159            self.len -= 1;
160            Message { msg_and_id }
161        })
162    }
163
164    /// Pop a message from the back of the list
165    pub fn pop_back(&mut self) -> Option<Message> {
166        self.list.pop_back().map(|msg_and_id| {
167            self.len -= 1;
168            Message { msg_and_id }
169        })
170    }
171
172    /// Pop all of the messages from this list into a vector
173    /// of messages.
174    /// Memory usage is O(number-of-messages).
175    pub fn drain(&mut self) -> Vec<Message> {
176        let mut messages = Vec::with_capacity(self.len);
177        while let Some(msg) = self.pop_front() {
178            messages.push(msg);
179        }
180        messages
181    }
182
183    pub fn extend_from_iter<I>(&mut self, iter: I)
184    where
185        I: Iterator<Item = Message>,
186    {
187        for msg in iter {
188            self.push_back(msg)
189        }
190    }
191}
192
193impl IntoIterator for MessageList {
194    type Item = Message;
195    type IntoIter = MessageListIter;
196    fn into_iter(self) -> MessageListIter {
197        MessageListIter { list: self.list }
198    }
199}
200
201pub struct MessageListIter {
202    list: LinkedList<MessageWithIdAdapter>,
203}
204
205impl Iterator for MessageListIter {
206    type Item = Message;
207    fn next(&mut self) -> Option<Message> {
208        self.list
209            .pop_front()
210            .map(|msg_and_id| Message { msg_and_id })
211    }
212}
213
214#[derive(Clone, Debug)]
215#[cfg_attr(feature = "impl", derive(mlua::FromLua))]
216pub struct Message {
217    pub(crate) msg_and_id: Arc<MessageWithId>,
218}
219
220impl PartialEq for Message {
221    fn eq(&self, other: &Self) -> bool {
222        self.id() == other.id()
223    }
224}
225impl Eq for Message {}
226
227impl Hash for Message {
228    fn hash<H>(&self, hasher: &mut H)
229    where
230        H: std::hash::Hasher,
231    {
232        self.id().hash(hasher)
233    }
234}
235
236#[derive(Clone, Debug)]
237pub struct WeakMessage {
238    weak: Weak<MessageWithId>,
239}
240
241impl WeakMessage {
242    pub fn upgrade(&self) -> Option<Message> {
243        Some(Message {
244            msg_and_id: self.weak.upgrade()?,
245        })
246    }
247}
248
249#[derive(Debug, Serialize, Deserialize, Clone)]
250struct MetaData {
251    sender: EnvelopeAddress,
252    recipient: EnvelopeAddress,
253    meta: serde_json::Value,
254    #[serde(default)]
255    schedule: Option<Scheduling>,
256}
257
258impl Drop for MessageInner {
259    fn drop(&mut self) {
260        if self.metadata.is_some() {
261            META_COUNT.dec();
262        }
263        if !self.data.is_empty() {
264            DATA_COUNT.dec();
265        }
266        MESSAGE_COUNT.dec();
267    }
268}
269
270impl Message {
271    /// Create a new message with the supplied data.
272    /// The message meta and data are marked as dirty
273    pub fn new_dirty(
274        id: SpoolId,
275        sender: EnvelopeAddress,
276        recipient: EnvelopeAddress,
277        meta: serde_json::Value,
278        data: Arc<Box<[u8]>>,
279    ) -> anyhow::Result<Self> {
280        anyhow::ensure!(meta.is_object(), "metadata must be a json object");
281        MESSAGE_COUNT.inc();
282        DATA_COUNT.inc();
283        META_COUNT.inc();
284        Ok(Self {
285            msg_and_id: Arc::new(MessageWithId {
286                id,
287                inner: Mutex::new(MessageInner {
288                    metadata: Some(Box::new(MetaData {
289                        sender,
290                        recipient,
291                        meta,
292                        schedule: None,
293                    })),
294                    data,
295                    flags: MessageFlags::META_DIRTY | MessageFlags::DATA_DIRTY,
296                    num_attempts: 0,
297                    due: None,
298                }),
299                link: LinkedListAtomicLink::default(),
300            }),
301        })
302    }
303
304    pub fn weak(&self) -> WeakMessage {
305        WeakMessage {
306            weak: Arc::downgrade(&self.msg_and_id),
307        }
308    }
309
310    /// Helper for creating a message based on spool enumeration.
311    /// Given a spool id and the serialized metadata blob, returns
312    /// a message holding the deserialized version of that metadata.
313    pub fn new_from_spool(id: SpoolId, metadata: Vec<u8>) -> anyhow::Result<Self> {
314        let metadata: MetaData = serde_json::from_slice(&metadata)?;
315        MESSAGE_COUNT.inc();
316        META_COUNT.inc();
317
318        let flags = if metadata.schedule.is_some() {
319            MessageFlags::SCHEDULED
320        } else {
321            MessageFlags::empty()
322        };
323
324        Ok(Self {
325            msg_and_id: Arc::new(MessageWithId {
326                id,
327                inner: Mutex::new(MessageInner {
328                    metadata: Some(Box::new(metadata)),
329                    data: NO_DATA.clone(),
330                    flags,
331                    num_attempts: 0,
332                    due: None,
333                }),
334                link: LinkedListAtomicLink::default(),
335            }),
336        })
337    }
338
339    pub async fn new_with_id(id: SpoolId) -> anyhow::Result<Self> {
340        let meta_spool = get_meta_spool();
341        let data = meta_spool.load(id).await?;
342        Self::new_from_spool(id, data)
343    }
344
345    pub fn get_num_attempts(&self) -> u16 {
346        let inner = self.msg_and_id.inner.lock();
347        inner.num_attempts
348    }
349
350    pub fn set_num_attempts(&self, num_attempts: u16) {
351        let mut inner = self.msg_and_id.inner.lock();
352        inner.num_attempts = num_attempts;
353    }
354
355    pub fn increment_num_attempts(&self) {
356        let mut inner = self.msg_and_id.inner.lock();
357        inner.num_attempts += 1;
358    }
359
360    pub fn set_scheduling(
361        &self,
362        scheduling: Option<Scheduling>,
363    ) -> anyhow::Result<Option<Scheduling>> {
364        let mut inner = self.msg_and_id.inner.lock();
365        match &mut inner.metadata {
366            None => anyhow::bail!("set_scheduling: metadata must be loaded first"),
367            Some(meta) => {
368                meta.schedule = scheduling;
369                inner
370                    .flags
371                    .set(MessageFlags::SCHEDULED, scheduling.is_some());
372                if let Some(sched) = scheduling {
373                    let due = inner.due.unwrap_or_else(Utc::now);
374                    inner.due = Some(sched.adjust_for_schedule(due));
375                }
376                Ok(scheduling)
377            }
378        }
379    }
380
381    pub fn get_scheduling(&self) -> Option<Scheduling> {
382        let inner = self.msg_and_id.inner.lock();
383        inner.metadata.as_ref().and_then(|meta| meta.schedule)
384    }
385
386    pub fn get_due(&self) -> Option<DateTime<Utc>> {
387        let inner = self.msg_and_id.inner.lock();
388        inner.due
389    }
390
391    pub async fn delay_with_jitter(&self, limit: i64) -> anyhow::Result<Option<DateTime<Utc>>> {
392        let scale = rand::random::<f32>();
393        let value = (scale * limit as f32) as i64;
394        self.delay_by(seconds(value)?).await
395    }
396
397    pub async fn delay_by(
398        &self,
399        duration: chrono::Duration,
400    ) -> anyhow::Result<Option<DateTime<Utc>>> {
401        let due = Utc::now() + duration;
402        self.set_due(Some(due)).await
403    }
404
405    /// Delay by requested duration, and add up to 1 minute of jitter
406    pub async fn delay_by_and_jitter(
407        &self,
408        duration: chrono::Duration,
409    ) -> anyhow::Result<Option<DateTime<Utc>>> {
410        let scale = rand::random::<f32>();
411        let value = (scale * 60.) as i64;
412        let due = Utc::now() + duration + seconds(value)?;
413        self.set_due(Some(due)).await
414    }
415
416    pub async fn set_due(
417        &self,
418        due: Option<DateTime<Utc>>,
419    ) -> anyhow::Result<Option<DateTime<Utc>>> {
420        let due = {
421            let mut inner = self.msg_and_id.inner.lock();
422
423            if !inner.flags.contains(MessageFlags::SCHEDULED) {
424                // This is the simple, fast-path, common case
425                inner.due = due;
426                return Ok(inner.due);
427            }
428
429            let due = due.unwrap_or_else(Utc::now);
430
431            if let Some(meta) = &inner.metadata {
432                inner.due = match &meta.schedule {
433                    Some(sched) => Some(sched.adjust_for_schedule(due)),
434                    None => Some(due),
435                };
436                return Ok(inner.due);
437            }
438
439            // We'll need to load the metadata to correctly
440            // update the schedule for this message
441            due
442        };
443
444        self.load_meta().await?;
445
446        {
447            let mut inner = self.msg_and_id.inner.lock();
448            match &inner.metadata {
449                Some(meta) => {
450                    inner.due = match &meta.schedule {
451                        Some(sched) => Some(sched.adjust_for_schedule(due)),
452                        None => Some(due),
453                    };
454                    Ok(inner.due)
455                }
456                None => anyhow::bail!("loaded metadata, but metadata is not set!?"),
457            }
458        }
459    }
460
461    fn get_data_if_dirty(&self) -> Option<Arc<Box<[u8]>>> {
462        let inner = self.msg_and_id.inner.lock();
463        if inner.flags.contains(MessageFlags::DATA_DIRTY) {
464            Some(Arc::clone(&inner.data))
465        } else {
466            None
467        }
468    }
469
470    fn get_meta_if_dirty(&self) -> Option<MetaData> {
471        let inner = self.msg_and_id.inner.lock();
472        if inner.flags.contains(MessageFlags::META_DIRTY) {
473            inner.metadata.as_ref().map(|md| (**md).clone())
474        } else {
475            None
476        }
477    }
478
479    pub fn set_force_sync(&self, force: bool) {
480        let mut inner = self.msg_and_id.inner.lock();
481        inner.flags.set(MessageFlags::FORCE_SYNC, force);
482    }
483
484    pub fn needs_save(&self) -> bool {
485        let inner = self.msg_and_id.inner.lock();
486        inner.flags.contains(MessageFlags::META_DIRTY)
487            || inner.flags.contains(MessageFlags::DATA_DIRTY)
488    }
489
490    pub async fn save(&self, deadline: Option<Instant>) -> anyhow::Result<()> {
491        let _timer = SAVE_HIST.start_timer();
492        self.save_to(&**get_meta_spool(), &**get_data_spool(), deadline)
493            .await
494    }
495
496    pub async fn save_to(
497        &self,
498        meta_spool: &(dyn Spool + Send + Sync),
499        data_spool: &(dyn Spool + Send + Sync),
500        deadline: Option<Instant>,
501    ) -> anyhow::Result<()> {
502        let force_sync = self
503            .msg_and_id
504            .inner
505            .lock()
506            .flags
507            .contains(MessageFlags::FORCE_SYNC);
508
509        let data_fut = if let Some(data) = self.get_data_if_dirty() {
510            anyhow::ensure!(!data.is_empty(), "message data must not be empty");
511            data_spool
512                .store(self.msg_and_id.id, data, force_sync, deadline)
513                .map(|_| true)
514                .boxed()
515        } else {
516            futures::future::ready(false).boxed()
517        };
518        let meta_fut = if let Some(meta) = self.get_meta_if_dirty() {
519            let meta = Arc::new(serde_json::to_vec(&meta)?.into_boxed_slice());
520            meta_spool
521                .store(self.msg_and_id.id, meta, force_sync, deadline)
522                .map(|_| true)
523                .boxed()
524        } else {
525            futures::future::ready(false).boxed()
526        };
527
528        // NOTE: if we have a deadline, it is tempting to want to use
529        // timeout_at here to enforce it, but the underlying spool
530        // futures are not guaranteed to be fully cancel safe, which
531        // is why we pass the deadline down to the save method to allow
532        // them to handle timeouts internally.
533        let (data_res, meta_res) = tokio::join!(data_fut, meta_fut);
534
535        if data_res {
536            self.msg_and_id
537                .inner
538                .lock()
539                .flags
540                .remove(MessageFlags::DATA_DIRTY);
541        }
542        if meta_res {
543            self.msg_and_id
544                .inner
545                .lock()
546                .flags
547                .remove(MessageFlags::META_DIRTY);
548        }
549        Ok(())
550    }
551
552    pub fn id(&self) -> &SpoolId {
553        &self.msg_and_id.id
554    }
555
556    /// Save the data+meta if needed, then release both
557    pub async fn save_and_shrink(&self) -> anyhow::Result<bool> {
558        self.save(None).await?;
559        self.shrink()
560    }
561
562    /// Save the data+meta if needed, then release just the data
563    pub async fn save_and_shrink_data(&self) -> anyhow::Result<bool> {
564        self.save(None).await?;
565        self.shrink_data()
566    }
567
568    pub fn shrink_data(&self) -> anyhow::Result<bool> {
569        let mut inner = self.msg_and_id.inner.lock();
570        let mut did_shrink = false;
571        if inner.flags.contains(MessageFlags::DATA_DIRTY) {
572            anyhow::bail!("Cannot shrink message: DATA_DIRTY");
573        }
574        if !inner.data.is_empty() {
575            DATA_COUNT.dec();
576            did_shrink = true;
577        }
578        if !inner.data.is_empty() {
579            inner.data = NO_DATA.clone();
580            did_shrink = true;
581        }
582        Ok(did_shrink)
583    }
584
585    pub fn shrink(&self) -> anyhow::Result<bool> {
586        let mut inner = self.msg_and_id.inner.lock();
587        let mut did_shrink = false;
588        if inner.flags.contains(MessageFlags::DATA_DIRTY) {
589            anyhow::bail!("Cannot shrink message: DATA_DIRTY");
590        }
591        if inner.flags.contains(MessageFlags::META_DIRTY) {
592            anyhow::bail!("Cannot shrink message: META_DIRTY");
593        }
594        if inner.metadata.take().is_some() {
595            META_COUNT.dec();
596            did_shrink = true;
597        }
598        if !inner.data.is_empty() {
599            DATA_COUNT.dec();
600            did_shrink = true;
601        }
602        if !inner.data.is_empty() {
603            inner.data = NO_DATA.clone();
604            did_shrink = true;
605        }
606        Ok(did_shrink)
607    }
608
609    pub fn sender(&self) -> anyhow::Result<EnvelopeAddress> {
610        let inner = self.msg_and_id.inner.lock();
611        match &inner.metadata {
612            Some(meta) => Ok(meta.sender.clone()),
613            None => anyhow::bail!("metadata is not loaded"),
614        }
615    }
616
617    pub fn set_sender(&self, sender: EnvelopeAddress) -> anyhow::Result<()> {
618        let mut inner = self.msg_and_id.inner.lock();
619        match &mut inner.metadata {
620            Some(meta) => {
621                meta.sender = sender;
622                inner.flags.set(MessageFlags::DATA_DIRTY, true);
623                Ok(())
624            }
625            None => anyhow::bail!("metadata is not loaded"),
626        }
627    }
628
629    pub fn recipient(&self) -> anyhow::Result<EnvelopeAddress> {
630        let inner = self.msg_and_id.inner.lock();
631        match &inner.metadata {
632            Some(meta) => Ok(meta.recipient.clone()),
633            None => anyhow::bail!("metadata is not loaded"),
634        }
635    }
636
637    pub fn set_recipient(&self, recipient: EnvelopeAddress) -> anyhow::Result<()> {
638        let mut inner = self.msg_and_id.inner.lock();
639        match &mut inner.metadata {
640            Some(meta) => {
641                meta.recipient = recipient;
642                inner.flags.set(MessageFlags::DATA_DIRTY, true);
643                Ok(())
644            }
645            None => anyhow::bail!("metadata is not loaded"),
646        }
647    }
648
649    pub fn is_meta_loaded(&self) -> bool {
650        self.msg_and_id.inner.lock().metadata.is_some()
651    }
652
653    pub fn is_data_loaded(&self) -> bool {
654        !self.msg_and_id.inner.lock().data.is_empty()
655    }
656
657    pub async fn load_meta_if_needed(&self) -> anyhow::Result<()> {
658        if self.is_meta_loaded() {
659            return Ok(());
660        }
661        self.load_meta().await
662    }
663
664    pub async fn load_data_if_needed(&self) -> anyhow::Result<()> {
665        if self.is_data_loaded() {
666            return Ok(());
667        }
668        self.load_data().await
669    }
670
671    pub async fn load_meta(&self) -> anyhow::Result<()> {
672        let _timer = LOAD_META_HIST.start_timer();
673        self.load_meta_from(&**get_meta_spool()).await
674    }
675
676    pub async fn load_meta_from(
677        &self,
678        meta_spool: &(dyn Spool + Send + Sync),
679    ) -> anyhow::Result<()> {
680        let id = self.id();
681        let data = meta_spool.load(*id).await?;
682        let mut inner = self.msg_and_id.inner.lock();
683        let was_not_loaded = inner.metadata.is_none();
684        let metadata: MetaData = serde_json::from_slice(&data)?;
685        inner.metadata.replace(Box::new(metadata));
686        if was_not_loaded {
687            META_COUNT.inc();
688        }
689        Ok(())
690    }
691
692    pub async fn load_data(&self) -> anyhow::Result<()> {
693        let _timer = LOAD_DATA_HIST.start_timer();
694        self.load_data_from(&**get_data_spool()).await
695    }
696
697    pub async fn load_data_from(
698        &self,
699        data_spool: &(dyn Spool + Send + Sync),
700    ) -> anyhow::Result<()> {
701        let data = data_spool.load(*self.id()).await?;
702        let mut inner = self.msg_and_id.inner.lock();
703        let was_empty = inner.data.is_empty();
704        inner.data = Arc::new(data.into_boxed_slice());
705        if was_empty {
706            DATA_COUNT.inc();
707        }
708        Ok(())
709    }
710
711    pub fn assign_data(&self, data: Vec<u8>) {
712        let mut inner = self.msg_and_id.inner.lock();
713        let was_empty = inner.data.is_empty();
714        inner.data = Arc::new(data.into_boxed_slice());
715        inner.flags.set(MessageFlags::DATA_DIRTY, true);
716        if was_empty {
717            DATA_COUNT.inc();
718        }
719    }
720
721    pub fn get_data(&self) -> Arc<Box<[u8]>> {
722        let inner = self.msg_and_id.inner.lock();
723        inner.data.clone()
724    }
725
726    pub fn set_meta<S: AsRef<str>, V: Into<serde_json::Value>>(
727        &self,
728        key: S,
729        value: V,
730    ) -> anyhow::Result<()> {
731        let mut inner = self.msg_and_id.inner.lock();
732        match &mut inner.metadata {
733            None => anyhow::bail!("set_meta: metadata must be loaded first"),
734            Some(meta) => {
735                let key = key.as_ref();
736                let value = value.into();
737
738                match &mut meta.meta {
739                    serde_json::Value::Object(map) => {
740                        map.insert(key.to_string(), value);
741                    }
742                    _ => anyhow::bail!("metadata is somehow not a json object"),
743                }
744
745                inner.flags.set(MessageFlags::META_DIRTY, true);
746                Ok(())
747            }
748        }
749    }
750
751    /// Retrieve `key` as a String.
752    pub fn get_meta_string<S: serde_json::value::Index + std::fmt::Display + Copy>(
753        &self,
754        key: S,
755    ) -> anyhow::Result<Option<String>> {
756        match self.get_meta(key) {
757            Ok(serde_json::Value::String(value)) => Ok(Some(value.to_string())),
758            Ok(serde_json::Value::Null) => Ok(None),
759            hmm => {
760                anyhow::bail!("expected '{key}' to be a string value, got {hmm:?}");
761            }
762        }
763    }
764
765    pub fn get_meta_obj(&self) -> anyhow::Result<serde_json::Value> {
766        let inner = self.msg_and_id.inner.lock();
767        match &inner.metadata {
768            None => anyhow::bail!("get_meta_obj: metadata must be loaded first"),
769            Some(meta) => Ok(meta.meta.clone()),
770        }
771    }
772
773    pub fn get_meta<S: serde_json::value::Index>(
774        &self,
775        key: S,
776    ) -> anyhow::Result<serde_json::Value> {
777        let inner = self.msg_and_id.inner.lock();
778        match &inner.metadata {
779            None => anyhow::bail!("get_meta: metadata must be loaded first"),
780            Some(meta) => match meta.meta.get(key) {
781                Some(value) => Ok(value.clone()),
782                None => Ok(serde_json::Value::Null),
783            },
784        }
785    }
786
787    pub fn age(&self, now: DateTime<Utc>) -> chrono::Duration {
788        self.msg_and_id.id.age(now)
789    }
790
791    pub fn get_queue_name(&self) -> anyhow::Result<String> {
792        Ok(match self.get_meta_string("queue")? {
793            Some(name) => name,
794            None => {
795                let name = QueueNameComponents::format(
796                    self.get_meta_string("campaign")?,
797                    self.get_meta_string("tenant")?,
798                    self.recipient()?.domain().to_string().to_lowercase(),
799                    self.get_meta_string("routing_domain")?,
800                );
801                name.to_string()
802            }
803        })
804    }
805
806    #[cfg(feature = "impl")]
807    pub async fn dkim_verify(&self) -> anyhow::Result<Vec<AuthenticationResult>> {
808        let resolver = dns_resolver::get_resolver();
809        let data = self.get_data();
810        let bytes = mailparsing::SharedString::try_from(data.as_ref().as_ref())?;
811
812        let parsed = mailparsing::Header::parse_headers(bytes.clone())?;
813        if parsed
814            .overall_conformance
815            .contains(MessageConformance::NON_CANONICAL_LINE_ENDINGS)
816        {
817            return Ok(vec![AuthenticationResult {
818                method: "dkim".to_string(),
819                method_version: None,
820                result: "permerror".to_string(),
821                reason: Some("message has non-canonical line endings".to_string()),
822                props: Default::default(),
823            }]);
824        }
825        let message = kumo_dkim::ParsedEmail::HeaderOnlyParse { bytes, parsed };
826
827        let from = message
828            .get_headers()
829            .from()
830            .map_err(any_err)?
831            .ok_or_else(|| anyhow::anyhow!("Missing or invalid From header"))?
832            .0;
833        if from.len() != 1 {
834            anyhow::bail!(
835                "From header must have a single sender, found {}",
836                from.len()
837            );
838        }
839        let from_domain = &from[0].address.domain;
840
841        let results =
842            kumo_dkim::verify_email_with_resolver(from_domain, &message, &**resolver).await?;
843        Ok(results)
844    }
845
846    pub fn parse_rfc3464(&self) -> anyhow::Result<Option<Report>> {
847        let data = self.get_data();
848        Report::parse(&data)
849    }
850
851    pub fn parse_rfc5965(&self) -> anyhow::Result<Option<ARFReport>> {
852        let data = self.get_data();
853        ARFReport::parse(&data)
854    }
855
856    pub fn prepend_header(&self, name: Option<&str>, value: &str) {
857        let data = self.get_data();
858        let mut new_data = Vec::with_capacity(size_header(name, value) + 2 + data.len());
859        emit_header(&mut new_data, name, value);
860        new_data.extend_from_slice(&data);
861        self.assign_data(new_data);
862    }
863
864    pub fn append_header(&self, name: Option<&str>, value: &str) {
865        let data = self.get_data();
866        let mut new_data = Vec::with_capacity(size_header(name, value) + 2 + data.len());
867        for (idx, window) in data.windows(4).enumerate() {
868            if window == b"\r\n\r\n" {
869                let headers = &data[0..idx + 2];
870                let body = &data[idx + 2..];
871
872                new_data.extend_from_slice(headers);
873                emit_header(&mut new_data, name, value);
874                new_data.extend_from_slice(body);
875                self.assign_data(new_data);
876                return;
877            }
878        }
879    }
880
881    pub fn get_address_header(
882        &self,
883        header_name: &str,
884    ) -> anyhow::Result<Option<HeaderAddressList>> {
885        let data = self.get_data();
886        let HeaderParseResult { headers, .. } =
887            mailparsing::Header::parse_headers(data.as_ref().as_ref())?;
888
889        match headers.get_first(header_name) {
890            Some(hdr) => {
891                let list = hdr.as_address_list()?;
892                let result: HeaderAddressList = list.into();
893                Ok(Some(result))
894            }
895            None => Ok(None),
896        }
897    }
898
899    pub fn get_first_named_header_value(&self, name: &str) -> anyhow::Result<Option<String>> {
900        let data = self.get_data();
901        let HeaderParseResult { headers, .. } = Header::parse_headers(data.as_ref().as_ref())?;
902
903        match headers.get_first(name) {
904            Some(hdr) => Ok(Some(hdr.as_unstructured()?)),
905            None => Ok(None),
906        }
907    }
908
909    pub fn get_all_named_header_values(&self, name: &str) -> anyhow::Result<Vec<String>> {
910        let data = self.get_data();
911        let HeaderParseResult { headers, .. } = Header::parse_headers(data.as_ref().as_ref())?;
912
913        let mut values = vec![];
914        for hdr in headers.iter_named(name) {
915            values.push(hdr.as_unstructured()?);
916        }
917        Ok(values)
918    }
919
920    pub fn get_all_headers(&self) -> anyhow::Result<Vec<(String, String)>> {
921        let data = self.get_data();
922        let HeaderParseResult { headers, .. } = Header::parse_headers(data.as_ref().as_ref())?;
923
924        let mut values = vec![];
925        for hdr in headers.iter() {
926            values.push((hdr.get_name().to_string(), hdr.as_unstructured()?));
927        }
928        Ok(values)
929    }
930
931    pub fn retain_headers<F: FnMut(&Header) -> bool>(&self, mut func: F) -> anyhow::Result<()> {
932        let data = self.get_data();
933        let mut new_data = Vec::with_capacity(data.len());
934        let HeaderParseResult {
935            headers,
936            body_offset,
937            ..
938        } = Header::parse_headers(data.as_ref().as_ref())?;
939        for hdr in headers.iter() {
940            let retain = (func)(hdr);
941            if !retain {
942                continue;
943            }
944            hdr.write_header(&mut new_data)?;
945        }
946        new_data.extend_from_slice(b"\r\n");
947        new_data.extend_from_slice(&data[body_offset..]);
948        self.assign_data(new_data);
949        Ok(())
950    }
951
952    pub fn remove_first_named_header(&self, name: &str) -> anyhow::Result<()> {
953        let mut removed = false;
954        self.retain_headers(|hdr| {
955            if hdr.get_name().eq_ignore_ascii_case(name) && !removed {
956                removed = true;
957                false
958            } else {
959                true
960            }
961        })
962    }
963
964    pub fn import_x_headers(&self, names: Vec<String>) -> anyhow::Result<()> {
965        let data = self.get_data();
966        let HeaderParseResult { headers, .. } = Header::parse_headers(data.as_ref().as_ref())?;
967
968        for hdr in headers.iter() {
969            let do_import = if names.is_empty() {
970                is_x_header(hdr.get_name())
971            } else {
972                is_header_in_names_list(hdr.get_name(), &names)
973            };
974            if do_import {
975                let name = imported_header_name(hdr.get_name());
976                self.set_meta(name, hdr.as_unstructured()?)?;
977            }
978        }
979
980        Ok(())
981    }
982
983    pub fn remove_x_headers(&self, names: Vec<String>) -> anyhow::Result<()> {
984        self.retain_headers(|hdr| {
985            if names.is_empty() {
986                !is_x_header(hdr.get_name())
987            } else {
988                !is_header_in_names_list(hdr.get_name(), &names)
989            }
990        })
991    }
992
993    pub fn remove_all_named_headers(&self, name: &str) -> anyhow::Result<()> {
994        self.retain_headers(|hdr| !hdr.get_name().eq_ignore_ascii_case(name))
995    }
996
997    #[cfg(feature = "impl")]
998    pub async fn dkim_sign(&self, signer: Signer) -> anyhow::Result<()> {
999        if let Some(runtime) = SIGN_POOL.get() {
1000            let msg = self.clone();
1001            runtime
1002                .spawn_blocking(move || {
1003                    let data = msg.get_data();
1004                    let header = signer.sign(&data)?;
1005                    msg.prepend_header(None, &header);
1006                    Ok::<(), anyhow::Error>(())
1007                })
1008                .await??;
1009        } else {
1010            let data = self.get_data();
1011            let header = signer.sign(&data)?;
1012            self.prepend_header(None, &header);
1013        }
1014        Ok(())
1015    }
1016
1017    pub fn import_scheduling_header(
1018        &self,
1019        header_name: &str,
1020        remove: bool,
1021    ) -> anyhow::Result<Option<Scheduling>> {
1022        if let Some(value) = self.get_first_named_header_value(header_name)? {
1023            let sched: Scheduling = serde_json::from_str(&value).with_context(|| {
1024                format!("{value} from header {header_name} is not a valid Scheduling header")
1025            })?;
1026            let result = self.set_scheduling(Some(sched))?;
1027
1028            if remove {
1029                self.remove_all_named_headers(header_name)?;
1030            }
1031
1032            Ok(result)
1033        } else {
1034            Ok(None)
1035        }
1036    }
1037
1038    pub fn append_text_plain(&self, content: &str) -> anyhow::Result<bool> {
1039        let data = self.get_data();
1040        let mut msg = MimePart::parse(data.as_ref().as_ref())?;
1041        let parts = msg.simplified_structure_pointers()?;
1042        if let Some(p) = parts.text_part.and_then(|p| msg.resolve_ptr_mut(p)) {
1043            match p.body()? {
1044                DecodedBody::Text(text) => {
1045                    let mut text = text.as_str().to_string();
1046                    text.push_str("\r\n");
1047                    text.push_str(content);
1048                    p.replace_text_body("text/plain", &text);
1049
1050                    let new_data = msg.to_message_string();
1051                    self.assign_data(new_data.into_bytes());
1052                    Ok(true)
1053                }
1054                DecodedBody::Binary(_) => {
1055                    anyhow::bail!("expected text/plain part to be text, but it is binary");
1056                }
1057            }
1058        } else {
1059            Ok(false)
1060        }
1061    }
1062
1063    pub fn append_text_html(&self, content: &str) -> anyhow::Result<bool> {
1064        let data = self.get_data();
1065        let mut msg = MimePart::parse(data.as_ref().as_ref())?;
1066        let parts = msg.simplified_structure_pointers()?;
1067        if let Some(p) = parts.html_part.and_then(|p| msg.resolve_ptr_mut(p)) {
1068            match p.body()? {
1069                DecodedBody::Text(text) => {
1070                    let mut text = text.as_str().to_string();
1071
1072                    match text.rfind("</body>").or_else(|| text.rfind("</BODY>")) {
1073                        Some(idx) => {
1074                            text.insert_str(idx, content);
1075                            text.insert_str(idx, "\r\n");
1076                        }
1077                        None => {
1078                            // Just append
1079                            text.push_str("\r\n");
1080                            text.push_str(content);
1081                        }
1082                    }
1083
1084                    p.replace_text_body("text/html", &text);
1085
1086                    let new_data = msg.to_message_string();
1087                    self.assign_data(new_data.into_bytes());
1088                    Ok(true)
1089                }
1090                DecodedBody::Binary(_) => {
1091                    anyhow::bail!("expected text/html part to be text, but it is binary");
1092                }
1093            }
1094        } else {
1095            Ok(false)
1096        }
1097    }
1098
1099    pub fn check_fix_conformance(
1100        &self,
1101        check: MessageConformance,
1102        fix: MessageConformance,
1103    ) -> anyhow::Result<()> {
1104        let data = self.get_data();
1105        let mut msg = MimePart::parse(data.as_ref().as_ref())?;
1106
1107        let conformance = msg.conformance();
1108
1109        // Don't raise errors for things that we're going to fix anyway
1110        let check = check - fix;
1111
1112        if check.intersects(conformance) {
1113            let problems = check.intersection(conformance).to_string();
1114            anyhow::bail!("Message has conformance issues: {problems}");
1115        }
1116
1117        if fix.intersects(conformance) {
1118            let to_fix = fix.intersection(conformance);
1119            let problems = to_fix.to_string();
1120
1121            let missing_headers_only = to_fix
1122                .difference(
1123                    MessageConformance::MISSING_DATE_HEADER
1124                        | MessageConformance::MISSING_MIME_VERSION
1125                        | MessageConformance::MISSING_MESSAGE_ID_HEADER,
1126                )
1127                .is_empty();
1128
1129            if !missing_headers_only {
1130                msg = msg.rebuild().with_context(|| {
1131                    format!("Rebuilding message to correct conformance issues: {problems}")
1132                })?;
1133            }
1134
1135            if to_fix.contains(MessageConformance::MISSING_DATE_HEADER) {
1136                msg.headers_mut().set_date(Utc::now());
1137            }
1138
1139            if to_fix.contains(MessageConformance::MISSING_MIME_VERSION) {
1140                msg.headers_mut().set_mime_version("1.0");
1141            }
1142
1143            if to_fix.contains(MessageConformance::MISSING_MESSAGE_ID_HEADER) {
1144                let sender = self.sender()?;
1145                let domain = sender.domain();
1146                let id = *self.id();
1147                msg.headers_mut()
1148                    .set_message_id(mailparsing::MessageID(format!("{id}@{domain}")));
1149            }
1150
1151            let new_data = msg.to_message_string();
1152            self.assign_data(new_data.into_bytes());
1153        }
1154
1155        Ok(())
1156    }
1157}
1158
1159fn is_header_in_names_list(hdr_name: &str, names: &[String]) -> bool {
1160    for name in names {
1161        if hdr_name.eq_ignore_ascii_case(name) {
1162            return true;
1163        }
1164    }
1165    false
1166}
1167
1168fn imported_header_name(name: &str) -> String {
1169    name.chars()
1170        .map(|c| match c.to_ascii_lowercase() {
1171            '-' => '_',
1172            c => c,
1173        })
1174        .collect()
1175}
1176
1177fn is_x_header(name: &str) -> bool {
1178    name.starts_with("X-") || name.starts_with("x-")
1179}
1180
1181fn size_header(name: Option<&str>, value: &str) -> usize {
1182    name.map(|name| name.len() + 2).unwrap_or(0) + value.len()
1183}
1184
1185fn emit_header(dest: &mut Vec<u8>, name: Option<&str>, value: &str) {
1186    if let Some(name) = name {
1187        dest.extend_from_slice(name.as_bytes());
1188        dest.extend_from_slice(b": ");
1189    }
1190    dest.extend_from_slice(value.as_bytes());
1191    if !value.ends_with("\r\n") {
1192        dest.extend_from_slice(b"\r\n");
1193    }
1194}
1195
1196#[cfg(feature = "impl")]
1197impl UserData for Message {
1198    fn add_methods<M: UserDataMethods<Self>>(methods: &mut M) {
1199        methods.add_async_method(
1200            "set_meta",
1201            move |_, this, (name, value): (String, mlua::Value)| async move {
1202                this.load_meta_if_needed().await.map_err(any_err)?;
1203                let value = serde_json::value::to_value(value).map_err(any_err)?;
1204                this.set_meta(name, value).map_err(any_err)?;
1205                Ok(())
1206            },
1207        );
1208        methods.add_async_method("get_meta", move |lua, this, name: String| async move {
1209            this.load_meta_if_needed().await.map_err(any_err)?;
1210            let value = this.get_meta(name).map_err(any_err)?;
1211            Ok(Some(lua.to_value_with(&value, serialize_options())?))
1212        });
1213        methods.add_async_method("get_data", move |lua, this, _: ()| async move {
1214            this.load_data_if_needed().await.map_err(any_err)?;
1215            let data = this.get_data();
1216            lua.create_string(&*data)
1217        });
1218        methods.add_method("set_data", move |_lua, this, data: mlua::String| {
1219            this.assign_data(data.as_bytes().to_vec());
1220            Ok(())
1221        });
1222
1223        methods.add_method("append_text_plain", move |_lua, this, data: String| {
1224            this.append_text_plain(&data).map_err(any_err)
1225        });
1226
1227        methods.add_method("append_text_html", move |_lua, this, data: String| {
1228            this.append_text_html(&data).map_err(any_err)
1229        });
1230
1231        methods.add_method("id", move |_, this, _: ()| Ok(this.id().to_string()));
1232        methods.add_method("sender", move |_, this, _: ()| {
1233            this.sender().map_err(any_err)
1234        });
1235
1236        methods.add_method("num_attempts", move |_, this, _: ()| {
1237            Ok(this.get_num_attempts())
1238        });
1239
1240        methods.add_method("queue_name", move |_, this, _: ()| {
1241            this.get_queue_name().map_err(any_err)
1242        });
1243
1244        methods.add_async_method("set_due", move |lua, this, due: mlua::Value| async move {
1245            let due: Option<DateTime<Utc>> = lua.from_value(due)?;
1246            let revised_due = this.set_due(due).await.map_err(any_err)?;
1247            lua.to_value(&revised_due)
1248        });
1249
1250        methods.add_method("set_sender", move |lua, this, value: mlua::Value| {
1251            let sender = match value {
1252                mlua::Value::String(s) => {
1253                    let s = s.to_str()?;
1254                    EnvelopeAddress::parse(&s).map_err(any_err)?
1255                }
1256                _ => lua.from_value::<EnvelopeAddress>(value.clone())?,
1257            };
1258            this.set_sender(sender).map_err(any_err)
1259        });
1260
1261        methods.add_method("recipient", move |_, this, _: ()| {
1262            this.recipient().map_err(any_err)
1263        });
1264
1265        methods.add_method("set_recipient", move |lua, this, value: mlua::Value| {
1266            let recipient = match value {
1267                mlua::Value::String(s) => {
1268                    let s = s.to_str()?;
1269                    EnvelopeAddress::parse(&s).map_err(any_err)?
1270                }
1271                _ => lua.from_value::<EnvelopeAddress>(value.clone())?,
1272            };
1273            this.set_recipient(recipient).map_err(any_err)
1274        });
1275
1276        #[cfg(feature = "impl")]
1277        methods.add_async_method("dkim_sign", |_, this, signer: Signer| async move {
1278            this.dkim_sign(signer).await.map_err(any_err)
1279        });
1280
1281        methods.add_async_method("shrink", |_, this, _: ()| async move {
1282            if this.needs_save() {
1283                this.save(None).await.map_err(any_err)?;
1284            }
1285            this.shrink().map_err(any_err)
1286        });
1287
1288        methods.add_async_method("shrink_data", |_, this, _: ()| async move {
1289            if this.needs_save() {
1290                this.save(None).await.map_err(any_err)?;
1291            }
1292            this.shrink_data().map_err(any_err)
1293        });
1294
1295        methods.add_method(
1296            "add_authentication_results",
1297            move |lua, this, (serv_id, results): (String, mlua::Value)| {
1298                let results: Vec<AuthenticationResult> = lua.from_value(results)?;
1299                let results = AuthenticationResults {
1300                    serv_id,
1301                    version: None,
1302                    results,
1303                };
1304
1305                this.prepend_header(Some("Authentication-Results"), &results.encode_value());
1306
1307                Ok(())
1308            },
1309        );
1310
1311        #[cfg(feature = "impl")]
1312        methods.add_async_method("dkim_verify", |lua, this, ()| async move {
1313            let results = this.dkim_verify().await.map_err(any_err)?;
1314            lua.to_value_with(&results, serialize_options())
1315        });
1316
1317        methods.add_method(
1318            "prepend_header",
1319            move |_, this, (name, value): (String, String)| {
1320                this.prepend_header(Some(&name), &value);
1321                Ok(())
1322            },
1323        );
1324        methods.add_method(
1325            "append_header",
1326            move |_, this, (name, value): (String, String)| {
1327                this.append_header(Some(&name), &value);
1328                Ok(())
1329            },
1330        );
1331        methods.add_method("get_address_header", move |_, this, name: String| {
1332            this.get_address_header(&name).map_err(any_err)
1333        });
1334        methods.add_method("from_header", move |_, this, ()| {
1335            this.get_address_header("From").map_err(any_err)
1336        });
1337        methods.add_method("to_header", move |_, this, ()| {
1338            this.get_address_header("To").map_err(any_err)
1339        });
1340
1341        methods.add_method(
1342            "get_first_named_header_value",
1343            move |_, this, name: String| this.get_first_named_header_value(&name).map_err(any_err),
1344        );
1345        methods.add_method(
1346            "get_all_named_header_values",
1347            move |_, this, name: String| this.get_all_named_header_values(&name).map_err(any_err),
1348        );
1349        methods.add_method("get_all_headers", move |_, this, _: ()| {
1350            Ok(this
1351                .get_all_headers()
1352                .map_err(any_err)?
1353                .into_iter()
1354                .map(|(name, value)| vec![name, value])
1355                .collect::<Vec<Vec<String>>>())
1356        });
1357        methods.add_method("get_all_headers", move |_, this, _: ()| {
1358            Ok(this
1359                .get_all_headers()
1360                .map_err(any_err)?
1361                .into_iter()
1362                .map(|(name, value)| vec![name, value])
1363                .collect::<Vec<Vec<String>>>())
1364        });
1365        methods.add_method(
1366            "import_x_headers",
1367            move |_, this, names: Option<Vec<String>>| {
1368                this.import_x_headers(names.unwrap_or_default())
1369                    .map_err(any_err)
1370            },
1371        );
1372
1373        methods.add_method(
1374            "remove_x_headers",
1375            move |_, this, names: Option<Vec<String>>| {
1376                this.remove_x_headers(names.unwrap_or_default())
1377                    .map_err(any_err)
1378            },
1379        );
1380        methods.add_method("remove_all_named_headers", move |_, this, name: String| {
1381            this.remove_all_named_headers(&name).map_err(any_err)
1382        });
1383
1384        methods.add_method(
1385            "import_scheduling_header",
1386            move |lua, this, (header_name, remove): (String, bool)| {
1387                let opt_schedule = this
1388                    .import_scheduling_header(&header_name, remove)
1389                    .map_err(any_err)?;
1390                lua.to_value(&opt_schedule)
1391            },
1392        );
1393
1394        methods.add_method("set_scheduling", move |lua, this, params: mlua::Value| {
1395            let sched: Option<Scheduling> = from_lua_value(lua, params)?;
1396            let opt_schedule = this.set_scheduling(sched).map_err(any_err)?;
1397            lua.to_value(&opt_schedule)
1398        });
1399
1400        methods.add_method("parse_rfc3464", move |lua, this, _: ()| {
1401            let report = this.parse_rfc3464().map_err(any_err)?;
1402            match report {
1403                Some(report) => lua.to_value_with(&report, serialize_options()),
1404                None => Ok(mlua::Value::Nil),
1405            }
1406        });
1407
1408        methods.add_method("parse_rfc5965", move |lua, this, _: ()| {
1409            let report = this.parse_rfc5965().map_err(any_err)?;
1410            match report {
1411                Some(report) => lua.to_value_with(&report, serialize_options()),
1412                None => Ok(mlua::Value::Nil),
1413            }
1414        });
1415
1416        methods.add_async_method("save", |_, this, ()| async move {
1417            this.save(None).await.map_err(any_err)
1418        });
1419
1420        methods.add_method("set_force_sync", move |_, this, force: bool| {
1421            this.set_force_sync(force);
1422            Ok(())
1423        });
1424
1425        methods.add_async_method(
1426            "check_fix_conformance",
1427            |_, this, (check, fix): (String, String)| async move {
1428                use std::str::FromStr;
1429                let check = MessageConformance::from_str(&check).map_err(any_err)?;
1430                let fix = MessageConformance::from_str(&fix).map_err(any_err)?;
1431
1432                match this.check_fix_conformance(check, fix) {
1433                    Ok(_) => Ok(None),
1434                    Err(err) => Ok(Some(format!("{err:#}"))),
1435                }
1436            },
1437        );
1438    }
1439}
1440
1441impl TimerEntryWithDelay for WeakMessage {
1442    fn delay(&self) -> Duration {
1443        match self.upgrade() {
1444            None => {
1445                // Dangling/Cancelled. Make it appear due immediately
1446                Duration::from_millis(0)
1447            }
1448            Some(msg) => msg.delay(),
1449        }
1450    }
1451}
1452
1453impl TimerEntryWithDelay for Message {
1454    fn delay(&self) -> Duration {
1455        let inner = self.msg_and_id.inner.lock();
1456        match inner.due {
1457            Some(time) => {
1458                let now = Utc::now();
1459                let delta = time - now;
1460                delta.to_std().unwrap_or(Duration::from_millis(0))
1461            }
1462            None => Duration::from_millis(0),
1463        }
1464    }
1465}
1466
1467#[cfg(test)]
1468mod test {
1469    use super::*;
1470    use serde_json::json;
1471
1472    fn new_msg_body<S: AsRef<str>>(s: S) -> Message {
1473        Message::new_dirty(
1474            SpoolId::new(),
1475            EnvelopeAddress::parse("sender@example.com").unwrap(),
1476            EnvelopeAddress::parse("recip@example.com").unwrap(),
1477            serde_json::json!({}),
1478            Arc::new(s.as_ref().as_bytes().to_vec().into_boxed_slice()),
1479        )
1480        .unwrap()
1481    }
1482
1483    fn data_as_string(msg: &Message) -> String {
1484        String::from_utf8(msg.get_data().to_vec()).unwrap()
1485    }
1486
1487    const X_HDR_CONTENT: &str =
1488        "X-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nFrom :Someone\r\n\r\nBody";
1489
1490    #[test]
1491    fn import_all_x_headers() {
1492        let msg = new_msg_body(X_HDR_CONTENT);
1493
1494        msg.import_x_headers(vec![]).unwrap();
1495        k9::assert_equal!(
1496            msg.get_meta_obj().unwrap(),
1497            json!({
1498                "x_hello": "there",
1499                "x_header": "value",
1500            })
1501        );
1502    }
1503
1504    #[test]
1505    fn meta_and_nil() {
1506        let msg = new_msg_body(X_HDR_CONTENT);
1507        // Ensure that json null round-trips
1508        msg.set_meta("test", serde_json::Value::Null).unwrap();
1509        k9::assert_equal!(msg.get_meta("test").unwrap(), serde_json::Value::Null);
1510
1511        // and that it is exposed to lua as nil
1512        let lua = mlua::Lua::new();
1513        lua.globals().set("msg", msg).unwrap();
1514        lua.load("assert(msg:get_meta('test') == nil)")
1515            .exec()
1516            .unwrap();
1517    }
1518
1519    #[test]
1520    fn import_some_x_headers() {
1521        let msg = new_msg_body(X_HDR_CONTENT);
1522
1523        msg.import_x_headers(vec!["x-hello".to_string()]).unwrap();
1524        k9::assert_equal!(
1525            msg.get_meta_obj().unwrap(),
1526            json!({
1527                "x_hello": "there",
1528            })
1529        );
1530    }
1531
1532    #[test]
1533    fn remove_all_x_headers() {
1534        let msg = new_msg_body(X_HDR_CONTENT);
1535
1536        msg.remove_x_headers(vec![]).unwrap();
1537        k9::assert_equal!(
1538            data_as_string(&msg),
1539            "Subject: Hello\r\nFrom :Someone\r\n\r\nBody"
1540        );
1541    }
1542
1543    #[test]
1544    fn prepend_header_2_params() {
1545        let msg = new_msg_body(X_HDR_CONTENT);
1546
1547        msg.prepend_header(Some("Date"), "Today");
1548        k9::assert_equal!(
1549            data_as_string(&msg),
1550            "Date: Today\r\nX-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nFrom :Someone\r\n\r\nBody"
1551        );
1552    }
1553
1554    #[test]
1555    fn prepend_header_1_params() {
1556        let msg = new_msg_body(X_HDR_CONTENT);
1557
1558        msg.prepend_header(None, "Date: Today");
1559        k9::assert_equal!(
1560            data_as_string(&msg),
1561            "Date: Today\r\nX-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nFrom :Someone\r\n\r\nBody"
1562        );
1563    }
1564
1565    #[test]
1566    fn append_header_2_params() {
1567        let msg = new_msg_body(X_HDR_CONTENT);
1568
1569        msg.append_header(Some("Date"), "Today");
1570        k9::assert_equal!(
1571            data_as_string(&msg),
1572            "X-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nFrom :Someone\r\nDate: Today\r\n\r\nBody"
1573        );
1574    }
1575
1576    #[test]
1577    fn append_header_1_params() {
1578        let msg = new_msg_body(X_HDR_CONTENT);
1579
1580        msg.append_header(None, "Date: Today");
1581        k9::assert_equal!(
1582            data_as_string(&msg),
1583            "X-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nFrom :Someone\r\nDate: Today\r\n\r\nBody"
1584        );
1585    }
1586
1587    const MULTI_HEADER_CONTENT: &str =
1588        "X-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nX-Header: another value\r\nFrom :Someone@somewhere\r\n\r\nBody";
1589
1590    #[test]
1591    fn get_first_header() {
1592        let msg = new_msg_body(MULTI_HEADER_CONTENT);
1593        k9::assert_equal!(
1594            msg.get_first_named_header_value("X-header")
1595                .unwrap()
1596                .unwrap(),
1597            "value"
1598        );
1599    }
1600
1601    #[test]
1602    fn get_all_header() {
1603        let msg = new_msg_body(MULTI_HEADER_CONTENT);
1604        k9::assert_equal!(
1605            msg.get_all_named_header_values("X-header").unwrap(),
1606            vec!["value".to_string(), "another value".to_string()]
1607        );
1608    }
1609
1610    #[test]
1611    fn remove_first() {
1612        let msg = new_msg_body(MULTI_HEADER_CONTENT);
1613        msg.remove_first_named_header("X-header").unwrap();
1614        k9::assert_equal!(
1615            data_as_string(&msg),
1616            "X-Hello: there\r\nSubject: Hello\r\nX-Header: another value\r\nFrom :Someone@somewhere\r\n\r\nBody"
1617        );
1618    }
1619
1620    #[test]
1621    fn remove_all() {
1622        let msg = new_msg_body(MULTI_HEADER_CONTENT);
1623        msg.remove_all_named_headers("X-header").unwrap();
1624        k9::assert_equal!(
1625            data_as_string(&msg),
1626            "X-Hello: there\r\nSubject: Hello\r\nFrom :Someone@somewhere\r\n\r\nBody"
1627        );
1628    }
1629
1630    #[test]
1631    fn append_text_plain() {
1632        let msg = new_msg_body(MULTI_HEADER_CONTENT);
1633        msg.append_text_plain("I am at the bottom").unwrap();
1634        k9::assert_equal!(
1635            data_as_string(&msg),
1636            "X-Hello: there\r\n\
1637             X-Header: value\r\n\
1638             Subject: Hello\r\n\
1639             X-Header: another value\r\n\
1640             From :Someone@somewhere\r\n\
1641             Content-Type: text/plain;\r\n\
1642             \tcharset=\"us-ascii\"\r\n\
1643             \r\n\
1644             Body\r\n\
1645             I am at the bottom\r\n"
1646        );
1647    }
1648
1649    const MIXED_CONTENT: &str = "Content-Type: multipart/mixed;\r\n\
1650\tboundary=\"my-boundary\"\r\n\
1651\r\n\
1652--my-boundary\r\n\
1653Content-Type: text/plain;\r\n\
1654\tcharset=\"us-ascii\"\r\n\
1655\r\n\
1656plain text\r\n\
1657--my-boundary\r\n\
1658Content-Type: text/html;\r\n\
1659\tcharset=\"us-ascii\"\r\n\
1660\r\n\
1661<b>rich</b> text\r\n\
1662--my-boundary\r\n\
1663Content-Type: application/octet-stream\r\n\
1664Content-Transfer-Encoding: base64\r\n\
1665Content-Disposition: attachment;\r\n\
1666\tfilename=\"woot.bin\"\r\n\
1667Content-ID: <woot.id@somewhere>\r\n\
1668\r\n\
1669AAECAw==\r\n\
1670--my-boundary--\r\n\
1671\r\n";
1672
1673    const MIXED_CONTENT_ENCLOSING_BODY: &str = "Content-Type: multipart/mixed;\r\n\
1674\tboundary=\"my-boundary\"\r\n\
1675\r\n\
1676--my-boundary\r\n\
1677Content-Type: text/plain;\r\n\
1678\tcharset=\"us-ascii\"\r\n\
1679\r\n\
1680plain text\r\n\
1681--my-boundary\r\n\
1682Content-Type: text/html;\r\n\
1683\tcharset=\"us-ascii\"\r\n\
1684\r\n\
1685<BODY>\r\n\
1686<b>rich</b> text\r\n\
1687</BODY>\r\n\
1688--my-boundary\r\n\
1689Content-Type: application/octet-stream\r\n\
1690Content-Transfer-Encoding: base64\r\n\
1691Content-Disposition: attachment;\r\n\
1692\tfilename=\"woot.bin\"\r\n\
1693Content-ID: <woot.id>\r\n\
1694\r\n\
1695AAECAw==\r\n\
1696--my-boundary--\r\n\
1697\r\n";
1698
1699    #[test]
1700    fn append_text_html() {
1701        let msg = new_msg_body(MIXED_CONTENT);
1702        msg.append_text_html("bottom html").unwrap();
1703        k9::snapshot!(
1704            data_as_string(&msg),
1705            r#"
1706Content-Type: multipart/mixed;\r
1707\tboundary="my-boundary"\r
1708\r
1709--my-boundary\r
1710Content-Type: text/plain;\r
1711\tcharset="us-ascii"\r
1712\r
1713plain text\r
1714--my-boundary\r
1715Content-Type: text/html;\r
1716\tcharset="us-ascii"\r
1717\r
1718<b>rich</b> text\r
1719\r
1720bottom html\r
1721--my-boundary\r
1722Content-Type: application/octet-stream\r
1723Content-Transfer-Encoding: base64\r
1724Content-Disposition: attachment;\r
1725\tfilename="woot.bin"\r
1726Content-ID: <woot.id@somewhere>\r
1727\r
1728AAECAw==\r
1729--my-boundary--\r
1730\r
1731
1732"#
1733        );
1734
1735        let msg = new_msg_body(MIXED_CONTENT_ENCLOSING_BODY);
1736        msg.append_text_html("bottom html 👻").unwrap();
1737        k9::snapshot!(
1738            data_as_string(&msg),
1739            r#"
1740Content-Type: multipart/mixed;\r
1741\tboundary="my-boundary"\r
1742\r
1743--my-boundary\r
1744Content-Type: text/plain;\r
1745\tcharset="us-ascii"\r
1746\r
1747plain text\r
1748--my-boundary\r
1749Content-Type: text/html;\r
1750\tcharset="utf-8"\r
1751Content-Transfer-Encoding: quoted-printable\r
1752\r
1753<BODY>\r
1754<b>rich</b> text\r
1755\r
1756bottom html =F0=9F=91=BB</BODY>\r
1757--my-boundary\r
1758Content-Type: application/octet-stream\r
1759Content-Transfer-Encoding: base64\r
1760Content-Disposition: attachment;\r
1761\tfilename="woot.bin"\r
1762Content-ID: <woot.id>\r
1763\r
1764AAECAw==\r
1765--my-boundary--\r
1766\r
1767
1768"#
1769        );
1770    }
1771
1772    #[test]
1773    fn append_text_plain_mixed() {
1774        let msg = new_msg_body(MIXED_CONTENT);
1775        msg.append_text_plain("bottom text 👾").unwrap();
1776        k9::snapshot!(
1777            data_as_string(&msg),
1778            r#"
1779Content-Type: multipart/mixed;\r
1780\tboundary="my-boundary"\r
1781\r
1782--my-boundary\r
1783Content-Type: text/plain;\r
1784\tcharset="utf-8"\r
1785Content-Transfer-Encoding: quoted-printable\r
1786\r
1787plain text\r
1788\r
1789bottom text =F0=9F=91=BE\r
1790--my-boundary\r
1791Content-Type: text/html;\r
1792\tcharset="us-ascii"\r
1793\r
1794<b>rich</b> text\r
1795--my-boundary\r
1796Content-Type: application/octet-stream\r
1797Content-Transfer-Encoding: base64\r
1798Content-Disposition: attachment;\r
1799\tfilename="woot.bin"\r
1800Content-ID: <woot.id@somewhere>\r
1801\r
1802AAECAw==\r
1803--my-boundary--\r
1804\r
1805
1806"#
1807        );
1808    }
1809
1810    #[test]
1811    fn check_conformance_angle_msg_id() {
1812        const DOUBLE_ANGLE_ONLY: &str = "Subject: hello\r
1813Message-ID: <<1234@example.com>>\r
1814\r
1815Hello";
1816        let msg = new_msg_body(DOUBLE_ANGLE_ONLY);
1817        k9::snapshot!(
1818            msg.check_fix_conformance(
1819                MessageConformance::MISSING_MESSAGE_ID_HEADER,
1820                MessageConformance::empty(),
1821            )
1822            .unwrap_err(),
1823            "Message has conformance issues: MISSING_MESSAGE_ID_HEADER"
1824        );
1825
1826        msg.check_fix_conformance(
1827            MessageConformance::MISSING_MESSAGE_ID_HEADER,
1828            MessageConformance::MISSING_MESSAGE_ID_HEADER,
1829        )
1830        .unwrap();
1831
1832        // Can't use a snapshot test here because the fixed header
1833        // has a unique random component
1834        /*
1835                k9::snapshot!(
1836                    data_as_string(&msg),
1837                    r#"
1838        Subject: hello\r
1839        Message-ID: <4106566d2ce911ef9dcd0242289ea0df@example.com>\r
1840        \r
1841        Hello
1842        "#
1843                );
1844        */
1845
1846        const DOUBLE_ANGLE_AND_LONG_LINE: &str = "Subject: hello\r
1847Message-ID: <<1234@example.com>>\r
1848\r
1849Hello this is a really long line Hello this is a really long line \
1850Hello this is a really long line Hello this is a really long line \
1851Hello this is a really long line Hello this is a really long line \
1852Hello this is a really long line Hello this is a really long line \
1853Hello this is a really long line Hello this is a really long line \
1854Hello this is a really long line Hello this is a really long line \
1855Hello this is a really long line Hello this is a really long line
1856";
1857        let msg = new_msg_body(DOUBLE_ANGLE_AND_LONG_LINE);
1858        msg.check_fix_conformance(
1859            MessageConformance::MISSING_COLON_VALUE,
1860            MessageConformance::MISSING_MESSAGE_ID_HEADER | MessageConformance::LINE_TOO_LONG,
1861        )
1862        .unwrap();
1863
1864        // Can't use a snapshot test here because the fixed header
1865        // has a random component
1866        /*
1867                k9::snapshot!(
1868                    data_as_string(&msg),
1869                    r#"
1870        Content-Type: text/plain;\r
1871        \tcharset="us-ascii"\r
1872        Content-Transfer-Encoding: quoted-printable\r
1873        Subject: hello\r
1874        Message-ID: <749fc87e2cea11ef96a50242289ea0df@example.com>\r
1875        \r
1876        Hello this is a really long line Hello this is a really long line Hello thi=\r
1877        s is a really long line Hello this is a really long line Hello this is a re=\r
1878        ally long line Hello this is a really long line Hello this is a really long=\r
1879         line Hello this is a really long line Hello this is a really long line Hel=\r
1880        lo this is a really long line Hello this is a really long line Hello this i=\r
1881        s a really long line Hello this is a really long line Hello this is a reall=\r
1882        y long line=0A\r
1883
1884        "#
1885                );
1886        */
1887    }
1888
1889    #[test]
1890    fn check_conformance() {
1891        let msg = new_msg_body(MULTI_HEADER_CONTENT);
1892        msg.check_fix_conformance(
1893            MessageConformance::default(),
1894            MessageConformance::MISSING_MIME_VERSION,
1895        )
1896        .unwrap();
1897        k9::snapshot!(
1898            data_as_string(&msg),
1899            r#"
1900X-Hello: there\r
1901X-Header: value\r
1902Subject: Hello\r
1903X-Header: another value\r
1904From :Someone@somewhere\r
1905Mime-Version: 1.0\r
1906\r
1907Body
1908"#
1909        );
1910
1911        let msg = new_msg_body(MULTI_HEADER_CONTENT);
1912        msg.check_fix_conformance(
1913            MessageConformance::default(),
1914            MessageConformance::MISSING_MIME_VERSION | MessageConformance::NAME_ENDS_WITH_SPACE,
1915        )
1916        .unwrap();
1917        k9::snapshot!(
1918            data_as_string(&msg),
1919            r#"
1920Content-Type: text/plain;\r
1921\tcharset="us-ascii"\r
1922X-Hello: there\r
1923X-Header: value\r
1924Subject: Hello\r
1925X-Header: another value\r
1926From: <Someone@somewhere>\r
1927Mime-Version: 1.0\r
1928\r
1929Body\r
1930
1931"#
1932        );
1933    }
1934
1935    #[test]
1936    fn set_scheduling() -> anyhow::Result<()> {
1937        let msg = new_msg_body(MULTI_HEADER_CONTENT);
1938        assert!(msg.get_due().is_none(), "due is implicitly now");
1939
1940        let now = Utc::now();
1941        let one_day = chrono::Duration::try_days(1).expect("1 day to be valid");
1942
1943        msg.set_scheduling(Some(Scheduling {
1944            restriction: None,
1945            first_attempt: Some((now + one_day).into()),
1946            expires: None,
1947        }))?;
1948
1949        let due = msg.get_due().expect("due to now be set");
1950        assert!(due - now >= one_day, "due time is at least 1 day away");
1951
1952        Ok(())
1953    }
1954
1955    #[cfg(all(test, target_pointer_width = "64"))]
1956    #[test]
1957    fn sizes() {
1958        assert_eq!(std::mem::size_of::<Message>(), 8);
1959        assert_eq!(std::mem::size_of::<MessageInner>(), 32);
1960        assert_eq!(std::mem::size_of::<MessageWithId>(), 72);
1961    }
1962}