message/
message.rs

1use crate::address::HeaderAddressList;
2#[cfg(feature = "impl")]
3use crate::dkim::Signer;
4#[cfg(feature = "impl")]
5use crate::dkim::SIGN_POOL;
6pub use crate::queue_name::QueueNameComponents;
7use crate::scheduling::Scheduling;
8use anyhow::Context;
9use bstr::{BString, ByteSlice, ByteVec};
10use chrono::{DateTime, Utc};
11#[cfg(feature = "impl")]
12use config::{any_err, from_lua_value, serialize_options, SerdeWrappedValue};
13use futures::FutureExt;
14use intrusive_collections::{intrusive_adapter, LinkedList, LinkedListAtomicLink};
15use kumo_chrono_helper::*;
16#[cfg(feature = "impl")]
17use kumo_dkim::arc::ARC;
18use kumo_log_types::rfc3464::Report;
19use kumo_log_types::rfc5965::ARFReport;
20use kumo_prometheus::declare_metric;
21#[cfg(feature = "impl")]
22use mailparsing::{AuthenticationResult, AuthenticationResults, EncodeHeaderValue};
23use mailparsing::{
24    CheckFixSettings, DecodedBody, Header, HeaderParseResult, MessageConformance, MimePart,
25};
26#[cfg(feature = "impl")]
27use mlua::{IntoLua, LuaSerdeExt, UserData, UserDataMethods};
28#[cfg(feature = "impl")]
29use mod_dns_resolver::get_resolver_instance;
30use parking_lot::Mutex;
31use rfc5321::parser::EnvelopeAddress;
32use serde::{Deserialize, Serialize};
33use serde_with::formats::PreferOne;
34use serde_with::{serde_as, OneOrMany};
35use spool::{get_data_spool, get_meta_spool, Spool, SpoolId};
36use std::collections::{BTreeMap, HashSet};
37use std::hash::Hash;
38use std::sync::{Arc, LazyLock, Weak};
39use std::time::{Duration, Instant};
40use timeq::TimerEntryWithDelay;
41
// Internal per-message state bits, stored in `MessageInner::flags`.
// The two DIRTY bits drive `Message::save_to`; SCHEDULED selects the
// slow path in `Message::set_due`; FORCE_SYNC is forwarded to the spool
// as the `force_sync` argument of `store`.
bitflags::bitflags! {
    #[derive(Clone, Copy, Debug, PartialEq, Eq)]
    struct MessageFlags: u8 {
        /// true if Metadata needs to be saved
        const META_DIRTY = 1;
        /// true if Data needs to be saved
        const DATA_DIRTY = 2;
        /// true if scheduling restrictions are present in the metadata
        const SCHEDULED = 4;
        /// true if high durability writes should always be used
        const FORCE_SYNC = 8;
    }
}
55
// Incremented by `Message::new_dirty`, `Message::new_from_spool` and
// `Message::new_from_parts`; decremented when `MessageInner` is dropped.
declare_metric! {
/// Total number of Message objects.
///
/// This encompasses all Message objects in various states, whether
/// they are in a queue, moving between queues, being built as part
/// of an injection, pending logging, message metadata and/or data
/// may be either resident or offloaded to spool.
static MESSAGE_COUNT: IntGauge("message_count");
}
65
// Goes up when metadata becomes resident (constructors, `load_meta_from`)
// and down when it is released (`Message::shrink`, `MessageInner::drop`).
declare_metric! {
/// Total number of Message objects with metadata loaded.
///
/// Tracks how many messages have their `meta` data resident
/// in memory.  This may be because they have not yet saved
/// it, or because the message is being processed and the
/// metadata is required for that processing.
static META_COUNT: IntGauge("message_meta_resident_count");
}
75
// Goes up when body data becomes resident (`new_dirty`, `load_data_from`,
// `assign_data`) and down when it is released (`shrink_data`, `shrink`,
// `MessageInner::drop`). Residency is judged by `data` being non-empty.
declare_metric! {
/// Total number of Message objects with body data loaded.
///
/// Tracks how many messages have their `data` resident
/// in memory.  This may be because they have not yet saved
/// it, or because the message is being processed and the
/// data is either required to be in memory in order to
/// deliver the message, or because logging or other
/// post-injection policy is configured to operate on
/// the message.
static DATA_COUNT: IntGauge("message_data_resident_count");
}
88
/// Shared empty buffer used as the "body is not resident" placeholder.
/// `shrink`/`shrink_data` and `new_from_spool` store a clone of this in
/// `MessageInner::data`; `is_data_loaded` treats an empty buffer as "not loaded".
static NO_DATA: LazyLock<Arc<Box<[u8]>>> = LazyLock::new(|| Arc::new(vec![].into_boxed_slice()));
90
// Observed by the timer started at the top of `Message::save`.
declare_metric! {
/// How many seconds it takes to save a message to spool.
///
/// This metric encompasses the elapsed time to save
/// either or both the `meta` and `data` portions of a
/// message to spool.
///
/// High values indicate IO pressure which may be
/// alleviated by tuning other constraints and/or
/// [RocksDB Parameters](../../kumo/define_spool/rocks_params.md)
static SAVE_HIST: Histogram("message_save_latency");
}
103
// Observed by the timer started at the top of `Message::load_data`.
declare_metric! {
/// How many seconds it takes to load message data from spool.
///
/// High values indicate IO pressure which may be caused
/// by policy that operates on the message body post-reception.
/// We recommend *avoiding* logging header values as that is
/// the most common cause of this metric spiking and has
/// the biggest impact in resolving it.
///
/// IO pressure may also be alleviated by tuning other constraints and/or
/// [RocksDB Parameters](../../kumo/define_spool/rocks_params.md)
static LOAD_DATA_HIST: Histogram("message_data_load_latency");
}
117
// Observed by the timer started at the top of `Message::load_meta`.
declare_metric! {
/// How long it takes to load message metadata from spool.
///
/// High values indicate IO pressure which may be
/// alleviated by tuning other constraints and/or
/// [RocksDB Parameters](../../kumo/define_spool/rocks_params.md)
static LOAD_META_HIST: Histogram("message_meta_load_latency");
}
126
/// The mutable state of a message, guarded by the mutex in `MessageWithId`.
#[derive(Debug)]
struct MessageInner {
    /// Envelope and user metadata; `None` while offloaded (see `Message::shrink`).
    metadata: Option<Box<MetaData>>,
    /// Message body; an empty buffer (`NO_DATA`) while not resident.
    data: Arc<Box<[u8]>>,
    /// Dirty/scheduling/durability bits; see `MessageFlags`.
    flags: MessageFlags,
    /// Attempt counter; kept in memory only (not part of `MetaData`).
    num_attempts: u16,
    /// Due time as last set by `set_due`/`set_scheduling`; kept in memory only.
    due: Option<DateTime<Utc>>,
}
135
/// The shared allocation behind a `Message` handle: the immutable spool id,
/// the lock-protected mutable state, and the intrusive list link.
#[derive(Debug)]
pub(crate) struct MessageWithId {
    /// Spool identifier; readable without taking the lock.
    id: SpoolId,
    /// All mutable message state.
    inner: Mutex<MessageInner>,
    /// Link used by `MessageList` to chain messages together.
    link: LinkedListAtomicLink,
}
142
// Generates the adapter type that lets an intrusive `LinkedList` hold
// `Arc<MessageWithId>` values through their embedded `link` field.
intrusive_adapter!(
    pub(crate) MessageWithIdAdapter = Arc<MessageWithId>: MessageWithId { link: LinkedListAtomicLink }
);
146
/// A list of messages with an O(1) list overhead; no additional
/// memory per-message is required to track this list.
/// However, a given Message can only belong to one instance
/// of such a list at a time.
pub struct MessageList {
    /// The intrusive list that chains the messages via `MessageWithId::link`.
    list: LinkedList<MessageWithIdAdapter>,
    /// Element count, maintained by hand in push/pop/take.
    len: usize,
}
155
156impl Default for MessageList {
157    fn default() -> Self {
158        Self::new()
159    }
160}
161
162impl MessageList {
163    /// Create a new MessageList
164    pub fn new() -> Self {
165        Self {
166            list: LinkedList::new(MessageWithIdAdapter::default()),
167            len: 0,
168        }
169    }
170
171    /// Returns the number of elements contained in the list
172    pub fn len(&self) -> usize {
173        self.len
174    }
175
176    pub fn is_empty(&self) -> bool {
177        self.len == 0
178    }
179
180    /// Take all of the elements from this list and return them
181    /// in a new separate instance of MessageList.
182    pub fn take(&mut self) -> Self {
183        let new_list = Self {
184            list: self.list.take(),
185            len: self.len,
186        };
187        self.len = 0;
188        new_list
189    }
190
191    /// Push a message to the back of the list
192    pub fn push_back(&mut self, message: Message) {
193        self.list.push_back(message.msg_and_id);
194        self.len += 1;
195    }
196
197    /// Pop a message from the front of the list
198    pub fn pop_front(&mut self) -> Option<Message> {
199        self.list.pop_front().map(|msg_and_id| {
200            self.len -= 1;
201            Message { msg_and_id }
202        })
203    }
204
205    /// Pop a message from the back of the list
206    pub fn pop_back(&mut self) -> Option<Message> {
207        self.list.pop_back().map(|msg_and_id| {
208            self.len -= 1;
209            Message { msg_and_id }
210        })
211    }
212
213    /// Pop all of the messages from this list into a vector
214    /// of messages.
215    /// Memory usage is O(number-of-messages).
216    pub fn drain(&mut self) -> Vec<Message> {
217        let mut messages = Vec::with_capacity(self.len);
218        while let Some(msg) = self.pop_front() {
219            messages.push(msg);
220        }
221        messages
222    }
223
224    pub fn extend_from_iter<I>(&mut self, iter: I)
225    where
226        I: Iterator<Item = Message>,
227    {
228        for msg in iter {
229            self.push_back(msg)
230        }
231    }
232}
233
234impl IntoIterator for MessageList {
235    type Item = Message;
236    type IntoIter = MessageListIter;
237    fn into_iter(self) -> MessageListIter {
238        MessageListIter { list: self.list }
239    }
240}
241
/// Owning iterator produced by `MessageList::into_iter`.
pub struct MessageListIter {
    /// Remaining messages; popped from the front on each call to `next`.
    list: LinkedList<MessageWithIdAdapter>,
}
245
246impl Iterator for MessageListIter {
247    type Item = Message;
248    fn next(&mut self) -> Option<Message> {
249        self.list
250            .pop_front()
251            .map(|msg_and_id| Message { msg_and_id })
252    }
253}
254
/// A cheaply clonable handle to a message. Clones share the same
/// underlying `MessageWithId` allocation (and therefore the same state).
#[derive(Clone, Debug)]
#[cfg_attr(feature = "impl", derive(mlua::FromLua))]
pub struct Message {
    /// Shared id + lock-protected state + intrusive list link.
    pub(crate) msg_and_id: Arc<MessageWithId>,
}
260
261impl PartialEq for Message {
262    fn eq(&self, other: &Self) -> bool {
263        self.id() == other.id()
264    }
265}
266impl Eq for Message {}
267
268impl Hash for Message {
269    fn hash<H>(&self, hasher: &mut H)
270    where
271        H: std::hash::Hasher,
272    {
273        self.id().hash(hasher)
274    }
275}
276
/// A non-owning reference to a message, created by `Message::weak`.
/// It does not keep the message alive; use `upgrade` to try to obtain
/// a strong `Message` handle again.
#[derive(Clone, Debug)]
pub struct WeakMessage {
    weak: Weak<MessageWithId>,
}
281
282impl WeakMessage {
283    pub fn upgrade(&self) -> Option<Message> {
284        Some(Message {
285            msg_and_id: self.weak.upgrade()?,
286        })
287    }
288}
289
/// The portion of a message that is serialized as JSON into the meta
/// spool (see `Message::save_to` / `Message::new_from_spool`).
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone)]
pub(crate) struct MetaData {
    /// Envelope sender
    pub sender: EnvelopeAddress,
    /// Envelope recipient(s). Serialized via `OneOrMany<_, PreferOne>`.
    #[serde_as(as = "OneOrMany<_, PreferOne>")]
    pub recipient: Vec<EnvelopeAddress>,
    /// Arbitrary key/value metadata; `new_dirty` requires this to be a
    /// JSON object.
    pub meta: serde_json::Value,
    /// Optional scheduling restrictions; defaults to `None` when absent
    /// from the serialized form.
    #[serde(default)]
    pub schedule: Option<Scheduling>,
}
300
301impl Drop for MessageInner {
302    fn drop(&mut self) {
303        if self.metadata.is_some() {
304            META_COUNT.dec();
305        }
306        if !self.data.is_empty() {
307            DATA_COUNT.dec();
308        }
309        MESSAGE_COUNT.dec();
310    }
311}
312
313impl Message {
314    /// Create a new message with the supplied data.
315    /// The message meta and data are marked as dirty
316    pub fn new_dirty(
317        id: SpoolId,
318        sender: EnvelopeAddress,
319        recipient: Vec<EnvelopeAddress>,
320        meta: serde_json::Value,
321        data: Arc<Box<[u8]>>,
322    ) -> anyhow::Result<Self> {
323        anyhow::ensure!(meta.is_object(), "metadata must be a json object");
324        MESSAGE_COUNT.inc();
325        DATA_COUNT.inc();
326        META_COUNT.inc();
327        Ok(Self {
328            msg_and_id: Arc::new(MessageWithId {
329                id,
330                inner: Mutex::new(MessageInner {
331                    metadata: Some(Box::new(MetaData {
332                        sender,
333                        recipient,
334                        meta,
335                        schedule: None,
336                    })),
337                    data,
338                    flags: MessageFlags::META_DIRTY | MessageFlags::DATA_DIRTY,
339                    num_attempts: 0,
340                    due: None,
341                }),
342                link: LinkedListAtomicLink::default(),
343            }),
344        })
345    }
346
347    pub fn weak(&self) -> WeakMessage {
348        WeakMessage {
349            weak: Arc::downgrade(&self.msg_and_id),
350        }
351    }
352
353    /// Helper for creating a message based on spool enumeration.
354    /// Given a spool id and the serialized metadata blob, returns
355    /// a message holding the deserialized version of that metadata.
356    pub fn new_from_spool(id: SpoolId, metadata: Vec<u8>) -> anyhow::Result<Self> {
357        let metadata: MetaData = serde_json::from_slice(&metadata)?;
358        MESSAGE_COUNT.inc();
359        META_COUNT.inc();
360
361        let flags = if metadata.schedule.is_some() {
362            MessageFlags::SCHEDULED
363        } else {
364            MessageFlags::empty()
365        };
366
367        Ok(Self {
368            msg_and_id: Arc::new(MessageWithId {
369                id,
370                inner: Mutex::new(MessageInner {
371                    metadata: Some(Box::new(metadata)),
372                    data: NO_DATA.clone(),
373                    flags,
374                    num_attempts: 0,
375                    due: None,
376                }),
377                link: LinkedListAtomicLink::default(),
378            }),
379        })
380    }
381
382    pub(crate) fn new_from_parts(id: SpoolId, metadata: MetaData, data: Arc<Box<[u8]>>) -> Self {
383        MESSAGE_COUNT.inc();
384        META_COUNT.inc();
385
386        let flags = if metadata.schedule.is_some() {
387            MessageFlags::SCHEDULED
388        } else {
389            MessageFlags::empty()
390        };
391
392        Self {
393            msg_and_id: Arc::new(MessageWithId {
394                id,
395                inner: Mutex::new(MessageInner {
396                    metadata: Some(Box::new(metadata)),
397                    data,
398                    flags: flags | MessageFlags::META_DIRTY | MessageFlags::DATA_DIRTY,
399                    num_attempts: 0,
400                    due: None,
401                }),
402                link: LinkedListAtomicLink::default(),
403            }),
404        }
405    }
406
407    pub async fn new_with_id(id: SpoolId) -> anyhow::Result<Self> {
408        let meta_spool = get_meta_spool();
409        let data = meta_spool.load(id).await?;
410        Self::new_from_spool(id, data)
411    }
412
413    pub fn get_num_attempts(&self) -> u16 {
414        let inner = self.msg_and_id.inner.lock();
415        inner.num_attempts
416    }
417
418    pub fn set_num_attempts(&self, num_attempts: u16) {
419        let mut inner = self.msg_and_id.inner.lock();
420        inner.num_attempts = num_attempts;
421    }
422
423    pub fn increment_num_attempts(&self) {
424        let mut inner = self.msg_and_id.inner.lock();
425        inner.num_attempts += 1;
426    }
427
428    pub async fn set_scheduling(
429        &self,
430        scheduling: Option<Scheduling>,
431    ) -> anyhow::Result<Option<Scheduling>> {
432        self.load_meta_if_needed().await?;
433        let mut inner = self.msg_and_id.inner.lock();
434        match &mut inner.metadata {
435            None => anyhow::bail!("set_scheduling: metadata must be loaded first"),
436            Some(meta) => {
437                meta.schedule = scheduling;
438                inner
439                    .flags
440                    .set(MessageFlags::SCHEDULED, scheduling.is_some());
441                if let Some(sched) = scheduling {
442                    let due = inner.due.unwrap_or_else(Utc::now);
443                    inner.due = Some(sched.adjust_for_schedule(due));
444                }
445                Ok(scheduling)
446            }
447        }
448    }
449
450    pub async fn get_scheduling(&self) -> anyhow::Result<Option<Scheduling>> {
451        self.load_meta_if_needed().await?;
452        let inner = self.msg_and_id.inner.lock();
453        Ok(inner.metadata.as_ref().and_then(|meta| meta.schedule))
454    }
455
456    pub fn get_due(&self) -> Option<DateTime<Utc>> {
457        let inner = self.msg_and_id.inner.lock();
458        inner.due
459    }
460
461    pub async fn delay_with_jitter(&self, limit: i64) -> anyhow::Result<Option<DateTime<Utc>>> {
462        let scale = rand::random::<f32>();
463        let value = (scale * limit as f32) as i64;
464        self.delay_by(seconds(value)?).await
465    }
466
467    pub async fn delay_by(
468        &self,
469        duration: chrono::Duration,
470    ) -> anyhow::Result<Option<DateTime<Utc>>> {
471        let due = Utc::now() + duration;
472        self.set_due(Some(due)).await
473    }
474
475    /// Delay by requested duration, and add up to 1 minute of jitter
476    pub async fn delay_by_and_jitter(
477        &self,
478        duration: chrono::Duration,
479    ) -> anyhow::Result<Option<DateTime<Utc>>> {
480        let scale = rand::random::<f32>();
481        let value = (scale * 60.) as i64;
482        let due = Utc::now() + duration + seconds(value)?;
483        self.set_due(Some(due)).await
484    }
485
486    pub async fn set_due(
487        &self,
488        due: Option<DateTime<Utc>>,
489    ) -> anyhow::Result<Option<DateTime<Utc>>> {
490        let due = {
491            let mut inner = self.msg_and_id.inner.lock();
492
493            if !inner.flags.contains(MessageFlags::SCHEDULED) {
494                // This is the simple, fast-path, common case
495                inner.due = due;
496                return Ok(inner.due);
497            }
498
499            let due = due.unwrap_or_else(Utc::now);
500
501            if let Some(meta) = &inner.metadata {
502                inner.due = match &meta.schedule {
503                    Some(sched) => Some(sched.adjust_for_schedule(due)),
504                    None => Some(due),
505                };
506                return Ok(inner.due);
507            }
508
509            // We'll need to load the metadata to correctly
510            // update the schedule for this message
511            due
512        };
513
514        self.load_meta().await?;
515
516        {
517            let mut inner = self.msg_and_id.inner.lock();
518            match &inner.metadata {
519                Some(meta) => {
520                    inner.due = match &meta.schedule {
521                        Some(sched) => Some(sched.adjust_for_schedule(due)),
522                        None => Some(due),
523                    };
524                    Ok(inner.due)
525                }
526                None => anyhow::bail!("loaded metadata, but metadata is not set!?"),
527            }
528        }
529    }
530
531    fn get_data_if_dirty(&self) -> Option<Arc<Box<[u8]>>> {
532        let inner = self.msg_and_id.inner.lock();
533        if inner.flags.contains(MessageFlags::DATA_DIRTY) {
534            Some(Arc::clone(&inner.data))
535        } else {
536            None
537        }
538    }
539
540    fn get_meta_if_dirty(&self) -> Option<MetaData> {
541        let inner = self.msg_and_id.inner.lock();
542        if inner.flags.contains(MessageFlags::META_DIRTY) {
543            inner.metadata.as_ref().map(|md| (**md).clone())
544        } else {
545            None
546        }
547    }
548
549    pub(crate) async fn clone_meta_data(&self) -> anyhow::Result<MetaData> {
550        self.load_meta_if_needed().await?;
551        let inner = self.msg_and_id.inner.lock();
552        inner
553            .metadata
554            .as_ref()
555            .ok_or_else(|| anyhow::anyhow!("metadata not loaded even though we just loaded it"))
556            .map(|md| (**md).clone())
557    }
558
559    pub fn set_force_sync(&self, force: bool) {
560        let mut inner = self.msg_and_id.inner.lock();
561        inner.flags.set(MessageFlags::FORCE_SYNC, force);
562    }
563
564    pub fn needs_save(&self) -> bool {
565        let inner = self.msg_and_id.inner.lock();
566        inner.flags.contains(MessageFlags::META_DIRTY)
567            || inner.flags.contains(MessageFlags::DATA_DIRTY)
568    }
569
570    pub async fn save(&self, deadline: Option<Instant>) -> anyhow::Result<()> {
571        let _timer = SAVE_HIST.start_timer();
572        self.save_to(&**get_meta_spool(), &**get_data_spool(), deadline)
573            .await
574    }
575
576    pub async fn save_to(
577        &self,
578        meta_spool: &(dyn Spool + Send + Sync),
579        data_spool: &(dyn Spool + Send + Sync),
580        deadline: Option<Instant>,
581    ) -> anyhow::Result<()> {
582        let force_sync = self
583            .msg_and_id
584            .inner
585            .lock()
586            .flags
587            .contains(MessageFlags::FORCE_SYNC);
588
589        let data_fut = if let Some(data) = self.get_data_if_dirty() {
590            anyhow::ensure!(!data.is_empty(), "message data must not be empty");
591            data_spool
592                .store(self.msg_and_id.id, data, force_sync, deadline)
593                .map(|_| true)
594                .boxed()
595        } else {
596            futures::future::ready(false).boxed()
597        };
598        let meta_fut = if let Some(meta) = self.get_meta_if_dirty() {
599            let meta = Arc::new(serde_json::to_vec(&meta)?.into_boxed_slice());
600            meta_spool
601                .store(self.msg_and_id.id, meta, force_sync, deadline)
602                .map(|_| true)
603                .boxed()
604        } else {
605            futures::future::ready(false).boxed()
606        };
607
608        // NOTE: if we have a deadline, it is tempting to want to use
609        // timeout_at here to enforce it, but the underlying spool
610        // futures are not guaranteed to be fully cancel safe, which
611        // is why we pass the deadline down to the save method to allow
612        // them to handle timeouts internally.
613        let (data_res, meta_res) = tokio::join!(data_fut, meta_fut);
614
615        if data_res {
616            self.msg_and_id
617                .inner
618                .lock()
619                .flags
620                .remove(MessageFlags::DATA_DIRTY);
621        }
622        if meta_res {
623            self.msg_and_id
624                .inner
625                .lock()
626                .flags
627                .remove(MessageFlags::META_DIRTY);
628        }
629        Ok(())
630    }
631
632    pub fn id(&self) -> &SpoolId {
633        &self.msg_and_id.id
634    }
635
636    /// Save the data+meta if needed, then release both
637    pub async fn save_and_shrink(&self) -> anyhow::Result<bool> {
638        self.save(None).await?;
639        self.shrink()
640    }
641
642    /// Save the data+meta if needed, then release just the data
643    pub async fn save_and_shrink_data(&self) -> anyhow::Result<bool> {
644        self.save(None).await?;
645        self.shrink_data()
646    }
647
648    pub fn shrink_data(&self) -> anyhow::Result<bool> {
649        let mut inner = self.msg_and_id.inner.lock();
650        let mut did_shrink = false;
651        if inner.flags.contains(MessageFlags::DATA_DIRTY) {
652            anyhow::bail!("Cannot shrink message: DATA_DIRTY");
653        }
654        if !inner.data.is_empty() {
655            DATA_COUNT.dec();
656            did_shrink = true;
657        }
658        if !inner.data.is_empty() {
659            inner.data = NO_DATA.clone();
660            did_shrink = true;
661        }
662        Ok(did_shrink)
663    }
664
665    pub fn shrink(&self) -> anyhow::Result<bool> {
666        let mut inner = self.msg_and_id.inner.lock();
667        let mut did_shrink = false;
668        if inner.flags.contains(MessageFlags::DATA_DIRTY) {
669            anyhow::bail!("Cannot shrink message: DATA_DIRTY");
670        }
671        if inner.flags.contains(MessageFlags::META_DIRTY) {
672            anyhow::bail!("Cannot shrink message: META_DIRTY");
673        }
674        if inner.metadata.take().is_some() {
675            META_COUNT.dec();
676            did_shrink = true;
677        }
678        if !inner.data.is_empty() {
679            DATA_COUNT.dec();
680            did_shrink = true;
681        }
682        if !inner.data.is_empty() {
683            inner.data = NO_DATA.clone();
684            did_shrink = true;
685        }
686        Ok(did_shrink)
687    }
688
689    pub async fn sender(&self) -> anyhow::Result<EnvelopeAddress> {
690        self.load_meta_if_needed().await?;
691        let inner = self.msg_and_id.inner.lock();
692        match &inner.metadata {
693            Some(meta) => Ok(meta.sender.clone()),
694            None => anyhow::bail!("Message::sender metadata is not loaded"),
695        }
696    }
697
698    pub async fn set_sender(&self, sender: EnvelopeAddress) -> anyhow::Result<()> {
699        self.load_meta_if_needed().await?;
700        let mut inner = self.msg_and_id.inner.lock();
701        match &mut inner.metadata {
702            Some(meta) => {
703                meta.sender = sender;
704                inner.flags.set(MessageFlags::DATA_DIRTY, true);
705                Ok(())
706            }
707            None => anyhow::bail!("Message::set_sender: metadata is not loaded"),
708        }
709    }
710
711    #[deprecated = "use recipient_list or first_recipient instead"]
712    pub async fn recipient(&self) -> anyhow::Result<EnvelopeAddress> {
713        self.first_recipient().await
714    }
715
716    pub async fn first_recipient(&self) -> anyhow::Result<EnvelopeAddress> {
717        self.load_meta_if_needed().await?;
718        let inner = self.msg_and_id.inner.lock();
719        match &inner.metadata {
720            Some(meta) => match meta.recipient.first() {
721                Some(recip) => Ok(recip.clone()),
722                None => anyhow::bail!("recipient list is empty!?"),
723            },
724            None => anyhow::bail!("Message::first_recipient: metadata is not loaded"),
725        }
726    }
727
728    pub async fn recipient_list(&self) -> anyhow::Result<Vec<EnvelopeAddress>> {
729        self.load_meta_if_needed().await?;
730        let inner = self.msg_and_id.inner.lock();
731        match &inner.metadata {
732            Some(meta) => Ok(meta.recipient.clone()),
733            None => anyhow::bail!("Message::recipient_list: metadata is not loaded"),
734        }
735    }
736
737    pub async fn recipient_list_string(&self) -> anyhow::Result<Vec<String>> {
738        self.load_meta_if_needed().await?;
739        let inner = self.msg_and_id.inner.lock();
740        match &inner.metadata {
741            Some(meta) => Ok(meta.recipient.iter().map(|a| a.to_string()).collect()),
742            None => anyhow::bail!("Message::recipient_list_string: metadata is not loaded"),
743        }
744    }
745
746    #[deprecated = "use set_recipient_list instead"]
747    pub async fn set_recipient(&self, recipient: EnvelopeAddress) -> anyhow::Result<()> {
748        self.set_recipient_list(vec![recipient]).await
749    }
750
751    pub async fn set_recipient_list(&self, recipient: Vec<EnvelopeAddress>) -> anyhow::Result<()> {
752        self.load_meta_if_needed().await?;
753
754        let mut inner = self.msg_and_id.inner.lock();
755        match &mut inner.metadata {
756            Some(meta) => {
757                meta.recipient = recipient;
758                inner.flags.set(MessageFlags::DATA_DIRTY, true);
759                Ok(())
760            }
761            None => anyhow::bail!("Message::set_recipient_list: metadata is not loaded"),
762        }
763    }
764
765    pub fn is_meta_loaded(&self) -> bool {
766        self.msg_and_id.inner.lock().metadata.is_some()
767    }
768
769    pub fn is_data_loaded(&self) -> bool {
770        !self.msg_and_id.inner.lock().data.is_empty()
771    }
772
773    pub async fn load_meta_if_needed(&self) -> anyhow::Result<()> {
774        if self.is_meta_loaded() {
775            return Ok(());
776        }
777        self.load_meta().await
778    }
779
780    pub async fn data(&self) -> anyhow::Result<Arc<Box<[u8]>>> {
781        self.load_data_if_needed().await
782    }
783
784    async fn load_data_if_needed(&self) -> anyhow::Result<Arc<Box<[u8]>>> {
785        if self.is_data_loaded() {
786            return Ok(self.get_data_maybe_not_loaded());
787        }
788        self.load_data().await
789    }
790
791    pub async fn load_meta(&self) -> anyhow::Result<()> {
792        let _timer = LOAD_META_HIST.start_timer();
793        self.load_meta_from(&**get_meta_spool()).await
794    }
795
796    async fn load_meta_from(&self, meta_spool: &(dyn Spool + Send + Sync)) -> anyhow::Result<()> {
797        let id = self.id();
798        let data = meta_spool.load(*id).await?;
799        let mut inner = self.msg_and_id.inner.lock();
800        let was_not_loaded = inner.metadata.is_none();
801        let metadata: MetaData = serde_json::from_slice(&data)?;
802        inner.metadata.replace(Box::new(metadata));
803        if was_not_loaded {
804            META_COUNT.inc();
805        }
806        Ok(())
807    }
808
809    pub async fn load_data(&self) -> anyhow::Result<Arc<Box<[u8]>>> {
810        let _timer = LOAD_DATA_HIST.start_timer();
811        self.load_data_from(&**get_data_spool()).await
812    }
813
814    async fn load_data_from(
815        &self,
816        data_spool: &(dyn Spool + Send + Sync),
817    ) -> anyhow::Result<Arc<Box<[u8]>>> {
818        let data = data_spool.load(*self.id()).await?;
819        let mut inner = self.msg_and_id.inner.lock();
820        let was_empty = inner.data.is_empty();
821        inner.data = Arc::new(data.into_boxed_slice());
822        if was_empty {
823            DATA_COUNT.inc();
824        }
825        Ok(inner.data.clone())
826    }
827
828    pub fn assign_data(&self, data: Vec<u8>) {
829        let mut inner = self.msg_and_id.inner.lock();
830        let was_empty = inner.data.is_empty();
831        inner.data = Arc::new(data.into_boxed_slice());
832        inner.flags.set(MessageFlags::DATA_DIRTY, true);
833        if was_empty {
834            DATA_COUNT.inc();
835        }
836    }
837
838    pub fn get_data_maybe_not_loaded(&self) -> Arc<Box<[u8]>> {
839        let inner = self.msg_and_id.inner.lock();
840        inner.data.clone()
841    }
842
843    pub async fn set_meta<S: AsRef<str>, V: Into<serde_json::Value>>(
844        &self,
845        key: S,
846        value: V,
847    ) -> anyhow::Result<()> {
848        self.load_meta_if_needed().await?;
849        let mut inner = self.msg_and_id.inner.lock();
850        match &mut inner.metadata {
851            None => anyhow::bail!("set_meta: metadata must be loaded first"),
852            Some(meta) => {
853                let key = key.as_ref();
854                let value = value.into();
855
856                match &mut meta.meta {
857                    serde_json::Value::Object(map) => {
858                        map.insert(key.to_string(), value);
859                    }
860                    _ => anyhow::bail!("metadata is somehow not a json object"),
861                }
862
863                inner.flags.set(MessageFlags::META_DIRTY, true);
864                Ok(())
865            }
866        }
867    }
868
869    pub async fn unset_meta<S: AsRef<str>>(&self, key: S) -> anyhow::Result<()> {
870        self.load_meta_if_needed().await?;
871        let mut inner = self.msg_and_id.inner.lock();
872        match &mut inner.metadata {
873            None => anyhow::bail!("set_meta: metadata must be loaded first"),
874            Some(meta) => {
875                let key = key.as_ref();
876
877                match &mut meta.meta {
878                    serde_json::Value::Object(map) => {
879                        map.remove(key);
880                    }
881                    _ => anyhow::bail!("metadata is somehow not a json object"),
882                }
883
884                inner.flags.set(MessageFlags::META_DIRTY, true);
885                Ok(())
886            }
887        }
888    }
889
890    /// Retrieve `key` as a String.
891    pub async fn get_meta_string<S: serde_json::value::Index + std::fmt::Display + Copy>(
892        &self,
893        key: S,
894    ) -> anyhow::Result<Option<String>> {
895        match self.get_meta(key).await {
896            Ok(serde_json::Value::String(value)) => Ok(Some(value.to_string())),
897            Ok(serde_json::Value::Null) => Ok(None),
898            hmm => {
899                anyhow::bail!("expected '{key}' to be a string value, got {hmm:?}");
900            }
901        }
902    }
903
904    pub async fn get_meta_obj(&self) -> anyhow::Result<serde_json::Value> {
905        self.load_meta_if_needed().await?;
906        let inner = self.msg_and_id.inner.lock();
907        match &inner.metadata {
908            None => anyhow::bail!("get_meta_obj: metadata must be loaded first"),
909            Some(meta) => Ok(meta.meta.clone()),
910        }
911    }
912
913    pub async fn get_meta<S: serde_json::value::Index>(
914        &self,
915        key: S,
916    ) -> anyhow::Result<serde_json::Value> {
917        self.load_meta_if_needed().await?;
918        let inner = self.msg_and_id.inner.lock();
919        match &inner.metadata {
920            None => anyhow::bail!("get_meta: metadata must be loaded first"),
921            Some(meta) => match meta.meta.get(key) {
922                Some(value) => Ok(value.clone()),
923                None => Ok(serde_json::Value::Null),
924            },
925        }
926    }
927
928    pub fn age(&self, now: DateTime<Utc>) -> chrono::Duration {
929        self.msg_and_id.id.age(now)
930    }
931
932    pub async fn get_queue_name(&self) -> anyhow::Result<String> {
933        Ok(match self.get_meta_string("queue").await? {
934            Some(name) => name,
935            None => {
936                let name = QueueNameComponents::format(
937                    self.get_meta_string("campaign").await?,
938                    self.get_meta_string("tenant").await?,
939                    self.first_recipient()
940                        .await?
941                        .domain()
942                        .to_string()
943                        .to_lowercase(),
944                    self.get_meta_string("routing_domain").await?,
945                );
946                name.to_string()
947            }
948        })
949    }
950
951    #[cfg(feature = "impl")]
952    pub async fn arc_verify(
953        &self,
954        opt_resolver_name: Option<String>,
955    ) -> anyhow::Result<AuthenticationResult> {
956        let resolver = get_resolver_instance(&opt_resolver_name)?;
957        let data = self.data().await?;
958        let bytes = mailparsing::SharedString::try_from(data.as_ref().as_ref())?;
959
960        let parsed = mailparsing::Header::parse_headers(bytes.clone())?;
961        let message = kumo_dkim::ParsedEmail::HeaderOnlyParse { bytes, parsed };
962
963        let arc = ARC::verify(&message, &**resolver).await;
964        Ok(arc.authentication_result())
965    }
966
967    #[cfg(feature = "impl")]
968    pub async fn arc_seal(
969        &self,
970        signer: Signer,
971        auth_results: AuthenticationResults,
972        opt_resolver_name: Option<String>,
973    ) -> anyhow::Result<()> {
974        let resolver = get_resolver_instance(&opt_resolver_name)?;
975        let data = self.data().await?;
976        let bytes = mailparsing::SharedString::try_from(data.as_ref().as_ref())?;
977        let parsed = mailparsing::Header::parse_headers(bytes.clone())?;
978        let message = kumo_dkim::ParsedEmail::HeaderOnlyParse { bytes, parsed };
979        let arc = ARC::verify(&message, &**resolver).await;
980
981        let headers = arc.seal(&message, auth_results, signer.signer())?;
982        if !headers.is_empty() {
983            let mut new_data = Vec::<u8>::with_capacity(data.len() + 1024);
984
985            for hdr in headers {
986                hdr.write_header(&mut new_data).ok();
987            }
988            new_data.extend_from_slice(&data);
989            self.assign_data(new_data);
990        }
991
992        Ok(())
993    }
994
995    #[cfg(feature = "impl")]
996    pub async fn dkim_verify(
997        &self,
998        opt_resolver_name: Option<String>,
999    ) -> anyhow::Result<Vec<AuthenticationResult>> {
1000        let resolver = get_resolver_instance(&opt_resolver_name)?;
1001        let data = self.data().await?;
1002        let bytes = mailparsing::SharedString::try_from(data.as_ref().as_ref())?;
1003
1004        let parsed = mailparsing::Header::parse_headers(bytes.clone())?;
1005        if parsed
1006            .overall_conformance
1007            .contains(MessageConformance::NON_CANONICAL_LINE_ENDINGS)
1008        {
1009            return Ok(vec![AuthenticationResult {
1010                method: "dkim".into(),
1011                method_version: None,
1012                result: "permerror".into(),
1013                reason: Some("message has non-canonical line endings".into()),
1014                props: Default::default(),
1015            }]);
1016        }
1017        let message = kumo_dkim::ParsedEmail::HeaderOnlyParse { bytes, parsed };
1018
1019        let results = kumo_dkim::verify_email_with_resolver(&message, &**resolver).await?;
1020        Ok(results)
1021    }
1022
1023    /// Parses the content into an owned MimePart.
1024    /// Changes to that MimePart are NOT reflected in the underlying
1025    /// message; you must re-assign the message data if you wish to modify
1026    /// the message content.
1027    pub async fn parse(&self) -> anyhow::Result<MimePart<'static>> {
1028        let data = self.data().await?;
1029        let owned_data = String::from_utf8_lossy(data.as_ref().as_ref()).to_string();
1030        Ok(MimePart::parse(owned_data)?)
1031    }
1032
1033    pub async fn parse_rfc3464(&self) -> anyhow::Result<Option<Report>> {
1034        let data = self.data().await?;
1035        Report::parse(&data)
1036    }
1037
1038    pub async fn parse_rfc5965(&self) -> anyhow::Result<Option<ARFReport>> {
1039        let data = self.data().await?;
1040        ARFReport::parse(&data)
1041    }
1042
1043    pub async fn prepend_header(&self, name: Option<&str>, value: &[u8]) -> anyhow::Result<()> {
1044        let data = self.data().await?;
1045        let mut new_data = Vec::with_capacity(size_header(name, value) + 2 + data.len());
1046        emit_header(&mut new_data, name, value);
1047        new_data.extend_from_slice(&data);
1048        self.assign_data(new_data);
1049        Ok(())
1050    }
1051
1052    pub async fn append_header(&self, name: Option<&str>, value: &[u8]) -> anyhow::Result<()> {
1053        let data = self.data().await?;
1054        let mut new_data = Vec::with_capacity(size_header(name, value.as_bytes()) + 2 + data.len());
1055        for (idx, window) in data.windows(4).enumerate() {
1056            if window == b"\r\n\r\n" {
1057                let headers = &data[0..idx + 2];
1058                let body = &data[idx + 2..];
1059
1060                new_data.extend_from_slice(headers);
1061                emit_header(&mut new_data, name, value);
1062                new_data.extend_from_slice(body);
1063                self.assign_data(new_data);
1064                return Ok(());
1065            }
1066        }
1067
1068        anyhow::bail!("append_header could not find the end of the header block");
1069    }
1070
1071    pub async fn get_address_header(
1072        &self,
1073        header_name: &str,
1074    ) -> anyhow::Result<Option<HeaderAddressList>> {
1075        let data = self.data().await?;
1076        let HeaderParseResult { headers, .. } =
1077            mailparsing::Header::parse_headers(data.as_ref().as_ref())?;
1078
1079        match headers.get_first(header_name) {
1080            Some(hdr) => {
1081                let list = hdr.as_address_list()?;
1082                let result: HeaderAddressList = list.into();
1083                Ok(Some(result))
1084            }
1085            None => Ok(None),
1086        }
1087    }
1088
1089    pub async fn get_first_named_header_value(
1090        &self,
1091        name: &str,
1092    ) -> anyhow::Result<Option<BString>> {
1093        let data = self.data().await?;
1094        let HeaderParseResult { headers, .. } = Header::parse_headers(data.as_ref().as_ref())?;
1095
1096        match headers.get_first(name) {
1097            Some(hdr) => Ok(Some(
1098                hdr.as_unstructured()
1099                    .unwrap_or_else(|_| hdr.get_raw_value().into()),
1100            )),
1101            None => Ok(None),
1102        }
1103    }
1104
1105    pub async fn get_all_named_header_values(&self, name: &str) -> anyhow::Result<Vec<BString>> {
1106        let data = self.data().await?;
1107        let HeaderParseResult { headers, .. } = Header::parse_headers(data.as_ref().as_ref())?;
1108
1109        let mut values = vec![];
1110        for hdr in headers.iter_named(name) {
1111            values.push(
1112                hdr.as_unstructured()
1113                    .unwrap_or_else(|_| hdr.get_raw_value().into()),
1114            );
1115        }
1116        Ok(values)
1117    }
1118
1119    pub async fn get_all_headers(&self) -> anyhow::Result<Vec<(BString, BString)>> {
1120        let data = self.data().await?;
1121        let HeaderParseResult { headers, .. } = Header::parse_headers(data.as_ref().as_ref())?;
1122
1123        let mut values = vec![];
1124        for hdr in headers.iter() {
1125            values.push((
1126                hdr.get_name().into(),
1127                hdr.as_unstructured()
1128                    .unwrap_or_else(|_| hdr.get_raw_value().into()),
1129            ));
1130        }
1131        Ok(values)
1132    }
1133
1134    pub async fn retain_headers<F: FnMut(usize, &Header) -> bool>(
1135        &self,
1136        mut func: F,
1137    ) -> anyhow::Result<()> {
1138        let data = self.data().await?;
1139        let mut new_data = Vec::with_capacity(data.len());
1140        let HeaderParseResult {
1141            headers,
1142            body_offset,
1143            ..
1144        } = Header::parse_headers(data.as_ref().as_ref())?;
1145        for (idx, hdr) in headers.iter().enumerate() {
1146            let retain = (func)(idx, hdr);
1147            if !retain {
1148                continue;
1149            }
1150            hdr.write_header(&mut new_data)?;
1151        }
1152        new_data.extend_from_slice(b"\r\n");
1153        new_data.extend_from_slice(&data[body_offset..]);
1154        self.assign_data(new_data);
1155        Ok(())
1156    }
1157
1158    pub async fn remove_first_named_header(&self, name: &str) -> anyhow::Result<()> {
1159        let mut removed = false;
1160        self.retain_headers(|_, hdr| {
1161            if hdr.get_name().eq_ignore_ascii_case(name.as_bytes()) && !removed {
1162                removed = true;
1163                false
1164            } else {
1165                true
1166            }
1167        })
1168        .await
1169    }
1170
1171    pub async fn import_x_headers(&self, names: Vec<String>) -> anyhow::Result<()> {
1172        let specs = if names.is_empty() {
1173            vec![ImportHeaderSpec {
1174                name: "X-*".to_string(),
1175                ..ImportHeaderSpec::default()
1176            }]
1177        } else {
1178            names
1179                .into_iter()
1180                .map(|name| ImportHeaderSpec {
1181                    name,
1182                    ..ImportHeaderSpec::default()
1183                })
1184                .collect()
1185        };
1186        self.import_headers(specs).await
1187    }
1188
1189    pub async fn import_headers(&self, specs: Vec<ImportHeaderSpec>) -> anyhow::Result<()> {
1190        let compiled: Vec<CompiledImportHeaderSpec> = specs
1191            .into_iter()
1192            .map(CompiledImportHeaderSpec::compile)
1193            .collect::<anyhow::Result<_>>()?;
1194
1195        let data = self.data().await?;
1196        let HeaderParseResult { headers, .. } = Header::parse_headers(data.as_ref().as_ref())?;
1197
1198        let mut accumulators: Vec<PerSpecAccumulator> = compiled
1199            .iter()
1200            .map(|s| PerSpecAccumulator::new(s.match_mode))
1201            .collect();
1202        let mut indices_to_remove: HashSet<usize> = HashSet::new();
1203
1204        for (idx, hdr) in headers.iter().enumerate() {
1205            let hdr_name = hdr.get_name();
1206            for (spec_idx, spec) in compiled.iter().enumerate() {
1207                if spec.matches(hdr_name) {
1208                    let key = spec.target_key(&hdr.get_name_lossy());
1209                    let value = hdr.as_unstructured()?.to_str_lossy().to_string();
1210                    accumulators[spec_idx].record(key, value);
1211                    if spec.remove {
1212                        indices_to_remove.insert(idx);
1213                    }
1214                    break;
1215                }
1216            }
1217        }
1218
1219        for acc in accumulators {
1220            acc.write_to(self).await?;
1221        }
1222
1223        if !indices_to_remove.is_empty() {
1224            self.retain_headers(|idx, _| !indices_to_remove.contains(&idx))
1225                .await?;
1226        }
1227
1228        Ok(())
1229    }
1230
1231    pub async fn remove_x_headers(&self, names: Vec<String>) -> anyhow::Result<()> {
1232        self.retain_headers(|_, hdr| {
1233            if names.is_empty() {
1234                !is_x_header(hdr.get_name())
1235            } else {
1236                !names
1237                    .iter()
1238                    .any(|n| hdr.get_name().eq_ignore_ascii_case(n.as_bytes()))
1239            }
1240        })
1241        .await
1242    }
1243
1244    pub async fn remove_all_named_headers(&self, name: &str) -> anyhow::Result<()> {
1245        self.retain_headers(|_, hdr| !hdr.get_name().eq_ignore_ascii_case(name.as_bytes()))
1246            .await
1247    }
1248
1249    #[cfg(feature = "impl")]
1250    pub async fn dkim_sign(&self, signer: Signer) -> anyhow::Result<()> {
1251        let data = self.data().await?;
1252        let header = if let Some(runtime) = SIGN_POOL.get() {
1253            runtime.spawn_blocking(move || signer.sign(&data)).await??
1254        } else {
1255            signer.sign(&data)?
1256        };
1257        self.prepend_header(None, header.as_bytes()).await?;
1258        Ok(())
1259    }
1260
1261    pub async fn import_scheduling_header(
1262        &self,
1263        header_name: &str,
1264        remove: bool,
1265    ) -> anyhow::Result<Option<Scheduling>> {
1266        if let Some(value) = self.get_first_named_header_value(header_name).await? {
1267            let sched: Scheduling = serde_json::from_slice(&value).with_context(|| {
1268                format!("{value} from header {header_name} is not a valid Scheduling header")
1269            })?;
1270            let result = self.set_scheduling(Some(sched)).await?;
1271
1272            if remove {
1273                self.remove_all_named_headers(header_name).await?;
1274            }
1275
1276            Ok(result)
1277        } else {
1278            Ok(None)
1279        }
1280    }
1281
1282    pub async fn append_text_plain(&self, content: &str) -> anyhow::Result<bool> {
1283        let data = self.data().await?;
1284        let mut msg = MimePart::parse(data.as_ref().as_ref())?;
1285        let parts = msg.simplified_structure_pointers()?;
1286        if let Some(p) = parts.text_part.and_then(|p| msg.resolve_ptr_mut(p)) {
1287            match p.body()? {
1288                DecodedBody::Text(text) => {
1289                    let mut text = text.as_bytes().to_vec();
1290                    text.push_str("\r\n");
1291                    text.push_str(content);
1292                    p.replace_text_body("text/plain", &*text)?;
1293
1294                    let new_data = msg.to_message_bytes();
1295                    self.assign_data(new_data);
1296                    Ok(true)
1297                }
1298                DecodedBody::Binary(_) => {
1299                    anyhow::bail!("expected text/plain part to be text, but it is binary");
1300                }
1301            }
1302        } else {
1303            Ok(false)
1304        }
1305    }
1306
1307    pub async fn append_text_html(&self, content: &str) -> anyhow::Result<bool> {
1308        let data = self.data().await?;
1309        let mut msg = MimePart::parse(data.as_ref().as_ref())?;
1310        let parts = msg.simplified_structure_pointers()?;
1311        if let Some(p) = parts.html_part.and_then(|p| msg.resolve_ptr_mut(p)) {
1312            match p.body()? {
1313                DecodedBody::Text(text) => {
1314                    let mut text = text.as_bytes().to_vec();
1315
1316                    match text.rfind("</body>").or_else(|| text.rfind("</BODY>")) {
1317                        Some(idx) => {
1318                            text.insert_str(idx, content);
1319                            text.insert_str(idx, "\r\n");
1320                        }
1321                        None => {
1322                            // Just append
1323                            text.push_str("\r\n");
1324                            text.push_str(content);
1325                        }
1326                    }
1327
1328                    p.replace_text_body("text/html", &*text)?;
1329
1330                    let new_data = msg.to_message_bytes();
1331                    self.assign_data(new_data);
1332                    Ok(true)
1333                }
1334                DecodedBody::Binary(_) => {
1335                    anyhow::bail!("expected text/html part to be text, but it is binary");
1336                }
1337            }
1338        } else {
1339            Ok(false)
1340        }
1341    }
1342
1343    pub async fn check_fix_conformance(
1344        &self,
1345        check: MessageConformance,
1346        fix: MessageConformance,
1347        settings: Option<&CheckFixSettings>,
1348    ) -> anyhow::Result<()> {
1349        let data = self.data().await?;
1350        let data_bytes = data.as_ref().as_ref();
1351        let msg = MimePart::parse(data_bytes)?;
1352
1353        let mut settings = settings.map(Clone::clone).unwrap_or_default();
1354
1355        if fix.contains(MessageConformance::MISSING_MESSAGE_ID_HEADER)
1356            && settings.message_id.is_none()
1357            && matches!(msg.headers().message_id(), Err(_) | Ok(None))
1358        {
1359            let sender = self.sender().await?;
1360            let domain = sender.domain();
1361            let id = *self.id();
1362            settings.message_id.replace(format!("{id}@{domain}"));
1363        }
1364
1365        if settings.detect_encoding {
1366            settings.data_bytes.replace(data.clone());
1367        }
1368
1369        let opt_msg = msg.check_fix_conformance(check, fix, settings)?;
1370
1371        if let Some(msg) = opt_msg {
1372            let new_data = msg.to_message_bytes();
1373            self.assign_data(new_data);
1374        }
1375
1376        Ok(())
1377    }
1378}
1379
/// Convert a header name into its snake_case form: ASCII letters are
/// lowercased and every `-` becomes `_`. Other characters are kept as-is.
fn imported_header_name(name: &str) -> String {
    let mut converted = String::with_capacity(name.len());
    for ch in name.chars() {
        converted.push(if ch == '-' {
            '_'
        } else {
            ch.to_ascii_lowercase()
        });
    }
    converted
}
1388
/// Report whether a header name begins with `X-` or `x-`.
fn is_x_header(name: &[u8]) -> bool {
    [b"X-", b"x-"]
        .iter()
        .any(|prefix| name.starts_with(&prefix[..]))
}
1392
/// Which value(s) to keep when several headers map to the same metadata
/// key during a header import. Variant names are (de)serialized in
/// snake_case.
#[derive(Default, Debug, Clone, Copy, Deserialize, Serialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum MatchMode {
    /// Keep the first value recorded for a key; later ones are ignored.
    First,
    /// Keep the most recently recorded value for a key. This is the
    /// default.
    #[default]
    Last,
    /// Keep every recorded value; they are stored together as an array.
    All,
}
1401
/// How a matched header name is turned into a metadata key when no
/// explicit target key is configured. Header names are treated as words
/// separated by `-`. Variant names are (de)serialized in snake_case.
#[derive(Default, Debug, Clone, Copy, Deserialize, Serialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum NameTransform {
    /// ASCII-lowercase the name and replace `-` with `_`. This is the
    /// default.
    #[default]
    SnakeCase,
    /// ASCII-lowercase each word, keeping the `-` separators.
    KebabCase,
    /// ASCII-lowercase the first word, title-case the remaining words,
    /// and join them without separators.
    CamelCase,
    /// Title-case every word and join them without separators.
    PascalCase,
}
1411
1412#[derive(Debug, Clone, Deserialize)]
1413#[serde(deny_unknown_fields)]
1414pub struct ImportHeaderSpec {
1415    pub name: String,
1416    #[serde(default, rename = "match")]
1417    pub match_mode: MatchMode,
1418    #[serde(default)]
1419    pub transform: NameTransform,
1420    #[serde(default)]
1421    pub target: Option<String>,
1422    #[serde(default)]
1423    pub remove: bool,
1424}
1425
1426impl Default for ImportHeaderSpec {
1427    fn default() -> Self {
1428        Self {
1429            name: String::new(),
1430            match_mode: MatchMode::default(),
1431            transform: NameTransform::default(),
1432            target: None,
1433            remove: false,
1434        }
1435    }
1436}
1437
/// A compiled header-name pattern, as produced by
/// `compile_header_pattern` from the textual pattern of an import spec.
#[derive(Debug, Clone)]
enum HeaderPattern {
    /// Case-insensitive exact match.
    Exact(String),
    /// Matches any header whose name (case-insensitively) starts with the
    /// given prefix. Built from a pattern like `X-*`; the stored prefix
    /// does not include the trailing `*`.
    Prefix(String),
}
1446
/// An `ImportHeaderSpec` whose name pattern has been compiled and
/// validated; the remaining fields are carried over from the spec
/// unchanged.
#[derive(Debug, Clone)]
struct CompiledImportHeaderSpec {
    /// Compiled form of the spec's `name`.
    pattern: HeaderPattern,
    /// Which of several matching values to keep.
    match_mode: MatchMode,
    /// Name transform applied when `target` is `None`.
    transform: NameTransform,
    /// Explicit metadata key; never set together with a prefix pattern.
    target: Option<String>,
    /// Whether matched headers are removed from the message.
    remove: bool,
}
1455
1456impl CompiledImportHeaderSpec {
1457    fn compile(spec: ImportHeaderSpec) -> anyhow::Result<Self> {
1458        let pattern = compile_header_pattern(&spec.name)?;
1459        if spec.target.is_some() && matches!(pattern, HeaderPattern::Prefix(_)) {
1460            anyhow::bail!(
1461                "import_headers: `target` cannot be used with wildcard pattern {:?}",
1462                spec.name
1463            );
1464        }
1465        Ok(Self {
1466            pattern,
1467            match_mode: spec.match_mode,
1468            transform: spec.transform,
1469            target: spec.target,
1470            remove: spec.remove,
1471        })
1472    }
1473
1474    fn matches(&self, hdr_name: &[u8]) -> bool {
1475        match &self.pattern {
1476            HeaderPattern::Exact(s) => hdr_name.eq_ignore_ascii_case(s.as_bytes()),
1477            HeaderPattern::Prefix(p) => {
1478                hdr_name.len() >= p.len() && hdr_name[..p.len()].eq_ignore_ascii_case(p.as_bytes())
1479            }
1480        }
1481    }
1482
1483    fn target_key(&self, matched_name: &str) -> String {
1484        if let Some(target) = &self.target {
1485            return target.clone();
1486        }
1487        apply_name_transform(matched_name, self.transform)
1488    }
1489}
1490
1491fn compile_header_pattern(pat: &str) -> anyhow::Result<HeaderPattern> {
1492    if pat.is_empty() {
1493        anyhow::bail!("import_headers: header name pattern must not be empty");
1494    }
1495    let star_count = pat.chars().filter(|c| *c == '*').count();
1496    if star_count == 0 {
1497        return Ok(HeaderPattern::Exact(pat.to_string()));
1498    }
1499    let Some(prefix) = pat.strip_suffix('*') else {
1500        anyhow::bail!(
1501            "import_headers: only a single trailing `*` is supported in pattern {:?}",
1502            pat
1503        );
1504    };
1505    if star_count > 1 {
1506        anyhow::bail!(
1507            "import_headers: only a single trailing `*` is supported in pattern {:?}",
1508            pat
1509        );
1510    }
1511    if prefix.is_empty() {
1512        anyhow::bail!("import_headers: bare `*` patterns are not supported");
1513    }
1514    Ok(HeaderPattern::Prefix(prefix.to_string()))
1515}
1516
1517fn apply_name_transform(name: &str, transform: NameTransform) -> String {
1518    match transform {
1519        NameTransform::SnakeCase => imported_header_name(name),
1520        NameTransform::KebabCase => name
1521            .split('-')
1522            .map(|p| p.to_ascii_lowercase())
1523            .collect::<Vec<_>>()
1524            .join("-"),
1525        NameTransform::CamelCase => name
1526            .split('-')
1527            .enumerate()
1528            .map(|(i, p)| {
1529                if i == 0 {
1530                    p.to_ascii_lowercase()
1531                } else {
1532                    titlecase_ascii(p)
1533                }
1534            })
1535            .collect::<Vec<_>>()
1536            .join(""),
1537        NameTransform::PascalCase => name
1538            .split('-')
1539            .map(titlecase_ascii)
1540            .collect::<Vec<_>>()
1541            .join(""),
1542    }
1543}
1544
/// Title-case a word: the first character is ASCII-uppercased and all
/// remaining characters are ASCII-lowercased. Non-ASCII characters pass
/// through unchanged.
fn titlecase_ascii(s: &str) -> String {
    let mut word = String::with_capacity(s.len());
    for (position, ch) in s.chars().enumerate() {
        let cased = if position == 0 {
            ch.to_ascii_uppercase()
        } else {
            ch.to_ascii_lowercase()
        };
        word.push(cased);
    }
    word
}
1559
/// A value accumulated for one metadata key during a header import.
enum AccValue {
    /// A single string value (used by the `First` and `Last` modes).
    Str(String),
    /// All collected values, in the order recorded (used by `All` mode).
    Arr(Vec<String>),
}
1564
/// Collects the values imported on behalf of a single import spec before
/// they are written to the message metadata.
struct PerSpecAccumulator {
    /// Decides how repeated values for the same key are combined.
    mode: MatchMode,
    /// Accumulated values, indexed by target metadata key.
    by_key: BTreeMap<String, AccValue>,
}
1569
1570impl PerSpecAccumulator {
1571    fn new(mode: MatchMode) -> Self {
1572        Self {
1573            mode,
1574            by_key: BTreeMap::new(),
1575        }
1576    }
1577
1578    fn record(&mut self, key: String, value: String) {
1579        match self.mode {
1580            MatchMode::First => {
1581                self.by_key.entry(key).or_insert(AccValue::Str(value));
1582            }
1583            MatchMode::Last => {
1584                self.by_key.insert(key, AccValue::Str(value));
1585            }
1586            MatchMode::All => {
1587                let entry = self
1588                    .by_key
1589                    .entry(key)
1590                    .or_insert_with(|| AccValue::Arr(Vec::new()));
1591                if let AccValue::Arr(v) = entry {
1592                    v.push(value);
1593                }
1594            }
1595        }
1596    }
1597
1598    async fn write_to(self, msg: &Message) -> anyhow::Result<()> {
1599        for (key, value) in self.by_key {
1600            match value {
1601                AccValue::Str(s) => msg.set_meta(key, s).await?,
1602                AccValue::Arr(arr) => {
1603                    let json = serde_json::Value::Array(
1604                        arr.into_iter().map(serde_json::Value::String).collect(),
1605                    );
1606                    msg.set_meta(key, json).await?;
1607                }
1608            }
1609        }
1610        Ok(())
1611    }
1612}
1613
/// Number of bytes needed for a header's name part and value: the length
/// of `value`, plus the length of `name` and two extra bytes when a name
/// is given. No line terminator is counted.
fn size_header(name: Option<&str>, value: &[u8]) -> usize {
    let name_part = match name {
        Some(label) => label.len() + 2,
        None => 0,
    };
    name_part + value.len()
}
1617
/// Append a header line to `dest`.
///
/// When `name` is given, `name: ` is written first. `value` follows, and
/// a CRLF is added unless `value` already ends with one.
fn emit_header(dest: &mut Vec<u8>, name: Option<&str>, value: &[u8]) {
    match name {
        Some(label) => {
            dest.extend_from_slice(label.as_bytes());
            dest.extend_from_slice(b": ");
        }
        None => {}
    }

    dest.extend_from_slice(value);

    let already_terminated = value.ends_with(b"\r\n");
    if !already_terminated {
        dest.extend_from_slice(b"\r\n");
    }
}
1628
1629#[cfg(feature = "impl")]
1630impl UserData for Message {
1631    fn add_methods<M: UserDataMethods<Self>>(methods: &mut M) {
1632        methods.add_async_method(
1633            "set_meta",
1634            move |_, this, (name, value): (String, mlua::Value)| async move {
1635                let value = serde_json::value::to_value(value).map_err(any_err)?;
1636                this.set_meta(name, value).await.map_err(any_err)?;
1637                Ok(())
1638            },
1639        );
1640        methods.add_async_method("get_meta", move |lua, this, name: String| async move {
1641            let value = this.get_meta(name).await.map_err(any_err)?;
1642            Ok(Some(lua.to_value_with(&value, serialize_options())?))
1643        });
1644        methods.add_async_method("get_data", |lua, this, _: ()| async move {
1645            let data = this.data().await.map_err(any_err)?;
1646            lua.create_string(&*data)
1647        });
1648        methods.add_method("set_data", move |_lua, this, data: mlua::String| {
1649            this.assign_data(data.as_bytes().to_vec());
1650            Ok(())
1651        });
1652
1653        methods.add_async_method("parse_mime", |_lua, this, _: ()| async move {
1654            let data = this.data().await.map_err(any_err)?;
1655            let owned_data = BString::new(data.as_ref().to_vec());
1656            let part = MimePart::parse(owned_data).map_err(any_err)?;
1657            Ok(mod_mimepart::PartRef::new(part))
1658        });
1659
1660        methods.add_async_method("append_text_plain", |_lua, this, data: String| async move {
1661            this.append_text_plain(&data).await.map_err(any_err)
1662        });
1663
1664        methods.add_async_method("append_text_html", |_lua, this, data: String| async move {
1665            this.append_text_html(&data).await.map_err(any_err)
1666        });
1667
1668        methods.add_method("id", move |_, this, _: ()| Ok(this.id().to_string()));
1669        methods.add_async_method("sender", move |_, this, _: ()| async move {
1670            this.sender().await.map_err(any_err)
1671        });
1672
1673        methods.add_method("num_attempts", move |_, this, _: ()| {
1674            Ok(this.get_num_attempts())
1675        });
1676        methods.add_method("increment_num_attempts", move |_, this, _: ()| {
1677            Ok(this.increment_num_attempts())
1678        });
1679
1680        methods.add_async_method("queue_name", move |_, this, _: ()| async move {
1681            this.get_queue_name().await.map_err(any_err)
1682        });
1683
1684        methods.add_async_method("set_due", move |lua, this, due: mlua::Value| async move {
1685            let due: Option<DateTime<Utc>> = lua.from_value(due)?;
1686            let revised_due = this.set_due(due).await.map_err(any_err)?;
1687            lua.to_value(&revised_due)
1688        });
1689
1690        methods.add_async_method(
1691            "set_sender",
1692            move |lua, this, value: mlua::Value| async move {
1693                let sender = match value {
1694                    mlua::Value::String(s) => {
1695                        let s = s.to_str()?;
1696                        EnvelopeAddress::parse(&s).map_err(any_err)?
1697                    }
1698                    _ => lua.from_value::<EnvelopeAddress>(value.clone())?,
1699                };
1700                this.set_sender(sender).await.map_err(any_err)
1701            },
1702        );
1703
1704        methods.add_async_method("recipient", move |lua, this, _: ()| async move {
1705            let mut recipients = this.recipient_list().await.map_err(any_err)?;
1706            match recipients.len() {
1707                0 => Ok(mlua::Value::Nil),
1708                1 => {
1709                    let recip: EnvelopeAddress = recipients.pop().expect("have 1");
1710                    recip.into_lua(&lua)
1711                }
1712                _ => recipients.into_lua(&lua),
1713            }
1714        });
1715
1716        methods.add_async_method("recipient_list", move |lua, this, _: ()| async move {
1717            let recipients = this.recipient_list().await.map_err(any_err)?;
1718            recipients.into_lua(&lua)
1719        });
1720
1721        methods.add_async_method(
1722            "set_recipient",
1723            move |lua, this, value: mlua::Value| async move {
1724                let recipients = match value {
1725                    mlua::Value::String(s) => {
1726                        let s = s.to_str()?;
1727                        vec![EnvelopeAddress::parse(&s).map_err(any_err)?]
1728                    }
1729                    _ => {
1730                        if let Ok(recips) = lua.from_value::<Vec<EnvelopeAddress>>(value.clone()) {
1731                            recips
1732                        } else {
1733                            vec![lua.from_value::<EnvelopeAddress>(value.clone())?]
1734                        }
1735                    }
1736                };
1737                this.set_recipient_list(recipients).await.map_err(any_err)
1738            },
1739        );
1740
1741        #[cfg(feature = "impl")]
1742        methods.add_async_method("dkim_sign", |_, this, signer: Signer| async move {
1743            this.dkim_sign(signer).await.map_err(any_err)
1744        });
1745
1746        methods.add_async_method("shrink", |_, this, _: ()| async move {
1747            if this.needs_save() {
1748                this.save(None).await.map_err(any_err)?;
1749            }
1750            this.shrink().map_err(any_err)
1751        });
1752
1753        methods.add_async_method("shrink_data", |_, this, _: ()| async move {
1754            if this.needs_save() {
1755                this.save(None).await.map_err(any_err)?;
1756            }
1757            this.shrink_data().map_err(any_err)
1758        });
1759
1760        methods.add_async_method(
1761            "add_authentication_results",
1762            |lua, this, (serv_id, results): (mlua::String, mlua::Value)| async move {
1763                let results: Vec<AuthenticationResult> = lua.from_value(results)?;
1764                let results = AuthenticationResults {
1765                    serv_id: serv_id.as_bytes().as_ref().into(),
1766                    version: None,
1767                    results,
1768                };
1769
1770                this.prepend_header(
1771                    Some("Authentication-Results"),
1772                    results.encode_value().as_bytes(),
1773                )
1774                .await
1775                .map_err(any_err)?;
1776
1777                Ok(())
1778            },
1779        );
1780
1781        #[cfg(feature = "impl")]
1782        methods.add_async_method(
1783            "arc_verify",
1784            |lua, this, opt_resolver_name: Option<String>| async move {
1785                let results = this.arc_verify(opt_resolver_name).await.map_err(any_err)?;
1786                lua.to_value_with(&results, serialize_options())
1787            },
1788        );
1789
1790        #[cfg(feature = "impl")]
1791        methods.add_async_method(
1792            "arc_seal",
1793            |lua,
1794             this,
1795             (signer, serv_id, auth_res, opt_resolver_name): (
1796                Signer,
1797                mlua::String,
1798                mlua::Value,
1799                Option<String>,
1800            )| async move {
1801                let results: Vec<AuthenticationResult> = lua.from_value(auth_res)?;
1802                this.arc_seal(
1803                    signer,
1804                    AuthenticationResults {
1805                        serv_id: serv_id.as_bytes().as_ref().into(),
1806                        version: None,
1807                        results,
1808                    },
1809                    opt_resolver_name,
1810                )
1811                .await
1812                .map_err(any_err)
1813            },
1814        );
1815
1816        #[cfg(feature = "impl")]
1817        methods.add_async_method(
1818            "dkim_verify",
1819            |lua, this, opt_resolver_name: Option<String>| async move {
1820                let results = this.dkim_verify(opt_resolver_name).await.map_err(any_err)?;
1821                lua.to_value_with(&results, serialize_options())
1822            },
1823        );
1824
1825        methods.add_async_method(
1826            "prepend_header",
1827            |_, this, (name, value, encode): (String, String, Option<bool>)| async move {
1828                let encode = encode.unwrap_or(false);
1829                if encode {
1830                    let header = Header::new_unstructured(name.clone(), value);
1831                    this.prepend_header(Some(&name), header.get_raw_value())
1832                        .await
1833                        .map_err(any_err)?;
1834                } else {
1835                    this.prepend_header(Some(&name), value.as_bytes())
1836                        .await
1837                        .map_err(any_err)?;
1838                }
1839                Ok(())
1840            },
1841        );
1842        methods.add_async_method(
1843            "append_header",
1844            |_, this, (name, value, encode): (String, String, Option<bool>)| async move {
1845                let encode = encode.unwrap_or(false);
1846                if encode {
1847                    let header = Header::new_unstructured(name.clone(), value);
1848                    this.append_header(Some(&name), header.get_raw_value())
1849                        .await
1850                        .map_err(any_err)?;
1851                } else {
1852                    this.append_header(Some(&name), value.as_bytes())
1853                        .await
1854                        .map_err(any_err)?;
1855                }
1856                Ok(())
1857            },
1858        );
1859        methods.add_async_method("get_address_header", |_, this, name: String| async move {
1860            this.get_address_header(&name).await.map_err(any_err)
1861        });
1862        methods.add_async_method("from_header", |_, this, ()| async move {
1863            this.get_address_header("From").await.map_err(any_err)
1864        });
1865        methods.add_async_method("to_header", |_, this, ()| async move {
1866            this.get_address_header("To").await.map_err(any_err)
1867        });
1868
1869        methods.add_async_method(
1870            "get_first_named_header_value",
1871            |_, this, name: String| async move {
1872                this.get_first_named_header_value(&name)
1873                    .await
1874                    .map_err(any_err)
1875            },
1876        );
1877        methods.add_async_method(
1878            "get_all_named_header_values",
1879            |_, this, name: String| async move {
1880                this.get_all_named_header_values(&name)
1881                    .await
1882                    .map_err(any_err)
1883            },
1884        );
1885        methods.add_async_method("get_all_headers", |_, this, _: ()| async move {
1886            Ok(this
1887                .get_all_headers()
1888                .await
1889                .map_err(any_err)?
1890                .into_iter()
1891                .map(|(name, value)| vec![name, value])
1892                .collect::<Vec<Vec<BString>>>())
1893        });
1894        methods.add_async_method(
1895            "import_x_headers",
1896            |_, this, names: Option<Vec<String>>| async move {
1897                this.import_x_headers(names.unwrap_or_default())
1898                    .await
1899                    .map_err(any_err)
1900            },
1901        );
1902        methods.add_async_method(
1903            "import_headers",
1904            |_, this, specs: SerdeWrappedValue<Vec<ImportHeaderSpec>>| async move {
1905                this.import_headers(specs.0).await.map_err(any_err)
1906            },
1907        );
1908
1909        methods.add_async_method(
1910            "remove_x_headers",
1911            |_, this, names: Option<Vec<String>>| async move {
1912                this.remove_x_headers(names.unwrap_or_default())
1913                    .await
1914                    .map_err(any_err)
1915            },
1916        );
1917        methods.add_async_method(
1918            "remove_all_named_headers",
1919            |_, this, name: String| async move {
1920                this.remove_all_named_headers(&name).await.map_err(any_err)
1921            },
1922        );
1923
1924        methods.add_async_method(
1925            "import_scheduling_header",
1926            |lua, this, (header_name, remove): (String, bool)| async move {
1927                let opt_schedule = this
1928                    .import_scheduling_header(&header_name, remove)
1929                    .await
1930                    .map_err(any_err)?;
1931                lua.to_value(&opt_schedule)
1932            },
1933        );
1934
1935        methods.add_async_method(
1936            "set_scheduling",
1937            move |lua, this, params: mlua::Value| async move {
1938                let sched: Option<Scheduling> = from_lua_value(&lua, params)?;
1939                let opt_schedule = this.set_scheduling(sched).await.map_err(any_err)?;
1940                lua.to_value(&opt_schedule)
1941            },
1942        );
1943
1944        methods.add_async_method("parse_rfc3464", |lua, this, _: ()| async move {
1945            let report = this.parse_rfc3464().await.map_err(any_err)?;
1946            match report {
1947                Some(report) => lua.to_value_with(&report, serialize_options()),
1948                None => Ok(mlua::Value::Nil),
1949            }
1950        });
1951
1952        methods.add_async_method("parse_rfc5965", |lua, this, _: ()| async move {
1953            let report = this.parse_rfc5965().await.map_err(any_err)?;
1954            match report {
1955                Some(report) => lua.to_value_with(&report, serialize_options()),
1956                None => Ok(mlua::Value::Nil),
1957            }
1958        });
1959
1960        methods.add_async_method("save", |_, this, ()| async move {
1961            this.save(None).await.map_err(any_err)
1962        });
1963
1964        methods.add_method("set_force_sync", move |_, this, force: bool| {
1965            this.set_force_sync(force);
1966            Ok(())
1967        });
1968
1969        methods.add_async_method(
1970            "check_fix_conformance",
1971            |lua, this, (check, fix, settings): (String, String, Option<mlua::Value>)| async move {
1972                use std::str::FromStr;
1973                let check = MessageConformance::from_str(&check).map_err(any_err)?;
1974                let fix = MessageConformance::from_str(&fix).map_err(any_err)?;
1975
1976                let settings = match settings {
1977                    Some(v) => Some(lua.from_value(v).map_err(any_err)?),
1978                    None => None,
1979                };
1980
1981                match this
1982                    .check_fix_conformance(check, fix, settings.as_ref())
1983                    .await
1984                {
1985                    Ok(_) => Ok(None),
1986                    Err(err) => Ok(Some(format!("{err:#}"))),
1987                }
1988            },
1989        );
1990    }
1991}
1992
1993impl TimerEntryWithDelay for WeakMessage {
1994    fn delay(&self) -> Duration {
1995        match self.upgrade() {
1996            None => {
1997                // Dangling/Cancelled. Make it appear due immediately
1998                Duration::from_millis(0)
1999            }
2000            Some(msg) => msg.delay(),
2001        }
2002    }
2003}
2004
2005impl TimerEntryWithDelay for Message {
2006    fn delay(&self) -> Duration {
2007        let inner = self.msg_and_id.inner.lock();
2008        match inner.due {
2009            Some(time) => {
2010                let now = Utc::now();
2011                let delta = time - now;
2012                delta.to_std().unwrap_or(Duration::from_millis(0))
2013            }
2014            None => Duration::from_millis(0),
2015        }
2016    }
2017}
2018
2019#[cfg(test)]
2020pub(crate) mod test {
2021    use super::*;
2022    use serde_json::json;
2023
2024    pub fn new_msg_body<S: AsRef<[u8]>>(s: S) -> Message {
2025        Message::new_dirty(
2026            SpoolId::new(),
2027            EnvelopeAddress::parse("sender@example.com").unwrap(),
2028            vec![EnvelopeAddress::parse("recip@example.com").unwrap()],
2029            serde_json::json!({}),
2030            Arc::new(s.as_ref().to_vec().into_boxed_slice()),
2031        )
2032        .unwrap()
2033    }
2034
2035    fn data_as_string(msg: &Message) -> String {
2036        String::from_utf8(msg.get_data_maybe_not_loaded().to_vec()).unwrap()
2037    }
2038
    /// Fixture message: two `X-` headers, a `Subject` header and a `From`
    /// header written with a space before the colon, followed by a
    /// one-word body.
    const X_HDR_CONTENT: &str =
        "X-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nFrom :Someone\r\n\r\nBody";
2041
2042    #[tokio::test]
2043    async fn import_all_x_headers() {
2044        let msg = new_msg_body(X_HDR_CONTENT);
2045
2046        msg.import_x_headers(vec![]).await.unwrap();
2047        k9::assert_equal!(
2048            msg.get_meta_obj().await.unwrap(),
2049            json!({
2050                "x_hello": "there",
2051                "x_header": "value",
2052            })
2053        );
2054    }
2055
2056    #[tokio::test]
2057    async fn meta_and_nil() {
2058        let msg = new_msg_body(X_HDR_CONTENT);
2059        // Ensure that json null round-trips
2060        msg.set_meta("test", serde_json::Value::Null).await.unwrap();
2061        k9::assert_equal!(msg.get_meta("test").await.unwrap(), serde_json::Value::Null);
2062
2063        // and that it is exposed to lua as nil
2064        let lua = mlua::Lua::new();
2065        lua.globals().set("msg", msg).unwrap();
2066        lua.load("assert(msg:get_meta('test') == nil)")
2067            .exec()
2068            .unwrap();
2069    }
2070
2071    #[tokio::test]
2072    async fn import_some_x_headers() {
2073        let msg = new_msg_body(X_HDR_CONTENT);
2074
2075        msg.import_x_headers(vec!["x-hello".to_string()])
2076            .await
2077            .unwrap();
2078        k9::assert_equal!(
2079            msg.get_meta_obj().await.unwrap(),
2080            json!({
2081                "x_hello": "there",
2082            })
2083        );
2084    }
2085
2086    #[tokio::test]
2087    async fn import_headers_wildcard_remove() {
2088        let msg = new_msg_body(X_HDR_CONTENT);
2089
2090        msg.import_headers(vec![ImportHeaderSpec {
2091            name: "X-*".to_string(),
2092            remove: true,
2093            ..ImportHeaderSpec::default()
2094        }])
2095        .await
2096        .unwrap();
2097        k9::assert_equal!(
2098            msg.get_meta_obj().await.unwrap(),
2099            json!({
2100                "x_hello": "there",
2101                "x_header": "value",
2102            })
2103        );
2104        k9::assert_equal!(
2105            data_as_string(&msg),
2106            "Subject: Hello\r\nFrom :Someone\r\n\r\nBody"
2107        );
2108    }
2109
2110    #[tokio::test]
2111    async fn import_headers_match_modes() {
2112        let body =
2113            "Received: from a\r\nReceived: from b\r\nReceived: from c\r\nSubject: hi\r\n\r\nBody";
2114        for (mode, expected) in [
2115            (MatchMode::First, json!("from a")),
2116            (MatchMode::Last, json!("from c")),
2117            (MatchMode::All, json!(["from a", "from b", "from c"])),
2118        ] {
2119            let msg = new_msg_body(body);
2120            msg.import_headers(vec![ImportHeaderSpec {
2121                name: "Received".to_string(),
2122                match_mode: mode,
2123                ..ImportHeaderSpec::default()
2124            }])
2125            .await
2126            .unwrap();
2127            k9::assert_equal!(
2128                msg.get_meta_obj().await.unwrap(),
2129                json!({ "received": expected })
2130            );
2131        }
2132    }
2133
2134    #[tokio::test]
2135    async fn import_headers_no_match_skips_meta() {
2136        let msg = new_msg_body(X_HDR_CONTENT);
2137        msg.import_headers(vec![ImportHeaderSpec {
2138            name: "Nonexistent".to_string(),
2139            match_mode: MatchMode::All,
2140            ..ImportHeaderSpec::default()
2141        }])
2142        .await
2143        .unwrap();
2144        k9::assert_equal!(msg.get_meta_obj().await.unwrap(), json!({}));
2145    }
2146
2147    #[tokio::test]
2148    async fn import_headers_specific_before_wildcard() {
2149        let body = "X-Campaign-Id: 42\r\nX-Mailer: foo\r\n\r\nBody";
2150        let msg = new_msg_body(body);
2151        msg.import_headers(vec![
2152            ImportHeaderSpec {
2153                name: "X-Campaign-Id".to_string(),
2154                target: Some("campaign".to_string()),
2155                ..ImportHeaderSpec::default()
2156            },
2157            ImportHeaderSpec {
2158                name: "X-*".to_string(),
2159                ..ImportHeaderSpec::default()
2160            },
2161        ])
2162        .await
2163        .unwrap();
2164        k9::assert_equal!(
2165            msg.get_meta_obj().await.unwrap(),
2166            json!({
2167                "campaign": "42",
2168                "x_mailer": "foo",
2169            })
2170        );
2171    }
2172
2173    #[test]
2174    fn name_transforms() {
2175        let n = "X-Campaign-Id";
2176        k9::assert_equal!(
2177            apply_name_transform(n, NameTransform::SnakeCase),
2178            "x_campaign_id"
2179        );
2180        k9::assert_equal!(
2181            apply_name_transform(n, NameTransform::KebabCase),
2182            "x-campaign-id"
2183        );
2184        k9::assert_equal!(
2185            apply_name_transform(n, NameTransform::CamelCase),
2186            "xCampaignId"
2187        );
2188        k9::assert_equal!(
2189            apply_name_transform(n, NameTransform::PascalCase),
2190            "XCampaignId"
2191        );
2192    }
2193
2194    #[test]
2195    fn pattern_compile_rejects_bad_patterns() {
2196        compile_header_pattern("").unwrap_err();
2197        compile_header_pattern("*").unwrap_err();
2198        compile_header_pattern("*-Id").unwrap_err();
2199        compile_header_pattern("X-*-Id").unwrap_err();
2200        compile_header_pattern("X-**").unwrap_err();
2201        assert!(matches!(
2202            compile_header_pattern("Subject").unwrap(),
2203            HeaderPattern::Exact(_)
2204        ));
2205        assert!(matches!(
2206            compile_header_pattern("X-*").unwrap(),
2207            HeaderPattern::Prefix(_)
2208        ));
2209    }
2210
2211    #[tokio::test]
2212    async fn import_headers_target_with_wildcard_rejected() {
2213        let msg = new_msg_body(X_HDR_CONTENT);
2214        msg.import_headers(vec![ImportHeaderSpec {
2215            name: "X-*".to_string(),
2216            target: Some("oops".to_string()),
2217            ..ImportHeaderSpec::default()
2218        }])
2219        .await
2220        .unwrap_err();
2221    }
2222
2223    #[tokio::test]
2224    async fn remove_all_x_headers() {
2225        let msg = new_msg_body(X_HDR_CONTENT);
2226
2227        msg.remove_x_headers(vec![]).await.unwrap();
2228        k9::assert_equal!(
2229            data_as_string(&msg),
2230            "Subject: Hello\r\nFrom :Someone\r\n\r\nBody"
2231        );
2232    }
2233
2234    #[tokio::test]
2235    async fn prepend_header_2_params() {
2236        let msg = new_msg_body(X_HDR_CONTENT);
2237
2238        msg.prepend_header(Some("Date"), b"Today").await.unwrap();
2239        k9::assert_equal!(
2240            data_as_string(&msg),
2241            "Date: Today\r\nX-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nFrom :Someone\r\n\r\nBody"
2242        );
2243    }
2244
2245    #[tokio::test]
2246    async fn prepend_header_1_params() {
2247        let msg = new_msg_body(X_HDR_CONTENT);
2248
2249        msg.prepend_header(None, b"Date: Today").await.unwrap();
2250        k9::assert_equal!(
2251            data_as_string(&msg),
2252            "Date: Today\r\nX-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nFrom :Someone\r\n\r\nBody"
2253        );
2254    }
2255
2256    #[tokio::test]
2257    async fn append_header_2_params() {
2258        let msg = new_msg_body(X_HDR_CONTENT);
2259
2260        msg.append_header(Some("Date"), b"Today").await.unwrap();
2261        k9::assert_equal!(
2262            data_as_string(&msg),
2263            "X-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nFrom :Someone\r\nDate: Today\r\n\r\nBody"
2264        );
2265    }
2266
2267    #[tokio::test]
2268    async fn append_header_1_params() {
2269        let msg = new_msg_body(X_HDR_CONTENT);
2270
2271        msg.append_header(None, b"Date: Today").await.unwrap();
2272        k9::assert_equal!(
2273            data_as_string(&msg),
2274            "X-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nFrom :Someone\r\nDate: Today\r\n\r\nBody"
2275        );
2276    }
2277
    /// Fixture message in which `X-Header` appears twice (with a `Subject`
    /// header between the two occurrences), used by the tests that read
    /// or remove repeated headers.
    const MULTI_HEADER_CONTENT: &str =
        "X-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nX-Header: another value\r\nFrom :Someone@somewhere\r\n\r\nBody";
2280
2281    #[tokio::test]
2282    async fn get_first_header() {
2283        let msg = new_msg_body(MULTI_HEADER_CONTENT);
2284        k9::assert_equal!(
2285            msg.get_first_named_header_value("X-header")
2286                .await
2287                .unwrap()
2288                .unwrap(),
2289            "value"
2290        );
2291    }
2292
2293    #[tokio::test]
2294    async fn get_all_header() {
2295        let msg = new_msg_body(MULTI_HEADER_CONTENT);
2296        k9::assert_equal!(
2297            msg.get_all_named_header_values("X-header").await.unwrap(),
2298            vec!["value".to_string(), "another value".to_string()]
2299        );
2300    }
2301
2302    #[tokio::test]
2303    async fn remove_first() {
2304        let msg = new_msg_body(MULTI_HEADER_CONTENT);
2305        msg.remove_first_named_header("X-header").await.unwrap();
2306        k9::assert_equal!(
2307            data_as_string(&msg),
2308            "X-Hello: there\r\nSubject: Hello\r\nX-Header: another value\r\nFrom :Someone@somewhere\r\n\r\nBody"
2309        );
2310    }
2311
2312    #[tokio::test]
2313    async fn remove_all() {
2314        let msg = new_msg_body(MULTI_HEADER_CONTENT);
2315        msg.remove_all_named_headers("X-header").await.unwrap();
2316        k9::assert_equal!(
2317            data_as_string(&msg),
2318            "X-Hello: there\r\nSubject: Hello\r\nFrom :Someone@somewhere\r\n\r\nBody"
2319        );
2320    }
2321
2322    #[tokio::test]
2323    async fn append_text_plain() {
2324        let msg = new_msg_body(MULTI_HEADER_CONTENT);
2325        msg.append_text_plain("I am at the bottom").await.unwrap();
2326        k9::assert_equal!(
2327            data_as_string(&msg),
2328            "X-Hello: there\r\n\
2329             X-Header: value\r\n\
2330             Subject: Hello\r\n\
2331             X-Header: another value\r\n\
2332             From :Someone@somewhere\r\n\
2333             Content-Type: text/plain;\r\n\
2334             \tcharset=\"us-ascii\"\r\n\
2335             \r\n\
2336             Body\r\n\
2337             I am at the bottom\r\n"
2338        );
2339    }
2340
    /// Fixture: a `multipart/mixed` message with boundary `my-boundary`
    /// holding three parts: `text/plain`, `text/html` (no BODY element)
    /// and a base64 `application/octet-stream` attachment named `woot.bin`.
    const MIXED_CONTENT: &str = "Content-Type: multipart/mixed;\r\n\
\tboundary=\"my-boundary\"\r\n\
\r\n\
--my-boundary\r\n\
Content-Type: text/plain;\r\n\
\tcharset=\"us-ascii\"\r\n\
\r\n\
plain text\r\n\
--my-boundary\r\n\
Content-Type: text/html;\r\n\
\tcharset=\"us-ascii\"\r\n\
\r\n\
<b>rich</b> text\r\n\
--my-boundary\r\n\
Content-Type: application/octet-stream\r\n\
Content-Transfer-Encoding: base64\r\n\
Content-Disposition: attachment;\r\n\
\tfilename=\"woot.bin\"\r\n\
Content-ID: <woot.id@somewhere>\r\n\
\r\n\
AAECAw==\r\n\
--my-boundary--\r\n\
\r\n";
2364
    /// Fixture: the same three-part `multipart/mixed` layout as
    /// `MIXED_CONTENT`, except that the `text/html` part is wrapped in
    /// `<BODY>`…`</BODY>` and the attachment's Content-ID is `<woot.id>`.
    const MIXED_CONTENT_ENCLOSING_BODY: &str = "Content-Type: multipart/mixed;\r\n\
\tboundary=\"my-boundary\"\r\n\
\r\n\
--my-boundary\r\n\
Content-Type: text/plain;\r\n\
\tcharset=\"us-ascii\"\r\n\
\r\n\
plain text\r\n\
--my-boundary\r\n\
Content-Type: text/html;\r\n\
\tcharset=\"us-ascii\"\r\n\
\r\n\
<BODY>\r\n\
<b>rich</b> text\r\n\
</BODY>\r\n\
--my-boundary\r\n\
Content-Type: application/octet-stream\r\n\
Content-Transfer-Encoding: base64\r\n\
Content-Disposition: attachment;\r\n\
\tfilename=\"woot.bin\"\r\n\
Content-ID: <woot.id>\r\n\
\r\n\
AAECAw==\r\n\
--my-boundary--\r\n\
\r\n";
2390
    /// Exercises `append_text_html` against two multipart/mixed fixtures
    /// and snapshots the complete message data after each call.
    #[tokio::test]
    async fn append_text_html() {
        // Case 1: the text/html part has no BODY element. Per the snapshot,
        // the new text is added after the existing HTML and the other
        // parts are unchanged.
        let msg = new_msg_body(MIXED_CONTENT);
        msg.append_text_html("bottom html").await.unwrap();
        k9::snapshot!(
            data_as_string(&msg),
            r#"
Content-Type: multipart/mixed;\r
\tboundary="my-boundary"\r
\r
--my-boundary\r
Content-Type: text/plain;\r
\tcharset="us-ascii"\r
\r
plain text\r
--my-boundary\r
Content-Type: text/html;\r
\tcharset="us-ascii"\r
\r
<b>rich</b> text\r
\r
bottom html\r
--my-boundary\r
Content-Type: application/octet-stream\r
Content-Transfer-Encoding: base64\r
Content-Disposition: attachment;\r
\tfilename="woot.bin"\r
Content-ID: <woot.id@somewhere>\r
\r
AAECAw==\r
--my-boundary--\r
\r

"#
        );

        // Case 2: the text/html part is wrapped in <BODY>…</BODY> and the
        // appended text contains a non-ASCII character. Per the snapshot,
        // the text lands just before the closing </BODY> tag and the part
        // is emitted as utf-8 with quoted-printable transfer encoding.
        let msg = new_msg_body(MIXED_CONTENT_ENCLOSING_BODY);
        msg.append_text_html("bottom html 👻").await.unwrap();
        k9::snapshot!(
            data_as_string(&msg),
            r#"
Content-Type: multipart/mixed;\r
\tboundary="my-boundary"\r
\r
--my-boundary\r
Content-Type: text/plain;\r
\tcharset="us-ascii"\r
\r
plain text\r
--my-boundary\r
Content-Type: text/html;\r
\tcharset="utf-8"\r
Content-Transfer-Encoding: quoted-printable\r
\r
<BODY>\r
<b>rich</b> text\r
\r
bottom html =F0=9F=91=BB</BODY>\r
--my-boundary\r
Content-Type: application/octet-stream\r
Content-Transfer-Encoding: base64\r
Content-Disposition: attachment;\r
\tfilename="woot.bin"\r
Content-ID: <woot.id>\r
\r
AAECAw==\r
--my-boundary--\r
\r

"#
        );
    }
2463
    /// Appends plain text containing a non-ASCII character to the
    /// multipart/mixed fixture and snapshots the result.
    ///
    /// Per the snapshot, only the text/plain part changes: it gains the
    /// appended line and is emitted as utf-8 with quoted-printable
    /// transfer encoding, while the html and attachment parts are intact.
    #[tokio::test]
    async fn append_text_plain_mixed() {
        let msg = new_msg_body(MIXED_CONTENT);
        msg.append_text_plain("bottom text 👾").await.unwrap();
        k9::snapshot!(
            data_as_string(&msg),
            r#"
Content-Type: multipart/mixed;\r
\tboundary="my-boundary"\r
\r
--my-boundary\r
Content-Type: text/plain;\r
\tcharset="utf-8"\r
Content-Transfer-Encoding: quoted-printable\r
\r
plain text\r
\r
bottom text =F0=9F=91=BE\r
--my-boundary\r
Content-Type: text/html;\r
\tcharset="us-ascii"\r
\r
<b>rich</b> text\r
--my-boundary\r
Content-Type: application/octet-stream\r
Content-Transfer-Encoding: base64\r
Content-Disposition: attachment;\r
\tfilename="woot.bin"\r
Content-ID: <woot.id@somewhere>\r
\r
AAECAw==\r
--my-boundary--\r
\r

"#
        );
    }
2501
    /// Covers `check_fix_conformance` for a `Message-ID` header whose
    /// value is wrapped in doubled angle brackets.
    ///
    /// Three calls are made, each passing the flags to check first and the
    /// flags to fix second:
    /// 1. checking `MISSING_MESSAGE_ID_HEADER` with nothing to fix fails
    ///    with the error text pinned by the snapshot;
    /// 2. checking and fixing `MISSING_MESSAGE_ID_HEADER` succeeds;
    /// 3. on a message that also has a very long body line, checking
    ///    `MISSING_COLON_VALUE` while fixing both
    ///    `MISSING_MESSAGE_ID_HEADER` and `LINE_TOO_LONG` succeeds.
    #[tokio::test]
    async fn check_conformance_angle_msg_id() {
        const DOUBLE_ANGLE_ONLY: &str = "Subject: hello\r
Message-ID: <<1234@example.com>>\r
\r
Hello";
        let msg = new_msg_body(DOUBLE_ANGLE_ONLY);
        // No fix flags: the problem is only reported.
        k9::snapshot!(
            msg.check_fix_conformance(
                MessageConformance::MISSING_MESSAGE_ID_HEADER,
                MessageConformance::empty(),
                None,
            )
            .await
            .unwrap_err()
            .to_string(),
            "Message has conformance issues: MISSING_MESSAGE_ID_HEADER"
        );

        // Same check, but now the flag is also in the fix set.
        msg.check_fix_conformance(
            MessageConformance::MISSING_MESSAGE_ID_HEADER,
            MessageConformance::MISSING_MESSAGE_ID_HEADER,
            None,
        )
        .await
        .unwrap();

        // Can't use a snapshot test here because the fixed header
        // has a unique random component
        /*
                k9::snapshot!(
                    data_as_string(&msg),
                    r#"
        Subject: hello\r
        Message-ID: <4106566d2ce911ef9dcd0242289ea0df@example.com>\r
        \r
        Hello
        "#
                );
        */

        const DOUBLE_ANGLE_AND_LONG_LINE: &str = "Subject: hello\r
Message-ID: <<1234@example.com>>\r
\r
Hello this is a really long line Hello this is a really long line \
Hello this is a really long line Hello this is a really long line \
Hello this is a really long line Hello this is a really long line \
Hello this is a really long line Hello this is a really long line \
Hello this is a really long line Hello this is a really long line \
Hello this is a really long line Hello this is a really long line \
Hello this is a really long line Hello this is a really long line
";
        let msg = new_msg_body(DOUBLE_ANGLE_AND_LONG_LINE);
        // Both the message id and the over-long body line are in the fix set.
        msg.check_fix_conformance(
            MessageConformance::MISSING_COLON_VALUE,
            MessageConformance::MISSING_MESSAGE_ID_HEADER | MessageConformance::LINE_TOO_LONG,
            None,
        )
        .await
        .unwrap();

        // Can't use a snapshot test here because the fixed header
        // has a random component
        /*
                k9::snapshot!(
                    data_as_string(&msg),
                    r#"
        Content-Type: text/plain;\r
        \tcharset="us-ascii"\r
        Content-Transfer-Encoding: quoted-printable\r
        Subject: hello\r
        Message-ID: <749fc87e2cea11ef96a50242289ea0df@example.com>\r
        \r
        Hello this is a really long line Hello this is a really long line Hello thi=\r
        s is a really long line Hello this is a really long line Hello this is a re=\r
        ally long line Hello this is a really long line Hello this is a really long=\r
         line Hello this is a really long line Hello this is a really long line Hel=\r
        lo this is a really long line Hello this is a really long line Hello this i=\r
        s a really long line Hello this is a really long line Hello this is a reall=\r
        y long line=0A\r

        "#
                );
        */
    }
2587
    /// Snapshots the message data after `check_fix_conformance` runs with
    /// the default check set and two different fix sets.
    #[tokio::test]
    async fn check_conformance() {
        // Fixing only MISSING_MIME_VERSION: per the snapshot, a
        // `Mime-Version: 1.0` header is added and everything else,
        // including the `From :` header with its stray space, is kept.
        let msg = new_msg_body(MULTI_HEADER_CONTENT);
        msg.check_fix_conformance(
            MessageConformance::default(),
            MessageConformance::MISSING_MIME_VERSION,
            None,
        )
        .await
        .unwrap();
        k9::snapshot!(
            data_as_string(&msg),
            r#"
X-Hello: there\r
X-Header: value\r
Subject: Hello\r
X-Header: another value\r
From :Someone@somewhere\r
Mime-Version: 1.0\r
\r
Body
"#
        );

        // Additionally fixing NAME_ENDS_WITH_SPACE: per the snapshot, a
        // `Content-Type` header appears, the From header is rewritten as
        // `From: <Someone@somewhere>` and the body gains a trailing CRLF.
        let msg = new_msg_body(MULTI_HEADER_CONTENT);
        msg.check_fix_conformance(
            MessageConformance::default(),
            MessageConformance::MISSING_MIME_VERSION | MessageConformance::NAME_ENDS_WITH_SPACE,
            None,
        )
        .await
        .unwrap();
        k9::snapshot!(
            data_as_string(&msg),
            r#"
Content-Type: text/plain;\r
\tcharset="us-ascii"\r
X-Hello: there\r
X-Header: value\r
Subject: Hello\r
X-Header: another value\r
From: <Someone@somewhere>\r
Mime-Version: 1.0\r
\r
Body\r

"#
        );
    }
2637
2638    #[tokio::test]
2639    async fn check_fix_latin_input() {
2640        const POUNDS: &[u8] = b"Subject: \xa3\r\n\r\nGBP\r\n";
2641        let msg = new_msg_body(&*POUNDS);
2642        msg.check_fix_conformance(
2643            MessageConformance::default(),
2644            MessageConformance::NEEDS_TRANSFER_ENCODING,
2645            Some(&CheckFixSettings {
2646                detect_encoding: true,
2647                include_encodings: vec!["iso-8859-1".to_string()],
2648                ..Default::default()
2649            }),
2650        )
2651        .await
2652        .unwrap();
2653
2654        let subject = msg
2655            .get_first_named_header_value("subject")
2656            .await
2657            .unwrap()
2658            .unwrap();
2659        assert_eq!(subject, "£");
2660    }
2661
2662    #[tokio::test]
2663    async fn set_scheduling() -> anyhow::Result<()> {
2664        let msg = new_msg_body(MULTI_HEADER_CONTENT);
2665        assert!(msg.get_due().is_none(), "due is implicitly now");
2666
2667        let now = Utc::now();
2668        let one_day = chrono::Duration::try_days(1).expect("1 day to be valid");
2669
2670        msg.set_scheduling(Some(Scheduling {
2671            restriction: None,
2672            first_attempt: Some((now + one_day).into()),
2673            expires: None,
2674        }))
2675        .await?;
2676
2677        let due = msg.get_due().expect("due to now be set");
2678        assert!(due - now >= one_day, "due time is at least 1 day away");
2679
2680        Ok(())
2681    }
2682
2683    #[cfg(all(test, target_pointer_width = "64"))]
2684    #[test]
2685    fn sizes() {
2686        assert_eq!(std::mem::size_of::<Message>(), 8);
2687        assert_eq!(std::mem::size_of::<MessageInner>(), 32);
2688        assert_eq!(std::mem::size_of::<MessageWithId>(), 72);
2689    }
2690}