1use crate::address::HeaderAddressList;
2#[cfg(feature = "impl")]
3use crate::dkim::Signer;
4#[cfg(feature = "impl")]
5use crate::dkim::SIGN_POOL;
6pub use crate::queue_name::QueueNameComponents;
7use crate::scheduling::Scheduling;
8use anyhow::Context;
9use bstr::{BString, ByteSlice, ByteVec};
10use chrono::{DateTime, Utc};
11#[cfg(feature = "impl")]
12use config::{any_err, from_lua_value, serialize_options, SerdeWrappedValue};
13use futures::FutureExt;
14use intrusive_collections::{intrusive_adapter, LinkedList, LinkedListAtomicLink};
15use kumo_chrono_helper::*;
16#[cfg(feature = "impl")]
17use kumo_dkim::arc::ARC;
18use kumo_log_types::rfc3464::Report;
19use kumo_log_types::rfc5965::ARFReport;
20use kumo_prometheus::declare_metric;
21#[cfg(feature = "impl")]
22use mailparsing::{AuthenticationResult, AuthenticationResults, EncodeHeaderValue};
23use mailparsing::{
24 CheckFixSettings, DecodedBody, Header, HeaderParseResult, MessageConformance, MimePart,
25};
26#[cfg(feature = "impl")]
27use mlua::{IntoLua, LuaSerdeExt, UserData, UserDataMethods};
28#[cfg(feature = "impl")]
29use mod_dns_resolver::get_resolver_instance;
30use parking_lot::Mutex;
31use rfc5321::parser::EnvelopeAddress;
32use serde::{Deserialize, Serialize};
33use serde_with::formats::PreferOne;
34use serde_with::{serde_as, OneOrMany};
35use spool::{get_data_spool, get_meta_spool, Spool, SpoolId};
36use std::collections::{BTreeMap, HashSet};
37use std::hash::Hash;
38use std::sync::{Arc, LazyLock, Weak};
39use std::time::{Duration, Instant};
40use timeq::TimerEntryWithDelay;
41
// Per-message state bits kept in `MessageInner::flags`.
bitflags::bitflags! {
    #[derive(Clone, Copy, Debug, PartialEq, Eq)]
    struct MessageFlags: u8 {
        // The in-memory metadata has changes; `save_to` writes the metadata
        // only while this bit is set and clears it afterwards.
        const META_DIRTY = 1;
        // The in-memory body has changes; `save_to` writes the data only
        // while this bit is set and clears it afterwards.
        const DATA_DIRTY = 2;
        // The metadata carries a `Scheduling` constraint; `set_due` consults
        // the schedule only when this bit is set.
        const SCHEDULED = 4;
        // Passed through as the `force_sync` argument of `Spool::store`
        // when the message is saved.
        const FORCE_SYNC = 8;
    }
}
55
// Gauge named "message_count": incremented by every `Message` constructor
// in this file and decremented when a `MessageInner` is dropped.
declare_metric! {
static MESSAGE_COUNT: IntGauge("message_count");
}
65
// Gauge named "message_meta_resident_count": incremented whenever a message
// gains in-memory metadata and decremented when that metadata is released
// (by `shrink` or by dropping the `MessageInner`).
declare_metric! {
static META_COUNT: IntGauge("message_meta_resident_count");
}
75
// Gauge named "message_data_resident_count": incremented whenever a message
// gains a non-empty in-memory body and decremented when that body is
// released (by `shrink`/`shrink_data` or by dropping the `MessageInner`).
declare_metric! {
static DATA_COUNT: IntGauge("message_data_resident_count");
}
88
/// Shared zero-length body.
///
/// Messages whose data is not resident point at this buffer, so "data is
/// loaded" is expressed throughout this file as "the buffer is non-empty".
static NO_DATA: LazyLock<Arc<Box<[u8]>>> =
    LazyLock::new(|| Arc::new(Vec::<u8>::new().into_boxed_slice()));
