message/
message.rs

1use crate::address::HeaderAddressList;
2#[cfg(feature = "impl")]
3use crate::dkim::Signer;
4#[cfg(feature = "impl")]
5use crate::dkim::SIGN_POOL;
6pub use crate::queue_name::QueueNameComponents;
7use crate::scheduling::Scheduling;
8use crate::EnvelopeAddress;
9use anyhow::Context;
10use chrono::{DateTime, Utc};
11#[cfg(feature = "impl")]
12use config::{any_err, from_lua_value, serialize_options};
13use futures::FutureExt;
14use intrusive_collections::{intrusive_adapter, LinkedList, LinkedListAtomicLink};
15use kumo_chrono_helper::*;
16#[cfg(feature = "impl")]
17use kumo_dkim::arc::ARC;
18use kumo_log_types::rfc3464::Report;
19use kumo_log_types::rfc5965::ARFReport;
20#[cfg(feature = "impl")]
21use mailparsing::{AuthenticationResult, AuthenticationResults, EncodeHeaderValue};
22use mailparsing::{
23    CheckFixSettings, DecodedBody, Header, HeaderParseResult, MessageConformance, MimePart,
24};
25#[cfg(feature = "impl")]
26use mlua::{IntoLua, LuaSerdeExt, UserData, UserDataMethods};
27#[cfg(feature = "impl")]
28use mod_dns_resolver::get_resolver_instance;
29use parking_lot::Mutex;
30use prometheus::{Histogram, IntGauge};
31use serde::{Deserialize, Serialize};
32use serde_with::formats::PreferOne;
33use serde_with::{serde_as, OneOrMany};
34use spool::{get_data_spool, get_meta_spool, Spool, SpoolId};
35use std::hash::Hash;
36use std::sync::{Arc, LazyLock, Weak};
37use std::time::{Duration, Instant};
38use timeq::TimerEntryWithDelay;
39
40bitflags::bitflags! {
41    #[derive(Clone, Copy, Debug, PartialEq, Eq)]
42    struct MessageFlags: u8 {
43        /// true if Metadata needs to be saved
44        const META_DIRTY = 1;
45        /// true if Data needs to be saved
46        const DATA_DIRTY = 2;
47        /// true if scheduling restrictions are present in the metadata
48        const SCHEDULED = 4;
49        /// true if high durability writes should always be used
50        const FORCE_SYNC = 8;
51    }
52}
53
/// Gauge tracking the number of live `Message` objects.
/// Incremented by the `Message` constructors and decremented when the
/// underlying `MessageInner` is dropped.
/// Registration happens on first access and panics (via `unwrap`) if
/// it fails.
static MESSAGE_COUNT: LazyLock<IntGauge> = LazyLock::new(|| {
    prometheus::register_int_gauge!("message_count", "total number of Message objects").unwrap()
});
/// Gauge tracking how many messages currently have their metadata
/// resident in memory.
static META_COUNT: LazyLock<IntGauge> = LazyLock::new(|| {
    prometheus::register_int_gauge!(
        "message_meta_resident_count",
        "total number of Message objects with metadata loaded"
    )
    .unwrap()
});
/// Gauge tracking how many messages currently have their body data
/// resident in memory.
static DATA_COUNT: LazyLock<IntGauge> = LazyLock::new(|| {
    prometheus::register_int_gauge!(
        "message_data_resident_count",
        "total number of Message objects with body data loaded"
    )
    .unwrap()
});
/// Shared empty body used as the placeholder for "data is not resident".
/// The rest of this file treats an empty data slice as "not loaded",
/// so cloning this `Arc` is how the body of a message is released.
static NO_DATA: LazyLock<Arc<Box<[u8]>>> = LazyLock::new(|| Arc::new(vec![].into_boxed_slice()));
/// Histogram of the time taken by `Message::save`.
/// Registration happens on first access and panics (via `unwrap`) if
/// it fails.
static SAVE_HIST: LazyLock<Histogram> = LazyLock::new(|| {
    prometheus::register_histogram!(
        "message_save_latency",
        "how long it takes to save a message to spool"
    )
    .unwrap()
});
/// Histogram of the time taken by `Message::load_data`.
static LOAD_DATA_HIST: LazyLock<Histogram> = LazyLock::new(|| {
    prometheus::register_histogram!(
        "message_data_load_latency",
        "how long it takes to load message data from spool"
    )
    .unwrap()
});
/// Histogram of the time taken by `Message::load_meta`.
static LOAD_META_HIST: LazyLock<Histogram> = LazyLock::new(|| {
    prometheus::register_histogram!(
        "message_meta_load_latency",
        "how long it takes to load message metadata from spool"
    )
    .unwrap()
});
93
/// The mutable state of a message; always accessed through the mutex
/// held by `MessageWithId`.
#[derive(Debug)]
struct MessageInner {
    /// The envelope/metadata, or `None` when it is not resident in memory.
    metadata: Option<Box<MetaData>>,
    /// The message body; an empty slice means the body is not resident.
    data: Arc<Box<[u8]>>,
    /// Dirty/scheduling/durability state bits.
    flags: MessageFlags,
    /// Attempt counter; kept in memory only (it is not part of `MetaData`).
    num_attempts: u16,
    /// When the message is next due; kept in memory only.
    due: Option<DateTime<Utc>>,
}
102
/// The shared allocation behind a `Message`: the immutable spool id,
/// the mutex-protected mutable state, and the intrusive list hook that
/// lets the message be a member of a `MessageList`.
#[derive(Debug)]
pub(crate) struct MessageWithId {
    /// Spool id of the message; readable without taking the lock.
    id: SpoolId,
    /// Mutable message state.
    inner: Mutex<MessageInner>,
    /// Hook used by `MessageWithIdAdapter` to link this message into a list.
    link: LinkedListAtomicLink,
}

// Adapter that tells intrusive_collections how to find the `link`
// field inside an `Arc<MessageWithId>`.
intrusive_adapter!(
    pub(crate) MessageWithIdAdapter = Arc<MessageWithId>: MessageWithId { link: LinkedListAtomicLink }
);
113
/// A list of messages with an O(1) list overhead; no additional
/// memory per-message is required to track this list.
/// However, a given Message can only belong to one instance
/// of such a list at a time.
pub struct MessageList {
    /// The intrusive list that links the messages together.
    list: LinkedList<MessageWithIdAdapter>,
    /// Number of messages in `list`, maintained by the push/pop methods.
    len: usize,
}

