message/
message.rs

1use crate::address::HeaderAddressList;
2#[cfg(feature = "impl")]
3use crate::dkim::Signer;
4#[cfg(feature = "impl")]
5use crate::dkim::SIGN_POOL;
6pub use crate::queue_name::QueueNameComponents;
7use crate::scheduling::Scheduling;
8use anyhow::Context;
9use bstr::{BString, ByteSlice, ByteVec};
10use chrono::{DateTime, Utc};
11#[cfg(feature = "impl")]
12use config::{any_err, from_lua_value, serialize_options};
13use futures::FutureExt;
14use intrusive_collections::{intrusive_adapter, LinkedList, LinkedListAtomicLink};
15use kumo_chrono_helper::*;
16#[cfg(feature = "impl")]
17use kumo_dkim::arc::ARC;
18use kumo_log_types::rfc3464::Report;
19use kumo_log_types::rfc5965::ARFReport;
20use kumo_prometheus::declare_metric;
21#[cfg(feature = "impl")]
22use mailparsing::{AuthenticationResult, AuthenticationResults, EncodeHeaderValue};
23use mailparsing::{
24    CheckFixSettings, DecodedBody, Header, HeaderParseResult, MessageConformance, MimePart,
25};
26#[cfg(feature = "impl")]
27use mlua::{IntoLua, LuaSerdeExt, UserData, UserDataMethods};
28#[cfg(feature = "impl")]
29use mod_dns_resolver::get_resolver_instance;
30use parking_lot::Mutex;
31use rfc5321::parser::EnvelopeAddress;
32use serde::{Deserialize, Serialize};
33use serde_with::formats::PreferOne;
34use serde_with::{serde_as, OneOrMany};
35use spool::{get_data_spool, get_meta_spool, Spool, SpoolId};
36use std::hash::Hash;
37use std::sync::{Arc, LazyLock, Weak};
38use std::time::{Duration, Instant};
39use timeq::TimerEntryWithDelay;
40
41bitflags::bitflags! {
42    #[derive(Clone, Copy, Debug, PartialEq, Eq)]
43    struct MessageFlags: u8 {
44        /// true if Metadata needs to be saved
45        const META_DIRTY = 1;
46        /// true if Data needs to be saved
47        const DATA_DIRTY = 2;
48        /// true if scheduling restrictions are present in the metadata
49        const SCHEDULED = 4;
50        /// true if high durability writes should always be used
51        const FORCE_SYNC = 8;
52    }
53}
54
// Incremented by the `Message` constructors and decremented when the
// underlying `MessageInner` is dropped.
declare_metric! {
/// Total number of Message objects.
///
/// This encompasses all Message objects in various states, whether
/// they are in a queue, moving between queues, being built as part
/// of an injection, or pending logging.  Their metadata and/or data
/// may be either resident or offloaded to spool.
static MESSAGE_COUNT: IntGauge("message_count");
}
64
// Incremented when metadata becomes resident (constructors and
// `load_meta_from`); decremented by `shrink` and when `MessageInner`
// is dropped with metadata still present.
declare_metric! {
/// Total number of Message objects with metadata loaded.
///
/// Tracks how many messages have their `meta` data resident
/// in memory.  This may be because they have not yet saved
/// it, or because the message is being processed and the
/// metadata is required for that processing.
static META_COUNT: IntGauge("message_meta_resident_count");
}
74
// Incremented when body data becomes resident (for example in
// `load_data_from` and `assign_data`); decremented by `shrink`,
// `shrink_data` and when `MessageInner` is dropped with non-empty data.
declare_metric! {
/// Total number of Message objects with body data loaded.
///
/// Tracks how many messages have their `data` resident
/// in memory.  This may be because they have not yet saved
/// it, or because the message is being processed and the
/// data is either required to be in memory in order to
/// deliver the message, or because logging or other
/// post-injection policy is configured to operate on
/// the message.
static DATA_COUNT: IntGauge("message_data_resident_count");
}
87
/// Shared, empty body value used as the placeholder for messages whose
/// data is not resident in memory.
static NO_DATA: LazyLock<Arc<Box<[u8]>>> = LazyLock::new(|| Arc::new(Box::default()));
89
// Observed by `Message::save`, which starts a timer on entry.
declare_metric! {
/// How many seconds it takes to save a message to spool.
///
/// This metric encompasses the elapsed time to save
/// either or both the `meta` and `data` portions of a
/// message to spool.
///
/// High values indicate IO pressure which may be
/// alleviated by tuning other constraints and/or
/// [RocksDB Parameters](../../kumo/define_spool/rocks_params.md)
static SAVE_HIST: Histogram("message_save_latency");
}
102
// Observed by `Message::load_data`, which starts a timer on entry.
declare_metric! {
/// How many seconds it takes to load message data from spool.
///
/// High values indicate IO pressure which may be caused
/// by policy that operates on the message body post-reception.
/// We recommend *avoiding* logging header values as that is
/// the most common cause of this metric spiking and has
/// the biggest impact in resolving it.
///
/// IO pressure may also be alleviated by tuning other constraints and/or
/// [RocksDB Parameters](../../kumo/define_spool/rocks_params.md)
static LOAD_DATA_HIST: Histogram("message_data_load_latency");
}
116
// Observed by `Message::load_meta`, which starts a timer on entry.
declare_metric! {
/// How long it takes to load message metadata from spool.
///
/// High values indicate IO pressure which may be
/// alleviated by tuning other constraints and/or
/// [RocksDB Parameters](../../kumo/define_spool/rocks_params.md)
static LOAD_META_HIST: Histogram("message_meta_load_latency");
}
125
/// The mutable state of a message; held behind the mutex in `MessageWithId`.
#[derive(Debug)]
struct MessageInner {
    /// Envelope and user metadata; `None` when it is not resident in memory.
    metadata: Option<Box<MetaData>>,
    /// Message body bytes; empty (the shared `NO_DATA` value) when not resident.
    data: Arc<Box<[u8]>>,
    /// Dirty, scheduling and durability state bits.
    flags: MessageFlags,
    /// Attempt counter maintained through the `*_num_attempts` accessors.
    num_attempts: u16,
    /// When the message is next due, if a due time has been assigned.
    due: Option<DateTime<Utc>>,
}
134
/// The shared allocation behind a `Message` handle: the immutable spool id,
/// the lock-protected mutable state, and the intrusive list link.
#[derive(Debug)]
pub(crate) struct MessageWithId {
    /// Spool identifier for this message; readable without taking the lock.
    id: SpoolId,
    /// Mutable message state.
    inner: Mutex<MessageInner>,
    /// Link used by `MessageWithIdAdapter` to place the message in a `MessageList`.
    link: LinkedListAtomicLink,
}
141
// Adapter that lets an intrusive `LinkedList` hold `Arc<MessageWithId>`
// values, threading the list through each message's `link` field.
intrusive_adapter!(
    pub(crate) MessageWithIdAdapter = Arc<MessageWithId>: MessageWithId { link: LinkedListAtomicLink }
);
145
/// A list of messages with an O(1) list overhead; no additional
/// memory per-message is required to track this list.
/// However, a given Message can only belong to one instance
/// of such a list at a time.
pub struct MessageList {
    /// The intrusive list holding the messages.
    list: LinkedList<MessageWithIdAdapter>,
    /// Number of messages currently in `list`; kept in step by the
    /// push/pop/take methods so that `len()` does not walk the list.
    len: usize,
}
154
155impl Default for MessageList {
156    fn default() -> Self {
157        Self::new()
158    }
159}
160
161impl MessageList {
162    /// Create a new MessageList
163    pub fn new() -> Self {
164        Self {
165            list: LinkedList::new(MessageWithIdAdapter::default()),
166            len: 0,
167        }
168    }
169
170    /// Returns the number of elements contained in the list
171    pub fn len(&self) -> usize {
172        self.len
173    }
174
175    pub fn is_empty(&self) -> bool {
176        self.len == 0
177    }
178
179    /// Take all of the elements from this list and return them
180    /// in a new separate instance of MessageList.
181    pub fn take(&mut self) -> Self {
182        let new_list = Self {
183            list: self.list.take(),
184            len: self.len,
185        };
186        self.len = 0;
187        new_list
188    }
189
190    /// Push a message to the back of the list
191    pub fn push_back(&mut self, message: Message) {
192        self.list.push_back(message.msg_and_id);
193        self.len += 1;
194    }
195
196    /// Pop a message from the front of the list
197    pub fn pop_front(&mut self) -> Option<Message> {
198        self.list.pop_front().map(|msg_and_id| {
199            self.len -= 1;
200            Message { msg_and_id }
201        })
202    }
203
204    /// Pop a message from the back of the list
205    pub fn pop_back(&mut self) -> Option<Message> {
206        self.list.pop_back().map(|msg_and_id| {
207            self.len -= 1;
208            Message { msg_and_id }
209        })
210    }
211
212    /// Pop all of the messages from this list into a vector
213    /// of messages.
214    /// Memory usage is O(number-of-messages).
215    pub fn drain(&mut self) -> Vec<Message> {
216        let mut messages = Vec::with_capacity(self.len);
217        while let Some(msg) = self.pop_front() {
218            messages.push(msg);
219        }
220        messages
221    }
222
223    pub fn extend_from_iter<I>(&mut self, iter: I)
224    where
225        I: Iterator<Item = Message>,
226    {
227        for msg in iter {
228            self.push_back(msg)
229        }
230    }
231}
232
233impl IntoIterator for MessageList {
234    type Item = Message;
235    type IntoIter = MessageListIter;
236    fn into_iter(self) -> MessageListIter {
237        MessageListIter { list: self.list }
238    }
239}
240
/// Owning iterator over the messages of a `MessageList`, produced by
/// its `IntoIterator` implementation.
pub struct MessageListIter {
    /// The remaining messages; each call to `next` pops from the front.
    list: LinkedList<MessageWithIdAdapter>,
}
244
245impl Iterator for MessageListIter {
246    type Item = Message;
247    fn next(&mut self) -> Option<Message> {
248        self.list
249            .pop_front()
250            .map(|msg_and_id| Message { msg_and_id })
251    }
252}
253
/// A handle to a message.  Cloning is a reference-count bump: all clones
/// share the same underlying `MessageWithId`.
#[derive(Clone, Debug)]
#[cfg_attr(feature = "impl", derive(mlua::FromLua))]
pub struct Message {
    /// The shared id + state allocation.
    pub(crate) msg_and_id: Arc<MessageWithId>,
}
259
260impl PartialEq for Message {
261    fn eq(&self, other: &Self) -> bool {
262        self.id() == other.id()
263    }
264}
265impl Eq for Message {}
266
267impl Hash for Message {
268    fn hash<H>(&self, hasher: &mut H)
269    where
270        H: std::hash::Hasher,
271    {
272        self.id().hash(hasher)
273    }
274}
275
/// A non-owning reference to a message; obtain one with `Message::weak`
/// and turn it back into a `Message` with `upgrade`.
#[derive(Clone, Debug)]
pub struct WeakMessage {
    /// Weak pointer to the shared message allocation.
    weak: Weak<MessageWithId>,
}
280
281impl WeakMessage {
282    pub fn upgrade(&self) -> Option<Message> {
283        Some(Message {
284            msg_and_id: self.weak.upgrade()?,
285        })
286    }
287}
288
/// The portion of a message that is serialized to the meta spool as JSON.
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone)]
pub(crate) struct MetaData {
    /// Envelope sender.
    pub sender: EnvelopeAddress,
    /// Envelope recipients.  (De)serialized via `OneOrMany<_, PreferOne>`,
    /// so either a single value or a list is accepted on input.
    #[serde_as(as = "OneOrMany<_, PreferOne>")]
    pub recipient: Vec<EnvelopeAddress>,
    /// Arbitrary metadata; the accessors on `Message` expect a JSON object.
    pub meta: serde_json::Value,
    /// Optional scheduling constraints; defaults to `None` when absent
    /// from the serialized form.
    #[serde(default)]
    pub schedule: Option<Scheduling>,
}
299
300impl Drop for MessageInner {
301    fn drop(&mut self) {
302        if self.metadata.is_some() {
303            META_COUNT.dec();
304        }
305        if !self.data.is_empty() {
306            DATA_COUNT.dec();
307        }
308        MESSAGE_COUNT.dec();
309    }
310}
311
312impl Message {
313    /// Create a new message with the supplied data.
314    /// The message meta and data are marked as dirty
315    pub fn new_dirty(
316        id: SpoolId,
317        sender: EnvelopeAddress,
318        recipient: Vec<EnvelopeAddress>,
319        meta: serde_json::Value,
320        data: Arc<Box<[u8]>>,
321    ) -> anyhow::Result<Self> {
322        anyhow::ensure!(meta.is_object(), "metadata must be a json object");
323        MESSAGE_COUNT.inc();
324        DATA_COUNT.inc();
325        META_COUNT.inc();
326        Ok(Self {
327            msg_and_id: Arc::new(MessageWithId {
328                id,
329                inner: Mutex::new(MessageInner {
330                    metadata: Some(Box::new(MetaData {
331                        sender,
332                        recipient,
333                        meta,
334                        schedule: None,
335                    })),
336                    data,
337                    flags: MessageFlags::META_DIRTY | MessageFlags::DATA_DIRTY,
338                    num_attempts: 0,
339                    due: None,
340                }),
341                link: LinkedListAtomicLink::default(),
342            }),
343        })
344    }
345
346    pub fn weak(&self) -> WeakMessage {
347        WeakMessage {
348            weak: Arc::downgrade(&self.msg_and_id),
349        }
350    }
351
352    /// Helper for creating a message based on spool enumeration.
353    /// Given a spool id and the serialized metadata blob, returns
354    /// a message holding the deserialized version of that metadata.
355    pub fn new_from_spool(id: SpoolId, metadata: Vec<u8>) -> anyhow::Result<Self> {
356        let metadata: MetaData = serde_json::from_slice(&metadata)?;
357        MESSAGE_COUNT.inc();
358        META_COUNT.inc();
359
360        let flags = if metadata.schedule.is_some() {
361            MessageFlags::SCHEDULED
362        } else {
363            MessageFlags::empty()
364        };
365
366        Ok(Self {
367            msg_and_id: Arc::new(MessageWithId {
368                id,
369                inner: Mutex::new(MessageInner {
370                    metadata: Some(Box::new(metadata)),
371                    data: NO_DATA.clone(),
372                    flags,
373                    num_attempts: 0,
374                    due: None,
375                }),
376                link: LinkedListAtomicLink::default(),
377            }),
378        })
379    }
380
381    pub(crate) fn new_from_parts(id: SpoolId, metadata: MetaData, data: Arc<Box<[u8]>>) -> Self {
382        MESSAGE_COUNT.inc();
383        META_COUNT.inc();
384
385        let flags = if metadata.schedule.is_some() {
386            MessageFlags::SCHEDULED
387        } else {
388            MessageFlags::empty()
389        };
390
391        Self {
392            msg_and_id: Arc::new(MessageWithId {
393                id,
394                inner: Mutex::new(MessageInner {
395                    metadata: Some(Box::new(metadata)),
396                    data,
397                    flags: flags | MessageFlags::META_DIRTY | MessageFlags::DATA_DIRTY,
398                    num_attempts: 0,
399                    due: None,
400                }),
401                link: LinkedListAtomicLink::default(),
402            }),
403        }
404    }
405
406    pub async fn new_with_id(id: SpoolId) -> anyhow::Result<Self> {
407        let meta_spool = get_meta_spool();
408        let data = meta_spool.load(id).await?;
409        Self::new_from_spool(id, data)
410    }
411
412    pub fn get_num_attempts(&self) -> u16 {
413        let inner = self.msg_and_id.inner.lock();
414        inner.num_attempts
415    }
416
417    pub fn set_num_attempts(&self, num_attempts: u16) {
418        let mut inner = self.msg_and_id.inner.lock();
419        inner.num_attempts = num_attempts;
420    }
421
422    pub fn increment_num_attempts(&self) {
423        let mut inner = self.msg_and_id.inner.lock();
424        inner.num_attempts += 1;
425    }
426
427    pub async fn set_scheduling(
428        &self,
429        scheduling: Option<Scheduling>,
430    ) -> anyhow::Result<Option<Scheduling>> {
431        self.load_meta_if_needed().await?;
432        let mut inner = self.msg_and_id.inner.lock();
433        match &mut inner.metadata {
434            None => anyhow::bail!("set_scheduling: metadata must be loaded first"),
435            Some(meta) => {
436                meta.schedule = scheduling;
437                inner
438                    .flags
439                    .set(MessageFlags::SCHEDULED, scheduling.is_some());
440                if let Some(sched) = scheduling {
441                    let due = inner.due.unwrap_or_else(Utc::now);
442                    inner.due = Some(sched.adjust_for_schedule(due));
443                }
444                Ok(scheduling)
445            }
446        }
447    }
448
449    pub async fn get_scheduling(&self) -> anyhow::Result<Option<Scheduling>> {
450        self.load_meta_if_needed().await?;
451        let inner = self.msg_and_id.inner.lock();
452        Ok(inner.metadata.as_ref().and_then(|meta| meta.schedule))
453    }
454
455    pub fn get_due(&self) -> Option<DateTime<Utc>> {
456        let inner = self.msg_and_id.inner.lock();
457        inner.due
458    }
459
460    pub async fn delay_with_jitter(&self, limit: i64) -> anyhow::Result<Option<DateTime<Utc>>> {
461        let scale = rand::random::<f32>();
462        let value = (scale * limit as f32) as i64;
463        self.delay_by(seconds(value)?).await
464    }
465
466    pub async fn delay_by(
467        &self,
468        duration: chrono::Duration,
469    ) -> anyhow::Result<Option<DateTime<Utc>>> {
470        let due = Utc::now() + duration;
471        self.set_due(Some(due)).await
472    }
473
474    /// Delay by requested duration, and add up to 1 minute of jitter
475    pub async fn delay_by_and_jitter(
476        &self,
477        duration: chrono::Duration,
478    ) -> anyhow::Result<Option<DateTime<Utc>>> {
479        let scale = rand::random::<f32>();
480        let value = (scale * 60.) as i64;
481        let due = Utc::now() + duration + seconds(value)?;
482        self.set_due(Some(due)).await
483    }
484
485    pub async fn set_due(
486        &self,
487        due: Option<DateTime<Utc>>,
488    ) -> anyhow::Result<Option<DateTime<Utc>>> {
489        let due = {
490            let mut inner = self.msg_and_id.inner.lock();
491
492            if !inner.flags.contains(MessageFlags::SCHEDULED) {
493                // This is the simple, fast-path, common case
494                inner.due = due;
495                return Ok(inner.due);
496            }
497
498            let due = due.unwrap_or_else(Utc::now);
499
500            if let Some(meta) = &inner.metadata {
501                inner.due = match &meta.schedule {
502                    Some(sched) => Some(sched.adjust_for_schedule(due)),
503                    None => Some(due),
504                };
505                return Ok(inner.due);
506            }
507
508            // We'll need to load the metadata to correctly
509            // update the schedule for this message
510            due
511        };
512
513        self.load_meta().await?;
514
515        {
516            let mut inner = self.msg_and_id.inner.lock();
517            match &inner.metadata {
518                Some(meta) => {
519                    inner.due = match &meta.schedule {
520                        Some(sched) => Some(sched.adjust_for_schedule(due)),
521                        None => Some(due),
522                    };
523                    Ok(inner.due)
524                }
525                None => anyhow::bail!("loaded metadata, but metadata is not set!?"),
526            }
527        }
528    }
529
530    fn get_data_if_dirty(&self) -> Option<Arc<Box<[u8]>>> {
531        let inner = self.msg_and_id.inner.lock();
532        if inner.flags.contains(MessageFlags::DATA_DIRTY) {
533            Some(Arc::clone(&inner.data))
534        } else {
535            None
536        }
537    }
538
539    fn get_meta_if_dirty(&self) -> Option<MetaData> {
540        let inner = self.msg_and_id.inner.lock();
541        if inner.flags.contains(MessageFlags::META_DIRTY) {
542            inner.metadata.as_ref().map(|md| (**md).clone())
543        } else {
544            None
545        }
546    }
547
548    pub(crate) async fn clone_meta_data(&self) -> anyhow::Result<MetaData> {
549        self.load_meta_if_needed().await?;
550        let inner = self.msg_and_id.inner.lock();
551        inner
552            .metadata
553            .as_ref()
554            .ok_or_else(|| anyhow::anyhow!("metadata not loaded even though we just loaded it"))
555            .map(|md| (**md).clone())
556    }
557
558    pub fn set_force_sync(&self, force: bool) {
559        let mut inner = self.msg_and_id.inner.lock();
560        inner.flags.set(MessageFlags::FORCE_SYNC, force);
561    }
562
563    pub fn needs_save(&self) -> bool {
564        let inner = self.msg_and_id.inner.lock();
565        inner.flags.contains(MessageFlags::META_DIRTY)
566            || inner.flags.contains(MessageFlags::DATA_DIRTY)
567    }
568
569    pub async fn save(&self, deadline: Option<Instant>) -> anyhow::Result<()> {
570        let _timer = SAVE_HIST.start_timer();
571        self.save_to(&**get_meta_spool(), &**get_data_spool(), deadline)
572            .await
573    }
574
575    pub async fn save_to(
576        &self,
577        meta_spool: &(dyn Spool + Send + Sync),
578        data_spool: &(dyn Spool + Send + Sync),
579        deadline: Option<Instant>,
580    ) -> anyhow::Result<()> {
581        let force_sync = self
582            .msg_and_id
583            .inner
584            .lock()
585            .flags
586            .contains(MessageFlags::FORCE_SYNC);
587
588        let data_fut = if let Some(data) = self.get_data_if_dirty() {
589            anyhow::ensure!(!data.is_empty(), "message data must not be empty");
590            data_spool
591                .store(self.msg_and_id.id, data, force_sync, deadline)
592                .map(|_| true)
593                .boxed()
594        } else {
595            futures::future::ready(false).boxed()
596        };
597        let meta_fut = if let Some(meta) = self.get_meta_if_dirty() {
598            let meta = Arc::new(serde_json::to_vec(&meta)?.into_boxed_slice());
599            meta_spool
600                .store(self.msg_and_id.id, meta, force_sync, deadline)
601                .map(|_| true)
602                .boxed()
603        } else {
604            futures::future::ready(false).boxed()
605        };
606
607        // NOTE: if we have a deadline, it is tempting to want to use
608        // timeout_at here to enforce it, but the underlying spool
609        // futures are not guaranteed to be fully cancel safe, which
610        // is why we pass the deadline down to the save method to allow
611        // them to handle timeouts internally.
612        let (data_res, meta_res) = tokio::join!(data_fut, meta_fut);
613
614        if data_res {
615            self.msg_and_id
616                .inner
617                .lock()
618                .flags
619                .remove(MessageFlags::DATA_DIRTY);
620        }
621        if meta_res {
622            self.msg_and_id
623                .inner
624                .lock()
625                .flags
626                .remove(MessageFlags::META_DIRTY);
627        }
628        Ok(())
629    }
630
631    pub fn id(&self) -> &SpoolId {
632        &self.msg_and_id.id
633    }
634
635    /// Save the data+meta if needed, then release both
636    pub async fn save_and_shrink(&self) -> anyhow::Result<bool> {
637        self.save(None).await?;
638        self.shrink()
639    }
640
641    /// Save the data+meta if needed, then release just the data
642    pub async fn save_and_shrink_data(&self) -> anyhow::Result<bool> {
643        self.save(None).await?;
644        self.shrink_data()
645    }
646
647    pub fn shrink_data(&self) -> anyhow::Result<bool> {
648        let mut inner = self.msg_and_id.inner.lock();
649        let mut did_shrink = false;
650        if inner.flags.contains(MessageFlags::DATA_DIRTY) {
651            anyhow::bail!("Cannot shrink message: DATA_DIRTY");
652        }
653        if !inner.data.is_empty() {
654            DATA_COUNT.dec();
655            did_shrink = true;
656        }
657        if !inner.data.is_empty() {
658            inner.data = NO_DATA.clone();
659            did_shrink = true;
660        }
661        Ok(did_shrink)
662    }
663
664    pub fn shrink(&self) -> anyhow::Result<bool> {
665        let mut inner = self.msg_and_id.inner.lock();
666        let mut did_shrink = false;
667        if inner.flags.contains(MessageFlags::DATA_DIRTY) {
668            anyhow::bail!("Cannot shrink message: DATA_DIRTY");
669        }
670        if inner.flags.contains(MessageFlags::META_DIRTY) {
671            anyhow::bail!("Cannot shrink message: META_DIRTY");
672        }
673        if inner.metadata.take().is_some() {
674            META_COUNT.dec();
675            did_shrink = true;
676        }
677        if !inner.data.is_empty() {
678            DATA_COUNT.dec();
679            did_shrink = true;
680        }
681        if !inner.data.is_empty() {
682            inner.data = NO_DATA.clone();
683            did_shrink = true;
684        }
685        Ok(did_shrink)
686    }
687
688    pub async fn sender(&self) -> anyhow::Result<EnvelopeAddress> {
689        self.load_meta_if_needed().await?;
690        let inner = self.msg_and_id.inner.lock();
691        match &inner.metadata {
692            Some(meta) => Ok(meta.sender.clone()),
693            None => anyhow::bail!("Message::sender metadata is not loaded"),
694        }
695    }
696
697    pub async fn set_sender(&self, sender: EnvelopeAddress) -> anyhow::Result<()> {
698        self.load_meta_if_needed().await?;
699        let mut inner = self.msg_and_id.inner.lock();
700        match &mut inner.metadata {
701            Some(meta) => {
702                meta.sender = sender;
703                inner.flags.set(MessageFlags::DATA_DIRTY, true);
704                Ok(())
705            }
706            None => anyhow::bail!("Message::set_sender: metadata is not loaded"),
707        }
708    }
709
710    #[deprecated = "use recipient_list or first_recipient instead"]
711    pub async fn recipient(&self) -> anyhow::Result<EnvelopeAddress> {
712        self.first_recipient().await
713    }
714
715    pub async fn first_recipient(&self) -> anyhow::Result<EnvelopeAddress> {
716        self.load_meta_if_needed().await?;
717        let inner = self.msg_and_id.inner.lock();
718        match &inner.metadata {
719            Some(meta) => match meta.recipient.first() {
720                Some(recip) => Ok(recip.clone()),
721                None => anyhow::bail!("recipient list is empty!?"),
722            },
723            None => anyhow::bail!("Message::first_recipient: metadata is not loaded"),
724        }
725    }
726
727    pub async fn recipient_list(&self) -> anyhow::Result<Vec<EnvelopeAddress>> {
728        self.load_meta_if_needed().await?;
729        let inner = self.msg_and_id.inner.lock();
730        match &inner.metadata {
731            Some(meta) => Ok(meta.recipient.clone()),
732            None => anyhow::bail!("Message::recipient_list: metadata is not loaded"),
733        }
734    }
735
736    pub async fn recipient_list_string(&self) -> anyhow::Result<Vec<String>> {
737        self.load_meta_if_needed().await?;
738        let inner = self.msg_and_id.inner.lock();
739        match &inner.metadata {
740            Some(meta) => Ok(meta.recipient.iter().map(|a| a.to_string()).collect()),
741            None => anyhow::bail!("Message::recipient_list_string: metadata is not loaded"),
742        }
743    }
744
745    #[deprecated = "use set_recipient_list instead"]
746    pub async fn set_recipient(&self, recipient: EnvelopeAddress) -> anyhow::Result<()> {
747        self.set_recipient_list(vec![recipient]).await
748    }
749
750    pub async fn set_recipient_list(&self, recipient: Vec<EnvelopeAddress>) -> anyhow::Result<()> {
751        self.load_meta_if_needed().await?;
752
753        let mut inner = self.msg_and_id.inner.lock();
754        match &mut inner.metadata {
755            Some(meta) => {
756                meta.recipient = recipient;
757                inner.flags.set(MessageFlags::DATA_DIRTY, true);
758                Ok(())
759            }
760            None => anyhow::bail!("Message::set_recipient_list: metadata is not loaded"),
761        }
762    }
763
764    pub fn is_meta_loaded(&self) -> bool {
765        self.msg_and_id.inner.lock().metadata.is_some()
766    }
767
768    pub fn is_data_loaded(&self) -> bool {
769        !self.msg_and_id.inner.lock().data.is_empty()
770    }
771
772    pub async fn load_meta_if_needed(&self) -> anyhow::Result<()> {
773        if self.is_meta_loaded() {
774            return Ok(());
775        }
776        self.load_meta().await
777    }
778
779    pub async fn data(&self) -> anyhow::Result<Arc<Box<[u8]>>> {
780        self.load_data_if_needed().await
781    }
782
783    async fn load_data_if_needed(&self) -> anyhow::Result<Arc<Box<[u8]>>> {
784        if self.is_data_loaded() {
785            return Ok(self.get_data_maybe_not_loaded());
786        }
787        self.load_data().await
788    }
789
790    pub async fn load_meta(&self) -> anyhow::Result<()> {
791        let _timer = LOAD_META_HIST.start_timer();
792        self.load_meta_from(&**get_meta_spool()).await
793    }
794
795    async fn load_meta_from(&self, meta_spool: &(dyn Spool + Send + Sync)) -> anyhow::Result<()> {
796        let id = self.id();
797        let data = meta_spool.load(*id).await?;
798        let mut inner = self.msg_and_id.inner.lock();
799        let was_not_loaded = inner.metadata.is_none();
800        let metadata: MetaData = serde_json::from_slice(&data)?;
801        inner.metadata.replace(Box::new(metadata));
802        if was_not_loaded {
803            META_COUNT.inc();
804        }
805        Ok(())
806    }
807
808    pub async fn load_data(&self) -> anyhow::Result<Arc<Box<[u8]>>> {
809        let _timer = LOAD_DATA_HIST.start_timer();
810        self.load_data_from(&**get_data_spool()).await
811    }
812
813    async fn load_data_from(
814        &self,
815        data_spool: &(dyn Spool + Send + Sync),
816    ) -> anyhow::Result<Arc<Box<[u8]>>> {
817        let data = data_spool.load(*self.id()).await?;
818        let mut inner = self.msg_and_id.inner.lock();
819        let was_empty = inner.data.is_empty();
820        inner.data = Arc::new(data.into_boxed_slice());
821        if was_empty {
822            DATA_COUNT.inc();
823        }
824        Ok(inner.data.clone())
825    }
826
827    pub fn assign_data(&self, data: Vec<u8>) {
828        let mut inner = self.msg_and_id.inner.lock();
829        let was_empty = inner.data.is_empty();
830        inner.data = Arc::new(data.into_boxed_slice());
831        inner.flags.set(MessageFlags::DATA_DIRTY, true);
832        if was_empty {
833            DATA_COUNT.inc();
834        }
835    }
836
837    pub fn get_data_maybe_not_loaded(&self) -> Arc<Box<[u8]>> {
838        let inner = self.msg_and_id.inner.lock();
839        inner.data.clone()
840    }
841
842    pub async fn set_meta<S: AsRef<str>, V: Into<serde_json::Value>>(
843        &self,
844        key: S,
845        value: V,
846    ) -> anyhow::Result<()> {
847        self.load_meta_if_needed().await?;
848        let mut inner = self.msg_and_id.inner.lock();
849        match &mut inner.metadata {
850            None => anyhow::bail!("set_meta: metadata must be loaded first"),
851            Some(meta) => {
852                let key = key.as_ref();
853                let value = value.into();
854
855                match &mut meta.meta {
856                    serde_json::Value::Object(map) => {
857                        map.insert(key.to_string(), value);
858                    }
859                    _ => anyhow::bail!("metadata is somehow not a json object"),
860                }
861
862                inner.flags.set(MessageFlags::META_DIRTY, true);
863                Ok(())
864            }
865        }
866    }
867
868    pub async fn unset_meta<S: AsRef<str>>(&self, key: S) -> anyhow::Result<()> {
869        self.load_meta_if_needed().await?;
870        let mut inner = self.msg_and_id.inner.lock();
871        match &mut inner.metadata {
872            None => anyhow::bail!("set_meta: metadata must be loaded first"),
873            Some(meta) => {
874                let key = key.as_ref();
875
876                match &mut meta.meta {
877                    serde_json::Value::Object(map) => {
878                        map.remove(key);
879                    }
880                    _ => anyhow::bail!("metadata is somehow not a json object"),
881                }
882
883                inner.flags.set(MessageFlags::META_DIRTY, true);
884                Ok(())
885            }
886        }
887    }
888
889    /// Retrieve `key` as a String.
890    pub async fn get_meta_string<S: serde_json::value::Index + std::fmt::Display + Copy>(
891        &self,
892        key: S,
893    ) -> anyhow::Result<Option<String>> {
894        match self.get_meta(key).await {
895            Ok(serde_json::Value::String(value)) => Ok(Some(value.to_string())),
896            Ok(serde_json::Value::Null) => Ok(None),
897            hmm => {
898                anyhow::bail!("expected '{key}' to be a string value, got {hmm:?}");
899            }
900        }
901    }
902
903    pub async fn get_meta_obj(&self) -> anyhow::Result<serde_json::Value> {
904        self.load_meta_if_needed().await?;
905        let inner = self.msg_and_id.inner.lock();
906        match &inner.metadata {
907            None => anyhow::bail!("get_meta_obj: metadata must be loaded first"),
908            Some(meta) => Ok(meta.meta.clone()),
909        }
910    }
911
912    pub async fn get_meta<S: serde_json::value::Index>(
913        &self,
914        key: S,
915    ) -> anyhow::Result<serde_json::Value> {
916        self.load_meta_if_needed().await?;
917        let inner = self.msg_and_id.inner.lock();
918        match &inner.metadata {
919            None => anyhow::bail!("get_meta: metadata must be loaded first"),
920            Some(meta) => match meta.meta.get(key) {
921                Some(value) => Ok(value.clone()),
922                None => Ok(serde_json::Value::Null),
923            },
924        }
925    }
926
927    pub fn age(&self, now: DateTime<Utc>) -> chrono::Duration {
928        self.msg_and_id.id.age(now)
929    }
930
931    pub async fn get_queue_name(&self) -> anyhow::Result<String> {
932        Ok(match self.get_meta_string("queue").await? {
933            Some(name) => name,
934            None => {
935                let name = QueueNameComponents::format(
936                    self.get_meta_string("campaign").await?,
937                    self.get_meta_string("tenant").await?,
938                    self.first_recipient()
939                        .await?
940                        .domain()
941                        .to_string()
942                        .to_lowercase(),
943                    self.get_meta_string("routing_domain").await?,
944                );
945                name.to_string()
946            }
947        })
948    }
949
950    #[cfg(feature = "impl")]
951    pub async fn arc_verify(
952        &self,
953        opt_resolver_name: Option<String>,
954    ) -> anyhow::Result<AuthenticationResult> {
955        let resolver = get_resolver_instance(&opt_resolver_name)?;
956        let data = self.data().await?;
957        let bytes = mailparsing::SharedString::try_from(data.as_ref().as_ref())?;
958
959        let parsed = mailparsing::Header::parse_headers(bytes.clone())?;
960        let message = kumo_dkim::ParsedEmail::HeaderOnlyParse { bytes, parsed };
961
962        let arc = ARC::verify(&message, &**resolver).await;
963        Ok(arc.authentication_result())
964    }
965
966    #[cfg(feature = "impl")]
967    pub async fn arc_seal(
968        &self,
969        signer: Signer,
970        auth_results: AuthenticationResults,
971        opt_resolver_name: Option<String>,
972    ) -> anyhow::Result<()> {
973        let resolver = get_resolver_instance(&opt_resolver_name)?;
974        let data = self.data().await?;
975        let bytes = mailparsing::SharedString::try_from(data.as_ref().as_ref())?;
976        let parsed = mailparsing::Header::parse_headers(bytes.clone())?;
977        let message = kumo_dkim::ParsedEmail::HeaderOnlyParse { bytes, parsed };
978        let arc = ARC::verify(&message, &**resolver).await;
979
980        let headers = arc.seal(&message, auth_results, signer.signer())?;
981        if !headers.is_empty() {
982            let mut new_data = Vec::<u8>::with_capacity(data.len() + 1024);
983
984            for hdr in headers {
985                hdr.write_header(&mut new_data).ok();
986            }
987            new_data.extend_from_slice(&data);
988            self.assign_data(new_data);
989        }
990
991        Ok(())
992    }
993
994    #[cfg(feature = "impl")]
995    pub async fn dkim_verify(
996        &self,
997        opt_resolver_name: Option<String>,
998    ) -> anyhow::Result<Vec<AuthenticationResult>> {
999        let resolver = get_resolver_instance(&opt_resolver_name)?;
1000        let data = self.data().await?;
1001        let bytes = mailparsing::SharedString::try_from(data.as_ref().as_ref())?;
1002
1003        let parsed = mailparsing::Header::parse_headers(bytes.clone())?;
1004        if parsed
1005            .overall_conformance
1006            .contains(MessageConformance::NON_CANONICAL_LINE_ENDINGS)
1007        {
1008            return Ok(vec![AuthenticationResult {
1009                method: "dkim".into(),
1010                method_version: None,
1011                result: "permerror".into(),
1012                reason: Some("message has non-canonical line endings".into()),
1013                props: Default::default(),
1014            }]);
1015        }
1016        let message = kumo_dkim::ParsedEmail::HeaderOnlyParse { bytes, parsed };
1017
1018        let results = kumo_dkim::verify_email_with_resolver(&message, &**resolver).await?;
1019        Ok(results)
1020    }
1021
1022    /// Parses the content into an owned MimePart.
1023    /// Changes to that MimePart are NOT reflected in the underlying
1024    /// message; you must re-assign the message data if you wish to modify
1025    /// the message content.
1026    pub async fn parse(&self) -> anyhow::Result<MimePart<'static>> {
1027        let data = self.data().await?;
1028        let owned_data = String::from_utf8_lossy(data.as_ref().as_ref()).to_string();
1029        Ok(MimePart::parse(owned_data)?)
1030    }
1031
1032    pub async fn parse_rfc3464(&self) -> anyhow::Result<Option<Report>> {
1033        let data = self.data().await?;
1034        Report::parse(&data)
1035    }
1036
1037    pub async fn parse_rfc5965(&self) -> anyhow::Result<Option<ARFReport>> {
1038        let data = self.data().await?;
1039        ARFReport::parse(&data)
1040    }
1041
1042    pub async fn prepend_header(&self, name: Option<&str>, value: &[u8]) -> anyhow::Result<()> {
1043        let data = self.data().await?;
1044        let mut new_data = Vec::with_capacity(size_header(name, value) + 2 + data.len());
1045        emit_header(&mut new_data, name, value);
1046        new_data.extend_from_slice(&data);
1047        self.assign_data(new_data);
1048        Ok(())
1049    }
1050
1051    pub async fn append_header(&self, name: Option<&str>, value: &[u8]) -> anyhow::Result<()> {
1052        let data = self.data().await?;
1053        let mut new_data = Vec::with_capacity(size_header(name, value.as_bytes()) + 2 + data.len());
1054        for (idx, window) in data.windows(4).enumerate() {
1055            if window == b"\r\n\r\n" {
1056                let headers = &data[0..idx + 2];
1057                let body = &data[idx + 2..];
1058
1059                new_data.extend_from_slice(headers);
1060                emit_header(&mut new_data, name, value);
1061                new_data.extend_from_slice(body);
1062                self.assign_data(new_data);
1063                return Ok(());
1064            }
1065        }
1066
1067        anyhow::bail!("append_header could not find the end of the header block");
1068    }
1069
1070    pub async fn get_address_header(
1071        &self,
1072        header_name: &str,
1073    ) -> anyhow::Result<Option<HeaderAddressList>> {
1074        let data = self.data().await?;
1075        let HeaderParseResult { headers, .. } =
1076            mailparsing::Header::parse_headers(data.as_ref().as_ref())?;
1077
1078        match headers.get_first(header_name) {
1079            Some(hdr) => {
1080                let list = hdr.as_address_list()?;
1081                let result: HeaderAddressList = list.into();
1082                Ok(Some(result))
1083            }
1084            None => Ok(None),
1085        }
1086    }
1087
1088    pub async fn get_first_named_header_value(
1089        &self,
1090        name: &str,
1091    ) -> anyhow::Result<Option<BString>> {
1092        let data = self.data().await?;
1093        let HeaderParseResult { headers, .. } = Header::parse_headers(data.as_ref().as_ref())?;
1094
1095        match headers.get_first(name) {
1096            Some(hdr) => Ok(Some(
1097                hdr.as_unstructured()
1098                    .unwrap_or_else(|_| hdr.get_raw_value().into()),
1099            )),
1100            None => Ok(None),
1101        }
1102    }
1103
1104    pub async fn get_all_named_header_values(&self, name: &str) -> anyhow::Result<Vec<BString>> {
1105        let data = self.data().await?;
1106        let HeaderParseResult { headers, .. } = Header::parse_headers(data.as_ref().as_ref())?;
1107
1108        let mut values = vec![];
1109        for hdr in headers.iter_named(name) {
1110            values.push(
1111                hdr.as_unstructured()
1112                    .unwrap_or_else(|_| hdr.get_raw_value().into()),
1113            );
1114        }
1115        Ok(values)
1116    }
1117
1118    pub async fn get_all_headers(&self) -> anyhow::Result<Vec<(BString, BString)>> {
1119        let data = self.data().await?;
1120        let HeaderParseResult { headers, .. } = Header::parse_headers(data.as_ref().as_ref())?;
1121
1122        let mut values = vec![];
1123        for hdr in headers.iter() {
1124            values.push((
1125                hdr.get_name().into(),
1126                hdr.as_unstructured()
1127                    .unwrap_or_else(|_| hdr.get_raw_value().into()),
1128            ));
1129        }
1130        Ok(values)
1131    }
1132
1133    pub async fn retain_headers<F: FnMut(&Header) -> bool>(
1134        &self,
1135        mut func: F,
1136    ) -> anyhow::Result<()> {
1137        let data = self.data().await?;
1138        let mut new_data = Vec::with_capacity(data.len());
1139        let HeaderParseResult {
1140            headers,
1141            body_offset,
1142            ..
1143        } = Header::parse_headers(data.as_ref().as_ref())?;
1144        for hdr in headers.iter() {
1145            let retain = (func)(hdr);
1146            if !retain {
1147                continue;
1148            }
1149            hdr.write_header(&mut new_data)?;
1150        }
1151        new_data.extend_from_slice(b"\r\n");
1152        new_data.extend_from_slice(&data[body_offset..]);
1153        self.assign_data(new_data);
1154        Ok(())
1155    }
1156
1157    pub async fn remove_first_named_header(&self, name: &str) -> anyhow::Result<()> {
1158        let mut removed = false;
1159        self.retain_headers(|hdr| {
1160            if hdr.get_name().eq_ignore_ascii_case(name.as_bytes()) && !removed {
1161                removed = true;
1162                false
1163            } else {
1164                true
1165            }
1166        })
1167        .await
1168    }
1169
1170    pub async fn import_x_headers(&self, names: Vec<String>) -> anyhow::Result<()> {
1171        let data = self.data().await?;
1172        let HeaderParseResult { headers, .. } = Header::parse_headers(data.as_ref().as_ref())?;
1173
1174        for hdr in headers.iter() {
1175            let do_import = if names.is_empty() {
1176                is_x_header(hdr.get_name())
1177            } else {
1178                is_header_in_names_list(hdr.get_name(), &names)
1179            };
1180            if do_import {
1181                let name = imported_header_name(&hdr.get_name_lossy());
1182                self.set_meta(name, hdr.as_unstructured()?.to_str_lossy())
1183                    .await?;
1184            }
1185        }
1186
1187        Ok(())
1188    }
1189
1190    pub async fn remove_x_headers(&self, names: Vec<String>) -> anyhow::Result<()> {
1191        self.retain_headers(|hdr| {
1192            if names.is_empty() {
1193                !is_x_header(hdr.get_name())
1194            } else {
1195                !is_header_in_names_list(hdr.get_name(), &names)
1196            }
1197        })
1198        .await
1199    }
1200
1201    pub async fn remove_all_named_headers(&self, name: &str) -> anyhow::Result<()> {
1202        self.retain_headers(|hdr| !hdr.get_name().eq_ignore_ascii_case(name.as_bytes()))
1203            .await
1204    }
1205
1206    #[cfg(feature = "impl")]
1207    pub async fn dkim_sign(&self, signer: Signer) -> anyhow::Result<()> {
1208        let data = self.data().await?;
1209        let header = if let Some(runtime) = SIGN_POOL.get() {
1210            runtime.spawn_blocking(move || signer.sign(&data)).await??
1211        } else {
1212            signer.sign(&data)?
1213        };
1214        self.prepend_header(None, header.as_bytes()).await?;
1215        Ok(())
1216    }
1217
1218    pub async fn import_scheduling_header(
1219        &self,
1220        header_name: &str,
1221        remove: bool,
1222    ) -> anyhow::Result<Option<Scheduling>> {
1223        if let Some(value) = self.get_first_named_header_value(header_name).await? {
1224            let sched: Scheduling = serde_json::from_slice(&value).with_context(|| {
1225                format!("{value} from header {header_name} is not a valid Scheduling header")
1226            })?;
1227            let result = self.set_scheduling(Some(sched)).await?;
1228
1229            if remove {
1230                self.remove_all_named_headers(header_name).await?;
1231            }
1232
1233            Ok(result)
1234        } else {
1235            Ok(None)
1236        }
1237    }
1238
1239    pub async fn append_text_plain(&self, content: &str) -> anyhow::Result<bool> {
1240        let data = self.data().await?;
1241        let mut msg = MimePart::parse(data.as_ref().as_ref())?;
1242        let parts = msg.simplified_structure_pointers()?;
1243        if let Some(p) = parts.text_part.and_then(|p| msg.resolve_ptr_mut(p)) {
1244            match p.body()? {
1245                DecodedBody::Text(text) => {
1246                    let mut text = text.as_bytes().to_vec();
1247                    text.push_str("\r\n");
1248                    text.push_str(content);
1249                    p.replace_text_body("text/plain", &*text)?;
1250
1251                    let new_data = msg.to_message_bytes();
1252                    self.assign_data(new_data);
1253                    Ok(true)
1254                }
1255                DecodedBody::Binary(_) => {
1256                    anyhow::bail!("expected text/plain part to be text, but it is binary");
1257                }
1258            }
1259        } else {
1260            Ok(false)
1261        }
1262    }
1263
1264    pub async fn append_text_html(&self, content: &str) -> anyhow::Result<bool> {
1265        let data = self.data().await?;
1266        let mut msg = MimePart::parse(data.as_ref().as_ref())?;
1267        let parts = msg.simplified_structure_pointers()?;
1268        if let Some(p) = parts.html_part.and_then(|p| msg.resolve_ptr_mut(p)) {
1269            match p.body()? {
1270                DecodedBody::Text(text) => {
1271                    let mut text = text.as_bytes().to_vec();
1272
1273                    match text.rfind("</body>").or_else(|| text.rfind("</BODY>")) {
1274                        Some(idx) => {
1275                            text.insert_str(idx, content);
1276                            text.insert_str(idx, "\r\n");
1277                        }
1278                        None => {
1279                            // Just append
1280                            text.push_str("\r\n");
1281                            text.push_str(content);
1282                        }
1283                    }
1284
1285                    p.replace_text_body("text/html", &*text)?;
1286
1287                    let new_data = msg.to_message_bytes();
1288                    self.assign_data(new_data);
1289                    Ok(true)
1290                }
1291                DecodedBody::Binary(_) => {
1292                    anyhow::bail!("expected text/html part to be text, but it is binary");
1293                }
1294            }
1295        } else {
1296            Ok(false)
1297        }
1298    }
1299
1300    pub async fn check_fix_conformance(
1301        &self,
1302        check: MessageConformance,
1303        fix: MessageConformance,
1304        settings: Option<&CheckFixSettings>,
1305    ) -> anyhow::Result<()> {
1306        let data = self.data().await?;
1307        let data_bytes = data.as_ref().as_ref();
1308        let msg = MimePart::parse(data_bytes)?;
1309
1310        let mut settings = settings.map(Clone::clone).unwrap_or_default();
1311
1312        if fix.contains(MessageConformance::MISSING_MESSAGE_ID_HEADER)
1313            && settings.message_id.is_none()
1314            && matches!(msg.headers().message_id(), Err(_) | Ok(None))
1315        {
1316            let sender = self.sender().await?;
1317            let domain = sender.domain();
1318            let id = *self.id();
1319            settings.message_id.replace(format!("{id}@{domain}"));
1320        }
1321
1322        if settings.detect_encoding {
1323            settings.data_bytes.replace(data.clone());
1324        }
1325
1326        let opt_msg = msg.check_fix_conformance(check, fix, settings)?;
1327
1328        if let Some(msg) = opt_msg {
1329            let new_data = msg.to_message_bytes();
1330            self.assign_data(new_data);
1331        }
1332
1333        Ok(())
1334    }
1335}
1336
/// Returns true if `hdr_name` equals any entry of `names`, comparing
/// ASCII case-insensitively.
fn is_header_in_names_list(hdr_name: &[u8], names: &[String]) -> bool {
    names
        .iter()
        .any(|candidate| hdr_name.eq_ignore_ascii_case(candidate.as_bytes()))
}
1345
/// Derives the metadata key for an imported header: ASCII letters are
/// lowercased and each `-` becomes `_`.
fn imported_header_name(name: &str) -> String {
    let mut key = String::with_capacity(name.len());
    for ch in name.chars() {
        key.push(if ch == '-' {
            '_'
        } else {
            ch.to_ascii_lowercase()
        });
    }
    key
}
1354
/// Returns true if `name` begins with `X-` or `x-`.
fn is_x_header(name: &[u8]) -> bool {
    matches!(name, [b'X' | b'x', b'-', ..])
}
1358
/// Computes the number of bytes `emit_header` writes for `name` and
/// `value`, not counting any trailing CRLF: the name plus the two byte
/// `": "` separator when a name is given, plus the value.
fn size_header(name: Option<&str>, value: &[u8]) -> usize {
    let prefix_len = match name {
        Some(name) => name.len() + 2,
        None => 0,
    };
    prefix_len + value.len()
}
1362
/// Appends a header to `dest`.
///
/// When `name` is given, it is written followed by `": "`. `value` is
/// then written verbatim, and a CRLF is added unless `value` already
/// ends with one.
fn emit_header(dest: &mut Vec<u8>, name: Option<&str>, value: &[u8]) {
    if let Some(name) = name {
        dest.extend_from_slice(name.as_bytes());
        dest.extend_from_slice(b": ");
    }
    dest.extend_from_slice(value);

    let already_terminated = value.ends_with(b"\r\n");
    if !already_terminated {
        dest.extend_from_slice(b"\r\n");
    }
}
1373
/// Exposes `Message` to Lua by registering its methods on the userdata.
///
/// Most registrations are thin wrappers that call the Rust method of the
/// same (or similar) name and convert errors with `any_err`.
#[cfg(feature = "impl")]
impl UserData for Message {
    fn add_methods<M: UserDataMethods<Self>>(methods: &mut M) {
        // Metadata accessors: the Lua value is converted to a
        // serde_json value on the way in, and back to a Lua value
        // (using `serialize_options()`) on the way out.
        methods.add_async_method(
            "set_meta",
            move |_, this, (name, value): (String, mlua::Value)| async move {
                let value = serde_json::value::to_value(value).map_err(any_err)?;
                this.set_meta(name, value).await.map_err(any_err)?;
                Ok(())
            },
        );
        methods.add_async_method("get_meta", move |lua, this, name: String| async move {
            let value = this.get_meta(name).await.map_err(any_err)?;
            Ok(Some(lua.to_value_with(&value, serialize_options())?))
        });
        // Raw message content as a Lua string
        methods.add_async_method("get_data", |lua, this, _: ()| async move {
            let data = this.data().await.map_err(any_err)?;
            lua.create_string(&*data)
        });
        methods.add_method("set_data", move |_lua, this, data: mlua::String| {
            this.assign_data(data.as_bytes().to_vec());
            Ok(())
        });

        // Parses a copy of the message data, so the returned part does
        // not borrow from the message.
        methods.add_async_method("parse_mime", |_lua, this, _: ()| async move {
            let data = this.data().await.map_err(any_err)?;
            let owned_data = BString::new(data.as_ref().to_vec());
            let part = MimePart::parse(owned_data).map_err(any_err)?;
            Ok(mod_mimepart::PartRef::new(part))
        });

        methods.add_async_method("append_text_plain", |_lua, this, data: String| async move {
            this.append_text_plain(&data).await.map_err(any_err)
        });

        methods.add_async_method("append_text_html", |_lua, this, data: String| async move {
            this.append_text_html(&data).await.map_err(any_err)
        });

        methods.add_method("id", move |_, this, _: ()| Ok(this.id().to_string()));
        methods.add_async_method("sender", move |_, this, _: ()| async move {
            this.sender().await.map_err(any_err)
        });

        methods.add_method("num_attempts", move |_, this, _: ()| {
            Ok(this.get_num_attempts())
        });
        methods.add_method("increment_num_attempts", move |_, this, _: ()| {
            Ok(this.increment_num_attempts())
        });

        methods.add_async_method("queue_name", move |_, this, _: ()| async move {
            this.get_queue_name().await.map_err(any_err)
        });

        // Returns whatever `Message::set_due` reports as the revised due
        // time, converted to a Lua value.
        methods.add_async_method("set_due", move |lua, this, due: mlua::Value| async move {
            let due: Option<DateTime<Utc>> = lua.from_value(due)?;
            let revised_due = this.set_due(due).await.map_err(any_err)?;
            lua.to_value(&revised_due)
        });

        // Accepts either a string, which is parsed as an envelope
        // address, or a value that deserializes to an EnvelopeAddress.
        methods.add_async_method(
            "set_sender",
            move |lua, this, value: mlua::Value| async move {
                let sender = match value {
                    mlua::Value::String(s) => {
                        let s = s.to_str()?;
                        EnvelopeAddress::parse(&s).map_err(any_err)?
                    }
                    _ => lua.from_value::<EnvelopeAddress>(value.clone())?,
                };
                this.set_sender(sender).await.map_err(any_err)
            },
        );

        // Returns nil when there are no recipients, the single address
        // when there is exactly one, and the whole list otherwise.
        methods.add_async_method("recipient", move |lua, this, _: ()| async move {
            let mut recipients = this.recipient_list().await.map_err(any_err)?;
            match recipients.len() {
                0 => Ok(mlua::Value::Nil),
                1 => {
                    let recip: EnvelopeAddress = recipients.pop().expect("have 1");
                    recip.into_lua(&lua)
                }
                _ => recipients.into_lua(&lua),
            }
        });

        methods.add_async_method("recipient_list", move |lua, this, _: ()| async move {
            let recipients = this.recipient_list().await.map_err(any_err)?;
            recipients.into_lua(&lua)
        });

        // Accepts a string (parsed as a single address), a value that
        // deserializes to a list of addresses, or failing that a value
        // that deserializes to a single address.
        methods.add_async_method(
            "set_recipient",
            move |lua, this, value: mlua::Value| async move {
                let recipients = match value {
                    mlua::Value::String(s) => {
                        let s = s.to_str()?;
                        vec![EnvelopeAddress::parse(&s).map_err(any_err)?]
                    }
                    _ => {
                        if let Ok(recips) = lua.from_value::<Vec<EnvelopeAddress>>(value.clone()) {
                            recips
                        } else {
                            vec![lua.from_value::<EnvelopeAddress>(value.clone())?]
                        }
                    }
                };
                this.set_recipient_list(recipients).await.map_err(any_err)
            },
        );

        // NOTE(review): this cfg (and the others like it below) repeats
        // the one already applied to the enclosing impl block.
        #[cfg(feature = "impl")]
        methods.add_async_method("dkim_sign", |_, this, signer: Signer| async move {
            this.dkim_sign(signer).await.map_err(any_err)
        });

        // Both shrink variants first save the message if it reports
        // that it needs saving.
        methods.add_async_method("shrink", |_, this, _: ()| async move {
            if this.needs_save() {
                this.save(None).await.map_err(any_err)?;
            }
            this.shrink().map_err(any_err)
        });

        methods.add_async_method("shrink_data", |_, this, _: ()| async move {
            if this.needs_save() {
                this.save(None).await.map_err(any_err)?;
            }
            this.shrink_data().map_err(any_err)
        });

        // Encodes the supplied results and prepends them as an
        // Authentication-Results header.
        methods.add_async_method(
            "add_authentication_results",
            |lua, this, (serv_id, results): (mlua::String, mlua::Value)| async move {
                let results: Vec<AuthenticationResult> = lua.from_value(results)?;
                let results = AuthenticationResults {
                    serv_id: serv_id.as_bytes().as_ref().into(),
                    version: None,
                    results,
                };

                this.prepend_header(
                    Some("Authentication-Results"),
                    results.encode_value().as_bytes(),
                )
                .await
                .map_err(any_err)?;

                Ok(())
            },
        );

        #[cfg(feature = "impl")]
        methods.add_async_method(
            "arc_verify",
            |lua, this, opt_resolver_name: Option<String>| async move {
                let results = this.arc_verify(opt_resolver_name).await.map_err(any_err)?;
                lua.to_value_with(&results, serialize_options())
            },
        );

        #[cfg(feature = "impl")]
        methods.add_async_method(
            "arc_seal",
            |lua,
             this,
             (signer, serv_id, auth_res, opt_resolver_name): (
                Signer,
                mlua::String,
                mlua::Value,
                Option<String>,
            )| async move {
                let results: Vec<AuthenticationResult> = lua.from_value(auth_res)?;
                this.arc_seal(
                    signer,
                    AuthenticationResults {
                        serv_id: serv_id.as_bytes().as_ref().into(),
                        version: None,
                        results,
                    },
                    opt_resolver_name,
                )
                .await
                .map_err(any_err)
            },
        );

        #[cfg(feature = "impl")]
        methods.add_async_method(
            "dkim_verify",
            |lua, this, opt_resolver_name: Option<String>| async move {
                let results = this.dkim_verify(opt_resolver_name).await.map_err(any_err)?;
                lua.to_value_with(&results, serialize_options())
            },
        );

        // For prepend_header and append_header, the optional third
        // argument (default false) requests that the value be passed
        // through `Header::new_unstructured` and its raw value used,
        // rather than the bytes of `value` as given.
        methods.add_async_method(
            "prepend_header",
            |_, this, (name, value, encode): (String, String, Option<bool>)| async move {
                let encode = encode.unwrap_or(false);
                if encode {
                    let header = Header::new_unstructured(name.clone(), value);
                    this.prepend_header(Some(&name), header.get_raw_value())
                        .await
                        .map_err(any_err)?;
                } else {
                    this.prepend_header(Some(&name), value.as_bytes())
                        .await
                        .map_err(any_err)?;
                }
                Ok(())
            },
        );
        methods.add_async_method(
            "append_header",
            |_, this, (name, value, encode): (String, String, Option<bool>)| async move {
                let encode = encode.unwrap_or(false);
                if encode {
                    let header = Header::new_unstructured(name.clone(), value);
                    this.append_header(Some(&name), header.get_raw_value())
                        .await
                        .map_err(any_err)?;
                } else {
                    this.append_header(Some(&name), value.as_bytes())
                        .await
                        .map_err(any_err)?;
                }
                Ok(())
            },
        );
        methods.add_async_method("get_address_header", |_, this, name: String| async move {
            this.get_address_header(&name).await.map_err(any_err)
        });
        // Convenience wrappers around get_address_header for the From
        // and To headers.
        methods.add_async_method("from_header", |_, this, ()| async move {
            this.get_address_header("From").await.map_err(any_err)
        });
        methods.add_async_method("to_header", |_, this, ()| async move {
            this.get_address_header("To").await.map_err(any_err)
        });

        methods.add_async_method(
            "get_first_named_header_value",
            |_, this, name: String| async move {
                this.get_first_named_header_value(&name)
                    .await
                    .map_err(any_err)
            },
        );
        methods.add_async_method(
            "get_all_named_header_values",
            |_, this, name: String| async move {
                this.get_all_named_header_values(&name)
                    .await
                    .map_err(any_err)
            },
        );
        // Each (name, value) pair is returned as a two element list
        methods.add_async_method("get_all_headers", |_, this, _: ()| async move {
            Ok(this
                .get_all_headers()
                .await
                .map_err(any_err)?
                .into_iter()
                .map(|(name, value)| vec![name, value])
                .collect::<Vec<Vec<BString>>>())
        });
        // For import_x_headers and remove_x_headers, omitting the list
        // of names is the same as passing an empty list.
        methods.add_async_method(
            "import_x_headers",
            |_, this, names: Option<Vec<String>>| async move {
                this.import_x_headers(names.unwrap_or_default())
                    .await
                    .map_err(any_err)
            },
        );

        methods.add_async_method(
            "remove_x_headers",
            |_, this, names: Option<Vec<String>>| async move {
                this.remove_x_headers(names.unwrap_or_default())
                    .await
                    .map_err(any_err)
            },
        );
        methods.add_async_method(
            "remove_all_named_headers",
            |_, this, name: String| async move {
                this.remove_all_named_headers(&name).await.map_err(any_err)
            },
        );

        methods.add_async_method(
            "import_scheduling_header",
            |lua, this, (header_name, remove): (String, bool)| async move {
                let opt_schedule = this
                    .import_scheduling_header(&header_name, remove)
                    .await
                    .map_err(any_err)?;
                lua.to_value(&opt_schedule)
            },
        );

        methods.add_async_method(
            "set_scheduling",
            move |lua, this, params: mlua::Value| async move {
                let sched: Option<Scheduling> = from_lua_value(&lua, params)?;
                let opt_schedule = this.set_scheduling(sched).await.map_err(any_err)?;
                lua.to_value(&opt_schedule)
            },
        );

        // The report parsers return nil when the parse yields no report
        methods.add_async_method("parse_rfc3464", |lua, this, _: ()| async move {
            let report = this.parse_rfc3464().await.map_err(any_err)?;
            match report {
                Some(report) => lua.to_value_with(&report, serialize_options()),
                None => Ok(mlua::Value::Nil),
            }
        });

        methods.add_async_method("parse_rfc5965", |lua, this, _: ()| async move {
            let report = this.parse_rfc5965().await.map_err(any_err)?;
            match report {
                Some(report) => lua.to_value_with(&report, serialize_options()),
                None => Ok(mlua::Value::Nil),
            }
        });

        methods.add_async_method("save", |_, this, ()| async move {
            this.save(None).await.map_err(any_err)
        });

        methods.add_method("set_force_sync", move |_, this, force: bool| {
            this.set_force_sync(force);
            Ok(())
        });

        // `check` and `fix` are parsed from their string form. A failure
        // of the conformance check/fix itself is not raised as a Lua
        // error: the formatted error text is returned instead, and
        // `None` is returned on success.
        methods.add_async_method(
            "check_fix_conformance",
            |lua, this, (check, fix, settings): (String, String, Option<mlua::Value>)| async move {
                use std::str::FromStr;
                let check = MessageConformance::from_str(&check).map_err(any_err)?;
                let fix = MessageConformance::from_str(&fix).map_err(any_err)?;

                let settings = match settings {
                    Some(v) => Some(lua.from_value(v).map_err(any_err)?),
                    None => None,
                };

                match this
                    .check_fix_conformance(check, fix, settings.as_ref())
                    .await
                {
                    Ok(_) => Ok(None),
                    Err(err) => Ok(Some(format!("{err:#}"))),
                }
            },
        );
    }
}
1731
1732impl TimerEntryWithDelay for WeakMessage {
1733    fn delay(&self) -> Duration {
1734        match self.upgrade() {
1735            None => {
1736                // Dangling/Cancelled. Make it appear due immediately
1737                Duration::from_millis(0)
1738            }
1739            Some(msg) => msg.delay(),
1740        }
1741    }
1742}
1743
1744impl TimerEntryWithDelay for Message {
1745    fn delay(&self) -> Duration {
1746        let inner = self.msg_and_id.inner.lock();
1747        match inner.due {
1748            Some(time) => {
1749                let now = Utc::now();
1750                let delta = time - now;
1751                delta.to_std().unwrap_or(Duration::from_millis(0))
1752            }
1753            None => Duration::from_millis(0),
1754        }
1755    }
1756}
1757
1758#[cfg(test)]
1759pub(crate) mod test {
1760    use super::*;
1761    use serde_json::json;
1762
1763    pub fn new_msg_body<S: AsRef<[u8]>>(s: S) -> Message {
1764        Message::new_dirty(
1765            SpoolId::new(),
1766            EnvelopeAddress::parse("sender@example.com").unwrap(),
1767            vec![EnvelopeAddress::parse("recip@example.com").unwrap()],
1768            serde_json::json!({}),
1769            Arc::new(s.as_ref().to_vec().into_boxed_slice()),
1770        )
1771        .unwrap()
1772    }
1773
1774    fn data_as_string(msg: &Message) -> String {
1775        String::from_utf8(msg.get_data_maybe_not_loaded().to_vec()).unwrap()
1776    }
1777
1778    const X_HDR_CONTENT: &str =
1779        "X-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nFrom :Someone\r\n\r\nBody";
1780
1781    #[tokio::test]
1782    async fn import_all_x_headers() {
1783        let msg = new_msg_body(X_HDR_CONTENT);
1784
1785        msg.import_x_headers(vec![]).await.unwrap();
1786        k9::assert_equal!(
1787            msg.get_meta_obj().await.unwrap(),
1788            json!({
1789                "x_hello": "there",
1790                "x_header": "value",
1791            })
1792        );
1793    }
1794
1795    #[tokio::test]
1796    async fn meta_and_nil() {
1797        let msg = new_msg_body(X_HDR_CONTENT);
1798        // Ensure that json null round-trips
1799        msg.set_meta("test", serde_json::Value::Null).await.unwrap();
1800        k9::assert_equal!(msg.get_meta("test").await.unwrap(), serde_json::Value::Null);
1801
1802        // and that it is exposed to lua as nil
1803        let lua = mlua::Lua::new();
1804        lua.globals().set("msg", msg).unwrap();
1805        lua.load("assert(msg:get_meta('test') == nil)")
1806            .exec()
1807            .unwrap();
1808    }
1809
1810    #[tokio::test]
1811    async fn import_some_x_headers() {
1812        let msg = new_msg_body(X_HDR_CONTENT);
1813
1814        msg.import_x_headers(vec!["x-hello".to_string()])
1815            .await
1816            .unwrap();
1817        k9::assert_equal!(
1818            msg.get_meta_obj().await.unwrap(),
1819            json!({
1820                "x_hello": "there",
1821            })
1822        );
1823    }
1824
1825    #[tokio::test]
1826    async fn remove_all_x_headers() {
1827        let msg = new_msg_body(X_HDR_CONTENT);
1828
1829        msg.remove_x_headers(vec![]).await.unwrap();
1830        k9::assert_equal!(
1831            data_as_string(&msg),
1832            "Subject: Hello\r\nFrom :Someone\r\n\r\nBody"
1833        );
1834    }
1835
1836    #[tokio::test]
1837    async fn prepend_header_2_params() {
1838        let msg = new_msg_body(X_HDR_CONTENT);
1839
1840        msg.prepend_header(Some("Date"), b"Today").await.unwrap();
1841        k9::assert_equal!(
1842            data_as_string(&msg),
1843            "Date: Today\r\nX-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nFrom :Someone\r\n\r\nBody"
1844        );
1845    }
1846
1847    #[tokio::test]
1848    async fn prepend_header_1_params() {
1849        let msg = new_msg_body(X_HDR_CONTENT);
1850
1851        msg.prepend_header(None, b"Date: Today").await.unwrap();
1852        k9::assert_equal!(
1853            data_as_string(&msg),
1854            "Date: Today\r\nX-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nFrom :Someone\r\n\r\nBody"
1855        );
1856    }
1857
1858    #[tokio::test]
1859    async fn append_header_2_params() {
1860        let msg = new_msg_body(X_HDR_CONTENT);
1861
1862        msg.append_header(Some("Date"), b"Today").await.unwrap();
1863        k9::assert_equal!(
1864            data_as_string(&msg),
1865            "X-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nFrom :Someone\r\nDate: Today\r\n\r\nBody"
1866        );
1867    }
1868
1869    #[tokio::test]
1870    async fn append_header_1_params() {
1871        let msg = new_msg_body(X_HDR_CONTENT);
1872
1873        msg.append_header(None, b"Date: Today").await.unwrap();
1874        k9::assert_equal!(
1875            data_as_string(&msg),
1876            "X-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nFrom :Someone\r\nDate: Today\r\n\r\nBody"
1877        );
1878    }
1879
1880    const MULTI_HEADER_CONTENT: &str =
1881        "X-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nX-Header: another value\r\nFrom :Someone@somewhere\r\n\r\nBody";
1882
1883    #[tokio::test]
1884    async fn get_first_header() {
1885        let msg = new_msg_body(MULTI_HEADER_CONTENT);
1886        k9::assert_equal!(
1887            msg.get_first_named_header_value("X-header")
1888                .await
1889                .unwrap()
1890                .unwrap(),
1891            "value"
1892        );
1893    }
1894
1895    #[tokio::test]
1896    async fn get_all_header() {
1897        let msg = new_msg_body(MULTI_HEADER_CONTENT);
1898        k9::assert_equal!(
1899            msg.get_all_named_header_values("X-header").await.unwrap(),
1900            vec!["value".to_string(), "another value".to_string()]
1901        );
1902    }
1903
1904    #[tokio::test]
1905    async fn remove_first() {
1906        let msg = new_msg_body(MULTI_HEADER_CONTENT);
1907        msg.remove_first_named_header("X-header").await.unwrap();
1908        k9::assert_equal!(
1909            data_as_string(&msg),
1910            "X-Hello: there\r\nSubject: Hello\r\nX-Header: another value\r\nFrom :Someone@somewhere\r\n\r\nBody"
1911        );
1912    }
1913
1914    #[tokio::test]
1915    async fn remove_all() {
1916        let msg = new_msg_body(MULTI_HEADER_CONTENT);
1917        msg.remove_all_named_headers("X-header").await.unwrap();
1918        k9::assert_equal!(
1919            data_as_string(&msg),
1920            "X-Hello: there\r\nSubject: Hello\r\nFrom :Someone@somewhere\r\n\r\nBody"
1921        );
1922    }
1923
1924    #[tokio::test]
1925    async fn append_text_plain() {
1926        let msg = new_msg_body(MULTI_HEADER_CONTENT);
1927        msg.append_text_plain("I am at the bottom").await.unwrap();
1928        k9::assert_equal!(
1929            data_as_string(&msg),
1930            "X-Hello: there\r\n\
1931             X-Header: value\r\n\
1932             Subject: Hello\r\n\
1933             X-Header: another value\r\n\
1934             From :Someone@somewhere\r\n\
1935             Content-Type: text/plain;\r\n\
1936             \tcharset=\"us-ascii\"\r\n\
1937             \r\n\
1938             Body\r\n\
1939             I am at the bottom\r\n"
1940        );
1941    }
1942
1943    const MIXED_CONTENT: &str = "Content-Type: multipart/mixed;\r\n\
1944\tboundary=\"my-boundary\"\r\n\
1945\r\n\
1946--my-boundary\r\n\
1947Content-Type: text/plain;\r\n\
1948\tcharset=\"us-ascii\"\r\n\
1949\r\n\
1950plain text\r\n\
1951--my-boundary\r\n\
1952Content-Type: text/html;\r\n\
1953\tcharset=\"us-ascii\"\r\n\
1954\r\n\
1955<b>rich</b> text\r\n\
1956--my-boundary\r\n\
1957Content-Type: application/octet-stream\r\n\
1958Content-Transfer-Encoding: base64\r\n\
1959Content-Disposition: attachment;\r\n\
1960\tfilename=\"woot.bin\"\r\n\
1961Content-ID: <woot.id@somewhere>\r\n\
1962\r\n\
1963AAECAw==\r\n\
1964--my-boundary--\r\n\
1965\r\n";
1966
1967    const MIXED_CONTENT_ENCLOSING_BODY: &str = "Content-Type: multipart/mixed;\r\n\
1968\tboundary=\"my-boundary\"\r\n\
1969\r\n\
1970--my-boundary\r\n\
1971Content-Type: text/plain;\r\n\
1972\tcharset=\"us-ascii\"\r\n\
1973\r\n\
1974plain text\r\n\
1975--my-boundary\r\n\
1976Content-Type: text/html;\r\n\
1977\tcharset=\"us-ascii\"\r\n\
1978\r\n\
1979<BODY>\r\n\
1980<b>rich</b> text\r\n\
1981</BODY>\r\n\
1982--my-boundary\r\n\
1983Content-Type: application/octet-stream\r\n\
1984Content-Transfer-Encoding: base64\r\n\
1985Content-Disposition: attachment;\r\n\
1986\tfilename=\"woot.bin\"\r\n\
1987Content-ID: <woot.id>\r\n\
1988\r\n\
1989AAECAw==\r\n\
1990--my-boundary--\r\n\
1991\r\n";
1992
1993    #[tokio::test]
1994    async fn append_text_html() {
1995        let msg = new_msg_body(MIXED_CONTENT);
1996        msg.append_text_html("bottom html").await.unwrap();
1997        k9::snapshot!(
1998            data_as_string(&msg),
1999            r#"
2000Content-Type: multipart/mixed;\r
2001\tboundary="my-boundary"\r
2002\r
2003--my-boundary\r
2004Content-Type: text/plain;\r
2005\tcharset="us-ascii"\r
2006\r
2007plain text\r
2008--my-boundary\r
2009Content-Type: text/html;\r
2010\tcharset="us-ascii"\r
2011\r
2012<b>rich</b> text\r
2013\r
2014bottom html\r
2015--my-boundary\r
2016Content-Type: application/octet-stream\r
2017Content-Transfer-Encoding: base64\r
2018Content-Disposition: attachment;\r
2019\tfilename="woot.bin"\r
2020Content-ID: <woot.id@somewhere>\r
2021\r
2022AAECAw==\r
2023--my-boundary--\r
2024\r
2025
2026"#
2027        );
2028
2029        let msg = new_msg_body(MIXED_CONTENT_ENCLOSING_BODY);
2030        msg.append_text_html("bottom html 👻").await.unwrap();
2031        k9::snapshot!(
2032            data_as_string(&msg),
2033            r#"
2034Content-Type: multipart/mixed;\r
2035\tboundary="my-boundary"\r
2036\r
2037--my-boundary\r
2038Content-Type: text/plain;\r
2039\tcharset="us-ascii"\r
2040\r
2041plain text\r
2042--my-boundary\r
2043Content-Type: text/html;\r
2044\tcharset="utf-8"\r
2045Content-Transfer-Encoding: quoted-printable\r
2046\r
2047<BODY>\r
2048<b>rich</b> text\r
2049\r
2050bottom html =F0=9F=91=BB</BODY>\r
2051--my-boundary\r
2052Content-Type: application/octet-stream\r
2053Content-Transfer-Encoding: base64\r
2054Content-Disposition: attachment;\r
2055\tfilename="woot.bin"\r
2056Content-ID: <woot.id>\r
2057\r
2058AAECAw==\r
2059--my-boundary--\r
2060\r
2061
2062"#
2063        );
2064    }
2065
2066    #[tokio::test]
2067    async fn append_text_plain_mixed() {
2068        let msg = new_msg_body(MIXED_CONTENT);
2069        msg.append_text_plain("bottom text 👾").await.unwrap();
2070        k9::snapshot!(
2071            data_as_string(&msg),
2072            r#"
2073Content-Type: multipart/mixed;\r
2074\tboundary="my-boundary"\r
2075\r
2076--my-boundary\r
2077Content-Type: text/plain;\r
2078\tcharset="utf-8"\r
2079Content-Transfer-Encoding: quoted-printable\r
2080\r
2081plain text\r
2082\r
2083bottom text =F0=9F=91=BE\r
2084--my-boundary\r
2085Content-Type: text/html;\r
2086\tcharset="us-ascii"\r
2087\r
2088<b>rich</b> text\r
2089--my-boundary\r
2090Content-Type: application/octet-stream\r
2091Content-Transfer-Encoding: base64\r
2092Content-Disposition: attachment;\r
2093\tfilename="woot.bin"\r
2094Content-ID: <woot.id@somewhere>\r
2095\r
2096AAECAw==\r
2097--my-boundary--\r
2098\r
2099
2100"#
2101        );
2102    }
2103
2104    #[tokio::test]
2105    async fn check_conformance_angle_msg_id() {
2106        const DOUBLE_ANGLE_ONLY: &str = "Subject: hello\r
2107Message-ID: <<1234@example.com>>\r
2108\r
2109Hello";
2110        let msg = new_msg_body(DOUBLE_ANGLE_ONLY);
2111        k9::snapshot!(
2112            msg.check_fix_conformance(
2113                MessageConformance::MISSING_MESSAGE_ID_HEADER,
2114                MessageConformance::empty(),
2115                None,
2116            )
2117            .await
2118            .unwrap_err()
2119            .to_string(),
2120            "Message has conformance issues: MISSING_MESSAGE_ID_HEADER"
2121        );
2122
2123        msg.check_fix_conformance(
2124            MessageConformance::MISSING_MESSAGE_ID_HEADER,
2125            MessageConformance::MISSING_MESSAGE_ID_HEADER,
2126            None,
2127        )
2128        .await
2129        .unwrap();
2130
2131        // Can't use a snapshot test here because the fixed header
2132        // has a unique random component
2133        /*
2134                k9::snapshot!(
2135                    data_as_string(&msg),
2136                    r#"
2137        Subject: hello\r
2138        Message-ID: <4106566d2ce911ef9dcd0242289ea0df@example.com>\r
2139        \r
2140        Hello
2141        "#
2142                );
2143        */
2144
2145        const DOUBLE_ANGLE_AND_LONG_LINE: &str = "Subject: hello\r
2146Message-ID: <<1234@example.com>>\r
2147\r
2148Hello this is a really long line Hello this is a really long line \
2149Hello this is a really long line Hello this is a really long line \
2150Hello this is a really long line Hello this is a really long line \
2151Hello this is a really long line Hello this is a really long line \
2152Hello this is a really long line Hello this is a really long line \
2153Hello this is a really long line Hello this is a really long line \
2154Hello this is a really long line Hello this is a really long line
2155";
2156        let msg = new_msg_body(DOUBLE_ANGLE_AND_LONG_LINE);
2157        msg.check_fix_conformance(
2158            MessageConformance::MISSING_COLON_VALUE,
2159            MessageConformance::MISSING_MESSAGE_ID_HEADER | MessageConformance::LINE_TOO_LONG,
2160            None,
2161        )
2162        .await
2163        .unwrap();
2164
2165        // Can't use a snapshot test here because the fixed header
2166        // has a random component
2167        /*
2168                k9::snapshot!(
2169                    data_as_string(&msg),
2170                    r#"
2171        Content-Type: text/plain;\r
2172        \tcharset="us-ascii"\r
2173        Content-Transfer-Encoding: quoted-printable\r
2174        Subject: hello\r
2175        Message-ID: <749fc87e2cea11ef96a50242289ea0df@example.com>\r
2176        \r
2177        Hello this is a really long line Hello this is a really long line Hello thi=\r
2178        s is a really long line Hello this is a really long line Hello this is a re=\r
2179        ally long line Hello this is a really long line Hello this is a really long=\r
2180         line Hello this is a really long line Hello this is a really long line Hel=\r
2181        lo this is a really long line Hello this is a really long line Hello this i=\r
2182        s a really long line Hello this is a really long line Hello this is a reall=\r
2183        y long line=0A\r
2184
2185        "#
2186                );
2187        */
2188    }
2189
2190    #[tokio::test]
2191    async fn check_conformance() {
2192        let msg = new_msg_body(MULTI_HEADER_CONTENT);
2193        msg.check_fix_conformance(
2194            MessageConformance::default(),
2195            MessageConformance::MISSING_MIME_VERSION,
2196            None,
2197        )
2198        .await
2199        .unwrap();
2200        k9::snapshot!(
2201            data_as_string(&msg),
2202            r#"
2203X-Hello: there\r
2204X-Header: value\r
2205Subject: Hello\r
2206X-Header: another value\r
2207From :Someone@somewhere\r
2208Mime-Version: 1.0\r
2209\r
2210Body
2211"#
2212        );
2213
2214        let msg = new_msg_body(MULTI_HEADER_CONTENT);
2215        msg.check_fix_conformance(
2216            MessageConformance::default(),
2217            MessageConformance::MISSING_MIME_VERSION | MessageConformance::NAME_ENDS_WITH_SPACE,
2218            None,
2219        )
2220        .await
2221        .unwrap();
2222        k9::snapshot!(
2223            data_as_string(&msg),
2224            r#"
2225Content-Type: text/plain;\r
2226\tcharset="us-ascii"\r
2227X-Hello: there\r
2228X-Header: value\r
2229Subject: Hello\r
2230X-Header: another value\r
2231From: <Someone@somewhere>\r
2232Mime-Version: 1.0\r
2233\r
2234Body\r
2235
2236"#
2237        );
2238    }
2239
2240    #[tokio::test]
2241    async fn check_fix_latin_input() {
2242        const POUNDS: &[u8] = b"Subject: \xa3\r\n\r\nGBP\r\n";
2243        let msg = new_msg_body(&*POUNDS);
2244        msg.check_fix_conformance(
2245            MessageConformance::default(),
2246            MessageConformance::NEEDS_TRANSFER_ENCODING,
2247            Some(&CheckFixSettings {
2248                detect_encoding: true,
2249                include_encodings: vec!["iso-8859-1".to_string()],
2250                ..Default::default()
2251            }),
2252        )
2253        .await
2254        .unwrap();
2255
2256        let subject = msg
2257            .get_first_named_header_value("subject")
2258            .await
2259            .unwrap()
2260            .unwrap();
2261        assert_eq!(subject, "£");
2262    }
2263
2264    #[tokio::test]
2265    async fn set_scheduling() -> anyhow::Result<()> {
2266        let msg = new_msg_body(MULTI_HEADER_CONTENT);
2267        assert!(msg.get_due().is_none(), "due is implicitly now");
2268
2269        let now = Utc::now();
2270        let one_day = chrono::Duration::try_days(1).expect("1 day to be valid");
2271
2272        msg.set_scheduling(Some(Scheduling {
2273            restriction: None,
2274            first_attempt: Some((now + one_day).into()),
2275            expires: None,
2276        }))
2277        .await?;
2278
2279        let due = msg.get_due().expect("due to now be set");
2280        assert!(due - now >= one_day, "due time is at least 1 day away");
2281
2282        Ok(())
2283    }
2284
2285    #[cfg(all(test, target_pointer_width = "64"))]
2286    #[test]
2287    fn sizes() {
2288        assert_eq!(std::mem::size_of::<Message>(), 8);
2289        assert_eq!(std::mem::size_of::<MessageInner>(), 32);
2290        assert_eq!(std::mem::size_of::<MessageWithId>(), 72);
2291    }
2292}