90
// Histogram named "message_save_latency"; `Message::save` starts a timer on
// it for the duration of each save.
declare_metric! {
static SAVE_HIST: Histogram("message_save_latency");
}
103
// Histogram named "message_data_load_latency"; `Message::load_data` starts a
// timer on it for the duration of each body load.
declare_metric! {
static LOAD_DATA_HIST: Histogram("message_data_load_latency");
}
117
// Histogram named "message_meta_load_latency"; `Message::load_meta` starts a
// timer on it for the duration of each metadata load.
declare_metric! {
static LOAD_META_HIST: Histogram("message_meta_load_latency");
}
126
/// Mutable state of a message, guarded by the mutex in `MessageWithId`.
#[derive(Debug)]
struct MessageInner {
    /// Envelope and user metadata; `None` while the metadata is not resident
    /// in memory (see `Message::shrink` / `Message::load_meta`).
    metadata: Option<Box<MetaData>>,
    /// Message body; an empty buffer (`NO_DATA`) while the body is not
    /// resident in memory.
    data: Arc<Box<[u8]>>,
    /// Dirty/scheduling/sync state bits.
    flags: MessageFlags,
    /// Counter maintained via `get/set/increment_num_attempts`.
    num_attempts: u16,
    /// Time the message is next due, if one has been assigned.
    due: Option<DateTime<Utc>>,
}
135
/// Shared allocation behind a `Message` handle: the immutable spool id, the
/// lock-protected mutable state, and the intrusive link used by `MessageList`.
#[derive(Debug)]
pub(crate) struct MessageWithId {
    /// Identifier of this message in the spool.
    id: SpoolId,
    /// Mutable message state.
    inner: Mutex<MessageInner>,
    /// Hook that lets this node be linked into a `MessageList`.
    link: LinkedListAtomicLink,
}
142
// Adapter that tells `intrusive_collections` how to link `Arc<MessageWithId>`
// nodes through their `link` field.
intrusive_adapter!(
    pub(crate) MessageWithIdAdapter = Arc<MessageWithId>: MessageWithId { link: LinkedListAtomicLink }
);
146
/// A list of `Message`s built on an intrusive linked list, with the element
/// count tracked alongside so `len()` does not need to walk the list.
pub struct MessageList {
    /// The linked nodes.
    list: LinkedList<MessageWithIdAdapter>,
    /// Number of nodes currently in `list`.
    len: usize,
}
155
156impl Default for MessageList {
157 fn default() -> Self {
158 Self::new()
159 }
160}
161
162impl MessageList {
163 pub fn new() -> Self {
165 Self {
166 list: LinkedList::new(MessageWithIdAdapter::default()),
167 len: 0,
168 }
169 }
170
171 pub fn len(&self) -> usize {
173 self.len
174 }
175
176 pub fn is_empty(&self) -> bool {
177 self.len == 0
178 }
179
180 pub fn take(&mut self) -> Self {
183 let new_list = Self {
184 list: self.list.take(),
185 len: self.len,
186 };
187 self.len = 0;
188 new_list
189 }
190
191 pub fn push_back(&mut self, message: Message) {
193 self.list.push_back(message.msg_and_id);
194 self.len += 1;
195 }
196
197 pub fn pop_front(&mut self) -> Option<Message> {
199 self.list.pop_front().map(|msg_and_id| {
200 self.len -= 1;
201 Message { msg_and_id }
202 })
203 }
204
205 pub fn pop_back(&mut self) -> Option<Message> {
207 self.list.pop_back().map(|msg_and_id| {
208 self.len -= 1;
209 Message { msg_and_id }
210 })
211 }
212
213 pub fn drain(&mut self) -> Vec<Message> {
217 let mut messages = Vec::with_capacity(self.len);
218 while let Some(msg) = self.pop_front() {
219 messages.push(msg);
220 }
221 messages
222 }
223
224 pub fn extend_from_iter<I>(&mut self, iter: I)
225 where
226 I: Iterator<Item = Message>,
227 {
228 for msg in iter {
229 self.push_back(msg)
230 }
231 }
232}
233
234impl IntoIterator for MessageList {
235 type Item = Message;
236 type IntoIter = MessageListIter;
237 fn into_iter(self) -> MessageListIter {
238 MessageListIter { list: self.list }
239 }
240}
241
/// Owning iterator produced by `MessageList::into_iter`.
pub struct MessageListIter {
    /// Remaining nodes; each `next()` pops from the front.
    list: LinkedList<MessageWithIdAdapter>,
}
245
246impl Iterator for MessageListIter {
247 type Item = Message;
248 fn next(&mut self) -> Option<Message> {
249 self.list
250 .pop_front()
251 .map(|msg_and_id| Message { msg_and_id })
252 }
253}
254
/// Cheaply clonable handle to a message; clones share the same underlying
/// `MessageWithId` allocation.
#[derive(Clone, Debug)]
#[cfg_attr(feature = "impl", derive(mlua::FromLua))]
pub struct Message {
    /// Shared id + state for this message.
    pub(crate) msg_and_id: Arc<MessageWithId>,
}
260
261impl PartialEq for Message {
262 fn eq(&self, other: &Self) -> bool {
263 self.id() == other.id()
264 }
265}
266impl Eq for Message {}
267
268impl Hash for Message {
269 fn hash<H>(&self, hasher: &mut H)
270 where
271 H: std::hash::Hasher,
272 {
273 self.id().hash(hasher)
274 }
275}
276
/// Non-owning reference to a message; obtained from `Message::weak` and
/// turned back into a `Message` with `upgrade`.
#[derive(Clone, Debug)]
pub struct WeakMessage {
    /// Weak pointer to the shared message allocation.
    weak: Weak<MessageWithId>,
}
281
282impl WeakMessage {
283 pub fn upgrade(&self) -> Option<Message> {
284 Some(Message {
285 msg_and_id: self.weak.upgrade()?,
286 })
287 }
288}
289
/// The portion of a message that is serialized as JSON into the meta spool.
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone)]
pub(crate) struct MetaData {
    /// Envelope sender.
    pub sender: EnvelopeAddress,
    /// Envelope recipients. (De)serialized through serde_with's
    /// `OneOrMany<_, PreferOne>` adapter.
    #[serde_as(as = "OneOrMany<_, PreferOne>")]
    pub recipient: Vec<EnvelopeAddress>,
    /// Arbitrary key/value metadata; the accessors in this file expect it to
    /// be a JSON object.
    pub meta: serde_json::Value,
    /// Optional scheduling constraint; defaults to `None` when the field is
    /// absent from the serialized form.
    #[serde(default)]
    pub schedule: Option<Scheduling>,
}
300
301impl Drop for MessageInner {
302 fn drop(&mut self) {
303 if self.metadata.is_some() {
304 META_COUNT.dec();
305 }
306 if !self.data.is_empty() {
307 DATA_COUNT.dec();
308 }
309 MESSAGE_COUNT.dec();
310 }
311}
312
313impl Message {
314 pub fn new_dirty(
317 id: SpoolId,
318 sender: EnvelopeAddress,
319 recipient: Vec<EnvelopeAddress>,
320 meta: serde_json::Value,
321 data: Arc<Box<[u8]>>,
322 ) -> anyhow::Result<Self> {
323 anyhow::ensure!(meta.is_object(), "metadata must be a json object");
324 MESSAGE_COUNT.inc();
325 DATA_COUNT.inc();
326 META_COUNT.inc();
327 Ok(Self {
328 msg_and_id: Arc::new(MessageWithId {
329 id,
330 inner: Mutex::new(MessageInner {
331 metadata: Some(Box::new(MetaData {
332 sender,
333 recipient,
334 meta,
335 schedule: None,
336 })),
337 data,
338 flags: MessageFlags::META_DIRTY | MessageFlags::DATA_DIRTY,
339 num_attempts: 0,
340 due: None,
341 }),
342 link: LinkedListAtomicLink::default(),
343 }),
344 })
345 }
346
347 pub fn weak(&self) -> WeakMessage {
348 WeakMessage {
349 weak: Arc::downgrade(&self.msg_and_id),
350 }
351 }
352
353 pub fn new_from_spool(id: SpoolId, metadata: Vec<u8>) -> anyhow::Result<Self> {
357 let metadata: MetaData = serde_json::from_slice(&metadata)?;
358 MESSAGE_COUNT.inc();
359 META_COUNT.inc();
360
361 let flags = if metadata.schedule.is_some() {
362 MessageFlags::SCHEDULED
363 } else {
364 MessageFlags::empty()
365 };
366
367 Ok(Self {
368 msg_and_id: Arc::new(MessageWithId {
369 id,
370 inner: Mutex::new(MessageInner {
371 metadata: Some(Box::new(metadata)),
372 data: NO_DATA.clone(),
373 flags,
374 num_attempts: 0,
375 due: None,
376 }),
377 link: LinkedListAtomicLink::default(),
378 }),
379 })
380 }
381
382 pub(crate) fn new_from_parts(id: SpoolId, metadata: MetaData, data: Arc<Box<[u8]>>) -> Self {
383 MESSAGE_COUNT.inc();
384 META_COUNT.inc();
385
386 let flags = if metadata.schedule.is_some() {
387 MessageFlags::SCHEDULED
388 } else {
389 MessageFlags::empty()
390 };
391
392 Self {
393 msg_and_id: Arc::new(MessageWithId {
394 id,
395 inner: Mutex::new(MessageInner {
396 metadata: Some(Box::new(metadata)),
397 data,
398 flags: flags | MessageFlags::META_DIRTY | MessageFlags::DATA_DIRTY,
399 num_attempts: 0,
400 due: None,
401 }),
402 link: LinkedListAtomicLink::default(),
403 }),
404 }
405 }
406
407 pub async fn new_with_id(id: SpoolId) -> anyhow::Result<Self> {
408 let meta_spool = get_meta_spool();
409 let data = meta_spool.load(id).await?;
410 Self::new_from_spool(id, data)
411 }
412
413 pub fn get_num_attempts(&self) -> u16 {
414 let inner = self.msg_and_id.inner.lock();
415 inner.num_attempts
416 }
417
418 pub fn set_num_attempts(&self, num_attempts: u16) {
419 let mut inner = self.msg_and_id.inner.lock();
420 inner.num_attempts = num_attempts;
421 }
422
423 pub fn increment_num_attempts(&self) {
424 let mut inner = self.msg_and_id.inner.lock();
425 inner.num_attempts += 1;
426 }
427
428 pub async fn set_scheduling(
429 &self,
430 scheduling: Option<Scheduling>,
431 ) -> anyhow::Result<Option<Scheduling>> {
432 self.load_meta_if_needed().await?;
433 let mut inner = self.msg_and_id.inner.lock();
434 match &mut inner.metadata {
435 None => anyhow::bail!("set_scheduling: metadata must be loaded first"),
436 Some(meta) => {
437 meta.schedule = scheduling;
438 inner
439 .flags
440 .set(MessageFlags::SCHEDULED, scheduling.is_some());
441 if let Some(sched) = scheduling {
442 let due = inner.due.unwrap_or_else(Utc::now);
443 inner.due = Some(sched.adjust_for_schedule(due));
444 }
445 Ok(scheduling)
446 }
447 }
448 }
449
450 pub async fn get_scheduling(&self) -> anyhow::Result<Option<Scheduling>> {
451 self.load_meta_if_needed().await?;
452 let inner = self.msg_and_id.inner.lock();
453 Ok(inner.metadata.as_ref().and_then(|meta| meta.schedule))
454 }
455
456 pub fn get_due(&self) -> Option<DateTime<Utc>> {
457 let inner = self.msg_and_id.inner.lock();
458 inner.due
459 }
460
461 pub async fn delay_with_jitter(&self, limit: i64) -> anyhow::Result<Option<DateTime<Utc>>> {
462 let scale = rand::random::<f32>();
463 let value = (scale * limit as f32) as i64;
464 self.delay_by(seconds(value)?).await
465 }
466
467 pub async fn delay_by(
468 &self,
469 duration: chrono::Duration,
470 ) -> anyhow::Result<Option<DateTime<Utc>>> {
471 let due = Utc::now() + duration;
472 self.set_due(Some(due)).await
473 }
474
475 pub async fn delay_by_and_jitter(
477 &self,
478 duration: chrono::Duration,
479 ) -> anyhow::Result<Option<DateTime<Utc>>> {
480 let scale = rand::random::<f32>();
481 let value = (scale * 60.) as i64;
482 let due = Utc::now() + duration + seconds(value)?;
483 self.set_due(Some(due)).await
484 }
485
486 pub async fn set_due(
487 &self,
488 due: Option<DateTime<Utc>>,
489 ) -> anyhow::Result<Option<DateTime<Utc>>> {
490 let due = {
491 let mut inner = self.msg_and_id.inner.lock();
492
493 if !inner.flags.contains(MessageFlags::SCHEDULED) {
494 inner.due = due;
496 return Ok(inner.due);
497 }
498
499 let due = due.unwrap_or_else(Utc::now);
500
501 if let Some(meta) = &inner.metadata {
502 inner.due = match &meta.schedule {
503 Some(sched) => Some(sched.adjust_for_schedule(due)),
504 None => Some(due),
505 };
506 return Ok(inner.due);
507 }
508
509 due
512 };
513
514 self.load_meta().await?;
515
516 {
517 let mut inner = self.msg_and_id.inner.lock();
518 match &inner.metadata {
519 Some(meta) => {
520 inner.due = match &meta.schedule {
521 Some(sched) => Some(sched.adjust_for_schedule(due)),
522 None => Some(due),
523 };
524 Ok(inner.due)
525 }
526 None => anyhow::bail!("loaded metadata, but metadata is not set!?"),
527 }
528 }
529 }
530
531 fn get_data_if_dirty(&self) -> Option<Arc<Box<[u8]>>> {
532 let inner = self.msg_and_id.inner.lock();
533 if inner.flags.contains(MessageFlags::DATA_DIRTY) {
534 Some(Arc::clone(&inner.data))
535 } else {
536 None
537 }
538 }
539
540 fn get_meta_if_dirty(&self) -> Option<MetaData> {
541 let inner = self.msg_and_id.inner.lock();
542 if inner.flags.contains(MessageFlags::META_DIRTY) {
543 inner.metadata.as_ref().map(|md| (**md).clone())
544 } else {
545 None
546 }
547 }
548
549 pub(crate) async fn clone_meta_data(&self) -> anyhow::Result<MetaData> {
550 self.load_meta_if_needed().await?;
551 let inner = self.msg_and_id.inner.lock();
552 inner
553 .metadata
554 .as_ref()
555 .ok_or_else(|| anyhow::anyhow!("metadata not loaded even though we just loaded it"))
556 .map(|md| (**md).clone())
557 }
558
559 pub fn set_force_sync(&self, force: bool) {
560 let mut inner = self.msg_and_id.inner.lock();
561 inner.flags.set(MessageFlags::FORCE_SYNC, force);
562 }
563
564 pub fn needs_save(&self) -> bool {
565 let inner = self.msg_and_id.inner.lock();
566 inner.flags.contains(MessageFlags::META_DIRTY)
567 || inner.flags.contains(MessageFlags::DATA_DIRTY)
568 }
569
570 pub async fn save(&self, deadline: Option<Instant>) -> anyhow::Result<()> {
571 let _timer = SAVE_HIST.start_timer();
572 self.save_to(&**get_meta_spool(), &**get_data_spool(), deadline)
573 .await
574 }
575
576 pub async fn save_to(
577 &self,
578 meta_spool: &(dyn Spool + Send + Sync),
579 data_spool: &(dyn Spool + Send + Sync),
580 deadline: Option<Instant>,
581 ) -> anyhow::Result<()> {
582 let force_sync = self
583 .msg_and_id
584 .inner
585 .lock()
586 .flags
587 .contains(MessageFlags::FORCE_SYNC);
588
589 let data_fut = if let Some(data) = self.get_data_if_dirty() {
590 anyhow::ensure!(!data.is_empty(), "message data must not be empty");
591 data_spool
592 .store(self.msg_and_id.id, data, force_sync, deadline)
593 .map(|_| true)
594 .boxed()
595 } else {
596 futures::future::ready(false).boxed()
597 };
598 let meta_fut = if let Some(meta) = self.get_meta_if_dirty() {
599 let meta = Arc::new(serde_json::to_vec(&meta)?.into_boxed_slice());
600 meta_spool
601 .store(self.msg_and_id.id, meta, force_sync, deadline)
602 .map(|_| true)
603 .boxed()
604 } else {
605 futures::future::ready(false).boxed()
606 };
607
608 let (data_res, meta_res) = tokio::join!(data_fut, meta_fut);
614
615 if data_res {
616 self.msg_and_id
617 .inner
618 .lock()
619 .flags
620 .remove(MessageFlags::DATA_DIRTY);
621 }
622 if meta_res {
623 self.msg_and_id
624 .inner
625 .lock()
626 .flags
627 .remove(MessageFlags::META_DIRTY);
628 }
629 Ok(())
630 }
631
632 pub fn id(&self) -> &SpoolId {
633 &self.msg_and_id.id
634 }
635
636 pub async fn save_and_shrink(&self) -> anyhow::Result<bool> {
638 self.save(None).await?;
639 self.shrink()
640 }
641
642 pub async fn save_and_shrink_data(&self) -> anyhow::Result<bool> {
644 self.save(None).await?;
645 self.shrink_data()
646 }
647
648 pub fn shrink_data(&self) -> anyhow::Result<bool> {
649 let mut inner = self.msg_and_id.inner.lock();
650 let mut did_shrink = false;
651 if inner.flags.contains(MessageFlags::DATA_DIRTY) {
652 anyhow::bail!("Cannot shrink message: DATA_DIRTY");
653 }
654 if !inner.data.is_empty() {
655 DATA_COUNT.dec();
656 did_shrink = true;
657 }
658 if !inner.data.is_empty() {
659 inner.data = NO_DATA.clone();
660 did_shrink = true;
661 }
662 Ok(did_shrink)
663 }
664
665 pub fn shrink(&self) -> anyhow::Result<bool> {
666 let mut inner = self.msg_and_id.inner.lock();
667 let mut did_shrink = false;
668 if inner.flags.contains(MessageFlags::DATA_DIRTY) {
669 anyhow::bail!("Cannot shrink message: DATA_DIRTY");
670 }
671 if inner.flags.contains(MessageFlags::META_DIRTY) {
672 anyhow::bail!("Cannot shrink message: META_DIRTY");
673 }
674 if inner.metadata.take().is_some() {
675 META_COUNT.dec();
676 did_shrink = true;
677 }
678 if !inner.data.is_empty() {
679 DATA_COUNT.dec();
680 did_shrink = true;
681 }
682 if !inner.data.is_empty() {
683 inner.data = NO_DATA.clone();
684 did_shrink = true;
685 }
686 Ok(did_shrink)
687 }
688
689 pub async fn sender(&self) -> anyhow::Result<EnvelopeAddress> {
690 self.load_meta_if_needed().await?;
691 let inner = self.msg_and_id.inner.lock();
692 match &inner.metadata {
693 Some(meta) => Ok(meta.sender.clone()),
694 None => anyhow::bail!("Message::sender metadata is not loaded"),
695 }
696 }
697
698 pub async fn set_sender(&self, sender: EnvelopeAddress) -> anyhow::Result<()> {
699 self.load_meta_if_needed().await?;
700 let mut inner = self.msg_and_id.inner.lock();
701 match &mut inner.metadata {
702 Some(meta) => {
703 meta.sender = sender;
704 inner.flags.set(MessageFlags::DATA_DIRTY, true);
705 Ok(())
706 }
707 None => anyhow::bail!("Message::set_sender: metadata is not loaded"),
708 }
709 }
710
711 #[deprecated = "use recipient_list or first_recipient instead"]
712 pub async fn recipient(&self) -> anyhow::Result<EnvelopeAddress> {
713 self.first_recipient().await
714 }
715
716 pub async fn first_recipient(&self) -> anyhow::Result<EnvelopeAddress> {
717 self.load_meta_if_needed().await?;
718 let inner = self.msg_and_id.inner.lock();
719 match &inner.metadata {
720 Some(meta) => match meta.recipient.first() {
721 Some(recip) => Ok(recip.clone()),
722 None => anyhow::bail!("recipient list is empty!?"),
723 },
724 None => anyhow::bail!("Message::first_recipient: metadata is not loaded"),
725 }
726 }
727
728 pub async fn recipient_list(&self) -> anyhow::Result<Vec<EnvelopeAddress>> {
729 self.load_meta_if_needed().await?;
730 let inner = self.msg_and_id.inner.lock();
731 match &inner.metadata {
732 Some(meta) => Ok(meta.recipient.clone()),
733 None => anyhow::bail!("Message::recipient_list: metadata is not loaded"),
734 }
735 }
736
737 pub async fn recipient_list_string(&self) -> anyhow::Result<Vec<String>> {
738 self.load_meta_if_needed().await?;
739 let inner = self.msg_and_id.inner.lock();
740 match &inner.metadata {
741 Some(meta) => Ok(meta.recipient.iter().map(|a| a.to_string()).collect()),
742 None => anyhow::bail!("Message::recipient_list_string: metadata is not loaded"),
743 }
744 }
745
746 #[deprecated = "use set_recipient_list instead"]
747 pub async fn set_recipient(&self, recipient: EnvelopeAddress) -> anyhow::Result<()> {
748 self.set_recipient_list(vec![recipient]).await
749 }
750
751 pub async fn set_recipient_list(&self, recipient: Vec<EnvelopeAddress>) -> anyhow::Result<()> {
752 self.load_meta_if_needed().await?;
753
754 let mut inner = self.msg_and_id.inner.lock();
755 match &mut inner.metadata {
756 Some(meta) => {
757 meta.recipient = recipient;
758 inner.flags.set(MessageFlags::DATA_DIRTY, true);
759 Ok(())
760 }
761 None => anyhow::bail!("Message::set_recipient_list: metadata is not loaded"),
762 }
763 }
764
765 pub fn is_meta_loaded(&self) -> bool {
766 self.msg_and_id.inner.lock().metadata.is_some()
767 }
768
769 pub fn is_data_loaded(&self) -> bool {
770 !self.msg_and_id.inner.lock().data.is_empty()
771 }
772
773 pub async fn load_meta_if_needed(&self) -> anyhow::Result<()> {
774 if self.is_meta_loaded() {
775 return Ok(());
776 }
777 self.load_meta().await
778 }
779
780 pub async fn data(&self) -> anyhow::Result<Arc<Box<[u8]>>> {
781 self.load_data_if_needed().await
782 }
783
784 async fn load_data_if_needed(&self) -> anyhow::Result<Arc<Box<[u8]>>> {
785 if self.is_data_loaded() {
786 return Ok(self.get_data_maybe_not_loaded());
787 }
788 self.load_data().await
789 }
790
791 pub async fn load_meta(&self) -> anyhow::Result<()> {
792 let _timer = LOAD_META_HIST.start_timer();
793 self.load_meta_from(&**get_meta_spool()).await
794 }
795
796 async fn load_meta_from(&self, meta_spool: &(dyn Spool + Send + Sync)) -> anyhow::Result<()> {
797 let id = self.id();
798 let data = meta_spool.load(*id).await?;
799 let mut inner = self.msg_and_id.inner.lock();
800 let was_not_loaded = inner.metadata.is_none();
801 let metadata: MetaData = serde_json::from_slice(&data)?;
802 inner.metadata.replace(Box::new(metadata));
803 if was_not_loaded {
804 META_COUNT.inc();
805 }
806 Ok(())
807 }
808
809 pub async fn load_data(&self) -> anyhow::Result<Arc<Box<[u8]>>> {
810 let _timer = LOAD_DATA_HIST.start_timer();
811 self.load_data_from(&**get_data_spool()).await
812 }
813
814 async fn load_data_from(
815 &self,
816 data_spool: &(dyn Spool + Send + Sync),
817 ) -> anyhow::Result<Arc<Box<[u8]>>> {
818 let data = data_spool.load(*self.id()).await?;
819 let mut inner = self.msg_and_id.inner.lock();
820 let was_empty = inner.data.is_empty();
821 inner.data = Arc::new(data.into_boxed_slice());
822 if was_empty {
823 DATA_COUNT.inc();
824 }
825 Ok(inner.data.clone())
826 }
827
828 pub fn assign_data(&self, data: Vec<u8>) {
829 let mut inner = self.msg_and_id.inner.lock();
830 let was_empty = inner.data.is_empty();
831 inner.data = Arc::new(data.into_boxed_slice());
832 inner.flags.set(MessageFlags::DATA_DIRTY, true);
833 if was_empty {
834 DATA_COUNT.inc();
835 }
836 }
837
838 pub fn get_data_maybe_not_loaded(&self) -> Arc<Box<[u8]>> {
839 let inner = self.msg_and_id.inner.lock();
840 inner.data.clone()
841 }
842
843 pub async fn set_meta<S: AsRef<str>, V: Into<serde_json::Value>>(
844 &self,
845 key: S,
846 value: V,
847 ) -> anyhow::Result<()> {
848 self.load_meta_if_needed().await?;
849 let mut inner = self.msg_and_id.inner.lock();
850 match &mut inner.metadata {
851 None => anyhow::bail!("set_meta: metadata must be loaded first"),
852 Some(meta) => {
853 let key = key.as_ref();
854 let value = value.into();
855
856 match &mut meta.meta {
857 serde_json::Value::Object(map) => {
858 map.insert(key.to_string(), value);
859 }
860 _ => anyhow::bail!("metadata is somehow not a json object"),
861 }
862
863 inner.flags.set(MessageFlags::META_DIRTY, true);
864 Ok(())
865 }
866 }
867 }
868
869 pub async fn unset_meta<S: AsRef<str>>(&self, key: S) -> anyhow::Result<()> {
870 self.load_meta_if_needed().await?;
871 let mut inner = self.msg_and_id.inner.lock();
872 match &mut inner.metadata {
873 None => anyhow::bail!("set_meta: metadata must be loaded first"),
874 Some(meta) => {
875 let key = key.as_ref();
876
877 match &mut meta.meta {
878 serde_json::Value::Object(map) => {
879 map.remove(key);
880 }
881 _ => anyhow::bail!("metadata is somehow not a json object"),
882 }
883
884 inner.flags.set(MessageFlags::META_DIRTY, true);
885 Ok(())
886 }
887 }
888 }
889
890 pub async fn get_meta_string<S: serde_json::value::Index + std::fmt::Display + Copy>(
892 &self,
893 key: S,
894 ) -> anyhow::Result<Option<String>> {
895 match self.get_meta(key).await {
896 Ok(serde_json::Value::String(value)) => Ok(Some(value.to_string())),
897 Ok(serde_json::Value::Null) => Ok(None),
898 hmm => {
899 anyhow::bail!("expected '{key}' to be a string value, got {hmm:?}");
900 }
901 }
902 }
903
904 pub async fn get_meta_obj(&self) -> anyhow::Result<serde_json::Value> {
905 self.load_meta_if_needed().await?;
906 let inner = self.msg_and_id.inner.lock();
907 match &inner.metadata {
908 None => anyhow::bail!("get_meta_obj: metadata must be loaded first"),
909 Some(meta) => Ok(meta.meta.clone()),
910 }
911 }
912
913 pub async fn get_meta<S: serde_json::value::Index>(
914 &self,
915 key: S,
916 ) -> anyhow::Result<serde_json::Value> {
917 self.load_meta_if_needed().await?;
918 let inner = self.msg_and_id.inner.lock();
919 match &inner.metadata {
920 None => anyhow::bail!("get_meta: metadata must be loaded first"),
921 Some(meta) => match meta.meta.get(key) {
922 Some(value) => Ok(value.clone()),
923 None => Ok(serde_json::Value::Null),
924 },
925 }
926 }
927
928 pub fn age(&self, now: DateTime<Utc>) -> chrono::Duration {
929 self.msg_and_id.id.age(now)
930 }
931
932 pub async fn get_queue_name(&self) -> anyhow::Result<String> {
933 Ok(match self.get_meta_string("queue").await? {
934 Some(name) => name,
935 None => {
936 let name = QueueNameComponents::format(
937 self.get_meta_string("campaign").await?,
938 self.get_meta_string("tenant").await?,
939 self.first_recipient()
940 .await?
941 .domain()
942 .to_string()
943 .to_lowercase(),
944 self.get_meta_string("routing_domain").await?,
945 );
946 name.to_string()
947 }
948 })
949 }
950
951 #[cfg(feature = "impl")]
952 pub async fn arc_verify(
953 &self,
954 opt_resolver_name: Option<String>,
955 ) -> anyhow::Result<AuthenticationResult> {
956 let resolver = get_resolver_instance(&opt_resolver_name)?;
957 let data = self.data().await?;
958 let bytes = mailparsing::SharedString::try_from(data.as_ref().as_ref())?;
959
960 let parsed = mailparsing::Header::parse_headers(bytes.clone())?;
961 let message = kumo_dkim::ParsedEmail::HeaderOnlyParse { bytes, parsed };
962
963 let arc = ARC::verify(&message, &**resolver).await;
964 Ok(arc.authentication_result())
965 }
966
967 #[cfg(feature = "impl")]
968 pub async fn arc_seal(
969 &self,
970 signer: Signer,
971 auth_results: AuthenticationResults,
972 opt_resolver_name: Option<String>,
973 ) -> anyhow::Result<()> {
974 let resolver = get_resolver_instance(&opt_resolver_name)?;
975 let data = self.data().await?;
976 let bytes = mailparsing::SharedString::try_from(data.as_ref().as_ref())?;
977 let parsed = mailparsing::Header::parse_headers(bytes.clone())?;
978 let message = kumo_dkim::ParsedEmail::HeaderOnlyParse { bytes, parsed };
979 let arc = ARC::verify(&message, &**resolver).await;
980
981 let headers = arc.seal(&message, auth_results, signer.signer())?;
982 if !headers.is_empty() {
983 let mut new_data = Vec::<u8>::with_capacity(data.len() + 1024);
984
985 for hdr in headers {
986 hdr.write_header(&mut new_data).ok();
987 }
988 new_data.extend_from_slice(&data);
989 self.assign_data(new_data);
990 }
991
992 Ok(())
993 }
994
995 #[cfg(feature = "impl")]
996 pub async fn dkim_verify(
997 &self,
998 opt_resolver_name: Option<String>,
999 ) -> anyhow::Result<Vec<AuthenticationResult>> {
1000 let resolver = get_resolver_instance(&opt_resolver_name)?;
1001 let data = self.data().await?;
1002 let bytes = mailparsing::SharedString::try_from(data.as_ref().as_ref())?;
1003
1004 let parsed = mailparsing::Header::parse_headers(bytes.clone())?;
1005 if parsed
1006 .overall_conformance
1007 .contains(MessageConformance::NON_CANONICAL_LINE_ENDINGS)
1008 {
1009 return Ok(vec![AuthenticationResult {
1010 method: "dkim".into(),
1011 method_version: None,
1012 result: "permerror".into(),
1013 reason: Some("message has non-canonical line endings".into()),
1014 props: Default::default(),
1015 }]);
1016 }
1017 let message = kumo_dkim::ParsedEmail::HeaderOnlyParse { bytes, parsed };
1018
1019 let results = kumo_dkim::verify_email_with_resolver(&message, &**resolver).await?;
1020 Ok(results)
1021 }
1022
1023 pub async fn parse(&self) -> anyhow::Result<MimePart<'static>> {
1028 let data = self.data().await?;
1029 let owned_data = String::from_utf8_lossy(data.as_ref().as_ref()).to_string();
1030 Ok(MimePart::parse(owned_data)?)
1031 }
1032
1033 pub async fn parse_rfc3464(&self) -> anyhow::Result<Option<Report>> {
1034 let data = self.data().await?;
1035 Report::parse(&data)
1036 }
1037
1038 pub async fn parse_rfc5965(&self) -> anyhow::Result<Option<ARFReport>> {
1039 let data = self.data().await?;
1040 ARFReport::parse(&data)
1041 }
1042
1043 pub async fn prepend_header(&self, name: Option<&str>, value: &[u8]) -> anyhow::Result<()> {
1044 let data = self.data().await?;
1045 let mut new_data = Vec::with_capacity(size_header(name, value) + 2 + data.len());
1046 emit_header(&mut new_data, name, value);
1047 new_data.extend_from_slice(&data);
1048 self.assign_data(new_data);
1049 Ok(())
1050 }
1051
1052 pub async fn append_header(&self, name: Option<&str>, value: &[u8]) -> anyhow::Result<()> {
1053 let data = self.data().await?;
1054 let mut new_data = Vec::with_capacity(size_header(name, value.as_bytes()) + 2 + data.len());
1055 for (idx, window) in data.windows(4).enumerate() {
1056 if window == b"\r\n\r\n" {
1057 let headers = &data[0..idx + 2];
1058 let body = &data[idx + 2..];
1059
1060 new_data.extend_from_slice(headers);
1061 emit_header(&mut new_data, name, value);
1062 new_data.extend_from_slice(body);
1063 self.assign_data(new_data);
1064 return Ok(());
1065 }
1066 }
1067
1068 anyhow::bail!("append_header could not find the end of the header block");
1069 }
1070
1071 pub async fn get_address_header(
1072 &self,
1073 header_name: &str,
1074 ) -> anyhow::Result<Option<HeaderAddressList>> {
1075 let data = self.data().await?;
1076 let HeaderParseResult { headers, .. } =
1077 mailparsing::Header::parse_headers(data.as_ref().as_ref())?;
1078
1079 match headers.get_first(header_name) {
1080 Some(hdr) => {
1081 let list = hdr.as_address_list()?;
1082 let result: HeaderAddressList = list.into();
1083 Ok(Some(result))
1084 }
1085 None => Ok(None),
1086 }
1087 }
1088
1089 pub async fn get_first_named_header_value(
1090 &self,
1091 name: &str,
1092 ) -> anyhow::Result<Option<BString>> {
1093 let data = self.data().await?;
1094 let HeaderParseResult { headers, .. } = Header::parse_headers(data.as_ref().as_ref())?;
1095
1096 match headers.get_first(name) {
1097 Some(hdr) => Ok(Some(
1098 hdr.as_unstructured()
1099 .unwrap_or_else(|_| hdr.get_raw_value().into()),
1100 )),
1101 None => Ok(None),
1102 }
1103 }
1104
1105 pub async fn get_all_named_header_values(&self, name: &str) -> anyhow::Result<Vec<BString>> {
1106 let data = self.data().await?;
1107 let HeaderParseResult { headers, .. } = Header::parse_headers(data.as_ref().as_ref())?;
1108
1109 let mut values = vec![];
1110 for hdr in headers.iter_named(name) {
1111 values.push(
1112 hdr.as_unstructured()
1113 .unwrap_or_else(|_| hdr.get_raw_value().into()),
1114 );
1115 }
1116 Ok(values)
1117 }
1118
1119 pub async fn get_all_headers(&self) -> anyhow::Result<Vec<(BString, BString)>> {
1120 let data = self.data().await?;
1121 let HeaderParseResult { headers, .. } = Header::parse_headers(data.as_ref().as_ref())?;
1122
1123 let mut values = vec![];
1124 for hdr in headers.iter() {
1125 values.push((
1126 hdr.get_name().into(),
1127 hdr.as_unstructured()
1128 .unwrap_or_else(|_| hdr.get_raw_value().into()),
1129 ));
1130 }
1131 Ok(values)
1132 }
1133
1134 pub async fn retain_headers<F: FnMut(usize, &Header) -> bool>(
1135 &self,
1136 mut func: F,
1137 ) -> anyhow::Result<()> {
1138 let data = self.data().await?;
1139 let mut new_data = Vec::with_capacity(data.len());
1140 let HeaderParseResult {
1141 headers,
1142 body_offset,
1143 ..
1144 } = Header::parse_headers(data.as_ref().as_ref())?;
1145 for (idx, hdr) in headers.iter().enumerate() {
1146 let retain = (func)(idx, hdr);
1147 if !retain {
1148 continue;
1149 }
1150 hdr.write_header(&mut new_data)?;
1151 }
1152 new_data.extend_from_slice(b"\r\n");
1153 new_data.extend_from_slice(&data[body_offset..]);
1154 self.assign_data(new_data);
1155 Ok(())
1156 }
1157
1158 pub async fn remove_first_named_header(&self, name: &str) -> anyhow::Result<()> {
1159 let mut removed = false;
1160 self.retain_headers(|_, hdr| {
1161 if hdr.get_name().eq_ignore_ascii_case(name.as_bytes()) && !removed {
1162 removed = true;
1163 false
1164 } else {
1165 true
1166 }
1167 })
1168 .await
1169 }
1170
1171 pub async fn import_x_headers(&self, names: Vec<String>) -> anyhow::Result<()> {
1172 let specs = if names.is_empty() {
1173 vec![ImportHeaderSpec {
1174 name: "X-*".to_string(),
1175 ..ImportHeaderSpec::default()
1176 }]
1177 } else {
1178 names
1179 .into_iter()
1180 .map(|name| ImportHeaderSpec {
1181 name,
1182 ..ImportHeaderSpec::default()
1183 })
1184 .collect()
1185 };
1186 self.import_headers(specs).await
1187 }
1188
1189 pub async fn import_headers(&self, specs: Vec<ImportHeaderSpec>) -> anyhow::Result<()> {
1190 let compiled: Vec<CompiledImportHeaderSpec> = specs
1191 .into_iter()
1192 .map(CompiledImportHeaderSpec::compile)
1193 .collect::<anyhow::Result<_>>()?;
1194
1195 let data = self.data().await?;
1196 let HeaderParseResult { headers, .. } = Header::parse_headers(data.as_ref().as_ref())?;
1197
1198 let mut accumulators: Vec<PerSpecAccumulator> = compiled
1199 .iter()
1200 .map(|s| PerSpecAccumulator::new(s.match_mode))
1201 .collect();
1202 let mut indices_to_remove: HashSet<usize> = HashSet::new();
1203
1204 for (idx, hdr) in headers.iter().enumerate() {
1205 let hdr_name = hdr.get_name();
1206 for (spec_idx, spec) in compiled.iter().enumerate() {
1207 if spec.matches(hdr_name) {
1208 let key = spec.target_key(&hdr.get_name_lossy());
1209 let value = hdr.as_unstructured()?.to_str_lossy().to_string();
1210 accumulators[spec_idx].record(key, value);
1211 if spec.remove {
1212 indices_to_remove.insert(idx);
1213 }
1214 break;
1215 }
1216 }
1217 }
1218
1219 for acc in accumulators {
1220 acc.write_to(self).await?;
1221 }
1222
1223 if !indices_to_remove.is_empty() {
1224 self.retain_headers(|idx, _| !indices_to_remove.contains(&idx))
1225 .await?;
1226 }
1227
1228 Ok(())
1229 }
1230
1231 pub async fn remove_x_headers(&self, names: Vec<String>) -> anyhow::Result<()> {
1232 self.retain_headers(|_, hdr| {
1233 if names.is_empty() {
1234 !is_x_header(hdr.get_name())
1235 } else {
1236 !names
1237 .iter()
1238 .any(|n| hdr.get_name().eq_ignore_ascii_case(n.as_bytes()))
1239 }
1240 })
1241 .await
1242 }
1243
1244 pub async fn remove_all_named_headers(&self, name: &str) -> anyhow::Result<()> {
1245 self.retain_headers(|_, hdr| !hdr.get_name().eq_ignore_ascii_case(name.as_bytes()))
1246 .await
1247 }
1248
/// Signs the message data with `signer` and prepends the resulting
/// header to the message.
///
/// When `SIGN_POOL` has been initialised, signing is handed to that
/// runtime via `spawn_blocking`; otherwise it runs inline on the
/// calling task.
#[cfg(feature = "impl")]
pub async fn dkim_sign(&self, signer: Signer) -> anyhow::Result<()> {
    let data = self.data().await?;
    let header = match SIGN_POOL.get() {
        Some(runtime) => {
            let task = runtime.spawn_blocking(move || signer.sign(&data));
            // Outer `?` covers the join error, inner `?` the signing error.
            task.await??
        }
        None => signer.sign(&data)?,
    };
    // `None` name: the signer output is passed as the complete header text.
    self.prepend_header(None, header.as_bytes()).await?;
    Ok(())
}
1260
1261 pub async fn import_scheduling_header(
1262 &self,
1263 header_name: &str,
1264 remove: bool,
1265 ) -> anyhow::Result<Option<Scheduling>> {
1266 if let Some(value) = self.get_first_named_header_value(header_name).await? {
1267 let sched: Scheduling = serde_json::from_slice(&value).with_context(|| {
1268 format!("{value} from header {header_name} is not a valid Scheduling header")
1269 })?;
1270 let result = self.set_scheduling(Some(sched)).await?;
1271
1272 if remove {
1273 self.remove_all_named_headers(header_name).await?;
1274 }
1275
1276 Ok(result)
1277 } else {
1278 Ok(None)
1279 }
1280 }
1281
1282 pub async fn append_text_plain(&self, content: &str) -> anyhow::Result<bool> {
1283 let data = self.data().await?;
1284 let mut msg = MimePart::parse(data.as_ref().as_ref())?;
1285 let parts = msg.simplified_structure_pointers()?;
1286 if let Some(p) = parts.text_part.and_then(|p| msg.resolve_ptr_mut(p)) {
1287 match p.body()? {
1288 DecodedBody::Text(text) => {
1289 let mut text = text.as_bytes().to_vec();
1290 text.push_str("\r\n");
1291 text.push_str(content);
1292 p.replace_text_body("text/plain", &*text)?;
1293
1294 let new_data = msg.to_message_bytes();
1295 self.assign_data(new_data);
1296 Ok(true)
1297 }
1298 DecodedBody::Binary(_) => {
1299 anyhow::bail!("expected text/plain part to be text, but it is binary");
1300 }
1301 }
1302 } else {
1303 Ok(false)
1304 }
1305 }
1306
1307 pub async fn append_text_html(&self, content: &str) -> anyhow::Result<bool> {
1308 let data = self.data().await?;
1309 let mut msg = MimePart::parse(data.as_ref().as_ref())?;
1310 let parts = msg.simplified_structure_pointers()?;
1311 if let Some(p) = parts.html_part.and_then(|p| msg.resolve_ptr_mut(p)) {
1312 match p.body()? {
1313 DecodedBody::Text(text) => {
1314 let mut text = text.as_bytes().to_vec();
1315
1316 match text.rfind("</body>").or_else(|| text.rfind("</BODY>")) {
1317 Some(idx) => {
1318 text.insert_str(idx, content);
1319 text.insert_str(idx, "\r\n");
1320 }
1321 None => {
1322 text.push_str("\r\n");
1324 text.push_str(content);
1325 }
1326 }
1327
1328 p.replace_text_body("text/html", &*text)?;
1329
1330 let new_data = msg.to_message_bytes();
1331 self.assign_data(new_data);
1332 Ok(true)
1333 }
1334 DecodedBody::Binary(_) => {
1335 anyhow::bail!("expected text/html part to be text, but it is binary");
1336 }
1337 }
1338 } else {
1339 Ok(false)
1340 }
1341 }
1342
1343 pub async fn check_fix_conformance(
1344 &self,
1345 check: MessageConformance,
1346 fix: MessageConformance,
1347 settings: Option<&CheckFixSettings>,
1348 ) -> anyhow::Result<()> {
1349 let data = self.data().await?;
1350 let data_bytes = data.as_ref().as_ref();
1351 let msg = MimePart::parse(data_bytes)?;
1352
1353 let mut settings = settings.map(Clone::clone).unwrap_or_default();
1354
1355 if fix.contains(MessageConformance::MISSING_MESSAGE_ID_HEADER)
1356 && settings.message_id.is_none()
1357 && matches!(msg.headers().message_id(), Err(_) | Ok(None))
1358 {
1359 let sender = self.sender().await?;
1360 let domain = sender.domain();
1361 let id = *self.id();
1362 settings.message_id.replace(format!("{id}@{domain}"));
1363 }
1364
1365 if settings.detect_encoding {
1366 settings.data_bytes.replace(data.clone());
1367 }
1368
1369 let opt_msg = msg.check_fix_conformance(check, fix, settings)?;
1370
1371 if let Some(msg) = opt_msg {
1372 let new_data = msg.to_message_bytes();
1373 self.assign_data(new_data);
1374 }
1375
1376 Ok(())
1377 }
1378}
1379
/// Converts a header name into its snake_case metadata key: ASCII letters
/// are lowercased and every `-` becomes `_`. Non-ASCII characters pass
/// through unchanged.
fn imported_header_name(name: &str) -> String {
    let mut key = String::with_capacity(name.len());
    for ch in name.chars() {
        key.push(if ch == '-' {
            '_'
        } else {
            ch.to_ascii_lowercase()
        });
    }
    key
}
1388
/// Returns true when `name` begins with `X-` or `x-`.
fn is_x_header(name: &[u8]) -> bool {
    matches!(name, [b'X' | b'x', b'-', ..])
}
1392
/// Selects which occurrence(s) of a header an import spec keeps when the
/// same metadata key is matched more than once.
///
/// Serialized in snake_case (`first`, `last`, `all`); the default is `Last`.
#[derive(Default, Debug, Clone, Copy, Deserialize, Serialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum MatchMode {
    /// Keep the first matching value; later matches for the same key are
    /// ignored.
    First,
    /// Keep the last matching value; each match overwrites the previous one.
    #[default]
    Last,
    /// Collect every matching value, in header order, into an array.
    All,
}
1401
/// How a matched header name is turned into a metadata key when the spec
/// has no explicit `target`. Names are split on `-`.
///
/// Serialized in snake_case (`snake_case`, `kebab_case`, `camel_case`,
/// `pascal_case`); the default is `SnakeCase`.
#[derive(Default, Debug, Clone, Copy, Deserialize, Serialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum NameTransform {
    /// `X-Campaign-Id` -> `x_campaign_id`
    #[default]
    SnakeCase,
    /// `X-Campaign-Id` -> `x-campaign-id`
    KebabCase,
    /// `X-Campaign-Id` -> `xCampaignId`
    CamelCase,
    /// `X-Campaign-Id` -> `XCampaignId`
    PascalCase,
}
1411
1412#[derive(Debug, Clone, Deserialize)]
1413#[serde(deny_unknown_fields)]
1414pub struct ImportHeaderSpec {
1415 pub name: String,
1416 #[serde(default, rename = "match")]
1417 pub match_mode: MatchMode,
1418 #[serde(default)]
1419 pub transform: NameTransform,
1420 #[serde(default)]
1421 pub target: Option<String>,
1422 #[serde(default)]
1423 pub remove: bool,
1424}
1425
1426impl Default for ImportHeaderSpec {
1427 fn default() -> Self {
1428 Self {
1429 name: String::new(),
1430 match_mode: MatchMode::default(),
1431 transform: NameTransform::default(),
1432 target: None,
1433 remove: false,
1434 }
1435 }
1436}
1437
/// Compiled form of a header name pattern.
#[derive(Debug, Clone)]
enum HeaderPattern {
    /// Matches a header whose entire name equals the string (ASCII
    /// case-insensitive).
    Exact(String),
    /// Matches any header whose name starts with the string (ASCII
    /// case-insensitive); holds the pattern text with its trailing `*`
    /// removed.
    Prefix(String),
}
1446
/// An `ImportHeaderSpec` whose `name` has been validated and compiled into
/// a `HeaderPattern`; the remaining fields are carried over unchanged.
#[derive(Debug, Clone)]
struct CompiledImportHeaderSpec {
    /// Compiled matcher for header names.
    pattern: HeaderPattern,
    /// Which occurrence(s) of a matched header to keep.
    match_mode: MatchMode,
    /// Name transform used to derive the metadata key when `target` is `None`.
    transform: NameTransform,
    /// Explicit metadata key; only permitted with an exact pattern.
    target: Option<String>,
    /// Whether matched headers are removed from the message after import.
    remove: bool,
}
1455
1456impl CompiledImportHeaderSpec {
1457 fn compile(spec: ImportHeaderSpec) -> anyhow::Result<Self> {
1458 let pattern = compile_header_pattern(&spec.name)?;
1459 if spec.target.is_some() && matches!(pattern, HeaderPattern::Prefix(_)) {
1460 anyhow::bail!(
1461 "import_headers: `target` cannot be used with wildcard pattern {:?}",
1462 spec.name
1463 );
1464 }
1465 Ok(Self {
1466 pattern,
1467 match_mode: spec.match_mode,
1468 transform: spec.transform,
1469 target: spec.target,
1470 remove: spec.remove,
1471 })
1472 }
1473
1474 fn matches(&self, hdr_name: &[u8]) -> bool {
1475 match &self.pattern {
1476 HeaderPattern::Exact(s) => hdr_name.eq_ignore_ascii_case(s.as_bytes()),
1477 HeaderPattern::Prefix(p) => {
1478 hdr_name.len() >= p.len() && hdr_name[..p.len()].eq_ignore_ascii_case(p.as_bytes())
1479 }
1480 }
1481 }
1482
1483 fn target_key(&self, matched_name: &str) -> String {
1484 if let Some(target) = &self.target {
1485 return target.clone();
1486 }
1487 apply_name_transform(matched_name, self.transform)
1488 }
1489}
1490
1491fn compile_header_pattern(pat: &str) -> anyhow::Result<HeaderPattern> {
1492 if pat.is_empty() {
1493 anyhow::bail!("import_headers: header name pattern must not be empty");
1494 }
1495 let star_count = pat.chars().filter(|c| *c == '*').count();
1496 if star_count == 0 {
1497 return Ok(HeaderPattern::Exact(pat.to_string()));
1498 }
1499 let Some(prefix) = pat.strip_suffix('*') else {
1500 anyhow::bail!(
1501 "import_headers: only a single trailing `*` is supported in pattern {:?}",
1502 pat
1503 );
1504 };
1505 if star_count > 1 {
1506 anyhow::bail!(
1507 "import_headers: only a single trailing `*` is supported in pattern {:?}",
1508 pat
1509 );
1510 }
1511 if prefix.is_empty() {
1512 anyhow::bail!("import_headers: bare `*` patterns are not supported");
1513 }
1514 Ok(HeaderPattern::Prefix(prefix.to_string()))
1515}
1516
1517fn apply_name_transform(name: &str, transform: NameTransform) -> String {
1518 match transform {
1519 NameTransform::SnakeCase => imported_header_name(name),
1520 NameTransform::KebabCase => name
1521 .split('-')
1522 .map(|p| p.to_ascii_lowercase())
1523 .collect::<Vec<_>>()
1524 .join("-"),
1525 NameTransform::CamelCase => name
1526 .split('-')
1527 .enumerate()
1528 .map(|(i, p)| {
1529 if i == 0 {
1530 p.to_ascii_lowercase()
1531 } else {
1532 titlecase_ascii(p)
1533 }
1534 })
1535 .collect::<Vec<_>>()
1536 .join(""),
1537 NameTransform::PascalCase => name
1538 .split('-')
1539 .map(titlecase_ascii)
1540 .collect::<Vec<_>>()
1541 .join(""),
1542 }
1543}
1544
/// Returns `s` with its first character ASCII-uppercased and every
/// following character ASCII-lowercased. Non-ASCII characters are left
/// untouched; an empty input yields an empty string.
fn titlecase_ascii(s: &str) -> String {
    s.chars()
        .enumerate()
        .map(|(pos, ch)| {
            if pos == 0 {
                ch.to_ascii_uppercase()
            } else {
                ch.to_ascii_lowercase()
            }
        })
        .collect()
}
1559
/// Value accumulated for one metadata key during a header import.
enum AccValue {
    /// A single value (used by the `First` and `Last` match modes).
    Str(String),
    /// Every matched value in header order (used by the `All` match mode).
    Arr(Vec<String>),
}
1564
/// Collects the values matched by a single import spec, grouped by the
/// metadata key they will be written to.
struct PerSpecAccumulator {
    /// Match mode of the owning spec; decides how repeated keys combine.
    mode: MatchMode,
    /// Accumulated value per metadata key.
    by_key: BTreeMap<String, AccValue>,
}
1569
1570impl PerSpecAccumulator {
1571 fn new(mode: MatchMode) -> Self {
1572 Self {
1573 mode,
1574 by_key: BTreeMap::new(),
1575 }
1576 }
1577
1578 fn record(&mut self, key: String, value: String) {
1579 match self.mode {
1580 MatchMode::First => {
1581 self.by_key.entry(key).or_insert(AccValue::Str(value));
1582 }
1583 MatchMode::Last => {
1584 self.by_key.insert(key, AccValue::Str(value));
1585 }
1586 MatchMode::All => {
1587 let entry = self
1588 .by_key
1589 .entry(key)
1590 .or_insert_with(|| AccValue::Arr(Vec::new()));
1591 if let AccValue::Arr(v) = entry {
1592 v.push(value);
1593 }
1594 }
1595 }
1596 }
1597
1598 async fn write_to(self, msg: &Message) -> anyhow::Result<()> {
1599 for (key, value) in self.by_key {
1600 match value {
1601 AccValue::Str(s) => msg.set_meta(key, s).await?,
1602 AccValue::Arr(arr) => {
1603 let json = serde_json::Value::Array(
1604 arr.into_iter().map(serde_json::Value::String).collect(),
1605 );
1606 msg.set_meta(key, json).await?;
1607 }
1608 }
1609 }
1610 Ok(())
1611 }
1612}
1613
/// Number of bytes contributed by an optional header name (plus two bytes
/// for the `": "` separator) and the header value. A line terminator is
/// not included in the count.
fn size_header(name: Option<&str>, value: &[u8]) -> usize {
    let name_part = match name {
        Some(name) => name.len() + 2,
        None => 0,
    };
    name_part + value.len()
}
1617
/// Appends one header to `dest`.
///
/// With a `name`, writes `name: value`; without one, `value` is written
/// as-is (it is expected to already carry the header name). A CRLF is
/// appended unless `value` already ends with one.
fn emit_header(dest: &mut Vec<u8>, name: Option<&str>, value: &[u8]) {
    const CRLF: &[u8] = b"\r\n";

    match name {
        Some(name) => {
            dest.extend_from_slice(name.as_bytes());
            dest.extend_from_slice(b": ");
        }
        None => {}
    }
    dest.extend_from_slice(value);

    let already_terminated = value.ends_with(CRLF);
    if !already_terminated {
        dest.extend_from_slice(CRLF);
    }
}
1628
/// Lua bindings for `Message`: registers the methods that scripts can call
/// on a message userdata. Most are thin wrappers that forward to the Rust
/// method of the same name and convert errors with `any_err`.
#[cfg(feature = "impl")]
impl UserData for Message {
    fn add_methods<M: UserDataMethods<Self>>(methods: &mut M) {
        // set_meta(name, value): the Lua value is converted to JSON before
        // being stored.
        methods.add_async_method(
            "set_meta",
            move |_, this, (name, value): (String, mlua::Value)| async move {
                let value = serde_json::value::to_value(value).map_err(any_err)?;
                this.set_meta(name, value).await.map_err(any_err)?;
                Ok(())
            },
        );
        // get_meta(name): returns the stored JSON converted back to a Lua value.
        methods.add_async_method("get_meta", move |lua, this, name: String| async move {
            let value = this.get_meta(name).await.map_err(any_err)?;
            Ok(Some(lua.to_value_with(&value, serialize_options())?))
        });
        // get_data(): the message data as a Lua string.
        methods.add_async_method("get_data", |lua, this, _: ()| async move {
            let data = this.data().await.map_err(any_err)?;
            lua.create_string(&*data)
        });
        // set_data(data): replaces the message data with the given bytes.
        methods.add_method("set_data", move |_lua, this, data: mlua::String| {
            this.assign_data(data.as_bytes().to_vec());
            Ok(())
        });

        // parse_mime(): parses an owned copy of the data and returns it as a
        // `mod_mimepart::PartRef`.
        methods.add_async_method("parse_mime", |_lua, this, _: ()| async move {
            let data = this.data().await.map_err(any_err)?;
            let owned_data = BString::new(data.as_ref().to_vec());
            let part = MimePart::parse(owned_data).map_err(any_err)?;
            Ok(mod_mimepart::PartRef::new(part))
        });

        methods.add_async_method("append_text_plain", |_lua, this, data: String| async move {
            this.append_text_plain(&data).await.map_err(any_err)
        });

        methods.add_async_method("append_text_html", |_lua, this, data: String| async move {
            this.append_text_html(&data).await.map_err(any_err)
        });

        // id(): the message id rendered as a string.
        methods.add_method("id", move |_, this, _: ()| Ok(this.id().to_string()));
        methods.add_async_method("sender", move |_, this, _: ()| async move {
            this.sender().await.map_err(any_err)
        });

        methods.add_method("num_attempts", move |_, this, _: ()| {
            Ok(this.get_num_attempts())
        });
        methods.add_method("increment_num_attempts", move |_, this, _: ()| {
            Ok(this.increment_num_attempts())
        });

        methods.add_async_method("queue_name", move |_, this, _: ()| async move {
            this.get_queue_name().await.map_err(any_err)
        });

        // set_due(due): `due` is deserialized as an optional UTC timestamp;
        // returns the value reported back by `Message::set_due`.
        methods.add_async_method("set_due", move |lua, this, due: mlua::Value| async move {
            let due: Option<DateTime<Utc>> = lua.from_value(due)?;
            let revised_due = this.set_due(due).await.map_err(any_err)?;
            lua.to_value(&revised_due)
        });

        // set_sender(value): accepts either a string to parse as an envelope
        // address or a value that deserializes into an `EnvelopeAddress`.
        methods.add_async_method(
            "set_sender",
            move |lua, this, value: mlua::Value| async move {
                let sender = match value {
                    mlua::Value::String(s) => {
                        let s = s.to_str()?;
                        EnvelopeAddress::parse(&s).map_err(any_err)?
                    }
                    _ => lua.from_value::<EnvelopeAddress>(value.clone())?,
                };
                this.set_sender(sender).await.map_err(any_err)
            },
        );

        // recipient(): nil when there are no recipients, the single address
        // when there is exactly one, otherwise the whole list.
        methods.add_async_method("recipient", move |lua, this, _: ()| async move {
            let mut recipients = this.recipient_list().await.map_err(any_err)?;
            match recipients.len() {
                0 => Ok(mlua::Value::Nil),
                1 => {
                    let recip: EnvelopeAddress = recipients.pop().expect("have 1");
                    recip.into_lua(&lua)
                }
                _ => recipients.into_lua(&lua),
            }
        });

        // recipient_list(): always returns the list form.
        methods.add_async_method("recipient_list", move |lua, this, _: ()| async move {
            let recipients = this.recipient_list().await.map_err(any_err)?;
            recipients.into_lua(&lua)
        });

        // set_recipient(value): accepts a string, a list of addresses, or a
        // single address value; the list form is tried before the single one.
        methods.add_async_method(
            "set_recipient",
            move |lua, this, value: mlua::Value| async move {
                let recipients = match value {
                    mlua::Value::String(s) => {
                        let s = s.to_str()?;
                        vec![EnvelopeAddress::parse(&s).map_err(any_err)?]
                    }
                    _ => {
                        if let Ok(recips) = lua.from_value::<Vec<EnvelopeAddress>>(value.clone()) {
                            recips
                        } else {
                            vec![lua.from_value::<EnvelopeAddress>(value.clone())?]
                        }
                    }
                };
                this.set_recipient_list(recipients).await.map_err(any_err)
            },
        );

        #[cfg(feature = "impl")]
        methods.add_async_method("dkim_sign", |_, this, signer: Signer| async move {
            this.dkim_sign(signer).await.map_err(any_err)
        });

        // shrink(): saves first when the message reports unsaved changes.
        methods.add_async_method("shrink", |_, this, _: ()| async move {
            if this.needs_save() {
                this.save(None).await.map_err(any_err)?;
            }
            this.shrink().map_err(any_err)
        });

        // shrink_data(): same save-before-shrink rule, then `shrink_data`.
        methods.add_async_method("shrink_data", |_, this, _: ()| async move {
            if this.needs_save() {
                this.save(None).await.map_err(any_err)?;
            }
            this.shrink_data().map_err(any_err)
        });

        // add_authentication_results(serv_id, results): encodes the results
        // and prepends them as an `Authentication-Results` header.
        methods.add_async_method(
            "add_authentication_results",
            |lua, this, (serv_id, results): (mlua::String, mlua::Value)| async move {
                let results: Vec<AuthenticationResult> = lua.from_value(results)?;
                let results = AuthenticationResults {
                    serv_id: serv_id.as_bytes().as_ref().into(),
                    version: None,
                    results,
                };

                this.prepend_header(
                    Some("Authentication-Results"),
                    results.encode_value().as_bytes(),
                )
                .await
                .map_err(any_err)?;

                Ok(())
            },
        );

        #[cfg(feature = "impl")]
        methods.add_async_method(
            "arc_verify",
            |lua, this, opt_resolver_name: Option<String>| async move {
                let results = this.arc_verify(opt_resolver_name).await.map_err(any_err)?;
                lua.to_value_with(&results, serialize_options())
            },
        );

        // arc_seal(signer, serv_id, auth_res, [resolver_name])
        #[cfg(feature = "impl")]
        methods.add_async_method(
            "arc_seal",
            |lua,
             this,
             (signer, serv_id, auth_res, opt_resolver_name): (
                Signer,
                mlua::String,
                mlua::Value,
                Option<String>,
            )| async move {
                let results: Vec<AuthenticationResult> = lua.from_value(auth_res)?;
                this.arc_seal(
                    signer,
                    AuthenticationResults {
                        serv_id: serv_id.as_bytes().as_ref().into(),
                        version: None,
                        results,
                    },
                    opt_resolver_name,
                )
                .await
                .map_err(any_err)
            },
        );

        #[cfg(feature = "impl")]
        methods.add_async_method(
            "dkim_verify",
            |lua, this, opt_resolver_name: Option<String>| async move {
                let results = this.dkim_verify(opt_resolver_name).await.map_err(any_err)?;
                lua.to_value_with(&results, serialize_options())
            },
        );

        // prepend_header(name, value, [encode]): when `encode` is true the
        // value is first turned into an unstructured header and its raw
        // value is used; otherwise the value bytes are used verbatim.
        methods.add_async_method(
            "prepend_header",
            |_, this, (name, value, encode): (String, String, Option<bool>)| async move {
                let encode = encode.unwrap_or(false);
                if encode {
                    let header = Header::new_unstructured(name.clone(), value);
                    this.prepend_header(Some(&name), header.get_raw_value())
                        .await
                        .map_err(any_err)?;
                } else {
                    this.prepend_header(Some(&name), value.as_bytes())
                        .await
                        .map_err(any_err)?;
                }
                Ok(())
            },
        );
        // append_header(name, value, [encode]): same `encode` handling as
        // prepend_header, but calls `append_header`.
        methods.add_async_method(
            "append_header",
            |_, this, (name, value, encode): (String, String, Option<bool>)| async move {
                let encode = encode.unwrap_or(false);
                if encode {
                    let header = Header::new_unstructured(name.clone(), value);
                    this.append_header(Some(&name), header.get_raw_value())
                        .await
                        .map_err(any_err)?;
                } else {
                    this.append_header(Some(&name), value.as_bytes())
                        .await
                        .map_err(any_err)?;
                }
                Ok(())
            },
        );
        methods.add_async_method("get_address_header", |_, this, name: String| async move {
            this.get_address_header(&name).await.map_err(any_err)
        });
        // from_header() / to_header(): get_address_header for "From" / "To".
        methods.add_async_method("from_header", |_, this, ()| async move {
            this.get_address_header("From").await.map_err(any_err)
        });
        methods.add_async_method("to_header", |_, this, ()| async move {
            this.get_address_header("To").await.map_err(any_err)
        });

        methods.add_async_method(
            "get_first_named_header_value",
            |_, this, name: String| async move {
                this.get_first_named_header_value(&name)
                    .await
                    .map_err(any_err)
            },
        );
        methods.add_async_method(
            "get_all_named_header_values",
            |_, this, name: String| async move {
                this.get_all_named_header_values(&name)
                    .await
                    .map_err(any_err)
            },
        );
        // get_all_headers(): each header becomes a two-element [name, value]
        // array.
        methods.add_async_method("get_all_headers", |_, this, _: ()| async move {
            Ok(this
                .get_all_headers()
                .await
                .map_err(any_err)?
                .into_iter()
                .map(|(name, value)| vec![name, value])
                .collect::<Vec<Vec<BString>>>())
        });
        // import_x_headers([names]): an omitted list is treated as empty.
        methods.add_async_method(
            "import_x_headers",
            |_, this, names: Option<Vec<String>>| async move {
                this.import_x_headers(names.unwrap_or_default())
                    .await
                    .map_err(any_err)
            },
        );
        // import_headers(specs): specs are deserialized into `ImportHeaderSpec`s.
        methods.add_async_method(
            "import_headers",
            |_, this, specs: SerdeWrappedValue<Vec<ImportHeaderSpec>>| async move {
                this.import_headers(specs.0).await.map_err(any_err)
            },
        );

        // remove_x_headers([names]): an omitted list is treated as empty.
        methods.add_async_method(
            "remove_x_headers",
            |_, this, names: Option<Vec<String>>| async move {
                this.remove_x_headers(names.unwrap_or_default())
                    .await
                    .map_err(any_err)
            },
        );
        methods.add_async_method(
            "remove_all_named_headers",
            |_, this, name: String| async move {
                this.remove_all_named_headers(&name).await.map_err(any_err)
            },
        );

        methods.add_async_method(
            "import_scheduling_header",
            |lua, this, (header_name, remove): (String, bool)| async move {
                let opt_schedule = this
                    .import_scheduling_header(&header_name, remove)
                    .await
                    .map_err(any_err)?;
                lua.to_value(&opt_schedule)
            },
        );

        methods.add_async_method(
            "set_scheduling",
            move |lua, this, params: mlua::Value| async move {
                let sched: Option<Scheduling> = from_lua_value(&lua, params)?;
                let opt_schedule = this.set_scheduling(sched).await.map_err(any_err)?;
                lua.to_value(&opt_schedule)
            },
        );

        // parse_rfc3464(): the parsed report, or nil when there is none.
        methods.add_async_method("parse_rfc3464", |lua, this, _: ()| async move {
            let report = this.parse_rfc3464().await.map_err(any_err)?;
            match report {
                Some(report) => lua.to_value_with(&report, serialize_options()),
                None => Ok(mlua::Value::Nil),
            }
        });

        // parse_rfc5965(): the parsed report, or nil when there is none.
        methods.add_async_method("parse_rfc5965", |lua, this, _: ()| async move {
            let report = this.parse_rfc5965().await.map_err(any_err)?;
            match report {
                Some(report) => lua.to_value_with(&report, serialize_options()),
                None => Ok(mlua::Value::Nil),
            }
        });

        methods.add_async_method("save", |_, this, ()| async move {
            this.save(None).await.map_err(any_err)
        });

        methods.add_method("set_force_sync", move |_, this, force: bool| {
            this.set_force_sync(force);
            Ok(())
        });

        // check_fix_conformance(check, fix, [settings]): `check` and `fix` are
        // parsed with `MessageConformance::from_str`. A failure of the check
        // itself is returned as a formatted error string rather than raised;
        // success returns nil.
        methods.add_async_method(
            "check_fix_conformance",
            |lua, this, (check, fix, settings): (String, String, Option<mlua::Value>)| async move {
                use std::str::FromStr;
                let check = MessageConformance::from_str(&check).map_err(any_err)?;
                let fix = MessageConformance::from_str(&fix).map_err(any_err)?;

                let settings = match settings {
                    Some(v) => Some(lua.from_value(v).map_err(any_err)?),
                    None => None,
                };

                match this
                    .check_fix_conformance(check, fix, settings.as_ref())
                    .await
                {
                    Ok(_) => Ok(None),
                    Err(err) => Ok(Some(format!("{err:#}"))),
                }
            },
        );
    }
}
1992
1993impl TimerEntryWithDelay for WeakMessage {
1994 fn delay(&self) -> Duration {
1995 match self.upgrade() {
1996 None => {
1997 Duration::from_millis(0)
1999 }
2000 Some(msg) => msg.delay(),
2001 }
2002 }
2003}
2004
2005impl TimerEntryWithDelay for Message {
2006 fn delay(&self) -> Duration {
2007 let inner = self.msg_and_id.inner.lock();
2008 match inner.due {
2009 Some(time) => {
2010 let now = Utc::now();
2011 let delta = time - now;
2012 delta.to_std().unwrap_or(Duration::from_millis(0))
2013 }
2014 None => Duration::from_millis(0),
2015 }
2016 }
2017}
2018
2019#[cfg(test)]
2020pub(crate) mod test {
2021 use super::*;
2022 use serde_json::json;
2023
2024 pub fn new_msg_body<S: AsRef<[u8]>>(s: S) -> Message {
2025 Message::new_dirty(
2026 SpoolId::new(),
2027 EnvelopeAddress::parse("sender@example.com").unwrap(),
2028 vec![EnvelopeAddress::parse("recip@example.com").unwrap()],
2029 serde_json::json!({}),
2030 Arc::new(s.as_ref().to_vec().into_boxed_slice()),
2031 )
2032 .unwrap()
2033 }
2034
2035 fn data_as_string(msg: &Message) -> String {
2036 String::from_utf8(msg.get_data_maybe_not_loaded().to_vec()).unwrap()
2037 }
2038
/// Fixture message: two `X-` headers, a `Subject`, and a `From` header
/// written with a space before the colon, followed by a one-word body.
const X_HDR_CONTENT: &str =
    "X-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nFrom :Someone\r\n\r\nBody";
2041
/// An empty name list imports every `X-` header into the metadata under
/// snake_case keys.
#[tokio::test]
async fn import_all_x_headers() {
    let msg = new_msg_body(X_HDR_CONTENT);

    msg.import_x_headers(vec![]).await.unwrap();
    k9::assert_equal!(
        msg.get_meta_obj().await.unwrap(),
        json!({
            "x_hello": "there",
            "x_header": "value",
        })
    );
}
2055
/// A metadata value stored as JSON null reads back as JSON null from Rust
/// and compares equal to `nil` when fetched through the Lua `get_meta`
/// binding.
#[tokio::test]
async fn meta_and_nil() {
    let msg = new_msg_body(X_HDR_CONTENT);
    msg.set_meta("test", serde_json::Value::Null).await.unwrap();
    k9::assert_equal!(msg.get_meta("test").await.unwrap(), serde_json::Value::Null);

    let lua = mlua::Lua::new();
    lua.globals().set("msg", msg).unwrap();
    lua.load("assert(msg:get_meta('test') == nil)")
        .exec()
        .unwrap();
}
2070
/// Naming a header explicitly (in a different case than it appears in the
/// message) imports only that header.
#[tokio::test]
async fn import_some_x_headers() {
    let msg = new_msg_body(X_HDR_CONTENT);

    msg.import_x_headers(vec!["x-hello".to_string()])
        .await
        .unwrap();
    k9::assert_equal!(
        msg.get_meta_obj().await.unwrap(),
        json!({
            "x_hello": "there",
        })
    );
}
2085
/// A wildcard spec with `remove: true` imports every matching header and
/// strips those headers from the message data, leaving the rest intact.
#[tokio::test]
async fn import_headers_wildcard_remove() {
    let msg = new_msg_body(X_HDR_CONTENT);

    msg.import_headers(vec![ImportHeaderSpec {
        name: "X-*".to_string(),
        remove: true,
        ..ImportHeaderSpec::default()
    }])
    .await
    .unwrap();
    k9::assert_equal!(
        msg.get_meta_obj().await.unwrap(),
        json!({
            "x_hello": "there",
            "x_header": "value",
        })
    );
    k9::assert_equal!(
        data_as_string(&msg),
        "Subject: Hello\r\nFrom :Someone\r\n\r\nBody"
    );
}
2109
/// With three `Received` headers, `First` keeps the first value, `Last`
/// the last, and `All` yields an array of all three in order.
#[tokio::test]
async fn import_headers_match_modes() {
    let body =
        "Received: from a\r\nReceived: from b\r\nReceived: from c\r\nSubject: hi\r\n\r\nBody";
    for (mode, expected) in [
        (MatchMode::First, json!("from a")),
        (MatchMode::Last, json!("from c")),
        (MatchMode::All, json!(["from a", "from b", "from c"])),
    ] {
        let msg = new_msg_body(body);
        msg.import_headers(vec![ImportHeaderSpec {
            name: "Received".to_string(),
            match_mode: mode,
            ..ImportHeaderSpec::default()
        }])
        .await
        .unwrap();
        k9::assert_equal!(
            msg.get_meta_obj().await.unwrap(),
            json!({ "received": expected })
        );
    }
}
2133
/// A spec that matches no header writes nothing to the metadata, even in
/// `All` mode (no empty array is created).
#[tokio::test]
async fn import_headers_no_match_skips_meta() {
    let msg = new_msg_body(X_HDR_CONTENT);
    msg.import_headers(vec![ImportHeaderSpec {
        name: "Nonexistent".to_string(),
        match_mode: MatchMode::All,
        ..ImportHeaderSpec::default()
    }])
    .await
    .unwrap();
    k9::assert_equal!(msg.get_meta_obj().await.unwrap(), json!({}));
}
2146
/// Specs are tried in order: a header claimed by an earlier exact spec
/// (stored under its explicit `target`) is not imported again by a later
/// wildcard spec.
#[tokio::test]
async fn import_headers_specific_before_wildcard() {
    let body = "X-Campaign-Id: 42\r\nX-Mailer: foo\r\n\r\nBody";
    let msg = new_msg_body(body);
    msg.import_headers(vec![
        ImportHeaderSpec {
            name: "X-Campaign-Id".to_string(),
            target: Some("campaign".to_string()),
            ..ImportHeaderSpec::default()
        },
        ImportHeaderSpec {
            name: "X-*".to_string(),
            ..ImportHeaderSpec::default()
        },
    ])
    .await
    .unwrap();
    k9::assert_equal!(
        msg.get_meta_obj().await.unwrap(),
        json!({
            "campaign": "42",
            "x_mailer": "foo",
        })
    );
}
2172
/// Each `NameTransform` variant produces the expected key for
/// `X-Campaign-Id`.
#[test]
fn name_transforms() {
    let n = "X-Campaign-Id";
    k9::assert_equal!(
        apply_name_transform(n, NameTransform::SnakeCase),
        "x_campaign_id"
    );
    k9::assert_equal!(
        apply_name_transform(n, NameTransform::KebabCase),
        "x-campaign-id"
    );
    k9::assert_equal!(
        apply_name_transform(n, NameTransform::CamelCase),
        "xCampaignId"
    );
    k9::assert_equal!(
        apply_name_transform(n, NameTransform::PascalCase),
        "XCampaignId"
    );
}
2193
/// Malformed patterns (empty, bare `*`, leading or embedded `*`, doubled
/// `*`) are rejected, while a plain name compiles to `Exact` and a
/// trailing-`*` pattern compiles to `Prefix`.
#[test]
fn pattern_compile_rejects_bad_patterns() {
    for rejected in ["", "*", "*-Id", "X-*-Id", "X-**"] {
        compile_header_pattern(rejected).unwrap_err();
    }

    assert!(matches!(
        compile_header_pattern("Subject").unwrap(),
        HeaderPattern::Exact(_)
    ));
    assert!(matches!(
        compile_header_pattern("X-*").unwrap(),
        HeaderPattern::Prefix(_)
    ));
}
2210
2211 #[tokio::test]
2212 async fn import_headers_target_with_wildcard_rejected() {
2213 let msg = new_msg_body(X_HDR_CONTENT);
2214 msg.import_headers(vec![ImportHeaderSpec {
2215 name: "X-*".to_string(),
2216 target: Some("oops".to_string()),
2217 ..ImportHeaderSpec::default()
2218 }])
2219 .await
2220 .unwrap_err();
2221 }
2222
/// An empty name list removes every `X-` header and leaves the other
/// headers and the body untouched.
#[tokio::test]
async fn remove_all_x_headers() {
    let msg = new_msg_body(X_HDR_CONTENT);

    msg.remove_x_headers(vec![]).await.unwrap();
    k9::assert_equal!(
        data_as_string(&msg),
        "Subject: Hello\r\nFrom :Someone\r\n\r\nBody"
    );
}
2233
/// `prepend_header` with a separate name and value writes `Name: value`
/// ahead of the existing headers.
#[tokio::test]
async fn prepend_header_2_params() {
    let msg = new_msg_body(X_HDR_CONTENT);

    msg.prepend_header(Some("Date"), b"Today").await.unwrap();
    k9::assert_equal!(
        data_as_string(&msg),
        "Date: Today\r\nX-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nFrom :Someone\r\n\r\nBody"
    );
}
2244
/// `prepend_header` without a name treats the value as the complete
/// header text and writes it ahead of the existing headers.
#[tokio::test]
async fn prepend_header_1_params() {
    let msg = new_msg_body(X_HDR_CONTENT);

    msg.prepend_header(None, b"Date: Today").await.unwrap();
    k9::assert_equal!(
        data_as_string(&msg),
        "Date: Today\r\nX-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nFrom :Someone\r\n\r\nBody"
    );
}
2255
/// `append_header` with a separate name and value writes `Name: value`
/// after the existing headers, before the blank line and body.
#[tokio::test]
async fn append_header_2_params() {
    let msg = new_msg_body(X_HDR_CONTENT);

    msg.append_header(Some("Date"), b"Today").await.unwrap();
    k9::assert_equal!(
        data_as_string(&msg),
        "X-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nFrom :Someone\r\nDate: Today\r\n\r\nBody"
    );
}
2266
/// `append_header` without a name treats the value as the complete header
/// text and writes it after the existing headers.
#[tokio::test]
async fn append_header_1_params() {
    let msg = new_msg_body(X_HDR_CONTENT);

    msg.append_header(None, b"Date: Today").await.unwrap();
    k9::assert_equal!(
        data_as_string(&msg),
        "X-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nFrom :Someone\r\nDate: Today\r\n\r\nBody"
    );
}
2277
/// Fixture message in which `X-Header` appears twice (with `Subject` in
/// between), used to exercise first/all header lookups and removals.
const MULTI_HEADER_CONTENT: &str =
    "X-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nX-Header: another value\r\nFrom :Someone@somewhere\r\n\r\nBody";
2280
/// Looking up a repeated header by a differently-cased name returns the
/// value of its first occurrence.
#[tokio::test]
async fn get_first_header() {
    let msg = new_msg_body(MULTI_HEADER_CONTENT);
    k9::assert_equal!(
        msg.get_first_named_header_value("X-header")
            .await
            .unwrap()
            .unwrap(),
        "value"
    );
}
2292
/// Looking up a repeated header returns all of its values in message
/// order.
#[tokio::test]
async fn get_all_header() {
    let msg = new_msg_body(MULTI_HEADER_CONTENT);
    k9::assert_equal!(
        msg.get_all_named_header_values("X-header").await.unwrap(),
        vec!["value".to_string(), "another value".to_string()]
    );
}
2301
/// Removing the first named header drops only the first `X-Header`; the
/// second occurrence stays in place.
#[tokio::test]
async fn remove_first() {
    let msg = new_msg_body(MULTI_HEADER_CONTENT);
    msg.remove_first_named_header("X-header").await.unwrap();
    k9::assert_equal!(
        data_as_string(&msg),
        "X-Hello: there\r\nSubject: Hello\r\nX-Header: another value\r\nFrom :Someone@somewhere\r\n\r\nBody"
    );
}
2311
/// Removing all named headers drops both `X-Header` occurrences.
#[tokio::test]
async fn remove_all() {
    let msg = new_msg_body(MULTI_HEADER_CONTENT);
    msg.remove_all_named_headers("X-header").await.unwrap();
    k9::assert_equal!(
        data_as_string(&msg),
        "X-Hello: there\r\nSubject: Hello\r\nFrom :Someone@somewhere\r\n\r\nBody"
    );
}
2321
/// Appending plain text to a message with no explicit content type adds
/// the text after the existing body; the rewritten message also gains a
/// `Content-Type: text/plain` header and a trailing CRLF.
#[tokio::test]
async fn append_text_plain() {
    let msg = new_msg_body(MULTI_HEADER_CONTENT);
    msg.append_text_plain("I am at the bottom").await.unwrap();
    k9::assert_equal!(
        data_as_string(&msg),
        "X-Hello: there\r\n\
         X-Header: value\r\n\
         Subject: Hello\r\n\
         X-Header: another value\r\n\
         From :Someone@somewhere\r\n\
         Content-Type: text/plain;\r\n\
         \tcharset=\"us-ascii\"\r\n\
         \r\n\
         Body\r\n\
         I am at the bottom\r\n"
    );
}
2340
/// Fixture: a multipart/mixed message with a text/plain part, a text/html
/// part that has no `<body>` element, and a base64 attachment.
const MIXED_CONTENT: &str = "Content-Type: multipart/mixed;\r\n\
\tboundary=\"my-boundary\"\r\n\
\r\n\
--my-boundary\r\n\
Content-Type: text/plain;\r\n\
\tcharset=\"us-ascii\"\r\n\
\r\n\
plain text\r\n\
--my-boundary\r\n\
Content-Type: text/html;\r\n\
\tcharset=\"us-ascii\"\r\n\
\r\n\
<b>rich</b> text\r\n\
--my-boundary\r\n\
Content-Type: application/octet-stream\r\n\
Content-Transfer-Encoding: base64\r\n\
Content-Disposition: attachment;\r\n\
\tfilename=\"woot.bin\"\r\n\
Content-ID: <woot.id@somewhere>\r\n\
\r\n\
AAECAw==\r\n\
--my-boundary--\r\n\
\r\n";
2364
2365 const MIXED_CONTENT_ENCLOSING_BODY: &str = "Content-Type: multipart/mixed;\r\n\
2366\tboundary=\"my-boundary\"\r\n\
2367\r\n\
2368--my-boundary\r\n\
2369Content-Type: text/plain;\r\n\
2370\tcharset=\"us-ascii\"\r\n\
2371\r\n\
2372plain text\r\n\
2373--my-boundary\r\n\
2374Content-Type: text/html;\r\n\
2375\tcharset=\"us-ascii\"\r\n\
2376\r\n\
2377<BODY>\r\n\
2378<b>rich</b> text\r\n\
2379</BODY>\r\n\
2380--my-boundary\r\n\
2381Content-Type: application/octet-stream\r\n\
2382Content-Transfer-Encoding: base64\r\n\
2383Content-Disposition: attachment;\r\n\
2384\tfilename=\"woot.bin\"\r\n\
2385Content-ID: <woot.id>\r\n\
2386\r\n\
2387AAECAw==\r\n\
2388--my-boundary--\r\n\
2389\r\n";
2390
2391 #[tokio::test]
2392 async fn append_text_html() {
2393 let msg = new_msg_body(MIXED_CONTENT);
2394 msg.append_text_html("bottom html").await.unwrap();
2395 k9::snapshot!(
2396 data_as_string(&msg),
2397 r#"
2398Content-Type: multipart/mixed;\r
2399\tboundary="my-boundary"\r
2400\r
2401--my-boundary\r
2402Content-Type: text/plain;\r
2403\tcharset="us-ascii"\r
2404\r
2405plain text\r
2406--my-boundary\r
2407Content-Type: text/html;\r
2408\tcharset="us-ascii"\r
2409\r
2410<b>rich</b> text\r
2411\r
2412bottom html\r
2413--my-boundary\r
2414Content-Type: application/octet-stream\r
2415Content-Transfer-Encoding: base64\r
2416Content-Disposition: attachment;\r
2417\tfilename="woot.bin"\r
2418Content-ID: <woot.id@somewhere>\r
2419\r
2420AAECAw==\r
2421--my-boundary--\r
2422\r
2423
2424"#
2425 );
2426
2427 let msg = new_msg_body(MIXED_CONTENT_ENCLOSING_BODY);
2428 msg.append_text_html("bottom html 👻").await.unwrap();
2429 k9::snapshot!(
2430 data_as_string(&msg),
2431 r#"
2432Content-Type: multipart/mixed;\r
2433\tboundary="my-boundary"\r
2434\r
2435--my-boundary\r
2436Content-Type: text/plain;\r
2437\tcharset="us-ascii"\r
2438\r
2439plain text\r
2440--my-boundary\r
2441Content-Type: text/html;\r
2442\tcharset="utf-8"\r
2443Content-Transfer-Encoding: quoted-printable\r
2444\r
2445<BODY>\r
2446<b>rich</b> text\r
2447\r
2448bottom html =F0=9F=91=BB</BODY>\r
2449--my-boundary\r
2450Content-Type: application/octet-stream\r
2451Content-Transfer-Encoding: base64\r
2452Content-Disposition: attachment;\r
2453\tfilename="woot.bin"\r
2454Content-ID: <woot.id>\r
2455\r
2456AAECAw==\r
2457--my-boundary--\r
2458\r
2459
2460"#
2461 );
2462 }
2463
2464 #[tokio::test]
2465 async fn append_text_plain_mixed() {
2466 let msg = new_msg_body(MIXED_CONTENT);
2467 msg.append_text_plain("bottom text 👾").await.unwrap();
2468 k9::snapshot!(
2469 data_as_string(&msg),
2470 r#"
2471Content-Type: multipart/mixed;\r
2472\tboundary="my-boundary"\r
2473\r
2474--my-boundary\r
2475Content-Type: text/plain;\r
2476\tcharset="utf-8"\r
2477Content-Transfer-Encoding: quoted-printable\r
2478\r
2479plain text\r
2480\r
2481bottom text =F0=9F=91=BE\r
2482--my-boundary\r
2483Content-Type: text/html;\r
2484\tcharset="us-ascii"\r
2485\r
2486<b>rich</b> text\r
2487--my-boundary\r
2488Content-Type: application/octet-stream\r
2489Content-Transfer-Encoding: base64\r
2490Content-Disposition: attachment;\r
2491\tfilename="woot.bin"\r
2492Content-ID: <woot.id@somewhere>\r
2493\r
2494AAECAw==\r
2495--my-boundary--\r
2496\r
2497
2498"#
2499 );
2500 }
2501
2502 #[tokio::test]
2503 async fn check_conformance_angle_msg_id() {
2504 const DOUBLE_ANGLE_ONLY: &str = "Subject: hello\r
2505Message-ID: <<1234@example.com>>\r
2506\r
2507Hello";
2508 let msg = new_msg_body(DOUBLE_ANGLE_ONLY);
2509 k9::snapshot!(
2510 msg.check_fix_conformance(
2511 MessageConformance::MISSING_MESSAGE_ID_HEADER,
2512 MessageConformance::empty(),
2513 None,
2514 )
2515 .await
2516 .unwrap_err()
2517 .to_string(),
2518 "Message has conformance issues: MISSING_MESSAGE_ID_HEADER"
2519 );
2520
2521 msg.check_fix_conformance(
2522 MessageConformance::MISSING_MESSAGE_ID_HEADER,
2523 MessageConformance::MISSING_MESSAGE_ID_HEADER,
2524 None,
2525 )
2526 .await
2527 .unwrap();
2528
2529 const DOUBLE_ANGLE_AND_LONG_LINE: &str = "Subject: hello\r
2544Message-ID: <<1234@example.com>>\r
2545\r
2546Hello this is a really long line Hello this is a really long line \
2547Hello this is a really long line Hello this is a really long line \
2548Hello this is a really long line Hello this is a really long line \
2549Hello this is a really long line Hello this is a really long line \
2550Hello this is a really long line Hello this is a really long line \
2551Hello this is a really long line Hello this is a really long line \
2552Hello this is a really long line Hello this is a really long line
2553";
2554 let msg = new_msg_body(DOUBLE_ANGLE_AND_LONG_LINE);
2555 msg.check_fix_conformance(
2556 MessageConformance::MISSING_COLON_VALUE,
2557 MessageConformance::MISSING_MESSAGE_ID_HEADER | MessageConformance::LINE_TOO_LONG,
2558 None,
2559 )
2560 .await
2561 .unwrap();
2562
2563 }
2587
2588 #[tokio::test]
2589 async fn check_conformance() {
2590 let msg = new_msg_body(MULTI_HEADER_CONTENT);
2591 msg.check_fix_conformance(
2592 MessageConformance::default(),
2593 MessageConformance::MISSING_MIME_VERSION,
2594 None,
2595 )
2596 .await
2597 .unwrap();
2598 k9::snapshot!(
2599 data_as_string(&msg),
2600 r#"
2601X-Hello: there\r
2602X-Header: value\r
2603Subject: Hello\r
2604X-Header: another value\r
2605From :Someone@somewhere\r
2606Mime-Version: 1.0\r
2607\r
2608Body
2609"#
2610 );
2611
2612 let msg = new_msg_body(MULTI_HEADER_CONTENT);
2613 msg.check_fix_conformance(
2614 MessageConformance::default(),
2615 MessageConformance::MISSING_MIME_VERSION | MessageConformance::NAME_ENDS_WITH_SPACE,
2616 None,
2617 )
2618 .await
2619 .unwrap();
2620 k9::snapshot!(
2621 data_as_string(&msg),
2622 r#"
2623Content-Type: text/plain;\r
2624\tcharset="us-ascii"\r
2625X-Hello: there\r
2626X-Header: value\r
2627Subject: Hello\r
2628X-Header: another value\r
2629From: <Someone@somewhere>\r
2630Mime-Version: 1.0\r
2631\r
2632Body\r
2633
2634"#
2635 );
2636 }
2637
2638 #[tokio::test]
2639 async fn check_fix_latin_input() {
2640 const POUNDS: &[u8] = b"Subject: \xa3\r\n\r\nGBP\r\n";
2641 let msg = new_msg_body(&*POUNDS);
2642 msg.check_fix_conformance(
2643 MessageConformance::default(),
2644 MessageConformance::NEEDS_TRANSFER_ENCODING,
2645 Some(&CheckFixSettings {
2646 detect_encoding: true,
2647 include_encodings: vec!["iso-8859-1".to_string()],
2648 ..Default::default()
2649 }),
2650 )
2651 .await
2652 .unwrap();
2653
2654 let subject = msg
2655 .get_first_named_header_value("subject")
2656 .await
2657 .unwrap()
2658 .unwrap();
2659 assert_eq!(subject, "£");
2660 }
2661
2662 #[tokio::test]
2663 async fn set_scheduling() -> anyhow::Result<()> {
2664 let msg = new_msg_body(MULTI_HEADER_CONTENT);
2665 assert!(msg.get_due().is_none(), "due is implicitly now");
2666
2667 let now = Utc::now();
2668 let one_day = chrono::Duration::try_days(1).expect("1 day to be valid");
2669
2670 msg.set_scheduling(Some(Scheduling {
2671 restriction: None,
2672 first_attempt: Some((now + one_day).into()),
2673 expires: None,
2674 }))
2675 .await?;
2676
2677 let due = msg.get_due().expect("due to now be set");
2678 assert!(due - now >= one_day, "due time is at least 1 day away");
2679
2680 Ok(())
2681 }
2682
2683 #[cfg(all(test, target_pointer_width = "64"))]
2684 #[test]
2685 fn sizes() {
2686 assert_eq!(std::mem::size_of::<Message>(), 8);
2687 assert_eq!(std::mem::size_of::<MessageInner>(), 32);
2688 assert_eq!(std::mem::size_of::<MessageWithId>(), 72);
2689 }
2690}