1use crate::address::HeaderAddressList;
2#[cfg(feature = "impl")]
3use crate::dkim::Signer;
4#[cfg(feature = "impl")]
5use crate::dkim::SIGN_POOL;
6pub use crate::queue_name::QueueNameComponents;
7use crate::scheduling::Scheduling;
8use anyhow::Context;
9use bstr::{BString, ByteSlice, ByteVec};
10use chrono::{DateTime, Utc};
11#[cfg(feature = "impl")]
12use config::{any_err, from_lua_value, serialize_options, SerdeWrappedValue};
13use futures::{FutureExt, TryFutureExt};
14use intrusive_collections::{intrusive_adapter, LinkedList, LinkedListAtomicLink};
15use kumo_chrono_helper::*;
16#[cfg(feature = "impl")]
17use kumo_dkim::arc::ARC;
18use kumo_log_types::rfc3464::Report;
19use kumo_log_types::rfc5965::ARFReport;
20use kumo_prometheus::declare_metric;
21#[cfg(feature = "impl")]
22use mailparsing::{AuthenticationResult, AuthenticationResults, EncodeHeaderValue};
23use mailparsing::{
24 CheckFixSettings, DecodedBody, Header, HeaderParseResult, MessageConformance, MimePart,
25};
26#[cfg(feature = "impl")]
27use mlua::{IntoLua, LuaSerdeExt, UserData, UserDataMethods};
28#[cfg(feature = "impl")]
29use mod_dns_resolver::get_resolver_instance;
30use parking_lot::Mutex;
31use rfc5321::parser::EnvelopeAddress;
32use serde::{Deserialize, Serialize};
33use serde_with::formats::PreferOne;
34use serde_with::{serde_as, OneOrMany};
35use spool::{get_data_spool, get_meta_spool, Spool, SpoolId};
36use std::collections::{BTreeMap, HashSet};
37use std::hash::Hash;
38use std::sync::{Arc, LazyLock, Weak};
39use std::time::{Duration, Instant};
40use timeq::TimerEntryWithDelay;
41
42bitflags::bitflags! {
43 #[derive(Clone, Copy, Debug, PartialEq, Eq)]
44 struct MessageFlags: u8 {
45 const META_DIRTY = 1;
47 const DATA_DIRTY = 2;
49 const SCHEDULED = 4;
51 const FORCE_SYNC = 8;
53 }
54}
55
// Gauge of live messages: incremented by the `Message` constructors and
// decremented when a `MessageInner` is dropped.
declare_metric! {
static MESSAGE_COUNT: IntGauge("message_count");
}
65
// Gauge of messages whose metadata is currently held in memory: incremented
// when metadata is attached or loaded, decremented by `shrink` and on drop.
declare_metric! {
static META_COUNT: IntGauge("message_meta_resident_count");
}
75
// Gauge of messages whose body data is currently held in memory (non-empty):
// decremented by `shrink`/`shrink_data` and on drop.
declare_metric! {
static DATA_COUNT: IntGauge("message_data_resident_count");
}
88
/// Shared empty payload used as the placeholder for "body data is not resident",
/// so that shrinking a message does not allocate.
static NO_DATA: LazyLock<Arc<Box<[u8]>>> = LazyLock::new(|| Arc::new(Box::<[u8]>::default()));
90
// Histogram timed around `Message::save`.
declare_metric! {
static SAVE_HIST: Histogram("message_save_latency");
}
103
// Histogram timed around `Message::load_data`.
declare_metric! {
static LOAD_DATA_HIST: Histogram("message_data_load_latency");
}
117
// Histogram timed around `Message::load_meta`.
declare_metric! {
static LOAD_META_HIST: Histogram("message_meta_load_latency");
}
126
/// The mutable portion of a message, guarded by the mutex in `MessageWithId`.
#[derive(Debug)]
struct MessageInner {
    /// Envelope and user metadata; `None` while it is not resident in memory.
    metadata: Option<Box<MetaData>>,
    /// The message body; an empty slice (`NO_DATA`) while it is not resident.
    data: Arc<Box<[u8]>>,
    /// Dirty/scheduling/sync state bits.
    flags: MessageFlags,
    /// Number of attempts recorded via the `*_num_attempts` methods.
    num_attempts: u16,
    /// When the message is next due, if a due time has been assigned.
    due: Option<DateTime<Utc>>,
}
135
/// The shared allocation behind a `Message` handle: the immutable spool id,
/// the lock-protected mutable state, and the intrusive list link used by
/// `MessageList`.
#[derive(Debug)]
pub(crate) struct MessageWithId {
    /// Identifier of the message in the spool.
    id: SpoolId,
    /// Mutable state; all access goes through this mutex.
    inner: Mutex<MessageInner>,
    /// Link used when the message is a member of a `MessageList`.
    link: LinkedListAtomicLink,
}
142
// Adapter that lets `Arc<MessageWithId>` be stored in an intrusive
// `LinkedList` through its `link` field.
intrusive_adapter!(
    pub(crate) MessageWithIdAdapter = Arc<MessageWithId>: MessageWithId { link: LinkedListAtomicLink }
);
146
/// A list of messages built on the intrusive link embedded in each message,
/// with the element count tracked alongside it.
pub struct MessageList {
    /// The intrusive list of message allocations.
    list: LinkedList<MessageWithIdAdapter>,
    /// Number of elements currently in `list`.
    len: usize,
}
155
156impl Default for MessageList {
157 fn default() -> Self {
158 Self::new()
159 }
160}
161
162impl MessageList {
163 pub fn new() -> Self {
165 Self {
166 list: LinkedList::new(MessageWithIdAdapter::default()),
167 len: 0,
168 }
169 }
170
171 pub fn len(&self) -> usize {
173 self.len
174 }
175
176 pub fn is_empty(&self) -> bool {
177 self.len == 0
178 }
179
180 pub fn take(&mut self) -> Self {
183 let new_list = Self {
184 list: self.list.take(),
185 len: self.len,
186 };
187 self.len = 0;
188 new_list
189 }
190
191 pub fn push_back(&mut self, message: Message) {
193 self.list.push_back(message.msg_and_id);
194 self.len += 1;
195 }
196
197 pub fn pop_front(&mut self) -> Option<Message> {
199 self.list.pop_front().map(|msg_and_id| {
200 self.len -= 1;
201 Message { msg_and_id }
202 })
203 }
204
205 pub fn pop_back(&mut self) -> Option<Message> {
207 self.list.pop_back().map(|msg_and_id| {
208 self.len -= 1;
209 Message { msg_and_id }
210 })
211 }
212
213 pub fn drain(&mut self) -> Vec<Message> {
217 let mut messages = Vec::with_capacity(self.len);
218 while let Some(msg) = self.pop_front() {
219 messages.push(msg);
220 }
221 messages
222 }
223
224 pub fn extend_from_iter<I>(&mut self, iter: I)
225 where
226 I: Iterator<Item = Message>,
227 {
228 for msg in iter {
229 self.push_back(msg)
230 }
231 }
232}
233
234impl IntoIterator for MessageList {
235 type Item = Message;
236 type IntoIter = MessageListIter;
237 fn into_iter(self) -> MessageListIter {
238 MessageListIter { list: self.list }
239 }
240}
241
/// Owning iterator produced by `MessageList::into_iter`.
pub struct MessageListIter {
    /// The remaining messages; popped from the front on each `next` call.
    list: LinkedList<MessageWithIdAdapter>,
}
245
246impl Iterator for MessageListIter {
247 type Item = Message;
248 fn next(&mut self) -> Option<Message> {
249 self.list
250 .pop_front()
251 .map(|msg_and_id| Message { msg_and_id })
252 }
253}
254
/// A cheaply clonable handle to a message; clones share the same underlying
/// `MessageWithId` allocation.
#[derive(Clone, Debug)]
#[cfg_attr(feature = "impl", derive(mlua::FromLua))]
pub struct Message {
    /// The shared id + state allocation.
    pub(crate) msg_and_id: Arc<MessageWithId>,
}
260
261impl PartialEq for Message {
262 fn eq(&self, other: &Self) -> bool {
263 self.id() == other.id()
264 }
265}
// Equality is defined by the spool id (see `PartialEq`).
impl Eq for Message {}
267
268impl Hash for Message {
269 fn hash<H>(&self, hasher: &mut H)
270 where
271 H: std::hash::Hasher,
272 {
273 self.id().hash(hasher)
274 }
275}
276
/// A non-owning reference to a message, created by `Message::weak`.
#[derive(Clone, Debug)]
pub struct WeakMessage {
    /// Weak pointer to the shared message allocation.
    weak: Weak<MessageWithId>,
}
281
282impl WeakMessage {
283 pub fn upgrade(&self) -> Option<Message> {
284 Some(Message {
285 msg_and_id: self.weak.upgrade()?,
286 })
287 }
288}
289
/// The portion of a message that is serialized as JSON into the meta spool.
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone)]
pub(crate) struct MetaData {
    /// Envelope sender.
    pub sender: EnvelopeAddress,
    /// Envelope recipients; (de)serialized via `OneOrMany<_, PreferOne>`.
    #[serde_as(as = "OneOrMany<_, PreferOne>")]
    pub recipient: Vec<EnvelopeAddress>,
    /// Arbitrary key/value metadata; `new_dirty` requires this to be a JSON object.
    pub meta: serde_json::Value,
    /// Optional scheduling constraint; defaults to `None` when absent on load.
    #[serde(default)]
    pub schedule: Option<Scheduling>,
}
300
301impl Drop for MessageInner {
302 fn drop(&mut self) {
303 if self.metadata.is_some() {
304 META_COUNT.dec();
305 }
306 if !self.data.is_empty() {
307 DATA_COUNT.dec();
308 }
309 MESSAGE_COUNT.dec();
310 }
311}
312
313impl Message {
314 pub fn new_dirty(
317 id: SpoolId,
318 sender: EnvelopeAddress,
319 recipient: Vec<EnvelopeAddress>,
320 meta: serde_json::Value,
321 data: Arc<Box<[u8]>>,
322 ) -> anyhow::Result<Self> {
323 anyhow::ensure!(meta.is_object(), "metadata must be a json object");
324 MESSAGE_COUNT.inc();
325 DATA_COUNT.inc();
326 META_COUNT.inc();
327 Ok(Self {
328 msg_and_id: Arc::new(MessageWithId {
329 id,
330 inner: Mutex::new(MessageInner {
331 metadata: Some(Box::new(MetaData {
332 sender,
333 recipient,
334 meta,
335 schedule: None,
336 })),
337 data,
338 flags: MessageFlags::META_DIRTY | MessageFlags::DATA_DIRTY,
339 num_attempts: 0,
340 due: None,
341 }),
342 link: LinkedListAtomicLink::default(),
343 }),
344 })
345 }
346
347 pub fn weak(&self) -> WeakMessage {
348 WeakMessage {
349 weak: Arc::downgrade(&self.msg_and_id),
350 }
351 }
352
353 pub fn new_from_spool(id: SpoolId, metadata: Vec<u8>) -> anyhow::Result<Self> {
357 let metadata: MetaData = serde_json::from_slice(&metadata)?;
358 MESSAGE_COUNT.inc();
359 META_COUNT.inc();
360
361 let flags = if metadata.schedule.is_some() {
362 MessageFlags::SCHEDULED
363 } else {
364 MessageFlags::empty()
365 };
366
367 Ok(Self {
368 msg_and_id: Arc::new(MessageWithId {
369 id,
370 inner: Mutex::new(MessageInner {
371 metadata: Some(Box::new(metadata)),
372 data: NO_DATA.clone(),
373 flags,
374 num_attempts: 0,
375 due: None,
376 }),
377 link: LinkedListAtomicLink::default(),
378 }),
379 })
380 }
381
382 pub(crate) fn new_from_parts(id: SpoolId, metadata: MetaData, data: Arc<Box<[u8]>>) -> Self {
383 MESSAGE_COUNT.inc();
384 META_COUNT.inc();
385
386 let flags = if metadata.schedule.is_some() {
387 MessageFlags::SCHEDULED
388 } else {
389 MessageFlags::empty()
390 };
391
392 Self {
393 msg_and_id: Arc::new(MessageWithId {
394 id,
395 inner: Mutex::new(MessageInner {
396 metadata: Some(Box::new(metadata)),
397 data,
398 flags: flags | MessageFlags::META_DIRTY | MessageFlags::DATA_DIRTY,
399 num_attempts: 0,
400 due: None,
401 }),
402 link: LinkedListAtomicLink::default(),
403 }),
404 }
405 }
406
407 pub async fn new_with_id(id: SpoolId) -> anyhow::Result<Self> {
408 let meta_spool = get_meta_spool();
409 let data = meta_spool.load(id).await?;
410 Self::new_from_spool(id, data)
411 }
412
413 pub fn get_num_attempts(&self) -> u16 {
414 let inner = self.msg_and_id.inner.lock();
415 inner.num_attempts
416 }
417
418 pub fn set_num_attempts(&self, num_attempts: u16) {
419 let mut inner = self.msg_and_id.inner.lock();
420 inner.num_attempts = num_attempts;
421 }
422
423 pub fn increment_num_attempts(&self) {
424 let mut inner = self.msg_and_id.inner.lock();
425 inner.num_attempts += 1;
426 }
427
428 pub async fn set_scheduling(
429 &self,
430 scheduling: Option<Scheduling>,
431 ) -> anyhow::Result<Option<Scheduling>> {
432 self.load_meta_if_needed().await?;
433 let mut inner = self.msg_and_id.inner.lock();
434 match &mut inner.metadata {
435 None => anyhow::bail!("set_scheduling: metadata must be loaded first"),
436 Some(meta) => {
437 meta.schedule = scheduling;
438 inner
439 .flags
440 .set(MessageFlags::SCHEDULED, scheduling.is_some());
441 if let Some(sched) = scheduling {
442 let due = inner.due.unwrap_or_else(Utc::now);
443 inner.due = Some(sched.adjust_for_schedule(due));
444 }
445 Ok(scheduling)
446 }
447 }
448 }
449
450 pub async fn get_scheduling(&self) -> anyhow::Result<Option<Scheduling>> {
451 self.load_meta_if_needed().await?;
452 let inner = self.msg_and_id.inner.lock();
453 Ok(inner.metadata.as_ref().and_then(|meta| meta.schedule))
454 }
455
456 pub fn get_due(&self) -> Option<DateTime<Utc>> {
457 let inner = self.msg_and_id.inner.lock();
458 inner.due
459 }
460
461 pub async fn delay_with_jitter(&self, limit: i64) -> anyhow::Result<Option<DateTime<Utc>>> {
462 let scale = rand::random::<f32>();
463 let value = (scale * limit as f32) as i64;
464 self.delay_by(seconds(value)?).await
465 }
466
467 pub async fn delay_by(
468 &self,
469 duration: chrono::Duration,
470 ) -> anyhow::Result<Option<DateTime<Utc>>> {
471 let due = Utc::now() + duration;
472 self.set_due(Some(due)).await
473 }
474
475 pub async fn delay_by_and_jitter(
477 &self,
478 duration: chrono::Duration,
479 ) -> anyhow::Result<Option<DateTime<Utc>>> {
480 let scale = rand::random::<f32>();
481 let value = (scale * 60.) as i64;
482 let due = Utc::now() + duration + seconds(value)?;
483 self.set_due(Some(due)).await
484 }
485
    /// Assigns the due time and returns the value actually stored.
    ///
    /// When the message is not flagged `SCHEDULED`, `due` is stored as-is.
    /// Otherwise a missing `due` is replaced by the current time and the
    /// value is passed through the schedule's `adjust_for_schedule`; if the
    /// metadata is not resident it is loaded from the spool first.
    ///
    /// # Errors
    /// Fails if the metadata cannot be loaded, or is still absent after a
    /// successful load.
    pub async fn set_due(
        &self,
        due: Option<DateTime<Utc>>,
    ) -> anyhow::Result<Option<DateTime<Utc>>> {
        // First pass, under the lock: handle every case that does not need
        // the metadata to be loaded. The guard is confined to this block so
        // that it is released before the `.await` below.
        let due = {
            let mut inner = self.msg_and_id.inner.lock();

            if !inner.flags.contains(MessageFlags::SCHEDULED) {
                inner.due = due;
                return Ok(inner.due);
            }

            let due = due.unwrap_or_else(Utc::now);

            if let Some(meta) = &inner.metadata {
                inner.due = match &meta.schedule {
                    Some(sched) => Some(sched.adjust_for_schedule(due)),
                    None => Some(due),
                };
                return Ok(inner.due);
            }

            // Scheduled, but the metadata is not resident: carry the resolved
            // due time out of the block and load the metadata.
            due
        };

        self.load_meta().await?;

        {
            // Second pass: the metadata should now be present.
            let mut inner = self.msg_and_id.inner.lock();
            match &inner.metadata {
                Some(meta) => {
                    inner.due = match &meta.schedule {
                        Some(sched) => Some(sched.adjust_for_schedule(due)),
                        None => Some(due),
                    };
                    Ok(inner.due)
                }
                None => anyhow::bail!("loaded metadata, but metadata is not set!?"),
            }
        }
    }
530
531 fn get_data_if_dirty(&self) -> Option<Arc<Box<[u8]>>> {
532 let inner = self.msg_and_id.inner.lock();
533 if inner.flags.contains(MessageFlags::DATA_DIRTY) {
534 Some(Arc::clone(&inner.data))
535 } else {
536 None
537 }
538 }
539
540 fn get_meta_if_dirty(&self) -> Option<MetaData> {
541 let inner = self.msg_and_id.inner.lock();
542 if inner.flags.contains(MessageFlags::META_DIRTY) {
543 inner.metadata.as_ref().map(|md| (**md).clone())
544 } else {
545 None
546 }
547 }
548
549 pub(crate) async fn clone_meta_data(&self) -> anyhow::Result<MetaData> {
550 self.load_meta_if_needed().await?;
551 let inner = self.msg_and_id.inner.lock();
552 inner
553 .metadata
554 .as_ref()
555 .ok_or_else(|| anyhow::anyhow!("metadata not loaded even though we just loaded it"))
556 .map(|md| (**md).clone())
557 }
558
559 pub fn set_force_sync(&self, force: bool) {
560 let mut inner = self.msg_and_id.inner.lock();
561 inner.flags.set(MessageFlags::FORCE_SYNC, force);
562 }
563
564 pub fn needs_save(&self) -> bool {
565 let inner = self.msg_and_id.inner.lock();
566 inner.flags.contains(MessageFlags::META_DIRTY)
567 || inner.flags.contains(MessageFlags::DATA_DIRTY)
568 }
569
570 pub async fn save(&self, deadline: Option<Instant>) -> anyhow::Result<()> {
571 let _timer = SAVE_HIST.start_timer();
572 self.save_to(&**get_meta_spool(), &**get_data_spool(), deadline)
573 .await
574 }
575
    /// Stores the dirty parts of the message into the given spools.
    ///
    /// Data and metadata are stored concurrently. Each dirty flag is cleared
    /// only when its own store succeeded, so a failed part stays dirty.
    ///
    /// # Errors
    /// Fails if the dirty data is empty, if the metadata cannot be
    /// serialized, or if either store fails. When both stores fail the two
    /// errors are combined; an error whose root cause is
    /// `SpoolUnhealthyError` is kept as the primary error (data first) with
    /// the other rendered into its context.
    pub async fn save_to(
        &self,
        meta_spool: &(dyn Spool + Send + Sync),
        data_spool: &(dyn Spool + Send + Sync),
        deadline: Option<Instant>,
    ) -> anyhow::Result<()> {
        let force_sync = self
            .msg_and_id
            .inner
            .lock()
            .flags
            .contains(MessageFlags::FORCE_SYNC);

        // Each future resolves to Ok(true) when a store was performed and
        // succeeded, Ok(false) when that part was not dirty.
        let data_fut: futures::future::BoxFuture<'_, anyhow::Result<bool>> =
            if let Some(data) = self.get_data_if_dirty() {
                anyhow::ensure!(!data.is_empty(), "message data must not be empty");
                data_spool
                    .store(self.msg_and_id.id, data, force_sync, deadline)
                    .map_ok(|_| true)
                    .boxed()
            } else {
                futures::future::ready(Ok(false)).boxed()
            };
        let meta_fut: futures::future::BoxFuture<'_, anyhow::Result<bool>> =
            if let Some(meta) = self.get_meta_if_dirty() {
                let meta = Arc::new(serde_json::to_vec(&meta)?.into_boxed_slice());
                meta_spool
                    .store(self.msg_and_id.id, meta, force_sync, deadline)
                    .map_ok(|_| true)
                    .boxed()
            } else {
                futures::future::ready(Ok(false)).boxed()
            };

        let (data_res, meta_res) = tokio::join!(data_fut, meta_fut);

        // Clear each dirty flag independently, and only for a store that
        // actually happened and succeeded.
        if matches!(data_res, Ok(true)) {
            self.msg_and_id
                .inner
                .lock()
                .flags
                .remove(MessageFlags::DATA_DIRTY);
        }
        if matches!(meta_res, Ok(true)) {
            self.msg_and_id
                .inner
                .lock()
                .flags
                .remove(MessageFlags::META_DIRTY);
        }

        match (data_res, meta_res) {
            (Ok(_), Ok(_)) => Ok(()),
            (Err(data_err), Ok(_)) => Err(data_err.context("data spool store")),
            (Ok(_), Err(meta_err)) => Err(meta_err.context("meta spool store")),
            (Err(data_err), Err(meta_err)) => {
                // Keep an error rooted in SpoolUnhealthyError as the primary
                // error so that its root cause is still the one reported by
                // `root_cause()`.
                let data_unhealthy = data_err.root_cause().is::<::spool::SpoolUnhealthyError>();
                let meta_unhealthy = meta_err.root_cause().is::<::spool::SpoolUnhealthyError>();
                match (data_unhealthy, meta_unhealthy) {
                    (true, _) => Err(data_err.context(format!(
                        "data spool store; meta spool store also failed: {meta_err:#}"
                    ))),
                    (false, true) => Err(meta_err.context(format!(
                        "meta spool store; data spool store also failed: {data_err:#}"
                    ))),
                    (false, false) => Err(anyhow::anyhow!(
                        "data spool store: {data_err:#}; meta spool store: {meta_err:#}"
                    )),
                }
            }
        }
    }
671
672 pub fn id(&self) -> &SpoolId {
673 &self.msg_and_id.id
674 }
675
676 pub async fn save_and_shrink(&self) -> anyhow::Result<bool> {
678 self.save(None).await?;
679 self.shrink()
680 }
681
682 pub async fn save_and_shrink_data(&self) -> anyhow::Result<bool> {
684 self.save(None).await?;
685 self.shrink_data()
686 }
687
688 pub fn shrink_data(&self) -> anyhow::Result<bool> {
689 let mut inner = self.msg_and_id.inner.lock();
690 let mut did_shrink = false;
691 if inner.flags.contains(MessageFlags::DATA_DIRTY) {
692 anyhow::bail!("Cannot shrink message: DATA_DIRTY");
693 }
694 if !inner.data.is_empty() {
695 DATA_COUNT.dec();
696 did_shrink = true;
697 }
698 if !inner.data.is_empty() {
699 inner.data = NO_DATA.clone();
700 did_shrink = true;
701 }
702 Ok(did_shrink)
703 }
704
705 pub fn shrink(&self) -> anyhow::Result<bool> {
706 let mut inner = self.msg_and_id.inner.lock();
707 let mut did_shrink = false;
708 if inner.flags.contains(MessageFlags::DATA_DIRTY) {
709 anyhow::bail!("Cannot shrink message: DATA_DIRTY");
710 }
711 if inner.flags.contains(MessageFlags::META_DIRTY) {
712 anyhow::bail!("Cannot shrink message: META_DIRTY");
713 }
714 if inner.metadata.take().is_some() {
715 META_COUNT.dec();
716 did_shrink = true;
717 }
718 if !inner.data.is_empty() {
719 DATA_COUNT.dec();
720 did_shrink = true;
721 }
722 if !inner.data.is_empty() {
723 inner.data = NO_DATA.clone();
724 did_shrink = true;
725 }
726 Ok(did_shrink)
727 }
728
729 pub async fn sender(&self) -> anyhow::Result<EnvelopeAddress> {
730 self.load_meta_if_needed().await?;
731 let inner = self.msg_and_id.inner.lock();
732 match &inner.metadata {
733 Some(meta) => Ok(meta.sender.clone()),
734 None => anyhow::bail!("Message::sender metadata is not loaded"),
735 }
736 }
737
738 pub async fn set_sender(&self, sender: EnvelopeAddress) -> anyhow::Result<()> {
739 self.load_meta_if_needed().await?;
740 let mut inner = self.msg_and_id.inner.lock();
741 match &mut inner.metadata {
742 Some(meta) => {
743 meta.sender = sender;
744 inner.flags.set(MessageFlags::DATA_DIRTY, true);
745 Ok(())
746 }
747 None => anyhow::bail!("Message::set_sender: metadata is not loaded"),
748 }
749 }
750
751 #[deprecated = "use recipient_list or first_recipient instead"]
752 pub async fn recipient(&self) -> anyhow::Result<EnvelopeAddress> {
753 self.first_recipient().await
754 }
755
756 pub async fn first_recipient(&self) -> anyhow::Result<EnvelopeAddress> {
757 self.load_meta_if_needed().await?;
758 let inner = self.msg_and_id.inner.lock();
759 match &inner.metadata {
760 Some(meta) => match meta.recipient.first() {
761 Some(recip) => Ok(recip.clone()),
762 None => anyhow::bail!("recipient list is empty!?"),
763 },
764 None => anyhow::bail!("Message::first_recipient: metadata is not loaded"),
765 }
766 }
767
768 pub async fn recipient_list(&self) -> anyhow::Result<Vec<EnvelopeAddress>> {
769 self.load_meta_if_needed().await?;
770 let inner = self.msg_and_id.inner.lock();
771 match &inner.metadata {
772 Some(meta) => Ok(meta.recipient.clone()),
773 None => anyhow::bail!("Message::recipient_list: metadata is not loaded"),
774 }
775 }
776
777 pub async fn recipient_list_string(&self) -> anyhow::Result<Vec<String>> {
778 self.load_meta_if_needed().await?;
779 let inner = self.msg_and_id.inner.lock();
780 match &inner.metadata {
781 Some(meta) => Ok(meta.recipient.iter().map(|a| a.to_string()).collect()),
782 None => anyhow::bail!("Message::recipient_list_string: metadata is not loaded"),
783 }
784 }
785
786 #[deprecated = "use set_recipient_list instead"]
787 pub async fn set_recipient(&self, recipient: EnvelopeAddress) -> anyhow::Result<()> {
788 self.set_recipient_list(vec![recipient]).await
789 }
790
791 pub async fn set_recipient_list(&self, recipient: Vec<EnvelopeAddress>) -> anyhow::Result<()> {
792 self.load_meta_if_needed().await?;
793
794 let mut inner = self.msg_and_id.inner.lock();
795 match &mut inner.metadata {
796 Some(meta) => {
797 meta.recipient = recipient;
798 inner.flags.set(MessageFlags::DATA_DIRTY, true);
799 Ok(())
800 }
801 None => anyhow::bail!("Message::set_recipient_list: metadata is not loaded"),
802 }
803 }
804
805 pub fn is_meta_loaded(&self) -> bool {
806 self.msg_and_id.inner.lock().metadata.is_some()
807 }
808
809 pub fn is_data_loaded(&self) -> bool {
810 !self.msg_and_id.inner.lock().data.is_empty()
811 }
812
813 pub async fn load_meta_if_needed(&self) -> anyhow::Result<()> {
814 if self.is_meta_loaded() {
815 return Ok(());
816 }
817 self.load_meta().await
818 }
819
820 pub async fn data(&self) -> anyhow::Result<Arc<Box<[u8]>>> {
821 self.load_data_if_needed().await
822 }
823
824 async fn load_data_if_needed(&self) -> anyhow::Result<Arc<Box<[u8]>>> {
825 if self.is_data_loaded() {
826 return Ok(self.get_data_maybe_not_loaded());
827 }
828 self.load_data().await
829 }
830
831 pub async fn load_meta(&self) -> anyhow::Result<()> {
832 let _timer = LOAD_META_HIST.start_timer();
833 self.load_meta_from(&**get_meta_spool()).await
834 }
835
836 async fn load_meta_from(&self, meta_spool: &(dyn Spool + Send + Sync)) -> anyhow::Result<()> {
837 let id = self.id();
838 let data = meta_spool.load(*id).await?;
839 let mut inner = self.msg_and_id.inner.lock();
840 let was_not_loaded = inner.metadata.is_none();
841 let metadata: MetaData = serde_json::from_slice(&data)?;
842 inner.metadata.replace(Box::new(metadata));
843 if was_not_loaded {
844 META_COUNT.inc();
845 }
846 Ok(())
847 }
848
849 pub async fn load_data(&self) -> anyhow::Result<Arc<Box<[u8]>>> {
850 let _timer = LOAD_DATA_HIST.start_timer();
851 self.load_data_from(&**get_data_spool()).await
852 }
853
854 async fn load_data_from(
855 &self,
856 data_spool: &(dyn Spool + Send + Sync),
857 ) -> anyhow::Result<Arc<Box<[u8]>>> {
858 let data = data_spool.load(*self.id()).await?;
859 let mut inner = self.msg_and_id.inner.lock();
860 let was_empty = inner.data.is_empty();
861 inner.data = Arc::new(data.into_boxed_slice());
862 if was_empty {
863 DATA_COUNT.inc();
864 }
865 Ok(inner.data.clone())
866 }
867
868 pub fn assign_data(&self, data: Vec<u8>) {
869 let mut inner = self.msg_and_id.inner.lock();
870 let was_empty = inner.data.is_empty();
871 inner.data = Arc::new(data.into_boxed_slice());
872 inner.flags.set(MessageFlags::DATA_DIRTY, true);
873 if was_empty {
874 DATA_COUNT.inc();
875 }
876 }
877
878 pub fn get_data_maybe_not_loaded(&self) -> Arc<Box<[u8]>> {
879 let inner = self.msg_and_id.inner.lock();
880 inner.data.clone()
881 }
882
883 pub async fn set_meta<S: AsRef<str>, V: Into<serde_json::Value>>(
884 &self,
885 key: S,
886 value: V,
887 ) -> anyhow::Result<()> {
888 self.load_meta_if_needed().await?;
889 let mut inner = self.msg_and_id.inner.lock();
890 match &mut inner.metadata {
891 None => anyhow::bail!("set_meta: metadata must be loaded first"),
892 Some(meta) => {
893 let key = key.as_ref();
894 let value = value.into();
895
896 match &mut meta.meta {
897 serde_json::Value::Object(map) => {
898 map.insert(key.to_string(), value);
899 }
900 _ => anyhow::bail!("metadata is somehow not a json object"),
901 }
902
903 inner.flags.set(MessageFlags::META_DIRTY, true);
904 Ok(())
905 }
906 }
907 }
908
909 pub async fn unset_meta<S: AsRef<str>>(&self, key: S) -> anyhow::Result<()> {
910 self.load_meta_if_needed().await?;
911 let mut inner = self.msg_and_id.inner.lock();
912 match &mut inner.metadata {
913 None => anyhow::bail!("set_meta: metadata must be loaded first"),
914 Some(meta) => {
915 let key = key.as_ref();
916
917 match &mut meta.meta {
918 serde_json::Value::Object(map) => {
919 map.remove(key);
920 }
921 _ => anyhow::bail!("metadata is somehow not a json object"),
922 }
923
924 inner.flags.set(MessageFlags::META_DIRTY, true);
925 Ok(())
926 }
927 }
928 }
929
930 pub async fn get_meta_string<S: serde_json::value::Index + std::fmt::Display + Copy>(
932 &self,
933 key: S,
934 ) -> anyhow::Result<Option<String>> {
935 match self.get_meta(key).await {
936 Ok(serde_json::Value::String(value)) => Ok(Some(value.to_string())),
937 Ok(serde_json::Value::Null) => Ok(None),
938 hmm => {
939 anyhow::bail!("expected '{key}' to be a string value, got {hmm:?}");
940 }
941 }
942 }
943
944 pub async fn get_meta_obj(&self) -> anyhow::Result<serde_json::Value> {
945 self.load_meta_if_needed().await?;
946 let inner = self.msg_and_id.inner.lock();
947 match &inner.metadata {
948 None => anyhow::bail!("get_meta_obj: metadata must be loaded first"),
949 Some(meta) => Ok(meta.meta.clone()),
950 }
951 }
952
953 pub async fn get_meta<S: serde_json::value::Index>(
954 &self,
955 key: S,
956 ) -> anyhow::Result<serde_json::Value> {
957 self.load_meta_if_needed().await?;
958 let inner = self.msg_and_id.inner.lock();
959 match &inner.metadata {
960 None => anyhow::bail!("get_meta: metadata must be loaded first"),
961 Some(meta) => match meta.meta.get(key) {
962 Some(value) => Ok(value.clone()),
963 None => Ok(serde_json::Value::Null),
964 },
965 }
966 }
967
968 pub fn age(&self, now: DateTime<Utc>) -> chrono::Duration {
969 self.msg_and_id.id.age(now)
970 }
971
972 pub async fn get_queue_name(&self) -> anyhow::Result<String> {
973 Ok(match self.get_meta_string("queue").await? {
974 Some(name) => name,
975 None => {
976 let name = QueueNameComponents::format(
977 self.get_meta_string("campaign").await?,
978 self.get_meta_string("tenant").await?,
979 self.first_recipient()
980 .await?
981 .domain()
982 .to_string()
983 .to_lowercase(),
984 self.get_meta_string("routing_domain").await?,
985 );
986 name.to_string()
987 }
988 })
989 }
990
991 #[cfg(feature = "impl")]
992 pub async fn arc_verify(
993 &self,
994 opt_resolver_name: Option<String>,
995 ) -> anyhow::Result<AuthenticationResult> {
996 let resolver = get_resolver_instance(&opt_resolver_name)?;
997 let data = self.data().await?;
998 let bytes = mailparsing::SharedString::try_from(data.as_ref().as_ref())?;
999
1000 let parsed = mailparsing::Header::parse_headers(bytes.clone())?;
1001 let message = kumo_dkim::ParsedEmail::HeaderOnlyParse { bytes, parsed };
1002
1003 let arc = ARC::verify(&message, &**resolver).await;
1004 Ok(arc.authentication_result())
1005 }
1006
1007 #[cfg(feature = "impl")]
1008 pub async fn arc_seal(
1009 &self,
1010 signer: Signer,
1011 auth_results: AuthenticationResults,
1012 opt_resolver_name: Option<String>,
1013 ) -> anyhow::Result<()> {
1014 let resolver = get_resolver_instance(&opt_resolver_name)?;
1015 let data = self.data().await?;
1016 let bytes = mailparsing::SharedString::try_from(data.as_ref().as_ref())?;
1017 let parsed = mailparsing::Header::parse_headers(bytes.clone())?;
1018 let message = kumo_dkim::ParsedEmail::HeaderOnlyParse { bytes, parsed };
1019 let arc = ARC::verify(&message, &**resolver).await;
1020
1021 let headers = arc.seal(&message, auth_results, signer.signer())?;
1022 if !headers.is_empty() {
1023 let mut new_data = Vec::<u8>::with_capacity(data.len() + 1024);
1024
1025 for hdr in headers {
1026 hdr.write_header(&mut new_data).ok();
1027 }
1028 new_data.extend_from_slice(&data);
1029 self.assign_data(new_data);
1030 }
1031
1032 Ok(())
1033 }
1034
1035 #[cfg(feature = "impl")]
1036 pub async fn dkim_verify(
1037 &self,
1038 opt_resolver_name: Option<String>,
1039 ) -> anyhow::Result<Vec<AuthenticationResult>> {
1040 let resolver = get_resolver_instance(&opt_resolver_name)?;
1041 let data = self.data().await?;
1042 let bytes = mailparsing::SharedString::try_from(data.as_ref().as_ref())?;
1043
1044 let parsed = mailparsing::Header::parse_headers(bytes.clone())?;
1045 if parsed
1046 .overall_conformance
1047 .contains(MessageConformance::NON_CANONICAL_LINE_ENDINGS)
1048 {
1049 return Ok(vec![AuthenticationResult {
1050 method: "dkim".into(),
1051 method_version: None,
1052 result: "permerror".into(),
1053 reason: Some("message has non-canonical line endings".into()),
1054 props: Default::default(),
1055 }]);
1056 }
1057 let message = kumo_dkim::ParsedEmail::HeaderOnlyParse { bytes, parsed };
1058
1059 let results = kumo_dkim::verify_email_with_resolver(&message, &**resolver).await?;
1060 Ok(results)
1061 }
1062
1063 pub async fn parse(&self) -> anyhow::Result<MimePart<'static>> {
1068 let data = self.data().await?;
1069 let owned_data = String::from_utf8_lossy(data.as_ref().as_ref()).to_string();
1070 Ok(MimePart::parse(owned_data)?)
1071 }
1072
1073 pub async fn parse_rfc3464(&self) -> anyhow::Result<Option<Report>> {
1074 let data = self.data().await?;
1075 Report::parse(&data)
1076 }
1077
1078 pub async fn parse_rfc5965(&self) -> anyhow::Result<Option<ARFReport>> {
1079 let data = self.data().await?;
1080 ARFReport::parse(&data)
1081 }
1082
1083 pub async fn prepend_header(&self, name: Option<&str>, value: &[u8]) -> anyhow::Result<()> {
1084 let data = self.data().await?;
1085 let mut new_data = Vec::with_capacity(size_header(name, value) + 2 + data.len());
1086 emit_header(&mut new_data, name, value);
1087 new_data.extend_from_slice(&data);
1088 self.assign_data(new_data);
1089 Ok(())
1090 }
1091
1092 pub async fn append_header(&self, name: Option<&str>, value: &[u8]) -> anyhow::Result<()> {
1093 let data = self.data().await?;
1094 let mut new_data = Vec::with_capacity(size_header(name, value.as_bytes()) + 2 + data.len());
1095 for (idx, window) in data.windows(4).enumerate() {
1096 if window == b"\r\n\r\n" {
1097 let headers = &data[0..idx + 2];
1098 let body = &data[idx + 2..];
1099
1100 new_data.extend_from_slice(headers);
1101 emit_header(&mut new_data, name, value);
1102 new_data.extend_from_slice(body);
1103 self.assign_data(new_data);
1104 return Ok(());
1105 }
1106 }
1107
1108 anyhow::bail!("append_header could not find the end of the header block");
1109 }
1110
1111 pub async fn get_address_header(
1112 &self,
1113 header_name: &str,
1114 ) -> anyhow::Result<Option<HeaderAddressList>> {
1115 let data = self.data().await?;
1116 let HeaderParseResult { headers, .. } =
1117 mailparsing::Header::parse_headers(data.as_ref().as_ref())?;
1118
1119 match headers.get_first(header_name) {
1120 Some(hdr) => {
1121 let list = hdr.as_address_list()?;
1122 let result: HeaderAddressList = list.into();
1123 Ok(Some(result))
1124 }
1125 None => Ok(None),
1126 }
1127 }
1128
1129 pub async fn get_first_named_header_value(
1130 &self,
1131 name: &str,
1132 ) -> anyhow::Result<Option<BString>> {
1133 let data = self.data().await?;
1134 let HeaderParseResult { headers, .. } = Header::parse_headers(data.as_ref().as_ref())?;
1135
1136 match headers.get_first(name) {
1137 Some(hdr) => Ok(Some(
1138 hdr.as_unstructured()
1139 .unwrap_or_else(|_| hdr.get_raw_value().into()),
1140 )),
1141 None => Ok(None),
1142 }
1143 }
1144
1145 pub async fn get_all_named_header_values(&self, name: &str) -> anyhow::Result<Vec<BString>> {
1146 let data = self.data().await?;
1147 let HeaderParseResult { headers, .. } = Header::parse_headers(data.as_ref().as_ref())?;
1148
1149 let mut values = vec![];
1150 for hdr in headers.iter_named(name) {
1151 values.push(
1152 hdr.as_unstructured()
1153 .unwrap_or_else(|_| hdr.get_raw_value().into()),
1154 );
1155 }
1156 Ok(values)
1157 }
1158
1159 pub async fn get_all_headers(&self) -> anyhow::Result<Vec<(BString, BString)>> {
1160 let data = self.data().await?;
1161 let HeaderParseResult { headers, .. } = Header::parse_headers(data.as_ref().as_ref())?;
1162
1163 let mut values = vec![];
1164 for hdr in headers.iter() {
1165 values.push((
1166 hdr.get_name().into(),
1167 hdr.as_unstructured()
1168 .unwrap_or_else(|_| hdr.get_raw_value().into()),
1169 ));
1170 }
1171 Ok(values)
1172 }
1173
1174 pub async fn retain_headers<F: FnMut(usize, &Header) -> bool>(
1175 &self,
1176 mut func: F,
1177 ) -> anyhow::Result<()> {
1178 let data = self.data().await?;
1179 let mut new_data = Vec::with_capacity(data.len());
1180 let HeaderParseResult {
1181 headers,
1182 body_offset,
1183 ..
1184 } = Header::parse_headers(data.as_ref().as_ref())?;
1185 for (idx, hdr) in headers.iter().enumerate() {
1186 let retain = (func)(idx, hdr);
1187 if !retain {
1188 continue;
1189 }
1190 hdr.write_header(&mut new_data)?;
1191 }
1192 new_data.extend_from_slice(b"\r\n");
1193 new_data.extend_from_slice(&data[body_offset..]);
1194 self.assign_data(new_data);
1195 Ok(())
1196 }
1197
1198 pub async fn remove_first_named_header(&self, name: &str) -> anyhow::Result<()> {
1199 let mut removed = false;
1200 self.retain_headers(|_, hdr| {
1201 if hdr.get_name().eq_ignore_ascii_case(name.as_bytes()) && !removed {
1202 removed = true;
1203 false
1204 } else {
1205 true
1206 }
1207 })
1208 .await
1209 }
1210
1211 pub async fn import_x_headers(&self, names: Vec<String>) -> anyhow::Result<()> {
1212 let specs = if names.is_empty() {
1213 vec![ImportHeaderSpec {
1214 name: "X-*".to_string(),
1215 ..ImportHeaderSpec::default()
1216 }]
1217 } else {
1218 names
1219 .into_iter()
1220 .map(|name| ImportHeaderSpec {
1221 name,
1222 ..ImportHeaderSpec::default()
1223 })
1224 .collect()
1225 };
1226 self.import_headers(specs).await
1227 }
1228
1229 pub async fn import_headers(&self, specs: Vec<ImportHeaderSpec>) -> anyhow::Result<()> {
1230 let compiled: Vec<CompiledImportHeaderSpec> = specs
1231 .into_iter()
1232 .map(CompiledImportHeaderSpec::compile)
1233 .collect::<anyhow::Result<_>>()?;
1234
1235 let data = self.data().await?;
1236 let HeaderParseResult { headers, .. } = Header::parse_headers(data.as_ref().as_ref())?;
1237
1238 let mut accumulators: Vec<PerSpecAccumulator> = compiled
1239 .iter()
1240 .map(|s| PerSpecAccumulator::new(s.match_mode))
1241 .collect();
1242 let mut indices_to_remove: HashSet<usize> = HashSet::new();
1243
1244 for (idx, hdr) in headers.iter().enumerate() {
1245 let hdr_name = hdr.get_name();
1246 for (spec_idx, spec) in compiled.iter().enumerate() {
1247 if spec.matches(hdr_name) {
1248 let key = spec.target_key(&hdr.get_name_lossy());
1249 let value = hdr.as_unstructured()?.to_str_lossy().to_string();
1250 accumulators[spec_idx].record(key, value);
1251 if spec.remove {
1252 indices_to_remove.insert(idx);
1253 }
1254 break;
1255 }
1256 }
1257 }
1258
1259 for acc in accumulators {
1260 acc.write_to(self).await?;
1261 }
1262
1263 if !indices_to_remove.is_empty() {
1264 self.retain_headers(|idx, _| !indices_to_remove.contains(&idx))
1265 .await?;
1266 }
1267
1268 Ok(())
1269 }
1270
1271 pub async fn remove_x_headers(&self, names: Vec<String>) -> anyhow::Result<()> {
1272 self.retain_headers(|_, hdr| {
1273 if names.is_empty() {
1274 !is_x_header(hdr.get_name())
1275 } else {
1276 !names
1277 .iter()
1278 .any(|n| hdr.get_name().eq_ignore_ascii_case(n.as_bytes()))
1279 }
1280 })
1281 .await
1282 }
1283
1284 pub async fn remove_all_named_headers(&self, name: &str) -> anyhow::Result<()> {
1285 self.retain_headers(|_, hdr| !hdr.get_name().eq_ignore_ascii_case(name.as_bytes()))
1286 .await
1287 }
1288
/// Signs the message with `signer` and prepends the resulting header.
///
/// When `SIGN_POOL` has been initialised the signing work is handed to
/// it via `spawn_blocking`; otherwise the signer runs inline on the
/// current task.
///
/// # Errors
/// Fails when the data cannot be loaded, the spawned signing task fails,
/// signing itself fails, or the header cannot be prepended.
#[cfg(feature = "impl")]
pub async fn dkim_sign(&self, signer: Signer) -> anyhow::Result<()> {
    let data = self.data().await?;
    let header = match SIGN_POOL.get() {
        // Outer `?` covers the join error, inner `?` the signing error.
        Some(pool) => pool.spawn_blocking(move || signer.sign(&data)).await??,
        None => signer.sign(&data)?,
    };
    self.prepend_header(None, header.as_bytes()).await?;
    Ok(())
}
1300
1301 pub async fn import_scheduling_header(
1302 &self,
1303 header_name: &str,
1304 remove: bool,
1305 ) -> anyhow::Result<Option<Scheduling>> {
1306 if let Some(value) = self.get_first_named_header_value(header_name).await? {
1307 let sched: Scheduling = serde_json::from_slice(&value).with_context(|| {
1308 format!("{value} from header {header_name} is not a valid Scheduling header")
1309 })?;
1310 let result = self.set_scheduling(Some(sched)).await?;
1311
1312 if remove {
1313 self.remove_all_named_headers(header_name).await?;
1314 }
1315
1316 Ok(result)
1317 } else {
1318 Ok(None)
1319 }
1320 }
1321
1322 pub async fn append_text_plain(&self, content: &str) -> anyhow::Result<bool> {
1323 let data = self.data().await?;
1324 let mut msg = MimePart::parse(data.as_ref().as_ref())?;
1325 let parts = msg.simplified_structure_pointers()?;
1326 if let Some(p) = parts.text_part.and_then(|p| msg.resolve_ptr_mut(p)) {
1327 match p.body()? {
1328 DecodedBody::Text(text) => {
1329 let mut text = text.as_bytes().to_vec();
1330 text.push_str("\r\n");
1331 text.push_str(content);
1332 p.replace_text_body("text/plain", &*text)?;
1333
1334 let new_data = msg.to_message_bytes();
1335 self.assign_data(new_data);
1336 Ok(true)
1337 }
1338 DecodedBody::Binary(_) => {
1339 anyhow::bail!("expected text/plain part to be text, but it is binary");
1340 }
1341 }
1342 } else {
1343 Ok(false)
1344 }
1345 }
1346
1347 pub async fn append_text_html(&self, content: &str) -> anyhow::Result<bool> {
1348 let data = self.data().await?;
1349 let mut msg = MimePart::parse(data.as_ref().as_ref())?;
1350 let parts = msg.simplified_structure_pointers()?;
1351 if let Some(p) = parts.html_part.and_then(|p| msg.resolve_ptr_mut(p)) {
1352 match p.body()? {
1353 DecodedBody::Text(text) => {
1354 let mut text = text.as_bytes().to_vec();
1355
1356 match text.rfind("</body>").or_else(|| text.rfind("</BODY>")) {
1357 Some(idx) => {
1358 text.insert_str(idx, content);
1359 text.insert_str(idx, "\r\n");
1360 }
1361 None => {
1362 text.push_str("\r\n");
1364 text.push_str(content);
1365 }
1366 }
1367
1368 p.replace_text_body("text/html", &*text)?;
1369
1370 let new_data = msg.to_message_bytes();
1371 self.assign_data(new_data);
1372 Ok(true)
1373 }
1374 DecodedBody::Binary(_) => {
1375 anyhow::bail!("expected text/html part to be text, but it is binary");
1376 }
1377 }
1378 } else {
1379 Ok(false)
1380 }
1381 }
1382
1383 pub async fn check_fix_conformance(
1384 &self,
1385 check: MessageConformance,
1386 fix: MessageConformance,
1387 settings: Option<&CheckFixSettings>,
1388 ) -> anyhow::Result<()> {
1389 let data = self.data().await?;
1390 let data_bytes = data.as_ref().as_ref();
1391 let msg = MimePart::parse(data_bytes)?;
1392
1393 let mut settings = settings.map(Clone::clone).unwrap_or_default();
1394
1395 if fix.contains(MessageConformance::MISSING_MESSAGE_ID_HEADER)
1396 && settings.message_id.is_none()
1397 && matches!(msg.headers().message_id(), Err(_) | Ok(None))
1398 {
1399 let sender = self.sender().await?;
1400 let domain = sender.domain();
1401 let id = *self.id();
1402 settings.message_id.replace(format!("{id}@{domain}"));
1403 }
1404
1405 if settings.detect_encoding {
1406 settings.data_bytes.replace(data.clone());
1407 }
1408
1409 let opt_msg = msg.check_fix_conformance(check, fix, settings)?;
1410
1411 if let Some(msg) = opt_msg {
1412 let new_data = msg.to_message_bytes();
1413 self.assign_data(new_data);
1414 }
1415
1416 Ok(())
1417 }
1418}
1419
/// Converts a header name into its snake-case metadata key: ASCII
/// letters are lowercased and every `-` becomes `_`. All other
/// characters are passed through unchanged.
fn imported_header_name(name: &str) -> String {
    let mut key = String::with_capacity(name.len());
    for ch in name.chars() {
        key.push(if ch == '-' {
            '_'
        } else {
            ch.to_ascii_lowercase()
        });
    }
    key
}
1428
/// Reports whether a raw header name begins with `X-` or `x-`.
fn is_x_header(name: &[u8]) -> bool {
    matches!(name, [b'X' | b'x', b'-', ..])
}
1432
1433#[derive(Default, Debug, Clone, Copy, Deserialize, Serialize, PartialEq, Eq)]
1434#[serde(rename_all = "snake_case")]
1435pub enum MatchMode {
1436 First,
1437 #[default]
1438 Last,
1439 All,
1440}
1441
1442#[derive(Default, Debug, Clone, Copy, Deserialize, Serialize, PartialEq, Eq)]
1443#[serde(rename_all = "snake_case")]
1444pub enum NameTransform {
1445 #[default]
1446 SnakeCase,
1447 KebabCase,
1448 CamelCase,
1449 PascalCase,
1450}
1451
1452#[derive(Debug, Clone, Deserialize)]
1453#[serde(deny_unknown_fields)]
1454pub struct ImportHeaderSpec {
1455 pub name: String,
1456 #[serde(default, rename = "match")]
1457 pub match_mode: MatchMode,
1458 #[serde(default)]
1459 pub transform: NameTransform,
1460 #[serde(default)]
1461 pub target: Option<String>,
1462 #[serde(default)]
1463 pub remove: bool,
1464}
1465
1466impl Default for ImportHeaderSpec {
1467 fn default() -> Self {
1468 Self {
1469 name: String::new(),
1470 match_mode: MatchMode::default(),
1471 transform: NameTransform::default(),
1472 target: None,
1473 remove: false,
1474 }
1475 }
1476}
1477
1478#[derive(Debug, Clone)]
1479enum HeaderPattern {
1480 Exact(String),
1482 Prefix(String),
1485}
1486
1487#[derive(Debug, Clone)]
1488struct CompiledImportHeaderSpec {
1489 pattern: HeaderPattern,
1490 match_mode: MatchMode,
1491 transform: NameTransform,
1492 target: Option<String>,
1493 remove: bool,
1494}
1495
1496impl CompiledImportHeaderSpec {
1497 fn compile(spec: ImportHeaderSpec) -> anyhow::Result<Self> {
1498 let pattern = compile_header_pattern(&spec.name)?;
1499 if spec.target.is_some() && matches!(pattern, HeaderPattern::Prefix(_)) {
1500 anyhow::bail!(
1501 "import_headers: `target` cannot be used with wildcard pattern {:?}",
1502 spec.name
1503 );
1504 }
1505 Ok(Self {
1506 pattern,
1507 match_mode: spec.match_mode,
1508 transform: spec.transform,
1509 target: spec.target,
1510 remove: spec.remove,
1511 })
1512 }
1513
1514 fn matches(&self, hdr_name: &[u8]) -> bool {
1515 match &self.pattern {
1516 HeaderPattern::Exact(s) => hdr_name.eq_ignore_ascii_case(s.as_bytes()),
1517 HeaderPattern::Prefix(p) => {
1518 hdr_name.len() >= p.len() && hdr_name[..p.len()].eq_ignore_ascii_case(p.as_bytes())
1519 }
1520 }
1521 }
1522
1523 fn target_key(&self, matched_name: &str) -> String {
1524 if let Some(target) = &self.target {
1525 return target.clone();
1526 }
1527 apply_name_transform(matched_name, self.transform)
1528 }
1529}
1530
1531fn compile_header_pattern(pat: &str) -> anyhow::Result<HeaderPattern> {
1532 if pat.is_empty() {
1533 anyhow::bail!("import_headers: header name pattern must not be empty");
1534 }
1535 let star_count = pat.chars().filter(|c| *c == '*').count();
1536 if star_count == 0 {
1537 return Ok(HeaderPattern::Exact(pat.to_string()));
1538 }
1539 let Some(prefix) = pat.strip_suffix('*') else {
1540 anyhow::bail!(
1541 "import_headers: only a single trailing `*` is supported in pattern {:?}",
1542 pat
1543 );
1544 };
1545 if star_count > 1 {
1546 anyhow::bail!(
1547 "import_headers: only a single trailing `*` is supported in pattern {:?}",
1548 pat
1549 );
1550 }
1551 if prefix.is_empty() {
1552 anyhow::bail!("import_headers: bare `*` patterns are not supported");
1553 }
1554 Ok(HeaderPattern::Prefix(prefix.to_string()))
1555}
1556
1557fn apply_name_transform(name: &str, transform: NameTransform) -> String {
1558 match transform {
1559 NameTransform::SnakeCase => imported_header_name(name),
1560 NameTransform::KebabCase => name
1561 .split('-')
1562 .map(|p| p.to_ascii_lowercase())
1563 .collect::<Vec<_>>()
1564 .join("-"),
1565 NameTransform::CamelCase => name
1566 .split('-')
1567 .enumerate()
1568 .map(|(i, p)| {
1569 if i == 0 {
1570 p.to_ascii_lowercase()
1571 } else {
1572 titlecase_ascii(p)
1573 }
1574 })
1575 .collect::<Vec<_>>()
1576 .join(""),
1577 NameTransform::PascalCase => name
1578 .split('-')
1579 .map(titlecase_ascii)
1580 .collect::<Vec<_>>()
1581 .join(""),
1582 }
1583}
1584
/// Returns `s` with its first character ASCII-uppercased and every
/// following character ASCII-lowercased. Non-ASCII characters are left
/// untouched; an empty input yields an empty string.
fn titlecase_ascii(s: &str) -> String {
    let mut rest = s.chars();
    let Some(initial) = rest.next() else {
        return String::new();
    };

    let mut titled = String::with_capacity(s.len());
    titled.push(initial.to_ascii_uppercase());
    titled.extend(rest.map(|c| c.to_ascii_lowercase()));
    titled
}
1599
1600enum AccValue {
1601 Str(String),
1602 Arr(Vec<String>),
1603}
1604
1605struct PerSpecAccumulator {
1606 mode: MatchMode,
1607 by_key: BTreeMap<String, AccValue>,
1608}
1609
1610impl PerSpecAccumulator {
1611 fn new(mode: MatchMode) -> Self {
1612 Self {
1613 mode,
1614 by_key: BTreeMap::new(),
1615 }
1616 }
1617
1618 fn record(&mut self, key: String, value: String) {
1619 match self.mode {
1620 MatchMode::First => {
1621 self.by_key.entry(key).or_insert(AccValue::Str(value));
1622 }
1623 MatchMode::Last => {
1624 self.by_key.insert(key, AccValue::Str(value));
1625 }
1626 MatchMode::All => {
1627 let entry = self
1628 .by_key
1629 .entry(key)
1630 .or_insert_with(|| AccValue::Arr(Vec::new()));
1631 if let AccValue::Arr(v) = entry {
1632 v.push(value);
1633 }
1634 }
1635 }
1636 }
1637
1638 async fn write_to(self, msg: &Message) -> anyhow::Result<()> {
1639 for (key, value) in self.by_key {
1640 match value {
1641 AccValue::Str(s) => msg.set_meta(key, s).await?,
1642 AccValue::Arr(arr) => {
1643 let json = serde_json::Value::Array(
1644 arr.into_iter().map(serde_json::Value::String).collect(),
1645 );
1646 msg.set_meta(key, json).await?;
1647 }
1648 }
1649 }
1650 Ok(())
1651 }
1652}
1653
/// Returns the number of bytes `name` and `value` contribute to an
/// emitted header: the value length, plus the name length and two bytes
/// for the `": "` separator when a name is given.
fn size_header(name: Option<&str>, value: &[u8]) -> usize {
    let name_part = name.map_or(0, |field| field.len() + 2);
    name_part + value.len()
}
1657
/// Appends one header line to `dest`.
///
/// When `name` is given, `name: ` is written before `value`; otherwise
/// `value` is written as-is. A terminating CRLF is added unless `value`
/// already ends with one.
fn emit_header(dest: &mut Vec<u8>, name: Option<&str>, value: &[u8]) {
    if let Some(field) = name {
        dest.extend_from_slice(field.as_bytes());
        dest.extend_from_slice(b": ");
    }
    dest.extend_from_slice(value);

    let already_terminated = value.ends_with(b"\r\n");
    if !already_terminated {
        dest.extend_from_slice(b"\r\n");
    }
}
1668
/// Lua bindings for [`Message`].
///
/// The whole impl is gated on the `impl` feature, so the individual
/// registrations below do not need their own feature gates.
#[cfg(feature = "impl")]
impl UserData for Message {
    /// Registers every Lua-callable method of a message on `methods`.
    fn add_methods<M: UserDataMethods<Self>>(methods: &mut M) {
        // ---- metadata and raw data -------------------------------------

        // msg:set_meta(name, value): the Lua value is converted to JSON.
        methods.add_async_method(
            "set_meta",
            move |_, this, (name, value): (String, mlua::Value)| async move {
                let value = serde_json::value::to_value(value).map_err(any_err)?;
                this.set_meta(name, value).await.map_err(any_err)?;
                Ok(())
            },
        );
        // msg:get_meta(name): the stored JSON value converted back to Lua.
        methods.add_async_method("get_meta", move |lua, this, name: String| async move {
            let value = this.get_meta(name).await.map_err(any_err)?;
            Ok(Some(lua.to_value_with(&value, serialize_options())?))
        });
        // msg:get_data(): the full message data as a Lua string.
        methods.add_async_method("get_data", |lua, this, _: ()| async move {
            let data = this.data().await.map_err(any_err)?;
            lua.create_string(&*data)
        });
        // msg:set_data(data): replaces the message data.
        methods.add_method("set_data", move |_lua, this, data: mlua::String| {
            this.assign_data(data.as_bytes().to_vec());
            Ok(())
        });

        // msg:parse_mime(): parses an owned copy of the data into a part.
        methods.add_async_method("parse_mime", |_lua, this, _: ()| async move {
            let data = this.data().await.map_err(any_err)?;
            let owned_data = BString::new(data.as_ref().to_vec());
            let part = MimePart::parse(owned_data).map_err(any_err)?;
            Ok(mod_mimepart::PartRef::new(part))
        });

        methods.add_async_method("append_text_plain", |_lua, this, data: String| async move {
            this.append_text_plain(&data).await.map_err(any_err)
        });

        methods.add_async_method("append_text_html", |_lua, this, data: String| async move {
            this.append_text_html(&data).await.map_err(any_err)
        });

        // ---- identity, envelope and scheduling state --------------------

        methods.add_method("id", move |_, this, _: ()| Ok(this.id().to_string()));
        methods.add_async_method("sender", move |_, this, _: ()| async move {
            this.sender().await.map_err(any_err)
        });

        methods.add_method("num_attempts", move |_, this, _: ()| {
            Ok(this.get_num_attempts())
        });
        methods.add_method("increment_num_attempts", move |_, this, _: ()| {
            Ok(this.increment_num_attempts())
        });

        methods.add_async_method("queue_name", move |_, this, _: ()| async move {
            this.get_queue_name().await.map_err(any_err)
        });

        // msg:set_due(due): returns the due time reported by `set_due`.
        methods.add_async_method("set_due", move |lua, this, due: mlua::Value| async move {
            let due: Option<DateTime<Utc>> = lua.from_value(due)?;
            let revised_due = this.set_due(due).await.map_err(any_err)?;
            lua.to_value(&revised_due)
        });

        // msg:set_sender(value): a string is parsed as an envelope
        // address; any other value is deserialised as one.
        methods.add_async_method(
            "set_sender",
            move |lua, this, value: mlua::Value| async move {
                let sender = match value {
                    mlua::Value::String(s) => {
                        let s = s.to_str()?;
                        EnvelopeAddress::parse(&s).map_err(any_err)?
                    }
                    _ => lua.from_value::<EnvelopeAddress>(value.clone())?,
                };
                this.set_sender(sender).await.map_err(any_err)
            },
        );

        // msg:recipient(): nil for no recipients, a single address for
        // exactly one, otherwise the whole list.
        methods.add_async_method("recipient", move |lua, this, _: ()| async move {
            let mut recipients = this.recipient_list().await.map_err(any_err)?;
            match recipients.len() {
                0 => Ok(mlua::Value::Nil),
                1 => {
                    let recip: EnvelopeAddress = recipients.pop().expect("have 1");
                    recip.into_lua(&lua)
                }
                _ => recipients.into_lua(&lua),
            }
        });

        methods.add_async_method("recipient_list", move |lua, this, _: ()| async move {
            let recipients = this.recipient_list().await.map_err(any_err)?;
            recipients.into_lua(&lua)
        });

        // msg:set_recipient(value): accepts a string, a list of
        // addresses, or a single address value.
        methods.add_async_method(
            "set_recipient",
            move |lua, this, value: mlua::Value| async move {
                let recipients = match value {
                    mlua::Value::String(s) => {
                        let s = s.to_str()?;
                        vec![EnvelopeAddress::parse(&s).map_err(any_err)?]
                    }
                    _ => {
                        // Try the list form first, then fall back to a
                        // single address.
                        if let Ok(recips) = lua.from_value::<Vec<EnvelopeAddress>>(value.clone()) {
                            recips
                        } else {
                            vec![lua.from_value::<EnvelopeAddress>(value.clone())?]
                        }
                    }
                };
                this.set_recipient_list(recipients).await.map_err(any_err)
            },
        );

        // ---- signing, spooling and authentication ------------------------

        methods.add_async_method("dkim_sign", |_, this, signer: Signer| async move {
            this.dkim_sign(signer).await.map_err(any_err)
        });

        // msg:shrink(): saves first when there are unsaved changes.
        methods.add_async_method("shrink", |_, this, _: ()| async move {
            if this.needs_save() {
                this.save(None).await.map_err(any_err)?;
            }
            this.shrink().map_err(any_err)
        });

        // msg:shrink_data(): saves first when there are unsaved changes.
        methods.add_async_method("shrink_data", |_, this, _: ()| async move {
            if this.needs_save() {
                this.save(None).await.map_err(any_err)?;
            }
            this.shrink_data().map_err(any_err)
        });

        // msg:add_authentication_results(serv_id, results): prepends an
        // Authentication-Results header built from the arguments.
        methods.add_async_method(
            "add_authentication_results",
            |lua, this, (serv_id, results): (mlua::String, mlua::Value)| async move {
                let results: Vec<AuthenticationResult> = lua.from_value(results)?;
                let results = AuthenticationResults {
                    serv_id: serv_id.as_bytes().as_ref().into(),
                    version: None,
                    results,
                };

                this.prepend_header(
                    Some("Authentication-Results"),
                    results.encode_value().as_bytes(),
                )
                .await
                .map_err(any_err)?;

                Ok(())
            },
        );

        methods.add_async_method(
            "arc_verify",
            |lua, this, opt_resolver_name: Option<String>| async move {
                let results = this.arc_verify(opt_resolver_name).await.map_err(any_err)?;
                lua.to_value_with(&results, serialize_options())
            },
        );

        methods.add_async_method(
            "arc_seal",
            |lua,
             this,
             (signer, serv_id, auth_res, opt_resolver_name): (
                Signer,
                mlua::String,
                mlua::Value,
                Option<String>,
            )| async move {
                let results: Vec<AuthenticationResult> = lua.from_value(auth_res)?;
                this.arc_seal(
                    signer,
                    AuthenticationResults {
                        serv_id: serv_id.as_bytes().as_ref().into(),
                        version: None,
                        results,
                    },
                    opt_resolver_name,
                )
                .await
                .map_err(any_err)
            },
        );

        methods.add_async_method(
            "dkim_verify",
            |lua, this, opt_resolver_name: Option<String>| async move {
                let results = this.dkim_verify(opt_resolver_name).await.map_err(any_err)?;
                lua.to_value_with(&results, serialize_options())
            },
        );

        // ---- header manipulation -----------------------------------------

        // msg:prepend_header(name, value [, encode]): with `encode` set,
        // the value is first passed through `Header::new_unstructured`.
        methods.add_async_method(
            "prepend_header",
            |_, this, (name, value, encode): (String, String, Option<bool>)| async move {
                let encode = encode.unwrap_or(false);
                if encode {
                    let header = Header::new_unstructured(name.clone(), value);
                    this.prepend_header(Some(&name), header.get_raw_value())
                        .await
                        .map_err(any_err)?;
                } else {
                    this.prepend_header(Some(&name), value.as_bytes())
                        .await
                        .map_err(any_err)?;
                }
                Ok(())
            },
        );
        // msg:append_header(name, value [, encode]): same conventions as
        // prepend_header.
        methods.add_async_method(
            "append_header",
            |_, this, (name, value, encode): (String, String, Option<bool>)| async move {
                let encode = encode.unwrap_or(false);
                if encode {
                    let header = Header::new_unstructured(name.clone(), value);
                    this.append_header(Some(&name), header.get_raw_value())
                        .await
                        .map_err(any_err)?;
                } else {
                    this.append_header(Some(&name), value.as_bytes())
                        .await
                        .map_err(any_err)?;
                }
                Ok(())
            },
        );
        methods.add_async_method("get_address_header", |_, this, name: String| async move {
            this.get_address_header(&name).await.map_err(any_err)
        });
        methods.add_async_method("from_header", |_, this, ()| async move {
            this.get_address_header("From").await.map_err(any_err)
        });
        methods.add_async_method("to_header", |_, this, ()| async move {
            this.get_address_header("To").await.map_err(any_err)
        });

        methods.add_async_method(
            "get_first_named_header_value",
            |_, this, name: String| async move {
                this.get_first_named_header_value(&name)
                    .await
                    .map_err(any_err)
            },
        );
        methods.add_async_method(
            "get_all_named_header_values",
            |_, this, name: String| async move {
                this.get_all_named_header_values(&name)
                    .await
                    .map_err(any_err)
            },
        );
        // msg:get_all_headers(): a list of two-element {name, value} lists.
        methods.add_async_method("get_all_headers", |_, this, _: ()| async move {
            Ok(this
                .get_all_headers()
                .await
                .map_err(any_err)?
                .into_iter()
                .map(|(name, value)| vec![name, value])
                .collect::<Vec<Vec<BString>>>())
        });
        // msg:import_x_headers([names]): an omitted list means "no names".
        methods.add_async_method(
            "import_x_headers",
            |_, this, names: Option<Vec<String>>| async move {
                this.import_x_headers(names.unwrap_or_default())
                    .await
                    .map_err(any_err)
            },
        );
        methods.add_async_method(
            "import_headers",
            |_, this, specs: SerdeWrappedValue<Vec<ImportHeaderSpec>>| async move {
                this.import_headers(specs.0).await.map_err(any_err)
            },
        );

        // msg:remove_x_headers([names]): an omitted list means "no names".
        methods.add_async_method(
            "remove_x_headers",
            |_, this, names: Option<Vec<String>>| async move {
                this.remove_x_headers(names.unwrap_or_default())
                    .await
                    .map_err(any_err)
            },
        );
        methods.add_async_method(
            "remove_all_named_headers",
            |_, this, name: String| async move {
                this.remove_all_named_headers(&name).await.map_err(any_err)
            },
        );

        // ---- scheduling ----------------------------------------------------

        methods.add_async_method(
            "import_scheduling_header",
            |lua, this, (header_name, remove): (String, bool)| async move {
                let opt_schedule = this
                    .import_scheduling_header(&header_name, remove)
                    .await
                    .map_err(any_err)?;
                lua.to_value(&opt_schedule)
            },
        );

        methods.add_async_method(
            "set_scheduling",
            move |lua, this, params: mlua::Value| async move {
                let sched: Option<Scheduling> = from_lua_value(&lua, params)?;
                let opt_schedule = this.set_scheduling(sched).await.map_err(any_err)?;
                lua.to_value(&opt_schedule)
            },
        );

        // ---- report parsing ------------------------------------------------

        // msg:parse_rfc3464(): the parsed report, or nil when there is none.
        methods.add_async_method("parse_rfc3464", |lua, this, _: ()| async move {
            let report = this.parse_rfc3464().await.map_err(any_err)?;
            match report {
                Some(report) => lua.to_value_with(&report, serialize_options()),
                None => Ok(mlua::Value::Nil),
            }
        });

        // msg:parse_rfc5965(): the parsed report, or nil when there is none.
        methods.add_async_method("parse_rfc5965", |lua, this, _: ()| async move {
            let report = this.parse_rfc5965().await.map_err(any_err)?;
            match report {
                Some(report) => lua.to_value_with(&report, serialize_options()),
                None => Ok(mlua::Value::Nil),
            }
        });

        // ---- persistence and conformance -----------------------------------

        methods.add_async_method("save", |_, this, ()| async move {
            this.save(None).await.map_err(any_err)
        });

        methods.add_method("set_force_sync", move |_, this, force: bool| {
            this.set_force_sync(force);
            Ok(())
        });

        // msg:check_fix_conformance(check, fix [, settings]): returns nil
        // on success. A conformance failure is returned as a formatted
        // error string rather than raised; only argument conversion
        // errors are raised.
        methods.add_async_method(
            "check_fix_conformance",
            |lua, this, (check, fix, settings): (String, String, Option<mlua::Value>)| async move {
                use std::str::FromStr;
                let check = MessageConformance::from_str(&check).map_err(any_err)?;
                let fix = MessageConformance::from_str(&fix).map_err(any_err)?;

                let settings = match settings {
                    Some(v) => Some(lua.from_value(v).map_err(any_err)?),
                    None => None,
                };

                match this
                    .check_fix_conformance(check, fix, settings.as_ref())
                    .await
                {
                    Ok(_) => Ok(None),
                    Err(err) => Ok(Some(format!("{err:#}"))),
                }
            },
        );
    }
}
2032
2033impl TimerEntryWithDelay for WeakMessage {
2034 fn delay(&self) -> Duration {
2035 match self.upgrade() {
2036 None => {
2037 Duration::from_millis(0)
2039 }
2040 Some(msg) => msg.delay(),
2041 }
2042 }
2043}
2044
2045impl TimerEntryWithDelay for Message {
2046 fn delay(&self) -> Duration {
2047 let inner = self.msg_and_id.inner.lock();
2048 match inner.due {
2049 Some(time) => {
2050 let now = Utc::now();
2051 let delta = time - now;
2052 delta.to_std().unwrap_or(Duration::from_millis(0))
2053 }
2054 None => Duration::from_millis(0),
2055 }
2056 }
2057}
2058
2059#[cfg(test)]
2060pub(crate) mod test {
2061 use super::*;
2062 use serde_json::json;
2063
2064 pub fn new_msg_body<S: AsRef<[u8]>>(s: S) -> Message {
2065 Message::new_dirty(
2066 SpoolId::new(),
2067 EnvelopeAddress::parse("sender@example.com").unwrap(),
2068 vec![EnvelopeAddress::parse("recip@example.com").unwrap()],
2069 serde_json::json!({}),
2070 Arc::new(s.as_ref().to_vec().into_boxed_slice()),
2071 )
2072 .unwrap()
2073 }
2074
2075 fn data_as_string(msg: &Message) -> String {
2076 String::from_utf8(msg.get_data_maybe_not_loaded().to_vec()).unwrap()
2077 }
2078
2079 const X_HDR_CONTENT: &str =
2080 "X-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nFrom :Someone\r\n\r\nBody";
2081
2082 #[tokio::test]
2083 async fn import_all_x_headers() {
2084 let msg = new_msg_body(X_HDR_CONTENT);
2085
2086 msg.import_x_headers(vec![]).await.unwrap();
2087 k9::assert_equal!(
2088 msg.get_meta_obj().await.unwrap(),
2089 json!({
2090 "x_hello": "there",
2091 "x_header": "value",
2092 })
2093 );
2094 }
2095
2096 #[tokio::test]
2097 async fn meta_and_nil() {
2098 let msg = new_msg_body(X_HDR_CONTENT);
2099 msg.set_meta("test", serde_json::Value::Null).await.unwrap();
2101 k9::assert_equal!(msg.get_meta("test").await.unwrap(), serde_json::Value::Null);
2102
2103 let lua = mlua::Lua::new();
2105 lua.globals().set("msg", msg).unwrap();
2106 lua.load("assert(msg:get_meta('test') == nil)")
2107 .exec()
2108 .unwrap();
2109 }
2110
2111 #[tokio::test]
2112 async fn import_some_x_headers() {
2113 let msg = new_msg_body(X_HDR_CONTENT);
2114
2115 msg.import_x_headers(vec!["x-hello".to_string()])
2116 .await
2117 .unwrap();
2118 k9::assert_equal!(
2119 msg.get_meta_obj().await.unwrap(),
2120 json!({
2121 "x_hello": "there",
2122 })
2123 );
2124 }
2125
2126 #[tokio::test]
2127 async fn import_headers_wildcard_remove() {
2128 let msg = new_msg_body(X_HDR_CONTENT);
2129
2130 msg.import_headers(vec![ImportHeaderSpec {
2131 name: "X-*".to_string(),
2132 remove: true,
2133 ..ImportHeaderSpec::default()
2134 }])
2135 .await
2136 .unwrap();
2137 k9::assert_equal!(
2138 msg.get_meta_obj().await.unwrap(),
2139 json!({
2140 "x_hello": "there",
2141 "x_header": "value",
2142 })
2143 );
2144 k9::assert_equal!(
2145 data_as_string(&msg),
2146 "Subject: Hello\r\nFrom :Someone\r\n\r\nBody"
2147 );
2148 }
2149
2150 #[tokio::test]
2151 async fn import_headers_match_modes() {
2152 let body =
2153 "Received: from a\r\nReceived: from b\r\nReceived: from c\r\nSubject: hi\r\n\r\nBody";
2154 for (mode, expected) in [
2155 (MatchMode::First, json!("from a")),
2156 (MatchMode::Last, json!("from c")),
2157 (MatchMode::All, json!(["from a", "from b", "from c"])),
2158 ] {
2159 let msg = new_msg_body(body);
2160 msg.import_headers(vec![ImportHeaderSpec {
2161 name: "Received".to_string(),
2162 match_mode: mode,
2163 ..ImportHeaderSpec::default()
2164 }])
2165 .await
2166 .unwrap();
2167 k9::assert_equal!(
2168 msg.get_meta_obj().await.unwrap(),
2169 json!({ "received": expected })
2170 );
2171 }
2172 }
2173
2174 #[tokio::test]
2175 async fn import_headers_no_match_skips_meta() {
2176 let msg = new_msg_body(X_HDR_CONTENT);
2177 msg.import_headers(vec![ImportHeaderSpec {
2178 name: "Nonexistent".to_string(),
2179 match_mode: MatchMode::All,
2180 ..ImportHeaderSpec::default()
2181 }])
2182 .await
2183 .unwrap();
2184 k9::assert_equal!(msg.get_meta_obj().await.unwrap(), json!({}));
2185 }
2186
2187 #[tokio::test]
2188 async fn import_headers_specific_before_wildcard() {
2189 let body = "X-Campaign-Id: 42\r\nX-Mailer: foo\r\n\r\nBody";
2190 let msg = new_msg_body(body);
2191 msg.import_headers(vec![
2192 ImportHeaderSpec {
2193 name: "X-Campaign-Id".to_string(),
2194 target: Some("campaign".to_string()),
2195 ..ImportHeaderSpec::default()
2196 },
2197 ImportHeaderSpec {
2198 name: "X-*".to_string(),
2199 ..ImportHeaderSpec::default()
2200 },
2201 ])
2202 .await
2203 .unwrap();
2204 k9::assert_equal!(
2205 msg.get_meta_obj().await.unwrap(),
2206 json!({
2207 "campaign": "42",
2208 "x_mailer": "foo",
2209 })
2210 );
2211 }
2212
2213 #[test]
2214 fn name_transforms() {
2215 let n = "X-Campaign-Id";
2216 k9::assert_equal!(
2217 apply_name_transform(n, NameTransform::SnakeCase),
2218 "x_campaign_id"
2219 );
2220 k9::assert_equal!(
2221 apply_name_transform(n, NameTransform::KebabCase),
2222 "x-campaign-id"
2223 );
2224 k9::assert_equal!(
2225 apply_name_transform(n, NameTransform::CamelCase),
2226 "xCampaignId"
2227 );
2228 k9::assert_equal!(
2229 apply_name_transform(n, NameTransform::PascalCase),
2230 "XCampaignId"
2231 );
2232 }
2233
2234 #[test]
2235 fn pattern_compile_rejects_bad_patterns() {
2236 compile_header_pattern("").unwrap_err();
2237 compile_header_pattern("*").unwrap_err();
2238 compile_header_pattern("*-Id").unwrap_err();
2239 compile_header_pattern("X-*-Id").unwrap_err();
2240 compile_header_pattern("X-**").unwrap_err();
2241 assert!(matches!(
2242 compile_header_pattern("Subject").unwrap(),
2243 HeaderPattern::Exact(_)
2244 ));
2245 assert!(matches!(
2246 compile_header_pattern("X-*").unwrap(),
2247 HeaderPattern::Prefix(_)
2248 ));
2249 }
2250
2251 #[tokio::test]
2252 async fn import_headers_target_with_wildcard_rejected() {
2253 let msg = new_msg_body(X_HDR_CONTENT);
2254 msg.import_headers(vec![ImportHeaderSpec {
2255 name: "X-*".to_string(),
2256 target: Some("oops".to_string()),
2257 ..ImportHeaderSpec::default()
2258 }])
2259 .await
2260 .unwrap_err();
2261 }
2262
2263 #[tokio::test]
2264 async fn remove_all_x_headers() {
2265 let msg = new_msg_body(X_HDR_CONTENT);
2266
2267 msg.remove_x_headers(vec![]).await.unwrap();
2268 k9::assert_equal!(
2269 data_as_string(&msg),
2270 "Subject: Hello\r\nFrom :Someone\r\n\r\nBody"
2271 );
2272 }
2273
2274 #[tokio::test]
2275 async fn prepend_header_2_params() {
2276 let msg = new_msg_body(X_HDR_CONTENT);
2277
2278 msg.prepend_header(Some("Date"), b"Today").await.unwrap();
2279 k9::assert_equal!(
2280 data_as_string(&msg),
2281 "Date: Today\r\nX-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nFrom :Someone\r\n\r\nBody"
2282 );
2283 }
2284
2285 #[tokio::test]
2286 async fn prepend_header_1_params() {
2287 let msg = new_msg_body(X_HDR_CONTENT);
2288
2289 msg.prepend_header(None, b"Date: Today").await.unwrap();
2290 k9::assert_equal!(
2291 data_as_string(&msg),
2292 "Date: Today\r\nX-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nFrom :Someone\r\n\r\nBody"
2293 );
2294 }
2295
2296 #[tokio::test]
2297 async fn append_header_2_params() {
2298 let msg = new_msg_body(X_HDR_CONTENT);
2299
2300 msg.append_header(Some("Date"), b"Today").await.unwrap();
2301 k9::assert_equal!(
2302 data_as_string(&msg),
2303 "X-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nFrom :Someone\r\nDate: Today\r\n\r\nBody"
2304 );
2305 }
2306
2307 #[tokio::test]
2308 async fn append_header_1_params() {
2309 let msg = new_msg_body(X_HDR_CONTENT);
2310
2311 msg.append_header(None, b"Date: Today").await.unwrap();
2312 k9::assert_equal!(
2313 data_as_string(&msg),
2314 "X-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nFrom :Someone\r\nDate: Today\r\n\r\nBody"
2315 );
2316 }
2317
2318 const MULTI_HEADER_CONTENT: &str =
2319 "X-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nX-Header: another value\r\nFrom :Someone@somewhere\r\n\r\nBody";
2320
2321 #[tokio::test]
2322 async fn get_first_header() {
2323 let msg = new_msg_body(MULTI_HEADER_CONTENT);
2324 k9::assert_equal!(
2325 msg.get_first_named_header_value("X-header")
2326 .await
2327 .unwrap()
2328 .unwrap(),
2329 "value"
2330 );
2331 }
2332
2333 #[tokio::test]
2334 async fn get_all_header() {
2335 let msg = new_msg_body(MULTI_HEADER_CONTENT);
2336 k9::assert_equal!(
2337 msg.get_all_named_header_values("X-header").await.unwrap(),
2338 vec!["value".to_string(), "another value".to_string()]
2339 );
2340 }
2341
2342 #[tokio::test]
2343 async fn remove_first() {
2344 let msg = new_msg_body(MULTI_HEADER_CONTENT);
2345 msg.remove_first_named_header("X-header").await.unwrap();
2346 k9::assert_equal!(
2347 data_as_string(&msg),
2348 "X-Hello: there\r\nSubject: Hello\r\nX-Header: another value\r\nFrom :Someone@somewhere\r\n\r\nBody"
2349 );
2350 }
2351
2352 #[tokio::test]
2353 async fn remove_all() {
2354 let msg = new_msg_body(MULTI_HEADER_CONTENT);
2355 msg.remove_all_named_headers("X-header").await.unwrap();
2356 k9::assert_equal!(
2357 data_as_string(&msg),
2358 "X-Hello: there\r\nSubject: Hello\r\nFrom :Someone@somewhere\r\n\r\nBody"
2359 );
2360 }
2361
2362 #[tokio::test]
2363 async fn append_text_plain() {
2364 let msg = new_msg_body(MULTI_HEADER_CONTENT);
2365 msg.append_text_plain("I am at the bottom").await.unwrap();
2366 k9::assert_equal!(
2367 data_as_string(&msg),
2368 "X-Hello: there\r\n\
2369 X-Header: value\r\n\
2370 Subject: Hello\r\n\
2371 X-Header: another value\r\n\
2372 From :Someone@somewhere\r\n\
2373 Content-Type: text/plain;\r\n\
2374 \tcharset=\"us-ascii\"\r\n\
2375 \r\n\
2376 Body\r\n\
2377 I am at the bottom\r\n"
2378 );
2379 }
2380
2381 const MIXED_CONTENT: &str = "Content-Type: multipart/mixed;\r\n\
2382\tboundary=\"my-boundary\"\r\n\
2383\r\n\
2384--my-boundary\r\n\
2385Content-Type: text/plain;\r\n\
2386\tcharset=\"us-ascii\"\r\n\
2387\r\n\
2388plain text\r\n\
2389--my-boundary\r\n\
2390Content-Type: text/html;\r\n\
2391\tcharset=\"us-ascii\"\r\n\
2392\r\n\
2393<b>rich</b> text\r\n\
2394--my-boundary\r\n\
2395Content-Type: application/octet-stream\r\n\
2396Content-Transfer-Encoding: base64\r\n\
2397Content-Disposition: attachment;\r\n\
2398\tfilename=\"woot.bin\"\r\n\
2399Content-ID: <woot.id@somewhere>\r\n\
2400\r\n\
2401AAECAw==\r\n\
2402--my-boundary--\r\n\
2403\r\n";
2404
2405 const MIXED_CONTENT_ENCLOSING_BODY: &str = "Content-Type: multipart/mixed;\r\n\
2406\tboundary=\"my-boundary\"\r\n\
2407\r\n\
2408--my-boundary\r\n\
2409Content-Type: text/plain;\r\n\
2410\tcharset=\"us-ascii\"\r\n\
2411\r\n\
2412plain text\r\n\
2413--my-boundary\r\n\
2414Content-Type: text/html;\r\n\
2415\tcharset=\"us-ascii\"\r\n\
2416\r\n\
2417<BODY>\r\n\
2418<b>rich</b> text\r\n\
2419</BODY>\r\n\
2420--my-boundary\r\n\
2421Content-Type: application/octet-stream\r\n\
2422Content-Transfer-Encoding: base64\r\n\
2423Content-Disposition: attachment;\r\n\
2424\tfilename=\"woot.bin\"\r\n\
2425Content-ID: <woot.id>\r\n\
2426\r\n\
2427AAECAw==\r\n\
2428--my-boundary--\r\n\
2429\r\n";
2430
2431 #[tokio::test]
2432 async fn append_text_html() {
2433 let msg = new_msg_body(MIXED_CONTENT);
2434 msg.append_text_html("bottom html").await.unwrap();
2435 k9::snapshot!(
2436 data_as_string(&msg),
2437 r#"
2438Content-Type: multipart/mixed;\r
2439\tboundary="my-boundary"\r
2440\r
2441--my-boundary\r
2442Content-Type: text/plain;\r
2443\tcharset="us-ascii"\r
2444\r
2445plain text\r
2446--my-boundary\r
2447Content-Type: text/html;\r
2448\tcharset="us-ascii"\r
2449\r
2450<b>rich</b> text\r
2451\r
2452bottom html\r
2453--my-boundary\r
2454Content-Type: application/octet-stream\r
2455Content-Transfer-Encoding: base64\r
2456Content-Disposition: attachment;\r
2457\tfilename="woot.bin"\r
2458Content-ID: <woot.id@somewhere>\r
2459\r
2460AAECAw==\r
2461--my-boundary--\r
2462\r
2463
2464"#
2465 );
2466
2467 let msg = new_msg_body(MIXED_CONTENT_ENCLOSING_BODY);
2468 msg.append_text_html("bottom html 👻").await.unwrap();
2469 k9::snapshot!(
2470 data_as_string(&msg),
2471 r#"
2472Content-Type: multipart/mixed;\r
2473\tboundary="my-boundary"\r
2474\r
2475--my-boundary\r
2476Content-Type: text/plain;\r
2477\tcharset="us-ascii"\r
2478\r
2479plain text\r
2480--my-boundary\r
2481Content-Type: text/html;\r
2482\tcharset="utf-8"\r
2483Content-Transfer-Encoding: quoted-printable\r
2484\r
2485<BODY>\r
2486<b>rich</b> text\r
2487\r
2488bottom html =F0=9F=91=BB</BODY>\r
2489--my-boundary\r
2490Content-Type: application/octet-stream\r
2491Content-Transfer-Encoding: base64\r
2492Content-Disposition: attachment;\r
2493\tfilename="woot.bin"\r
2494Content-ID: <woot.id>\r
2495\r
2496AAECAw==\r
2497--my-boundary--\r
2498\r
2499
2500"#
2501 );
2502 }
2503
2504 #[tokio::test]
2505 async fn append_text_plain_mixed() {
2506 let msg = new_msg_body(MIXED_CONTENT);
2507 msg.append_text_plain("bottom text 👾").await.unwrap();
2508 k9::snapshot!(
2509 data_as_string(&msg),
2510 r#"
2511Content-Type: multipart/mixed;\r
2512\tboundary="my-boundary"\r
2513\r
2514--my-boundary\r
2515Content-Type: text/plain;\r
2516\tcharset="utf-8"\r
2517Content-Transfer-Encoding: quoted-printable\r
2518\r
2519plain text\r
2520\r
2521bottom text =F0=9F=91=BE\r
2522--my-boundary\r
2523Content-Type: text/html;\r
2524\tcharset="us-ascii"\r
2525\r
2526<b>rich</b> text\r
2527--my-boundary\r
2528Content-Type: application/octet-stream\r
2529Content-Transfer-Encoding: base64\r
2530Content-Disposition: attachment;\r
2531\tfilename="woot.bin"\r
2532Content-ID: <woot.id@somewhere>\r
2533\r
2534AAECAw==\r
2535--my-boundary--\r
2536\r
2537
2538"#
2539 );
2540 }
2541
    /// Exercises `check_fix_conformance` on messages whose `Message-ID`
    /// value is wrapped in doubled angle brackets (`<<...>>`).
    ///
    /// NOTE(review): presumably the doubled brackets make the id
    /// unparseable, which is why it is reported as
    /// `MISSING_MESSAGE_ID_HEADER` — confirm against the mailparsing crate.
    #[tokio::test]
    async fn check_conformance_angle_msg_id() {
        // The literal mixes `\r` escapes with raw newlines, so every line
        // of the fixture ends in CRLF except the final `Hello`.
        const DOUBLE_ANGLE_ONLY: &str = "Subject: hello\r
Message-ID: <<1234@example.com>>\r
\r
Hello";
        let msg = new_msg_body(DOUBLE_ANGLE_ONLY);
        // Check for the issue with no fixes enabled: the call must fail
        // and the error text must name the offending flag.
        k9::snapshot!(
            msg.check_fix_conformance(
                MessageConformance::MISSING_MESSAGE_ID_HEADER,
                MessageConformance::empty(),
                None,
            )
            .await
            .unwrap_err()
            .to_string(),
            "Message has conformance issues: MISSING_MESSAGE_ID_HEADER"
        );

        // Same check on the same message, but with the matching fix
        // enabled: the call must now succeed.
        msg.check_fix_conformance(
            MessageConformance::MISSING_MESSAGE_ID_HEADER,
            MessageConformance::MISSING_MESSAGE_ID_HEADER,
            None,
        )
        .await
        .unwrap();

        // Second fixture: the same doubled-bracket Message-ID plus a body
        // that the trailing-backslash continuations join into one very
        // long line, terminated by a bare newline.
        const DOUBLE_ANGLE_AND_LONG_LINE: &str = "Subject: hello\r
Message-ID: <<1234@example.com>>\r
\r
Hello this is a really long line Hello this is a really long line \
Hello this is a really long line Hello this is a really long line \
Hello this is a really long line Hello this is a really long line \
Hello this is a really long line Hello this is a really long line \
Hello this is a really long line Hello this is a really long line \
Hello this is a really long line Hello this is a really long line \
Hello this is a really long line Hello this is a really long line
";
        let msg = new_msg_body(DOUBLE_ANGLE_AND_LONG_LINE);
        // Check a different flag while enabling fixes for both the
        // Message-ID and the long line; only success is asserted here.
        msg.check_fix_conformance(
            MessageConformance::MISSING_COLON_VALUE,
            MessageConformance::MISSING_MESSAGE_ID_HEADER | MessageConformance::LINE_TOO_LONG,
            None,
        )
        .await
        .unwrap();

    }
2627
2628 #[tokio::test]
2629 async fn check_conformance() {
2630 let msg = new_msg_body(MULTI_HEADER_CONTENT);
2631 msg.check_fix_conformance(
2632 MessageConformance::default(),
2633 MessageConformance::MISSING_MIME_VERSION,
2634 None,
2635 )
2636 .await
2637 .unwrap();
2638 k9::snapshot!(
2639 data_as_string(&msg),
2640 r#"
2641X-Hello: there\r
2642X-Header: value\r
2643Subject: Hello\r
2644X-Header: another value\r
2645From :Someone@somewhere\r
2646Mime-Version: 1.0\r
2647\r
2648Body
2649"#
2650 );
2651
2652 let msg = new_msg_body(MULTI_HEADER_CONTENT);
2653 msg.check_fix_conformance(
2654 MessageConformance::default(),
2655 MessageConformance::MISSING_MIME_VERSION | MessageConformance::NAME_ENDS_WITH_SPACE,
2656 None,
2657 )
2658 .await
2659 .unwrap();
2660 k9::snapshot!(
2661 data_as_string(&msg),
2662 r#"
2663Content-Type: text/plain;\r
2664\tcharset="us-ascii"\r
2665X-Hello: there\r
2666X-Header: value\r
2667Subject: Hello\r
2668X-Header: another value\r
2669From: <Someone@somewhere>\r
2670Mime-Version: 1.0\r
2671\r
2672Body\r
2673
2674"#
2675 );
2676 }
2677
2678 #[tokio::test]
2679 async fn check_fix_latin_input() {
2680 const POUNDS: &[u8] = b"Subject: \xa3\r\n\r\nGBP\r\n";
2681 let msg = new_msg_body(&*POUNDS);
2682 msg.check_fix_conformance(
2683 MessageConformance::default(),
2684 MessageConformance::NEEDS_TRANSFER_ENCODING,
2685 Some(&CheckFixSettings {
2686 detect_encoding: true,
2687 include_encodings: vec!["iso-8859-1".to_string()],
2688 ..Default::default()
2689 }),
2690 )
2691 .await
2692 .unwrap();
2693
2694 let subject = msg
2695 .get_first_named_header_value("subject")
2696 .await
2697 .unwrap()
2698 .unwrap();
2699 assert_eq!(subject, "£");
2700 }
2701
2702 #[tokio::test]
2703 async fn set_scheduling() -> anyhow::Result<()> {
2704 let msg = new_msg_body(MULTI_HEADER_CONTENT);
2705 assert!(msg.get_due().is_none(), "due is implicitly now");
2706
2707 let now = Utc::now();
2708 let one_day = chrono::Duration::try_days(1).expect("1 day to be valid");
2709
2710 msg.set_scheduling(Some(Scheduling {
2711 restriction: None,
2712 first_attempt: Some((now + one_day).into()),
2713 expires: None,
2714 }))
2715 .await?;
2716
2717 let due = msg.get_due().expect("due to now be set");
2718 assert!(due - now >= one_day, "due time is at least 1 day away");
2719
2720 Ok(())
2721 }
2722
2723 #[cfg(all(test, target_pointer_width = "64"))]
2724 #[test]
2725 fn sizes() {
2726 assert_eq!(std::mem::size_of::<Message>(), 8);
2727 assert_eq!(std::mem::size_of::<MessageInner>(), 32);
2728 assert_eq!(std::mem::size_of::<MessageWithId>(), 72);
2729 }
2730}