message/
message.rs

1use crate::address::HeaderAddressList;
2#[cfg(feature = "impl")]
3use crate::dkim::Signer;
4#[cfg(feature = "impl")]
5use crate::dkim::SIGN_POOL;
6pub use crate::queue_name::QueueNameComponents;
7use crate::scheduling::Scheduling;
8use crate::EnvelopeAddress;
9use anyhow::Context;
10use chrono::{DateTime, Utc};
11#[cfg(feature = "impl")]
12use config::{any_err, from_lua_value, serialize_options};
13use futures::FutureExt;
14use intrusive_collections::{intrusive_adapter, LinkedList, LinkedListAtomicLink};
15use kumo_chrono_helper::*;
16#[cfg(feature = "impl")]
17use kumo_dkim::arc::ARC;
18use kumo_log_types::rfc3464::Report;
19use kumo_log_types::rfc5965::ARFReport;
20use kumo_prometheus::declare_metric;
21#[cfg(feature = "impl")]
22use mailparsing::{AuthenticationResult, AuthenticationResults, EncodeHeaderValue};
23use mailparsing::{
24    CheckFixSettings, DecodedBody, Header, HeaderParseResult, MessageConformance, MimePart,
25};
26#[cfg(feature = "impl")]
27use mlua::{IntoLua, LuaSerdeExt, UserData, UserDataMethods};
28#[cfg(feature = "impl")]
29use mod_dns_resolver::get_resolver_instance;
30use parking_lot::Mutex;
31use serde::{Deserialize, Serialize};
32use serde_with::formats::PreferOne;
33use serde_with::{serde_as, OneOrMany};
34use spool::{get_data_spool, get_meta_spool, Spool, SpoolId};
35use std::hash::Hash;
36use std::sync::{Arc, LazyLock, Weak};
37use std::time::{Duration, Instant};
38use timeq::TimerEntryWithDelay;
39
bitflags::bitflags! {
    /// Compact per-message state bits kept in `MessageInner::flags`.
    ///
    /// The dirty bits record which portions still need to be written
    /// to spool; they are cleared by `Message::save_to`.
    #[derive(Clone, Copy, Debug, PartialEq, Eq)]
    struct MessageFlags: u8 {
        /// true if Metadata needs to be saved
        const META_DIRTY = 1;
        /// true if Data needs to be saved
        const DATA_DIRTY = 2;
        /// true if scheduling restrictions are present in the metadata
        const SCHEDULED = 4;
        /// true if high durability writes should always be used
        const FORCE_SYNC = 8;
    }
}
53
// Incremented by the `Message` constructors in this file and decremented
// when the corresponding `MessageInner` is dropped.
declare_metric! {
/// Total number of Message objects.
///
/// This encompasses all Message objects in various states, whether
/// they are in a queue, moving between queues, being built as part
/// of an injection, pending logging, message metadata and/or data
/// may be either resident or offloaded to spool.
static MESSAGE_COUNT: IntGauge("message_count");
}
63
// Incremented when metadata becomes resident (the constructors and
// `load_meta_from`); decremented by `shrink` and by the drop of
// `MessageInner` while metadata is still resident.
declare_metric! {
/// Total number of Message objects with metadata loaded.
///
/// Tracks how many messages have their `meta` data resident
/// in memory.  This may be because they have not yet saved
/// it, or because the message is being processed and the
/// metadata is required for that processing.
static META_COUNT: IntGauge("message_meta_resident_count");
}
73
// Maintained by `Message` as body data becomes resident (e.g. `new_dirty`,
// `load_data_from`, `assign_data`) and as it is released (`shrink`,
// `shrink_data`, drop of `MessageInner` with non-empty data).
declare_metric! {
/// Total number of Message objects with body data loaded.
///
/// Tracks how many messages have their `data` resident
/// in memory.  This may be because they have not yet saved
/// it, or because the message is being processed and the
/// data is either required to be in memory in order to
/// deliver the message, or because logging or other
/// post-injection policy is configured to operate on
/// the message.
static DATA_COUNT: IntGauge("message_data_resident_count");
}
86
/// Shared, empty body buffer used as the placeholder for messages whose
/// data is not resident in memory.  An empty `data` slice is how the
/// rest of this file detects "data not loaded" (see `is_data_loaded`).
static NO_DATA: LazyLock<Arc<Box<[u8]>>> = LazyLock::new(|| Arc::new(vec![].into_boxed_slice()));
88
// The timer for this histogram is started in `Message::save`.
declare_metric! {
/// How many seconds it takes to save a message to spool.
///
/// This metric encompasses the elapsed time to save
/// either or both the `meta` and `data` portions of a
/// message to spool.
///
/// High values indicate IO pressure which may be
/// alleviated by tuning other constraints and/or
/// [RocksDB Parameters](../../kumo/define_spool/rocks_params.md)
static SAVE_HIST: Histogram("message_save_latency");
}
101
// The timer for this histogram is started in `Message::load_data`.
declare_metric! {
/// How many seconds it takes to load message data from spool.
///
/// High values indicate IO pressure which may be caused
/// by policy that operates on the message body post-reception.
/// We recommend *avoiding* logging header values as that is
/// the most common cause of this metric spiking and has
/// the biggest impact in resolving it.
///
/// IO pressure may also be alleviated by tuning other constraints and/or
/// [RocksDB Parameters](../../kumo/define_spool/rocks_params.md)
static LOAD_DATA_HIST: Histogram("message_data_load_latency");
}
115
// The timer for this histogram is started in `Message::load_meta`.
declare_metric! {
/// How long it takes to load message metadata from spool
///
/// High values indicate IO pressure which may be
/// alleviated by tuning other constraints and/or
/// [RocksDB Parameters](../../kumo/define_spool/rocks_params.md)
static LOAD_META_HIST: Histogram("message_meta_load_latency");
}
124
/// The mutable portion of a message, guarded by the mutex in
/// `MessageWithId`.
#[derive(Debug)]
struct MessageInner {
    /// Envelope and metadata; `None` while not resident in memory
    /// (released by `shrink`, restored by `load_meta`).
    metadata: Option<Box<MetaData>>,
    /// Message body; set to the shared empty `NO_DATA` buffer while
    /// not resident in memory.
    data: Arc<Box<[u8]>>,
    /// Dirty/scheduling/durability state bits.
    flags: MessageFlags,
    /// In-memory attempt counter; starts at 0 in every constructor.
    num_attempts: u16,
    /// When the message is next due, if a due time has been set.
    due: Option<DateTime<Utc>>,
}
133
/// The shared allocation behind a `Message` handle: the immutable
/// spool id, the lock-protected mutable state, and the intrusive
/// link that lets the message sit in a `MessageList`.
#[derive(Debug)]
pub(crate) struct MessageWithId {
    /// Identifier of this message in the spool.
    id: SpoolId,
    /// Mutable message state.
    inner: Mutex<MessageInner>,
    /// Intrusive list hook used by `MessageWithIdAdapter`.
    link: LinkedListAtomicLink,
}
140
// Generates the adapter type that tells `intrusive_collections::LinkedList`
// how to find the `link` field inside an `Arc<MessageWithId>`.
intrusive_adapter!(
    pub(crate) MessageWithIdAdapter = Arc<MessageWithId>: MessageWithId { link: LinkedListAtomicLink }
);
144
/// A list of messages with an O(1) list overhead; no additional
/// memory per-message is required to track this list.
/// However, a given Message can only belong to one instance
/// of such a list at a time.
pub struct MessageList {
    /// The intrusive linked list holding the messages.
    list: LinkedList<MessageWithIdAdapter>,
    /// Number of messages currently in `list`; maintained by
    /// `push_back`, `pop_front`, `pop_back` and `take`.
    len: usize,
}
153
impl Default for MessageList {
    /// Equivalent to `MessageList::new()`: an empty list.
    fn default() -> Self {
        Self::new()
    }
}
159
160impl MessageList {
161    /// Create a new MessageList
162    pub fn new() -> Self {
163        Self {
164            list: LinkedList::new(MessageWithIdAdapter::default()),
165            len: 0,
166        }
167    }
168
169    /// Returns the number of elements contained in the list
170    pub fn len(&self) -> usize {
171        self.len
172    }
173
174    pub fn is_empty(&self) -> bool {
175        self.len == 0
176    }
177
178    /// Take all of the elements from this list and return them
179    /// in a new separate instance of MessageList.
180    pub fn take(&mut self) -> Self {
181        let new_list = Self {
182            list: self.list.take(),
183            len: self.len,
184        };
185        self.len = 0;
186        new_list
187    }
188
189    /// Push a message to the back of the list
190    pub fn push_back(&mut self, message: Message) {
191        self.list.push_back(message.msg_and_id);
192        self.len += 1;
193    }
194
195    /// Pop a message from the front of the list
196    pub fn pop_front(&mut self) -> Option<Message> {
197        self.list.pop_front().map(|msg_and_id| {
198            self.len -= 1;
199            Message { msg_and_id }
200        })
201    }
202
203    /// Pop a message from the back of the list
204    pub fn pop_back(&mut self) -> Option<Message> {
205        self.list.pop_back().map(|msg_and_id| {
206            self.len -= 1;
207            Message { msg_and_id }
208        })
209    }
210
211    /// Pop all of the messages from this list into a vector
212    /// of messages.
213    /// Memory usage is O(number-of-messages).
214    pub fn drain(&mut self) -> Vec<Message> {
215        let mut messages = Vec::with_capacity(self.len);
216        while let Some(msg) = self.pop_front() {
217            messages.push(msg);
218        }
219        messages
220    }
221
222    pub fn extend_from_iter<I>(&mut self, iter: I)
223    where
224        I: Iterator<Item = Message>,
225    {
226        for msg in iter {
227            self.push_back(msg)
228        }
229    }
230}
231
232impl IntoIterator for MessageList {
233    type Item = Message;
234    type IntoIter = MessageListIter;
235    fn into_iter(self) -> MessageListIter {
236        MessageListIter { list: self.list }
237    }
238}
239
/// Owning iterator over a `MessageList`, produced by its
/// `IntoIterator` implementation.
pub struct MessageListIter {
    /// Remaining messages; popped from the front on each `next`.
    list: LinkedList<MessageWithIdAdapter>,
}
243
244impl Iterator for MessageListIter {
245    type Item = Message;
246    fn next(&mut self) -> Option<Message> {
247        self.list
248            .pop_front()
249            .map(|msg_and_id| Message { msg_and_id })
250    }
251}
252
/// A cheaply clonable handle to a message.
///
/// Cloning clones the inner `Arc`, so all clones refer to the same
/// underlying message state.
#[derive(Clone, Debug)]
#[cfg_attr(feature = "impl", derive(mlua::FromLua))]
pub struct Message {
    /// The shared id + state allocation.
    pub(crate) msg_and_id: Arc<MessageWithId>,
}
258
259impl PartialEq for Message {
260    fn eq(&self, other: &Self) -> bool {
261        self.id() == other.id()
262    }
263}
264impl Eq for Message {}
265
266impl Hash for Message {
267    fn hash<H>(&self, hasher: &mut H)
268    where
269        H: std::hash::Hasher,
270    {
271        self.id().hash(hasher)
272    }
273}
274
/// A non-owning reference to a message; obtained via `Message::weak`
/// and turned back into a `Message` with `upgrade`.
#[derive(Clone, Debug)]
pub struct WeakMessage {
    /// Weak counterpart of `Message::msg_and_id`.
    weak: Weak<MessageWithId>,
}
279
280impl WeakMessage {
281    pub fn upgrade(&self) -> Option<Message> {
282        Some(Message {
283            msg_and_id: self.weak.upgrade()?,
284        })
285    }
286}
287
/// The portion of a message that is serialized as JSON into the
/// meta spool (see `Message::save_to` / `Message::new_from_spool`).
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone)]
pub(crate) struct MetaData {
    /// Envelope sender.
    pub sender: EnvelopeAddress,
    /// Envelope recipient(s).  The `OneOrMany`/`PreferOne` adapter
    /// governs how a single recipient vs. a list is (de)serialized.
    #[serde_as(as = "OneOrMany<_, PreferOne>")]
    pub recipient: Vec<EnvelopeAddress>,
    /// Arbitrary metadata; `Message::new_dirty` requires this to be
    /// a JSON object.
    pub meta: serde_json::Value,
    /// Optional scheduling constraints; defaults to `None` when absent
    /// from the serialized form.
    #[serde(default)]
    pub schedule: Option<Scheduling>,
}
298
299impl Drop for MessageInner {
300    fn drop(&mut self) {
301        if self.metadata.is_some() {
302            META_COUNT.dec();
303        }
304        if !self.data.is_empty() {
305            DATA_COUNT.dec();
306        }
307        MESSAGE_COUNT.dec();
308    }
309}
310
311impl Message {
312    /// Create a new message with the supplied data.
313    /// The message meta and data are marked as dirty
314    pub fn new_dirty(
315        id: SpoolId,
316        sender: EnvelopeAddress,
317        recipient: Vec<EnvelopeAddress>,
318        meta: serde_json::Value,
319        data: Arc<Box<[u8]>>,
320    ) -> anyhow::Result<Self> {
321        anyhow::ensure!(meta.is_object(), "metadata must be a json object");
322        MESSAGE_COUNT.inc();
323        DATA_COUNT.inc();
324        META_COUNT.inc();
325        Ok(Self {
326            msg_and_id: Arc::new(MessageWithId {
327                id,
328                inner: Mutex::new(MessageInner {
329                    metadata: Some(Box::new(MetaData {
330                        sender,
331                        recipient,
332                        meta,
333                        schedule: None,
334                    })),
335                    data,
336                    flags: MessageFlags::META_DIRTY | MessageFlags::DATA_DIRTY,
337                    num_attempts: 0,
338                    due: None,
339                }),
340                link: LinkedListAtomicLink::default(),
341            }),
342        })
343    }
344
345    pub fn weak(&self) -> WeakMessage {
346        WeakMessage {
347            weak: Arc::downgrade(&self.msg_and_id),
348        }
349    }
350
351    /// Helper for creating a message based on spool enumeration.
352    /// Given a spool id and the serialized metadata blob, returns
353    /// a message holding the deserialized version of that metadata.
354    pub fn new_from_spool(id: SpoolId, metadata: Vec<u8>) -> anyhow::Result<Self> {
355        let metadata: MetaData = serde_json::from_slice(&metadata)?;
356        MESSAGE_COUNT.inc();
357        META_COUNT.inc();
358
359        let flags = if metadata.schedule.is_some() {
360            MessageFlags::SCHEDULED
361        } else {
362            MessageFlags::empty()
363        };
364
365        Ok(Self {
366            msg_and_id: Arc::new(MessageWithId {
367                id,
368                inner: Mutex::new(MessageInner {
369                    metadata: Some(Box::new(metadata)),
370                    data: NO_DATA.clone(),
371                    flags,
372                    num_attempts: 0,
373                    due: None,
374                }),
375                link: LinkedListAtomicLink::default(),
376            }),
377        })
378    }
379
380    pub(crate) fn new_from_parts(id: SpoolId, metadata: MetaData, data: Arc<Box<[u8]>>) -> Self {
381        MESSAGE_COUNT.inc();
382        META_COUNT.inc();
383
384        let flags = if metadata.schedule.is_some() {
385            MessageFlags::SCHEDULED
386        } else {
387            MessageFlags::empty()
388        };
389
390        Self {
391            msg_and_id: Arc::new(MessageWithId {
392                id,
393                inner: Mutex::new(MessageInner {
394                    metadata: Some(Box::new(metadata)),
395                    data,
396                    flags: flags | MessageFlags::META_DIRTY | MessageFlags::DATA_DIRTY,
397                    num_attempts: 0,
398                    due: None,
399                }),
400                link: LinkedListAtomicLink::default(),
401            }),
402        }
403    }
404
405    pub async fn new_with_id(id: SpoolId) -> anyhow::Result<Self> {
406        let meta_spool = get_meta_spool();
407        let data = meta_spool.load(id).await?;
408        Self::new_from_spool(id, data)
409    }
410
411    pub fn get_num_attempts(&self) -> u16 {
412        let inner = self.msg_and_id.inner.lock();
413        inner.num_attempts
414    }
415
416    pub fn set_num_attempts(&self, num_attempts: u16) {
417        let mut inner = self.msg_and_id.inner.lock();
418        inner.num_attempts = num_attempts;
419    }
420
421    pub fn increment_num_attempts(&self) {
422        let mut inner = self.msg_and_id.inner.lock();
423        inner.num_attempts += 1;
424    }
425
426    pub async fn set_scheduling(
427        &self,
428        scheduling: Option<Scheduling>,
429    ) -> anyhow::Result<Option<Scheduling>> {
430        self.load_meta_if_needed().await?;
431        let mut inner = self.msg_and_id.inner.lock();
432        match &mut inner.metadata {
433            None => anyhow::bail!("set_scheduling: metadata must be loaded first"),
434            Some(meta) => {
435                meta.schedule = scheduling;
436                inner
437                    .flags
438                    .set(MessageFlags::SCHEDULED, scheduling.is_some());
439                if let Some(sched) = scheduling {
440                    let due = inner.due.unwrap_or_else(Utc::now);
441                    inner.due = Some(sched.adjust_for_schedule(due));
442                }
443                Ok(scheduling)
444            }
445        }
446    }
447
448    pub async fn get_scheduling(&self) -> anyhow::Result<Option<Scheduling>> {
449        self.load_meta_if_needed().await?;
450        let inner = self.msg_and_id.inner.lock();
451        Ok(inner.metadata.as_ref().and_then(|meta| meta.schedule))
452    }
453
454    pub fn get_due(&self) -> Option<DateTime<Utc>> {
455        let inner = self.msg_and_id.inner.lock();
456        inner.due
457    }
458
459    pub async fn delay_with_jitter(&self, limit: i64) -> anyhow::Result<Option<DateTime<Utc>>> {
460        let scale = rand::random::<f32>();
461        let value = (scale * limit as f32) as i64;
462        self.delay_by(seconds(value)?).await
463    }
464
465    pub async fn delay_by(
466        &self,
467        duration: chrono::Duration,
468    ) -> anyhow::Result<Option<DateTime<Utc>>> {
469        let due = Utc::now() + duration;
470        self.set_due(Some(due)).await
471    }
472
473    /// Delay by requested duration, and add up to 1 minute of jitter
474    pub async fn delay_by_and_jitter(
475        &self,
476        duration: chrono::Duration,
477    ) -> anyhow::Result<Option<DateTime<Utc>>> {
478        let scale = rand::random::<f32>();
479        let value = (scale * 60.) as i64;
480        let due = Utc::now() + duration + seconds(value)?;
481        self.set_due(Some(due)).await
482    }
483
    /// Set the due time of the message and return the effective value.
    ///
    /// When the message has no scheduling constraints (`SCHEDULED` flag
    /// unset), `due` is stored verbatim, including `None`.
    /// Otherwise a `None` due is replaced by the current time and the
    /// result is passed through the schedule's `adjust_for_schedule`;
    /// if the metadata is not resident it is loaded from spool first.
    ///
    /// # Errors
    /// Fails if the metadata needed to be loaded and could not be.
    pub async fn set_due(
        &self,
        due: Option<DateTime<Utc>>,
    ) -> anyhow::Result<Option<DateTime<Utc>>> {
        // First phase: try to resolve everything while holding the lock.
        // The guard is scoped to this block so that it is released
        // before the await below.
        let due = {
            let mut inner = self.msg_and_id.inner.lock();

            if !inner.flags.contains(MessageFlags::SCHEDULED) {
                // This is the simple, fast-path, common case
                inner.due = due;
                return Ok(inner.due);
            }

            let due = due.unwrap_or_else(Utc::now);

            if let Some(meta) = &inner.metadata {
                inner.due = match &meta.schedule {
                    Some(sched) => Some(sched.adjust_for_schedule(due)),
                    None => Some(due),
                };
                return Ok(inner.due);
            }

            // We'll need to load the metadata to correctly
            // update the schedule for this message
            due
        };

        self.load_meta().await?;

        // Second phase: the metadata should now be resident, so apply
        // the same schedule adjustment as above.
        {
            let mut inner = self.msg_and_id.inner.lock();
            match &inner.metadata {
                Some(meta) => {
                    inner.due = match &meta.schedule {
                        Some(sched) => Some(sched.adjust_for_schedule(due)),
                        None => Some(due),
                    };
                    Ok(inner.due)
                }
                None => anyhow::bail!("loaded metadata, but metadata is not set!?"),
            }
        }
    }
528
529    fn get_data_if_dirty(&self) -> Option<Arc<Box<[u8]>>> {
530        let inner = self.msg_and_id.inner.lock();
531        if inner.flags.contains(MessageFlags::DATA_DIRTY) {
532            Some(Arc::clone(&inner.data))
533        } else {
534            None
535        }
536    }
537
538    fn get_meta_if_dirty(&self) -> Option<MetaData> {
539        let inner = self.msg_and_id.inner.lock();
540        if inner.flags.contains(MessageFlags::META_DIRTY) {
541            inner.metadata.as_ref().map(|md| (**md).clone())
542        } else {
543            None
544        }
545    }
546
547    pub(crate) async fn clone_meta_data(&self) -> anyhow::Result<MetaData> {
548        self.load_meta_if_needed().await?;
549        let inner = self.msg_and_id.inner.lock();
550        inner
551            .metadata
552            .as_ref()
553            .ok_or_else(|| anyhow::anyhow!("metadata not loaded even though we just loaded it"))
554            .map(|md| (**md).clone())
555    }
556
557    pub fn set_force_sync(&self, force: bool) {
558        let mut inner = self.msg_and_id.inner.lock();
559        inner.flags.set(MessageFlags::FORCE_SYNC, force);
560    }
561
562    pub fn needs_save(&self) -> bool {
563        let inner = self.msg_and_id.inner.lock();
564        inner.flags.contains(MessageFlags::META_DIRTY)
565            || inner.flags.contains(MessageFlags::DATA_DIRTY)
566    }
567
568    pub async fn save(&self, deadline: Option<Instant>) -> anyhow::Result<()> {
569        let _timer = SAVE_HIST.start_timer();
570        self.save_to(&**get_meta_spool(), &**get_data_spool(), deadline)
571            .await
572    }
573
574    pub async fn save_to(
575        &self,
576        meta_spool: &(dyn Spool + Send + Sync),
577        data_spool: &(dyn Spool + Send + Sync),
578        deadline: Option<Instant>,
579    ) -> anyhow::Result<()> {
580        let force_sync = self
581            .msg_and_id
582            .inner
583            .lock()
584            .flags
585            .contains(MessageFlags::FORCE_SYNC);
586
587        let data_fut = if let Some(data) = self.get_data_if_dirty() {
588            anyhow::ensure!(!data.is_empty(), "message data must not be empty");
589            data_spool
590                .store(self.msg_and_id.id, data, force_sync, deadline)
591                .map(|_| true)
592                .boxed()
593        } else {
594            futures::future::ready(false).boxed()
595        };
596        let meta_fut = if let Some(meta) = self.get_meta_if_dirty() {
597            let meta = Arc::new(serde_json::to_vec(&meta)?.into_boxed_slice());
598            meta_spool
599                .store(self.msg_and_id.id, meta, force_sync, deadline)
600                .map(|_| true)
601                .boxed()
602        } else {
603            futures::future::ready(false).boxed()
604        };
605
606        // NOTE: if we have a deadline, it is tempting to want to use
607        // timeout_at here to enforce it, but the underlying spool
608        // futures are not guaranteed to be fully cancel safe, which
609        // is why we pass the deadline down to the save method to allow
610        // them to handle timeouts internally.
611        let (data_res, meta_res) = tokio::join!(data_fut, meta_fut);
612
613        if data_res {
614            self.msg_and_id
615                .inner
616                .lock()
617                .flags
618                .remove(MessageFlags::DATA_DIRTY);
619        }
620        if meta_res {
621            self.msg_and_id
622                .inner
623                .lock()
624                .flags
625                .remove(MessageFlags::META_DIRTY);
626        }
627        Ok(())
628    }
629
    /// Returns the spool id of this message.
    pub fn id(&self) -> &SpoolId {
        &self.msg_and_id.id
    }
633
634    /// Save the data+meta if needed, then release both
635    pub async fn save_and_shrink(&self) -> anyhow::Result<bool> {
636        self.save(None).await?;
637        self.shrink()
638    }
639
640    /// Save the data+meta if needed, then release just the data
641    pub async fn save_and_shrink_data(&self) -> anyhow::Result<bool> {
642        self.save(None).await?;
643        self.shrink_data()
644    }
645
646    pub fn shrink_data(&self) -> anyhow::Result<bool> {
647        let mut inner = self.msg_and_id.inner.lock();
648        let mut did_shrink = false;
649        if inner.flags.contains(MessageFlags::DATA_DIRTY) {
650            anyhow::bail!("Cannot shrink message: DATA_DIRTY");
651        }
652        if !inner.data.is_empty() {
653            DATA_COUNT.dec();
654            did_shrink = true;
655        }
656        if !inner.data.is_empty() {
657            inner.data = NO_DATA.clone();
658            did_shrink = true;
659        }
660        Ok(did_shrink)
661    }
662
663    pub fn shrink(&self) -> anyhow::Result<bool> {
664        let mut inner = self.msg_and_id.inner.lock();
665        let mut did_shrink = false;
666        if inner.flags.contains(MessageFlags::DATA_DIRTY) {
667            anyhow::bail!("Cannot shrink message: DATA_DIRTY");
668        }
669        if inner.flags.contains(MessageFlags::META_DIRTY) {
670            anyhow::bail!("Cannot shrink message: META_DIRTY");
671        }
672        if inner.metadata.take().is_some() {
673            META_COUNT.dec();
674            did_shrink = true;
675        }
676        if !inner.data.is_empty() {
677            DATA_COUNT.dec();
678            did_shrink = true;
679        }
680        if !inner.data.is_empty() {
681            inner.data = NO_DATA.clone();
682            did_shrink = true;
683        }
684        Ok(did_shrink)
685    }
686
687    pub async fn sender(&self) -> anyhow::Result<EnvelopeAddress> {
688        self.load_meta_if_needed().await?;
689        let inner = self.msg_and_id.inner.lock();
690        match &inner.metadata {
691            Some(meta) => Ok(meta.sender.clone()),
692            None => anyhow::bail!("Message::sender metadata is not loaded"),
693        }
694    }
695
696    pub async fn set_sender(&self, sender: EnvelopeAddress) -> anyhow::Result<()> {
697        self.load_meta_if_needed().await?;
698        let mut inner = self.msg_and_id.inner.lock();
699        match &mut inner.metadata {
700            Some(meta) => {
701                meta.sender = sender;
702                inner.flags.set(MessageFlags::DATA_DIRTY, true);
703                Ok(())
704            }
705            None => anyhow::bail!("Message::set_sender: metadata is not loaded"),
706        }
707    }
708
709    #[deprecated = "use recipient_list or first_recipient instead"]
710    pub async fn recipient(&self) -> anyhow::Result<EnvelopeAddress> {
711        self.first_recipient().await
712    }
713
714    pub async fn first_recipient(&self) -> anyhow::Result<EnvelopeAddress> {
715        self.load_meta_if_needed().await?;
716        let inner = self.msg_and_id.inner.lock();
717        match &inner.metadata {
718            Some(meta) => match meta.recipient.first() {
719                Some(recip) => Ok(recip.clone()),
720                None => anyhow::bail!("recipient list is empty!?"),
721            },
722            None => anyhow::bail!("Message::first_recipient: metadata is not loaded"),
723        }
724    }
725
726    pub async fn recipient_list(&self) -> anyhow::Result<Vec<EnvelopeAddress>> {
727        self.load_meta_if_needed().await?;
728        let inner = self.msg_and_id.inner.lock();
729        match &inner.metadata {
730            Some(meta) => Ok(meta.recipient.clone()),
731            None => anyhow::bail!("Message::recipient_list: metadata is not loaded"),
732        }
733    }
734
735    pub async fn recipient_list_string(&self) -> anyhow::Result<Vec<String>> {
736        self.load_meta_if_needed().await?;
737        let inner = self.msg_and_id.inner.lock();
738        match &inner.metadata {
739            Some(meta) => Ok(meta.recipient.iter().map(|a| a.to_string()).collect()),
740            None => anyhow::bail!("Message::recipient_list_string: metadata is not loaded"),
741        }
742    }
743
744    #[deprecated = "use set_recipient_list instead"]
745    pub async fn set_recipient(&self, recipient: EnvelopeAddress) -> anyhow::Result<()> {
746        self.set_recipient_list(vec![recipient]).await
747    }
748
749    pub async fn set_recipient_list(&self, recipient: Vec<EnvelopeAddress>) -> anyhow::Result<()> {
750        self.load_meta_if_needed().await?;
751
752        let mut inner = self.msg_and_id.inner.lock();
753        match &mut inner.metadata {
754            Some(meta) => {
755                meta.recipient = recipient;
756                inner.flags.set(MessageFlags::DATA_DIRTY, true);
757                Ok(())
758            }
759            None => anyhow::bail!("Message::set_recipient_list: metadata is not loaded"),
760        }
761    }
762
763    pub fn is_meta_loaded(&self) -> bool {
764        self.msg_and_id.inner.lock().metadata.is_some()
765    }
766
767    pub fn is_data_loaded(&self) -> bool {
768        !self.msg_and_id.inner.lock().data.is_empty()
769    }
770
771    pub async fn load_meta_if_needed(&self) -> anyhow::Result<()> {
772        if self.is_meta_loaded() {
773            return Ok(());
774        }
775        self.load_meta().await
776    }
777
778    pub async fn data(&self) -> anyhow::Result<Arc<Box<[u8]>>> {
779        self.load_data_if_needed().await
780    }
781
782    async fn load_data_if_needed(&self) -> anyhow::Result<Arc<Box<[u8]>>> {
783        if self.is_data_loaded() {
784            return Ok(self.get_data_maybe_not_loaded());
785        }
786        self.load_data().await
787    }
788
789    pub async fn load_meta(&self) -> anyhow::Result<()> {
790        let _timer = LOAD_META_HIST.start_timer();
791        self.load_meta_from(&**get_meta_spool()).await
792    }
793
794    async fn load_meta_from(&self, meta_spool: &(dyn Spool + Send + Sync)) -> anyhow::Result<()> {
795        let id = self.id();
796        let data = meta_spool.load(*id).await?;
797        let mut inner = self.msg_and_id.inner.lock();
798        let was_not_loaded = inner.metadata.is_none();
799        let metadata: MetaData = serde_json::from_slice(&data)?;
800        inner.metadata.replace(Box::new(metadata));
801        if was_not_loaded {
802            META_COUNT.inc();
803        }
804        Ok(())
805    }
806
807    pub async fn load_data(&self) -> anyhow::Result<Arc<Box<[u8]>>> {
808        let _timer = LOAD_DATA_HIST.start_timer();
809        self.load_data_from(&**get_data_spool()).await
810    }
811
812    async fn load_data_from(
813        &self,
814        data_spool: &(dyn Spool + Send + Sync),
815    ) -> anyhow::Result<Arc<Box<[u8]>>> {
816        let data = data_spool.load(*self.id()).await?;
817        let mut inner = self.msg_and_id.inner.lock();
818        let was_empty = inner.data.is_empty();
819        inner.data = Arc::new(data.into_boxed_slice());
820        if was_empty {
821            DATA_COUNT.inc();
822        }
823        Ok(inner.data.clone())
824    }
825
826    pub fn assign_data(&self, data: Vec<u8>) {
827        let mut inner = self.msg_and_id.inner.lock();
828        let was_empty = inner.data.is_empty();
829        inner.data = Arc::new(data.into_boxed_slice());
830        inner.flags.set(MessageFlags::DATA_DIRTY, true);
831        if was_empty {
832            DATA_COUNT.inc();
833        }
834    }
835
836    pub fn get_data_maybe_not_loaded(&self) -> Arc<Box<[u8]>> {
837        let inner = self.msg_and_id.inner.lock();
838        inner.data.clone()
839    }
840
841    pub async fn set_meta<S: AsRef<str>, V: Into<serde_json::Value>>(
842        &self,
843        key: S,
844        value: V,
845    ) -> anyhow::Result<()> {
846        self.load_meta_if_needed().await?;
847        let mut inner = self.msg_and_id.inner.lock();
848        match &mut inner.metadata {
849            None => anyhow::bail!("set_meta: metadata must be loaded first"),
850            Some(meta) => {
851                let key = key.as_ref();
852                let value = value.into();
853
854                match &mut meta.meta {
855                    serde_json::Value::Object(map) => {
856                        map.insert(key.to_string(), value);
857                    }
858                    _ => anyhow::bail!("metadata is somehow not a json object"),
859                }
860
861                inner.flags.set(MessageFlags::META_DIRTY, true);
862                Ok(())
863            }
864        }
865    }
866
867    pub async fn unset_meta<S: AsRef<str>>(&self, key: S) -> anyhow::Result<()> {
868        self.load_meta_if_needed().await?;
869        let mut inner = self.msg_and_id.inner.lock();
870        match &mut inner.metadata {
871            None => anyhow::bail!("set_meta: metadata must be loaded first"),
872            Some(meta) => {
873                let key = key.as_ref();
874
875                match &mut meta.meta {
876                    serde_json::Value::Object(map) => {
877                        map.remove(key);
878                    }
879                    _ => anyhow::bail!("metadata is somehow not a json object"),
880                }
881
882                inner.flags.set(MessageFlags::META_DIRTY, true);
883                Ok(())
884            }
885        }
886    }
887
888    /// Retrieve `key` as a String.
889    pub async fn get_meta_string<S: serde_json::value::Index + std::fmt::Display + Copy>(
890        &self,
891        key: S,
892    ) -> anyhow::Result<Option<String>> {
893        match self.get_meta(key).await {
894            Ok(serde_json::Value::String(value)) => Ok(Some(value.to_string())),
895            Ok(serde_json::Value::Null) => Ok(None),
896            hmm => {
897                anyhow::bail!("expected '{key}' to be a string value, got {hmm:?}");
898            }
899        }
900    }
901
902    pub async fn get_meta_obj(&self) -> anyhow::Result<serde_json::Value> {
903        self.load_meta_if_needed().await?;
904        let inner = self.msg_and_id.inner.lock();
905        match &inner.metadata {
906            None => anyhow::bail!("get_meta_obj: metadata must be loaded first"),
907            Some(meta) => Ok(meta.meta.clone()),
908        }
909    }
910
911    pub async fn get_meta<S: serde_json::value::Index>(
912        &self,
913        key: S,
914    ) -> anyhow::Result<serde_json::Value> {
915        self.load_meta_if_needed().await?;
916        let inner = self.msg_and_id.inner.lock();
917        match &inner.metadata {
918            None => anyhow::bail!("get_meta: metadata must be loaded first"),
919            Some(meta) => match meta.meta.get(key) {
920                Some(value) => Ok(value.clone()),
921                None => Ok(serde_json::Value::Null),
922            },
923        }
924    }
925
926    pub fn age(&self, now: DateTime<Utc>) -> chrono::Duration {
927        self.msg_and_id.id.age(now)
928    }
929
930    pub async fn get_queue_name(&self) -> anyhow::Result<String> {
931        Ok(match self.get_meta_string("queue").await? {
932            Some(name) => name,
933            None => {
934                let name = QueueNameComponents::format(
935                    self.get_meta_string("campaign").await?,
936                    self.get_meta_string("tenant").await?,
937                    self.first_recipient()
938                        .await?
939                        .domain()
940                        .to_string()
941                        .to_lowercase(),
942                    self.get_meta_string("routing_domain").await?,
943                );
944                name.to_string()
945            }
946        })
947    }
948
949    #[cfg(feature = "impl")]
950    pub async fn arc_verify(
951        &self,
952        opt_resolver_name: Option<String>,
953    ) -> anyhow::Result<AuthenticationResult> {
954        let resolver = get_resolver_instance(&opt_resolver_name)?;
955        let data = self.data().await?;
956        let bytes = mailparsing::SharedString::try_from(data.as_ref().as_ref())?;
957
958        let parsed = mailparsing::Header::parse_headers(bytes.clone())?;
959        let message = kumo_dkim::ParsedEmail::HeaderOnlyParse { bytes, parsed };
960
961        let arc = ARC::verify(&message, &**resolver).await;
962        Ok(arc.authentication_result())
963    }
964
965    #[cfg(feature = "impl")]
966    pub async fn arc_seal(
967        &self,
968        signer: Signer,
969        auth_results: AuthenticationResults,
970        opt_resolver_name: Option<String>,
971    ) -> anyhow::Result<()> {
972        let resolver = get_resolver_instance(&opt_resolver_name)?;
973        let data = self.data().await?;
974        let bytes = mailparsing::SharedString::try_from(data.as_ref().as_ref())?;
975        let parsed = mailparsing::Header::parse_headers(bytes.clone())?;
976        let message = kumo_dkim::ParsedEmail::HeaderOnlyParse { bytes, parsed };
977        let arc = ARC::verify(&message, &**resolver).await;
978
979        let headers = arc.seal(&message, auth_results, signer.signer())?;
980        if !headers.is_empty() {
981            let mut new_data = Vec::<u8>::with_capacity(data.len() + 1024);
982
983            for hdr in headers {
984                hdr.write_header(&mut new_data).ok();
985            }
986            new_data.extend_from_slice(&data);
987            self.assign_data(new_data);
988        }
989
990        Ok(())
991    }
992
993    #[cfg(feature = "impl")]
994    pub async fn dkim_verify(
995        &self,
996        opt_resolver_name: Option<String>,
997    ) -> anyhow::Result<Vec<AuthenticationResult>> {
998        let resolver = get_resolver_instance(&opt_resolver_name)?;
999        let data = self.data().await?;
1000        let bytes = mailparsing::SharedString::try_from(data.as_ref().as_ref())?;
1001
1002        let parsed = mailparsing::Header::parse_headers(bytes.clone())?;
1003        if parsed
1004            .overall_conformance
1005            .contains(MessageConformance::NON_CANONICAL_LINE_ENDINGS)
1006        {
1007            return Ok(vec![AuthenticationResult {
1008                method: "dkim".to_string(),
1009                method_version: None,
1010                result: "permerror".to_string(),
1011                reason: Some("message has non-canonical line endings".to_string()),
1012                props: Default::default(),
1013            }]);
1014        }
1015        let message = kumo_dkim::ParsedEmail::HeaderOnlyParse { bytes, parsed };
1016
1017        let from = match message.get_headers().from() {
1018            Ok(Some(from)) if from.0.len() == 1 => from.0,
1019            Ok(Some(from)) => {
1020                return Ok(vec![AuthenticationResult {
1021                    method: "dkim".to_string(),
1022                    method_version: None,
1023                    result: "permerror".to_string(),
1024                    reason: Some(format!(
1025                        "From header must have a single sender, found {}",
1026                        from.len()
1027                    )),
1028                    props: Default::default(),
1029                }]);
1030            }
1031            Ok(None) => {
1032                return Ok(vec![AuthenticationResult {
1033                    method: "dkim".to_string(),
1034                    method_version: None,
1035                    result: "permerror".to_string(),
1036                    reason: Some("From header is missing".to_string()),
1037                    props: Default::default(),
1038                }]);
1039            }
1040            Err(err) => {
1041                return Ok(vec![AuthenticationResult {
1042                    method: "dkim".to_string(),
1043                    method_version: None,
1044                    result: "permerror".to_string(),
1045                    reason: Some(format!("From header is invalid: {err:#}")),
1046                    props: Default::default(),
1047                }]);
1048            }
1049        };
1050        let from_domain = &from[0].address.domain;
1051
1052        let results =
1053            kumo_dkim::verify_email_with_resolver(from_domain, &message, &**resolver).await?;
1054        Ok(results)
1055    }
1056
1057    /// Parses the content into an owned MimePart.
1058    /// Changes to that MimePart are NOT reflected in the underlying
1059    /// message; you must re-assign the message data if you wish to modify
1060    /// the message content.
1061    pub async fn parse(&self) -> anyhow::Result<MimePart<'static>> {
1062        let data = self.data().await?;
1063        let owned_data = String::from_utf8_lossy(data.as_ref().as_ref()).to_string();
1064        Ok(MimePart::parse(owned_data)?)
1065    }
1066
1067    pub async fn parse_rfc3464(&self) -> anyhow::Result<Option<Report>> {
1068        let data = self.data().await?;
1069        Report::parse(&data)
1070    }
1071
1072    pub async fn parse_rfc5965(&self) -> anyhow::Result<Option<ARFReport>> {
1073        let data = self.data().await?;
1074        ARFReport::parse(&data)
1075    }
1076
1077    pub async fn prepend_header(&self, name: Option<&str>, value: &str) -> anyhow::Result<()> {
1078        let data = self.data().await?;
1079        let mut new_data = Vec::with_capacity(size_header(name, value) + 2 + data.len());
1080        emit_header(&mut new_data, name, value);
1081        new_data.extend_from_slice(&data);
1082        self.assign_data(new_data);
1083        Ok(())
1084    }
1085
1086    pub async fn append_header(&self, name: Option<&str>, value: &str) -> anyhow::Result<()> {
1087        let data = self.data().await?;
1088        let mut new_data = Vec::with_capacity(size_header(name, value) + 2 + data.len());
1089        for (idx, window) in data.windows(4).enumerate() {
1090            if window == b"\r\n\r\n" {
1091                let headers = &data[0..idx + 2];
1092                let body = &data[idx + 2..];
1093
1094                new_data.extend_from_slice(headers);
1095                emit_header(&mut new_data, name, value);
1096                new_data.extend_from_slice(body);
1097                self.assign_data(new_data);
1098                return Ok(());
1099            }
1100        }
1101
1102        anyhow::bail!("append_header could not find the end of the header block");
1103    }
1104
1105    pub async fn get_address_header(
1106        &self,
1107        header_name: &str,
1108    ) -> anyhow::Result<Option<HeaderAddressList>> {
1109        let data = self.data().await?;
1110        let HeaderParseResult { headers, .. } =
1111            mailparsing::Header::parse_headers(data.as_ref().as_ref())?;
1112
1113        match headers.get_first(header_name) {
1114            Some(hdr) => {
1115                let list = hdr.as_address_list()?;
1116                let result: HeaderAddressList = list.into();
1117                Ok(Some(result))
1118            }
1119            None => Ok(None),
1120        }
1121    }
1122
1123    pub async fn get_first_named_header_value(&self, name: &str) -> anyhow::Result<Option<String>> {
1124        let data = self.data().await?;
1125        let HeaderParseResult { headers, .. } = Header::parse_headers(data.as_ref().as_ref())?;
1126
1127        match headers.get_first(name) {
1128            Some(hdr) => Ok(Some(hdr.as_unstructured()?)),
1129            None => Ok(None),
1130        }
1131    }
1132
1133    pub async fn get_all_named_header_values(&self, name: &str) -> anyhow::Result<Vec<String>> {
1134        let data = self.data().await?;
1135        let HeaderParseResult { headers, .. } = Header::parse_headers(data.as_ref().as_ref())?;
1136
1137        let mut values = vec![];
1138        for hdr in headers.iter_named(name) {
1139            values.push(hdr.as_unstructured()?);
1140        }
1141        Ok(values)
1142    }
1143
1144    pub async fn get_all_headers(&self) -> anyhow::Result<Vec<(String, String)>> {
1145        let data = self.data().await?;
1146        let HeaderParseResult { headers, .. } = Header::parse_headers(data.as_ref().as_ref())?;
1147
1148        let mut values = vec![];
1149        for hdr in headers.iter() {
1150            values.push((hdr.get_name().to_string(), hdr.as_unstructured()?));
1151        }
1152        Ok(values)
1153    }
1154
1155    pub async fn retain_headers<F: FnMut(&Header) -> bool>(
1156        &self,
1157        mut func: F,
1158    ) -> anyhow::Result<()> {
1159        let data = self.data().await?;
1160        let mut new_data = Vec::with_capacity(data.len());
1161        let HeaderParseResult {
1162            headers,
1163            body_offset,
1164            ..
1165        } = Header::parse_headers(data.as_ref().as_ref())?;
1166        for hdr in headers.iter() {
1167            let retain = (func)(hdr);
1168            if !retain {
1169                continue;
1170            }
1171            hdr.write_header(&mut new_data)?;
1172        }
1173        new_data.extend_from_slice(b"\r\n");
1174        new_data.extend_from_slice(&data[body_offset..]);
1175        self.assign_data(new_data);
1176        Ok(())
1177    }
1178
1179    pub async fn remove_first_named_header(&self, name: &str) -> anyhow::Result<()> {
1180        let mut removed = false;
1181        self.retain_headers(|hdr| {
1182            if hdr.get_name().eq_ignore_ascii_case(name) && !removed {
1183                removed = true;
1184                false
1185            } else {
1186                true
1187            }
1188        })
1189        .await
1190    }
1191
1192    pub async fn import_x_headers(&self, names: Vec<String>) -> anyhow::Result<()> {
1193        let data = self.data().await?;
1194        let HeaderParseResult { headers, .. } = Header::parse_headers(data.as_ref().as_ref())?;
1195
1196        for hdr in headers.iter() {
1197            let do_import = if names.is_empty() {
1198                is_x_header(hdr.get_name())
1199            } else {
1200                is_header_in_names_list(hdr.get_name(), &names)
1201            };
1202            if do_import {
1203                let name = imported_header_name(hdr.get_name());
1204                self.set_meta(name, hdr.as_unstructured()?).await?;
1205            }
1206        }
1207
1208        Ok(())
1209    }
1210
1211    pub async fn remove_x_headers(&self, names: Vec<String>) -> anyhow::Result<()> {
1212        self.retain_headers(|hdr| {
1213            if names.is_empty() {
1214                !is_x_header(hdr.get_name())
1215            } else {
1216                !is_header_in_names_list(hdr.get_name(), &names)
1217            }
1218        })
1219        .await
1220    }
1221
1222    pub async fn remove_all_named_headers(&self, name: &str) -> anyhow::Result<()> {
1223        self.retain_headers(|hdr| !hdr.get_name().eq_ignore_ascii_case(name))
1224            .await
1225    }
1226
1227    #[cfg(feature = "impl")]
1228    pub async fn dkim_sign(&self, signer: Signer) -> anyhow::Result<()> {
1229        let data = self.data().await?;
1230        let header = if let Some(runtime) = SIGN_POOL.get() {
1231            runtime.spawn_blocking(move || signer.sign(&data)).await??
1232        } else {
1233            signer.sign(&data)?
1234        };
1235        self.prepend_header(None, &header).await?;
1236        Ok(())
1237    }
1238
1239    pub async fn import_scheduling_header(
1240        &self,
1241        header_name: &str,
1242        remove: bool,
1243    ) -> anyhow::Result<Option<Scheduling>> {
1244        if let Some(value) = self.get_first_named_header_value(header_name).await? {
1245            let sched: Scheduling = serde_json::from_str(&value).with_context(|| {
1246                format!("{value} from header {header_name} is not a valid Scheduling header")
1247            })?;
1248            let result = self.set_scheduling(Some(sched)).await?;
1249
1250            if remove {
1251                self.remove_all_named_headers(header_name).await?;
1252            }
1253
1254            Ok(result)
1255        } else {
1256            Ok(None)
1257        }
1258    }
1259
1260    pub async fn append_text_plain(&self, content: &str) -> anyhow::Result<bool> {
1261        let data = self.data().await?;
1262        let mut msg = MimePart::parse(data.as_ref().as_ref())?;
1263        let parts = msg.simplified_structure_pointers()?;
1264        if let Some(p) = parts.text_part.and_then(|p| msg.resolve_ptr_mut(p)) {
1265            match p.body()? {
1266                DecodedBody::Text(text) => {
1267                    let mut text = text.as_str().to_string();
1268                    text.push_str("\r\n");
1269                    text.push_str(content);
1270                    p.replace_text_body("text/plain", &text)?;
1271
1272                    let new_data = msg.to_message_string();
1273                    self.assign_data(new_data.into_bytes());
1274                    Ok(true)
1275                }
1276                DecodedBody::Binary(_) => {
1277                    anyhow::bail!("expected text/plain part to be text, but it is binary");
1278                }
1279            }
1280        } else {
1281            Ok(false)
1282        }
1283    }
1284
1285    pub async fn append_text_html(&self, content: &str) -> anyhow::Result<bool> {
1286        let data = self.data().await?;
1287        let mut msg = MimePart::parse(data.as_ref().as_ref())?;
1288        let parts = msg.simplified_structure_pointers()?;
1289        if let Some(p) = parts.html_part.and_then(|p| msg.resolve_ptr_mut(p)) {
1290            match p.body()? {
1291                DecodedBody::Text(text) => {
1292                    let mut text = text.as_str().to_string();
1293
1294                    match text.rfind("</body>").or_else(|| text.rfind("</BODY>")) {
1295                        Some(idx) => {
1296                            text.insert_str(idx, content);
1297                            text.insert_str(idx, "\r\n");
1298                        }
1299                        None => {
1300                            // Just append
1301                            text.push_str("\r\n");
1302                            text.push_str(content);
1303                        }
1304                    }
1305
1306                    p.replace_text_body("text/html", &text)?;
1307
1308                    let new_data = msg.to_message_string();
1309                    self.assign_data(new_data.into_bytes());
1310                    Ok(true)
1311                }
1312                DecodedBody::Binary(_) => {
1313                    anyhow::bail!("expected text/html part to be text, but it is binary");
1314                }
1315            }
1316        } else {
1317            Ok(false)
1318        }
1319    }
1320
1321    pub async fn check_fix_conformance(
1322        &self,
1323        check: MessageConformance,
1324        fix: MessageConformance,
1325        settings: Option<&CheckFixSettings>,
1326    ) -> anyhow::Result<()> {
1327        let data = self.data().await?;
1328        let data_bytes = data.as_ref().as_ref();
1329        let msg = MimePart::parse(data_bytes)?;
1330
1331        let mut settings = settings.map(Clone::clone).unwrap_or_default();
1332
1333        if fix.contains(MessageConformance::MISSING_MESSAGE_ID_HEADER)
1334            && settings.message_id.is_none()
1335            && matches!(msg.headers().message_id(), Err(_) | Ok(None))
1336        {
1337            let sender = self.sender().await?;
1338            let domain = sender.domain();
1339            let id = *self.id();
1340            settings.message_id.replace(format!("{id}@{domain}"));
1341        }
1342
1343        if settings.detect_encoding {
1344            settings.data_bytes.replace(data.clone());
1345        }
1346
1347        let opt_msg = msg.check_fix_conformance(check, fix, settings)?;
1348
1349        if let Some(msg) = opt_msg {
1350            let new_data = msg.to_message_string();
1351            self.assign_data(new_data.into_bytes());
1352        }
1353
1354        Ok(())
1355    }
1356}
1357
/// Report whether `hdr_name` equals any entry of `names`, ignoring ASCII case.
fn is_header_in_names_list(hdr_name: &str, names: &[String]) -> bool {
    names
        .iter()
        .any(|candidate| hdr_name.eq_ignore_ascii_case(candidate))
}
1366
/// Derive the metadata key for an imported header: ASCII letters are
/// lower-cased and every `-` becomes `_`.
fn imported_header_name(name: &str) -> String {
    name.to_ascii_lowercase().replace('-', "_")
}
1375
/// Report whether `name` begins with `X-` or `x-`.
fn is_x_header(name: &str) -> bool {
    matches!(name.as_bytes(), [b'X' | b'x', b'-', ..])
}
1379
/// Number of bytes taken by the header text: `value`, plus `name` and two
/// extra bytes when a name is given. No line terminator is counted.
fn size_header(name: Option<&str>, value: &str) -> usize {
    let name_len = match name {
        Some(name) => name.len() + 2,
        None => 0,
    };
    name_len + value.len()
}
1383
/// Append a header line to `dest`.
///
/// With a name the output is `name: value`; without one, `value` alone is
/// written. A CRLF is added unless `value` already ends with one.
fn emit_header(dest: &mut Vec<u8>, name: Option<&str>, value: &str) {
    if let Some(field) = name {
        dest.extend_from_slice(field.as_bytes());
        dest.extend_from_slice(b": ");
    }
    dest.extend_from_slice(value.as_bytes());

    let terminator: &[u8] = if value.ends_with("\r\n") {
        b""
    } else {
        b"\r\n"
    };
    dest.extend_from_slice(terminator);
}
1394
#[cfg(feature = "impl")]
/// Lua bindings for `Message`.
///
/// Each registration below exposes one method to Lua under the given string
/// name. Most are thin wrappers that forward to the same-named Rust method
/// and convert any error with `any_err`.
impl UserData for Message {
    fn add_methods<M: UserDataMethods<Self>>(methods: &mut M) {
        // --- Metadata and raw data access ---

        // The Lua value is converted to a serde_json value before storing.
        methods.add_async_method(
            "set_meta",
            move |_, this, (name, value): (String, mlua::Value)| async move {
                let value = serde_json::value::to_value(value).map_err(any_err)?;
                this.set_meta(name, value).await.map_err(any_err)?;
                Ok(())
            },
        );
        methods.add_async_method("get_meta", move |lua, this, name: String| async move {
            let value = this.get_meta(name).await.map_err(any_err)?;
            Ok(Some(lua.to_value_with(&value, serialize_options())?))
        });
        methods.add_async_method("get_data", |lua, this, _: ()| async move {
            let data = this.data().await.map_err(any_err)?;
            lua.create_string(&*data)
        });
        methods.add_method("set_data", move |_lua, this, data: mlua::String| {
            this.assign_data(data.as_bytes().to_vec());
            Ok(())
        });

        // The data is converted to a String lossily before parsing; the
        // resulting part is wrapped in a `mod_mimepart::PartRef`.
        methods.add_async_method("parse_mime", |_lua, this, _: ()| async move {
            let data = this.data().await.map_err(any_err)?;
            let owned_data = String::from_utf8_lossy(data.as_ref().as_ref()).to_string();
            let part = MimePart::parse(owned_data).map_err(any_err)?;
            Ok(mod_mimepart::PartRef::new(part))
        });

        methods.add_async_method("append_text_plain", |_lua, this, data: String| async move {
            this.append_text_plain(&data).await.map_err(any_err)
        });

        methods.add_async_method("append_text_html", |_lua, this, data: String| async move {
            this.append_text_html(&data).await.map_err(any_err)
        });

        // --- Identity, envelope and delivery state ---

        methods.add_method("id", move |_, this, _: ()| Ok(this.id().to_string()));
        methods.add_async_method("sender", move |_, this, _: ()| async move {
            this.sender().await.map_err(any_err)
        });

        methods.add_method("num_attempts", move |_, this, _: ()| {
            Ok(this.get_num_attempts())
        });
        methods.add_method("increment_num_attempts", move |_, this, _: ()| {
            Ok(this.increment_num_attempts())
        });

        methods.add_async_method("queue_name", move |_, this, _: ()| async move {
            this.get_queue_name().await.map_err(any_err)
        });

        // Returns whatever `Message::set_due` returns, converted to Lua.
        methods.add_async_method("set_due", move |lua, this, due: mlua::Value| async move {
            let due: Option<DateTime<Utc>> = lua.from_value(due)?;
            let revised_due = this.set_due(due).await.map_err(any_err)?;
            lua.to_value(&revised_due)
        });

        // Accepts either a string, which is parsed as an envelope address,
        // or a value that deserializes into an `EnvelopeAddress`.
        methods.add_async_method(
            "set_sender",
            move |lua, this, value: mlua::Value| async move {
                let sender = match value {
                    mlua::Value::String(s) => {
                        let s = s.to_str()?;
                        EnvelopeAddress::parse(&s).map_err(any_err)?
                    }
                    _ => lua.from_value::<EnvelopeAddress>(value.clone())?,
                };
                this.set_sender(sender).await.map_err(any_err)
            },
        );

        // Shape of the result depends on the recipient count: nil for none,
        // a single address for exactly one, and a list otherwise.
        methods.add_async_method("recipient", move |lua, this, _: ()| async move {
            let mut recipients = this.recipient_list().await.map_err(any_err)?;
            match recipients.len() {
                0 => Ok(mlua::Value::Nil),
                1 => {
                    let recip: EnvelopeAddress = recipients.pop().expect("have 1");
                    recip.into_lua(&lua)
                }
                _ => recipients.into_lua(&lua),
            }
        });

        methods.add_async_method("recipient_list", move |lua, this, _: ()| async move {
            let recipients = this.recipient_list().await.map_err(any_err)?;
            recipients.into_lua(&lua)
        });

        // Accepts a string (parsed as one address), a list of addresses, or
        // a single address value; the list form is attempted first.
        methods.add_async_method(
            "set_recipient",
            move |lua, this, value: mlua::Value| async move {
                let recipients = match value {
                    mlua::Value::String(s) => {
                        let s = s.to_str()?;
                        vec![EnvelopeAddress::parse(&s).map_err(any_err)?]
                    }
                    _ => {
                        if let Ok(recips) = lua.from_value::<Vec<EnvelopeAddress>>(value.clone()) {
                            recips
                        } else {
                            vec![lua.from_value::<EnvelopeAddress>(value.clone())?]
                        }
                    }
                };
                this.set_recipient_list(recipients).await.map_err(any_err)
            },
        );

        #[cfg(feature = "impl")]
        methods.add_async_method("dkim_sign", |_, this, signer: Signer| async move {
            this.dkim_sign(signer).await.map_err(any_err)
        });

        // Both shrink variants save first when `needs_save` reports pending
        // changes, and only then shrink.
        methods.add_async_method("shrink", |_, this, _: ()| async move {
            if this.needs_save() {
                this.save(None).await.map_err(any_err)?;
            }
            this.shrink().map_err(any_err)
        });

        methods.add_async_method("shrink_data", |_, this, _: ()| async move {
            if this.needs_save() {
                this.save(None).await.map_err(any_err)?;
            }
            this.shrink_data().map_err(any_err)
        });

        // --- Authentication (Authentication-Results, ARC, DKIM) ---

        // Encodes the results and prepends them as an
        // `Authentication-Results` header.
        methods.add_async_method(
            "add_authentication_results",
            |lua, this, (serv_id, results): (String, mlua::Value)| async move {
                let results: Vec<AuthenticationResult> = lua.from_value(results)?;
                let results = AuthenticationResults {
                    serv_id,
                    version: None,
                    results,
                };

                this.prepend_header(Some("Authentication-Results"), &results.encode_value())
                    .await
                    .map_err(any_err)?;

                Ok(())
            },
        );

        #[cfg(feature = "impl")]
        methods.add_async_method(
            "arc_verify",
            |lua, this, opt_resolver_name: Option<String>| async move {
                let results = this.arc_verify(opt_resolver_name).await.map_err(any_err)?;
                lua.to_value_with(&results, serialize_options())
            },
        );

        #[cfg(feature = "impl")]
        methods.add_async_method(
            "arc_seal",
            |lua,
             this,
             (signer, serv_id, auth_res, opt_resolver_name): (
                Signer,
                String,
                mlua::Value,
                Option<String>,
            )| async move {
                let results: Vec<AuthenticationResult> = lua.from_value(auth_res)?;
                this.arc_seal(
                    signer,
                    AuthenticationResults {
                        serv_id,
                        version: None,
                        results,
                    },
                    opt_resolver_name,
                )
                .await
                .map_err(any_err)
            },
        );

        #[cfg(feature = "impl")]
        methods.add_async_method(
            "dkim_verify",
            |lua, this, opt_resolver_name: Option<String>| async move {
                let results = this.dkim_verify(opt_resolver_name).await.map_err(any_err)?;
                lua.to_value_with(&results, serialize_options())
            },
        );

        // --- Header manipulation ---

        // For prepend_header/append_header the optional third argument
        // (default false) selects whether the value is first passed through
        // `Header::new_unstructured` and its raw value used, or written as
        // given.
        methods.add_async_method(
            "prepend_header",
            |_, this, (name, value, encode): (String, String, Option<bool>)| async move {
                let encode = encode.unwrap_or(false);
                if encode {
                    let header = Header::new_unstructured(name.clone(), value);
                    this.prepend_header(Some(&name), header.get_raw_value())
                        .await
                        .map_err(any_err)?;
                } else {
                    this.prepend_header(Some(&name), &value)
                        .await
                        .map_err(any_err)?;
                }
                Ok(())
            },
        );
        methods.add_async_method(
            "append_header",
            |_, this, (name, value, encode): (String, String, Option<bool>)| async move {
                let encode = encode.unwrap_or(false);
                if encode {
                    let header = Header::new_unstructured(name.clone(), value);
                    this.append_header(Some(&name), header.get_raw_value())
                        .await
                        .map_err(any_err)?;
                } else {
                    this.append_header(Some(&name), &value)
                        .await
                        .map_err(any_err)?;
                }
                Ok(())
            },
        );
        methods.add_async_method("get_address_header", |_, this, name: String| async move {
            this.get_address_header(&name).await.map_err(any_err)
        });
        // Shorthands for `get_address_header` with a fixed header name.
        methods.add_async_method("from_header", |_, this, ()| async move {
            this.get_address_header("From").await.map_err(any_err)
        });
        methods.add_async_method("to_header", |_, this, ()| async move {
            this.get_address_header("To").await.map_err(any_err)
        });

        methods.add_async_method(
            "get_first_named_header_value",
            |_, this, name: String| async move {
                this.get_first_named_header_value(&name)
                    .await
                    .map_err(any_err)
            },
        );
        methods.add_async_method(
            "get_all_named_header_values",
            |_, this, name: String| async move {
                this.get_all_named_header_values(&name)
                    .await
                    .map_err(any_err)
            },
        );
        // Each header is returned as a two-element list: name, then value.
        methods.add_async_method("get_all_headers", |_, this, _: ()| async move {
            Ok(this
                .get_all_headers()
                .await
                .map_err(any_err)?
                .into_iter()
                .map(|(name, value)| vec![name, value])
                .collect::<Vec<Vec<String>>>())
        });
        // An omitted names list is treated as an empty list.
        methods.add_async_method(
            "import_x_headers",
            |_, this, names: Option<Vec<String>>| async move {
                this.import_x_headers(names.unwrap_or_default())
                    .await
                    .map_err(any_err)
            },
        );

        // An omitted names list is treated as an empty list.
        methods.add_async_method(
            "remove_x_headers",
            |_, this, names: Option<Vec<String>>| async move {
                this.remove_x_headers(names.unwrap_or_default())
                    .await
                    .map_err(any_err)
            },
        );
        methods.add_async_method(
            "remove_all_named_headers",
            |_, this, name: String| async move {
                this.remove_all_named_headers(&name).await.map_err(any_err)
            },
        );

        // --- Scheduling ---

        methods.add_async_method(
            "import_scheduling_header",
            |lua, this, (header_name, remove): (String, bool)| async move {
                let opt_schedule = this
                    .import_scheduling_header(&header_name, remove)
                    .await
                    .map_err(any_err)?;
                lua.to_value(&opt_schedule)
            },
        );

        methods.add_async_method(
            "set_scheduling",
            move |lua, this, params: mlua::Value| async move {
                let sched: Option<Scheduling> = from_lua_value(&lua, params)?;
                let opt_schedule = this.set_scheduling(sched).await.map_err(any_err)?;
                lua.to_value(&opt_schedule)
            },
        );

        // --- Report parsing: nil is returned when the parser yields None ---

        methods.add_async_method("parse_rfc3464", |lua, this, _: ()| async move {
            let report = this.parse_rfc3464().await.map_err(any_err)?;
            match report {
                Some(report) => lua.to_value_with(&report, serialize_options()),
                None => Ok(mlua::Value::Nil),
            }
        });

        methods.add_async_method("parse_rfc5965", |lua, this, _: ()| async move {
            let report = this.parse_rfc5965().await.map_err(any_err)?;
            match report {
                Some(report) => lua.to_value_with(&report, serialize_options()),
                None => Ok(mlua::Value::Nil),
            }
        });

        // --- Persistence ---

        methods.add_async_method("save", |_, this, ()| async move {
            this.save(None).await.map_err(any_err)
        });

        methods.add_method("set_force_sync", move |_, this, force: bool| {
            this.set_force_sync(force);
            Ok(())
        });

        // `check` and `fix` are parsed from strings via `FromStr`. A failure
        // of the conformance operation itself is NOT raised as a Lua error:
        // the method returns nil on success and the formatted error text
        // otherwise.
        methods.add_async_method(
            "check_fix_conformance",
            |lua, this, (check, fix, settings): (String, String, Option<mlua::Value>)| async move {
                use std::str::FromStr;
                let check = MessageConformance::from_str(&check).map_err(any_err)?;
                let fix = MessageConformance::from_str(&fix).map_err(any_err)?;

                let settings = match settings {
                    Some(v) => Some(lua.from_value(v).map_err(any_err)?),
                    None => None,
                };

                match this
                    .check_fix_conformance(check, fix, settings.as_ref())
                    .await
                {
                    Ok(_) => Ok(None),
                    Err(err) => Ok(Some(format!("{err:#}"))),
                }
            },
        );
    }
}
1749
1750impl TimerEntryWithDelay for WeakMessage {
1751    fn delay(&self) -> Duration {
1752        match self.upgrade() {
1753            None => {
1754                // Dangling/Cancelled. Make it appear due immediately
1755                Duration::from_millis(0)
1756            }
1757            Some(msg) => msg.delay(),
1758        }
1759    }
1760}
1761
1762impl TimerEntryWithDelay for Message {
1763    fn delay(&self) -> Duration {
1764        let inner = self.msg_and_id.inner.lock();
1765        match inner.due {
1766            Some(time) => {
1767                let now = Utc::now();
1768                let delta = time - now;
1769                delta.to_std().unwrap_or(Duration::from_millis(0))
1770            }
1771            None => Duration::from_millis(0),
1772        }
1773    }
1774}
1775
1776#[cfg(test)]
1777pub(crate) mod test {
1778    use super::*;
1779    use serde_json::json;
1780
1781    pub fn new_msg_body<S: AsRef<[u8]>>(s: S) -> Message {
1782        Message::new_dirty(
1783            SpoolId::new(),
1784            EnvelopeAddress::parse("sender@example.com").unwrap(),
1785            vec![EnvelopeAddress::parse("recip@example.com").unwrap()],
1786            serde_json::json!({}),
1787            Arc::new(s.as_ref().to_vec().into_boxed_slice()),
1788        )
1789        .unwrap()
1790    }
1791
1792    fn data_as_string(msg: &Message) -> String {
1793        String::from_utf8(msg.get_data_maybe_not_loaded().to_vec()).unwrap()
1794    }
1795
    /// Fixture message with two `X-` headers, a `Subject` header and a `From`
    /// header written with a space before its colon, followed by a short body.
    const X_HDR_CONTENT: &str =
        "X-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nFrom :Someone\r\n\r\nBody";
1798
1799    #[tokio::test]
1800    async fn import_all_x_headers() {
1801        let msg = new_msg_body(X_HDR_CONTENT);
1802
1803        msg.import_x_headers(vec![]).await.unwrap();
1804        k9::assert_equal!(
1805            msg.get_meta_obj().await.unwrap(),
1806            json!({
1807                "x_hello": "there",
1808                "x_header": "value",
1809            })
1810        );
1811    }
1812
1813    #[tokio::test]
1814    async fn meta_and_nil() {
1815        let msg = new_msg_body(X_HDR_CONTENT);
1816        // Ensure that json null round-trips
1817        msg.set_meta("test", serde_json::Value::Null).await.unwrap();
1818        k9::assert_equal!(msg.get_meta("test").await.unwrap(), serde_json::Value::Null);
1819
1820        // and that it is exposed to lua as nil
1821        let lua = mlua::Lua::new();
1822        lua.globals().set("msg", msg).unwrap();
1823        lua.load("assert(msg:get_meta('test') == nil)")
1824            .exec()
1825            .unwrap();
1826    }
1827
1828    #[tokio::test]
1829    async fn import_some_x_headers() {
1830        let msg = new_msg_body(X_HDR_CONTENT);
1831
1832        msg.import_x_headers(vec!["x-hello".to_string()])
1833            .await
1834            .unwrap();
1835        k9::assert_equal!(
1836            msg.get_meta_obj().await.unwrap(),
1837            json!({
1838                "x_hello": "there",
1839            })
1840        );
1841    }
1842
1843    #[tokio::test]
1844    async fn remove_all_x_headers() {
1845        let msg = new_msg_body(X_HDR_CONTENT);
1846
1847        msg.remove_x_headers(vec![]).await.unwrap();
1848        k9::assert_equal!(
1849            data_as_string(&msg),
1850            "Subject: Hello\r\nFrom :Someone\r\n\r\nBody"
1851        );
1852    }
1853
1854    #[tokio::test]
1855    async fn prepend_header_2_params() {
1856        let msg = new_msg_body(X_HDR_CONTENT);
1857
1858        msg.prepend_header(Some("Date"), "Today").await.unwrap();
1859        k9::assert_equal!(
1860            data_as_string(&msg),
1861            "Date: Today\r\nX-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nFrom :Someone\r\n\r\nBody"
1862        );
1863    }
1864
1865    #[tokio::test]
1866    async fn prepend_header_1_params() {
1867        let msg = new_msg_body(X_HDR_CONTENT);
1868
1869        msg.prepend_header(None, "Date: Today").await.unwrap();
1870        k9::assert_equal!(
1871            data_as_string(&msg),
1872            "Date: Today\r\nX-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nFrom :Someone\r\n\r\nBody"
1873        );
1874    }
1875
1876    #[tokio::test]
1877    async fn append_header_2_params() {
1878        let msg = new_msg_body(X_HDR_CONTENT);
1879
1880        msg.append_header(Some("Date"), "Today").await.unwrap();
1881        k9::assert_equal!(
1882            data_as_string(&msg),
1883            "X-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nFrom :Someone\r\nDate: Today\r\n\r\nBody"
1884        );
1885    }
1886
1887    #[tokio::test]
1888    async fn append_header_1_params() {
1889        let msg = new_msg_body(X_HDR_CONTENT);
1890
1891        msg.append_header(None, "Date: Today").await.unwrap();
1892        k9::assert_equal!(
1893            data_as_string(&msg),
1894            "X-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nFrom :Someone\r\nDate: Today\r\n\r\nBody"
1895        );
1896    }
1897
    /// Fixture message in which `X-Header` appears twice (with `Subject` in
    /// between) and whose `From` header has a space before its colon.
    const MULTI_HEADER_CONTENT: &str =
        "X-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nX-Header: another value\r\nFrom :Someone@somewhere\r\n\r\nBody";
1900
1901    #[tokio::test]
1902    async fn get_first_header() {
1903        let msg = new_msg_body(MULTI_HEADER_CONTENT);
1904        k9::assert_equal!(
1905            msg.get_first_named_header_value("X-header")
1906                .await
1907                .unwrap()
1908                .unwrap(),
1909            "value"
1910        );
1911    }
1912
1913    #[tokio::test]
1914    async fn get_all_header() {
1915        let msg = new_msg_body(MULTI_HEADER_CONTENT);
1916        k9::assert_equal!(
1917            msg.get_all_named_header_values("X-header").await.unwrap(),
1918            vec!["value".to_string(), "another value".to_string()]
1919        );
1920    }
1921
1922    #[tokio::test]
1923    async fn remove_first() {
1924        let msg = new_msg_body(MULTI_HEADER_CONTENT);
1925        msg.remove_first_named_header("X-header").await.unwrap();
1926        k9::assert_equal!(
1927            data_as_string(&msg),
1928            "X-Hello: there\r\nSubject: Hello\r\nX-Header: another value\r\nFrom :Someone@somewhere\r\n\r\nBody"
1929        );
1930    }
1931
1932    #[tokio::test]
1933    async fn remove_all() {
1934        let msg = new_msg_body(MULTI_HEADER_CONTENT);
1935        msg.remove_all_named_headers("X-header").await.unwrap();
1936        k9::assert_equal!(
1937            data_as_string(&msg),
1938            "X-Hello: there\r\nSubject: Hello\r\nFrom :Someone@somewhere\r\n\r\nBody"
1939        );
1940    }
1941
1942    #[tokio::test]
1943    async fn append_text_plain() {
1944        let msg = new_msg_body(MULTI_HEADER_CONTENT);
1945        msg.append_text_plain("I am at the bottom").await.unwrap();
1946        k9::assert_equal!(
1947            data_as_string(&msg),
1948            "X-Hello: there\r\n\
1949             X-Header: value\r\n\
1950             Subject: Hello\r\n\
1951             X-Header: another value\r\n\
1952             From :Someone@somewhere\r\n\
1953             Content-Type: text/plain;\r\n\
1954             \tcharset=\"us-ascii\"\r\n\
1955             \r\n\
1956             Body\r\n\
1957             I am at the bottom\r\n"
1958        );
1959    }
1960
    /// Three-part `multipart/mixed` fixture: a `text/plain` part, a
    /// `text/html` part and a base64 `application/octet-stream` attachment
    /// named `woot.bin`.
    const MIXED_CONTENT: &str = "Content-Type: multipart/mixed;\r\n\
\tboundary=\"my-boundary\"\r\n\
\r\n\
--my-boundary\r\n\
Content-Type: text/plain;\r\n\
\tcharset=\"us-ascii\"\r\n\
\r\n\
plain text\r\n\
--my-boundary\r\n\
Content-Type: text/html;\r\n\
\tcharset=\"us-ascii\"\r\n\
\r\n\
<b>rich</b> text\r\n\
--my-boundary\r\n\
Content-Type: application/octet-stream\r\n\
Content-Transfer-Encoding: base64\r\n\
Content-Disposition: attachment;\r\n\
\tfilename=\"woot.bin\"\r\n\
Content-ID: <woot.id@somewhere>\r\n\
\r\n\
AAECAw==\r\n\
--my-boundary--\r\n\
\r\n";
1984
    /// Three-part `multipart/mixed` fixture whose `text/html` part wraps its
    /// markup in `<BODY>`...`</BODY>` tags; the attachment's `Content-ID` is
    /// `<woot.id>`.
    const MIXED_CONTENT_ENCLOSING_BODY: &str = "Content-Type: multipart/mixed;\r\n\
\tboundary=\"my-boundary\"\r\n\
\r\n\
--my-boundary\r\n\
Content-Type: text/plain;\r\n\
\tcharset=\"us-ascii\"\r\n\
\r\n\
plain text\r\n\
--my-boundary\r\n\
Content-Type: text/html;\r\n\
\tcharset=\"us-ascii\"\r\n\
\r\n\
<BODY>\r\n\
<b>rich</b> text\r\n\
</BODY>\r\n\
--my-boundary\r\n\
Content-Type: application/octet-stream\r\n\
Content-Transfer-Encoding: base64\r\n\
Content-Disposition: attachment;\r\n\
\tfilename=\"woot.bin\"\r\n\
Content-ID: <woot.id>\r\n\
\r\n\
AAECAw==\r\n\
--my-boundary--\r\n\
\r\n";
2010
2011    #[tokio::test]
2012    async fn append_text_html() {
2013        let msg = new_msg_body(MIXED_CONTENT);
2014        msg.append_text_html("bottom html").await.unwrap();
2015        k9::snapshot!(
2016            data_as_string(&msg),
2017            r#"
2018Content-Type: multipart/mixed;\r
2019\tboundary="my-boundary"\r
2020\r
2021--my-boundary\r
2022Content-Type: text/plain;\r
2023\tcharset="us-ascii"\r
2024\r
2025plain text\r
2026--my-boundary\r
2027Content-Type: text/html;\r
2028\tcharset="us-ascii"\r
2029\r
2030<b>rich</b> text\r
2031\r
2032bottom html\r
2033--my-boundary\r
2034Content-Type: application/octet-stream\r
2035Content-Transfer-Encoding: base64\r
2036Content-Disposition: attachment;\r
2037\tfilename="woot.bin"\r
2038Content-ID: <woot.id@somewhere>\r
2039\r
2040AAECAw==\r
2041--my-boundary--\r
2042\r
2043
2044"#
2045        );
2046
2047        let msg = new_msg_body(MIXED_CONTENT_ENCLOSING_BODY);
2048        msg.append_text_html("bottom html 👻").await.unwrap();
2049        k9::snapshot!(
2050            data_as_string(&msg),
2051            r#"
2052Content-Type: multipart/mixed;\r
2053\tboundary="my-boundary"\r
2054\r
2055--my-boundary\r
2056Content-Type: text/plain;\r
2057\tcharset="us-ascii"\r
2058\r
2059plain text\r
2060--my-boundary\r
2061Content-Type: text/html;\r
2062\tcharset="utf-8"\r
2063Content-Transfer-Encoding: quoted-printable\r
2064\r
2065<BODY>\r
2066<b>rich</b> text\r
2067\r
2068bottom html =F0=9F=91=BB</BODY>\r
2069--my-boundary\r
2070Content-Type: application/octet-stream\r
2071Content-Transfer-Encoding: base64\r
2072Content-Disposition: attachment;\r
2073\tfilename="woot.bin"\r
2074Content-ID: <woot.id>\r
2075\r
2076AAECAw==\r
2077--my-boundary--\r
2078\r
2079
2080"#
2081        );
2082    }
2083
2084    #[tokio::test]
2085    async fn append_text_plain_mixed() {
2086        let msg = new_msg_body(MIXED_CONTENT);
2087        msg.append_text_plain("bottom text 👾").await.unwrap();
2088        k9::snapshot!(
2089            data_as_string(&msg),
2090            r#"
2091Content-Type: multipart/mixed;\r
2092\tboundary="my-boundary"\r
2093\r
2094--my-boundary\r
2095Content-Type: text/plain;\r
2096\tcharset="utf-8"\r
2097Content-Transfer-Encoding: quoted-printable\r
2098\r
2099plain text\r
2100\r
2101bottom text =F0=9F=91=BE\r
2102--my-boundary\r
2103Content-Type: text/html;\r
2104\tcharset="us-ascii"\r
2105\r
2106<b>rich</b> text\r
2107--my-boundary\r
2108Content-Type: application/octet-stream\r
2109Content-Transfer-Encoding: base64\r
2110Content-Disposition: attachment;\r
2111\tfilename="woot.bin"\r
2112Content-ID: <woot.id@somewhere>\r
2113\r
2114AAECAw==\r
2115--my-boundary--\r
2116\r
2117
2118"#
2119        );
2120    }
2121
    /// Exercises `check_fix_conformance` on messages whose `Message-ID`
    /// value is wrapped in doubled angle brackets (`<<...>>`).
    ///
    /// Three calls are made:
    /// 1. checking for `MISSING_MESSAGE_ID_HEADER` with no fixes enabled
    ///    must fail with that issue named in the error;
    /// 2. the same check with the matching fix enabled must succeed;
    /// 3. a second message that also has a very long body line must succeed
    ///    when the Message-ID and long-line fixes are both enabled.
    #[tokio::test]
    async fn check_conformance_angle_msg_id() {
        // The `\r` escape followed by the literal newline yields CRLF line
        // endings inside the fixture.
        const DOUBLE_ANGLE_ONLY: &str = "Subject: hello\r
Message-ID: <<1234@example.com>>\r
\r
Hello";
        let msg = new_msg_body(DOUBLE_ANGLE_ONLY);
        // Check only, no fixes: the doubled angle brackets are reported as
        // MISSING_MESSAGE_ID_HEADER.
        k9::snapshot!(
            msg.check_fix_conformance(
                MessageConformance::MISSING_MESSAGE_ID_HEADER,
                MessageConformance::empty(),
                None,
            )
            .await
            .unwrap_err()
            .to_string(),
            "Message has conformance issues: MISSING_MESSAGE_ID_HEADER"
        );

        // Same check on the same message, this time with the fix enabled:
        // the call must now succeed.
        msg.check_fix_conformance(
            MessageConformance::MISSING_MESSAGE_ID_HEADER,
            MessageConformance::MISSING_MESSAGE_ID_HEADER,
            None,
        )
        .await
        .unwrap();

        // Can't use a snapshot test here because the fixed header
        // has a unique random component
        /*
                k9::snapshot!(
                    data_as_string(&msg),
                    r#"
        Subject: hello\r
        Message-ID: <4106566d2ce911ef9dcd0242289ea0df@example.com>\r
        \r
        Hello
        "#
                );
        */

        // Same header problem, plus a body made of one very long line (the
        // trailing backslashes join the source lines into a single line).
        const DOUBLE_ANGLE_AND_LONG_LINE: &str = "Subject: hello\r
Message-ID: <<1234@example.com>>\r
\r
Hello this is a really long line Hello this is a really long line \
Hello this is a really long line Hello this is a really long line \
Hello this is a really long line Hello this is a really long line \
Hello this is a really long line Hello this is a really long line \
Hello this is a really long line Hello this is a really long line \
Hello this is a really long line Hello this is a really long line \
Hello this is a really long line Hello this is a really long line
";
        let msg = new_msg_body(DOUBLE_ANGLE_AND_LONG_LINE);
        // Checks for MISSING_COLON_VALUE while fixing both the Message-ID
        // and the over-long line; the call is expected to succeed.
        msg.check_fix_conformance(
            MessageConformance::MISSING_COLON_VALUE,
            MessageConformance::MISSING_MESSAGE_ID_HEADER | MessageConformance::LINE_TOO_LONG,
            None,
        )
        .await
        .unwrap();

        // Can't use a snapshot test here because the fixed header
        // has a random component
        /*
                k9::snapshot!(
                    data_as_string(&msg),
                    r#"
        Content-Type: text/plain;\r
        \tcharset="us-ascii"\r
        Content-Transfer-Encoding: quoted-printable\r
        Subject: hello\r
        Message-ID: <749fc87e2cea11ef96a50242289ea0df@example.com>\r
        \r
        Hello this is a really long line Hello this is a really long line Hello thi=\r
        s is a really long line Hello this is a really long line Hello this is a re=\r
        ally long line Hello this is a really long line Hello this is a really long=\r
         line Hello this is a really long line Hello this is a really long line Hel=\r
        lo this is a really long line Hello this is a really long line Hello this i=\r
        s a really long line Hello this is a really long line Hello this is a reall=\r
        y long line=0A\r

        "#
                );
        */
    }
2207
2208    #[tokio::test]
2209    async fn check_conformance() {
2210        let msg = new_msg_body(MULTI_HEADER_CONTENT);
2211        msg.check_fix_conformance(
2212            MessageConformance::default(),
2213            MessageConformance::MISSING_MIME_VERSION,
2214            None,
2215        )
2216        .await
2217        .unwrap();
2218        k9::snapshot!(
2219            data_as_string(&msg),
2220            r#"
2221X-Hello: there\r
2222X-Header: value\r
2223Subject: Hello\r
2224X-Header: another value\r
2225From :Someone@somewhere\r
2226Mime-Version: 1.0\r
2227\r
2228Body
2229"#
2230        );
2231
2232        let msg = new_msg_body(MULTI_HEADER_CONTENT);
2233        msg.check_fix_conformance(
2234            MessageConformance::default(),
2235            MessageConformance::MISSING_MIME_VERSION | MessageConformance::NAME_ENDS_WITH_SPACE,
2236            None,
2237        )
2238        .await
2239        .unwrap();
2240        k9::snapshot!(
2241            data_as_string(&msg),
2242            r#"
2243Content-Type: text/plain;\r
2244\tcharset="us-ascii"\r
2245X-Hello: there\r
2246X-Header: value\r
2247Subject: Hello\r
2248X-Header: another value\r
2249From: <Someone@somewhere>\r
2250Mime-Version: 1.0\r
2251\r
2252Body\r
2253
2254"#
2255        );
2256    }
2257
2258    #[tokio::test]
2259    async fn check_fix_latin_input() {
2260        const POUNDS: &[u8] = b"Subject: \xa3\r\n\r\nGBP\r\n";
2261        let msg = new_msg_body(&*POUNDS);
2262        msg.check_fix_conformance(
2263            MessageConformance::default(),
2264            MessageConformance::NEEDS_TRANSFER_ENCODING,
2265            Some(&CheckFixSettings {
2266                detect_encoding: true,
2267                include_encodings: vec!["iso-8859-1".to_string()],
2268                ..Default::default()
2269            }),
2270        )
2271        .await
2272        .unwrap();
2273
2274        let subject = msg
2275            .get_first_named_header_value("subject")
2276            .await
2277            .unwrap()
2278            .unwrap();
2279        assert_eq!(subject, "£");
2280    }
2281
2282    #[tokio::test]
2283    async fn set_scheduling() -> anyhow::Result<()> {
2284        let msg = new_msg_body(MULTI_HEADER_CONTENT);
2285        assert!(msg.get_due().is_none(), "due is implicitly now");
2286
2287        let now = Utc::now();
2288        let one_day = chrono::Duration::try_days(1).expect("1 day to be valid");
2289
2290        msg.set_scheduling(Some(Scheduling {
2291            restriction: None,
2292            first_attempt: Some((now + one_day).into()),
2293            expires: None,
2294        }))
2295        .await?;
2296
2297        let due = msg.get_due().expect("due to now be set");
2298        assert!(due - now >= one_day, "due time is at least 1 day away");
2299
2300        Ok(())
2301    }
2302
2303    #[cfg(all(test, target_pointer_width = "64"))]
2304    #[test]
2305    fn sizes() {
2306        assert_eq!(std::mem::size_of::<Message>(), 8);
2307        assert_eq!(std::mem::size_of::<MessageInner>(), 32);
2308        assert_eq!(std::mem::size_of::<MessageWithId>(), 72);
2309    }
2310}