message/
message.rs

1use crate::address::HeaderAddressList;
2#[cfg(feature = "impl")]
3use crate::dkim::Signer;
4#[cfg(feature = "impl")]
5use crate::dkim::SIGN_POOL;
6pub use crate::queue_name::QueueNameComponents;
7use crate::scheduling::Scheduling;
8use anyhow::Context;
9use bstr::{BString, ByteSlice, ByteVec};
10use chrono::{DateTime, Utc};
11#[cfg(feature = "impl")]
12use config::{any_err, from_lua_value, serialize_options, SerdeWrappedValue};
13use futures::{FutureExt, TryFutureExt};
14use intrusive_collections::{intrusive_adapter, LinkedList, LinkedListAtomicLink};
15use kumo_chrono_helper::*;
16#[cfg(feature = "impl")]
17use kumo_dkim::arc::ARC;
18use kumo_log_types::rfc3464::Report;
19use kumo_log_types::rfc5965::ARFReport;
20use kumo_prometheus::declare_metric;
21#[cfg(feature = "impl")]
22use mailparsing::{AuthenticationResult, AuthenticationResults, EncodeHeaderValue};
23use mailparsing::{
24    CheckFixSettings, DecodedBody, Header, HeaderParseResult, MessageConformance, MimePart,
25};
26#[cfg(feature = "impl")]
27use mlua::{IntoLua, LuaSerdeExt, UserData, UserDataMethods};
28#[cfg(feature = "impl")]
29use mod_dns_resolver::get_resolver_instance;
30use parking_lot::Mutex;
31use rfc5321::parser::EnvelopeAddress;
32use serde::{Deserialize, Serialize};
33use serde_with::formats::PreferOne;
34use serde_with::{serde_as, OneOrMany};
35use spool::{get_data_spool, get_meta_spool, Spool, SpoolId};
36use std::collections::{BTreeMap, HashSet};
37use std::hash::Hash;
38use std::sync::{Arc, LazyLock, Weak};
39use std::time::{Duration, Instant};
40use timeq::TimerEntryWithDelay;
41
42bitflags::bitflags! {
43    #[derive(Clone, Copy, Debug, PartialEq, Eq)]
44    struct MessageFlags: u8 {
45        /// true if Metadata needs to be saved
46        const META_DIRTY = 1;
47        /// true if Data needs to be saved
48        const DATA_DIRTY = 2;
49        /// true if scheduling restrictions are present in the metadata
50        const SCHEDULED = 4;
51        /// true if high durability writes should always be used
52        const FORCE_SYNC = 8;
53    }
54}
55
56declare_metric! {
57/// Total number of Message objects.
58///
59/// This encompasses all Message objects in various states, whether
60/// they are in a queue, moving between queues, being built as part
61/// of an injection, pending logging, message metadata and/or data
62/// may be either resident or offloaded to spool.
63static MESSAGE_COUNT: IntGauge("message_count");
64}
65
66declare_metric! {
67/// Total number of Message objects with metadata loaded.
68///
69/// Tracks how many messages have their `meta` data resident
70/// in memory.  This may be because they have not yet saved
71/// it, or because the message is being processed and the
72/// metadata is required for that processing.
73static META_COUNT: IntGauge("message_meta_resident_count");
74}
75
76declare_metric! {
77/// Total number of Message objects with body data loaded.
78///
79/// Tracks how many messages have their `data` resident
80/// in memory.  This may be because they have not yet saved
81/// it, or because the message is being processed and the
82/// data is either required to be in memory in order to
83/// deliver the message, or because logging or other
84/// post-injection policy is configured to operate on
85/// the message.
86static DATA_COUNT: IntGauge("message_data_resident_count");
87}
88
89static NO_DATA: LazyLock<Arc<Box<[u8]>>> = LazyLock::new(|| Arc::new(vec![].into_boxed_slice()));
90
91declare_metric! {
92/// How many seconds it takes to save a message to spool.
93///
94/// This metric encompasses the elapsed time to saved
95/// either or both the `meta` and `data` portions of a
96/// message to spool.
97///
98/// High values indicate IO pressure which may be
99/// alleviated by tuning other constraints and/or
100/// [RocksDB Parameters](../../kumo/define_spool/rocks_params.md)
101static SAVE_HIST: Histogram("message_save_latency");
102}
103
104declare_metric! {
105/// How many seconds it takes to load message data from spool.
106///
107/// High values indicate IO pressure which may be caused
108/// by policy that operates on the message body post-reception.
109/// We recommend *avoiding* logging header values as that is
110/// the most common cause of this metric spiking and has
111/// the biggest impact in resolving it.
112///
113/// IO pressure may also be alleviated by tuning other constraints and/or
114/// [RocksDB Parameters](../../kumo/define_spool/rocks_params.md)
115static LOAD_DATA_HIST: Histogram("message_data_load_latency");
116}
117
118declare_metric! {
119/// How long it takes to load message metadata from spool
120///
121/// High values indicate IO pressure which may be
122/// alleviated by tuning other constraints and/or
123/// [RocksDB Parameters](../../kumo/define_spool/rocks_params.md)
124static LOAD_META_HIST: Histogram("message_meta_load_latency");
125}
126
127#[derive(Debug)]
128struct MessageInner {
129    metadata: Option<Box<MetaData>>,
130    data: Arc<Box<[u8]>>,
131    flags: MessageFlags,
132    num_attempts: u16,
133    due: Option<DateTime<Utc>>,
134}
135
136#[derive(Debug)]
137pub(crate) struct MessageWithId {
138    id: SpoolId,
139    inner: Mutex<MessageInner>,
140    link: LinkedListAtomicLink,
141}
142
143intrusive_adapter!(
144    pub(crate) MessageWithIdAdapter = Arc<MessageWithId>: MessageWithId { link: LinkedListAtomicLink }
145);
146
147/// A list of messages with an O(1) list overhead; no additional
148/// memory per-message is required to track this list.
149/// However, a given Message can only belong to one instance
150/// of such a list at a time.
151pub struct MessageList {
152    list: LinkedList<MessageWithIdAdapter>,
153    len: usize,
154}
155
156impl Default for MessageList {
157    fn default() -> Self {
158        Self::new()
159    }
160}
161
162impl MessageList {
163    /// Create a new MessageList
164    pub fn new() -> Self {
165        Self {
166            list: LinkedList::new(MessageWithIdAdapter::default()),
167            len: 0,
168        }
169    }
170
171    /// Returns the number of elements contained in the list
172    pub fn len(&self) -> usize {
173        self.len
174    }
175
176    pub fn is_empty(&self) -> bool {
177        self.len == 0
178    }
179
180    /// Take all of the elements from this list and return them
181    /// in a new separate instance of MessageList.
182    pub fn take(&mut self) -> Self {
183        let new_list = Self {
184            list: self.list.take(),
185            len: self.len,
186        };
187        self.len = 0;
188        new_list
189    }
190
191    /// Push a message to the back of the list
192    pub fn push_back(&mut self, message: Message) {
193        self.list.push_back(message.msg_and_id);
194        self.len += 1;
195    }
196
197    /// Pop a message from the front of the list
198    pub fn pop_front(&mut self) -> Option<Message> {
199        self.list.pop_front().map(|msg_and_id| {
200            self.len -= 1;
201            Message { msg_and_id }
202        })
203    }
204
205    /// Pop a message from the back of the list
206    pub fn pop_back(&mut self) -> Option<Message> {
207        self.list.pop_back().map(|msg_and_id| {
208            self.len -= 1;
209            Message { msg_and_id }
210        })
211    }
212
213    /// Pop all of the messages from this list into a vector
214    /// of messages.
215    /// Memory usage is O(number-of-messages).
216    pub fn drain(&mut self) -> Vec<Message> {
217        let mut messages = Vec::with_capacity(self.len);
218        while let Some(msg) = self.pop_front() {
219            messages.push(msg);
220        }
221        messages
222    }
223
224    pub fn extend_from_iter<I>(&mut self, iter: I)
225    where
226        I: Iterator<Item = Message>,
227    {
228        for msg in iter {
229            self.push_back(msg)
230        }
231    }
232}
233
234impl IntoIterator for MessageList {
235    type Item = Message;
236    type IntoIter = MessageListIter;
237    fn into_iter(self) -> MessageListIter {
238        MessageListIter { list: self.list }
239    }
240}
241
242pub struct MessageListIter {
243    list: LinkedList<MessageWithIdAdapter>,
244}
245
246impl Iterator for MessageListIter {
247    type Item = Message;
248    fn next(&mut self) -> Option<Message> {
249        self.list
250            .pop_front()
251            .map(|msg_and_id| Message { msg_and_id })
252    }
253}
254
255#[derive(Clone, Debug)]
256#[cfg_attr(feature = "impl", derive(mlua::FromLua))]
257pub struct Message {
258    pub(crate) msg_and_id: Arc<MessageWithId>,
259}
260
261impl PartialEq for Message {
262    fn eq(&self, other: &Self) -> bool {
263        self.id() == other.id()
264    }
265}
266impl Eq for Message {}
267
268impl Hash for Message {
269    fn hash<H>(&self, hasher: &mut H)
270    where
271        H: std::hash::Hasher,
272    {
273        self.id().hash(hasher)
274    }
275}
276
277#[derive(Clone, Debug)]
278pub struct WeakMessage {
279    weak: Weak<MessageWithId>,
280}
281
282impl WeakMessage {
283    pub fn upgrade(&self) -> Option<Message> {
284        Some(Message {
285            msg_and_id: self.weak.upgrade()?,
286        })
287    }
288}
289
290#[serde_as]
291#[derive(Debug, Serialize, Deserialize, Clone)]
292pub(crate) struct MetaData {
293    pub sender: EnvelopeAddress,
294    #[serde_as(as = "OneOrMany<_, PreferOne>")]
295    pub recipient: Vec<EnvelopeAddress>,
296    pub meta: serde_json::Value,
297    #[serde(default)]
298    pub schedule: Option<Scheduling>,
299}
300
301impl Drop for MessageInner {
302    fn drop(&mut self) {
303        if self.metadata.is_some() {
304            META_COUNT.dec();
305        }
306        if !self.data.is_empty() {
307            DATA_COUNT.dec();
308        }
309        MESSAGE_COUNT.dec();
310    }
311}
312
313impl Message {
314    /// Create a new message with the supplied data.
315    /// The message meta and data are marked as dirty
316    pub fn new_dirty(
317        id: SpoolId,
318        sender: EnvelopeAddress,
319        recipient: Vec<EnvelopeAddress>,
320        meta: serde_json::Value,
321        data: Arc<Box<[u8]>>,
322    ) -> anyhow::Result<Self> {
323        anyhow::ensure!(meta.is_object(), "metadata must be a json object");
324        MESSAGE_COUNT.inc();
325        DATA_COUNT.inc();
326        META_COUNT.inc();
327        Ok(Self {
328            msg_and_id: Arc::new(MessageWithId {
329                id,
330                inner: Mutex::new(MessageInner {
331                    metadata: Some(Box::new(MetaData {
332                        sender,
333                        recipient,
334                        meta,
335                        schedule: None,
336                    })),
337                    data,
338                    flags: MessageFlags::META_DIRTY | MessageFlags::DATA_DIRTY,
339                    num_attempts: 0,
340                    due: None,
341                }),
342                link: LinkedListAtomicLink::default(),
343            }),
344        })
345    }
346
347    pub fn weak(&self) -> WeakMessage {
348        WeakMessage {
349            weak: Arc::downgrade(&self.msg_and_id),
350        }
351    }
352
353    /// Helper for creating a message based on spool enumeration.
354    /// Given a spool id and the serialized metadata blob, returns
355    /// a message holding the deserialized version of that metadata.
356    pub fn new_from_spool(id: SpoolId, metadata: Vec<u8>) -> anyhow::Result<Self> {
357        let metadata: MetaData = serde_json::from_slice(&metadata)?;
358        MESSAGE_COUNT.inc();
359        META_COUNT.inc();
360
361        let flags = if metadata.schedule.is_some() {
362            MessageFlags::SCHEDULED
363        } else {
364            MessageFlags::empty()
365        };
366
367        Ok(Self {
368            msg_and_id: Arc::new(MessageWithId {
369                id,
370                inner: Mutex::new(MessageInner {
371                    metadata: Some(Box::new(metadata)),
372                    data: NO_DATA.clone(),
373                    flags,
374                    num_attempts: 0,
375                    due: None,
376                }),
377                link: LinkedListAtomicLink::default(),
378            }),
379        })
380    }
381
382    pub(crate) fn new_from_parts(id: SpoolId, metadata: MetaData, data: Arc<Box<[u8]>>) -> Self {
383        MESSAGE_COUNT.inc();
384        META_COUNT.inc();
385
386        let flags = if metadata.schedule.is_some() {
387            MessageFlags::SCHEDULED
388        } else {
389            MessageFlags::empty()
390        };
391
392        Self {
393            msg_and_id: Arc::new(MessageWithId {
394                id,
395                inner: Mutex::new(MessageInner {
396                    metadata: Some(Box::new(metadata)),
397                    data,
398                    flags: flags | MessageFlags::META_DIRTY | MessageFlags::DATA_DIRTY,
399                    num_attempts: 0,
400                    due: None,
401                }),
402                link: LinkedListAtomicLink::default(),
403            }),
404        }
405    }
406
407    pub async fn new_with_id(id: SpoolId) -> anyhow::Result<Self> {
408        let meta_spool = get_meta_spool();
409        let data = meta_spool.load(id).await?;
410        Self::new_from_spool(id, data)
411    }
412
413    pub fn get_num_attempts(&self) -> u16 {
414        let inner = self.msg_and_id.inner.lock();
415        inner.num_attempts
416    }
417
418    pub fn set_num_attempts(&self, num_attempts: u16) {
419        let mut inner = self.msg_and_id.inner.lock();
420        inner.num_attempts = num_attempts;
421    }
422
423    pub fn increment_num_attempts(&self) {
424        let mut inner = self.msg_and_id.inner.lock();
425        inner.num_attempts += 1;
426    }
427
428    pub async fn set_scheduling(
429        &self,
430        scheduling: Option<Scheduling>,
431    ) -> anyhow::Result<Option<Scheduling>> {
432        self.load_meta_if_needed().await?;
433        let mut inner = self.msg_and_id.inner.lock();
434        match &mut inner.metadata {
435            None => anyhow::bail!("set_scheduling: metadata must be loaded first"),
436            Some(meta) => {
437                meta.schedule = scheduling;
438                inner
439                    .flags
440                    .set(MessageFlags::SCHEDULED, scheduling.is_some());
441                if let Some(sched) = scheduling {
442                    let due = inner.due.unwrap_or_else(Utc::now);
443                    inner.due = Some(sched.adjust_for_schedule(due));
444                }
445                Ok(scheduling)
446            }
447        }
448    }
449
450    pub async fn get_scheduling(&self) -> anyhow::Result<Option<Scheduling>> {
451        self.load_meta_if_needed().await?;
452        let inner = self.msg_and_id.inner.lock();
453        Ok(inner.metadata.as_ref().and_then(|meta| meta.schedule))
454    }
455
456    pub fn get_due(&self) -> Option<DateTime<Utc>> {
457        let inner = self.msg_and_id.inner.lock();
458        inner.due
459    }
460
461    pub async fn delay_with_jitter(&self, limit: i64) -> anyhow::Result<Option<DateTime<Utc>>> {
462        let scale = rand::random::<f32>();
463        let value = (scale * limit as f32) as i64;
464        self.delay_by(seconds(value)?).await
465    }
466
467    pub async fn delay_by(
468        &self,
469        duration: chrono::Duration,
470    ) -> anyhow::Result<Option<DateTime<Utc>>> {
471        let due = Utc::now() + duration;
472        self.set_due(Some(due)).await
473    }
474
475    /// Delay by requested duration, and add up to 1 minute of jitter
476    pub async fn delay_by_and_jitter(
477        &self,
478        duration: chrono::Duration,
479    ) -> anyhow::Result<Option<DateTime<Utc>>> {
480        let scale = rand::random::<f32>();
481        let value = (scale * 60.) as i64;
482        let due = Utc::now() + duration + seconds(value)?;
483        self.set_due(Some(due)).await
484    }
485
486    pub async fn set_due(
487        &self,
488        due: Option<DateTime<Utc>>,
489    ) -> anyhow::Result<Option<DateTime<Utc>>> {
490        let due = {
491            let mut inner = self.msg_and_id.inner.lock();
492
493            if !inner.flags.contains(MessageFlags::SCHEDULED) {
494                // This is the simple, fast-path, common case
495                inner.due = due;
496                return Ok(inner.due);
497            }
498
499            let due = due.unwrap_or_else(Utc::now);
500
501            if let Some(meta) = &inner.metadata {
502                inner.due = match &meta.schedule {
503                    Some(sched) => Some(sched.adjust_for_schedule(due)),
504                    None => Some(due),
505                };
506                return Ok(inner.due);
507            }
508
509            // We'll need to load the metadata to correctly
510            // update the schedule for this message
511            due
512        };
513
514        self.load_meta().await?;
515
516        {
517            let mut inner = self.msg_and_id.inner.lock();
518            match &inner.metadata {
519                Some(meta) => {
520                    inner.due = match &meta.schedule {
521                        Some(sched) => Some(sched.adjust_for_schedule(due)),
522                        None => Some(due),
523                    };
524                    Ok(inner.due)
525                }
526                None => anyhow::bail!("loaded metadata, but metadata is not set!?"),
527            }
528        }
529    }
530
531    fn get_data_if_dirty(&self) -> Option<Arc<Box<[u8]>>> {
532        let inner = self.msg_and_id.inner.lock();
533        if inner.flags.contains(MessageFlags::DATA_DIRTY) {
534            Some(Arc::clone(&inner.data))
535        } else {
536            None
537        }
538    }
539
540    fn get_meta_if_dirty(&self) -> Option<MetaData> {
541        let inner = self.msg_and_id.inner.lock();
542        if inner.flags.contains(MessageFlags::META_DIRTY) {
543            inner.metadata.as_ref().map(|md| (**md).clone())
544        } else {
545            None
546        }
547    }
548
549    pub(crate) async fn clone_meta_data(&self) -> anyhow::Result<MetaData> {
550        self.load_meta_if_needed().await?;
551        let inner = self.msg_and_id.inner.lock();
552        inner
553            .metadata
554            .as_ref()
555            .ok_or_else(|| anyhow::anyhow!("metadata not loaded even though we just loaded it"))
556            .map(|md| (**md).clone())
557    }
558
559    pub fn set_force_sync(&self, force: bool) {
560        let mut inner = self.msg_and_id.inner.lock();
561        inner.flags.set(MessageFlags::FORCE_SYNC, force);
562    }
563
564    pub fn needs_save(&self) -> bool {
565        let inner = self.msg_and_id.inner.lock();
566        inner.flags.contains(MessageFlags::META_DIRTY)
567            || inner.flags.contains(MessageFlags::DATA_DIRTY)
568    }
569
570    pub async fn save(&self, deadline: Option<Instant>) -> anyhow::Result<()> {
571        let _timer = SAVE_HIST.start_timer();
572        self.save_to(&**get_meta_spool(), &**get_data_spool(), deadline)
573            .await
574    }
575
576    pub async fn save_to(
577        &self,
578        meta_spool: &(dyn Spool + Send + Sync),
579        data_spool: &(dyn Spool + Send + Sync),
580        deadline: Option<Instant>,
581    ) -> anyhow::Result<()> {
582        let force_sync = self
583            .msg_and_id
584            .inner
585            .lock()
586            .flags
587            .contains(MessageFlags::FORCE_SYNC);
588
589        let data_fut: futures::future::BoxFuture<'_, anyhow::Result<bool>> =
590            if let Some(data) = self.get_data_if_dirty() {
591                anyhow::ensure!(!data.is_empty(), "message data must not be empty");
592                data_spool
593                    .store(self.msg_and_id.id, data, force_sync, deadline)
594                    .map_ok(|_| true)
595                    .boxed()
596            } else {
597                futures::future::ready(Ok(false)).boxed()
598            };
599        let meta_fut: futures::future::BoxFuture<'_, anyhow::Result<bool>> =
600            if let Some(meta) = self.get_meta_if_dirty() {
601                let meta = Arc::new(serde_json::to_vec(&meta)?.into_boxed_slice());
602                meta_spool
603                    .store(self.msg_and_id.id, meta, force_sync, deadline)
604                    .map_ok(|_| true)
605                    .boxed()
606            } else {
607                futures::future::ready(Ok(false)).boxed()
608            };
609
610        // NOTE: if we have a deadline, it is tempting to want to use
611        // timeout_at here to enforce it, but the underlying spool
612        // futures are not guaranteed to be fully cancel safe, which
613        // is why we pass the deadline down to the save method to allow
614        // them to handle timeouts internally.
615        let (data_res, meta_res) = tokio::join!(data_fut, meta_fut);
616
617        // Clear the dirty flag for each spool that successfully
618        // stored (Ok(true)).  Even when the other spool failed, we
619        // must still record the partial success: the caller may
620        // unwind by removing from both spools, and the remove on a
621        // spool that did not receive the data is harmless.
622        if matches!(data_res, Ok(true)) {
623            self.msg_and_id
624                .inner
625                .lock()
626                .flags
627                .remove(MessageFlags::DATA_DIRTY);
628        }
629        if matches!(meta_res, Ok(true)) {
630            self.msg_and_id
631                .inner
632                .lock()
633                .flags
634                .remove(MessageFlags::META_DIRTY);
635        }
636
637        // Propagate spool errors so callers (notably the SMTP
638        // ingress path and load-shedding-aware code) can react
639        // rather than silently accepting a message whose persistent
640        // storage failed.
641        //
642        // For the both-failed case we preserve the typed
643        // SpoolUnhealthyError on the returned error chain when
644        // either side has one, so callers that match on
645        // root_cause (e.g. the SMTP server's user-facing 421
646        // shaping) still see the actionable reason.  The
647        // non-preferred error is folded into the context message so
648        // it is still visible to anything that formats the error
649        // with `:#`.
650        match (data_res, meta_res) {
651            (Ok(_), Ok(_)) => Ok(()),
652            (Err(data_err), Ok(_)) => Err(data_err.context("data spool store")),
653            (Ok(_), Err(meta_err)) => Err(meta_err.context("meta spool store")),
654            (Err(data_err), Err(meta_err)) => {
655                let data_unhealthy = data_err.root_cause().is::<::spool::SpoolUnhealthyError>();
656                let meta_unhealthy = meta_err.root_cause().is::<::spool::SpoolUnhealthyError>();
657                match (data_unhealthy, meta_unhealthy) {
658                    (true, _) => Err(data_err.context(format!(
659                        "data spool store; meta spool store also failed: {meta_err:#}"
660                    ))),
661                    (false, true) => Err(meta_err.context(format!(
662                        "meta spool store; data spool store also failed: {data_err:#}"
663                    ))),
664                    (false, false) => Err(anyhow::anyhow!(
665                        "data spool store: {data_err:#}; meta spool store: {meta_err:#}"
666                    )),
667                }
668            }
669        }
670    }
671
672    pub fn id(&self) -> &SpoolId {
673        &self.msg_and_id.id
674    }
675
676    /// Save the data+meta if needed, then release both
677    pub async fn save_and_shrink(&self) -> anyhow::Result<bool> {
678        self.save(None).await?;
679        self.shrink()
680    }
681
682    /// Save the data+meta if needed, then release just the data
683    pub async fn save_and_shrink_data(&self) -> anyhow::Result<bool> {
684        self.save(None).await?;
685        self.shrink_data()
686    }
687
688    pub fn shrink_data(&self) -> anyhow::Result<bool> {
689        let mut inner = self.msg_and_id.inner.lock();
690        let mut did_shrink = false;
691        if inner.flags.contains(MessageFlags::DATA_DIRTY) {
692            anyhow::bail!("Cannot shrink message: DATA_DIRTY");
693        }
694        if !inner.data.is_empty() {
695            DATA_COUNT.dec();
696            did_shrink = true;
697        }
698        if !inner.data.is_empty() {
699            inner.data = NO_DATA.clone();
700            did_shrink = true;
701        }
702        Ok(did_shrink)
703    }
704
705    pub fn shrink(&self) -> anyhow::Result<bool> {
706        let mut inner = self.msg_and_id.inner.lock();
707        let mut did_shrink = false;
708        if inner.flags.contains(MessageFlags::DATA_DIRTY) {
709            anyhow::bail!("Cannot shrink message: DATA_DIRTY");
710        }
711        if inner.flags.contains(MessageFlags::META_DIRTY) {
712            anyhow::bail!("Cannot shrink message: META_DIRTY");
713        }
714        if inner.metadata.take().is_some() {
715            META_COUNT.dec();
716            did_shrink = true;
717        }
718        if !inner.data.is_empty() {
719            DATA_COUNT.dec();
720            did_shrink = true;
721        }
722        if !inner.data.is_empty() {
723            inner.data = NO_DATA.clone();
724            did_shrink = true;
725        }
726        Ok(did_shrink)
727    }
728
729    pub async fn sender(&self) -> anyhow::Result<EnvelopeAddress> {
730        self.load_meta_if_needed().await?;
731        let inner = self.msg_and_id.inner.lock();
732        match &inner.metadata {
733            Some(meta) => Ok(meta.sender.clone()),
734            None => anyhow::bail!("Message::sender metadata is not loaded"),
735        }
736    }
737
738    pub async fn set_sender(&self, sender: EnvelopeAddress) -> anyhow::Result<()> {
739        self.load_meta_if_needed().await?;
740        let mut inner = self.msg_and_id.inner.lock();
741        match &mut inner.metadata {
742            Some(meta) => {
743                meta.sender = sender;
744                inner.flags.set(MessageFlags::DATA_DIRTY, true);
745                Ok(())
746            }
747            None => anyhow::bail!("Message::set_sender: metadata is not loaded"),
748        }
749    }
750
751    #[deprecated = "use recipient_list or first_recipient instead"]
752    pub async fn recipient(&self) -> anyhow::Result<EnvelopeAddress> {
753        self.first_recipient().await
754    }
755
756    pub async fn first_recipient(&self) -> anyhow::Result<EnvelopeAddress> {
757        self.load_meta_if_needed().await?;
758        let inner = self.msg_and_id.inner.lock();
759        match &inner.metadata {
760            Some(meta) => match meta.recipient.first() {
761                Some(recip) => Ok(recip.clone()),
762                None => anyhow::bail!("recipient list is empty!?"),
763            },
764            None => anyhow::bail!("Message::first_recipient: metadata is not loaded"),
765        }
766    }
767
768    pub async fn recipient_list(&self) -> anyhow::Result<Vec<EnvelopeAddress>> {
769        self.load_meta_if_needed().await?;
770        let inner = self.msg_and_id.inner.lock();
771        match &inner.metadata {
772            Some(meta) => Ok(meta.recipient.clone()),
773            None => anyhow::bail!("Message::recipient_list: metadata is not loaded"),
774        }
775    }
776
777    pub async fn recipient_list_string(&self) -> anyhow::Result<Vec<String>> {
778        self.load_meta_if_needed().await?;
779        let inner = self.msg_and_id.inner.lock();
780        match &inner.metadata {
781            Some(meta) => Ok(meta.recipient.iter().map(|a| a.to_string()).collect()),
782            None => anyhow::bail!("Message::recipient_list_string: metadata is not loaded"),
783        }
784    }
785
786    #[deprecated = "use set_recipient_list instead"]
787    pub async fn set_recipient(&self, recipient: EnvelopeAddress) -> anyhow::Result<()> {
788        self.set_recipient_list(vec![recipient]).await
789    }
790
791    pub async fn set_recipient_list(&self, recipient: Vec<EnvelopeAddress>) -> anyhow::Result<()> {
792        self.load_meta_if_needed().await?;
793
794        let mut inner = self.msg_and_id.inner.lock();
795        match &mut inner.metadata {
796            Some(meta) => {
797                meta.recipient = recipient;
798                inner.flags.set(MessageFlags::DATA_DIRTY, true);
799                Ok(())
800            }
801            None => anyhow::bail!("Message::set_recipient_list: metadata is not loaded"),
802        }
803    }
804
805    pub fn is_meta_loaded(&self) -> bool {
806        self.msg_and_id.inner.lock().metadata.is_some()
807    }
808
809    pub fn is_data_loaded(&self) -> bool {
810        !self.msg_and_id.inner.lock().data.is_empty()
811    }
812
813    pub async fn load_meta_if_needed(&self) -> anyhow::Result<()> {
814        if self.is_meta_loaded() {
815            return Ok(());
816        }
817        self.load_meta().await
818    }
819
820    pub async fn data(&self) -> anyhow::Result<Arc<Box<[u8]>>> {
821        self.load_data_if_needed().await
822    }
823
824    async fn load_data_if_needed(&self) -> anyhow::Result<Arc<Box<[u8]>>> {
825        if self.is_data_loaded() {
826            return Ok(self.get_data_maybe_not_loaded());
827        }
828        self.load_data().await
829    }
830
831    pub async fn load_meta(&self) -> anyhow::Result<()> {
832        let _timer = LOAD_META_HIST.start_timer();
833        self.load_meta_from(&**get_meta_spool()).await
834    }
835
836    async fn load_meta_from(&self, meta_spool: &(dyn Spool + Send + Sync)) -> anyhow::Result<()> {
837        let id = self.id();
838        let data = meta_spool.load(*id).await?;
839        let mut inner = self.msg_and_id.inner.lock();
840        let was_not_loaded = inner.metadata.is_none();
841        let metadata: MetaData = serde_json::from_slice(&data)?;
842        inner.metadata.replace(Box::new(metadata));
843        if was_not_loaded {
844            META_COUNT.inc();
845        }
846        Ok(())
847    }
848
849    pub async fn load_data(&self) -> anyhow::Result<Arc<Box<[u8]>>> {
850        let _timer = LOAD_DATA_HIST.start_timer();
851        self.load_data_from(&**get_data_spool()).await
852    }
853
854    async fn load_data_from(
855        &self,
856        data_spool: &(dyn Spool + Send + Sync),
857    ) -> anyhow::Result<Arc<Box<[u8]>>> {
858        let data = data_spool.load(*self.id()).await?;
859        let mut inner = self.msg_and_id.inner.lock();
860        let was_empty = inner.data.is_empty();
861        inner.data = Arc::new(data.into_boxed_slice());
862        if was_empty {
863            DATA_COUNT.inc();
864        }
865        Ok(inner.data.clone())
866    }
867
868    pub fn assign_data(&self, data: Vec<u8>) {
869        let mut inner = self.msg_and_id.inner.lock();
870        let was_empty = inner.data.is_empty();
871        inner.data = Arc::new(data.into_boxed_slice());
872        inner.flags.set(MessageFlags::DATA_DIRTY, true);
873        if was_empty {
874            DATA_COUNT.inc();
875        }
876    }
877
878    pub fn get_data_maybe_not_loaded(&self) -> Arc<Box<[u8]>> {
879        let inner = self.msg_and_id.inner.lock();
880        inner.data.clone()
881    }
882
883    pub async fn set_meta<S: AsRef<str>, V: Into<serde_json::Value>>(
884        &self,
885        key: S,
886        value: V,
887    ) -> anyhow::Result<()> {
888        self.load_meta_if_needed().await?;
889        let mut inner = self.msg_and_id.inner.lock();
890        match &mut inner.metadata {
891            None => anyhow::bail!("set_meta: metadata must be loaded first"),
892            Some(meta) => {
893                let key = key.as_ref();
894                let value = value.into();
895
896                match &mut meta.meta {
897                    serde_json::Value::Object(map) => {
898                        map.insert(key.to_string(), value);
899                    }
900                    _ => anyhow::bail!("metadata is somehow not a json object"),
901                }
902
903                inner.flags.set(MessageFlags::META_DIRTY, true);
904                Ok(())
905            }
906        }
907    }
908
909    pub async fn unset_meta<S: AsRef<str>>(&self, key: S) -> anyhow::Result<()> {
910        self.load_meta_if_needed().await?;
911        let mut inner = self.msg_and_id.inner.lock();
912        match &mut inner.metadata {
913            None => anyhow::bail!("set_meta: metadata must be loaded first"),
914            Some(meta) => {
915                let key = key.as_ref();
916
917                match &mut meta.meta {
918                    serde_json::Value::Object(map) => {
919                        map.remove(key);
920                    }
921                    _ => anyhow::bail!("metadata is somehow not a json object"),
922                }
923
924                inner.flags.set(MessageFlags::META_DIRTY, true);
925                Ok(())
926            }
927        }
928    }
929
930    /// Retrieve `key` as a String.
931    pub async fn get_meta_string<S: serde_json::value::Index + std::fmt::Display + Copy>(
932        &self,
933        key: S,
934    ) -> anyhow::Result<Option<String>> {
935        match self.get_meta(key).await {
936            Ok(serde_json::Value::String(value)) => Ok(Some(value.to_string())),
937            Ok(serde_json::Value::Null) => Ok(None),
938            hmm => {
939                anyhow::bail!("expected '{key}' to be a string value, got {hmm:?}");
940            }
941        }
942    }
943
944    pub async fn get_meta_obj(&self) -> anyhow::Result<serde_json::Value> {
945        self.load_meta_if_needed().await?;
946        let inner = self.msg_and_id.inner.lock();
947        match &inner.metadata {
948            None => anyhow::bail!("get_meta_obj: metadata must be loaded first"),
949            Some(meta) => Ok(meta.meta.clone()),
950        }
951    }
952
953    pub async fn get_meta<S: serde_json::value::Index>(
954        &self,
955        key: S,
956    ) -> anyhow::Result<serde_json::Value> {
957        self.load_meta_if_needed().await?;
958        let inner = self.msg_and_id.inner.lock();
959        match &inner.metadata {
960            None => anyhow::bail!("get_meta: metadata must be loaded first"),
961            Some(meta) => match meta.meta.get(key) {
962                Some(value) => Ok(value.clone()),
963                None => Ok(serde_json::Value::Null),
964            },
965        }
966    }
967
968    pub fn age(&self, now: DateTime<Utc>) -> chrono::Duration {
969        self.msg_and_id.id.age(now)
970    }
971
972    pub async fn get_queue_name(&self) -> anyhow::Result<String> {
973        Ok(match self.get_meta_string("queue").await? {
974            Some(name) => name,
975            None => {
976                let name = QueueNameComponents::format(
977                    self.get_meta_string("campaign").await?,
978                    self.get_meta_string("tenant").await?,
979                    self.first_recipient()
980                        .await?
981                        .domain()
982                        .to_string()
983                        .to_lowercase(),
984                    self.get_meta_string("routing_domain").await?,
985                );
986                name.to_string()
987            }
988        })
989    }
990
991    #[cfg(feature = "impl")]
992    pub async fn arc_verify(
993        &self,
994        opt_resolver_name: Option<String>,
995    ) -> anyhow::Result<AuthenticationResult> {
996        let resolver = get_resolver_instance(&opt_resolver_name)?;
997        let data = self.data().await?;
998        let bytes = mailparsing::SharedString::try_from(data.as_ref().as_ref())?;
999
1000        let parsed = mailparsing::Header::parse_headers(bytes.clone())?;
1001        let message = kumo_dkim::ParsedEmail::HeaderOnlyParse { bytes, parsed };
1002
1003        let arc = ARC::verify(&message, &**resolver).await;
1004        Ok(arc.authentication_result())
1005    }
1006
1007    #[cfg(feature = "impl")]
1008    pub async fn arc_seal(
1009        &self,
1010        signer: Signer,
1011        auth_results: AuthenticationResults,
1012        opt_resolver_name: Option<String>,
1013    ) -> anyhow::Result<()> {
1014        let resolver = get_resolver_instance(&opt_resolver_name)?;
1015        let data = self.data().await?;
1016        let bytes = mailparsing::SharedString::try_from(data.as_ref().as_ref())?;
1017        let parsed = mailparsing::Header::parse_headers(bytes.clone())?;
1018        let message = kumo_dkim::ParsedEmail::HeaderOnlyParse { bytes, parsed };
1019        let arc = ARC::verify(&message, &**resolver).await;
1020
1021        let headers = arc.seal(&message, auth_results, signer.signer())?;
1022        if !headers.is_empty() {
1023            let mut new_data = Vec::<u8>::with_capacity(data.len() + 1024);
1024
1025            for hdr in headers {
1026                hdr.write_header(&mut new_data).ok();
1027            }
1028            new_data.extend_from_slice(&data);
1029            self.assign_data(new_data);
1030        }
1031
1032        Ok(())
1033    }
1034
1035    #[cfg(feature = "impl")]
1036    pub async fn dkim_verify(
1037        &self,
1038        opt_resolver_name: Option<String>,
1039    ) -> anyhow::Result<Vec<AuthenticationResult>> {
1040        let resolver = get_resolver_instance(&opt_resolver_name)?;
1041        let data = self.data().await?;
1042        let bytes = mailparsing::SharedString::try_from(data.as_ref().as_ref())?;
1043
1044        let parsed = mailparsing::Header::parse_headers(bytes.clone())?;
1045        if parsed
1046            .overall_conformance
1047            .contains(MessageConformance::NON_CANONICAL_LINE_ENDINGS)
1048        {
1049            return Ok(vec![AuthenticationResult {
1050                method: "dkim".into(),
1051                method_version: None,
1052                result: "permerror".into(),
1053                reason: Some("message has non-canonical line endings".into()),
1054                props: Default::default(),
1055            }]);
1056        }
1057        let message = kumo_dkim::ParsedEmail::HeaderOnlyParse { bytes, parsed };
1058
1059        let results = kumo_dkim::verify_email_with_resolver(&message, &**resolver).await?;
1060        Ok(results)
1061    }
1062
1063    /// Parses the content into an owned MimePart.
1064    /// Changes to that MimePart are NOT reflected in the underlying
1065    /// message; you must re-assign the message data if you wish to modify
1066    /// the message content.
1067    pub async fn parse(&self) -> anyhow::Result<MimePart<'static>> {
1068        let data = self.data().await?;
1069        let owned_data = String::from_utf8_lossy(data.as_ref().as_ref()).to_string();
1070        Ok(MimePart::parse(owned_data)?)
1071    }
1072
1073    pub async fn parse_rfc3464(&self) -> anyhow::Result<Option<Report>> {
1074        let data = self.data().await?;
1075        Report::parse(&data)
1076    }
1077
1078    pub async fn parse_rfc5965(&self) -> anyhow::Result<Option<ARFReport>> {
1079        let data = self.data().await?;
1080        ARFReport::parse(&data)
1081    }
1082
1083    pub async fn prepend_header(&self, name: Option<&str>, value: &[u8]) -> anyhow::Result<()> {
1084        let data = self.data().await?;
1085        let mut new_data = Vec::with_capacity(size_header(name, value) + 2 + data.len());
1086        emit_header(&mut new_data, name, value);
1087        new_data.extend_from_slice(&data);
1088        self.assign_data(new_data);
1089        Ok(())
1090    }
1091
1092    pub async fn append_header(&self, name: Option<&str>, value: &[u8]) -> anyhow::Result<()> {
1093        let data = self.data().await?;
1094        let mut new_data = Vec::with_capacity(size_header(name, value.as_bytes()) + 2 + data.len());
1095        for (idx, window) in data.windows(4).enumerate() {
1096            if window == b"\r\n\r\n" {
1097                let headers = &data[0..idx + 2];
1098                let body = &data[idx + 2..];
1099
1100                new_data.extend_from_slice(headers);
1101                emit_header(&mut new_data, name, value);
1102                new_data.extend_from_slice(body);
1103                self.assign_data(new_data);
1104                return Ok(());
1105            }
1106        }
1107
1108        anyhow::bail!("append_header could not find the end of the header block");
1109    }
1110
1111    pub async fn get_address_header(
1112        &self,
1113        header_name: &str,
1114    ) -> anyhow::Result<Option<HeaderAddressList>> {
1115        let data = self.data().await?;
1116        let HeaderParseResult { headers, .. } =
1117            mailparsing::Header::parse_headers(data.as_ref().as_ref())?;
1118
1119        match headers.get_first(header_name) {
1120            Some(hdr) => {
1121                let list = hdr.as_address_list()?;
1122                let result: HeaderAddressList = list.into();
1123                Ok(Some(result))
1124            }
1125            None => Ok(None),
1126        }
1127    }
1128
1129    pub async fn get_first_named_header_value(
1130        &self,
1131        name: &str,
1132    ) -> anyhow::Result<Option<BString>> {
1133        let data = self.data().await?;
1134        let HeaderParseResult { headers, .. } = Header::parse_headers(data.as_ref().as_ref())?;
1135
1136        match headers.get_first(name) {
1137            Some(hdr) => Ok(Some(
1138                hdr.as_unstructured()
1139                    .unwrap_or_else(|_| hdr.get_raw_value().into()),
1140            )),
1141            None => Ok(None),
1142        }
1143    }
1144
1145    pub async fn get_all_named_header_values(&self, name: &str) -> anyhow::Result<Vec<BString>> {
1146        let data = self.data().await?;
1147        let HeaderParseResult { headers, .. } = Header::parse_headers(data.as_ref().as_ref())?;
1148
1149        let mut values = vec![];
1150        for hdr in headers.iter_named(name) {
1151            values.push(
1152                hdr.as_unstructured()
1153                    .unwrap_or_else(|_| hdr.get_raw_value().into()),
1154            );
1155        }
1156        Ok(values)
1157    }
1158
1159    pub async fn get_all_headers(&self) -> anyhow::Result<Vec<(BString, BString)>> {
1160        let data = self.data().await?;
1161        let HeaderParseResult { headers, .. } = Header::parse_headers(data.as_ref().as_ref())?;
1162
1163        let mut values = vec![];
1164        for hdr in headers.iter() {
1165            values.push((
1166                hdr.get_name().into(),
1167                hdr.as_unstructured()
1168                    .unwrap_or_else(|_| hdr.get_raw_value().into()),
1169            ));
1170        }
1171        Ok(values)
1172    }
1173
1174    pub async fn retain_headers<F: FnMut(usize, &Header) -> bool>(
1175        &self,
1176        mut func: F,
1177    ) -> anyhow::Result<()> {
1178        let data = self.data().await?;
1179        let mut new_data = Vec::with_capacity(data.len());
1180        let HeaderParseResult {
1181            headers,
1182            body_offset,
1183            ..
1184        } = Header::parse_headers(data.as_ref().as_ref())?;
1185        for (idx, hdr) in headers.iter().enumerate() {
1186            let retain = (func)(idx, hdr);
1187            if !retain {
1188                continue;
1189            }
1190            hdr.write_header(&mut new_data)?;
1191        }
1192        new_data.extend_from_slice(b"\r\n");
1193        new_data.extend_from_slice(&data[body_offset..]);
1194        self.assign_data(new_data);
1195        Ok(())
1196    }
1197
1198    pub async fn remove_first_named_header(&self, name: &str) -> anyhow::Result<()> {
1199        let mut removed = false;
1200        self.retain_headers(|_, hdr| {
1201            if hdr.get_name().eq_ignore_ascii_case(name.as_bytes()) && !removed {
1202                removed = true;
1203                false
1204            } else {
1205                true
1206            }
1207        })
1208        .await
1209    }
1210
1211    pub async fn import_x_headers(&self, names: Vec<String>) -> anyhow::Result<()> {
1212        let specs = if names.is_empty() {
1213            vec![ImportHeaderSpec {
1214                name: "X-*".to_string(),
1215                ..ImportHeaderSpec::default()
1216            }]
1217        } else {
1218            names
1219                .into_iter()
1220                .map(|name| ImportHeaderSpec {
1221                    name,
1222                    ..ImportHeaderSpec::default()
1223                })
1224                .collect()
1225        };
1226        self.import_headers(specs).await
1227    }
1228
1229    pub async fn import_headers(&self, specs: Vec<ImportHeaderSpec>) -> anyhow::Result<()> {
1230        let compiled: Vec<CompiledImportHeaderSpec> = specs
1231            .into_iter()
1232            .map(CompiledImportHeaderSpec::compile)
1233            .collect::<anyhow::Result<_>>()?;
1234
1235        let data = self.data().await?;
1236        let HeaderParseResult { headers, .. } = Header::parse_headers(data.as_ref().as_ref())?;
1237
1238        let mut accumulators: Vec<PerSpecAccumulator> = compiled
1239            .iter()
1240            .map(|s| PerSpecAccumulator::new(s.match_mode))
1241            .collect();
1242        let mut indices_to_remove: HashSet<usize> = HashSet::new();
1243
1244        for (idx, hdr) in headers.iter().enumerate() {
1245            let hdr_name = hdr.get_name();
1246            for (spec_idx, spec) in compiled.iter().enumerate() {
1247                if spec.matches(hdr_name) {
1248                    let key = spec.target_key(&hdr.get_name_lossy());
1249                    let value = hdr.as_unstructured()?.to_str_lossy().to_string();
1250                    accumulators[spec_idx].record(key, value);
1251                    if spec.remove {
1252                        indices_to_remove.insert(idx);
1253                    }
1254                    break;
1255                }
1256            }
1257        }
1258
1259        for acc in accumulators {
1260            acc.write_to(self).await?;
1261        }
1262
1263        if !indices_to_remove.is_empty() {
1264            self.retain_headers(|idx, _| !indices_to_remove.contains(&idx))
1265                .await?;
1266        }
1267
1268        Ok(())
1269    }
1270
1271    pub async fn remove_x_headers(&self, names: Vec<String>) -> anyhow::Result<()> {
1272        self.retain_headers(|_, hdr| {
1273            if names.is_empty() {
1274                !is_x_header(hdr.get_name())
1275            } else {
1276                !names
1277                    .iter()
1278                    .any(|n| hdr.get_name().eq_ignore_ascii_case(n.as_bytes()))
1279            }
1280        })
1281        .await
1282    }
1283
1284    pub async fn remove_all_named_headers(&self, name: &str) -> anyhow::Result<()> {
1285        self.retain_headers(|_, hdr| !hdr.get_name().eq_ignore_ascii_case(name.as_bytes()))
1286            .await
1287    }
1288
1289    #[cfg(feature = "impl")]
1290    pub async fn dkim_sign(&self, signer: Signer) -> anyhow::Result<()> {
1291        let data = self.data().await?;
1292        let header = if let Some(runtime) = SIGN_POOL.get() {
1293            runtime.spawn_blocking(move || signer.sign(&data)).await??
1294        } else {
1295            signer.sign(&data)?
1296        };
1297        self.prepend_header(None, header.as_bytes()).await?;
1298        Ok(())
1299    }
1300
1301    pub async fn import_scheduling_header(
1302        &self,
1303        header_name: &str,
1304        remove: bool,
1305    ) -> anyhow::Result<Option<Scheduling>> {
1306        if let Some(value) = self.get_first_named_header_value(header_name).await? {
1307            let sched: Scheduling = serde_json::from_slice(&value).with_context(|| {
1308                format!("{value} from header {header_name} is not a valid Scheduling header")
1309            })?;
1310            let result = self.set_scheduling(Some(sched)).await?;
1311
1312            if remove {
1313                self.remove_all_named_headers(header_name).await?;
1314            }
1315
1316            Ok(result)
1317        } else {
1318            Ok(None)
1319        }
1320    }
1321
1322    pub async fn append_text_plain(&self, content: &str) -> anyhow::Result<bool> {
1323        let data = self.data().await?;
1324        let mut msg = MimePart::parse(data.as_ref().as_ref())?;
1325        let parts = msg.simplified_structure_pointers()?;
1326        if let Some(p) = parts.text_part.and_then(|p| msg.resolve_ptr_mut(p)) {
1327            match p.body()? {
1328                DecodedBody::Text(text) => {
1329                    let mut text = text.as_bytes().to_vec();
1330                    text.push_str("\r\n");
1331                    text.push_str(content);
1332                    p.replace_text_body("text/plain", &*text)?;
1333
1334                    let new_data = msg.to_message_bytes();
1335                    self.assign_data(new_data);
1336                    Ok(true)
1337                }
1338                DecodedBody::Binary(_) => {
1339                    anyhow::bail!("expected text/plain part to be text, but it is binary");
1340                }
1341            }
1342        } else {
1343            Ok(false)
1344        }
1345    }
1346
1347    pub async fn append_text_html(&self, content: &str) -> anyhow::Result<bool> {
1348        let data = self.data().await?;
1349        let mut msg = MimePart::parse(data.as_ref().as_ref())?;
1350        let parts = msg.simplified_structure_pointers()?;
1351        if let Some(p) = parts.html_part.and_then(|p| msg.resolve_ptr_mut(p)) {
1352            match p.body()? {
1353                DecodedBody::Text(text) => {
1354                    let mut text = text.as_bytes().to_vec();
1355
1356                    match text.rfind("</body>").or_else(|| text.rfind("</BODY>")) {
1357                        Some(idx) => {
1358                            text.insert_str(idx, content);
1359                            text.insert_str(idx, "\r\n");
1360                        }
1361                        None => {
1362                            // Just append
1363                            text.push_str("\r\n");
1364                            text.push_str(content);
1365                        }
1366                    }
1367
1368                    p.replace_text_body("text/html", &*text)?;
1369
1370                    let new_data = msg.to_message_bytes();
1371                    self.assign_data(new_data);
1372                    Ok(true)
1373                }
1374                DecodedBody::Binary(_) => {
1375                    anyhow::bail!("expected text/html part to be text, but it is binary");
1376                }
1377            }
1378        } else {
1379            Ok(false)
1380        }
1381    }
1382
1383    pub async fn check_fix_conformance(
1384        &self,
1385        check: MessageConformance,
1386        fix: MessageConformance,
1387        settings: Option<&CheckFixSettings>,
1388    ) -> anyhow::Result<()> {
1389        let data = self.data().await?;
1390        let data_bytes = data.as_ref().as_ref();
1391        let msg = MimePart::parse(data_bytes)?;
1392
1393        let mut settings = settings.map(Clone::clone).unwrap_or_default();
1394
1395        if fix.contains(MessageConformance::MISSING_MESSAGE_ID_HEADER)
1396            && settings.message_id.is_none()
1397            && matches!(msg.headers().message_id(), Err(_) | Ok(None))
1398        {
1399            let sender = self.sender().await?;
1400            let domain = sender.domain();
1401            let id = *self.id();
1402            settings.message_id.replace(format!("{id}@{domain}"));
1403        }
1404
1405        if settings.detect_encoding {
1406            settings.data_bytes.replace(data.clone());
1407        }
1408
1409        let opt_msg = msg.check_fix_conformance(check, fix, settings)?;
1410
1411        if let Some(msg) = opt_msg {
1412            let new_data = msg.to_message_bytes();
1413            self.assign_data(new_data);
1414        }
1415
1416        Ok(())
1417    }
1418}
1419
1420fn imported_header_name(name: &str) -> String {
1421    name.chars()
1422        .map(|c| match c.to_ascii_lowercase() {
1423            '-' => '_',
1424            c => c,
1425        })
1426        .collect()
1427}
1428
1429fn is_x_header(name: &[u8]) -> bool {
1430    name.starts_with_str("X-") || name.starts_with_str("x-")
1431}
1432
1433#[derive(Default, Debug, Clone, Copy, Deserialize, Serialize, PartialEq, Eq)]
1434#[serde(rename_all = "snake_case")]
1435pub enum MatchMode {
1436    First,
1437    #[default]
1438    Last,
1439    All,
1440}
1441
1442#[derive(Default, Debug, Clone, Copy, Deserialize, Serialize, PartialEq, Eq)]
1443#[serde(rename_all = "snake_case")]
1444pub enum NameTransform {
1445    #[default]
1446    SnakeCase,
1447    KebabCase,
1448    CamelCase,
1449    PascalCase,
1450}
1451
1452#[derive(Debug, Clone, Deserialize)]
1453#[serde(deny_unknown_fields)]
1454pub struct ImportHeaderSpec {
1455    pub name: String,
1456    #[serde(default, rename = "match")]
1457    pub match_mode: MatchMode,
1458    #[serde(default)]
1459    pub transform: NameTransform,
1460    #[serde(default)]
1461    pub target: Option<String>,
1462    #[serde(default)]
1463    pub remove: bool,
1464}
1465
1466impl Default for ImportHeaderSpec {
1467    fn default() -> Self {
1468        Self {
1469            name: String::new(),
1470            match_mode: MatchMode::default(),
1471            transform: NameTransform::default(),
1472            target: None,
1473            remove: false,
1474        }
1475    }
1476}
1477
1478#[derive(Debug, Clone)]
1479enum HeaderPattern {
1480    /// Case-insensitive exact match.
1481    Exact(String),
1482    /// Matches any header whose name (case-insensitively) starts with the
1483    /// given prefix. Built from a pattern like `X-*`.
1484    Prefix(String),
1485}
1486
1487#[derive(Debug, Clone)]
1488struct CompiledImportHeaderSpec {
1489    pattern: HeaderPattern,
1490    match_mode: MatchMode,
1491    transform: NameTransform,
1492    target: Option<String>,
1493    remove: bool,
1494}
1495
1496impl CompiledImportHeaderSpec {
1497    fn compile(spec: ImportHeaderSpec) -> anyhow::Result<Self> {
1498        let pattern = compile_header_pattern(&spec.name)?;
1499        if spec.target.is_some() && matches!(pattern, HeaderPattern::Prefix(_)) {
1500            anyhow::bail!(
1501                "import_headers: `target` cannot be used with wildcard pattern {:?}",
1502                spec.name
1503            );
1504        }
1505        Ok(Self {
1506            pattern,
1507            match_mode: spec.match_mode,
1508            transform: spec.transform,
1509            target: spec.target,
1510            remove: spec.remove,
1511        })
1512    }
1513
1514    fn matches(&self, hdr_name: &[u8]) -> bool {
1515        match &self.pattern {
1516            HeaderPattern::Exact(s) => hdr_name.eq_ignore_ascii_case(s.as_bytes()),
1517            HeaderPattern::Prefix(p) => {
1518                hdr_name.len() >= p.len() && hdr_name[..p.len()].eq_ignore_ascii_case(p.as_bytes())
1519            }
1520        }
1521    }
1522
1523    fn target_key(&self, matched_name: &str) -> String {
1524        if let Some(target) = &self.target {
1525            return target.clone();
1526        }
1527        apply_name_transform(matched_name, self.transform)
1528    }
1529}
1530
1531fn compile_header_pattern(pat: &str) -> anyhow::Result<HeaderPattern> {
1532    if pat.is_empty() {
1533        anyhow::bail!("import_headers: header name pattern must not be empty");
1534    }
1535    let star_count = pat.chars().filter(|c| *c == '*').count();
1536    if star_count == 0 {
1537        return Ok(HeaderPattern::Exact(pat.to_string()));
1538    }
1539    let Some(prefix) = pat.strip_suffix('*') else {
1540        anyhow::bail!(
1541            "import_headers: only a single trailing `*` is supported in pattern {:?}",
1542            pat
1543        );
1544    };
1545    if star_count > 1 {
1546        anyhow::bail!(
1547            "import_headers: only a single trailing `*` is supported in pattern {:?}",
1548            pat
1549        );
1550    }
1551    if prefix.is_empty() {
1552        anyhow::bail!("import_headers: bare `*` patterns are not supported");
1553    }
1554    Ok(HeaderPattern::Prefix(prefix.to_string()))
1555}
1556
1557fn apply_name_transform(name: &str, transform: NameTransform) -> String {
1558    match transform {
1559        NameTransform::SnakeCase => imported_header_name(name),
1560        NameTransform::KebabCase => name
1561            .split('-')
1562            .map(|p| p.to_ascii_lowercase())
1563            .collect::<Vec<_>>()
1564            .join("-"),
1565        NameTransform::CamelCase => name
1566            .split('-')
1567            .enumerate()
1568            .map(|(i, p)| {
1569                if i == 0 {
1570                    p.to_ascii_lowercase()
1571                } else {
1572                    titlecase_ascii(p)
1573                }
1574            })
1575            .collect::<Vec<_>>()
1576            .join(""),
1577        NameTransform::PascalCase => name
1578            .split('-')
1579            .map(titlecase_ascii)
1580            .collect::<Vec<_>>()
1581            .join(""),
1582    }
1583}
1584
1585fn titlecase_ascii(s: &str) -> String {
1586    let mut chars = s.chars();
1587    match chars.next() {
1588        None => String::new(),
1589        Some(first) => {
1590            let mut out = String::with_capacity(s.len());
1591            out.push(first.to_ascii_uppercase());
1592            for c in chars {
1593                out.push(c.to_ascii_lowercase());
1594            }
1595            out
1596        }
1597    }
1598}
1599
1600enum AccValue {
1601    Str(String),
1602    Arr(Vec<String>),
1603}
1604
1605struct PerSpecAccumulator {
1606    mode: MatchMode,
1607    by_key: BTreeMap<String, AccValue>,
1608}
1609
1610impl PerSpecAccumulator {
1611    fn new(mode: MatchMode) -> Self {
1612        Self {
1613            mode,
1614            by_key: BTreeMap::new(),
1615        }
1616    }
1617
1618    fn record(&mut self, key: String, value: String) {
1619        match self.mode {
1620            MatchMode::First => {
1621                self.by_key.entry(key).or_insert(AccValue::Str(value));
1622            }
1623            MatchMode::Last => {
1624                self.by_key.insert(key, AccValue::Str(value));
1625            }
1626            MatchMode::All => {
1627                let entry = self
1628                    .by_key
1629                    .entry(key)
1630                    .or_insert_with(|| AccValue::Arr(Vec::new()));
1631                if let AccValue::Arr(v) = entry {
1632                    v.push(value);
1633                }
1634            }
1635        }
1636    }
1637
1638    async fn write_to(self, msg: &Message) -> anyhow::Result<()> {
1639        for (key, value) in self.by_key {
1640            match value {
1641                AccValue::Str(s) => msg.set_meta(key, s).await?,
1642                AccValue::Arr(arr) => {
1643                    let json = serde_json::Value::Array(
1644                        arr.into_iter().map(serde_json::Value::String).collect(),
1645                    );
1646                    msg.set_meta(key, json).await?;
1647                }
1648            }
1649        }
1650        Ok(())
1651    }
1652}
1653
1654fn size_header(name: Option<&str>, value: &[u8]) -> usize {
1655    name.map(|name| name.len() + 2).unwrap_or(0) + value.len()
1656}
1657
1658fn emit_header(dest: &mut Vec<u8>, name: Option<&str>, value: &[u8]) {
1659    if let Some(name) = name {
1660        dest.extend_from_slice(name.as_bytes());
1661        dest.extend_from_slice(b": ");
1662    }
1663    dest.extend_from_slice(value);
1664    if !value.ends_with_str("\r\n") {
1665        dest.extend_from_slice(b"\r\n");
1666    }
1667}
1668
1669#[cfg(feature = "impl")]
1670impl UserData for Message {
1671    fn add_methods<M: UserDataMethods<Self>>(methods: &mut M) {
1672        methods.add_async_method(
1673            "set_meta",
1674            move |_, this, (name, value): (String, mlua::Value)| async move {
1675                let value = serde_json::value::to_value(value).map_err(any_err)?;
1676                this.set_meta(name, value).await.map_err(any_err)?;
1677                Ok(())
1678            },
1679        );
1680        methods.add_async_method("get_meta", move |lua, this, name: String| async move {
1681            let value = this.get_meta(name).await.map_err(any_err)?;
1682            Ok(Some(lua.to_value_with(&value, serialize_options())?))
1683        });
1684        methods.add_async_method("get_data", |lua, this, _: ()| async move {
1685            let data = this.data().await.map_err(any_err)?;
1686            lua.create_string(&*data)
1687        });
1688        methods.add_method("set_data", move |_lua, this, data: mlua::String| {
1689            this.assign_data(data.as_bytes().to_vec());
1690            Ok(())
1691        });
1692
1693        methods.add_async_method("parse_mime", |_lua, this, _: ()| async move {
1694            let data = this.data().await.map_err(any_err)?;
1695            let owned_data = BString::new(data.as_ref().to_vec());
1696            let part = MimePart::parse(owned_data).map_err(any_err)?;
1697            Ok(mod_mimepart::PartRef::new(part))
1698        });
1699
1700        methods.add_async_method("append_text_plain", |_lua, this, data: String| async move {
1701            this.append_text_plain(&data).await.map_err(any_err)
1702        });
1703
1704        methods.add_async_method("append_text_html", |_lua, this, data: String| async move {
1705            this.append_text_html(&data).await.map_err(any_err)
1706        });
1707
1708        methods.add_method("id", move |_, this, _: ()| Ok(this.id().to_string()));
1709        methods.add_async_method("sender", move |_, this, _: ()| async move {
1710            this.sender().await.map_err(any_err)
1711        });
1712
1713        methods.add_method("num_attempts", move |_, this, _: ()| {
1714            Ok(this.get_num_attempts())
1715        });
1716        methods.add_method("increment_num_attempts", move |_, this, _: ()| {
1717            Ok(this.increment_num_attempts())
1718        });
1719
1720        methods.add_async_method("queue_name", move |_, this, _: ()| async move {
1721            this.get_queue_name().await.map_err(any_err)
1722        });
1723
1724        methods.add_async_method("set_due", move |lua, this, due: mlua::Value| async move {
1725            let due: Option<DateTime<Utc>> = lua.from_value(due)?;
1726            let revised_due = this.set_due(due).await.map_err(any_err)?;
1727            lua.to_value(&revised_due)
1728        });
1729
1730        methods.add_async_method(
1731            "set_sender",
1732            move |lua, this, value: mlua::Value| async move {
1733                let sender = match value {
1734                    mlua::Value::String(s) => {
1735                        let s = s.to_str()?;
1736                        EnvelopeAddress::parse(&s).map_err(any_err)?
1737                    }
1738                    _ => lua.from_value::<EnvelopeAddress>(value.clone())?,
1739                };
1740                this.set_sender(sender).await.map_err(any_err)
1741            },
1742        );
1743
1744        methods.add_async_method("recipient", move |lua, this, _: ()| async move {
1745            let mut recipients = this.recipient_list().await.map_err(any_err)?;
1746            match recipients.len() {
1747                0 => Ok(mlua::Value::Nil),
1748                1 => {
1749                    let recip: EnvelopeAddress = recipients.pop().expect("have 1");
1750                    recip.into_lua(&lua)
1751                }
1752                _ => recipients.into_lua(&lua),
1753            }
1754        });
1755
1756        methods.add_async_method("recipient_list", move |lua, this, _: ()| async move {
1757            let recipients = this.recipient_list().await.map_err(any_err)?;
1758            recipients.into_lua(&lua)
1759        });
1760
1761        methods.add_async_method(
1762            "set_recipient",
1763            move |lua, this, value: mlua::Value| async move {
1764                let recipients = match value {
1765                    mlua::Value::String(s) => {
1766                        let s = s.to_str()?;
1767                        vec![EnvelopeAddress::parse(&s).map_err(any_err)?]
1768                    }
1769                    _ => {
1770                        if let Ok(recips) = lua.from_value::<Vec<EnvelopeAddress>>(value.clone()) {
1771                            recips
1772                        } else {
1773                            vec![lua.from_value::<EnvelopeAddress>(value.clone())?]
1774                        }
1775                    }
1776                };
1777                this.set_recipient_list(recipients).await.map_err(any_err)
1778            },
1779        );
1780
1781        #[cfg(feature = "impl")]
1782        methods.add_async_method("dkim_sign", |_, this, signer: Signer| async move {
1783            this.dkim_sign(signer).await.map_err(any_err)
1784        });
1785
1786        methods.add_async_method("shrink", |_, this, _: ()| async move {
1787            if this.needs_save() {
1788                this.save(None).await.map_err(any_err)?;
1789            }
1790            this.shrink().map_err(any_err)
1791        });
1792
1793        methods.add_async_method("shrink_data", |_, this, _: ()| async move {
1794            if this.needs_save() {
1795                this.save(None).await.map_err(any_err)?;
1796            }
1797            this.shrink_data().map_err(any_err)
1798        });
1799
1800        methods.add_async_method(
1801            "add_authentication_results",
1802            |lua, this, (serv_id, results): (mlua::String, mlua::Value)| async move {
1803                let results: Vec<AuthenticationResult> = lua.from_value(results)?;
1804                let results = AuthenticationResults {
1805                    serv_id: serv_id.as_bytes().as_ref().into(),
1806                    version: None,
1807                    results,
1808                };
1809
1810                this.prepend_header(
1811                    Some("Authentication-Results"),
1812                    results.encode_value().as_bytes(),
1813                )
1814                .await
1815                .map_err(any_err)?;
1816
1817                Ok(())
1818            },
1819        );
1820
1821        #[cfg(feature = "impl")]
1822        methods.add_async_method(
1823            "arc_verify",
1824            |lua, this, opt_resolver_name: Option<String>| async move {
1825                let results = this.arc_verify(opt_resolver_name).await.map_err(any_err)?;
1826                lua.to_value_with(&results, serialize_options())
1827            },
1828        );
1829
1830        #[cfg(feature = "impl")]
1831        methods.add_async_method(
1832            "arc_seal",
1833            |lua,
1834             this,
1835             (signer, serv_id, auth_res, opt_resolver_name): (
1836                Signer,
1837                mlua::String,
1838                mlua::Value,
1839                Option<String>,
1840            )| async move {
1841                let results: Vec<AuthenticationResult> = lua.from_value(auth_res)?;
1842                this.arc_seal(
1843                    signer,
1844                    AuthenticationResults {
1845                        serv_id: serv_id.as_bytes().as_ref().into(),
1846                        version: None,
1847                        results,
1848                    },
1849                    opt_resolver_name,
1850                )
1851                .await
1852                .map_err(any_err)
1853            },
1854        );
1855
1856        #[cfg(feature = "impl")]
1857        methods.add_async_method(
1858            "dkim_verify",
1859            |lua, this, opt_resolver_name: Option<String>| async move {
1860                let results = this.dkim_verify(opt_resolver_name).await.map_err(any_err)?;
1861                lua.to_value_with(&results, serialize_options())
1862            },
1863        );
1864
1865        methods.add_async_method(
1866            "prepend_header",
1867            |_, this, (name, value, encode): (String, String, Option<bool>)| async move {
1868                let encode = encode.unwrap_or(false);
1869                if encode {
1870                    let header = Header::new_unstructured(name.clone(), value);
1871                    this.prepend_header(Some(&name), header.get_raw_value())
1872                        .await
1873                        .map_err(any_err)?;
1874                } else {
1875                    this.prepend_header(Some(&name), value.as_bytes())
1876                        .await
1877                        .map_err(any_err)?;
1878                }
1879                Ok(())
1880            },
1881        );
1882        methods.add_async_method(
1883            "append_header",
1884            |_, this, (name, value, encode): (String, String, Option<bool>)| async move {
1885                let encode = encode.unwrap_or(false);
1886                if encode {
1887                    let header = Header::new_unstructured(name.clone(), value);
1888                    this.append_header(Some(&name), header.get_raw_value())
1889                        .await
1890                        .map_err(any_err)?;
1891                } else {
1892                    this.append_header(Some(&name), value.as_bytes())
1893                        .await
1894                        .map_err(any_err)?;
1895                }
1896                Ok(())
1897            },
1898        );
1899        methods.add_async_method("get_address_header", |_, this, name: String| async move {
1900            this.get_address_header(&name).await.map_err(any_err)
1901        });
1902        methods.add_async_method("from_header", |_, this, ()| async move {
1903            this.get_address_header("From").await.map_err(any_err)
1904        });
1905        methods.add_async_method("to_header", |_, this, ()| async move {
1906            this.get_address_header("To").await.map_err(any_err)
1907        });
1908
1909        methods.add_async_method(
1910            "get_first_named_header_value",
1911            |_, this, name: String| async move {
1912                this.get_first_named_header_value(&name)
1913                    .await
1914                    .map_err(any_err)
1915            },
1916        );
1917        methods.add_async_method(
1918            "get_all_named_header_values",
1919            |_, this, name: String| async move {
1920                this.get_all_named_header_values(&name)
1921                    .await
1922                    .map_err(any_err)
1923            },
1924        );
1925        methods.add_async_method("get_all_headers", |_, this, _: ()| async move {
1926            Ok(this
1927                .get_all_headers()
1928                .await
1929                .map_err(any_err)?
1930                .into_iter()
1931                .map(|(name, value)| vec![name, value])
1932                .collect::<Vec<Vec<BString>>>())
1933        });
1934        methods.add_async_method(
1935            "import_x_headers",
1936            |_, this, names: Option<Vec<String>>| async move {
1937                this.import_x_headers(names.unwrap_or_default())
1938                    .await
1939                    .map_err(any_err)
1940            },
1941        );
1942        methods.add_async_method(
1943            "import_headers",
1944            |_, this, specs: SerdeWrappedValue<Vec<ImportHeaderSpec>>| async move {
1945                this.import_headers(specs.0).await.map_err(any_err)
1946            },
1947        );
1948
1949        methods.add_async_method(
1950            "remove_x_headers",
1951            |_, this, names: Option<Vec<String>>| async move {
1952                this.remove_x_headers(names.unwrap_or_default())
1953                    .await
1954                    .map_err(any_err)
1955            },
1956        );
1957        methods.add_async_method(
1958            "remove_all_named_headers",
1959            |_, this, name: String| async move {
1960                this.remove_all_named_headers(&name).await.map_err(any_err)
1961            },
1962        );
1963
1964        methods.add_async_method(
1965            "import_scheduling_header",
1966            |lua, this, (header_name, remove): (String, bool)| async move {
1967                let opt_schedule = this
1968                    .import_scheduling_header(&header_name, remove)
1969                    .await
1970                    .map_err(any_err)?;
1971                lua.to_value(&opt_schedule)
1972            },
1973        );
1974
1975        methods.add_async_method(
1976            "set_scheduling",
1977            move |lua, this, params: mlua::Value| async move {
1978                let sched: Option<Scheduling> = from_lua_value(&lua, params)?;
1979                let opt_schedule = this.set_scheduling(sched).await.map_err(any_err)?;
1980                lua.to_value(&opt_schedule)
1981            },
1982        );
1983
1984        methods.add_async_method("parse_rfc3464", |lua, this, _: ()| async move {
1985            let report = this.parse_rfc3464().await.map_err(any_err)?;
1986            match report {
1987                Some(report) => lua.to_value_with(&report, serialize_options()),
1988                None => Ok(mlua::Value::Nil),
1989            }
1990        });
1991
1992        methods.add_async_method("parse_rfc5965", |lua, this, _: ()| async move {
1993            let report = this.parse_rfc5965().await.map_err(any_err)?;
1994            match report {
1995                Some(report) => lua.to_value_with(&report, serialize_options()),
1996                None => Ok(mlua::Value::Nil),
1997            }
1998        });
1999
2000        methods.add_async_method("save", |_, this, ()| async move {
2001            this.save(None).await.map_err(any_err)
2002        });
2003
2004        methods.add_method("set_force_sync", move |_, this, force: bool| {
2005            this.set_force_sync(force);
2006            Ok(())
2007        });
2008
2009        methods.add_async_method(
2010            "check_fix_conformance",
2011            |lua, this, (check, fix, settings): (String, String, Option<mlua::Value>)| async move {
2012                use std::str::FromStr;
2013                let check = MessageConformance::from_str(&check).map_err(any_err)?;
2014                let fix = MessageConformance::from_str(&fix).map_err(any_err)?;
2015
2016                let settings = match settings {
2017                    Some(v) => Some(lua.from_value(v).map_err(any_err)?),
2018                    None => None,
2019                };
2020
2021                match this
2022                    .check_fix_conformance(check, fix, settings.as_ref())
2023                    .await
2024                {
2025                    Ok(_) => Ok(None),
2026                    Err(err) => Ok(Some(format!("{err:#}"))),
2027                }
2028            },
2029        );
2030    }
2031}
2032
2033impl TimerEntryWithDelay for WeakMessage {
2034    fn delay(&self) -> Duration {
2035        match self.upgrade() {
2036            None => {
2037                // Dangling/Cancelled. Make it appear due immediately
2038                Duration::from_millis(0)
2039            }
2040            Some(msg) => msg.delay(),
2041        }
2042    }
2043}
2044
2045impl TimerEntryWithDelay for Message {
2046    fn delay(&self) -> Duration {
2047        let inner = self.msg_and_id.inner.lock();
2048        match inner.due {
2049            Some(time) => {
2050                let now = Utc::now();
2051                let delta = time - now;
2052                delta.to_std().unwrap_or(Duration::from_millis(0))
2053            }
2054            None => Duration::from_millis(0),
2055        }
2056    }
2057}
2058
2059#[cfg(test)]
2060pub(crate) mod test {
2061    use super::*;
2062    use serde_json::json;
2063
2064    pub fn new_msg_body<S: AsRef<[u8]>>(s: S) -> Message {
2065        Message::new_dirty(
2066            SpoolId::new(),
2067            EnvelopeAddress::parse("sender@example.com").unwrap(),
2068            vec![EnvelopeAddress::parse("recip@example.com").unwrap()],
2069            serde_json::json!({}),
2070            Arc::new(s.as_ref().to_vec().into_boxed_slice()),
2071        )
2072        .unwrap()
2073    }
2074
2075    fn data_as_string(msg: &Message) -> String {
2076        String::from_utf8(msg.get_data_maybe_not_loaded().to_vec()).unwrap()
2077    }
2078
2079    const X_HDR_CONTENT: &str =
2080        "X-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nFrom :Someone\r\n\r\nBody";
2081
2082    #[tokio::test]
2083    async fn import_all_x_headers() {
2084        let msg = new_msg_body(X_HDR_CONTENT);
2085
2086        msg.import_x_headers(vec![]).await.unwrap();
2087        k9::assert_equal!(
2088            msg.get_meta_obj().await.unwrap(),
2089            json!({
2090                "x_hello": "there",
2091                "x_header": "value",
2092            })
2093        );
2094    }
2095
2096    #[tokio::test]
2097    async fn meta_and_nil() {
2098        let msg = new_msg_body(X_HDR_CONTENT);
2099        // Ensure that json null round-trips
2100        msg.set_meta("test", serde_json::Value::Null).await.unwrap();
2101        k9::assert_equal!(msg.get_meta("test").await.unwrap(), serde_json::Value::Null);
2102
2103        // and that it is exposed to lua as nil
2104        let lua = mlua::Lua::new();
2105        lua.globals().set("msg", msg).unwrap();
2106        lua.load("assert(msg:get_meta('test') == nil)")
2107            .exec()
2108            .unwrap();
2109    }
2110
2111    #[tokio::test]
2112    async fn import_some_x_headers() {
2113        let msg = new_msg_body(X_HDR_CONTENT);
2114
2115        msg.import_x_headers(vec!["x-hello".to_string()])
2116            .await
2117            .unwrap();
2118        k9::assert_equal!(
2119            msg.get_meta_obj().await.unwrap(),
2120            json!({
2121                "x_hello": "there",
2122            })
2123        );
2124    }
2125
2126    #[tokio::test]
2127    async fn import_headers_wildcard_remove() {
2128        let msg = new_msg_body(X_HDR_CONTENT);
2129
2130        msg.import_headers(vec![ImportHeaderSpec {
2131            name: "X-*".to_string(),
2132            remove: true,
2133            ..ImportHeaderSpec::default()
2134        }])
2135        .await
2136        .unwrap();
2137        k9::assert_equal!(
2138            msg.get_meta_obj().await.unwrap(),
2139            json!({
2140                "x_hello": "there",
2141                "x_header": "value",
2142            })
2143        );
2144        k9::assert_equal!(
2145            data_as_string(&msg),
2146            "Subject: Hello\r\nFrom :Someone\r\n\r\nBody"
2147        );
2148    }
2149
2150    #[tokio::test]
2151    async fn import_headers_match_modes() {
2152        let body =
2153            "Received: from a\r\nReceived: from b\r\nReceived: from c\r\nSubject: hi\r\n\r\nBody";
2154        for (mode, expected) in [
2155            (MatchMode::First, json!("from a")),
2156            (MatchMode::Last, json!("from c")),
2157            (MatchMode::All, json!(["from a", "from b", "from c"])),
2158        ] {
2159            let msg = new_msg_body(body);
2160            msg.import_headers(vec![ImportHeaderSpec {
2161                name: "Received".to_string(),
2162                match_mode: mode,
2163                ..ImportHeaderSpec::default()
2164            }])
2165            .await
2166            .unwrap();
2167            k9::assert_equal!(
2168                msg.get_meta_obj().await.unwrap(),
2169                json!({ "received": expected })
2170            );
2171        }
2172    }
2173
2174    #[tokio::test]
2175    async fn import_headers_no_match_skips_meta() {
2176        let msg = new_msg_body(X_HDR_CONTENT);
2177        msg.import_headers(vec![ImportHeaderSpec {
2178            name: "Nonexistent".to_string(),
2179            match_mode: MatchMode::All,
2180            ..ImportHeaderSpec::default()
2181        }])
2182        .await
2183        .unwrap();
2184        k9::assert_equal!(msg.get_meta_obj().await.unwrap(), json!({}));
2185    }
2186
2187    #[tokio::test]
2188    async fn import_headers_specific_before_wildcard() {
2189        let body = "X-Campaign-Id: 42\r\nX-Mailer: foo\r\n\r\nBody";
2190        let msg = new_msg_body(body);
2191        msg.import_headers(vec![
2192            ImportHeaderSpec {
2193                name: "X-Campaign-Id".to_string(),
2194                target: Some("campaign".to_string()),
2195                ..ImportHeaderSpec::default()
2196            },
2197            ImportHeaderSpec {
2198                name: "X-*".to_string(),
2199                ..ImportHeaderSpec::default()
2200            },
2201        ])
2202        .await
2203        .unwrap();
2204        k9::assert_equal!(
2205            msg.get_meta_obj().await.unwrap(),
2206            json!({
2207                "campaign": "42",
2208                "x_mailer": "foo",
2209            })
2210        );
2211    }
2212
2213    #[test]
2214    fn name_transforms() {
2215        let n = "X-Campaign-Id";
2216        k9::assert_equal!(
2217            apply_name_transform(n, NameTransform::SnakeCase),
2218            "x_campaign_id"
2219        );
2220        k9::assert_equal!(
2221            apply_name_transform(n, NameTransform::KebabCase),
2222            "x-campaign-id"
2223        );
2224        k9::assert_equal!(
2225            apply_name_transform(n, NameTransform::CamelCase),
2226            "xCampaignId"
2227        );
2228        k9::assert_equal!(
2229            apply_name_transform(n, NameTransform::PascalCase),
2230            "XCampaignId"
2231        );
2232    }
2233
2234    #[test]
2235    fn pattern_compile_rejects_bad_patterns() {
2236        compile_header_pattern("").unwrap_err();
2237        compile_header_pattern("*").unwrap_err();
2238        compile_header_pattern("*-Id").unwrap_err();
2239        compile_header_pattern("X-*-Id").unwrap_err();
2240        compile_header_pattern("X-**").unwrap_err();
2241        assert!(matches!(
2242            compile_header_pattern("Subject").unwrap(),
2243            HeaderPattern::Exact(_)
2244        ));
2245        assert!(matches!(
2246            compile_header_pattern("X-*").unwrap(),
2247            HeaderPattern::Prefix(_)
2248        ));
2249    }
2250
2251    #[tokio::test]
2252    async fn import_headers_target_with_wildcard_rejected() {
2253        let msg = new_msg_body(X_HDR_CONTENT);
2254        msg.import_headers(vec![ImportHeaderSpec {
2255            name: "X-*".to_string(),
2256            target: Some("oops".to_string()),
2257            ..ImportHeaderSpec::default()
2258        }])
2259        .await
2260        .unwrap_err();
2261    }
2262
2263    #[tokio::test]
2264    async fn remove_all_x_headers() {
2265        let msg = new_msg_body(X_HDR_CONTENT);
2266
2267        msg.remove_x_headers(vec![]).await.unwrap();
2268        k9::assert_equal!(
2269            data_as_string(&msg),
2270            "Subject: Hello\r\nFrom :Someone\r\n\r\nBody"
2271        );
2272    }
2273
2274    #[tokio::test]
2275    async fn prepend_header_2_params() {
2276        let msg = new_msg_body(X_HDR_CONTENT);
2277
2278        msg.prepend_header(Some("Date"), b"Today").await.unwrap();
2279        k9::assert_equal!(
2280            data_as_string(&msg),
2281            "Date: Today\r\nX-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nFrom :Someone\r\n\r\nBody"
2282        );
2283    }
2284
2285    #[tokio::test]
2286    async fn prepend_header_1_params() {
2287        let msg = new_msg_body(X_HDR_CONTENT);
2288
2289        msg.prepend_header(None, b"Date: Today").await.unwrap();
2290        k9::assert_equal!(
2291            data_as_string(&msg),
2292            "Date: Today\r\nX-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nFrom :Someone\r\n\r\nBody"
2293        );
2294    }
2295
2296    #[tokio::test]
2297    async fn append_header_2_params() {
2298        let msg = new_msg_body(X_HDR_CONTENT);
2299
2300        msg.append_header(Some("Date"), b"Today").await.unwrap();
2301        k9::assert_equal!(
2302            data_as_string(&msg),
2303            "X-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nFrom :Someone\r\nDate: Today\r\n\r\nBody"
2304        );
2305    }
2306
2307    #[tokio::test]
2308    async fn append_header_1_params() {
2309        let msg = new_msg_body(X_HDR_CONTENT);
2310
2311        msg.append_header(None, b"Date: Today").await.unwrap();
2312        k9::assert_equal!(
2313            data_as_string(&msg),
2314            "X-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nFrom :Someone\r\nDate: Today\r\n\r\nBody"
2315        );
2316    }
2317
2318    const MULTI_HEADER_CONTENT: &str =
2319        "X-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nX-Header: another value\r\nFrom :Someone@somewhere\r\n\r\nBody";
2320
2321    #[tokio::test]
2322    async fn get_first_header() {
2323        let msg = new_msg_body(MULTI_HEADER_CONTENT);
2324        k9::assert_equal!(
2325            msg.get_first_named_header_value("X-header")
2326                .await
2327                .unwrap()
2328                .unwrap(),
2329            "value"
2330        );
2331    }
2332
2333    #[tokio::test]
2334    async fn get_all_header() {
2335        let msg = new_msg_body(MULTI_HEADER_CONTENT);
2336        k9::assert_equal!(
2337            msg.get_all_named_header_values("X-header").await.unwrap(),
2338            vec!["value".to_string(), "another value".to_string()]
2339        );
2340    }
2341
2342    #[tokio::test]
2343    async fn remove_first() {
2344        let msg = new_msg_body(MULTI_HEADER_CONTENT);
2345        msg.remove_first_named_header("X-header").await.unwrap();
2346        k9::assert_equal!(
2347            data_as_string(&msg),
2348            "X-Hello: there\r\nSubject: Hello\r\nX-Header: another value\r\nFrom :Someone@somewhere\r\n\r\nBody"
2349        );
2350    }
2351
2352    #[tokio::test]
2353    async fn remove_all() {
2354        let msg = new_msg_body(MULTI_HEADER_CONTENT);
2355        msg.remove_all_named_headers("X-header").await.unwrap();
2356        k9::assert_equal!(
2357            data_as_string(&msg),
2358            "X-Hello: there\r\nSubject: Hello\r\nFrom :Someone@somewhere\r\n\r\nBody"
2359        );
2360    }
2361
2362    #[tokio::test]
2363    async fn append_text_plain() {
2364        let msg = new_msg_body(MULTI_HEADER_CONTENT);
2365        msg.append_text_plain("I am at the bottom").await.unwrap();
2366        k9::assert_equal!(
2367            data_as_string(&msg),
2368            "X-Hello: there\r\n\
2369             X-Header: value\r\n\
2370             Subject: Hello\r\n\
2371             X-Header: another value\r\n\
2372             From :Someone@somewhere\r\n\
2373             Content-Type: text/plain;\r\n\
2374             \tcharset=\"us-ascii\"\r\n\
2375             \r\n\
2376             Body\r\n\
2377             I am at the bottom\r\n"
2378        );
2379    }
2380
2381    const MIXED_CONTENT: &str = "Content-Type: multipart/mixed;\r\n\
2382\tboundary=\"my-boundary\"\r\n\
2383\r\n\
2384--my-boundary\r\n\
2385Content-Type: text/plain;\r\n\
2386\tcharset=\"us-ascii\"\r\n\
2387\r\n\
2388plain text\r\n\
2389--my-boundary\r\n\
2390Content-Type: text/html;\r\n\
2391\tcharset=\"us-ascii\"\r\n\
2392\r\n\
2393<b>rich</b> text\r\n\
2394--my-boundary\r\n\
2395Content-Type: application/octet-stream\r\n\
2396Content-Transfer-Encoding: base64\r\n\
2397Content-Disposition: attachment;\r\n\
2398\tfilename=\"woot.bin\"\r\n\
2399Content-ID: <woot.id@somewhere>\r\n\
2400\r\n\
2401AAECAw==\r\n\
2402--my-boundary--\r\n\
2403\r\n";
2404
2405    const MIXED_CONTENT_ENCLOSING_BODY: &str = "Content-Type: multipart/mixed;\r\n\
2406\tboundary=\"my-boundary\"\r\n\
2407\r\n\
2408--my-boundary\r\n\
2409Content-Type: text/plain;\r\n\
2410\tcharset=\"us-ascii\"\r\n\
2411\r\n\
2412plain text\r\n\
2413--my-boundary\r\n\
2414Content-Type: text/html;\r\n\
2415\tcharset=\"us-ascii\"\r\n\
2416\r\n\
2417<BODY>\r\n\
2418<b>rich</b> text\r\n\
2419</BODY>\r\n\
2420--my-boundary\r\n\
2421Content-Type: application/octet-stream\r\n\
2422Content-Transfer-Encoding: base64\r\n\
2423Content-Disposition: attachment;\r\n\
2424\tfilename=\"woot.bin\"\r\n\
2425Content-ID: <woot.id>\r\n\
2426\r\n\
2427AAECAw==\r\n\
2428--my-boundary--\r\n\
2429\r\n";
2430
2431    #[tokio::test]
2432    async fn append_text_html() {
2433        let msg = new_msg_body(MIXED_CONTENT);
2434        msg.append_text_html("bottom html").await.unwrap();
2435        k9::snapshot!(
2436            data_as_string(&msg),
2437            r#"
2438Content-Type: multipart/mixed;\r
2439\tboundary="my-boundary"\r
2440\r
2441--my-boundary\r
2442Content-Type: text/plain;\r
2443\tcharset="us-ascii"\r
2444\r
2445plain text\r
2446--my-boundary\r
2447Content-Type: text/html;\r
2448\tcharset="us-ascii"\r
2449\r
2450<b>rich</b> text\r
2451\r
2452bottom html\r
2453--my-boundary\r
2454Content-Type: application/octet-stream\r
2455Content-Transfer-Encoding: base64\r
2456Content-Disposition: attachment;\r
2457\tfilename="woot.bin"\r
2458Content-ID: <woot.id@somewhere>\r
2459\r
2460AAECAw==\r
2461--my-boundary--\r
2462\r
2463
2464"#
2465        );
2466
2467        let msg = new_msg_body(MIXED_CONTENT_ENCLOSING_BODY);
2468        msg.append_text_html("bottom html 👻").await.unwrap();
2469        k9::snapshot!(
2470            data_as_string(&msg),
2471            r#"
2472Content-Type: multipart/mixed;\r
2473\tboundary="my-boundary"\r
2474\r
2475--my-boundary\r
2476Content-Type: text/plain;\r
2477\tcharset="us-ascii"\r
2478\r
2479plain text\r
2480--my-boundary\r
2481Content-Type: text/html;\r
2482\tcharset="utf-8"\r
2483Content-Transfer-Encoding: quoted-printable\r
2484\r
2485<BODY>\r
2486<b>rich</b> text\r
2487\r
2488bottom html =F0=9F=91=BB</BODY>\r
2489--my-boundary\r
2490Content-Type: application/octet-stream\r
2491Content-Transfer-Encoding: base64\r
2492Content-Disposition: attachment;\r
2493\tfilename="woot.bin"\r
2494Content-ID: <woot.id>\r
2495\r
2496AAECAw==\r
2497--my-boundary--\r
2498\r
2499
2500"#
2501        );
2502    }
2503
2504    #[tokio::test]
2505    async fn append_text_plain_mixed() {
2506        let msg = new_msg_body(MIXED_CONTENT);
2507        msg.append_text_plain("bottom text 👾").await.unwrap();
2508        k9::snapshot!(
2509            data_as_string(&msg),
2510            r#"
2511Content-Type: multipart/mixed;\r
2512\tboundary="my-boundary"\r
2513\r
2514--my-boundary\r
2515Content-Type: text/plain;\r
2516\tcharset="utf-8"\r
2517Content-Transfer-Encoding: quoted-printable\r
2518\r
2519plain text\r
2520\r
2521bottom text =F0=9F=91=BE\r
2522--my-boundary\r
2523Content-Type: text/html;\r
2524\tcharset="us-ascii"\r
2525\r
2526<b>rich</b> text\r
2527--my-boundary\r
2528Content-Type: application/octet-stream\r
2529Content-Transfer-Encoding: base64\r
2530Content-Disposition: attachment;\r
2531\tfilename="woot.bin"\r
2532Content-ID: <woot.id@somewhere>\r
2533\r
2534AAECAw==\r
2535--my-boundary--\r
2536\r
2537
2538"#
2539        );
2540    }
2541
2542    #[tokio::test]
2543    async fn check_conformance_angle_msg_id() {
2544        const DOUBLE_ANGLE_ONLY: &str = "Subject: hello\r
2545Message-ID: <<1234@example.com>>\r
2546\r
2547Hello";
2548        let msg = new_msg_body(DOUBLE_ANGLE_ONLY);
2549        k9::snapshot!(
2550            msg.check_fix_conformance(
2551                MessageConformance::MISSING_MESSAGE_ID_HEADER,
2552                MessageConformance::empty(),
2553                None,
2554            )
2555            .await
2556            .unwrap_err()
2557            .to_string(),
2558            "Message has conformance issues: MISSING_MESSAGE_ID_HEADER"
2559        );
2560
2561        msg.check_fix_conformance(
2562            MessageConformance::MISSING_MESSAGE_ID_HEADER,
2563            MessageConformance::MISSING_MESSAGE_ID_HEADER,
2564            None,
2565        )
2566        .await
2567        .unwrap();
2568
2569        // Can't use a snapshot test here because the fixed header
2570        // has a unique random component
2571        /*
2572                k9::snapshot!(
2573                    data_as_string(&msg),
2574                    r#"
2575        Subject: hello\r
2576        Message-ID: <4106566d2ce911ef9dcd0242289ea0df@example.com>\r
2577        \r
2578        Hello
2579        "#
2580                );
2581        */
2582
2583        const DOUBLE_ANGLE_AND_LONG_LINE: &str = "Subject: hello\r
2584Message-ID: <<1234@example.com>>\r
2585\r
2586Hello this is a really long line Hello this is a really long line \
2587Hello this is a really long line Hello this is a really long line \
2588Hello this is a really long line Hello this is a really long line \
2589Hello this is a really long line Hello this is a really long line \
2590Hello this is a really long line Hello this is a really long line \
2591Hello this is a really long line Hello this is a really long line \
2592Hello this is a really long line Hello this is a really long line
2593";
2594        let msg = new_msg_body(DOUBLE_ANGLE_AND_LONG_LINE);
2595        msg.check_fix_conformance(
2596            MessageConformance::MISSING_COLON_VALUE,
2597            MessageConformance::MISSING_MESSAGE_ID_HEADER | MessageConformance::LINE_TOO_LONG,
2598            None,
2599        )
2600        .await
2601        .unwrap();
2602
2603        // Can't use a snapshot test here because the fixed header
2604        // has a random component
2605        /*
2606                k9::snapshot!(
2607                    data_as_string(&msg),
2608                    r#"
2609        Content-Type: text/plain;\r
2610        \tcharset="us-ascii"\r
2611        Content-Transfer-Encoding: quoted-printable\r
2612        Subject: hello\r
2613        Message-ID: <749fc87e2cea11ef96a50242289ea0df@example.com>\r
2614        \r
2615        Hello this is a really long line Hello this is a really long line Hello thi=\r
2616        s is a really long line Hello this is a really long line Hello this is a re=\r
2617        ally long line Hello this is a really long line Hello this is a really long=\r
2618         line Hello this is a really long line Hello this is a really long line Hel=\r
2619        lo this is a really long line Hello this is a really long line Hello this i=\r
2620        s a really long line Hello this is a really long line Hello this is a reall=\r
2621        y long line=0A\r
2622
2623        "#
2624                );
2625        */
2626    }
2627
2628    #[tokio::test]
2629    async fn check_conformance() {
2630        let msg = new_msg_body(MULTI_HEADER_CONTENT);
2631        msg.check_fix_conformance(
2632            MessageConformance::default(),
2633            MessageConformance::MISSING_MIME_VERSION,
2634            None,
2635        )
2636        .await
2637        .unwrap();
2638        k9::snapshot!(
2639            data_as_string(&msg),
2640            r#"
2641X-Hello: there\r
2642X-Header: value\r
2643Subject: Hello\r
2644X-Header: another value\r
2645From :Someone@somewhere\r
2646Mime-Version: 1.0\r
2647\r
2648Body
2649"#
2650        );
2651
2652        let msg = new_msg_body(MULTI_HEADER_CONTENT);
2653        msg.check_fix_conformance(
2654            MessageConformance::default(),
2655            MessageConformance::MISSING_MIME_VERSION | MessageConformance::NAME_ENDS_WITH_SPACE,
2656            None,
2657        )
2658        .await
2659        .unwrap();
2660        k9::snapshot!(
2661            data_as_string(&msg),
2662            r#"
2663Content-Type: text/plain;\r
2664\tcharset="us-ascii"\r
2665X-Hello: there\r
2666X-Header: value\r
2667Subject: Hello\r
2668X-Header: another value\r
2669From: <Someone@somewhere>\r
2670Mime-Version: 1.0\r
2671\r
2672Body\r
2673
2674"#
2675        );
2676    }
2677
2678    #[tokio::test]
2679    async fn check_fix_latin_input() {
2680        const POUNDS: &[u8] = b"Subject: \xa3\r\n\r\nGBP\r\n";
2681        let msg = new_msg_body(&*POUNDS);
2682        msg.check_fix_conformance(
2683            MessageConformance::default(),
2684            MessageConformance::NEEDS_TRANSFER_ENCODING,
2685            Some(&CheckFixSettings {
2686                detect_encoding: true,
2687                include_encodings: vec!["iso-8859-1".to_string()],
2688                ..Default::default()
2689            }),
2690        )
2691        .await
2692        .unwrap();
2693
2694        let subject = msg
2695            .get_first_named_header_value("subject")
2696            .await
2697            .unwrap()
2698            .unwrap();
2699        assert_eq!(subject, "£");
2700    }
2701
2702    #[tokio::test]
2703    async fn set_scheduling() -> anyhow::Result<()> {
2704        let msg = new_msg_body(MULTI_HEADER_CONTENT);
2705        assert!(msg.get_due().is_none(), "due is implicitly now");
2706
2707        let now = Utc::now();
2708        let one_day = chrono::Duration::try_days(1).expect("1 day to be valid");
2709
2710        msg.set_scheduling(Some(Scheduling {
2711            restriction: None,
2712            first_attempt: Some((now + one_day).into()),
2713            expires: None,
2714        }))
2715        .await?;
2716
2717        let due = msg.get_due().expect("due to now be set");
2718        assert!(due - now >= one_day, "due time is at least 1 day away");
2719
2720        Ok(())
2721    }
2722
2723    #[cfg(all(test, target_pointer_width = "64"))]
2724    #[test]
2725    fn sizes() {
2726        assert_eq!(std::mem::size_of::<Message>(), 8);
2727        assert_eq!(std::mem::size_of::<MessageInner>(), 32);
2728        assert_eq!(std::mem::size_of::<MessageWithId>(), 72);
2729    }
2730}