impl Default for MessageList {
    /// Equivalent to `MessageList::new()`: an empty list.
    fn default() -> Self {
        Self::new()
    }
}
128
129impl MessageList {
130    /// Create a new MessageList
131    pub fn new() -> Self {
132        Self {
133            list: LinkedList::new(MessageWithIdAdapter::default()),
134            len: 0,
135        }
136    }
137
138    /// Returns the number of elements contained in the list
139    pub fn len(&self) -> usize {
140        self.len
141    }
142
143    pub fn is_empty(&self) -> bool {
144        self.len == 0
145    }
146
147    /// Take all of the elements from this list and return them
148    /// in a new separate instance of MessageList.
149    pub fn take(&mut self) -> Self {
150        let new_list = Self {
151            list: self.list.take(),
152            len: self.len,
153        };
154        self.len = 0;
155        new_list
156    }
157
158    /// Push a message to the back of the list
159    pub fn push_back(&mut self, message: Message) {
160        self.list.push_back(message.msg_and_id);
161        self.len += 1;
162    }
163
164    /// Pop a message from the front of the list
165    pub fn pop_front(&mut self) -> Option<Message> {
166        self.list.pop_front().map(|msg_and_id| {
167            self.len -= 1;
168            Message { msg_and_id }
169        })
170    }
171
172    /// Pop a message from the back of the list
173    pub fn pop_back(&mut self) -> Option<Message> {
174        self.list.pop_back().map(|msg_and_id| {
175            self.len -= 1;
176            Message { msg_and_id }
177        })
178    }
179
180    /// Pop all of the messages from this list into a vector
181    /// of messages.
182    /// Memory usage is O(number-of-messages).
183    pub fn drain(&mut self) -> Vec<Message> {
184        let mut messages = Vec::with_capacity(self.len);
185        while let Some(msg) = self.pop_front() {
186            messages.push(msg);
187        }
188        messages
189    }
190
191    pub fn extend_from_iter<I>(&mut self, iter: I)
192    where
193        I: Iterator<Item = Message>,
194    {
195        for msg in iter {
196            self.push_back(msg)
197        }
198    }
199}
200
201impl IntoIterator for MessageList {
202    type Item = Message;
203    type IntoIter = MessageListIter;
204    fn into_iter(self) -> MessageListIter {
205        MessageListIter { list: self.list }
206    }
207}
208
209pub struct MessageListIter {
210    list: LinkedList<MessageWithIdAdapter>,
211}
212
213impl Iterator for MessageListIter {
214    type Item = Message;
215    fn next(&mut self) -> Option<Message> {
216        self.list
217            .pop_front()
218            .map(|msg_and_id| Message { msg_and_id })
219    }
220}
221
/// A handle to a message.
///
/// The handle wraps an `Arc`, so cloning it yields another handle to
/// the same underlying message state rather than a copy of the message.
/// Equality and hashing are based solely on the spool id.
#[derive(Clone, Debug)]
#[cfg_attr(feature = "impl", derive(mlua::FromLua))]
pub struct Message {
    /// The shared id + state allocation.
    pub(crate) msg_and_id: Arc<MessageWithId>,
}
227
228impl PartialEq for Message {
229    fn eq(&self, other: &Self) -> bool {
230        self.id() == other.id()
231    }
232}
233impl Eq for Message {}
234
235impl Hash for Message {
236    fn hash<H>(&self, hasher: &mut H)
237    where
238        H: std::hash::Hasher,
239    {
240        self.id().hash(hasher)
241    }
242}
243
244#[derive(Clone, Debug)]
245pub struct WeakMessage {
246    weak: Weak<MessageWithId>,
247}
248
249impl WeakMessage {
250    pub fn upgrade(&self) -> Option<Message> {
251        Some(Message {
252            msg_and_id: self.weak.upgrade()?,
253        })
254    }
255}
256
/// The portion of a message that is serialized (as JSON) to the
/// metadata spool.
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone)]
pub(crate) struct MetaData {
    /// Envelope sender.
    pub sender: EnvelopeAddress,
    /// Envelope recipient(s).
    // NOTE(review): OneOrMany/PreferOne presumably accepts either a
    // single address or a list on input and emits a bare value when
    // there is exactly one recipient -- confirm against serde_with docs.
    #[serde_as(as = "OneOrMany<_, PreferOne>")]
    pub recipient: Vec<EnvelopeAddress>,
    /// Free-form metadata; the accessors in this file require it to be
    /// a JSON object.
    pub meta: serde_json::Value,
    /// Optional scheduling restrictions; absent in serialized form is
    /// treated as `None`.
    #[serde(default)]
    pub schedule: Option<Scheduling>,
}
267
268impl Drop for MessageInner {
269    fn drop(&mut self) {
270        if self.metadata.is_some() {
271            META_COUNT.dec();
272        }
273        if !self.data.is_empty() {
274            DATA_COUNT.dec();
275        }
276        MESSAGE_COUNT.dec();
277    }
278}
279
280impl Message {
281    /// Create a new message with the supplied data.
282    /// The message meta and data are marked as dirty
283    pub fn new_dirty(
284        id: SpoolId,
285        sender: EnvelopeAddress,
286        recipient: Vec<EnvelopeAddress>,
287        meta: serde_json::Value,
288        data: Arc<Box<[u8]>>,
289    ) -> anyhow::Result<Self> {
290        anyhow::ensure!(meta.is_object(), "metadata must be a json object");
291        MESSAGE_COUNT.inc();
292        DATA_COUNT.inc();
293        META_COUNT.inc();
294        Ok(Self {
295            msg_and_id: Arc::new(MessageWithId {
296                id,
297                inner: Mutex::new(MessageInner {
298                    metadata: Some(Box::new(MetaData {
299                        sender,
300                        recipient,
301                        meta,
302                        schedule: None,
303                    })),
304                    data,
305                    flags: MessageFlags::META_DIRTY | MessageFlags::DATA_DIRTY,
306                    num_attempts: 0,
307                    due: None,
308                }),
309                link: LinkedListAtomicLink::default(),
310            }),
311        })
312    }
313
314    pub fn weak(&self) -> WeakMessage {
315        WeakMessage {
316            weak: Arc::downgrade(&self.msg_and_id),
317        }
318    }
319
320    /// Helper for creating a message based on spool enumeration.
321    /// Given a spool id and the serialized metadata blob, returns
322    /// a message holding the deserialized version of that metadata.
323    pub fn new_from_spool(id: SpoolId, metadata: Vec<u8>) -> anyhow::Result<Self> {
324        let metadata: MetaData = serde_json::from_slice(&metadata)?;
325        MESSAGE_COUNT.inc();
326        META_COUNT.inc();
327
328        let flags = if metadata.schedule.is_some() {
329            MessageFlags::SCHEDULED
330        } else {
331            MessageFlags::empty()
332        };
333
334        Ok(Self {
335            msg_and_id: Arc::new(MessageWithId {
336                id,
337                inner: Mutex::new(MessageInner {
338                    metadata: Some(Box::new(metadata)),
339                    data: NO_DATA.clone(),
340                    flags,
341                    num_attempts: 0,
342                    due: None,
343                }),
344                link: LinkedListAtomicLink::default(),
345            }),
346        })
347    }
348
349    pub(crate) fn new_from_parts(id: SpoolId, metadata: MetaData, data: Arc<Box<[u8]>>) -> Self {
350        MESSAGE_COUNT.inc();
351        META_COUNT.inc();
352
353        let flags = if metadata.schedule.is_some() {
354            MessageFlags::SCHEDULED
355        } else {
356            MessageFlags::empty()
357        };
358
359        Self {
360            msg_and_id: Arc::new(MessageWithId {
361                id,
362                inner: Mutex::new(MessageInner {
363                    metadata: Some(Box::new(metadata)),
364                    data,
365                    flags: flags | MessageFlags::META_DIRTY | MessageFlags::DATA_DIRTY,
366                    num_attempts: 0,
367                    due: None,
368                }),
369                link: LinkedListAtomicLink::default(),
370            }),
371        }
372    }
373
374    pub async fn new_with_id(id: SpoolId) -> anyhow::Result<Self> {
375        let meta_spool = get_meta_spool();
376        let data = meta_spool.load(id).await?;
377        Self::new_from_spool(id, data)
378    }
379
380    pub fn get_num_attempts(&self) -> u16 {
381        let inner = self.msg_and_id.inner.lock();
382        inner.num_attempts
383    }
384
385    pub fn set_num_attempts(&self, num_attempts: u16) {
386        let mut inner = self.msg_and_id.inner.lock();
387        inner.num_attempts = num_attempts;
388    }
389
390    pub fn increment_num_attempts(&self) {
391        let mut inner = self.msg_and_id.inner.lock();
392        inner.num_attempts += 1;
393    }
394
395    pub async fn set_scheduling(
396        &self,
397        scheduling: Option<Scheduling>,
398    ) -> anyhow::Result<Option<Scheduling>> {
399        self.load_meta_if_needed().await?;
400        let mut inner = self.msg_and_id.inner.lock();
401        match &mut inner.metadata {
402            None => anyhow::bail!("set_scheduling: metadata must be loaded first"),
403            Some(meta) => {
404                meta.schedule = scheduling;
405                inner
406                    .flags
407                    .set(MessageFlags::SCHEDULED, scheduling.is_some());
408                if let Some(sched) = scheduling {
409                    let due = inner.due.unwrap_or_else(Utc::now);
410                    inner.due = Some(sched.adjust_for_schedule(due));
411                }
412                Ok(scheduling)
413            }
414        }
415    }
416
417    pub async fn get_scheduling(&self) -> anyhow::Result<Option<Scheduling>> {
418        self.load_meta_if_needed().await?;
419        let inner = self.msg_and_id.inner.lock();
420        Ok(inner.metadata.as_ref().and_then(|meta| meta.schedule))
421    }
422
423    pub fn get_due(&self) -> Option<DateTime<Utc>> {
424        let inner = self.msg_and_id.inner.lock();
425        inner.due
426    }
427
428    pub async fn delay_with_jitter(&self, limit: i64) -> anyhow::Result<Option<DateTime<Utc>>> {
429        let scale = rand::random::<f32>();
430        let value = (scale * limit as f32) as i64;
431        self.delay_by(seconds(value)?).await
432    }
433
434    pub async fn delay_by(
435        &self,
436        duration: chrono::Duration,
437    ) -> anyhow::Result<Option<DateTime<Utc>>> {
438        let due = Utc::now() + duration;
439        self.set_due(Some(due)).await
440    }
441
442    /// Delay by requested duration, and add up to 1 minute of jitter
443    pub async fn delay_by_and_jitter(
444        &self,
445        duration: chrono::Duration,
446    ) -> anyhow::Result<Option<DateTime<Utc>>> {
447        let scale = rand::random::<f32>();
448        let value = (scale * 60.) as i64;
449        let due = Utc::now() + duration + seconds(value)?;
450        self.set_due(Some(due)).await
451    }
452
    /// Set (or clear) the time at which this message is next due.
    ///
    /// When the `SCHEDULED` flag is not set, `due` is stored as-is.
    /// Otherwise a missing `due` is treated as "now" and the value is
    /// passed through `adjust_for_schedule` of the schedule held in the
    /// metadata, loading the metadata from the spool first if it is
    /// not currently resident.
    ///
    /// Returns the due time that was actually recorded.
    ///
    /// # Errors
    /// Fails if the metadata has to be loaded and that load fails.
    pub async fn set_due(
        &self,
        due: Option<DateTime<Utc>>,
    ) -> anyhow::Result<Option<DateTime<Utc>>> {
        // The lock guard is confined to this block so that it has been
        // released by the time we reach the await below.
        let due = {
            let mut inner = self.msg_and_id.inner.lock();

            if !inner.flags.contains(MessageFlags::SCHEDULED) {
                // This is the simple, fast-path, common case
                inner.due = due;
                return Ok(inner.due);
            }

            // A scheduled message always ends up with a concrete due time
            let due = due.unwrap_or_else(Utc::now);

            if let Some(meta) = &inner.metadata {
                inner.due = match &meta.schedule {
                    Some(sched) => Some(sched.adjust_for_schedule(due)),
                    None => Some(due),
                };
                return Ok(inner.due);
            }

            // We'll need to load the metadata to correctly
            // update the schedule for this message
            due
        };

        self.load_meta().await?;

        {
            // Re-take the lock now that the metadata should be resident
            let mut inner = self.msg_and_id.inner.lock();
            match &inner.metadata {
                Some(meta) => {
                    inner.due = match &meta.schedule {
                        Some(sched) => Some(sched.adjust_for_schedule(due)),
                        None => Some(due),
                    };
                    Ok(inner.due)
                }
                None => anyhow::bail!("loaded metadata, but metadata is not set!?"),
            }
        }
    }
497
498    fn get_data_if_dirty(&self) -> Option<Arc<Box<[u8]>>> {
499        let inner = self.msg_and_id.inner.lock();
500        if inner.flags.contains(MessageFlags::DATA_DIRTY) {
501            Some(Arc::clone(&inner.data))
502        } else {
503            None
504        }
505    }
506
507    fn get_meta_if_dirty(&self) -> Option<MetaData> {
508        let inner = self.msg_and_id.inner.lock();
509        if inner.flags.contains(MessageFlags::META_DIRTY) {
510            inner.metadata.as_ref().map(|md| (**md).clone())
511        } else {
512            None
513        }
514    }
515
516    pub(crate) async fn clone_meta_data(&self) -> anyhow::Result<MetaData> {
517        self.load_meta_if_needed().await?;
518        let inner = self.msg_and_id.inner.lock();
519        inner
520            .metadata
521            .as_ref()
522            .ok_or_else(|| anyhow::anyhow!("metadata not loaded even though we just loaded it"))
523            .map(|md| (**md).clone())
524    }
525
526    pub fn set_force_sync(&self, force: bool) {
527        let mut inner = self.msg_and_id.inner.lock();
528        inner.flags.set(MessageFlags::FORCE_SYNC, force);
529    }
530
531    pub fn needs_save(&self) -> bool {
532        let inner = self.msg_and_id.inner.lock();
533        inner.flags.contains(MessageFlags::META_DIRTY)
534            || inner.flags.contains(MessageFlags::DATA_DIRTY)
535    }
536
537    pub async fn save(&self, deadline: Option<Instant>) -> anyhow::Result<()> {
538        let _timer = SAVE_HIST.start_timer();
539        self.save_to(&**get_meta_spool(), &**get_data_spool(), deadline)
540            .await
541    }
542
543    pub async fn save_to(
544        &self,
545        meta_spool: &(dyn Spool + Send + Sync),
546        data_spool: &(dyn Spool + Send + Sync),
547        deadline: Option<Instant>,
548    ) -> anyhow::Result<()> {
549        let force_sync = self
550            .msg_and_id
551            .inner
552            .lock()
553            .flags
554            .contains(MessageFlags::FORCE_SYNC);
555
556        let data_fut = if let Some(data) = self.get_data_if_dirty() {
557            anyhow::ensure!(!data.is_empty(), "message data must not be empty");
558            data_spool
559                .store(self.msg_and_id.id, data, force_sync, deadline)
560                .map(|_| true)
561                .boxed()
562        } else {
563            futures::future::ready(false).boxed()
564        };
565        let meta_fut = if let Some(meta) = self.get_meta_if_dirty() {
566            let meta = Arc::new(serde_json::to_vec(&meta)?.into_boxed_slice());
567            meta_spool
568                .store(self.msg_and_id.id, meta, force_sync, deadline)
569                .map(|_| true)
570                .boxed()
571        } else {
572            futures::future::ready(false).boxed()
573        };
574
575        // NOTE: if we have a deadline, it is tempting to want to use
576        // timeout_at here to enforce it, but the underlying spool
577        // futures are not guaranteed to be fully cancel safe, which
578        // is why we pass the deadline down to the save method to allow
579        // them to handle timeouts internally.
580        let (data_res, meta_res) = tokio::join!(data_fut, meta_fut);
581
582        if data_res {
583            self.msg_and_id
584                .inner
585                .lock()
586                .flags
587                .remove(MessageFlags::DATA_DIRTY);
588        }
589        if meta_res {
590            self.msg_and_id
591                .inner
592                .lock()
593                .flags
594                .remove(MessageFlags::META_DIRTY);
595        }
596        Ok(())
597    }
598
    /// Returns a reference to the spool id that identifies this message.
    /// The id lives outside the mutex, so no lock is taken.
    pub fn id(&self) -> &SpoolId {
        &self.msg_and_id.id
    }
602
    /// Save the data+meta if needed, then release both.
    /// The save is performed without a deadline.
    /// Returns whether anything was actually released (see `shrink`).
    pub async fn save_and_shrink(&self) -> anyhow::Result<bool> {
        self.save(None).await?;
        self.shrink()
    }
608
    /// Save the data+meta if needed, then release just the data.
    /// The save is performed without a deadline.
    /// Returns whether the data was actually released (see `shrink_data`).
    pub async fn save_and_shrink_data(&self) -> anyhow::Result<bool> {
        self.save(None).await?;
        self.shrink_data()
    }
614
615    pub fn shrink_data(&self) -> anyhow::Result<bool> {
616        let mut inner = self.msg_and_id.inner.lock();
617        let mut did_shrink = false;
618        if inner.flags.contains(MessageFlags::DATA_DIRTY) {
619            anyhow::bail!("Cannot shrink message: DATA_DIRTY");
620        }
621        if !inner.data.is_empty() {
622            DATA_COUNT.dec();
623            did_shrink = true;
624        }
625        if !inner.data.is_empty() {
626            inner.data = NO_DATA.clone();
627            did_shrink = true;
628        }
629        Ok(did_shrink)
630    }
631
632    pub fn shrink(&self) -> anyhow::Result<bool> {
633        let mut inner = self.msg_and_id.inner.lock();
634        let mut did_shrink = false;
635        if inner.flags.contains(MessageFlags::DATA_DIRTY) {
636            anyhow::bail!("Cannot shrink message: DATA_DIRTY");
637        }
638        if inner.flags.contains(MessageFlags::META_DIRTY) {
639            anyhow::bail!("Cannot shrink message: META_DIRTY");
640        }
641        if inner.metadata.take().is_some() {
642            META_COUNT.dec();
643            did_shrink = true;
644        }
645        if !inner.data.is_empty() {
646            DATA_COUNT.dec();
647            did_shrink = true;
648        }
649        if !inner.data.is_empty() {
650            inner.data = NO_DATA.clone();
651            did_shrink = true;
652        }
653        Ok(did_shrink)
654    }
655
656    pub async fn sender(&self) -> anyhow::Result<EnvelopeAddress> {
657        self.load_meta_if_needed().await?;
658        let inner = self.msg_and_id.inner.lock();
659        match &inner.metadata {
660            Some(meta) => Ok(meta.sender.clone()),
661            None => anyhow::bail!("Message::sender metadata is not loaded"),
662        }
663    }
664
665    pub async fn set_sender(&self, sender: EnvelopeAddress) -> anyhow::Result<()> {
666        self.load_meta_if_needed().await?;
667        let mut inner = self.msg_and_id.inner.lock();
668        match &mut inner.metadata {
669            Some(meta) => {
670                meta.sender = sender;
671                inner.flags.set(MessageFlags::DATA_DIRTY, true);
672                Ok(())
673            }
674            None => anyhow::bail!("Message::set_sender: metadata is not loaded"),
675        }
676    }
677
    /// Returns the first envelope recipient.
    /// Kept for compatibility; simply forwards to `first_recipient`.
    #[deprecated = "use recipient_list or first_recipient instead"]
    pub async fn recipient(&self) -> anyhow::Result<EnvelopeAddress> {
        self.first_recipient().await
    }
682
683    pub async fn first_recipient(&self) -> anyhow::Result<EnvelopeAddress> {
684        self.load_meta_if_needed().await?;
685        let inner = self.msg_and_id.inner.lock();
686        match &inner.metadata {
687            Some(meta) => match meta.recipient.first() {
688                Some(recip) => Ok(recip.clone()),
689                None => anyhow::bail!("recipient list is empty!?"),
690            },
691            None => anyhow::bail!("Message::first_recipient: metadata is not loaded"),
692        }
693    }
694
695    pub async fn recipient_list(&self) -> anyhow::Result<Vec<EnvelopeAddress>> {
696        self.load_meta_if_needed().await?;
697        let inner = self.msg_and_id.inner.lock();
698        match &inner.metadata {
699            Some(meta) => Ok(meta.recipient.clone()),
700            None => anyhow::bail!("Message::recipient_list: metadata is not loaded"),
701        }
702    }
703
704    pub async fn recipient_list_string(&self) -> anyhow::Result<Vec<String>> {
705        self.load_meta_if_needed().await?;
706        let inner = self.msg_and_id.inner.lock();
707        match &inner.metadata {
708            Some(meta) => Ok(meta.recipient.iter().map(|a| a.to_string()).collect()),
709            None => anyhow::bail!("Message::recipient_list_string: metadata is not loaded"),
710        }
711    }
712
    /// Replace the recipient list with the single supplied recipient.
    /// Kept for compatibility; simply forwards to `set_recipient_list`.
    #[deprecated = "use set_recipient_list instead"]
    pub async fn set_recipient(&self, recipient: EnvelopeAddress) -> anyhow::Result<()> {
        self.set_recipient_list(vec![recipient]).await
    }
717
718    pub async fn set_recipient_list(&self, recipient: Vec<EnvelopeAddress>) -> anyhow::Result<()> {
719        self.load_meta_if_needed().await?;
720
721        let mut inner = self.msg_and_id.inner.lock();
722        match &mut inner.metadata {
723            Some(meta) => {
724                meta.recipient = recipient;
725                inner.flags.set(MessageFlags::DATA_DIRTY, true);
726                Ok(())
727            }
728            None => anyhow::bail!("Message::set_recipient_list: metadata is not loaded"),
729        }
730    }
731
732    pub fn is_meta_loaded(&self) -> bool {
733        self.msg_and_id.inner.lock().metadata.is_some()
734    }
735
736    pub fn is_data_loaded(&self) -> bool {
737        !self.msg_and_id.inner.lock().data.is_empty()
738    }
739
740    pub async fn load_meta_if_needed(&self) -> anyhow::Result<()> {
741        if self.is_meta_loaded() {
742            return Ok(());
743        }
744        self.load_meta().await
745    }
746
    /// Returns the message body, loading it from the data spool first
    /// if it is not resident in memory.
    pub async fn data(&self) -> anyhow::Result<Arc<Box<[u8]>>> {
        self.load_data_if_needed().await
    }
750
751    async fn load_data_if_needed(&self) -> anyhow::Result<Arc<Box<[u8]>>> {
752        if self.is_data_loaded() {
753            return Ok(self.get_data_maybe_not_loaded());
754        }
755        self.load_data().await
756    }
757
    /// Unconditionally (re)load the metadata from the global metadata
    /// spool, recording the elapsed time in the metadata load latency
    /// histogram.
    pub async fn load_meta(&self) -> anyhow::Result<()> {
        let _timer = LOAD_META_HIST.start_timer();
        self.load_meta_from(&**get_meta_spool()).await
    }
762
763    async fn load_meta_from(&self, meta_spool: &(dyn Spool + Send + Sync)) -> anyhow::Result<()> {
764        let id = self.id();
765        let data = meta_spool.load(*id).await?;
766        let mut inner = self.msg_and_id.inner.lock();
767        let was_not_loaded = inner.metadata.is_none();
768        let metadata: MetaData = serde_json::from_slice(&data)?;
769        inner.metadata.replace(Box::new(metadata));
770        if was_not_loaded {
771            META_COUNT.inc();
772        }
773        Ok(())
774    }
775
    /// Unconditionally (re)load the body from the global data spool,
    /// recording the elapsed time in the data load latency histogram.
    /// Returns the loaded data.
    pub async fn load_data(&self) -> anyhow::Result<Arc<Box<[u8]>>> {
        let _timer = LOAD_DATA_HIST.start_timer();
        self.load_data_from(&**get_data_spool()).await
    }
780
781    async fn load_data_from(
782        &self,
783        data_spool: &(dyn Spool + Send + Sync),
784    ) -> anyhow::Result<Arc<Box<[u8]>>> {
785        let data = data_spool.load(*self.id()).await?;
786        let mut inner = self.msg_and_id.inner.lock();
787        let was_empty = inner.data.is_empty();
788        inner.data = Arc::new(data.into_boxed_slice());
789        if was_empty {
790            DATA_COUNT.inc();
791        }
792        Ok(inner.data.clone())
793    }
794
795    pub fn assign_data(&self, data: Vec<u8>) {
796        let mut inner = self.msg_and_id.inner.lock();
797        let was_empty = inner.data.is_empty();
798        inner.data = Arc::new(data.into_boxed_slice());
799        inner.flags.set(MessageFlags::DATA_DIRTY, true);
800        if was_empty {
801            DATA_COUNT.inc();
802        }
803    }
804
805    pub fn get_data_maybe_not_loaded(&self) -> Arc<Box<[u8]>> {
806        let inner = self.msg_and_id.inner.lock();
807        inner.data.clone()
808    }
809
810    pub async fn set_meta<S: AsRef<str>, V: Into<serde_json::Value>>(
811        &self,
812        key: S,
813        value: V,
814    ) -> anyhow::Result<()> {
815        self.load_meta_if_needed().await?;
816        let mut inner = self.msg_and_id.inner.lock();
817        match &mut inner.metadata {
818            None => anyhow::bail!("set_meta: metadata must be loaded first"),
819            Some(meta) => {
820                let key = key.as_ref();
821                let value = value.into();
822
823                match &mut meta.meta {
824                    serde_json::Value::Object(map) => {
825                        map.insert(key.to_string(), value);
826                    }
827                    _ => anyhow::bail!("metadata is somehow not a json object"),
828                }
829
830                inner.flags.set(MessageFlags::META_DIRTY, true);
831                Ok(())
832            }
833        }
834    }
835
836    pub async fn unset_meta<S: AsRef<str>>(&self, key: S) -> anyhow::Result<()> {
837        self.load_meta_if_needed().await?;
838        let mut inner = self.msg_and_id.inner.lock();
839        match &mut inner.metadata {
840            None => anyhow::bail!("set_meta: metadata must be loaded first"),
841            Some(meta) => {
842                let key = key.as_ref();
843
844                match &mut meta.meta {
845                    serde_json::Value::Object(map) => {
846                        map.remove(key);
847                    }
848                    _ => anyhow::bail!("metadata is somehow not a json object"),
849                }
850
851                inner.flags.set(MessageFlags::META_DIRTY, true);
852                Ok(())
853            }
854        }
855    }
856
857    /// Retrieve `key` as a String.
858    pub async fn get_meta_string<S: serde_json::value::Index + std::fmt::Display + Copy>(
859        &self,
860        key: S,
861    ) -> anyhow::Result<Option<String>> {
862        match self.get_meta(key).await {
863            Ok(serde_json::Value::String(value)) => Ok(Some(value.to_string())),
864            Ok(serde_json::Value::Null) => Ok(None),
865            hmm => {
866                anyhow::bail!("expected '{key}' to be a string value, got {hmm:?}");
867            }
868        }
869    }
870
871    pub async fn get_meta_obj(&self) -> anyhow::Result<serde_json::Value> {
872        self.load_meta_if_needed().await?;
873        let inner = self.msg_and_id.inner.lock();
874        match &inner.metadata {
875            None => anyhow::bail!("get_meta_obj: metadata must be loaded first"),
876            Some(meta) => Ok(meta.meta.clone()),
877        }
878    }
879
880    pub async fn get_meta<S: serde_json::value::Index>(
881        &self,
882        key: S,
883    ) -> anyhow::Result<serde_json::Value> {
884        self.load_meta_if_needed().await?;
885        let inner = self.msg_and_id.inner.lock();
886        match &inner.metadata {
887            None => anyhow::bail!("get_meta: metadata must be loaded first"),
888            Some(meta) => match meta.meta.get(key) {
889                Some(value) => Ok(value.clone()),
890                None => Ok(serde_json::Value::Null),
891            },
892        }
893    }
894
    /// Returns the age of the message relative to `now`, as computed
    /// by the spool id's own `age` method.
    pub fn age(&self, now: DateTime<Utc>) -> chrono::Duration {
        self.msg_and_id.id.age(now)
    }
898
899    pub async fn get_queue_name(&self) -> anyhow::Result<String> {
900        Ok(match self.get_meta_string("queue").await? {
901            Some(name) => name,
902            None => {
903                let name = QueueNameComponents::format(
904                    self.get_meta_string("campaign").await?,
905                    self.get_meta_string("tenant").await?,
906                    self.first_recipient()
907                        .await?
908                        .domain()
909                        .to_string()
910                        .to_lowercase(),
911                    self.get_meta_string("routing_domain").await?,
912                );
913                name.to_string()
914            }
915        })
916    }
917
918    #[cfg(feature = "impl")]
919    pub async fn arc_verify(
920        &self,
921        opt_resolver_name: Option<String>,
922    ) -> anyhow::Result<AuthenticationResult> {
923        let resolver = get_resolver_instance(&opt_resolver_name)?;
924        let data = self.data().await?;
925        let bytes = mailparsing::SharedString::try_from(data.as_ref().as_ref())?;
926
927        let parsed = mailparsing::Header::parse_headers(bytes.clone())?;
928        let message = kumo_dkim::ParsedEmail::HeaderOnlyParse { bytes, parsed };
929
930        let arc = ARC::verify(&message, &**resolver).await;
931        Ok(arc.authentication_result())
932    }
933
934    #[cfg(feature = "impl")]
935    pub async fn arc_seal(
936        &self,
937        signer: Signer,
938        auth_results: AuthenticationResults,
939        opt_resolver_name: Option<String>,
940    ) -> anyhow::Result<()> {
941        let resolver = get_resolver_instance(&opt_resolver_name)?;
942        let data = self.data().await?;
943        let bytes = mailparsing::SharedString::try_from(data.as_ref().as_ref())?;
944        let parsed = mailparsing::Header::parse_headers(bytes.clone())?;
945        let message = kumo_dkim::ParsedEmail::HeaderOnlyParse { bytes, parsed };
946        let arc = ARC::verify(&message, &**resolver).await;
947
948        let headers = arc.seal(&message, auth_results, signer.signer())?;
949        if !headers.is_empty() {
950            let mut new_data = Vec::<u8>::with_capacity(data.len() + 1024);
951
952            for hdr in headers {
953                hdr.write_header(&mut new_data).ok();
954            }
955            new_data.extend_from_slice(&data);
956            self.assign_data(new_data);
957        }
958
959        Ok(())
960    }
961
962    #[cfg(feature = "impl")]
963    pub async fn dkim_verify(
964        &self,
965        opt_resolver_name: Option<String>,
966    ) -> anyhow::Result<Vec<AuthenticationResult>> {
967        let resolver = get_resolver_instance(&opt_resolver_name)?;
968        let data = self.data().await?;
969        let bytes = mailparsing::SharedString::try_from(data.as_ref().as_ref())?;
970
971        let parsed = mailparsing::Header::parse_headers(bytes.clone())?;
972        if parsed
973            .overall_conformance
974            .contains(MessageConformance::NON_CANONICAL_LINE_ENDINGS)
975        {
976            return Ok(vec![AuthenticationResult {
977                method: "dkim".to_string(),
978                method_version: None,
979                result: "permerror".to_string(),
980                reason: Some("message has non-canonical line endings".to_string()),
981                props: Default::default(),
982            }]);
983        }
984        let message = kumo_dkim::ParsedEmail::HeaderOnlyParse { bytes, parsed };
985
986        let from = match message.get_headers().from() {
987            Ok(Some(from)) if from.0.len() == 1 => from.0,
988            Ok(Some(from)) => {
989                return Ok(vec![AuthenticationResult {
990                    method: "dkim".to_string(),
991                    method_version: None,
992                    result: "permerror".to_string(),
993                    reason: Some(format!(
994                        "From header must have a single sender, found {}",
995                        from.len()
996                    )),
997                    props: Default::default(),
998                }]);
999            }
1000            Ok(None) => {
1001                return Ok(vec![AuthenticationResult {
1002                    method: "dkim".to_string(),
1003                    method_version: None,
1004                    result: "permerror".to_string(),
1005                    reason: Some("From header is missing".to_string()),
1006                    props: Default::default(),
1007                }]);
1008            }
1009            Err(err) => {
1010                return Ok(vec![AuthenticationResult {
1011                    method: "dkim".to_string(),
1012                    method_version: None,
1013                    result: "permerror".to_string(),
1014                    reason: Some(format!("From header is invalid: {err:#}")),
1015                    props: Default::default(),
1016                }]);
1017            }
1018        };
1019        let from_domain = &from[0].address.domain;
1020
1021        let results =
1022            kumo_dkim::verify_email_with_resolver(from_domain, &message, &**resolver).await?;
1023        Ok(results)
1024    }
1025
1026    /// Parses the content into an owned MimePart.
1027    /// Changes to that MimePart are NOT reflected in the underlying
1028    /// message; you must re-assign the message data if you wish to modify
1029    /// the message content.
1030    pub async fn parse(&self) -> anyhow::Result<MimePart<'static>> {
1031        let data = self.data().await?;
1032        let owned_data = String::from_utf8_lossy(data.as_ref().as_ref()).to_string();
1033        Ok(MimePart::parse(owned_data)?)
1034    }
1035
1036    pub async fn parse_rfc3464(&self) -> anyhow::Result<Option<Report>> {
1037        let data = self.data().await?;
1038        Report::parse(&data)
1039    }
1040
1041    pub async fn parse_rfc5965(&self) -> anyhow::Result<Option<ARFReport>> {
1042        let data = self.data().await?;
1043        ARFReport::parse(&data)
1044    }
1045
1046    pub async fn prepend_header(&self, name: Option<&str>, value: &str) -> anyhow::Result<()> {
1047        let data = self.data().await?;
1048        let mut new_data = Vec::with_capacity(size_header(name, value) + 2 + data.len());
1049        emit_header(&mut new_data, name, value);
1050        new_data.extend_from_slice(&data);
1051        self.assign_data(new_data);
1052        Ok(())
1053    }
1054
1055    pub async fn append_header(&self, name: Option<&str>, value: &str) -> anyhow::Result<()> {
1056        let data = self.data().await?;
1057        let mut new_data = Vec::with_capacity(size_header(name, value) + 2 + data.len());
1058        for (idx, window) in data.windows(4).enumerate() {
1059            if window == b"\r\n\r\n" {
1060                let headers = &data[0..idx + 2];
1061                let body = &data[idx + 2..];
1062
1063                new_data.extend_from_slice(headers);
1064                emit_header(&mut new_data, name, value);
1065                new_data.extend_from_slice(body);
1066                self.assign_data(new_data);
1067                return Ok(());
1068            }
1069        }
1070
1071        anyhow::bail!("append_header could not find the end of the header block");
1072    }
1073
1074    pub async fn get_address_header(
1075        &self,
1076        header_name: &str,
1077    ) -> anyhow::Result<Option<HeaderAddressList>> {
1078        let data = self.data().await?;
1079        let HeaderParseResult { headers, .. } =
1080            mailparsing::Header::parse_headers(data.as_ref().as_ref())?;
1081
1082        match headers.get_first(header_name) {
1083            Some(hdr) => {
1084                let list = hdr.as_address_list()?;
1085                let result: HeaderAddressList = list.into();
1086                Ok(Some(result))
1087            }
1088            None => Ok(None),
1089        }
1090    }
1091
1092    pub async fn get_first_named_header_value(&self, name: &str) -> anyhow::Result<Option<String>> {
1093        let data = self.data().await?;
1094        let HeaderParseResult { headers, .. } = Header::parse_headers(data.as_ref().as_ref())?;
1095
1096        match headers.get_first(name) {
1097            Some(hdr) => Ok(Some(hdr.as_unstructured()?)),
1098            None => Ok(None),
1099        }
1100    }
1101
1102    pub async fn get_all_named_header_values(&self, name: &str) -> anyhow::Result<Vec<String>> {
1103        let data = self.data().await?;
1104        let HeaderParseResult { headers, .. } = Header::parse_headers(data.as_ref().as_ref())?;
1105
1106        let mut values = vec![];
1107        for hdr in headers.iter_named(name) {
1108            values.push(hdr.as_unstructured()?);
1109        }
1110        Ok(values)
1111    }
1112
1113    pub async fn get_all_headers(&self) -> anyhow::Result<Vec<(String, String)>> {
1114        let data = self.data().await?;
1115        let HeaderParseResult { headers, .. } = Header::parse_headers(data.as_ref().as_ref())?;
1116
1117        let mut values = vec![];
1118        for hdr in headers.iter() {
1119            values.push((hdr.get_name().to_string(), hdr.as_unstructured()?));
1120        }
1121        Ok(values)
1122    }
1123
1124    pub async fn retain_headers<F: FnMut(&Header) -> bool>(
1125        &self,
1126        mut func: F,
1127    ) -> anyhow::Result<()> {
1128        let data = self.data().await?;
1129        let mut new_data = Vec::with_capacity(data.len());
1130        let HeaderParseResult {
1131            headers,
1132            body_offset,
1133            ..
1134        } = Header::parse_headers(data.as_ref().as_ref())?;
1135        for hdr in headers.iter() {
1136            let retain = (func)(hdr);
1137            if !retain {
1138                continue;
1139            }
1140            hdr.write_header(&mut new_data)?;
1141        }
1142        new_data.extend_from_slice(b"\r\n");
1143        new_data.extend_from_slice(&data[body_offset..]);
1144        self.assign_data(new_data);
1145        Ok(())
1146    }
1147
1148    pub async fn remove_first_named_header(&self, name: &str) -> anyhow::Result<()> {
1149        let mut removed = false;
1150        self.retain_headers(|hdr| {
1151            if hdr.get_name().eq_ignore_ascii_case(name) && !removed {
1152                removed = true;
1153                false
1154            } else {
1155                true
1156            }
1157        })
1158        .await
1159    }
1160
1161    pub async fn import_x_headers(&self, names: Vec<String>) -> anyhow::Result<()> {
1162        let data = self.data().await?;
1163        let HeaderParseResult { headers, .. } = Header::parse_headers(data.as_ref().as_ref())?;
1164
1165        for hdr in headers.iter() {
1166            let do_import = if names.is_empty() {
1167                is_x_header(hdr.get_name())
1168            } else {
1169                is_header_in_names_list(hdr.get_name(), &names)
1170            };
1171            if do_import {
1172                let name = imported_header_name(hdr.get_name());
1173                self.set_meta(name, hdr.as_unstructured()?).await?;
1174            }
1175        }
1176
1177        Ok(())
1178    }
1179
1180    pub async fn remove_x_headers(&self, names: Vec<String>) -> anyhow::Result<()> {
1181        self.retain_headers(|hdr| {
1182            if names.is_empty() {
1183                !is_x_header(hdr.get_name())
1184            } else {
1185                !is_header_in_names_list(hdr.get_name(), &names)
1186            }
1187        })
1188        .await
1189    }
1190
1191    pub async fn remove_all_named_headers(&self, name: &str) -> anyhow::Result<()> {
1192        self.retain_headers(|hdr| !hdr.get_name().eq_ignore_ascii_case(name))
1193            .await
1194    }
1195
1196    #[cfg(feature = "impl")]
1197    pub async fn dkim_sign(&self, signer: Signer) -> anyhow::Result<()> {
1198        let data = self.data().await?;
1199        let header = if let Some(runtime) = SIGN_POOL.get() {
1200            runtime.spawn_blocking(move || signer.sign(&data)).await??
1201        } else {
1202            signer.sign(&data)?
1203        };
1204        self.prepend_header(None, &header).await?;
1205        Ok(())
1206    }
1207
1208    pub async fn import_scheduling_header(
1209        &self,
1210        header_name: &str,
1211        remove: bool,
1212    ) -> anyhow::Result<Option<Scheduling>> {
1213        if let Some(value) = self.get_first_named_header_value(header_name).await? {
1214            let sched: Scheduling = serde_json::from_str(&value).with_context(|| {
1215                format!("{value} from header {header_name} is not a valid Scheduling header")
1216            })?;
1217            let result = self.set_scheduling(Some(sched)).await?;
1218
1219            if remove {
1220                self.remove_all_named_headers(header_name).await?;
1221            }
1222
1223            Ok(result)
1224        } else {
1225            Ok(None)
1226        }
1227    }
1228
1229    pub async fn append_text_plain(&self, content: &str) -> anyhow::Result<bool> {
1230        let data = self.data().await?;
1231        let mut msg = MimePart::parse(data.as_ref().as_ref())?;
1232        let parts = msg.simplified_structure_pointers()?;
1233        if let Some(p) = parts.text_part.and_then(|p| msg.resolve_ptr_mut(p)) {
1234            match p.body()? {
1235                DecodedBody::Text(text) => {
1236                    let mut text = text.as_str().to_string();
1237                    text.push_str("\r\n");
1238                    text.push_str(content);
1239                    p.replace_text_body("text/plain", &text)?;
1240
1241                    let new_data = msg.to_message_string();
1242                    self.assign_data(new_data.into_bytes());
1243                    Ok(true)
1244                }
1245                DecodedBody::Binary(_) => {
1246                    anyhow::bail!("expected text/plain part to be text, but it is binary");
1247                }
1248            }
1249        } else {
1250            Ok(false)
1251        }
1252    }
1253
1254    pub async fn append_text_html(&self, content: &str) -> anyhow::Result<bool> {
1255        let data = self.data().await?;
1256        let mut msg = MimePart::parse(data.as_ref().as_ref())?;
1257        let parts = msg.simplified_structure_pointers()?;
1258        if let Some(p) = parts.html_part.and_then(|p| msg.resolve_ptr_mut(p)) {
1259            match p.body()? {
1260                DecodedBody::Text(text) => {
1261                    let mut text = text.as_str().to_string();
1262
1263                    match text.rfind("</body>").or_else(|| text.rfind("</BODY>")) {
1264                        Some(idx) => {
1265                            text.insert_str(idx, content);
1266                            text.insert_str(idx, "\r\n");
1267                        }
1268                        None => {
1269                            // Just append
1270                            text.push_str("\r\n");
1271                            text.push_str(content);
1272                        }
1273                    }
1274
1275                    p.replace_text_body("text/html", &text)?;
1276
1277                    let new_data = msg.to_message_string();
1278                    self.assign_data(new_data.into_bytes());
1279                    Ok(true)
1280                }
1281                DecodedBody::Binary(_) => {
1282                    anyhow::bail!("expected text/html part to be text, but it is binary");
1283                }
1284            }
1285        } else {
1286            Ok(false)
1287        }
1288    }
1289
1290    pub async fn check_fix_conformance(
1291        &self,
1292        check: MessageConformance,
1293        fix: MessageConformance,
1294        settings: Option<&CheckFixSettings>,
1295    ) -> anyhow::Result<()> {
1296        let data = self.data().await?;
1297        let data_bytes = data.as_ref().as_ref();
1298        let msg = MimePart::parse(data_bytes)?;
1299
1300        let mut settings = settings.map(Clone::clone).unwrap_or_default();
1301
1302        if fix.contains(MessageConformance::MISSING_MESSAGE_ID_HEADER)
1303            && settings.message_id.is_none()
1304            && matches!(msg.headers().message_id(), Err(_) | Ok(None))
1305        {
1306            let sender = self.sender().await?;
1307            let domain = sender.domain();
1308            let id = *self.id();
1309            settings.message_id.replace(format!("{id}@{domain}"));
1310        }
1311
1312        if settings.detect_encoding {
1313            settings.data_bytes.replace(data.clone());
1314        }
1315
1316        let opt_msg = msg.check_fix_conformance(check, fix, settings)?;
1317
1318        if let Some(msg) = opt_msg {
1319            let new_data = msg.to_message_string();
1320            self.assign_data(new_data.into_bytes());
1321        }
1322
1323        Ok(())
1324    }
1325}
1326
/// Returns true if `hdr_name` equals any entry of `names`, comparing
/// ASCII case-insensitively.
fn is_header_in_names_list(hdr_name: &str, names: &[String]) -> bool {
    names
        .iter()
        .any(|candidate| hdr_name.eq_ignore_ascii_case(candidate))
}
1335
/// Converts a header name into the metadata key used when importing it:
/// ASCII letters are lowercased and every `-` becomes `_`.
fn imported_header_name(name: &str) -> String {
    let mut key = String::with_capacity(name.len());
    for ch in name.chars() {
        key.push(if ch == '-' {
            '_'
        } else {
            ch.to_ascii_lowercase()
        });
    }
    key
}
1344
/// Returns true if `name` begins with `X-` or `x-`.
fn is_x_header(name: &str) -> bool {
    matches!(name.as_bytes(), [b'X' | b'x', b'-', ..])
}
1348
/// Returns the number of bytes needed for `name: value` (or just `value`
/// when `name` is `None`), excluding any line terminator.
fn size_header(name: Option<&str>, value: &str) -> usize {
    // +2 accounts for the ": " separator written after the name.
    let name_len = match name {
        Some(name) => name.len() + 2,
        None => 0,
    };
    name_len + value.len()
}
1352
/// Appends a header line to `dest`.
///
/// Writes `name: ` first when `name` is `Some`, then `value`, and finally
/// a CRLF unless `value` already ends with one.
fn emit_header(dest: &mut Vec<u8>, name: Option<&str>, value: &str) {
    if let Some(name) = name {
        dest.extend(name.bytes());
        dest.extend(b": ");
    }
    dest.extend(value.bytes());

    let terminated = value.as_bytes().ends_with(b"\r\n");
    if !terminated {
        dest.extend(b"\r\n");
    }
}
1363
/// Exposes `Message` to Lua by registering the methods below.
///
/// Each registration is a thin wrapper over the correspondingly named Rust
/// method on `Message`; errors are converted with `any_err` so that they
/// surface as Lua errors unless noted otherwise.
#[cfg(feature = "impl")]
impl UserData for Message {
    fn add_methods<M: UserDataMethods<Self>>(methods: &mut M) {
        // set_meta(name, value): converts the Lua value to JSON and stores it.
        methods.add_async_method(
            "set_meta",
            move |_, this, (name, value): (String, mlua::Value)| async move {
                let value = serde_json::value::to_value(value).map_err(any_err)?;
                this.set_meta(name, value).await.map_err(any_err)?;
                Ok(())
            },
        );
        // get_meta(name): returns the stored value converted back to Lua
        // using `serialize_options()`.
        methods.add_async_method("get_meta", move |lua, this, name: String| async move {
            let value = this.get_meta(name).await.map_err(any_err)?;
            Ok(Some(lua.to_value_with(&value, serialize_options())?))
        });
        // get_data(): returns the message data as a Lua string.
        methods.add_async_method("get_data", |lua, this, _: ()| async move {
            let data = this.data().await.map_err(any_err)?;
            lua.create_string(&*data)
        });
        // set_data(data): replaces the message data with a copy of the bytes
        // of the given Lua string.
        methods.add_method("set_data", move |_lua, this, data: mlua::String| {
            this.assign_data(data.as_bytes().to_vec());
            Ok(())
        });

        // parse_mime(): lossily converts the data to a String, parses it and
        // wraps the resulting part in a `PartRef`.
        methods.add_async_method("parse_mime", |_lua, this, _: ()| async move {
            let data = this.data().await.map_err(any_err)?;
            let owned_data = String::from_utf8_lossy(data.as_ref().as_ref()).to_string();
            let part = MimePart::parse(owned_data).map_err(any_err)?;
            Ok(mod_mimepart::PartRef::new(part))
        });

        methods.add_async_method("append_text_plain", |_lua, this, data: String| async move {
            this.append_text_plain(&data).await.map_err(any_err)
        });

        methods.add_async_method("append_text_html", |_lua, this, data: String| async move {
            this.append_text_html(&data).await.map_err(any_err)
        });

        // id(): the message id rendered as a string.
        methods.add_method("id", move |_, this, _: ()| Ok(this.id().to_string()));
        methods.add_async_method("sender", move |_, this, _: ()| async move {
            this.sender().await.map_err(any_err)
        });

        methods.add_method("num_attempts", move |_, this, _: ()| {
            Ok(this.get_num_attempts())
        });

        methods.add_async_method("queue_name", move |_, this, _: ()| async move {
            this.get_queue_name().await.map_err(any_err)
        });

        // set_due(due): `due` is deserialized as an optional UTC timestamp;
        // the value returned by `Message::set_due` is passed back to Lua.
        methods.add_async_method("set_due", move |lua, this, due: mlua::Value| async move {
            let due: Option<DateTime<Utc>> = lua.from_value(due)?;
            let revised_due = this.set_due(due).await.map_err(any_err)?;
            lua.to_value(&revised_due)
        });

        // set_sender(value): a Lua string is parsed as an address; any other
        // value is deserialized as an `EnvelopeAddress`.
        methods.add_async_method(
            "set_sender",
            move |lua, this, value: mlua::Value| async move {
                let sender = match value {
                    mlua::Value::String(s) => {
                        let s = s.to_str()?;
                        EnvelopeAddress::parse(&s).map_err(any_err)?
                    }
                    _ => lua.from_value::<EnvelopeAddress>(value.clone())?,
                };
                this.set_sender(sender).await.map_err(any_err)
            },
        );

        // recipient(): the shape of the result depends on the recipient
        // count: nil for none, a single address for exactly one, and the
        // whole list otherwise.
        methods.add_async_method("recipient", move |lua, this, _: ()| async move {
            let mut recipients = this.recipient_list().await.map_err(any_err)?;
            match recipients.len() {
                0 => Ok(mlua::Value::Nil),
                1 => {
                    let recip: EnvelopeAddress = recipients.pop().expect("have 1");
                    recip.into_lua(&lua)
                }
                _ => recipients.into_lua(&lua),
            }
        });

        // recipient_list(): always returns the full list.
        methods.add_async_method("recipient_list", move |lua, this, _: ()| async move {
            let recipients = this.recipient_list().await.map_err(any_err)?;
            recipients.into_lua(&lua)
        });

        // set_recipient(value): accepts a string (parsed as one address), a
        // list of addresses, or a single address value. The list form is
        // attempted first and the single form is the fallback.
        methods.add_async_method(
            "set_recipient",
            move |lua, this, value: mlua::Value| async move {
                let recipients = match value {
                    mlua::Value::String(s) => {
                        let s = s.to_str()?;
                        vec![EnvelopeAddress::parse(&s).map_err(any_err)?]
                    }
                    _ => {
                        if let Ok(recips) = lua.from_value::<Vec<EnvelopeAddress>>(value.clone()) {
                            recips
                        } else {
                            vec![lua.from_value::<EnvelopeAddress>(value.clone())?]
                        }
                    }
                };
                this.set_recipient_list(recipients).await.map_err(any_err)
            },
        );

        // NOTE(review): this attribute looks redundant; the enclosing impl is
        // already gated on the same feature.
        #[cfg(feature = "impl")]
        methods.add_async_method("dkim_sign", |_, this, signer: Signer| async move {
            this.dkim_sign(signer).await.map_err(any_err)
        });

        // shrink(): saves the message first when `needs_save()` reports
        // pending changes, then calls `Message::shrink`.
        methods.add_async_method("shrink", |_, this, _: ()| async move {
            if this.needs_save() {
                this.save(None).await.map_err(any_err)?;
            }
            this.shrink().map_err(any_err)
        });

        // shrink_data(): same save-first behaviour, then calls
        // `Message::shrink_data`.
        methods.add_async_method("shrink_data", |_, this, _: ()| async move {
            if this.needs_save() {
                this.save(None).await.map_err(any_err)?;
            }
            this.shrink_data().map_err(any_err)
        });

        // add_authentication_results(serv_id, results): encodes the results
        // and prepends them as an `Authentication-Results` header.
        methods.add_async_method(
            "add_authentication_results",
            |lua, this, (serv_id, results): (String, mlua::Value)| async move {
                let results: Vec<AuthenticationResult> = lua.from_value(results)?;
                let results = AuthenticationResults {
                    serv_id,
                    version: None,
                    results,
                };

                this.prepend_header(Some("Authentication-Results"), &results.encode_value())
                    .await
                    .map_err(any_err)?;

                Ok(())
            },
        );

        // NOTE(review): redundant attribute, as above.
        #[cfg(feature = "impl")]
        methods.add_async_method(
            "arc_verify",
            |lua, this, opt_resolver_name: Option<String>| async move {
                let results = this.arc_verify(opt_resolver_name).await.map_err(any_err)?;
                lua.to_value_with(&results, serialize_options())
            },
        );

        // arc_seal(signer, serv_id, auth_res, opt_resolver_name): wraps the
        // deserialized results in an `AuthenticationResults` before sealing.
        // NOTE(review): redundant attribute, as above.
        #[cfg(feature = "impl")]
        methods.add_async_method(
            "arc_seal",
            |lua,
             this,
             (signer, serv_id, auth_res, opt_resolver_name): (
                Signer,
                String,
                mlua::Value,
                Option<String>,
            )| async move {
                let results: Vec<AuthenticationResult> = lua.from_value(auth_res)?;
                this.arc_seal(
                    signer,
                    AuthenticationResults {
                        serv_id,
                        version: None,
                        results,
                    },
                    opt_resolver_name,
                )
                .await
                .map_err(any_err)
            },
        );

        // NOTE(review): redundant attribute, as above.
        #[cfg(feature = "impl")]
        methods.add_async_method(
            "dkim_verify",
            |lua, this, opt_resolver_name: Option<String>| async move {
                let results = this.dkim_verify(opt_resolver_name).await.map_err(any_err)?;
                lua.to_value_with(&results, serialize_options())
            },
        );

        // prepend_header(name, value, encode): `encode` defaults to false.
        // When true, the value is first passed through
        // `Header::new_unstructured` and its raw value is written instead.
        methods.add_async_method(
            "prepend_header",
            |_, this, (name, value, encode): (String, String, Option<bool>)| async move {
                let encode = encode.unwrap_or(false);
                if encode {
                    let header = Header::new_unstructured(name.clone(), value);
                    this.prepend_header(Some(&name), header.get_raw_value())
                        .await
                        .map_err(any_err)?;
                } else {
                    this.prepend_header(Some(&name), &value)
                        .await
                        .map_err(any_err)?;
                }
                Ok(())
            },
        );
        // append_header(name, value, encode): same `encode` handling as
        // prepend_header, but the header goes at the end of the header block.
        methods.add_async_method(
            "append_header",
            |_, this, (name, value, encode): (String, String, Option<bool>)| async move {
                let encode = encode.unwrap_or(false);
                if encode {
                    let header = Header::new_unstructured(name.clone(), value);
                    this.append_header(Some(&name), header.get_raw_value())
                        .await
                        .map_err(any_err)?;
                } else {
                    this.append_header(Some(&name), &value)
                        .await
                        .map_err(any_err)?;
                }
                Ok(())
            },
        );
        methods.add_async_method("get_address_header", |_, this, name: String| async move {
            this.get_address_header(&name).await.map_err(any_err)
        });
        // from_header() / to_header(): shortcuts for get_address_header with
        // a fixed header name.
        methods.add_async_method("from_header", |_, this, ()| async move {
            this.get_address_header("From").await.map_err(any_err)
        });
        methods.add_async_method("to_header", |_, this, ()| async move {
            this.get_address_header("To").await.map_err(any_err)
        });

        methods.add_async_method(
            "get_first_named_header_value",
            |_, this, name: String| async move {
                this.get_first_named_header_value(&name)
                    .await
                    .map_err(any_err)
            },
        );
        methods.add_async_method(
            "get_all_named_header_values",
            |_, this, name: String| async move {
                this.get_all_named_header_values(&name)
                    .await
                    .map_err(any_err)
            },
        );
        // get_all_headers(): each (name, value) pair is returned as a
        // two-element list.
        methods.add_async_method("get_all_headers", |_, this, _: ()| async move {
            Ok(this
                .get_all_headers()
                .await
                .map_err(any_err)?
                .into_iter()
                .map(|(name, value)| vec![name, value])
                .collect::<Vec<Vec<String>>>())
        });
        // import_x_headers(names): a missing `names` argument is treated as
        // an empty list.
        methods.add_async_method(
            "import_x_headers",
            |_, this, names: Option<Vec<String>>| async move {
                this.import_x_headers(names.unwrap_or_default())
                    .await
                    .map_err(any_err)
            },
        );

        // remove_x_headers(names): a missing `names` argument is treated as
        // an empty list.
        methods.add_async_method(
            "remove_x_headers",
            |_, this, names: Option<Vec<String>>| async move {
                this.remove_x_headers(names.unwrap_or_default())
                    .await
                    .map_err(any_err)
            },
        );
        methods.add_async_method(
            "remove_all_named_headers",
            |_, this, name: String| async move {
                this.remove_all_named_headers(&name).await.map_err(any_err)
            },
        );

        methods.add_async_method(
            "import_scheduling_header",
            |lua, this, (header_name, remove): (String, bool)| async move {
                let opt_schedule = this
                    .import_scheduling_header(&header_name, remove)
                    .await
                    .map_err(any_err)?;
                lua.to_value(&opt_schedule)
            },
        );

        // set_scheduling(params): `params` is converted with `from_lua_value`
        // into an optional `Scheduling`.
        methods.add_async_method(
            "set_scheduling",
            move |lua, this, params: mlua::Value| async move {
                let sched: Option<Scheduling> = from_lua_value(&lua, params)?;
                let opt_schedule = this.set_scheduling(sched).await.map_err(any_err)?;
                lua.to_value(&opt_schedule)
            },
        );

        // parse_rfc3464(): returns the parsed report, or nil when the parser
        // yields no report.
        methods.add_async_method("parse_rfc3464", |lua, this, _: ()| async move {
            let report = this.parse_rfc3464().await.map_err(any_err)?;
            match report {
                Some(report) => lua.to_value_with(&report, serialize_options()),
                None => Ok(mlua::Value::Nil),
            }
        });

        // parse_rfc5965(): returns the parsed report, or nil when the parser
        // yields no report.
        methods.add_async_method("parse_rfc5965", |lua, this, _: ()| async move {
            let report = this.parse_rfc5965().await.map_err(any_err)?;
            match report {
                Some(report) => lua.to_value_with(&report, serialize_options()),
                None => Ok(mlua::Value::Nil),
            }
        });

        methods.add_async_method("save", |_, this, ()| async move {
            this.save(None).await.map_err(any_err)
        });

        methods.add_method("set_force_sync", move |_, this, force: bool| {
            this.set_force_sync(force);
            Ok(())
        });

        // check_fix_conformance(check, fix, settings): `check` and `fix` are
        // parsed from strings into `MessageConformance` flags. Unlike the
        // other methods, a failure of the check itself is NOT raised as a
        // Lua error: it is returned as a formatted string, and nil is
        // returned on success. Argument parsing errors are still raised.
        methods.add_async_method(
            "check_fix_conformance",
            |lua, this, (check, fix, settings): (String, String, Option<mlua::Value>)| async move {
                use std::str::FromStr;
                let check = MessageConformance::from_str(&check).map_err(any_err)?;
                let fix = MessageConformance::from_str(&fix).map_err(any_err)?;

                let settings = match settings {
                    Some(v) => Some(lua.from_value(v).map_err(any_err)?),
                    None => None,
                };

                match this
                    .check_fix_conformance(check, fix, settings.as_ref())
                    .await
                {
                    Ok(_) => Ok(None),
                    Err(err) => Ok(Some(format!("{err:#}"))),
                }
            },
        );
    }
}
1715
1716impl TimerEntryWithDelay for WeakMessage {
1717    fn delay(&self) -> Duration {
1718        match self.upgrade() {
1719            None => {
1720                // Dangling/Cancelled. Make it appear due immediately
1721                Duration::from_millis(0)
1722            }
1723            Some(msg) => msg.delay(),
1724        }
1725    }
1726}
1727
1728impl TimerEntryWithDelay for Message {
1729    fn delay(&self) -> Duration {
1730        let inner = self.msg_and_id.inner.lock();
1731        match inner.due {
1732            Some(time) => {
1733                let now = Utc::now();
1734                let delta = time - now;
1735                delta.to_std().unwrap_or(Duration::from_millis(0))
1736            }
1737            None => Duration::from_millis(0),
1738        }
1739    }
1740}
1741
1742#[cfg(test)]
1743pub(crate) mod test {
1744    use super::*;
1745    use serde_json::json;
1746
1747    pub fn new_msg_body<S: AsRef<[u8]>>(s: S) -> Message {
1748        Message::new_dirty(
1749            SpoolId::new(),
1750            EnvelopeAddress::parse("sender@example.com").unwrap(),
1751            vec![EnvelopeAddress::parse("recip@example.com").unwrap()],
1752            serde_json::json!({}),
1753            Arc::new(s.as_ref().to_vec().into_boxed_slice()),
1754        )
1755        .unwrap()
1756    }
1757
1758    fn data_as_string(msg: &Message) -> String {
1759        String::from_utf8(msg.get_data_maybe_not_loaded().to_vec()).unwrap()
1760    }
1761
    /// Sample message with two `X-` headers, a `Subject` header and a
    /// `From` header written with a space before the colon, followed by
    /// a one-word body.
    const X_HDR_CONTENT: &str =
        "X-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nFrom :Someone\r\n\r\nBody";
1764
1765    #[tokio::test]
1766    async fn import_all_x_headers() {
1767        let msg = new_msg_body(X_HDR_CONTENT);
1768
1769        msg.import_x_headers(vec![]).await.unwrap();
1770        k9::assert_equal!(
1771            msg.get_meta_obj().await.unwrap(),
1772            json!({
1773                "x_hello": "there",
1774                "x_header": "value",
1775            })
1776        );
1777    }
1778
1779    #[tokio::test]
1780    async fn meta_and_nil() {
1781        let msg = new_msg_body(X_HDR_CONTENT);
1782        // Ensure that json null round-trips
1783        msg.set_meta("test", serde_json::Value::Null).await.unwrap();
1784        k9::assert_equal!(msg.get_meta("test").await.unwrap(), serde_json::Value::Null);
1785
1786        // and that it is exposed to lua as nil
1787        let lua = mlua::Lua::new();
1788        lua.globals().set("msg", msg).unwrap();
1789        lua.load("assert(msg:get_meta('test') == nil)")
1790            .exec()
1791            .unwrap();
1792    }
1793
1794    #[tokio::test]
1795    async fn import_some_x_headers() {
1796        let msg = new_msg_body(X_HDR_CONTENT);
1797
1798        msg.import_x_headers(vec!["x-hello".to_string()])
1799            .await
1800            .unwrap();
1801        k9::assert_equal!(
1802            msg.get_meta_obj().await.unwrap(),
1803            json!({
1804                "x_hello": "there",
1805            })
1806        );
1807    }
1808
1809    #[tokio::test]
1810    async fn remove_all_x_headers() {
1811        let msg = new_msg_body(X_HDR_CONTENT);
1812
1813        msg.remove_x_headers(vec![]).await.unwrap();
1814        k9::assert_equal!(
1815            data_as_string(&msg),
1816            "Subject: Hello\r\nFrom :Someone\r\n\r\nBody"
1817        );
1818    }
1819
1820    #[tokio::test]
1821    async fn prepend_header_2_params() {
1822        let msg = new_msg_body(X_HDR_CONTENT);
1823
1824        msg.prepend_header(Some("Date"), "Today").await.unwrap();
1825        k9::assert_equal!(
1826            data_as_string(&msg),
1827            "Date: Today\r\nX-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nFrom :Someone\r\n\r\nBody"
1828        );
1829    }
1830
1831    #[tokio::test]
1832    async fn prepend_header_1_params() {
1833        let msg = new_msg_body(X_HDR_CONTENT);
1834
1835        msg.prepend_header(None, "Date: Today").await.unwrap();
1836        k9::assert_equal!(
1837            data_as_string(&msg),
1838            "Date: Today\r\nX-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nFrom :Someone\r\n\r\nBody"
1839        );
1840    }
1841
1842    #[tokio::test]
1843    async fn append_header_2_params() {
1844        let msg = new_msg_body(X_HDR_CONTENT);
1845
1846        msg.append_header(Some("Date"), "Today").await.unwrap();
1847        k9::assert_equal!(
1848            data_as_string(&msg),
1849            "X-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nFrom :Someone\r\nDate: Today\r\n\r\nBody"
1850        );
1851    }
1852
1853    #[tokio::test]
1854    async fn append_header_1_params() {
1855        let msg = new_msg_body(X_HDR_CONTENT);
1856
1857        msg.append_header(None, "Date: Today").await.unwrap();
1858        k9::assert_equal!(
1859            data_as_string(&msg),
1860            "X-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nFrom :Someone\r\nDate: Today\r\n\r\nBody"
1861        );
1862    }
1863
    /// Sample message in which the `X-Header` header appears twice with
    /// different values, and the `From` header is written with a space
    /// before the colon.
    const MULTI_HEADER_CONTENT: &str =
        "X-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nX-Header: another value\r\nFrom :Someone@somewhere\r\n\r\nBody";
1866
1867    #[tokio::test]
1868    async fn get_first_header() {
1869        let msg = new_msg_body(MULTI_HEADER_CONTENT);
1870        k9::assert_equal!(
1871            msg.get_first_named_header_value("X-header")
1872                .await
1873                .unwrap()
1874                .unwrap(),
1875            "value"
1876        );
1877    }
1878
1879    #[tokio::test]
1880    async fn get_all_header() {
1881        let msg = new_msg_body(MULTI_HEADER_CONTENT);
1882        k9::assert_equal!(
1883            msg.get_all_named_header_values("X-header").await.unwrap(),
1884            vec!["value".to_string(), "another value".to_string()]
1885        );
1886    }
1887
1888    #[tokio::test]
1889    async fn remove_first() {
1890        let msg = new_msg_body(MULTI_HEADER_CONTENT);
1891        msg.remove_first_named_header("X-header").await.unwrap();
1892        k9::assert_equal!(
1893            data_as_string(&msg),
1894            "X-Hello: there\r\nSubject: Hello\r\nX-Header: another value\r\nFrom :Someone@somewhere\r\n\r\nBody"
1895        );
1896    }
1897
1898    #[tokio::test]
1899    async fn remove_all() {
1900        let msg = new_msg_body(MULTI_HEADER_CONTENT);
1901        msg.remove_all_named_headers("X-header").await.unwrap();
1902        k9::assert_equal!(
1903            data_as_string(&msg),
1904            "X-Hello: there\r\nSubject: Hello\r\nFrom :Someone@somewhere\r\n\r\nBody"
1905        );
1906    }
1907
1908    #[tokio::test]
1909    async fn append_text_plain() {
1910        let msg = new_msg_body(MULTI_HEADER_CONTENT);
1911        msg.append_text_plain("I am at the bottom").await.unwrap();
1912        k9::assert_equal!(
1913            data_as_string(&msg),
1914            "X-Hello: there\r\n\
1915             X-Header: value\r\n\
1916             Subject: Hello\r\n\
1917             X-Header: another value\r\n\
1918             From :Someone@somewhere\r\n\
1919             Content-Type: text/plain;\r\n\
1920             \tcharset=\"us-ascii\"\r\n\
1921             \r\n\
1922             Body\r\n\
1923             I am at the bottom\r\n"
1924        );
1925    }
1926
    /// A `multipart/mixed` sample message with three parts: a
    /// `text/plain` part, a `text/html` part and a base64-encoded
    /// `application/octet-stream` attachment.
    const MIXED_CONTENT: &str = "Content-Type: multipart/mixed;\r\n\
\tboundary=\"my-boundary\"\r\n\
\r\n\
--my-boundary\r\n\
Content-Type: text/plain;\r\n\
\tcharset=\"us-ascii\"\r\n\
\r\n\
plain text\r\n\
--my-boundary\r\n\
Content-Type: text/html;\r\n\
\tcharset=\"us-ascii\"\r\n\
\r\n\
<b>rich</b> text\r\n\
--my-boundary\r\n\
Content-Type: application/octet-stream\r\n\
Content-Transfer-Encoding: base64\r\n\
Content-Disposition: attachment;\r\n\
\tfilename=\"woot.bin\"\r\n\
Content-ID: <woot.id@somewhere>\r\n\
\r\n\
AAECAw==\r\n\
--my-boundary--\r\n\
\r\n";
1950
    /// A `multipart/mixed` sample message with the same three-part
    /// layout as `MIXED_CONTENT`, except that the `text/html` part is
    /// wrapped in `<BODY>` ... `</BODY>` tags and the attachment's
    /// `Content-ID` is `<woot.id>`.
    const MIXED_CONTENT_ENCLOSING_BODY: &str = "Content-Type: multipart/mixed;\r\n\
\tboundary=\"my-boundary\"\r\n\
\r\n\
--my-boundary\r\n\
Content-Type: text/plain;\r\n\
\tcharset=\"us-ascii\"\r\n\
\r\n\
plain text\r\n\
--my-boundary\r\n\
Content-Type: text/html;\r\n\
\tcharset=\"us-ascii\"\r\n\
\r\n\
<BODY>\r\n\
<b>rich</b> text\r\n\
</BODY>\r\n\
--my-boundary\r\n\
Content-Type: application/octet-stream\r\n\
Content-Transfer-Encoding: base64\r\n\
Content-Disposition: attachment;\r\n\
\tfilename=\"woot.bin\"\r\n\
Content-ID: <woot.id>\r\n\
\r\n\
AAECAw==\r\n\
--my-boundary--\r\n\
\r\n";
1976
1977    #[tokio::test]
1978    async fn append_text_html() {
1979        let msg = new_msg_body(MIXED_CONTENT);
1980        msg.append_text_html("bottom html").await.unwrap();
1981        k9::snapshot!(
1982            data_as_string(&msg),
1983            r#"
1984Content-Type: multipart/mixed;\r
1985\tboundary="my-boundary"\r
1986\r
1987--my-boundary\r
1988Content-Type: text/plain;\r
1989\tcharset="us-ascii"\r
1990\r
1991plain text\r
1992--my-boundary\r
1993Content-Type: text/html;\r
1994\tcharset="us-ascii"\r
1995\r
1996<b>rich</b> text\r
1997\r
1998bottom html\r
1999--my-boundary\r
2000Content-Type: application/octet-stream\r
2001Content-Transfer-Encoding: base64\r
2002Content-Disposition: attachment;\r
2003\tfilename="woot.bin"\r
2004Content-ID: <woot.id@somewhere>\r
2005\r
2006AAECAw==\r
2007--my-boundary--\r
2008\r
2009
2010"#
2011        );
2012
2013        let msg = new_msg_body(MIXED_CONTENT_ENCLOSING_BODY);
2014        msg.append_text_html("bottom html 👻").await.unwrap();
2015        k9::snapshot!(
2016            data_as_string(&msg),
2017            r#"
2018Content-Type: multipart/mixed;\r
2019\tboundary="my-boundary"\r
2020\r
2021--my-boundary\r
2022Content-Type: text/plain;\r
2023\tcharset="us-ascii"\r
2024\r
2025plain text\r
2026--my-boundary\r
2027Content-Type: text/html;\r
2028\tcharset="utf-8"\r
2029Content-Transfer-Encoding: quoted-printable\r
2030\r
2031<BODY>\r
2032<b>rich</b> text\r
2033\r
2034bottom html =F0=9F=91=BB</BODY>\r
2035--my-boundary\r
2036Content-Type: application/octet-stream\r
2037Content-Transfer-Encoding: base64\r
2038Content-Disposition: attachment;\r
2039\tfilename="woot.bin"\r
2040Content-ID: <woot.id>\r
2041\r
2042AAECAw==\r
2043--my-boundary--\r
2044\r
2045
2046"#
2047        );
2048    }
2049
2050    #[tokio::test]
2051    async fn append_text_plain_mixed() {
2052        let msg = new_msg_body(MIXED_CONTENT);
2053        msg.append_text_plain("bottom text 👾").await.unwrap();
2054        k9::snapshot!(
2055            data_as_string(&msg),
2056            r#"
2057Content-Type: multipart/mixed;\r
2058\tboundary="my-boundary"\r
2059\r
2060--my-boundary\r
2061Content-Type: text/plain;\r
2062\tcharset="utf-8"\r
2063Content-Transfer-Encoding: quoted-printable\r
2064\r
2065plain text\r
2066\r
2067bottom text =F0=9F=91=BE\r
2068--my-boundary\r
2069Content-Type: text/html;\r
2070\tcharset="us-ascii"\r
2071\r
2072<b>rich</b> text\r
2073--my-boundary\r
2074Content-Type: application/octet-stream\r
2075Content-Transfer-Encoding: base64\r
2076Content-Disposition: attachment;\r
2077\tfilename="woot.bin"\r
2078Content-ID: <woot.id@somewhere>\r
2079\r
2080AAECAw==\r
2081--my-boundary--\r
2082\r
2083
2084"#
2085        );
2086    }
2087
2088    #[tokio::test]
2089    async fn check_conformance_angle_msg_id() {
2090        const DOUBLE_ANGLE_ONLY: &str = "Subject: hello\r
2091Message-ID: <<1234@example.com>>\r
2092\r
2093Hello";
2094        let msg = new_msg_body(DOUBLE_ANGLE_ONLY);
2095        k9::snapshot!(
2096            msg.check_fix_conformance(
2097                MessageConformance::MISSING_MESSAGE_ID_HEADER,
2098                MessageConformance::empty(),
2099                None,
2100            )
2101            .await
2102            .unwrap_err()
2103            .to_string(),
2104            "Message has conformance issues: MISSING_MESSAGE_ID_HEADER"
2105        );
2106
2107        msg.check_fix_conformance(
2108            MessageConformance::MISSING_MESSAGE_ID_HEADER,
2109            MessageConformance::MISSING_MESSAGE_ID_HEADER,
2110            None,
2111        )
2112        .await
2113        .unwrap();
2114
2115        // Can't use a snapshot test here because the fixed header
2116        // has a unique random component
2117        /*
2118                k9::snapshot!(
2119                    data_as_string(&msg),
2120                    r#"
2121        Subject: hello\r
2122        Message-ID: <4106566d2ce911ef9dcd0242289ea0df@example.com>\r
2123        \r
2124        Hello
2125        "#
2126                );
2127        */
2128
2129        const DOUBLE_ANGLE_AND_LONG_LINE: &str = "Subject: hello\r
2130Message-ID: <<1234@example.com>>\r
2131\r
2132Hello this is a really long line Hello this is a really long line \
2133Hello this is a really long line Hello this is a really long line \
2134Hello this is a really long line Hello this is a really long line \
2135Hello this is a really long line Hello this is a really long line \
2136Hello this is a really long line Hello this is a really long line \
2137Hello this is a really long line Hello this is a really long line \
2138Hello this is a really long line Hello this is a really long line
2139";
2140        let msg = new_msg_body(DOUBLE_ANGLE_AND_LONG_LINE);
2141        msg.check_fix_conformance(
2142            MessageConformance::MISSING_COLON_VALUE,
2143            MessageConformance::MISSING_MESSAGE_ID_HEADER | MessageConformance::LINE_TOO_LONG,
2144            None,
2145        )
2146        .await
2147        .unwrap();
2148
2149        // Can't use a snapshot test here because the fixed header
2150        // has a random component
2151        /*
2152                k9::snapshot!(
2153                    data_as_string(&msg),
2154                    r#"
2155        Content-Type: text/plain;\r
2156        \tcharset="us-ascii"\r
2157        Content-Transfer-Encoding: quoted-printable\r
2158        Subject: hello\r
2159        Message-ID: <749fc87e2cea11ef96a50242289ea0df@example.com>\r
2160        \r
2161        Hello this is a really long line Hello this is a really long line Hello thi=\r
2162        s is a really long line Hello this is a really long line Hello this is a re=\r
2163        ally long line Hello this is a really long line Hello this is a really long=\r
2164         line Hello this is a really long line Hello this is a really long line Hel=\r
2165        lo this is a really long line Hello this is a really long line Hello this i=\r
2166        s a really long line Hello this is a really long line Hello this is a reall=\r
2167        y long line=0A\r
2168
2169        "#
2170                );
2171        */
2172    }
2173
2174    #[tokio::test]
2175    async fn check_conformance() {
2176        let msg = new_msg_body(MULTI_HEADER_CONTENT);
2177        msg.check_fix_conformance(
2178            MessageConformance::default(),
2179            MessageConformance::MISSING_MIME_VERSION,
2180            None,
2181        )
2182        .await
2183        .unwrap();
2184        k9::snapshot!(
2185            data_as_string(&msg),
2186            r#"
2187X-Hello: there\r
2188X-Header: value\r
2189Subject: Hello\r
2190X-Header: another value\r
2191From :Someone@somewhere\r
2192Mime-Version: 1.0\r
2193\r
2194Body
2195"#
2196        );
2197
2198        let msg = new_msg_body(MULTI_HEADER_CONTENT);
2199        msg.check_fix_conformance(
2200            MessageConformance::default(),
2201            MessageConformance::MISSING_MIME_VERSION | MessageConformance::NAME_ENDS_WITH_SPACE,
2202            None,
2203        )
2204        .await
2205        .unwrap();
2206        k9::snapshot!(
2207            data_as_string(&msg),
2208            r#"
2209Content-Type: text/plain;\r
2210\tcharset="us-ascii"\r
2211X-Hello: there\r
2212X-Header: value\r
2213Subject: Hello\r
2214X-Header: another value\r
2215From: <Someone@somewhere>\r
2216Mime-Version: 1.0\r
2217\r
2218Body\r
2219
2220"#
2221        );
2222    }
2223
2224    #[tokio::test]
2225    async fn check_fix_latin_input() {
2226        const POUNDS: &[u8] = b"Subject: \xa3\r\n\r\nGBP\r\n";
2227        let msg = new_msg_body(&*POUNDS);
2228        msg.check_fix_conformance(
2229            MessageConformance::default(),
2230            MessageConformance::NEEDS_TRANSFER_ENCODING,
2231            Some(&CheckFixSettings {
2232                detect_encoding: true,
2233                include_encodings: vec!["iso-8859-1".to_string()],
2234                ..Default::default()
2235            }),
2236        )
2237        .await
2238        .unwrap();
2239
2240        let subject = msg
2241            .get_first_named_header_value("subject")
2242            .await
2243            .unwrap()
2244            .unwrap();
2245        assert_eq!(subject, "£");
2246    }
2247
2248    #[tokio::test]
2249    async fn set_scheduling() -> anyhow::Result<()> {
2250        let msg = new_msg_body(MULTI_HEADER_CONTENT);
2251        assert!(msg.get_due().is_none(), "due is implicitly now");
2252
2253        let now = Utc::now();
2254        let one_day = chrono::Duration::try_days(1).expect("1 day to be valid");
2255
2256        msg.set_scheduling(Some(Scheduling {
2257            restriction: None,
2258            first_attempt: Some((now + one_day).into()),
2259            expires: None,
2260        }))
2261        .await?;
2262
2263        let due = msg.get_due().expect("due to now be set");
2264        assert!(due - now >= one_day, "due time is at least 1 day away");
2265
2266        Ok(())
2267    }
2268
2269    #[cfg(all(test, target_pointer_width = "64"))]
2270    #[test]
2271    fn sizes() {
2272        assert_eq!(std::mem::size_of::<Message>(), 8);
2273        assert_eq!(std::mem::size_of::<MessageInner>(), 32);
2274        assert_eq!(std::mem::size_of::<MessageWithId>(), 72);
2275    }
2276}