message/
message.rs

1use crate::address::HeaderAddressList;
2#[cfg(feature = "impl")]
3use crate::dkim::Signer;
4#[cfg(feature = "impl")]
5use crate::dkim::SIGN_POOL;
6pub use crate::queue_name::QueueNameComponents;
7use crate::scheduling::Scheduling;
8use crate::EnvelopeAddress;
9use anyhow::Context;
10use chrono::{DateTime, Utc};
11#[cfg(feature = "impl")]
12use config::{any_err, from_lua_value, serialize_options};
13use futures::FutureExt;
14use intrusive_collections::{intrusive_adapter, LinkedList, LinkedListAtomicLink};
15use kumo_chrono_helper::*;
16#[cfg(feature = "impl")]
17use kumo_dkim::arc::ARC;
18use kumo_log_types::rfc3464::Report;
19use kumo_log_types::rfc5965::ARFReport;
20#[cfg(feature = "impl")]
21use mailparsing::{AuthenticationResult, AuthenticationResults, EncodeHeaderValue};
22use mailparsing::{DecodedBody, Header, HeaderParseResult, MessageConformance, MimePart};
23#[cfg(feature = "impl")]
24use mlua::{IntoLua, LuaSerdeExt, UserData, UserDataMethods};
25#[cfg(feature = "impl")]
26use mod_dns_resolver::get_resolver_instance;
27use parking_lot::Mutex;
28use prometheus::{Histogram, IntGauge};
29use serde::{Deserialize, Serialize};
30use serde_with::formats::PreferOne;
31use serde_with::{serde_as, OneOrMany};
32use spool::{get_data_spool, get_meta_spool, Spool, SpoolId};
33use std::hash::Hash;
34use std::sync::{Arc, LazyLock, Weak};
35use std::time::{Duration, Instant};
36use timeq::TimerEntryWithDelay;
37
38bitflags::bitflags! {
39    #[derive(Clone, Copy, Debug, PartialEq, Eq)]
40    struct MessageFlags: u8 {
41        /// true if Metadata needs to be saved
42        const META_DIRTY = 1;
43        /// true if Data needs to be saved
44        const DATA_DIRTY = 2;
45        /// true if scheduling restrictions are present in the metadata
46        const SCHEDULED = 4;
47        /// true if high durability writes should always be used
48        const FORCE_SYNC = 8;
49    }
50}
51
52static MESSAGE_COUNT: LazyLock<IntGauge> = LazyLock::new(|| {
53    prometheus::register_int_gauge!("message_count", "total number of Message objects").unwrap()
54});
55static META_COUNT: LazyLock<IntGauge> = LazyLock::new(|| {
56    prometheus::register_int_gauge!(
57        "message_meta_resident_count",
58        "total number of Message objects with metadata loaded"
59    )
60    .unwrap()
61});
62static DATA_COUNT: LazyLock<IntGauge> = LazyLock::new(|| {
63    prometheus::register_int_gauge!(
64        "message_data_resident_count",
65        "total number of Message objects with body data loaded"
66    )
67    .unwrap()
68});
69static NO_DATA: LazyLock<Arc<Box<[u8]>>> = LazyLock::new(|| Arc::new(vec![].into_boxed_slice()));
70static SAVE_HIST: LazyLock<Histogram> = LazyLock::new(|| {
71    prometheus::register_histogram!(
72        "message_save_latency",
73        "how long it takes to save a message to spool"
74    )
75    .unwrap()
76});
77static LOAD_DATA_HIST: LazyLock<Histogram> = LazyLock::new(|| {
78    prometheus::register_histogram!(
79        "message_data_load_latency",
80        "how long it takes to load message data from spool"
81    )
82    .unwrap()
83});
84static LOAD_META_HIST: LazyLock<Histogram> = LazyLock::new(|| {
85    prometheus::register_histogram!(
86        "message_meta_load_latency",
87        "how long it takes to load message metadata from spool"
88    )
89    .unwrap()
90});
91
92#[derive(Debug)]
93struct MessageInner {
94    metadata: Option<Box<MetaData>>,
95    data: Arc<Box<[u8]>>,
96    flags: MessageFlags,
97    num_attempts: u16,
98    due: Option<DateTime<Utc>>,
99}
100
101#[derive(Debug)]
102pub(crate) struct MessageWithId {
103    id: SpoolId,
104    inner: Mutex<MessageInner>,
105    link: LinkedListAtomicLink,
106}
107
108intrusive_adapter!(
109    pub(crate) MessageWithIdAdapter = Arc<MessageWithId>: MessageWithId { link: LinkedListAtomicLink }
110);
111
112/// A list of messages with an O(1) list overhead; no additional
113/// memory per-message is required to track this list.
114/// However, a given Message can only belong to one instance
115/// of such a list at a time.
116pub struct MessageList {
117    list: LinkedList<MessageWithIdAdapter>,
118    len: usize,
119}
120
121impl Default for MessageList {
122    fn default() -> Self {
123        Self::new()
124    }
125}
126
127impl MessageList {
128    /// Create a new MessageList
129    pub fn new() -> Self {
130        Self {
131            list: LinkedList::new(MessageWithIdAdapter::default()),
132            len: 0,
133        }
134    }
135
136    /// Returns the number of elements contained in the list
137    pub fn len(&self) -> usize {
138        self.len
139    }
140
141    pub fn is_empty(&self) -> bool {
142        self.len == 0
143    }
144
145    /// Take all of the elements from this list and return them
146    /// in a new separate instance of MessageList.
147    pub fn take(&mut self) -> Self {
148        let new_list = Self {
149            list: self.list.take(),
150            len: self.len,
151        };
152        self.len = 0;
153        new_list
154    }
155
156    /// Push a message to the back of the list
157    pub fn push_back(&mut self, message: Message) {
158        self.list.push_back(message.msg_and_id);
159        self.len += 1;
160    }
161
162    /// Pop a message from the front of the list
163    pub fn pop_front(&mut self) -> Option<Message> {
164        self.list.pop_front().map(|msg_and_id| {
165            self.len -= 1;
166            Message { msg_and_id }
167        })
168    }
169
170    /// Pop a message from the back of the list
171    pub fn pop_back(&mut self) -> Option<Message> {
172        self.list.pop_back().map(|msg_and_id| {
173            self.len -= 1;
174            Message { msg_and_id }
175        })
176    }
177
178    /// Pop all of the messages from this list into a vector
179    /// of messages.
180    /// Memory usage is O(number-of-messages).
181    pub fn drain(&mut self) -> Vec<Message> {
182        let mut messages = Vec::with_capacity(self.len);
183        while let Some(msg) = self.pop_front() {
184            messages.push(msg);
185        }
186        messages
187    }
188
189    pub fn extend_from_iter<I>(&mut self, iter: I)
190    where
191        I: Iterator<Item = Message>,
192    {
193        for msg in iter {
194            self.push_back(msg)
195        }
196    }
197}
198
199impl IntoIterator for MessageList {
200    type Item = Message;
201    type IntoIter = MessageListIter;
202    fn into_iter(self) -> MessageListIter {
203        MessageListIter { list: self.list }
204    }
205}
206
207pub struct MessageListIter {
208    list: LinkedList<MessageWithIdAdapter>,
209}
210
211impl Iterator for MessageListIter {
212    type Item = Message;
213    fn next(&mut self) -> Option<Message> {
214        self.list
215            .pop_front()
216            .map(|msg_and_id| Message { msg_and_id })
217    }
218}
219
220#[derive(Clone, Debug)]
221#[cfg_attr(feature = "impl", derive(mlua::FromLua))]
222pub struct Message {
223    pub(crate) msg_and_id: Arc<MessageWithId>,
224}
225
226impl PartialEq for Message {
227    fn eq(&self, other: &Self) -> bool {
228        self.id() == other.id()
229    }
230}
231impl Eq for Message {}
232
233impl Hash for Message {
234    fn hash<H>(&self, hasher: &mut H)
235    where
236        H: std::hash::Hasher,
237    {
238        self.id().hash(hasher)
239    }
240}
241
242#[derive(Clone, Debug)]
243pub struct WeakMessage {
244    weak: Weak<MessageWithId>,
245}
246
247impl WeakMessage {
248    pub fn upgrade(&self) -> Option<Message> {
249        Some(Message {
250            msg_and_id: self.weak.upgrade()?,
251        })
252    }
253}
254
255#[serde_as]
256#[derive(Debug, Serialize, Deserialize, Clone)]
257pub(crate) struct MetaData {
258    pub sender: EnvelopeAddress,
259    #[serde_as(as = "OneOrMany<_, PreferOne>")]
260    pub recipient: Vec<EnvelopeAddress>,
261    pub meta: serde_json::Value,
262    #[serde(default)]
263    pub schedule: Option<Scheduling>,
264}
265
266impl Drop for MessageInner {
267    fn drop(&mut self) {
268        if self.metadata.is_some() {
269            META_COUNT.dec();
270        }
271        if !self.data.is_empty() {
272            DATA_COUNT.dec();
273        }
274        MESSAGE_COUNT.dec();
275    }
276}
277
278impl Message {
279    /// Create a new message with the supplied data.
280    /// The message meta and data are marked as dirty
281    pub fn new_dirty(
282        id: SpoolId,
283        sender: EnvelopeAddress,
284        recipient: Vec<EnvelopeAddress>,
285        meta: serde_json::Value,
286        data: Arc<Box<[u8]>>,
287    ) -> anyhow::Result<Self> {
288        anyhow::ensure!(meta.is_object(), "metadata must be a json object");
289        MESSAGE_COUNT.inc();
290        DATA_COUNT.inc();
291        META_COUNT.inc();
292        Ok(Self {
293            msg_and_id: Arc::new(MessageWithId {
294                id,
295                inner: Mutex::new(MessageInner {
296                    metadata: Some(Box::new(MetaData {
297                        sender,
298                        recipient,
299                        meta,
300                        schedule: None,
301                    })),
302                    data,
303                    flags: MessageFlags::META_DIRTY | MessageFlags::DATA_DIRTY,
304                    num_attempts: 0,
305                    due: None,
306                }),
307                link: LinkedListAtomicLink::default(),
308            }),
309        })
310    }
311
312    pub fn weak(&self) -> WeakMessage {
313        WeakMessage {
314            weak: Arc::downgrade(&self.msg_and_id),
315        }
316    }
317
318    /// Helper for creating a message based on spool enumeration.
319    /// Given a spool id and the serialized metadata blob, returns
320    /// a message holding the deserialized version of that metadata.
321    pub fn new_from_spool(id: SpoolId, metadata: Vec<u8>) -> anyhow::Result<Self> {
322        let metadata: MetaData = serde_json::from_slice(&metadata)?;
323        MESSAGE_COUNT.inc();
324        META_COUNT.inc();
325
326        let flags = if metadata.schedule.is_some() {
327            MessageFlags::SCHEDULED
328        } else {
329            MessageFlags::empty()
330        };
331
332        Ok(Self {
333            msg_and_id: Arc::new(MessageWithId {
334                id,
335                inner: Mutex::new(MessageInner {
336                    metadata: Some(Box::new(metadata)),
337                    data: NO_DATA.clone(),
338                    flags,
339                    num_attempts: 0,
340                    due: None,
341                }),
342                link: LinkedListAtomicLink::default(),
343            }),
344        })
345    }
346
347    pub(crate) fn new_from_parts(id: SpoolId, metadata: MetaData, data: Arc<Box<[u8]>>) -> Self {
348        MESSAGE_COUNT.inc();
349        META_COUNT.inc();
350
351        let flags = if metadata.schedule.is_some() {
352            MessageFlags::SCHEDULED
353        } else {
354            MessageFlags::empty()
355        };
356
357        Self {
358            msg_and_id: Arc::new(MessageWithId {
359                id,
360                inner: Mutex::new(MessageInner {
361                    metadata: Some(Box::new(metadata)),
362                    data,
363                    flags: flags | MessageFlags::META_DIRTY | MessageFlags::DATA_DIRTY,
364                    num_attempts: 0,
365                    due: None,
366                }),
367                link: LinkedListAtomicLink::default(),
368            }),
369        }
370    }
371
372    pub async fn new_with_id(id: SpoolId) -> anyhow::Result<Self> {
373        let meta_spool = get_meta_spool();
374        let data = meta_spool.load(id).await?;
375        Self::new_from_spool(id, data)
376    }
377
378    pub fn get_num_attempts(&self) -> u16 {
379        let inner = self.msg_and_id.inner.lock();
380        inner.num_attempts
381    }
382
383    pub fn set_num_attempts(&self, num_attempts: u16) {
384        let mut inner = self.msg_and_id.inner.lock();
385        inner.num_attempts = num_attempts;
386    }
387
388    pub fn increment_num_attempts(&self) {
389        let mut inner = self.msg_and_id.inner.lock();
390        inner.num_attempts += 1;
391    }
392
393    pub async fn set_scheduling(
394        &self,
395        scheduling: Option<Scheduling>,
396    ) -> anyhow::Result<Option<Scheduling>> {
397        self.load_meta_if_needed().await?;
398        let mut inner = self.msg_and_id.inner.lock();
399        match &mut inner.metadata {
400            None => anyhow::bail!("set_scheduling: metadata must be loaded first"),
401            Some(meta) => {
402                meta.schedule = scheduling;
403                inner
404                    .flags
405                    .set(MessageFlags::SCHEDULED, scheduling.is_some());
406                if let Some(sched) = scheduling {
407                    let due = inner.due.unwrap_or_else(Utc::now);
408                    inner.due = Some(sched.adjust_for_schedule(due));
409                }
410                Ok(scheduling)
411            }
412        }
413    }
414
415    pub async fn get_scheduling(&self) -> anyhow::Result<Option<Scheduling>> {
416        self.load_meta_if_needed().await?;
417        let inner = self.msg_and_id.inner.lock();
418        Ok(inner.metadata.as_ref().and_then(|meta| meta.schedule))
419    }
420
421    pub fn get_due(&self) -> Option<DateTime<Utc>> {
422        let inner = self.msg_and_id.inner.lock();
423        inner.due
424    }
425
426    pub async fn delay_with_jitter(&self, limit: i64) -> anyhow::Result<Option<DateTime<Utc>>> {
427        let scale = rand::random::<f32>();
428        let value = (scale * limit as f32) as i64;
429        self.delay_by(seconds(value)?).await
430    }
431
432    pub async fn delay_by(
433        &self,
434        duration: chrono::Duration,
435    ) -> anyhow::Result<Option<DateTime<Utc>>> {
436        let due = Utc::now() + duration;
437        self.set_due(Some(due)).await
438    }
439
440    /// Delay by requested duration, and add up to 1 minute of jitter
441    pub async fn delay_by_and_jitter(
442        &self,
443        duration: chrono::Duration,
444    ) -> anyhow::Result<Option<DateTime<Utc>>> {
445        let scale = rand::random::<f32>();
446        let value = (scale * 60.) as i64;
447        let due = Utc::now() + duration + seconds(value)?;
448        self.set_due(Some(due)).await
449    }
450
451    pub async fn set_due(
452        &self,
453        due: Option<DateTime<Utc>>,
454    ) -> anyhow::Result<Option<DateTime<Utc>>> {
455        let due = {
456            let mut inner = self.msg_and_id.inner.lock();
457
458            if !inner.flags.contains(MessageFlags::SCHEDULED) {
459                // This is the simple, fast-path, common case
460                inner.due = due;
461                return Ok(inner.due);
462            }
463
464            let due = due.unwrap_or_else(Utc::now);
465
466            if let Some(meta) = &inner.metadata {
467                inner.due = match &meta.schedule {
468                    Some(sched) => Some(sched.adjust_for_schedule(due)),
469                    None => Some(due),
470                };
471                return Ok(inner.due);
472            }
473
474            // We'll need to load the metadata to correctly
475            // update the schedule for this message
476            due
477        };
478
479        self.load_meta().await?;
480
481        {
482            let mut inner = self.msg_and_id.inner.lock();
483            match &inner.metadata {
484                Some(meta) => {
485                    inner.due = match &meta.schedule {
486                        Some(sched) => Some(sched.adjust_for_schedule(due)),
487                        None => Some(due),
488                    };
489                    Ok(inner.due)
490                }
491                None => anyhow::bail!("loaded metadata, but metadata is not set!?"),
492            }
493        }
494    }
495
496    fn get_data_if_dirty(&self) -> Option<Arc<Box<[u8]>>> {
497        let inner = self.msg_and_id.inner.lock();
498        if inner.flags.contains(MessageFlags::DATA_DIRTY) {
499            Some(Arc::clone(&inner.data))
500        } else {
501            None
502        }
503    }
504
505    fn get_meta_if_dirty(&self) -> Option<MetaData> {
506        let inner = self.msg_and_id.inner.lock();
507        if inner.flags.contains(MessageFlags::META_DIRTY) {
508            inner.metadata.as_ref().map(|md| (**md).clone())
509        } else {
510            None
511        }
512    }
513
514    pub(crate) async fn clone_meta_data(&self) -> anyhow::Result<MetaData> {
515        self.load_meta_if_needed().await?;
516        let inner = self.msg_and_id.inner.lock();
517        inner
518            .metadata
519            .as_ref()
520            .ok_or_else(|| anyhow::anyhow!("metadata not loaded even though we just loaded it"))
521            .map(|md| (**md).clone())
522    }
523
524    pub fn set_force_sync(&self, force: bool) {
525        let mut inner = self.msg_and_id.inner.lock();
526        inner.flags.set(MessageFlags::FORCE_SYNC, force);
527    }
528
529    pub fn needs_save(&self) -> bool {
530        let inner = self.msg_and_id.inner.lock();
531        inner.flags.contains(MessageFlags::META_DIRTY)
532            || inner.flags.contains(MessageFlags::DATA_DIRTY)
533    }
534
535    pub async fn save(&self, deadline: Option<Instant>) -> anyhow::Result<()> {
536        let _timer = SAVE_HIST.start_timer();
537        self.save_to(&**get_meta_spool(), &**get_data_spool(), deadline)
538            .await
539    }
540
541    pub async fn save_to(
542        &self,
543        meta_spool: &(dyn Spool + Send + Sync),
544        data_spool: &(dyn Spool + Send + Sync),
545        deadline: Option<Instant>,
546    ) -> anyhow::Result<()> {
547        let force_sync = self
548            .msg_and_id
549            .inner
550            .lock()
551            .flags
552            .contains(MessageFlags::FORCE_SYNC);
553
554        let data_fut = if let Some(data) = self.get_data_if_dirty() {
555            anyhow::ensure!(!data.is_empty(), "message data must not be empty");
556            data_spool
557                .store(self.msg_and_id.id, data, force_sync, deadline)
558                .map(|_| true)
559                .boxed()
560        } else {
561            futures::future::ready(false).boxed()
562        };
563        let meta_fut = if let Some(meta) = self.get_meta_if_dirty() {
564            let meta = Arc::new(serde_json::to_vec(&meta)?.into_boxed_slice());
565            meta_spool
566                .store(self.msg_and_id.id, meta, force_sync, deadline)
567                .map(|_| true)
568                .boxed()
569        } else {
570            futures::future::ready(false).boxed()
571        };
572
573        // NOTE: if we have a deadline, it is tempting to want to use
574        // timeout_at here to enforce it, but the underlying spool
575        // futures are not guaranteed to be fully cancel safe, which
576        // is why we pass the deadline down to the save method to allow
577        // them to handle timeouts internally.
578        let (data_res, meta_res) = tokio::join!(data_fut, meta_fut);
579
580        if data_res {
581            self.msg_and_id
582                .inner
583                .lock()
584                .flags
585                .remove(MessageFlags::DATA_DIRTY);
586        }
587        if meta_res {
588            self.msg_and_id
589                .inner
590                .lock()
591                .flags
592                .remove(MessageFlags::META_DIRTY);
593        }
594        Ok(())
595    }
596
597    pub fn id(&self) -> &SpoolId {
598        &self.msg_and_id.id
599    }
600
601    /// Save the data+meta if needed, then release both
602    pub async fn save_and_shrink(&self) -> anyhow::Result<bool> {
603        self.save(None).await?;
604        self.shrink()
605    }
606
607    /// Save the data+meta if needed, then release just the data
608    pub async fn save_and_shrink_data(&self) -> anyhow::Result<bool> {
609        self.save(None).await?;
610        self.shrink_data()
611    }
612
613    pub fn shrink_data(&self) -> anyhow::Result<bool> {
614        let mut inner = self.msg_and_id.inner.lock();
615        let mut did_shrink = false;
616        if inner.flags.contains(MessageFlags::DATA_DIRTY) {
617            anyhow::bail!("Cannot shrink message: DATA_DIRTY");
618        }
619        if !inner.data.is_empty() {
620            DATA_COUNT.dec();
621            did_shrink = true;
622        }
623        if !inner.data.is_empty() {
624            inner.data = NO_DATA.clone();
625            did_shrink = true;
626        }
627        Ok(did_shrink)
628    }
629
630    pub fn shrink(&self) -> anyhow::Result<bool> {
631        let mut inner = self.msg_and_id.inner.lock();
632        let mut did_shrink = false;
633        if inner.flags.contains(MessageFlags::DATA_DIRTY) {
634            anyhow::bail!("Cannot shrink message: DATA_DIRTY");
635        }
636        if inner.flags.contains(MessageFlags::META_DIRTY) {
637            anyhow::bail!("Cannot shrink message: META_DIRTY");
638        }
639        if inner.metadata.take().is_some() {
640            META_COUNT.dec();
641            did_shrink = true;
642        }
643        if !inner.data.is_empty() {
644            DATA_COUNT.dec();
645            did_shrink = true;
646        }
647        if !inner.data.is_empty() {
648            inner.data = NO_DATA.clone();
649            did_shrink = true;
650        }
651        Ok(did_shrink)
652    }
653
654    pub async fn sender(&self) -> anyhow::Result<EnvelopeAddress> {
655        self.load_meta_if_needed().await?;
656        let inner = self.msg_and_id.inner.lock();
657        match &inner.metadata {
658            Some(meta) => Ok(meta.sender.clone()),
659            None => anyhow::bail!("Message::sender metadata is not loaded"),
660        }
661    }
662
663    pub async fn set_sender(&self, sender: EnvelopeAddress) -> anyhow::Result<()> {
664        self.load_meta_if_needed().await?;
665        let mut inner = self.msg_and_id.inner.lock();
666        match &mut inner.metadata {
667            Some(meta) => {
668                meta.sender = sender;
669                inner.flags.set(MessageFlags::DATA_DIRTY, true);
670                Ok(())
671            }
672            None => anyhow::bail!("Message::set_sender: metadata is not loaded"),
673        }
674    }
675
676    #[deprecated = "use recipient_list or first_recipient instead"]
677    pub async fn recipient(&self) -> anyhow::Result<EnvelopeAddress> {
678        self.first_recipient().await
679    }
680
681    pub async fn first_recipient(&self) -> anyhow::Result<EnvelopeAddress> {
682        self.load_meta_if_needed().await?;
683        let inner = self.msg_and_id.inner.lock();
684        match &inner.metadata {
685            Some(meta) => match meta.recipient.first() {
686                Some(recip) => Ok(recip.clone()),
687                None => anyhow::bail!("recipient list is empty!?"),
688            },
689            None => anyhow::bail!("Message::first_recipient: metadata is not loaded"),
690        }
691    }
692
693    pub async fn recipient_list(&self) -> anyhow::Result<Vec<EnvelopeAddress>> {
694        self.load_meta_if_needed().await?;
695        let inner = self.msg_and_id.inner.lock();
696        match &inner.metadata {
697            Some(meta) => Ok(meta.recipient.clone()),
698            None => anyhow::bail!("Message::recipient_list: metadata is not loaded"),
699        }
700    }
701
702    pub async fn recipient_list_string(&self) -> anyhow::Result<Vec<String>> {
703        self.load_meta_if_needed().await?;
704        let inner = self.msg_and_id.inner.lock();
705        match &inner.metadata {
706            Some(meta) => Ok(meta.recipient.iter().map(|a| a.to_string()).collect()),
707            None => anyhow::bail!("Message::recipient_list_string: metadata is not loaded"),
708        }
709    }
710
711    #[deprecated = "use set_recipient_list instead"]
712    pub async fn set_recipient(&self, recipient: EnvelopeAddress) -> anyhow::Result<()> {
713        self.set_recipient_list(vec![recipient]).await
714    }
715
716    pub async fn set_recipient_list(&self, recipient: Vec<EnvelopeAddress>) -> anyhow::Result<()> {
717        self.load_meta_if_needed().await?;
718
719        let mut inner = self.msg_and_id.inner.lock();
720        match &mut inner.metadata {
721            Some(meta) => {
722                meta.recipient = recipient;
723                inner.flags.set(MessageFlags::DATA_DIRTY, true);
724                Ok(())
725            }
726            None => anyhow::bail!("Message::set_recipient_list: metadata is not loaded"),
727        }
728    }
729
730    pub fn is_meta_loaded(&self) -> bool {
731        self.msg_and_id.inner.lock().metadata.is_some()
732    }
733
734    pub fn is_data_loaded(&self) -> bool {
735        !self.msg_and_id.inner.lock().data.is_empty()
736    }
737
738    pub async fn load_meta_if_needed(&self) -> anyhow::Result<()> {
739        if self.is_meta_loaded() {
740            return Ok(());
741        }
742        self.load_meta().await
743    }
744
745    pub async fn data(&self) -> anyhow::Result<Arc<Box<[u8]>>> {
746        self.load_data_if_needed().await
747    }
748
749    async fn load_data_if_needed(&self) -> anyhow::Result<Arc<Box<[u8]>>> {
750        if self.is_data_loaded() {
751            return Ok(self.get_data_maybe_not_loaded());
752        }
753        self.load_data().await
754    }
755
756    pub async fn load_meta(&self) -> anyhow::Result<()> {
757        let _timer = LOAD_META_HIST.start_timer();
758        self.load_meta_from(&**get_meta_spool()).await
759    }
760
761    async fn load_meta_from(&self, meta_spool: &(dyn Spool + Send + Sync)) -> anyhow::Result<()> {
762        let id = self.id();
763        let data = meta_spool.load(*id).await?;
764        let mut inner = self.msg_and_id.inner.lock();
765        let was_not_loaded = inner.metadata.is_none();
766        let metadata: MetaData = serde_json::from_slice(&data)?;
767        inner.metadata.replace(Box::new(metadata));
768        if was_not_loaded {
769            META_COUNT.inc();
770        }
771        Ok(())
772    }
773
774    pub async fn load_data(&self) -> anyhow::Result<Arc<Box<[u8]>>> {
775        let _timer = LOAD_DATA_HIST.start_timer();
776        self.load_data_from(&**get_data_spool()).await
777    }
778
779    async fn load_data_from(
780        &self,
781        data_spool: &(dyn Spool + Send + Sync),
782    ) -> anyhow::Result<Arc<Box<[u8]>>> {
783        let data = data_spool.load(*self.id()).await?;
784        let mut inner = self.msg_and_id.inner.lock();
785        let was_empty = inner.data.is_empty();
786        inner.data = Arc::new(data.into_boxed_slice());
787        if was_empty {
788            DATA_COUNT.inc();
789        }
790        Ok(inner.data.clone())
791    }
792
793    pub fn assign_data(&self, data: Vec<u8>) {
794        let mut inner = self.msg_and_id.inner.lock();
795        let was_empty = inner.data.is_empty();
796        inner.data = Arc::new(data.into_boxed_slice());
797        inner.flags.set(MessageFlags::DATA_DIRTY, true);
798        if was_empty {
799            DATA_COUNT.inc();
800        }
801    }
802
803    pub fn get_data_maybe_not_loaded(&self) -> Arc<Box<[u8]>> {
804        let inner = self.msg_and_id.inner.lock();
805        inner.data.clone()
806    }
807
808    pub async fn set_meta<S: AsRef<str>, V: Into<serde_json::Value>>(
809        &self,
810        key: S,
811        value: V,
812    ) -> anyhow::Result<()> {
813        self.load_meta_if_needed().await?;
814        let mut inner = self.msg_and_id.inner.lock();
815        match &mut inner.metadata {
816            None => anyhow::bail!("set_meta: metadata must be loaded first"),
817            Some(meta) => {
818                let key = key.as_ref();
819                let value = value.into();
820
821                match &mut meta.meta {
822                    serde_json::Value::Object(map) => {
823                        map.insert(key.to_string(), value);
824                    }
825                    _ => anyhow::bail!("metadata is somehow not a json object"),
826                }
827
828                inner.flags.set(MessageFlags::META_DIRTY, true);
829                Ok(())
830            }
831        }
832    }
833
834    pub async fn unset_meta<S: AsRef<str>>(&self, key: S) -> anyhow::Result<()> {
835        self.load_meta_if_needed().await?;
836        let mut inner = self.msg_and_id.inner.lock();
837        match &mut inner.metadata {
838            None => anyhow::bail!("set_meta: metadata must be loaded first"),
839            Some(meta) => {
840                let key = key.as_ref();
841
842                match &mut meta.meta {
843                    serde_json::Value::Object(map) => {
844                        map.remove(key);
845                    }
846                    _ => anyhow::bail!("metadata is somehow not a json object"),
847                }
848
849                inner.flags.set(MessageFlags::META_DIRTY, true);
850                Ok(())
851            }
852        }
853    }
854
855    /// Retrieve `key` as a String.
856    pub async fn get_meta_string<S: serde_json::value::Index + std::fmt::Display + Copy>(
857        &self,
858        key: S,
859    ) -> anyhow::Result<Option<String>> {
860        match self.get_meta(key).await {
861            Ok(serde_json::Value::String(value)) => Ok(Some(value.to_string())),
862            Ok(serde_json::Value::Null) => Ok(None),
863            hmm => {
864                anyhow::bail!("expected '{key}' to be a string value, got {hmm:?}");
865            }
866        }
867    }
868
869    pub async fn get_meta_obj(&self) -> anyhow::Result<serde_json::Value> {
870        self.load_meta_if_needed().await?;
871        let inner = self.msg_and_id.inner.lock();
872        match &inner.metadata {
873            None => anyhow::bail!("get_meta_obj: metadata must be loaded first"),
874            Some(meta) => Ok(meta.meta.clone()),
875        }
876    }
877
878    pub async fn get_meta<S: serde_json::value::Index>(
879        &self,
880        key: S,
881    ) -> anyhow::Result<serde_json::Value> {
882        self.load_meta_if_needed().await?;
883        let inner = self.msg_and_id.inner.lock();
884        match &inner.metadata {
885            None => anyhow::bail!("get_meta: metadata must be loaded first"),
886            Some(meta) => match meta.meta.get(key) {
887                Some(value) => Ok(value.clone()),
888                None => Ok(serde_json::Value::Null),
889            },
890        }
891    }
892
893    pub fn age(&self, now: DateTime<Utc>) -> chrono::Duration {
894        self.msg_and_id.id.age(now)
895    }
896
897    pub async fn get_queue_name(&self) -> anyhow::Result<String> {
898        Ok(match self.get_meta_string("queue").await? {
899            Some(name) => name,
900            None => {
901                let name = QueueNameComponents::format(
902                    self.get_meta_string("campaign").await?,
903                    self.get_meta_string("tenant").await?,
904                    self.first_recipient()
905                        .await?
906                        .domain()
907                        .to_string()
908                        .to_lowercase(),
909                    self.get_meta_string("routing_domain").await?,
910                );
911                name.to_string()
912            }
913        })
914    }
915
916    #[cfg(feature = "impl")]
917    pub async fn arc_verify(
918        &self,
919        opt_resolver_name: Option<String>,
920    ) -> anyhow::Result<AuthenticationResult> {
921        let resolver = get_resolver_instance(&opt_resolver_name)?;
922        let data = self.data().await?;
923        let bytes = mailparsing::SharedString::try_from(data.as_ref().as_ref())?;
924
925        let parsed = mailparsing::Header::parse_headers(bytes.clone())?;
926        let message = kumo_dkim::ParsedEmail::HeaderOnlyParse { bytes, parsed };
927
928        let arc = ARC::verify(&message, &**resolver).await;
929        Ok(arc.authentication_result())
930    }
931
932    #[cfg(feature = "impl")]
933    pub async fn arc_seal(
934        &self,
935        signer: Signer,
936        auth_results: AuthenticationResults,
937        opt_resolver_name: Option<String>,
938    ) -> anyhow::Result<()> {
939        let resolver = get_resolver_instance(&opt_resolver_name)?;
940        let data = self.data().await?;
941        let bytes = mailparsing::SharedString::try_from(data.as_ref().as_ref())?;
942        let parsed = mailparsing::Header::parse_headers(bytes.clone())?;
943        let message = kumo_dkim::ParsedEmail::HeaderOnlyParse { bytes, parsed };
944        let arc = ARC::verify(&message, &**resolver).await;
945
946        let headers = arc.seal(&message, auth_results, signer.signer())?;
947        if !headers.is_empty() {
948            let mut new_data = Vec::<u8>::with_capacity(data.len() + 1024);
949
950            for hdr in headers {
951                hdr.write_header(&mut new_data).ok();
952            }
953            new_data.extend_from_slice(&data);
954            self.assign_data(new_data);
955        }
956
957        Ok(())
958    }
959
960    #[cfg(feature = "impl")]
961    pub async fn dkim_verify(
962        &self,
963        opt_resolver_name: Option<String>,
964    ) -> anyhow::Result<Vec<AuthenticationResult>> {
965        let resolver = get_resolver_instance(&opt_resolver_name)?;
966        let data = self.data().await?;
967        let bytes = mailparsing::SharedString::try_from(data.as_ref().as_ref())?;
968
969        let parsed = mailparsing::Header::parse_headers(bytes.clone())?;
970        if parsed
971            .overall_conformance
972            .contains(MessageConformance::NON_CANONICAL_LINE_ENDINGS)
973        {
974            return Ok(vec![AuthenticationResult {
975                method: "dkim".to_string(),
976                method_version: None,
977                result: "permerror".to_string(),
978                reason: Some("message has non-canonical line endings".to_string()),
979                props: Default::default(),
980            }]);
981        }
982        let message = kumo_dkim::ParsedEmail::HeaderOnlyParse { bytes, parsed };
983
984        let from = match message.get_headers().from() {
985            Ok(Some(from)) if from.0.len() == 1 => from.0,
986            Ok(Some(from)) => {
987                return Ok(vec![AuthenticationResult {
988                    method: "dkim".to_string(),
989                    method_version: None,
990                    result: "permerror".to_string(),
991                    reason: Some(format!(
992                        "From header must have a single sender, found {}",
993                        from.len()
994                    )),
995                    props: Default::default(),
996                }]);
997            }
998            Ok(None) => {
999                return Ok(vec![AuthenticationResult {
1000                    method: "dkim".to_string(),
1001                    method_version: None,
1002                    result: "permerror".to_string(),
1003                    reason: Some("From header is missing".to_string()),
1004                    props: Default::default(),
1005                }]);
1006            }
1007            Err(err) => {
1008                return Ok(vec![AuthenticationResult {
1009                    method: "dkim".to_string(),
1010                    method_version: None,
1011                    result: "permerror".to_string(),
1012                    reason: Some(format!("From header is invalid: {err:#}")),
1013                    props: Default::default(),
1014                }]);
1015            }
1016        };
1017        let from_domain = &from[0].address.domain;
1018
1019        let results =
1020            kumo_dkim::verify_email_with_resolver(from_domain, &message, &**resolver).await?;
1021        Ok(results)
1022    }
1023
1024    /// Parses the content into an owned MimePart.
1025    /// Changes to that MimePart are NOT reflected in the underlying
1026    /// message; you must re-assign the message data if you wish to modify
1027    /// the message content.
1028    pub async fn parse(&self) -> anyhow::Result<MimePart<'static>> {
1029        let data = self.data().await?;
1030        let owned_data = String::from_utf8_lossy(data.as_ref().as_ref()).to_string();
1031        Ok(MimePart::parse(owned_data)?)
1032    }
1033
1034    pub async fn parse_rfc3464(&self) -> anyhow::Result<Option<Report>> {
1035        let data = self.data().await?;
1036        Report::parse(&data)
1037    }
1038
1039    pub async fn parse_rfc5965(&self) -> anyhow::Result<Option<ARFReport>> {
1040        let data = self.data().await?;
1041        ARFReport::parse(&data)
1042    }
1043
1044    pub async fn prepend_header(&self, name: Option<&str>, value: &str) -> anyhow::Result<()> {
1045        let data = self.data().await?;
1046        let mut new_data = Vec::with_capacity(size_header(name, value) + 2 + data.len());
1047        emit_header(&mut new_data, name, value);
1048        new_data.extend_from_slice(&data);
1049        self.assign_data(new_data);
1050        Ok(())
1051    }
1052
1053    pub async fn append_header(&self, name: Option<&str>, value: &str) -> anyhow::Result<()> {
1054        let data = self.data().await?;
1055        let mut new_data = Vec::with_capacity(size_header(name, value) + 2 + data.len());
1056        for (idx, window) in data.windows(4).enumerate() {
1057            if window == b"\r\n\r\n" {
1058                let headers = &data[0..idx + 2];
1059                let body = &data[idx + 2..];
1060
1061                new_data.extend_from_slice(headers);
1062                emit_header(&mut new_data, name, value);
1063                new_data.extend_from_slice(body);
1064                self.assign_data(new_data);
1065                return Ok(());
1066            }
1067        }
1068
1069        anyhow::bail!("append_header could not find the end of the header block");
1070    }
1071
1072    pub async fn get_address_header(
1073        &self,
1074        header_name: &str,
1075    ) -> anyhow::Result<Option<HeaderAddressList>> {
1076        let data = self.data().await?;
1077        let HeaderParseResult { headers, .. } =
1078            mailparsing::Header::parse_headers(data.as_ref().as_ref())?;
1079
1080        match headers.get_first(header_name) {
1081            Some(hdr) => {
1082                let list = hdr.as_address_list()?;
1083                let result: HeaderAddressList = list.into();
1084                Ok(Some(result))
1085            }
1086            None => Ok(None),
1087        }
1088    }
1089
1090    pub async fn get_first_named_header_value(&self, name: &str) -> anyhow::Result<Option<String>> {
1091        let data = self.data().await?;
1092        let HeaderParseResult { headers, .. } = Header::parse_headers(data.as_ref().as_ref())?;
1093
1094        match headers.get_first(name) {
1095            Some(hdr) => Ok(Some(hdr.as_unstructured()?)),
1096            None => Ok(None),
1097        }
1098    }
1099
1100    pub async fn get_all_named_header_values(&self, name: &str) -> anyhow::Result<Vec<String>> {
1101        let data = self.data().await?;
1102        let HeaderParseResult { headers, .. } = Header::parse_headers(data.as_ref().as_ref())?;
1103
1104        let mut values = vec![];
1105        for hdr in headers.iter_named(name) {
1106            values.push(hdr.as_unstructured()?);
1107        }
1108        Ok(values)
1109    }
1110
1111    pub async fn get_all_headers(&self) -> anyhow::Result<Vec<(String, String)>> {
1112        let data = self.data().await?;
1113        let HeaderParseResult { headers, .. } = Header::parse_headers(data.as_ref().as_ref())?;
1114
1115        let mut values = vec![];
1116        for hdr in headers.iter() {
1117            values.push((hdr.get_name().to_string(), hdr.as_unstructured()?));
1118        }
1119        Ok(values)
1120    }
1121
1122    pub async fn retain_headers<F: FnMut(&Header) -> bool>(
1123        &self,
1124        mut func: F,
1125    ) -> anyhow::Result<()> {
1126        let data = self.data().await?;
1127        let mut new_data = Vec::with_capacity(data.len());
1128        let HeaderParseResult {
1129            headers,
1130            body_offset,
1131            ..
1132        } = Header::parse_headers(data.as_ref().as_ref())?;
1133        for hdr in headers.iter() {
1134            let retain = (func)(hdr);
1135            if !retain {
1136                continue;
1137            }
1138            hdr.write_header(&mut new_data)?;
1139        }
1140        new_data.extend_from_slice(b"\r\n");
1141        new_data.extend_from_slice(&data[body_offset..]);
1142        self.assign_data(new_data);
1143        Ok(())
1144    }
1145
1146    pub async fn remove_first_named_header(&self, name: &str) -> anyhow::Result<()> {
1147        let mut removed = false;
1148        self.retain_headers(|hdr| {
1149            if hdr.get_name().eq_ignore_ascii_case(name) && !removed {
1150                removed = true;
1151                false
1152            } else {
1153                true
1154            }
1155        })
1156        .await
1157    }
1158
1159    pub async fn import_x_headers(&self, names: Vec<String>) -> anyhow::Result<()> {
1160        let data = self.data().await?;
1161        let HeaderParseResult { headers, .. } = Header::parse_headers(data.as_ref().as_ref())?;
1162
1163        for hdr in headers.iter() {
1164            let do_import = if names.is_empty() {
1165                is_x_header(hdr.get_name())
1166            } else {
1167                is_header_in_names_list(hdr.get_name(), &names)
1168            };
1169            if do_import {
1170                let name = imported_header_name(hdr.get_name());
1171                self.set_meta(name, hdr.as_unstructured()?).await?;
1172            }
1173        }
1174
1175        Ok(())
1176    }
1177
1178    pub async fn remove_x_headers(&self, names: Vec<String>) -> anyhow::Result<()> {
1179        self.retain_headers(|hdr| {
1180            if names.is_empty() {
1181                !is_x_header(hdr.get_name())
1182            } else {
1183                !is_header_in_names_list(hdr.get_name(), &names)
1184            }
1185        })
1186        .await
1187    }
1188
1189    pub async fn remove_all_named_headers(&self, name: &str) -> anyhow::Result<()> {
1190        self.retain_headers(|hdr| !hdr.get_name().eq_ignore_ascii_case(name))
1191            .await
1192    }
1193
1194    #[cfg(feature = "impl")]
1195    pub async fn dkim_sign(&self, signer: Signer) -> anyhow::Result<()> {
1196        let data = self.data().await?;
1197        let header = if let Some(runtime) = SIGN_POOL.get() {
1198            runtime.spawn_blocking(move || signer.sign(&data)).await??
1199        } else {
1200            signer.sign(&data)?
1201        };
1202        self.prepend_header(None, &header).await?;
1203        Ok(())
1204    }
1205
1206    pub async fn import_scheduling_header(
1207        &self,
1208        header_name: &str,
1209        remove: bool,
1210    ) -> anyhow::Result<Option<Scheduling>> {
1211        if let Some(value) = self.get_first_named_header_value(header_name).await? {
1212            let sched: Scheduling = serde_json::from_str(&value).with_context(|| {
1213                format!("{value} from header {header_name} is not a valid Scheduling header")
1214            })?;
1215            let result = self.set_scheduling(Some(sched)).await?;
1216
1217            if remove {
1218                self.remove_all_named_headers(header_name).await?;
1219            }
1220
1221            Ok(result)
1222        } else {
1223            Ok(None)
1224        }
1225    }
1226
1227    pub async fn append_text_plain(&self, content: &str) -> anyhow::Result<bool> {
1228        let data = self.data().await?;
1229        let mut msg = MimePart::parse(data.as_ref().as_ref())?;
1230        let parts = msg.simplified_structure_pointers()?;
1231        if let Some(p) = parts.text_part.and_then(|p| msg.resolve_ptr_mut(p)) {
1232            match p.body()? {
1233                DecodedBody::Text(text) => {
1234                    let mut text = text.as_str().to_string();
1235                    text.push_str("\r\n");
1236                    text.push_str(content);
1237                    p.replace_text_body("text/plain", &text)?;
1238
1239                    let new_data = msg.to_message_string();
1240                    self.assign_data(new_data.into_bytes());
1241                    Ok(true)
1242                }
1243                DecodedBody::Binary(_) => {
1244                    anyhow::bail!("expected text/plain part to be text, but it is binary");
1245                }
1246            }
1247        } else {
1248            Ok(false)
1249        }
1250    }
1251
1252    pub async fn append_text_html(&self, content: &str) -> anyhow::Result<bool> {
1253        let data = self.data().await?;
1254        let mut msg = MimePart::parse(data.as_ref().as_ref())?;
1255        let parts = msg.simplified_structure_pointers()?;
1256        if let Some(p) = parts.html_part.and_then(|p| msg.resolve_ptr_mut(p)) {
1257            match p.body()? {
1258                DecodedBody::Text(text) => {
1259                    let mut text = text.as_str().to_string();
1260
1261                    match text.rfind("</body>").or_else(|| text.rfind("</BODY>")) {
1262                        Some(idx) => {
1263                            text.insert_str(idx, content);
1264                            text.insert_str(idx, "\r\n");
1265                        }
1266                        None => {
1267                            // Just append
1268                            text.push_str("\r\n");
1269                            text.push_str(content);
1270                        }
1271                    }
1272
1273                    p.replace_text_body("text/html", &text)?;
1274
1275                    let new_data = msg.to_message_string();
1276                    self.assign_data(new_data.into_bytes());
1277                    Ok(true)
1278                }
1279                DecodedBody::Binary(_) => {
1280                    anyhow::bail!("expected text/html part to be text, but it is binary");
1281                }
1282            }
1283        } else {
1284            Ok(false)
1285        }
1286    }
1287
1288    pub async fn check_fix_conformance(
1289        &self,
1290        check: MessageConformance,
1291        fix: MessageConformance,
1292        settings: Option<&CheckFixSettings>,
1293    ) -> anyhow::Result<()> {
1294        let data = self.data().await?;
1295        let data_bytes = data.as_ref().as_ref();
1296        let mut msg = MimePart::parse(data_bytes)?;
1297
1298        let conformance = msg.conformance();
1299
1300        // Don't raise errors for things that we're going to fix anyway
1301        let check = check - fix;
1302
1303        if check.intersects(conformance) {
1304            let problems = check.intersection(conformance).to_string();
1305            anyhow::bail!("Message has conformance issues: {problems}");
1306        }
1307
1308        if fix.intersects(conformance) {
1309            let to_fix = fix.intersection(conformance);
1310            let problems = to_fix.to_string();
1311
1312            let missing_headers_only = to_fix
1313                .difference(
1314                    MessageConformance::MISSING_DATE_HEADER
1315                        | MessageConformance::MISSING_MIME_VERSION
1316                        | MessageConformance::MISSING_MESSAGE_ID_HEADER,
1317                )
1318                .is_empty();
1319
1320            if !missing_headers_only {
1321                if to_fix.contains(MessageConformance::NEEDS_TRANSFER_ENCODING) {
1322                    // Something is 8-bit. If we're lucky, it's simply UTF-8,
1323                    // but it could be some other "legacy" charset encoding.
1324                    // If we've been asked to detect an encoding, try that now,
1325                    // and re-parse the message with the re-coded input.
1326                    // Otherwise, we'll attempt a lossy conversion to UTF-8
1327                    // and the resulting message will likely include unicode
1328                    // replacement characters.
1329
1330                    if let Some(settings) = settings {
1331                        if settings.detect_encoding {
1332                            use charset_normalizer_rs::entity::NormalizerSettings;
1333
1334                            let norm_settings = NormalizerSettings {
1335                                include_encodings: settings.include_encodings.clone(),
1336                                exclude_encodings: settings.exclude_encodings.clone(),
1337                                ..Default::default()
1338                            };
1339
1340                            let guess = charset_normalizer_rs::from_bytes(
1341                                &data_bytes.to_vec(),
1342                                Some(norm_settings),
1343                            )
1344                            .map_err(|err| anyhow::anyhow!("{err}"))?;
1345                            if let Some(best) = guess.get_best() {
1346                                if let Some(decoded) = best.decoded_payload() {
1347                                    msg = MimePart::parse(decoded.to_string())?;
1348                                }
1349                            }
1350                        }
1351                    }
1352                }
1353
1354                msg = msg.rebuild().with_context(|| {
1355                    format!("Rebuilding message to correct conformance issues: {problems}")
1356                })?;
1357            }
1358
1359            if to_fix.contains(MessageConformance::MISSING_DATE_HEADER) {
1360                msg.headers_mut().set_date(Utc::now())?;
1361            }
1362
1363            if to_fix.contains(MessageConformance::MISSING_MIME_VERSION) {
1364                msg.headers_mut().set_mime_version("1.0")?;
1365            }
1366
1367            if to_fix.contains(MessageConformance::MISSING_MESSAGE_ID_HEADER) {
1368                let sender = self.sender().await?;
1369                let domain = sender.domain();
1370                let id = *self.id();
1371                msg.headers_mut()
1372                    .set_message_id(mailparsing::MessageID(format!("{id}@{domain}")))?;
1373            }
1374
1375            let new_data = msg.to_message_string();
1376            self.assign_data(new_data.into_bytes());
1377        }
1378
1379        Ok(())
1380    }
1381}
1382
1383fn is_header_in_names_list(hdr_name: &str, names: &[String]) -> bool {
1384    for name in names {
1385        if hdr_name.eq_ignore_ascii_case(name) {
1386            return true;
1387        }
1388    }
1389    false
1390}
1391
1392fn imported_header_name(name: &str) -> String {
1393    name.chars()
1394        .map(|c| match c.to_ascii_lowercase() {
1395            '-' => '_',
1396            c => c,
1397        })
1398        .collect()
1399}
1400
1401fn is_x_header(name: &str) -> bool {
1402    name.starts_with("X-") || name.starts_with("x-")
1403}
1404
1405fn size_header(name: Option<&str>, value: &str) -> usize {
1406    name.map(|name| name.len() + 2).unwrap_or(0) + value.len()
1407}
1408
1409fn emit_header(dest: &mut Vec<u8>, name: Option<&str>, value: &str) {
1410    if let Some(name) = name {
1411        dest.extend_from_slice(name.as_bytes());
1412        dest.extend_from_slice(b": ");
1413    }
1414    dest.extend_from_slice(value.as_bytes());
1415    if !value.ends_with("\r\n") {
1416        dest.extend_from_slice(b"\r\n");
1417    }
1418}
1419
1420#[cfg(feature = "impl")]
1421impl UserData for Message {
1422    fn add_methods<M: UserDataMethods<Self>>(methods: &mut M) {
1423        methods.add_async_method(
1424            "set_meta",
1425            move |_, this, (name, value): (String, mlua::Value)| async move {
1426                let value = serde_json::value::to_value(value).map_err(any_err)?;
1427                this.set_meta(name, value).await.map_err(any_err)?;
1428                Ok(())
1429            },
1430        );
1431        methods.add_async_method("get_meta", move |lua, this, name: String| async move {
1432            let value = this.get_meta(name).await.map_err(any_err)?;
1433            Ok(Some(lua.to_value_with(&value, serialize_options())?))
1434        });
1435        methods.add_async_method("get_data", |lua, this, _: ()| async move {
1436            let data = this.data().await.map_err(any_err)?;
1437            lua.create_string(&*data)
1438        });
1439        methods.add_method("set_data", move |_lua, this, data: mlua::String| {
1440            this.assign_data(data.as_bytes().to_vec());
1441            Ok(())
1442        });
1443
1444        methods.add_async_method("parse_mime", |_lua, this, _: ()| async move {
1445            let data = this.data().await.map_err(any_err)?;
1446            let owned_data = String::from_utf8_lossy(data.as_ref().as_ref()).to_string();
1447            let part = MimePart::parse(owned_data).map_err(any_err)?;
1448            Ok(mod_mimepart::PartRef::new(part))
1449        });
1450
1451        methods.add_async_method("append_text_plain", |_lua, this, data: String| async move {
1452            this.append_text_plain(&data).await.map_err(any_err)
1453        });
1454
1455        methods.add_async_method("append_text_html", |_lua, this, data: String| async move {
1456            this.append_text_html(&data).await.map_err(any_err)
1457        });
1458
1459        methods.add_method("id", move |_, this, _: ()| Ok(this.id().to_string()));
1460        methods.add_async_method("sender", move |_, this, _: ()| async move {
1461            this.sender().await.map_err(any_err)
1462        });
1463
1464        methods.add_method("num_attempts", move |_, this, _: ()| {
1465            Ok(this.get_num_attempts())
1466        });
1467
1468        methods.add_async_method("queue_name", move |_, this, _: ()| async move {
1469            this.get_queue_name().await.map_err(any_err)
1470        });
1471
1472        methods.add_async_method("set_due", move |lua, this, due: mlua::Value| async move {
1473            let due: Option<DateTime<Utc>> = lua.from_value(due)?;
1474            let revised_due = this.set_due(due).await.map_err(any_err)?;
1475            lua.to_value(&revised_due)
1476        });
1477
1478        methods.add_async_method(
1479            "set_sender",
1480            move |lua, this, value: mlua::Value| async move {
1481                let sender = match value {
1482                    mlua::Value::String(s) => {
1483                        let s = s.to_str()?;
1484                        EnvelopeAddress::parse(&s).map_err(any_err)?
1485                    }
1486                    _ => lua.from_value::<EnvelopeAddress>(value.clone())?,
1487                };
1488                this.set_sender(sender).await.map_err(any_err)
1489            },
1490        );
1491
1492        methods.add_async_method("recipient", move |lua, this, _: ()| async move {
1493            let mut recipients = this.recipient_list().await.map_err(any_err)?;
1494            match recipients.len() {
1495                0 => Ok(mlua::Value::Nil),
1496                1 => {
1497                    let recip: EnvelopeAddress = recipients.pop().expect("have 1");
1498                    recip.into_lua(&lua)
1499                }
1500                _ => recipients.into_lua(&lua),
1501            }
1502        });
1503
1504        methods.add_async_method("recipient_list", move |lua, this, _: ()| async move {
1505            let recipients = this.recipient_list().await.map_err(any_err)?;
1506            recipients.into_lua(&lua)
1507        });
1508
1509        methods.add_async_method(
1510            "set_recipient",
1511            move |lua, this, value: mlua::Value| async move {
1512                let recipients = match value {
1513                    mlua::Value::String(s) => {
1514                        let s = s.to_str()?;
1515                        vec![EnvelopeAddress::parse(&s).map_err(any_err)?]
1516                    }
1517                    _ => {
1518                        if let Ok(recips) = lua.from_value::<Vec<EnvelopeAddress>>(value.clone()) {
1519                            recips
1520                        } else {
1521                            vec![lua.from_value::<EnvelopeAddress>(value.clone())?]
1522                        }
1523                    }
1524                };
1525                this.set_recipient_list(recipients).await.map_err(any_err)
1526            },
1527        );
1528
1529        #[cfg(feature = "impl")]
1530        methods.add_async_method("dkim_sign", |_, this, signer: Signer| async move {
1531            this.dkim_sign(signer).await.map_err(any_err)
1532        });
1533
1534        methods.add_async_method("shrink", |_, this, _: ()| async move {
1535            if this.needs_save() {
1536                this.save(None).await.map_err(any_err)?;
1537            }
1538            this.shrink().map_err(any_err)
1539        });
1540
1541        methods.add_async_method("shrink_data", |_, this, _: ()| async move {
1542            if this.needs_save() {
1543                this.save(None).await.map_err(any_err)?;
1544            }
1545            this.shrink_data().map_err(any_err)
1546        });
1547
1548        methods.add_async_method(
1549            "add_authentication_results",
1550            |lua, this, (serv_id, results): (String, mlua::Value)| async move {
1551                let results: Vec<AuthenticationResult> = lua.from_value(results)?;
1552                let results = AuthenticationResults {
1553                    serv_id,
1554                    version: None,
1555                    results,
1556                };
1557
1558                this.prepend_header(Some("Authentication-Results"), &results.encode_value())
1559                    .await
1560                    .map_err(any_err)?;
1561
1562                Ok(())
1563            },
1564        );
1565
1566        #[cfg(feature = "impl")]
1567        methods.add_async_method(
1568            "arc_verify",
1569            |lua, this, opt_resolver_name: Option<String>| async move {
1570                let results = this.arc_verify(opt_resolver_name).await.map_err(any_err)?;
1571                lua.to_value_with(&results, serialize_options())
1572            },
1573        );
1574
1575        #[cfg(feature = "impl")]
1576        methods.add_async_method(
1577            "arc_seal",
1578            |lua,
1579             this,
1580             (signer, serv_id, auth_res, opt_resolver_name): (
1581                Signer,
1582                String,
1583                mlua::Value,
1584                Option<String>,
1585            )| async move {
1586                let results: Vec<AuthenticationResult> = lua.from_value(auth_res)?;
1587                this.arc_seal(
1588                    signer,
1589                    AuthenticationResults {
1590                        serv_id,
1591                        version: None,
1592                        results,
1593                    },
1594                    opt_resolver_name,
1595                )
1596                .await
1597                .map_err(any_err)
1598            },
1599        );
1600
1601        #[cfg(feature = "impl")]
1602        methods.add_async_method(
1603            "dkim_verify",
1604            |lua, this, opt_resolver_name: Option<String>| async move {
1605                let results = this.dkim_verify(opt_resolver_name).await.map_err(any_err)?;
1606                lua.to_value_with(&results, serialize_options())
1607            },
1608        );
1609
1610        methods.add_async_method(
1611            "prepend_header",
1612            |_, this, (name, value, encode): (String, String, Option<bool>)| async move {
1613                let encode = encode.unwrap_or(false);
1614                if encode {
1615                    let header = Header::new_unstructured(name.clone(), value);
1616                    this.prepend_header(Some(&name), header.get_raw_value())
1617                        .await
1618                        .map_err(any_err)?;
1619                } else {
1620                    this.prepend_header(Some(&name), &value)
1621                        .await
1622                        .map_err(any_err)?;
1623                }
1624                Ok(())
1625            },
1626        );
1627        methods.add_async_method(
1628            "append_header",
1629            |_, this, (name, value, encode): (String, String, Option<bool>)| async move {
1630                let encode = encode.unwrap_or(false);
1631                if encode {
1632                    let header = Header::new_unstructured(name.clone(), value);
1633                    this.append_header(Some(&name), header.get_raw_value())
1634                        .await
1635                        .map_err(any_err)?;
1636                } else {
1637                    this.append_header(Some(&name), &value)
1638                        .await
1639                        .map_err(any_err)?;
1640                }
1641                Ok(())
1642            },
1643        );
1644        methods.add_async_method("get_address_header", |_, this, name: String| async move {
1645            this.get_address_header(&name).await.map_err(any_err)
1646        });
1647        methods.add_async_method("from_header", |_, this, ()| async move {
1648            this.get_address_header("From").await.map_err(any_err)
1649        });
1650        methods.add_async_method("to_header", |_, this, ()| async move {
1651            this.get_address_header("To").await.map_err(any_err)
1652        });
1653
1654        methods.add_async_method(
1655            "get_first_named_header_value",
1656            |_, this, name: String| async move {
1657                this.get_first_named_header_value(&name)
1658                    .await
1659                    .map_err(any_err)
1660            },
1661        );
1662        methods.add_async_method(
1663            "get_all_named_header_values",
1664            |_, this, name: String| async move {
1665                this.get_all_named_header_values(&name)
1666                    .await
1667                    .map_err(any_err)
1668            },
1669        );
1670        methods.add_async_method("get_all_headers", |_, this, _: ()| async move {
1671            Ok(this
1672                .get_all_headers()
1673                .await
1674                .map_err(any_err)?
1675                .into_iter()
1676                .map(|(name, value)| vec![name, value])
1677                .collect::<Vec<Vec<String>>>())
1678        });
1679        methods.add_async_method(
1680            "import_x_headers",
1681            |_, this, names: Option<Vec<String>>| async move {
1682                this.import_x_headers(names.unwrap_or_default())
1683                    .await
1684                    .map_err(any_err)
1685            },
1686        );
1687
1688        methods.add_async_method(
1689            "remove_x_headers",
1690            |_, this, names: Option<Vec<String>>| async move {
1691                this.remove_x_headers(names.unwrap_or_default())
1692                    .await
1693                    .map_err(any_err)
1694            },
1695        );
1696        methods.add_async_method(
1697            "remove_all_named_headers",
1698            |_, this, name: String| async move {
1699                this.remove_all_named_headers(&name).await.map_err(any_err)
1700            },
1701        );
1702
1703        methods.add_async_method(
1704            "import_scheduling_header",
1705            |lua, this, (header_name, remove): (String, bool)| async move {
1706                let opt_schedule = this
1707                    .import_scheduling_header(&header_name, remove)
1708                    .await
1709                    .map_err(any_err)?;
1710                lua.to_value(&opt_schedule)
1711            },
1712        );
1713
1714        methods.add_async_method(
1715            "set_scheduling",
1716            move |lua, this, params: mlua::Value| async move {
1717                let sched: Option<Scheduling> = from_lua_value(&lua, params)?;
1718                let opt_schedule = this.set_scheduling(sched).await.map_err(any_err)?;
1719                lua.to_value(&opt_schedule)
1720            },
1721        );
1722
1723        methods.add_async_method("parse_rfc3464", |lua, this, _: ()| async move {
1724            let report = this.parse_rfc3464().await.map_err(any_err)?;
1725            match report {
1726                Some(report) => lua.to_value_with(&report, serialize_options()),
1727                None => Ok(mlua::Value::Nil),
1728            }
1729        });
1730
1731        methods.add_async_method("parse_rfc5965", |lua, this, _: ()| async move {
1732            let report = this.parse_rfc5965().await.map_err(any_err)?;
1733            match report {
1734                Some(report) => lua.to_value_with(&report, serialize_options()),
1735                None => Ok(mlua::Value::Nil),
1736            }
1737        });
1738
1739        methods.add_async_method("save", |_, this, ()| async move {
1740            this.save(None).await.map_err(any_err)
1741        });
1742
1743        methods.add_method("set_force_sync", move |_, this, force: bool| {
1744            this.set_force_sync(force);
1745            Ok(())
1746        });
1747
1748        methods.add_async_method(
1749            "check_fix_conformance",
1750            |lua, this, (check, fix, settings): (String, String, Option<mlua::Value>)| async move {
1751                use std::str::FromStr;
1752                let check = MessageConformance::from_str(&check).map_err(any_err)?;
1753                let fix = MessageConformance::from_str(&fix).map_err(any_err)?;
1754
1755                let settings = match settings {
1756                    Some(v) => Some(lua.from_value(v).map_err(any_err)?),
1757                    None => None,
1758                };
1759
1760                match this
1761                    .check_fix_conformance(check, fix, settings.as_ref())
1762                    .await
1763                {
1764                    Ok(_) => Ok(None),
1765                    Err(err) => Ok(Some(format!("{err:#}"))),
1766                }
1767            },
1768        );
1769    }
1770}
1771
1772#[derive(Default, Debug, Clone)]
1773#[cfg_attr(feature = "impl", derive(Deserialize))]
1774pub struct CheckFixSettings {
1775    pub detect_encoding: bool,
1776    pub include_encodings: Vec<String>,
1777    pub exclude_encodings: Vec<String>,
1778}
1779
1780impl TimerEntryWithDelay for WeakMessage {
1781    fn delay(&self) -> Duration {
1782        match self.upgrade() {
1783            None => {
1784                // Dangling/Cancelled. Make it appear due immediately
1785                Duration::from_millis(0)
1786            }
1787            Some(msg) => msg.delay(),
1788        }
1789    }
1790}
1791
1792impl TimerEntryWithDelay for Message {
1793    fn delay(&self) -> Duration {
1794        let inner = self.msg_and_id.inner.lock();
1795        match inner.due {
1796            Some(time) => {
1797                let now = Utc::now();
1798                let delta = time - now;
1799                delta.to_std().unwrap_or(Duration::from_millis(0))
1800            }
1801            None => Duration::from_millis(0),
1802        }
1803    }
1804}
1805
1806#[cfg(test)]
1807pub(crate) mod test {
1808    use super::*;
1809    use serde_json::json;
1810
1811    pub fn new_msg_body<S: AsRef<[u8]>>(s: S) -> Message {
1812        Message::new_dirty(
1813            SpoolId::new(),
1814            EnvelopeAddress::parse("sender@example.com").unwrap(),
1815            vec![EnvelopeAddress::parse("recip@example.com").unwrap()],
1816            serde_json::json!({}),
1817            Arc::new(s.as_ref().to_vec().into_boxed_slice()),
1818        )
1819        .unwrap()
1820    }
1821
1822    fn data_as_string(msg: &Message) -> String {
1823        String::from_utf8(msg.get_data_maybe_not_loaded().to_vec()).unwrap()
1824    }
1825
1826    const X_HDR_CONTENT: &str =
1827        "X-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nFrom :Someone\r\n\r\nBody";
1828
1829    #[tokio::test]
1830    async fn import_all_x_headers() {
1831        let msg = new_msg_body(X_HDR_CONTENT);
1832
1833        msg.import_x_headers(vec![]).await.unwrap();
1834        k9::assert_equal!(
1835            msg.get_meta_obj().await.unwrap(),
1836            json!({
1837                "x_hello": "there",
1838                "x_header": "value",
1839            })
1840        );
1841    }
1842
1843    #[tokio::test]
1844    async fn meta_and_nil() {
1845        let msg = new_msg_body(X_HDR_CONTENT);
1846        // Ensure that json null round-trips
1847        msg.set_meta("test", serde_json::Value::Null).await.unwrap();
1848        k9::assert_equal!(msg.get_meta("test").await.unwrap(), serde_json::Value::Null);
1849
1850        // and that it is exposed to lua as nil
1851        let lua = mlua::Lua::new();
1852        lua.globals().set("msg", msg).unwrap();
1853        lua.load("assert(msg:get_meta('test') == nil)")
1854            .exec()
1855            .unwrap();
1856    }
1857
1858    #[tokio::test]
1859    async fn import_some_x_headers() {
1860        let msg = new_msg_body(X_HDR_CONTENT);
1861
1862        msg.import_x_headers(vec!["x-hello".to_string()])
1863            .await
1864            .unwrap();
1865        k9::assert_equal!(
1866            msg.get_meta_obj().await.unwrap(),
1867            json!({
1868                "x_hello": "there",
1869            })
1870        );
1871    }
1872
1873    #[tokio::test]
1874    async fn remove_all_x_headers() {
1875        let msg = new_msg_body(X_HDR_CONTENT);
1876
1877        msg.remove_x_headers(vec![]).await.unwrap();
1878        k9::assert_equal!(
1879            data_as_string(&msg),
1880            "Subject: Hello\r\nFrom :Someone\r\n\r\nBody"
1881        );
1882    }
1883
1884    #[tokio::test]
1885    async fn prepend_header_2_params() {
1886        let msg = new_msg_body(X_HDR_CONTENT);
1887
1888        msg.prepend_header(Some("Date"), "Today").await.unwrap();
1889        k9::assert_equal!(
1890            data_as_string(&msg),
1891            "Date: Today\r\nX-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nFrom :Someone\r\n\r\nBody"
1892        );
1893    }
1894
1895    #[tokio::test]
1896    async fn prepend_header_1_params() {
1897        let msg = new_msg_body(X_HDR_CONTENT);
1898
1899        msg.prepend_header(None, "Date: Today").await.unwrap();
1900        k9::assert_equal!(
1901            data_as_string(&msg),
1902            "Date: Today\r\nX-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nFrom :Someone\r\n\r\nBody"
1903        );
1904    }
1905
1906    #[tokio::test]
1907    async fn append_header_2_params() {
1908        let msg = new_msg_body(X_HDR_CONTENT);
1909
1910        msg.append_header(Some("Date"), "Today").await.unwrap();
1911        k9::assert_equal!(
1912            data_as_string(&msg),
1913            "X-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nFrom :Someone\r\nDate: Today\r\n\r\nBody"
1914        );
1915    }
1916
1917    #[tokio::test]
1918    async fn append_header_1_params() {
1919        let msg = new_msg_body(X_HDR_CONTENT);
1920
1921        msg.append_header(None, "Date: Today").await.unwrap();
1922        k9::assert_equal!(
1923            data_as_string(&msg),
1924            "X-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nFrom :Someone\r\nDate: Today\r\n\r\nBody"
1925        );
1926    }
1927
1928    const MULTI_HEADER_CONTENT: &str =
1929        "X-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nX-Header: another value\r\nFrom :Someone@somewhere\r\n\r\nBody";
1930
1931    #[tokio::test]
1932    async fn get_first_header() {
1933        let msg = new_msg_body(MULTI_HEADER_CONTENT);
1934        k9::assert_equal!(
1935            msg.get_first_named_header_value("X-header")
1936                .await
1937                .unwrap()
1938                .unwrap(),
1939            "value"
1940        );
1941    }
1942
1943    #[tokio::test]
1944    async fn get_all_header() {
1945        let msg = new_msg_body(MULTI_HEADER_CONTENT);
1946        k9::assert_equal!(
1947            msg.get_all_named_header_values("X-header").await.unwrap(),
1948            vec!["value".to_string(), "another value".to_string()]
1949        );
1950    }
1951
1952    #[tokio::test]
1953    async fn remove_first() {
1954        let msg = new_msg_body(MULTI_HEADER_CONTENT);
1955        msg.remove_first_named_header("X-header").await.unwrap();
1956        k9::assert_equal!(
1957            data_as_string(&msg),
1958            "X-Hello: there\r\nSubject: Hello\r\nX-Header: another value\r\nFrom :Someone@somewhere\r\n\r\nBody"
1959        );
1960    }
1961
1962    #[tokio::test]
1963    async fn remove_all() {
1964        let msg = new_msg_body(MULTI_HEADER_CONTENT);
1965        msg.remove_all_named_headers("X-header").await.unwrap();
1966        k9::assert_equal!(
1967            data_as_string(&msg),
1968            "X-Hello: there\r\nSubject: Hello\r\nFrom :Someone@somewhere\r\n\r\nBody"
1969        );
1970    }
1971
1972    #[tokio::test]
1973    async fn append_text_plain() {
1974        let msg = new_msg_body(MULTI_HEADER_CONTENT);
1975        msg.append_text_plain("I am at the bottom").await.unwrap();
1976        k9::assert_equal!(
1977            data_as_string(&msg),
1978            "X-Hello: there\r\n\
1979             X-Header: value\r\n\
1980             Subject: Hello\r\n\
1981             X-Header: another value\r\n\
1982             From :Someone@somewhere\r\n\
1983             Content-Type: text/plain;\r\n\
1984             \tcharset=\"us-ascii\"\r\n\
1985             \r\n\
1986             Body\r\n\
1987             I am at the bottom\r\n"
1988        );
1989    }
1990
1991    const MIXED_CONTENT: &str = "Content-Type: multipart/mixed;\r\n\
1992\tboundary=\"my-boundary\"\r\n\
1993\r\n\
1994--my-boundary\r\n\
1995Content-Type: text/plain;\r\n\
1996\tcharset=\"us-ascii\"\r\n\
1997\r\n\
1998plain text\r\n\
1999--my-boundary\r\n\
2000Content-Type: text/html;\r\n\
2001\tcharset=\"us-ascii\"\r\n\
2002\r\n\
2003<b>rich</b> text\r\n\
2004--my-boundary\r\n\
2005Content-Type: application/octet-stream\r\n\
2006Content-Transfer-Encoding: base64\r\n\
2007Content-Disposition: attachment;\r\n\
2008\tfilename=\"woot.bin\"\r\n\
2009Content-ID: <woot.id@somewhere>\r\n\
2010\r\n\
2011AAECAw==\r\n\
2012--my-boundary--\r\n\
2013\r\n";
2014
2015    const MIXED_CONTENT_ENCLOSING_BODY: &str = "Content-Type: multipart/mixed;\r\n\
2016\tboundary=\"my-boundary\"\r\n\
2017\r\n\
2018--my-boundary\r\n\
2019Content-Type: text/plain;\r\n\
2020\tcharset=\"us-ascii\"\r\n\
2021\r\n\
2022plain text\r\n\
2023--my-boundary\r\n\
2024Content-Type: text/html;\r\n\
2025\tcharset=\"us-ascii\"\r\n\
2026\r\n\
2027<BODY>\r\n\
2028<b>rich</b> text\r\n\
2029</BODY>\r\n\
2030--my-boundary\r\n\
2031Content-Type: application/octet-stream\r\n\
2032Content-Transfer-Encoding: base64\r\n\
2033Content-Disposition: attachment;\r\n\
2034\tfilename=\"woot.bin\"\r\n\
2035Content-ID: <woot.id>\r\n\
2036\r\n\
2037AAECAw==\r\n\
2038--my-boundary--\r\n\
2039\r\n";
2040
2041    #[tokio::test]
2042    async fn append_text_html() {
2043        let msg = new_msg_body(MIXED_CONTENT);
2044        msg.append_text_html("bottom html").await.unwrap();
2045        k9::snapshot!(
2046            data_as_string(&msg),
2047            r#"
2048Content-Type: multipart/mixed;\r
2049\tboundary="my-boundary"\r
2050\r
2051--my-boundary\r
2052Content-Type: text/plain;\r
2053\tcharset="us-ascii"\r
2054\r
2055plain text\r
2056--my-boundary\r
2057Content-Type: text/html;\r
2058\tcharset="us-ascii"\r
2059\r
2060<b>rich</b> text\r
2061\r
2062bottom html\r
2063--my-boundary\r
2064Content-Type: application/octet-stream\r
2065Content-Transfer-Encoding: base64\r
2066Content-Disposition: attachment;\r
2067\tfilename="woot.bin"\r
2068Content-ID: <woot.id@somewhere>\r
2069\r
2070AAECAw==\r
2071--my-boundary--\r
2072\r
2073
2074"#
2075        );
2076
2077        let msg = new_msg_body(MIXED_CONTENT_ENCLOSING_BODY);
2078        msg.append_text_html("bottom html 👻").await.unwrap();
2079        k9::snapshot!(
2080            data_as_string(&msg),
2081            r#"
2082Content-Type: multipart/mixed;\r
2083\tboundary="my-boundary"\r
2084\r
2085--my-boundary\r
2086Content-Type: text/plain;\r
2087\tcharset="us-ascii"\r
2088\r
2089plain text\r
2090--my-boundary\r
2091Content-Type: text/html;\r
2092\tcharset="utf-8"\r
2093Content-Transfer-Encoding: quoted-printable\r
2094\r
2095<BODY>\r
2096<b>rich</b> text\r
2097\r
2098bottom html =F0=9F=91=BB</BODY>\r
2099--my-boundary\r
2100Content-Type: application/octet-stream\r
2101Content-Transfer-Encoding: base64\r
2102Content-Disposition: attachment;\r
2103\tfilename="woot.bin"\r
2104Content-ID: <woot.id>\r
2105\r
2106AAECAw==\r
2107--my-boundary--\r
2108\r
2109
2110"#
2111        );
2112    }
2113
2114    #[tokio::test]
2115    async fn append_text_plain_mixed() {
2116        let msg = new_msg_body(MIXED_CONTENT);
2117        msg.append_text_plain("bottom text 👾").await.unwrap();
2118        k9::snapshot!(
2119            data_as_string(&msg),
2120            r#"
2121Content-Type: multipart/mixed;\r
2122\tboundary="my-boundary"\r
2123\r
2124--my-boundary\r
2125Content-Type: text/plain;\r
2126\tcharset="utf-8"\r
2127Content-Transfer-Encoding: quoted-printable\r
2128\r
2129plain text\r
2130\r
2131bottom text =F0=9F=91=BE\r
2132--my-boundary\r
2133Content-Type: text/html;\r
2134\tcharset="us-ascii"\r
2135\r
2136<b>rich</b> text\r
2137--my-boundary\r
2138Content-Type: application/octet-stream\r
2139Content-Transfer-Encoding: base64\r
2140Content-Disposition: attachment;\r
2141\tfilename="woot.bin"\r
2142Content-ID: <woot.id@somewhere>\r
2143\r
2144AAECAw==\r
2145--my-boundary--\r
2146\r
2147
2148"#
2149        );
2150    }
2151
2152    #[tokio::test]
2153    async fn check_conformance_angle_msg_id() {
2154        const DOUBLE_ANGLE_ONLY: &str = "Subject: hello\r
2155Message-ID: <<1234@example.com>>\r
2156\r
2157Hello";
2158        let msg = new_msg_body(DOUBLE_ANGLE_ONLY);
2159        k9::snapshot!(
2160            msg.check_fix_conformance(
2161                MessageConformance::MISSING_MESSAGE_ID_HEADER,
2162                MessageConformance::empty(),
2163                None,
2164            )
2165            .await
2166            .unwrap_err(),
2167            "Message has conformance issues: MISSING_MESSAGE_ID_HEADER"
2168        );
2169
2170        msg.check_fix_conformance(
2171            MessageConformance::MISSING_MESSAGE_ID_HEADER,
2172            MessageConformance::MISSING_MESSAGE_ID_HEADER,
2173            None,
2174        )
2175        .await
2176        .unwrap();
2177
2178        // Can't use a snapshot test here because the fixed header
2179        // has a unique random component
2180        /*
2181                k9::snapshot!(
2182                    data_as_string(&msg),
2183                    r#"
2184        Subject: hello\r
2185        Message-ID: <4106566d2ce911ef9dcd0242289ea0df@example.com>\r
2186        \r
2187        Hello
2188        "#
2189                );
2190        */
2191
2192        const DOUBLE_ANGLE_AND_LONG_LINE: &str = "Subject: hello\r
2193Message-ID: <<1234@example.com>>\r
2194\r
2195Hello this is a really long line Hello this is a really long line \
2196Hello this is a really long line Hello this is a really long line \
2197Hello this is a really long line Hello this is a really long line \
2198Hello this is a really long line Hello this is a really long line \
2199Hello this is a really long line Hello this is a really long line \
2200Hello this is a really long line Hello this is a really long line \
2201Hello this is a really long line Hello this is a really long line
2202";
2203        let msg = new_msg_body(DOUBLE_ANGLE_AND_LONG_LINE);
2204        msg.check_fix_conformance(
2205            MessageConformance::MISSING_COLON_VALUE,
2206            MessageConformance::MISSING_MESSAGE_ID_HEADER | MessageConformance::LINE_TOO_LONG,
2207            None,
2208        )
2209        .await
2210        .unwrap();
2211
2212        // Can't use a snapshot test here because the fixed header
2213        // has a random component
2214        /*
2215                k9::snapshot!(
2216                    data_as_string(&msg),
2217                    r#"
2218        Content-Type: text/plain;\r
2219        \tcharset="us-ascii"\r
2220        Content-Transfer-Encoding: quoted-printable\r
2221        Subject: hello\r
2222        Message-ID: <749fc87e2cea11ef96a50242289ea0df@example.com>\r
2223        \r
2224        Hello this is a really long line Hello this is a really long line Hello thi=\r
2225        s is a really long line Hello this is a really long line Hello this is a re=\r
2226        ally long line Hello this is a really long line Hello this is a really long=\r
2227         line Hello this is a really long line Hello this is a really long line Hel=\r
2228        lo this is a really long line Hello this is a really long line Hello this i=\r
2229        s a really long line Hello this is a really long line Hello this is a reall=\r
2230        y long line=0A\r
2231
2232        "#
2233                );
2234        */
2235    }
2236
2237    #[tokio::test]
2238    async fn check_conformance() {
2239        let msg = new_msg_body(MULTI_HEADER_CONTENT);
2240        msg.check_fix_conformance(
2241            MessageConformance::default(),
2242            MessageConformance::MISSING_MIME_VERSION,
2243            None,
2244        )
2245        .await
2246        .unwrap();
2247        k9::snapshot!(
2248            data_as_string(&msg),
2249            r#"
2250X-Hello: there\r
2251X-Header: value\r
2252Subject: Hello\r
2253X-Header: another value\r
2254From :Someone@somewhere\r
2255Mime-Version: 1.0\r
2256\r
2257Body
2258"#
2259        );
2260
2261        let msg = new_msg_body(MULTI_HEADER_CONTENT);
2262        msg.check_fix_conformance(
2263            MessageConformance::default(),
2264            MessageConformance::MISSING_MIME_VERSION | MessageConformance::NAME_ENDS_WITH_SPACE,
2265            None,
2266        )
2267        .await
2268        .unwrap();
2269        k9::snapshot!(
2270            data_as_string(&msg),
2271            r#"
2272Content-Type: text/plain;\r
2273\tcharset="us-ascii"\r
2274X-Hello: there\r
2275X-Header: value\r
2276Subject: Hello\r
2277X-Header: another value\r
2278From: <Someone@somewhere>\r
2279Mime-Version: 1.0\r
2280\r
2281Body\r
2282
2283"#
2284        );
2285    }
2286
2287    #[tokio::test]
2288    async fn check_fix_latin_input() {
2289        // Note that the encoding_rs crate unifies latin1 into cp1252
2290        const POUNDS: &str = "Subject: £\r\n\r\nGBP\r\n";
2291        let (content, _, _) = encoding_rs::WINDOWS_1252.encode(POUNDS);
2292        let msg = new_msg_body(&*content);
2293        msg.check_fix_conformance(
2294            MessageConformance::default(),
2295            MessageConformance::NEEDS_TRANSFER_ENCODING,
2296            Some(&CheckFixSettings {
2297                detect_encoding: true,
2298                include_encodings: vec!["iso-8859-1".to_string()],
2299                ..Default::default()
2300            }),
2301        )
2302        .await
2303        .unwrap();
2304
2305        let subject = msg
2306            .get_first_named_header_value("subject")
2307            .await
2308            .unwrap()
2309            .unwrap();
2310        assert_eq!(subject, "£");
2311    }
2312
2313    #[tokio::test]
2314    async fn set_scheduling() -> anyhow::Result<()> {
2315        let msg = new_msg_body(MULTI_HEADER_CONTENT);
2316        assert!(msg.get_due().is_none(), "due is implicitly now");
2317
2318        let now = Utc::now();
2319        let one_day = chrono::Duration::try_days(1).expect("1 day to be valid");
2320
2321        msg.set_scheduling(Some(Scheduling {
2322            restriction: None,
2323            first_attempt: Some((now + one_day).into()),
2324            expires: None,
2325        }))
2326        .await?;
2327
2328        let due = msg.get_due().expect("due to now be set");
2329        assert!(due - now >= one_day, "due time is at least 1 day away");
2330
2331        Ok(())
2332    }
2333
2334    #[cfg(all(test, target_pointer_width = "64"))]
2335    #[test]
2336    fn sizes() {
2337        assert_eq!(std::mem::size_of::<Message>(), 8);
2338        assert_eq!(std::mem::size_of::<MessageInner>(), 32);
2339        assert_eq!(std::mem::size_of::<MessageWithId>(), 72);
2340    }
2341}