1use crate::address::HeaderAddressList;
2#[cfg(feature = "impl")]
3use crate::dkim::Signer;
4#[cfg(feature = "impl")]
5use crate::dkim::SIGN_POOL;
6pub use crate::queue_name::QueueNameComponents;
7use crate::scheduling::Scheduling;
8use crate::EnvelopeAddress;
9use anyhow::Context;
10use chrono::{DateTime, Utc};
11#[cfg(feature = "impl")]
12use config::{any_err, from_lua_value, serialize_options};
13use futures::FutureExt;
14use intrusive_collections::{intrusive_adapter, LinkedList, LinkedListAtomicLink};
15use kumo_chrono_helper::*;
16#[cfg(feature = "impl")]
17use kumo_dkim::arc::ARC;
18use kumo_log_types::rfc3464::Report;
19use kumo_log_types::rfc5965::ARFReport;
20use kumo_prometheus::declare_metric;
21#[cfg(feature = "impl")]
22use mailparsing::{AuthenticationResult, AuthenticationResults, EncodeHeaderValue};
23use mailparsing::{
24 CheckFixSettings, DecodedBody, Header, HeaderParseResult, MessageConformance, MimePart,
25};
26#[cfg(feature = "impl")]
27use mlua::{IntoLua, LuaSerdeExt, UserData, UserDataMethods};
28#[cfg(feature = "impl")]
29use mod_dns_resolver::get_resolver_instance;
30use parking_lot::Mutex;
31use serde::{Deserialize, Serialize};
32use serde_with::formats::PreferOne;
33use serde_with::{serde_as, OneOrMany};
34use spool::{get_data_spool, get_meta_spool, Spool, SpoolId};
35use std::hash::Hash;
36use std::sync::{Arc, LazyLock, Weak};
37use std::time::{Duration, Instant};
38use timeq::TimerEntryWithDelay;
39
// Per-message state bits, stored in `MessageInner::flags`.
bitflags::bitflags! {
    #[derive(Clone, Copy, Debug, PartialEq, Eq)]
    struct MessageFlags: u8 {
        // The in-memory metadata has not yet been written to the meta spool.
        const META_DIRTY = 1;
        // The in-memory body data has not yet been written to the data spool.
        const DATA_DIRTY = 2;
        // The metadata carries a `Scheduling` constraint that must be applied
        // whenever the due time is computed.
        const SCHEDULED = 4;
        // Forwarded to the spool as the `force_sync` argument when saving.
        const FORCE_SYNC = 8;
    }
}
53
// Gauge of live messages: incremented by the `Message` constructors and
// decremented when the corresponding `MessageInner` is dropped.
declare_metric! {
static MESSAGE_COUNT: IntGauge("message_count");
}

// Gauge of messages whose metadata is currently resident in memory.
declare_metric! {
static META_COUNT: IntGauge("message_meta_resident_count");
}

// Gauge of messages whose body data is currently resident in memory
// (i.e. whose `data` is non-empty).
declare_metric! {
static DATA_COUNT: IntGauge("message_data_resident_count");
}
86
/// Shared, lazily-created empty payload that stands in for the body of a
/// message whose data is not resident in memory.
static NO_DATA: LazyLock<Arc<Box<[u8]>>> =
    LazyLock::new(|| Arc::new(Vec::new().into_boxed_slice()));
88
// Latency histogram; a timer is started at the top of `Message::save`.
declare_metric! {
static SAVE_HIST: Histogram("message_save_latency");
}

// Latency histogram; a timer is started at the top of `Message::load_data`.
declare_metric! {
static LOAD_DATA_HIST: Histogram("message_data_load_latency");
}

// Latency histogram; a timer is started at the top of `Message::load_meta`.
declare_metric! {
static LOAD_META_HIST: Histogram("message_meta_load_latency");
}
124
/// Mutable state of a message, guarded by the mutex in [`MessageWithId`].
#[derive(Debug)]
struct MessageInner {
    /// Envelope and user metadata; `None` while not resident in memory.
    metadata: Option<Box<MetaData>>,
    /// Message body bytes; empty (`NO_DATA`) while not resident in memory.
    data: Arc<Box<[u8]>>,
    /// Dirty/scheduling/sync state bits.
    flags: MessageFlags,
    /// Attempt counter maintained via the `*_num_attempts` accessors.
    num_attempts: u16,
    /// When the message is next due, if a due time has been set.
    due: Option<DateTime<Utc>>,
}
133
/// The shared allocation behind a [`Message`]: the immutable spool id, the
/// lock-protected mutable state, and the intrusive list link.
#[derive(Debug)]
pub(crate) struct MessageWithId {
    /// Spool identifier; never changes after construction.
    id: SpoolId,
    /// Mutable message state.
    inner: Mutex<MessageInner>,
    /// Link used when the message is a member of a [`MessageList`].
    link: LinkedListAtomicLink,
}
140
// Adapter that lets `Arc<MessageWithId>` be stored in an intrusive
// `LinkedList` through its `link` field.
intrusive_adapter!(
    pub(crate) MessageWithIdAdapter = Arc<MessageWithId>: MessageWithId { link: LinkedListAtomicLink }
);
144
/// An intrusive linked list of [`Message`]s that also tracks its own length.
pub struct MessageList {
    /// The underlying intrusive list of message handles.
    list: LinkedList<MessageWithIdAdapter>,
    /// Number of entries in `list`, maintained by the push/pop methods.
    len: usize,
}
153
154impl Default for MessageList {
155 fn default() -> Self {
156 Self::new()
157 }
158}
159
160impl MessageList {
161 pub fn new() -> Self {
163 Self {
164 list: LinkedList::new(MessageWithIdAdapter::default()),
165 len: 0,
166 }
167 }
168
169 pub fn len(&self) -> usize {
171 self.len
172 }
173
174 pub fn is_empty(&self) -> bool {
175 self.len == 0
176 }
177
178 pub fn take(&mut self) -> Self {
181 let new_list = Self {
182 list: self.list.take(),
183 len: self.len,
184 };
185 self.len = 0;
186 new_list
187 }
188
189 pub fn push_back(&mut self, message: Message) {
191 self.list.push_back(message.msg_and_id);
192 self.len += 1;
193 }
194
195 pub fn pop_front(&mut self) -> Option<Message> {
197 self.list.pop_front().map(|msg_and_id| {
198 self.len -= 1;
199 Message { msg_and_id }
200 })
201 }
202
203 pub fn pop_back(&mut self) -> Option<Message> {
205 self.list.pop_back().map(|msg_and_id| {
206 self.len -= 1;
207 Message { msg_and_id }
208 })
209 }
210
211 pub fn drain(&mut self) -> Vec<Message> {
215 let mut messages = Vec::with_capacity(self.len);
216 while let Some(msg) = self.pop_front() {
217 messages.push(msg);
218 }
219 messages
220 }
221
222 pub fn extend_from_iter<I>(&mut self, iter: I)
223 where
224 I: Iterator<Item = Message>,
225 {
226 for msg in iter {
227 self.push_back(msg)
228 }
229 }
230}
231
232impl IntoIterator for MessageList {
233 type Item = Message;
234 type IntoIter = MessageListIter;
235 fn into_iter(self) -> MessageListIter {
236 MessageListIter { list: self.list }
237 }
238}
239
/// Owning iterator over a [`MessageList`]; pops messages from the front.
pub struct MessageListIter {
    /// Remaining messages still to be yielded.
    list: LinkedList<MessageWithIdAdapter>,
}
243
244impl Iterator for MessageListIter {
245 type Item = Message;
246 fn next(&mut self) -> Option<Message> {
247 self.list
248 .pop_front()
249 .map(|msg_and_id| Message { msg_and_id })
250 }
251}
252
/// A cheaply clonable handle to a message; clones share the same underlying
/// [`MessageWithId`] allocation.
#[derive(Clone, Debug)]
#[cfg_attr(feature = "impl", derive(mlua::FromLua))]
pub struct Message {
    /// Shared id + state for this message.
    pub(crate) msg_and_id: Arc<MessageWithId>,
}
258
259impl PartialEq for Message {
260 fn eq(&self, other: &Self) -> bool {
261 self.id() == other.id()
262 }
263}
264impl Eq for Message {}
265
266impl Hash for Message {
267 fn hash<H>(&self, hasher: &mut H)
268 where
269 H: std::hash::Hasher,
270 {
271 self.id().hash(hasher)
272 }
273}
274
/// A non-owning reference to a [`Message`]; obtained from [`Message::weak`].
#[derive(Clone, Debug)]
pub struct WeakMessage {
    /// Weak pointer to the shared message allocation.
    weak: Weak<MessageWithId>,
}
279
280impl WeakMessage {
281 pub fn upgrade(&self) -> Option<Message> {
282 Some(Message {
283 msg_and_id: self.weak.upgrade()?,
284 })
285 }
286}
287
/// The persisted metadata for a message; serialized as JSON into the meta
/// spool (see `save_to` / `load_meta_from`).
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone)]
pub(crate) struct MetaData {
    /// Envelope sender.
    pub sender: EnvelopeAddress,
    /// Envelope recipients. The `OneOrMany` adapter accepts either a single
    /// value or a list when deserializing.
    #[serde_as(as = "OneOrMany<_, PreferOne>")]
    pub recipient: Vec<EnvelopeAddress>,
    /// Arbitrary key/value metadata; constructors require this to be a JSON object.
    pub meta: serde_json::Value,
    /// Optional scheduling constraint; absent in stored data defaults to `None`.
    #[serde(default)]
    pub schedule: Option<Scheduling>,
}
298
299impl Drop for MessageInner {
300 fn drop(&mut self) {
301 if self.metadata.is_some() {
302 META_COUNT.dec();
303 }
304 if !self.data.is_empty() {
305 DATA_COUNT.dec();
306 }
307 MESSAGE_COUNT.dec();
308 }
309}
310
311impl Message {
312 pub fn new_dirty(
315 id: SpoolId,
316 sender: EnvelopeAddress,
317 recipient: Vec<EnvelopeAddress>,
318 meta: serde_json::Value,
319 data: Arc<Box<[u8]>>,
320 ) -> anyhow::Result<Self> {
321 anyhow::ensure!(meta.is_object(), "metadata must be a json object");
322 MESSAGE_COUNT.inc();
323 DATA_COUNT.inc();
324 META_COUNT.inc();
325 Ok(Self {
326 msg_and_id: Arc::new(MessageWithId {
327 id,
328 inner: Mutex::new(MessageInner {
329 metadata: Some(Box::new(MetaData {
330 sender,
331 recipient,
332 meta,
333 schedule: None,
334 })),
335 data,
336 flags: MessageFlags::META_DIRTY | MessageFlags::DATA_DIRTY,
337 num_attempts: 0,
338 due: None,
339 }),
340 link: LinkedListAtomicLink::default(),
341 }),
342 })
343 }
344
345 pub fn weak(&self) -> WeakMessage {
346 WeakMessage {
347 weak: Arc::downgrade(&self.msg_and_id),
348 }
349 }
350
351 pub fn new_from_spool(id: SpoolId, metadata: Vec<u8>) -> anyhow::Result<Self> {
355 let metadata: MetaData = serde_json::from_slice(&metadata)?;
356 MESSAGE_COUNT.inc();
357 META_COUNT.inc();
358
359 let flags = if metadata.schedule.is_some() {
360 MessageFlags::SCHEDULED
361 } else {
362 MessageFlags::empty()
363 };
364
365 Ok(Self {
366 msg_and_id: Arc::new(MessageWithId {
367 id,
368 inner: Mutex::new(MessageInner {
369 metadata: Some(Box::new(metadata)),
370 data: NO_DATA.clone(),
371 flags,
372 num_attempts: 0,
373 due: None,
374 }),
375 link: LinkedListAtomicLink::default(),
376 }),
377 })
378 }
379
380 pub(crate) fn new_from_parts(id: SpoolId, metadata: MetaData, data: Arc<Box<[u8]>>) -> Self {
381 MESSAGE_COUNT.inc();
382 META_COUNT.inc();
383
384 let flags = if metadata.schedule.is_some() {
385 MessageFlags::SCHEDULED
386 } else {
387 MessageFlags::empty()
388 };
389
390 Self {
391 msg_and_id: Arc::new(MessageWithId {
392 id,
393 inner: Mutex::new(MessageInner {
394 metadata: Some(Box::new(metadata)),
395 data,
396 flags: flags | MessageFlags::META_DIRTY | MessageFlags::DATA_DIRTY,
397 num_attempts: 0,
398 due: None,
399 }),
400 link: LinkedListAtomicLink::default(),
401 }),
402 }
403 }
404
405 pub async fn new_with_id(id: SpoolId) -> anyhow::Result<Self> {
406 let meta_spool = get_meta_spool();
407 let data = meta_spool.load(id).await?;
408 Self::new_from_spool(id, data)
409 }
410
411 pub fn get_num_attempts(&self) -> u16 {
412 let inner = self.msg_and_id.inner.lock();
413 inner.num_attempts
414 }
415
416 pub fn set_num_attempts(&self, num_attempts: u16) {
417 let mut inner = self.msg_and_id.inner.lock();
418 inner.num_attempts = num_attempts;
419 }
420
421 pub fn increment_num_attempts(&self) {
422 let mut inner = self.msg_and_id.inner.lock();
423 inner.num_attempts += 1;
424 }
425
    /// Replaces the message's scheduling constraint, loading the metadata
    /// first if it is not resident. Returns the value that was set.
    ///
    /// When a schedule is provided, the due time is recomputed by passing the
    /// current due time (or "now" if none is set) through
    /// `adjust_for_schedule`.
    pub async fn set_scheduling(
        &self,
        scheduling: Option<Scheduling>,
    ) -> anyhow::Result<Option<Scheduling>> {
        self.load_meta_if_needed().await?;
        let mut inner = self.msg_and_id.inner.lock();
        match &mut inner.metadata {
            None => anyhow::bail!("set_scheduling: metadata must be loaded first"),
            Some(meta) => {
                // NOTE(review): `schedule` is part of the persisted MetaData,
                // but META_DIRTY is not set here — confirm whether callers
                // are expected to mark/save the message themselves.
                meta.schedule = scheduling;
                // Keep the SCHEDULED flag in sync with the metadata.
                inner
                    .flags
                    .set(MessageFlags::SCHEDULED, scheduling.is_some());
                if let Some(sched) = scheduling {
                    let due = inner.due.unwrap_or_else(Utc::now);
                    inner.due = Some(sched.adjust_for_schedule(due));
                }
                Ok(scheduling)
            }
        }
    }
447
448 pub async fn get_scheduling(&self) -> anyhow::Result<Option<Scheduling>> {
449 self.load_meta_if_needed().await?;
450 let inner = self.msg_and_id.inner.lock();
451 Ok(inner.metadata.as_ref().and_then(|meta| meta.schedule))
452 }
453
454 pub fn get_due(&self) -> Option<DateTime<Utc>> {
455 let inner = self.msg_and_id.inner.lock();
456 inner.due
457 }
458
459 pub async fn delay_with_jitter(&self, limit: i64) -> anyhow::Result<Option<DateTime<Utc>>> {
460 let scale = rand::random::<f32>();
461 let value = (scale * limit as f32) as i64;
462 self.delay_by(seconds(value)?).await
463 }
464
465 pub async fn delay_by(
466 &self,
467 duration: chrono::Duration,
468 ) -> anyhow::Result<Option<DateTime<Utc>>> {
469 let due = Utc::now() + duration;
470 self.set_due(Some(due)).await
471 }
472
473 pub async fn delay_by_and_jitter(
475 &self,
476 duration: chrono::Duration,
477 ) -> anyhow::Result<Option<DateTime<Utc>>> {
478 let scale = rand::random::<f32>();
479 let value = (scale * 60.) as i64;
480 let due = Utc::now() + duration + seconds(value)?;
481 self.set_due(Some(due)).await
482 }
483
    /// Sets the due time of the message and returns the effective value.
    ///
    /// For messages without the `SCHEDULED` flag the value is stored as-is.
    /// For scheduled messages the requested time (or "now" when `None`) is
    /// passed through the schedule's `adjust_for_schedule`; if the metadata
    /// is not resident it is loaded from the spool first.
    pub async fn set_due(
        &self,
        due: Option<DateTime<Utc>>,
    ) -> anyhow::Result<Option<DateTime<Utc>>> {
        // The lock is scoped to this block so that the guard is released
        // before the `.await` below.
        let due = {
            let mut inner = self.msg_and_id.inner.lock();

            if !inner.flags.contains(MessageFlags::SCHEDULED) {
                // Fast path: no schedule to honor.
                inner.due = due;
                return Ok(inner.due);
            }

            let due = due.unwrap_or_else(Utc::now);

            if let Some(meta) = &inner.metadata {
                inner.due = match &meta.schedule {
                    Some(sched) => Some(sched.adjust_for_schedule(due)),
                    None => Some(due),
                };
                return Ok(inner.due);
            }

            // Scheduled, but the metadata is not resident: fall through to
            // load it and then apply the schedule.
            due
        };

        self.load_meta().await?;

        {
            let mut inner = self.msg_and_id.inner.lock();
            match &inner.metadata {
                Some(meta) => {
                    inner.due = match &meta.schedule {
                        Some(sched) => Some(sched.adjust_for_schedule(due)),
                        None => Some(due),
                    };
                    Ok(inner.due)
                }
                None => anyhow::bail!("loaded metadata, but metadata is not set!?"),
            }
        }
    }
528
529 fn get_data_if_dirty(&self) -> Option<Arc<Box<[u8]>>> {
530 let inner = self.msg_and_id.inner.lock();
531 if inner.flags.contains(MessageFlags::DATA_DIRTY) {
532 Some(Arc::clone(&inner.data))
533 } else {
534 None
535 }
536 }
537
538 fn get_meta_if_dirty(&self) -> Option<MetaData> {
539 let inner = self.msg_and_id.inner.lock();
540 if inner.flags.contains(MessageFlags::META_DIRTY) {
541 inner.metadata.as_ref().map(|md| (**md).clone())
542 } else {
543 None
544 }
545 }
546
547 pub(crate) async fn clone_meta_data(&self) -> anyhow::Result<MetaData> {
548 self.load_meta_if_needed().await?;
549 let inner = self.msg_and_id.inner.lock();
550 inner
551 .metadata
552 .as_ref()
553 .ok_or_else(|| anyhow::anyhow!("metadata not loaded even though we just loaded it"))
554 .map(|md| (**md).clone())
555 }
556
557 pub fn set_force_sync(&self, force: bool) {
558 let mut inner = self.msg_and_id.inner.lock();
559 inner.flags.set(MessageFlags::FORCE_SYNC, force);
560 }
561
562 pub fn needs_save(&self) -> bool {
563 let inner = self.msg_and_id.inner.lock();
564 inner.flags.contains(MessageFlags::META_DIRTY)
565 || inner.flags.contains(MessageFlags::DATA_DIRTY)
566 }
567
568 pub async fn save(&self, deadline: Option<Instant>) -> anyhow::Result<()> {
569 let _timer = SAVE_HIST.start_timer();
570 self.save_to(&**get_meta_spool(), &**get_data_spool(), deadline)
571 .await
572 }
573
    /// Writes the dirty parts of the message to the supplied spools.
    ///
    /// The data and metadata stores are issued concurrently. A part is only
    /// written when its dirty flag is set, and the flag is cleared once the
    /// corresponding store future has completed.
    ///
    /// # Errors
    /// Fails if the data is dirty but empty, or if the metadata cannot be
    /// serialized to JSON.
    pub async fn save_to(
        &self,
        meta_spool: &(dyn Spool + Send + Sync),
        data_spool: &(dyn Spool + Send + Sync),
        deadline: Option<Instant>,
    ) -> anyhow::Result<()> {
        let force_sync = self
            .msg_and_id
            .inner
            .lock()
            .flags
            .contains(MessageFlags::FORCE_SYNC);

        // Each future resolves to `true` when a store was attempted and
        // `false` when there was nothing to write.
        // NOTE(review): `.map(|_| true)` discards whatever `store` resolves
        // to; if that is a Result, a failed store would still clear the
        // dirty flag below — confirm against the `Spool` trait.
        let data_fut = if let Some(data) = self.get_data_if_dirty() {
            anyhow::ensure!(!data.is_empty(), "message data must not be empty");
            data_spool
                .store(self.msg_and_id.id, data, force_sync, deadline)
                .map(|_| true)
                .boxed()
        } else {
            futures::future::ready(false).boxed()
        };
        let meta_fut = if let Some(meta) = self.get_meta_if_dirty() {
            let meta = Arc::new(serde_json::to_vec(&meta)?.into_boxed_slice());
            meta_spool
                .store(self.msg_and_id.id, meta, force_sync, deadline)
                .map(|_| true)
                .boxed()
        } else {
            futures::future::ready(false).boxed()
        };

        // Run both stores concurrently.
        let (data_res, meta_res) = tokio::join!(data_fut, meta_fut);

        if data_res {
            self.msg_and_id
                .inner
                .lock()
                .flags
                .remove(MessageFlags::DATA_DIRTY);
        }
        if meta_res {
            self.msg_and_id
                .inner
                .lock()
                .flags
                .remove(MessageFlags::META_DIRTY);
        }
        Ok(())
    }
629
630 pub fn id(&self) -> &SpoolId {
631 &self.msg_and_id.id
632 }
633
634 pub async fn save_and_shrink(&self) -> anyhow::Result<bool> {
636 self.save(None).await?;
637 self.shrink()
638 }
639
640 pub async fn save_and_shrink_data(&self) -> anyhow::Result<bool> {
642 self.save(None).await?;
643 self.shrink_data()
644 }
645
646 pub fn shrink_data(&self) -> anyhow::Result<bool> {
647 let mut inner = self.msg_and_id.inner.lock();
648 let mut did_shrink = false;
649 if inner.flags.contains(MessageFlags::DATA_DIRTY) {
650 anyhow::bail!("Cannot shrink message: DATA_DIRTY");
651 }
652 if !inner.data.is_empty() {
653 DATA_COUNT.dec();
654 did_shrink = true;
655 }
656 if !inner.data.is_empty() {
657 inner.data = NO_DATA.clone();
658 did_shrink = true;
659 }
660 Ok(did_shrink)
661 }
662
663 pub fn shrink(&self) -> anyhow::Result<bool> {
664 let mut inner = self.msg_and_id.inner.lock();
665 let mut did_shrink = false;
666 if inner.flags.contains(MessageFlags::DATA_DIRTY) {
667 anyhow::bail!("Cannot shrink message: DATA_DIRTY");
668 }
669 if inner.flags.contains(MessageFlags::META_DIRTY) {
670 anyhow::bail!("Cannot shrink message: META_DIRTY");
671 }
672 if inner.metadata.take().is_some() {
673 META_COUNT.dec();
674 did_shrink = true;
675 }
676 if !inner.data.is_empty() {
677 DATA_COUNT.dec();
678 did_shrink = true;
679 }
680 if !inner.data.is_empty() {
681 inner.data = NO_DATA.clone();
682 did_shrink = true;
683 }
684 Ok(did_shrink)
685 }
686
687 pub async fn sender(&self) -> anyhow::Result<EnvelopeAddress> {
688 self.load_meta_if_needed().await?;
689 let inner = self.msg_and_id.inner.lock();
690 match &inner.metadata {
691 Some(meta) => Ok(meta.sender.clone()),
692 None => anyhow::bail!("Message::sender metadata is not loaded"),
693 }
694 }
695
696 pub async fn set_sender(&self, sender: EnvelopeAddress) -> anyhow::Result<()> {
697 self.load_meta_if_needed().await?;
698 let mut inner = self.msg_and_id.inner.lock();
699 match &mut inner.metadata {
700 Some(meta) => {
701 meta.sender = sender;
702 inner.flags.set(MessageFlags::DATA_DIRTY, true);
703 Ok(())
704 }
705 None => anyhow::bail!("Message::set_sender: metadata is not loaded"),
706 }
707 }
708
    /// Returns the first envelope recipient. Deprecated alias that simply
    /// forwards to [`Message::first_recipient`].
    #[deprecated = "use recipient_list or first_recipient instead"]
    pub async fn recipient(&self) -> anyhow::Result<EnvelopeAddress> {
        self.first_recipient().await
    }
713
714 pub async fn first_recipient(&self) -> anyhow::Result<EnvelopeAddress> {
715 self.load_meta_if_needed().await?;
716 let inner = self.msg_and_id.inner.lock();
717 match &inner.metadata {
718 Some(meta) => match meta.recipient.first() {
719 Some(recip) => Ok(recip.clone()),
720 None => anyhow::bail!("recipient list is empty!?"),
721 },
722 None => anyhow::bail!("Message::first_recipient: metadata is not loaded"),
723 }
724 }
725
726 pub async fn recipient_list(&self) -> anyhow::Result<Vec<EnvelopeAddress>> {
727 self.load_meta_if_needed().await?;
728 let inner = self.msg_and_id.inner.lock();
729 match &inner.metadata {
730 Some(meta) => Ok(meta.recipient.clone()),
731 None => anyhow::bail!("Message::recipient_list: metadata is not loaded"),
732 }
733 }
734
735 pub async fn recipient_list_string(&self) -> anyhow::Result<Vec<String>> {
736 self.load_meta_if_needed().await?;
737 let inner = self.msg_and_id.inner.lock();
738 match &inner.metadata {
739 Some(meta) => Ok(meta.recipient.iter().map(|a| a.to_string()).collect()),
740 None => anyhow::bail!("Message::recipient_list_string: metadata is not loaded"),
741 }
742 }
743
    /// Replaces the recipient list with the single `recipient`. Deprecated
    /// wrapper that forwards to [`Message::set_recipient_list`].
    #[deprecated = "use set_recipient_list instead"]
    pub async fn set_recipient(&self, recipient: EnvelopeAddress) -> anyhow::Result<()> {
        self.set_recipient_list(vec![recipient]).await
    }
748
749 pub async fn set_recipient_list(&self, recipient: Vec<EnvelopeAddress>) -> anyhow::Result<()> {
750 self.load_meta_if_needed().await?;
751
752 let mut inner = self.msg_and_id.inner.lock();
753 match &mut inner.metadata {
754 Some(meta) => {
755 meta.recipient = recipient;
756 inner.flags.set(MessageFlags::DATA_DIRTY, true);
757 Ok(())
758 }
759 None => anyhow::bail!("Message::set_recipient_list: metadata is not loaded"),
760 }
761 }
762
763 pub fn is_meta_loaded(&self) -> bool {
764 self.msg_and_id.inner.lock().metadata.is_some()
765 }
766
767 pub fn is_data_loaded(&self) -> bool {
768 !self.msg_and_id.inner.lock().data.is_empty()
769 }
770
771 pub async fn load_meta_if_needed(&self) -> anyhow::Result<()> {
772 if self.is_meta_loaded() {
773 return Ok(());
774 }
775 self.load_meta().await
776 }
777
    /// Returns the message body bytes, loading them from the data spool if
    /// they are not resident.
    pub async fn data(&self) -> anyhow::Result<Arc<Box<[u8]>>> {
        self.load_data_if_needed().await
    }
781
782 async fn load_data_if_needed(&self) -> anyhow::Result<Arc<Box<[u8]>>> {
783 if self.is_data_loaded() {
784 return Ok(self.get_data_maybe_not_loaded());
785 }
786 self.load_data().await
787 }
788
789 pub async fn load_meta(&self) -> anyhow::Result<()> {
790 let _timer = LOAD_META_HIST.start_timer();
791 self.load_meta_from(&**get_meta_spool()).await
792 }
793
794 async fn load_meta_from(&self, meta_spool: &(dyn Spool + Send + Sync)) -> anyhow::Result<()> {
795 let id = self.id();
796 let data = meta_spool.load(*id).await?;
797 let mut inner = self.msg_and_id.inner.lock();
798 let was_not_loaded = inner.metadata.is_none();
799 let metadata: MetaData = serde_json::from_slice(&data)?;
800 inner.metadata.replace(Box::new(metadata));
801 if was_not_loaded {
802 META_COUNT.inc();
803 }
804 Ok(())
805 }
806
807 pub async fn load_data(&self) -> anyhow::Result<Arc<Box<[u8]>>> {
808 let _timer = LOAD_DATA_HIST.start_timer();
809 self.load_data_from(&**get_data_spool()).await
810 }
811
812 async fn load_data_from(
813 &self,
814 data_spool: &(dyn Spool + Send + Sync),
815 ) -> anyhow::Result<Arc<Box<[u8]>>> {
816 let data = data_spool.load(*self.id()).await?;
817 let mut inner = self.msg_and_id.inner.lock();
818 let was_empty = inner.data.is_empty();
819 inner.data = Arc::new(data.into_boxed_slice());
820 if was_empty {
821 DATA_COUNT.inc();
822 }
823 Ok(inner.data.clone())
824 }
825
826 pub fn assign_data(&self, data: Vec<u8>) {
827 let mut inner = self.msg_and_id.inner.lock();
828 let was_empty = inner.data.is_empty();
829 inner.data = Arc::new(data.into_boxed_slice());
830 inner.flags.set(MessageFlags::DATA_DIRTY, true);
831 if was_empty {
832 DATA_COUNT.inc();
833 }
834 }
835
836 pub fn get_data_maybe_not_loaded(&self) -> Arc<Box<[u8]>> {
837 let inner = self.msg_and_id.inner.lock();
838 inner.data.clone()
839 }
840
841 pub async fn set_meta<S: AsRef<str>, V: Into<serde_json::Value>>(
842 &self,
843 key: S,
844 value: V,
845 ) -> anyhow::Result<()> {
846 self.load_meta_if_needed().await?;
847 let mut inner = self.msg_and_id.inner.lock();
848 match &mut inner.metadata {
849 None => anyhow::bail!("set_meta: metadata must be loaded first"),
850 Some(meta) => {
851 let key = key.as_ref();
852 let value = value.into();
853
854 match &mut meta.meta {
855 serde_json::Value::Object(map) => {
856 map.insert(key.to_string(), value);
857 }
858 _ => anyhow::bail!("metadata is somehow not a json object"),
859 }
860
861 inner.flags.set(MessageFlags::META_DIRTY, true);
862 Ok(())
863 }
864 }
865 }
866
867 pub async fn unset_meta<S: AsRef<str>>(&self, key: S) -> anyhow::Result<()> {
868 self.load_meta_if_needed().await?;
869 let mut inner = self.msg_and_id.inner.lock();
870 match &mut inner.metadata {
871 None => anyhow::bail!("set_meta: metadata must be loaded first"),
872 Some(meta) => {
873 let key = key.as_ref();
874
875 match &mut meta.meta {
876 serde_json::Value::Object(map) => {
877 map.remove(key);
878 }
879 _ => anyhow::bail!("metadata is somehow not a json object"),
880 }
881
882 inner.flags.set(MessageFlags::META_DIRTY, true);
883 Ok(())
884 }
885 }
886 }
887
888 pub async fn get_meta_string<S: serde_json::value::Index + std::fmt::Display + Copy>(
890 &self,
891 key: S,
892 ) -> anyhow::Result<Option<String>> {
893 match self.get_meta(key).await {
894 Ok(serde_json::Value::String(value)) => Ok(Some(value.to_string())),
895 Ok(serde_json::Value::Null) => Ok(None),
896 hmm => {
897 anyhow::bail!("expected '{key}' to be a string value, got {hmm:?}");
898 }
899 }
900 }
901
902 pub async fn get_meta_obj(&self) -> anyhow::Result<serde_json::Value> {
903 self.load_meta_if_needed().await?;
904 let inner = self.msg_and_id.inner.lock();
905 match &inner.metadata {
906 None => anyhow::bail!("get_meta_obj: metadata must be loaded first"),
907 Some(meta) => Ok(meta.meta.clone()),
908 }
909 }
910
911 pub async fn get_meta<S: serde_json::value::Index>(
912 &self,
913 key: S,
914 ) -> anyhow::Result<serde_json::Value> {
915 self.load_meta_if_needed().await?;
916 let inner = self.msg_and_id.inner.lock();
917 match &inner.metadata {
918 None => anyhow::bail!("get_meta: metadata must be loaded first"),
919 Some(meta) => match meta.meta.get(key) {
920 Some(value) => Ok(value.clone()),
921 None => Ok(serde_json::Value::Null),
922 },
923 }
924 }
925
926 pub fn age(&self, now: DateTime<Utc>) -> chrono::Duration {
927 self.msg_and_id.id.age(now)
928 }
929
930 pub async fn get_queue_name(&self) -> anyhow::Result<String> {
931 Ok(match self.get_meta_string("queue").await? {
932 Some(name) => name,
933 None => {
934 let name = QueueNameComponents::format(
935 self.get_meta_string("campaign").await?,
936 self.get_meta_string("tenant").await?,
937 self.first_recipient()
938 .await?
939 .domain()
940 .to_string()
941 .to_lowercase(),
942 self.get_meta_string("routing_domain").await?,
943 );
944 name.to_string()
945 }
946 })
947 }
948
949 #[cfg(feature = "impl")]
950 pub async fn arc_verify(
951 &self,
952 opt_resolver_name: Option<String>,
953 ) -> anyhow::Result<AuthenticationResult> {
954 let resolver = get_resolver_instance(&opt_resolver_name)?;
955 let data = self.data().await?;
956 let bytes = mailparsing::SharedString::try_from(data.as_ref().as_ref())?;
957
958 let parsed = mailparsing::Header::parse_headers(bytes.clone())?;
959 let message = kumo_dkim::ParsedEmail::HeaderOnlyParse { bytes, parsed };
960
961 let arc = ARC::verify(&message, &**resolver).await;
962 Ok(arc.authentication_result())
963 }
964
965 #[cfg(feature = "impl")]
966 pub async fn arc_seal(
967 &self,
968 signer: Signer,
969 auth_results: AuthenticationResults,
970 opt_resolver_name: Option<String>,
971 ) -> anyhow::Result<()> {
972 let resolver = get_resolver_instance(&opt_resolver_name)?;
973 let data = self.data().await?;
974 let bytes = mailparsing::SharedString::try_from(data.as_ref().as_ref())?;
975 let parsed = mailparsing::Header::parse_headers(bytes.clone())?;
976 let message = kumo_dkim::ParsedEmail::HeaderOnlyParse { bytes, parsed };
977 let arc = ARC::verify(&message, &**resolver).await;
978
979 let headers = arc.seal(&message, auth_results, signer.signer())?;
980 if !headers.is_empty() {
981 let mut new_data = Vec::<u8>::with_capacity(data.len() + 1024);
982
983 for hdr in headers {
984 hdr.write_header(&mut new_data).ok();
985 }
986 new_data.extend_from_slice(&data);
987 self.assign_data(new_data);
988 }
989
990 Ok(())
991 }
992
993 #[cfg(feature = "impl")]
994 pub async fn dkim_verify(
995 &self,
996 opt_resolver_name: Option<String>,
997 ) -> anyhow::Result<Vec<AuthenticationResult>> {
998 let resolver = get_resolver_instance(&opt_resolver_name)?;
999 let data = self.data().await?;
1000 let bytes = mailparsing::SharedString::try_from(data.as_ref().as_ref())?;
1001
1002 let parsed = mailparsing::Header::parse_headers(bytes.clone())?;
1003 if parsed
1004 .overall_conformance
1005 .contains(MessageConformance::NON_CANONICAL_LINE_ENDINGS)
1006 {
1007 return Ok(vec![AuthenticationResult {
1008 method: "dkim".to_string(),
1009 method_version: None,
1010 result: "permerror".to_string(),
1011 reason: Some("message has non-canonical line endings".to_string()),
1012 props: Default::default(),
1013 }]);
1014 }
1015 let message = kumo_dkim::ParsedEmail::HeaderOnlyParse { bytes, parsed };
1016
1017 let from = match message.get_headers().from() {
1018 Ok(Some(from)) if from.0.len() == 1 => from.0,
1019 Ok(Some(from)) => {
1020 return Ok(vec![AuthenticationResult {
1021 method: "dkim".to_string(),
1022 method_version: None,
1023 result: "permerror".to_string(),
1024 reason: Some(format!(
1025 "From header must have a single sender, found {}",
1026 from.len()
1027 )),
1028 props: Default::default(),
1029 }]);
1030 }
1031 Ok(None) => {
1032 return Ok(vec![AuthenticationResult {
1033 method: "dkim".to_string(),
1034 method_version: None,
1035 result: "permerror".to_string(),
1036 reason: Some("From header is missing".to_string()),
1037 props: Default::default(),
1038 }]);
1039 }
1040 Err(err) => {
1041 return Ok(vec![AuthenticationResult {
1042 method: "dkim".to_string(),
1043 method_version: None,
1044 result: "permerror".to_string(),
1045 reason: Some(format!("From header is invalid: {err:#}")),
1046 props: Default::default(),
1047 }]);
1048 }
1049 };
1050 let from_domain = &from[0].address.domain;
1051
1052 let results =
1053 kumo_dkim::verify_email_with_resolver(from_domain, &message, &**resolver).await?;
1054 Ok(results)
1055 }
1056
1057 pub async fn parse(&self) -> anyhow::Result<MimePart<'static>> {
1062 let data = self.data().await?;
1063 let owned_data = String::from_utf8_lossy(data.as_ref().as_ref()).to_string();
1064 Ok(MimePart::parse(owned_data)?)
1065 }
1066
1067 pub async fn parse_rfc3464(&self) -> anyhow::Result<Option<Report>> {
1068 let data = self.data().await?;
1069 Report::parse(&data)
1070 }
1071
1072 pub async fn parse_rfc5965(&self) -> anyhow::Result<Option<ARFReport>> {
1073 let data = self.data().await?;
1074 ARFReport::parse(&data)
1075 }
1076
1077 pub async fn prepend_header(&self, name: Option<&str>, value: &str) -> anyhow::Result<()> {
1078 let data = self.data().await?;
1079 let mut new_data = Vec::with_capacity(size_header(name, value) + 2 + data.len());
1080 emit_header(&mut new_data, name, value);
1081 new_data.extend_from_slice(&data);
1082 self.assign_data(new_data);
1083 Ok(())
1084 }
1085
1086 pub async fn append_header(&self, name: Option<&str>, value: &str) -> anyhow::Result<()> {
1087 let data = self.data().await?;
1088 let mut new_data = Vec::with_capacity(size_header(name, value) + 2 + data.len());
1089 for (idx, window) in data.windows(4).enumerate() {
1090 if window == b"\r\n\r\n" {
1091 let headers = &data[0..idx + 2];
1092 let body = &data[idx + 2..];
1093
1094 new_data.extend_from_slice(headers);
1095 emit_header(&mut new_data, name, value);
1096 new_data.extend_from_slice(body);
1097 self.assign_data(new_data);
1098 return Ok(());
1099 }
1100 }
1101
1102 anyhow::bail!("append_header could not find the end of the header block");
1103 }
1104
1105 pub async fn get_address_header(
1106 &self,
1107 header_name: &str,
1108 ) -> anyhow::Result<Option<HeaderAddressList>> {
1109 let data = self.data().await?;
1110 let HeaderParseResult { headers, .. } =
1111 mailparsing::Header::parse_headers(data.as_ref().as_ref())?;
1112
1113 match headers.get_first(header_name) {
1114 Some(hdr) => {
1115 let list = hdr.as_address_list()?;
1116 let result: HeaderAddressList = list.into();
1117 Ok(Some(result))
1118 }
1119 None => Ok(None),
1120 }
1121 }
1122
1123 pub async fn get_first_named_header_value(&self, name: &str) -> anyhow::Result<Option<String>> {
1124 let data = self.data().await?;
1125 let HeaderParseResult { headers, .. } = Header::parse_headers(data.as_ref().as_ref())?;
1126
1127 match headers.get_first(name) {
1128 Some(hdr) => Ok(Some(hdr.as_unstructured()?)),
1129 None => Ok(None),
1130 }
1131 }
1132
1133 pub async fn get_all_named_header_values(&self, name: &str) -> anyhow::Result<Vec<String>> {
1134 let data = self.data().await?;
1135 let HeaderParseResult { headers, .. } = Header::parse_headers(data.as_ref().as_ref())?;
1136
1137 let mut values = vec![];
1138 for hdr in headers.iter_named(name) {
1139 values.push(hdr.as_unstructured()?);
1140 }
1141 Ok(values)
1142 }
1143
1144 pub async fn get_all_headers(&self) -> anyhow::Result<Vec<(String, String)>> {
1145 let data = self.data().await?;
1146 let HeaderParseResult { headers, .. } = Header::parse_headers(data.as_ref().as_ref())?;
1147
1148 let mut values = vec![];
1149 for hdr in headers.iter() {
1150 values.push((hdr.get_name().to_string(), hdr.as_unstructured()?));
1151 }
1152 Ok(values)
1153 }
1154
1155 pub async fn retain_headers<F: FnMut(&Header) -> bool>(
1156 &self,
1157 mut func: F,
1158 ) -> anyhow::Result<()> {
1159 let data = self.data().await?;
1160 let mut new_data = Vec::with_capacity(data.len());
1161 let HeaderParseResult {
1162 headers,
1163 body_offset,
1164 ..
1165 } = Header::parse_headers(data.as_ref().as_ref())?;
1166 for hdr in headers.iter() {
1167 let retain = (func)(hdr);
1168 if !retain {
1169 continue;
1170 }
1171 hdr.write_header(&mut new_data)?;
1172 }
1173 new_data.extend_from_slice(b"\r\n");
1174 new_data.extend_from_slice(&data[body_offset..]);
1175 self.assign_data(new_data);
1176 Ok(())
1177 }
1178
1179 pub async fn remove_first_named_header(&self, name: &str) -> anyhow::Result<()> {
1180 let mut removed = false;
1181 self.retain_headers(|hdr| {
1182 if hdr.get_name().eq_ignore_ascii_case(name) && !removed {
1183 removed = true;
1184 false
1185 } else {
1186 true
1187 }
1188 })
1189 .await
1190 }
1191
1192 pub async fn import_x_headers(&self, names: Vec<String>) -> anyhow::Result<()> {
1193 let data = self.data().await?;
1194 let HeaderParseResult { headers, .. } = Header::parse_headers(data.as_ref().as_ref())?;
1195
1196 for hdr in headers.iter() {
1197 let do_import = if names.is_empty() {
1198 is_x_header(hdr.get_name())
1199 } else {
1200 is_header_in_names_list(hdr.get_name(), &names)
1201 };
1202 if do_import {
1203 let name = imported_header_name(hdr.get_name());
1204 self.set_meta(name, hdr.as_unstructured()?).await?;
1205 }
1206 }
1207
1208 Ok(())
1209 }
1210
1211 pub async fn remove_x_headers(&self, names: Vec<String>) -> anyhow::Result<()> {
1212 self.retain_headers(|hdr| {
1213 if names.is_empty() {
1214 !is_x_header(hdr.get_name())
1215 } else {
1216 !is_header_in_names_list(hdr.get_name(), &names)
1217 }
1218 })
1219 .await
1220 }
1221
1222 pub async fn remove_all_named_headers(&self, name: &str) -> anyhow::Result<()> {
1223 self.retain_headers(|hdr| !hdr.get_name().eq_ignore_ascii_case(name))
1224 .await
1225 }
1226
/// Signs the message data with `signer` and prepends the resulting header.
///
/// When `SIGN_POOL` has been set, the signing call is dispatched through
/// its `spawn_blocking`; otherwise it runs inline on the current task.
#[cfg(feature = "impl")]
pub async fn dkim_sign(&self, signer: Signer) -> anyhow::Result<()> {
    let data = self.data().await?;
    let signature_header = match SIGN_POOL.get() {
        // First `?` covers the join result, the second the signing result.
        Some(pool) => pool.spawn_blocking(move || signer.sign(&data)).await??,
        None => signer.sign(&data)?,
    };
    self.prepend_header(None, &signature_header).await?;
    Ok(())
}
1238
1239 pub async fn import_scheduling_header(
1240 &self,
1241 header_name: &str,
1242 remove: bool,
1243 ) -> anyhow::Result<Option<Scheduling>> {
1244 if let Some(value) = self.get_first_named_header_value(header_name).await? {
1245 let sched: Scheduling = serde_json::from_str(&value).with_context(|| {
1246 format!("{value} from header {header_name} is not a valid Scheduling header")
1247 })?;
1248 let result = self.set_scheduling(Some(sched)).await?;
1249
1250 if remove {
1251 self.remove_all_named_headers(header_name).await?;
1252 }
1253
1254 Ok(result)
1255 } else {
1256 Ok(None)
1257 }
1258 }
1259
1260 pub async fn append_text_plain(&self, content: &str) -> anyhow::Result<bool> {
1261 let data = self.data().await?;
1262 let mut msg = MimePart::parse(data.as_ref().as_ref())?;
1263 let parts = msg.simplified_structure_pointers()?;
1264 if let Some(p) = parts.text_part.and_then(|p| msg.resolve_ptr_mut(p)) {
1265 match p.body()? {
1266 DecodedBody::Text(text) => {
1267 let mut text = text.as_str().to_string();
1268 text.push_str("\r\n");
1269 text.push_str(content);
1270 p.replace_text_body("text/plain", &text)?;
1271
1272 let new_data = msg.to_message_string();
1273 self.assign_data(new_data.into_bytes());
1274 Ok(true)
1275 }
1276 DecodedBody::Binary(_) => {
1277 anyhow::bail!("expected text/plain part to be text, but it is binary");
1278 }
1279 }
1280 } else {
1281 Ok(false)
1282 }
1283 }
1284
1285 pub async fn append_text_html(&self, content: &str) -> anyhow::Result<bool> {
1286 let data = self.data().await?;
1287 let mut msg = MimePart::parse(data.as_ref().as_ref())?;
1288 let parts = msg.simplified_structure_pointers()?;
1289 if let Some(p) = parts.html_part.and_then(|p| msg.resolve_ptr_mut(p)) {
1290 match p.body()? {
1291 DecodedBody::Text(text) => {
1292 let mut text = text.as_str().to_string();
1293
1294 match text.rfind("</body>").or_else(|| text.rfind("</BODY>")) {
1295 Some(idx) => {
1296 text.insert_str(idx, content);
1297 text.insert_str(idx, "\r\n");
1298 }
1299 None => {
1300 text.push_str("\r\n");
1302 text.push_str(content);
1303 }
1304 }
1305
1306 p.replace_text_body("text/html", &text)?;
1307
1308 let new_data = msg.to_message_string();
1309 self.assign_data(new_data.into_bytes());
1310 Ok(true)
1311 }
1312 DecodedBody::Binary(_) => {
1313 anyhow::bail!("expected text/html part to be text, but it is binary");
1314 }
1315 }
1316 } else {
1317 Ok(false)
1318 }
1319 }
1320
1321 pub async fn check_fix_conformance(
1322 &self,
1323 check: MessageConformance,
1324 fix: MessageConformance,
1325 settings: Option<&CheckFixSettings>,
1326 ) -> anyhow::Result<()> {
1327 let data = self.data().await?;
1328 let data_bytes = data.as_ref().as_ref();
1329 let msg = MimePart::parse(data_bytes)?;
1330
1331 let mut settings = settings.map(Clone::clone).unwrap_or_default();
1332
1333 if fix.contains(MessageConformance::MISSING_MESSAGE_ID_HEADER)
1334 && settings.message_id.is_none()
1335 && matches!(msg.headers().message_id(), Err(_) | Ok(None))
1336 {
1337 let sender = self.sender().await?;
1338 let domain = sender.domain();
1339 let id = *self.id();
1340 settings.message_id.replace(format!("{id}@{domain}"));
1341 }
1342
1343 if settings.detect_encoding {
1344 settings.data_bytes.replace(data.clone());
1345 }
1346
1347 let opt_msg = msg.check_fix_conformance(check, fix, settings)?;
1348
1349 if let Some(msg) = opt_msg {
1350 let new_data = msg.to_message_string();
1351 self.assign_data(new_data.into_bytes());
1352 }
1353
1354 Ok(())
1355 }
1356}
1357
/// Returns true if `hdr_name` equals any entry of `names`, ignoring ASCII
/// case.
fn is_header_in_names_list(hdr_name: &str, names: &[String]) -> bool {
    names
        .iter()
        .any(|candidate| hdr_name.eq_ignore_ascii_case(candidate))
}
1366
/// Derives the metadata key for an imported header: ASCII letters are
/// lowercased and every `-` becomes `_`.
fn imported_header_name(name: &str) -> String {
    name.chars()
        .map(|c| if c == '-' { '_' } else { c.to_ascii_lowercase() })
        .collect()
}
1375
/// Returns true if `name` begins with `X-` or `x-`.
fn is_x_header(name: &str) -> bool {
    matches!(name.as_bytes(), [b'X' | b'x', b'-', ..])
}
1379
/// Byte length of `value`, plus the length of `name` and two more bytes
/// when a name is given.
fn size_header(name: Option<&str>, value: &str) -> usize {
    let name_part = match name {
        Some(name) => name.len() + 2,
        None => 0,
    };
    name_part + value.len()
}
1383
/// Appends a header line to `dest`.
///
/// When `name` is given, `name: ` is written first.  `value` follows, and a
/// trailing CRLF is added unless `value` already ends with one.
fn emit_header(dest: &mut Vec<u8>, name: Option<&str>, value: &str) {
    match name {
        Some(field) => {
            dest.extend_from_slice(field.as_bytes());
            dest.extend_from_slice(b": ");
        }
        None => {}
    }

    let value_bytes = value.as_bytes();
    dest.extend_from_slice(value_bytes);

    let already_terminated = value_bytes.ends_with(b"\r\n");
    if !already_terminated {
        dest.extend_from_slice(b"\r\n");
    }
}
1394
/// Lua bindings for [`Message`].
///
/// Each registered method is a thin adapter: it converts the Lua arguments,
/// calls the corresponding `Message` method and maps any error through
/// `any_err`.
#[cfg(feature = "impl")]
impl UserData for Message {
    // The whole impl is gated on the "impl" feature, so the individual
    // registrations below need no cfg attribute of their own.
    fn add_methods<M: UserDataMethods<Self>>(methods: &mut M) {
        // ---- metadata and raw data -------------------------------------
        methods.add_async_method(
            "set_meta",
            move |_lua, msg, (key, lua_value): (String, mlua::Value)| async move {
                let json_value = serde_json::value::to_value(lua_value).map_err(any_err)?;
                msg.set_meta(key, json_value).await.map_err(any_err)?;
                Ok(())
            },
        );
        methods.add_async_method("get_meta", move |lua, msg, key: String| async move {
            let stored = msg.get_meta(key).await.map_err(any_err)?;
            let converted = lua.to_value_with(&stored, serialize_options())?;
            Ok(Some(converted))
        });
        methods.add_async_method("get_data", |lua, msg, _: ()| async move {
            let bytes = msg.data().await.map_err(any_err)?;
            lua.create_string(&*bytes)
        });
        methods.add_method("set_data", move |_lua, msg, payload: mlua::String| {
            let bytes = payload.as_bytes().to_vec();
            msg.assign_data(bytes);
            Ok(())
        });

        // ---- MIME helpers ----------------------------------------------
        methods.add_async_method("parse_mime", |_lua, msg, _: ()| async move {
            let bytes = msg.data().await.map_err(any_err)?;
            // Lossy conversion: invalid UTF-8 sequences are replaced.
            let text = String::from_utf8_lossy(bytes.as_ref().as_ref()).into_owned();
            let root = MimePart::parse(text).map_err(any_err)?;
            Ok(mod_mimepart::PartRef::new(root))
        });

        methods.add_async_method("append_text_plain", |_lua, msg, text: String| async move {
            msg.append_text_plain(&text).await.map_err(any_err)
        });

        methods.add_async_method("append_text_html", |_lua, msg, html: String| async move {
            msg.append_text_html(&html).await.map_err(any_err)
        });

        // ---- identity, envelope and queueing ---------------------------
        methods.add_method("id", move |_, msg, _: ()| Ok(msg.id().to_string()));
        methods.add_async_method("sender", move |_, msg, _: ()| async move {
            msg.sender().await.map_err(any_err)
        });

        methods.add_method("num_attempts", move |_, msg, _: ()| {
            Ok(msg.get_num_attempts())
        });
        methods.add_method("increment_num_attempts", move |_, msg, _: ()| {
            Ok(msg.increment_num_attempts())
        });

        methods.add_async_method("queue_name", move |_, msg, _: ()| async move {
            msg.get_queue_name().await.map_err(any_err)
        });

        methods.add_async_method("set_due", move |lua, msg, requested: mlua::Value| async move {
            let requested: Option<DateTime<Utc>> = lua.from_value(requested)?;
            let effective = msg.set_due(requested).await.map_err(any_err)?;
            lua.to_value(&effective)
        });

        methods.add_async_method(
            "set_sender",
            move |lua, msg, value: mlua::Value| async move {
                // A plain string is parsed as an address; any other value
                // is deserialized as an EnvelopeAddress.
                let new_sender = match value {
                    mlua::Value::String(raw) => {
                        let text = raw.to_str()?;
                        EnvelopeAddress::parse(&text).map_err(any_err)?
                    }
                    _ => lua.from_value::<EnvelopeAddress>(value.clone())?,
                };
                msg.set_sender(new_sender).await.map_err(any_err)
            },
        );

        methods.add_async_method("recipient", move |lua, msg, _: ()| async move {
            let mut all = msg.recipient_list().await.map_err(any_err)?;
            // nil for none, a single address for one, a list otherwise.
            match all.len() {
                0 => Ok(mlua::Value::Nil),
                1 => {
                    let only: EnvelopeAddress = all.pop().expect("have 1");
                    only.into_lua(&lua)
                }
                _ => all.into_lua(&lua),
            }
        });

        methods.add_async_method("recipient_list", move |lua, msg, _: ()| async move {
            let all = msg.recipient_list().await.map_err(any_err)?;
            all.into_lua(&lua)
        });

        methods.add_async_method(
            "set_recipient",
            move |lua, msg, value: mlua::Value| async move {
                // Accepts a string, a list of addresses, or a single address.
                let new_recipients = match value {
                    mlua::Value::String(raw) => {
                        let text = raw.to_str()?;
                        vec![EnvelopeAddress::parse(&text).map_err(any_err)?]
                    }
                    _ => match lua.from_value::<Vec<EnvelopeAddress>>(value.clone()) {
                        Ok(list) => list,
                        Err(_) => vec![lua.from_value::<EnvelopeAddress>(value.clone())?],
                    },
                };
                msg.set_recipient_list(new_recipients)
                    .await
                    .map_err(any_err)
            },
        );

        // ---- signing and memory management -----------------------------
        methods.add_async_method("dkim_sign", |_, msg, signer: Signer| async move {
            msg.dkim_sign(signer).await.map_err(any_err)
        });

        methods.add_async_method("shrink", |_, msg, _: ()| async move {
            // Save pending changes before shrinking.
            if msg.needs_save() {
                msg.save(None).await.map_err(any_err)?;
            }
            msg.shrink().map_err(any_err)
        });

        methods.add_async_method("shrink_data", |_, msg, _: ()| async move {
            // Save pending changes before shrinking.
            if msg.needs_save() {
                msg.save(None).await.map_err(any_err)?;
            }
            msg.shrink_data().map_err(any_err)
        });

        // ---- authentication --------------------------------------------
        methods.add_async_method(
            "add_authentication_results",
            |lua, msg, (serv_id, results): (String, mlua::Value)| async move {
                let results: Vec<AuthenticationResult> = lua.from_value(results)?;
                let header_value = AuthenticationResults {
                    serv_id,
                    version: None,
                    results,
                }
                .encode_value();

                msg.prepend_header(Some("Authentication-Results"), &header_value)
                    .await
                    .map_err(any_err)?;

                Ok(())
            },
        );

        methods.add_async_method(
            "arc_verify",
            |lua, msg, opt_resolver_name: Option<String>| async move {
                let verdict = msg.arc_verify(opt_resolver_name).await.map_err(any_err)?;
                lua.to_value_with(&verdict, serialize_options())
            },
        );

        methods.add_async_method(
            "arc_seal",
            |lua,
             msg,
             (signer, serv_id, auth_res, opt_resolver_name): (
                Signer,
                String,
                mlua::Value,
                Option<String>,
            )| async move {
                let results: Vec<AuthenticationResult> = lua.from_value(auth_res)?;
                let auth_results = AuthenticationResults {
                    serv_id,
                    version: None,
                    results,
                };
                msg.arc_seal(signer, auth_results, opt_resolver_name)
                    .await
                    .map_err(any_err)
            },
        );

        methods.add_async_method(
            "dkim_verify",
            |lua, msg, opt_resolver_name: Option<String>| async move {
                let verdict = msg.dkim_verify(opt_resolver_name).await.map_err(any_err)?;
                lua.to_value_with(&verdict, serialize_options())
            },
        );

        // ---- header manipulation ---------------------------------------
        methods.add_async_method(
            "prepend_header",
            |_, msg, (name, value, encode): (String, String, Option<bool>)| async move {
                match encode {
                    Some(true) => {
                        let encoded = Header::new_unstructured(name.clone(), value);
                        msg.prepend_header(Some(&name), encoded.get_raw_value())
                            .await
                            .map_err(any_err)?;
                    }
                    // Encoding is opt-in: absent means "use the value as given".
                    Some(false) | None => {
                        msg.prepend_header(Some(&name), &value)
                            .await
                            .map_err(any_err)?;
                    }
                }
                Ok(())
            },
        );
        methods.add_async_method(
            "append_header",
            |_, msg, (name, value, encode): (String, String, Option<bool>)| async move {
                match encode {
                    Some(true) => {
                        let encoded = Header::new_unstructured(name.clone(), value);
                        msg.append_header(Some(&name), encoded.get_raw_value())
                            .await
                            .map_err(any_err)?;
                    }
                    // Encoding is opt-in: absent means "use the value as given".
                    Some(false) | None => {
                        msg.append_header(Some(&name), &value)
                            .await
                            .map_err(any_err)?;
                    }
                }
                Ok(())
            },
        );
        methods.add_async_method("get_address_header", |_, msg, name: String| async move {
            msg.get_address_header(&name).await.map_err(any_err)
        });
        methods.add_async_method("from_header", |_, msg, ()| async move {
            msg.get_address_header("From").await.map_err(any_err)
        });
        methods.add_async_method("to_header", |_, msg, ()| async move {
            msg.get_address_header("To").await.map_err(any_err)
        });

        methods.add_async_method(
            "get_first_named_header_value",
            |_, msg, name: String| async move {
                msg.get_first_named_header_value(&name)
                    .await
                    .map_err(any_err)
            },
        );
        methods.add_async_method(
            "get_all_named_header_values",
            |_, msg, name: String| async move {
                msg.get_all_named_header_values(&name)
                    .await
                    .map_err(any_err)
            },
        );
        methods.add_async_method("get_all_headers", |_, msg, _: ()| async move {
            let headers = msg.get_all_headers().await.map_err(any_err)?;
            // Each header becomes a two-element {name, value} list.
            let pairs: Vec<Vec<String>> = headers
                .into_iter()
                .map(|(name, value)| vec![name, value])
                .collect();
            Ok(pairs)
        });
        methods.add_async_method(
            "import_x_headers",
            |_, msg, names: Option<Vec<String>>| async move {
                let names = names.unwrap_or_default();
                msg.import_x_headers(names).await.map_err(any_err)
            },
        );

        methods.add_async_method(
            "remove_x_headers",
            |_, msg, names: Option<Vec<String>>| async move {
                let names = names.unwrap_or_default();
                msg.remove_x_headers(names).await.map_err(any_err)
            },
        );
        methods.add_async_method(
            "remove_all_named_headers",
            |_, msg, name: String| async move {
                msg.remove_all_named_headers(&name).await.map_err(any_err)
            },
        );

        // ---- scheduling ------------------------------------------------
        methods.add_async_method(
            "import_scheduling_header",
            |lua, msg, (header_name, remove): (String, bool)| async move {
                let applied = msg
                    .import_scheduling_header(&header_name, remove)
                    .await
                    .map_err(any_err)?;
                lua.to_value(&applied)
            },
        );

        methods.add_async_method(
            "set_scheduling",
            move |lua, msg, params: mlua::Value| async move {
                let requested: Option<Scheduling> = from_lua_value(&lua, params)?;
                let applied = msg.set_scheduling(requested).await.map_err(any_err)?;
                lua.to_value(&applied)
            },
        );

        // ---- report parsing --------------------------------------------
        methods.add_async_method("parse_rfc3464", |lua, msg, _: ()| async move {
            let parsed = msg.parse_rfc3464().await.map_err(any_err)?;
            let Some(report) = parsed else {
                return Ok(mlua::Value::Nil);
            };
            lua.to_value_with(&report, serialize_options())
        });

        methods.add_async_method("parse_rfc5965", |lua, msg, _: ()| async move {
            let parsed = msg.parse_rfc5965().await.map_err(any_err)?;
            let Some(report) = parsed else {
                return Ok(mlua::Value::Nil);
            };
            lua.to_value_with(&report, serialize_options())
        });

        // ---- persistence -----------------------------------------------
        methods.add_async_method("save", |_, msg, ()| async move {
            msg.save(None).await.map_err(any_err)
        });

        methods.add_method("set_force_sync", move |_, msg, force: bool| {
            msg.set_force_sync(force);
            Ok(())
        });

        // ---- conformance -----------------------------------------------
        methods.add_async_method(
            "check_fix_conformance",
            |lua, msg, (check, fix, settings): (String, String, Option<mlua::Value>)| async move {
                use std::str::FromStr;
                let check = MessageConformance::from_str(&check).map_err(any_err)?;
                let fix = MessageConformance::from_str(&fix).map_err(any_err)?;

                let settings: Option<CheckFixSettings> = match settings {
                    None => None,
                    Some(raw) => Some(lua.from_value(raw).map_err(any_err)?),
                };

                // A conformance failure is reported to Lua as a string
                // rather than raised as an error; nil means success.
                let outcome = msg
                    .check_fix_conformance(check, fix, settings.as_ref())
                    .await;
                Ok(outcome.err().map(|err| format!("{err:#}")))
            },
        );
    }
}
1749
1750impl TimerEntryWithDelay for WeakMessage {
1751 fn delay(&self) -> Duration {
1752 match self.upgrade() {
1753 None => {
1754 Duration::from_millis(0)
1756 }
1757 Some(msg) => msg.delay(),
1758 }
1759 }
1760}
1761
1762impl TimerEntryWithDelay for Message {
1763 fn delay(&self) -> Duration {
1764 let inner = self.msg_and_id.inner.lock();
1765 match inner.due {
1766 Some(time) => {
1767 let now = Utc::now();
1768 let delta = time - now;
1769 delta.to_std().unwrap_or(Duration::from_millis(0))
1770 }
1771 None => Duration::from_millis(0),
1772 }
1773 }
1774}
1775
#[cfg(test)]
pub(crate) mod test {
    use super::*;
    use serde_json::json;

    /// Builds a dirty message with a fixed sender/recipient, empty metadata
    /// and `s` as its data.
    pub fn new_msg_body<S: AsRef<[u8]>>(s: S) -> Message {
        let id = SpoolId::new();
        let sender = EnvelopeAddress::parse("sender@example.com").unwrap();
        let recipients = vec![EnvelopeAddress::parse("recip@example.com").unwrap()];
        let meta = json!({});
        let body = Arc::new(s.as_ref().to_vec().into_boxed_slice());
        Message::new_dirty(id, sender, recipients, meta, body).unwrap()
    }

    /// Returns the message data as a UTF-8 string.
    fn data_as_string(msg: &Message) -> String {
        let bytes = msg.get_data_maybe_not_loaded().to_vec();
        String::from_utf8(bytes).unwrap()
    }

    const X_HDR_CONTENT: &str =
        "X-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nFrom :Someone\r\n\r\nBody";

    #[tokio::test]
    async fn import_all_x_headers() {
        let message = new_msg_body(X_HDR_CONTENT);
        message.import_x_headers(vec![]).await.unwrap();

        let meta = message.get_meta_obj().await.unwrap();
        k9::assert_equal!(
            meta,
            json!({
                "x_hello": "there",
                "x_header": "value",
            })
        );
    }

    #[tokio::test]
    async fn meta_and_nil() {
        let message = new_msg_body(X_HDR_CONTENT);
        message
            .set_meta("test", serde_json::Value::Null)
            .await
            .unwrap();
        let stored = message.get_meta("test").await.unwrap();
        k9::assert_equal!(stored, serde_json::Value::Null);

        // The same value must read back as nil from the Lua side.
        let lua = mlua::Lua::new();
        lua.globals().set("msg", message).unwrap();
        lua.load("assert(msg:get_meta('test') == nil)")
            .exec()
            .unwrap();
    }

    #[tokio::test]
    async fn import_some_x_headers() {
        let message = new_msg_body(X_HDR_CONTENT);
        let wanted = vec!["x-hello".to_string()];
        message.import_x_headers(wanted).await.unwrap();

        let meta = message.get_meta_obj().await.unwrap();
        k9::assert_equal!(
            meta,
            json!({
                "x_hello": "there",
            })
        );
    }

    #[tokio::test]
    async fn remove_all_x_headers() {
        let message = new_msg_body(X_HDR_CONTENT);
        message.remove_x_headers(vec![]).await.unwrap();

        let rendered = data_as_string(&message);
        k9::assert_equal!(rendered, "Subject: Hello\r\nFrom :Someone\r\n\r\nBody");
    }

    #[tokio::test]
    async fn prepend_header_2_params() {
        let message = new_msg_body(X_HDR_CONTENT);
        message.prepend_header(Some("Date"), "Today").await.unwrap();

        let rendered = data_as_string(&message);
        k9::assert_equal!(
            rendered,
            "Date: Today\r\nX-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nFrom :Someone\r\n\r\nBody"
        );
    }

    #[tokio::test]
    async fn prepend_header_1_params() {
        let message = new_msg_body(X_HDR_CONTENT);
        message.prepend_header(None, "Date: Today").await.unwrap();

        let rendered = data_as_string(&message);
        k9::assert_equal!(
            rendered,
            "Date: Today\r\nX-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nFrom :Someone\r\n\r\nBody"
        );
    }

    #[tokio::test]
    async fn append_header_2_params() {
        let message = new_msg_body(X_HDR_CONTENT);
        message.append_header(Some("Date"), "Today").await.unwrap();

        let rendered = data_as_string(&message);
        k9::assert_equal!(
            rendered,
            "X-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nFrom :Someone\r\nDate: Today\r\n\r\nBody"
        );
    }

    #[tokio::test]
    async fn append_header_1_params() {
        let message = new_msg_body(X_HDR_CONTENT);
        message.append_header(None, "Date: Today").await.unwrap();

        let rendered = data_as_string(&message);
        k9::assert_equal!(
            rendered,
            "X-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nFrom :Someone\r\nDate: Today\r\n\r\nBody"
        );
    }

    const MULTI_HEADER_CONTENT: &str =
        "X-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nX-Header: another value\r\nFrom :Someone@somewhere\r\n\r\nBody";

    #[tokio::test]
    async fn get_first_header() {
        let message = new_msg_body(MULTI_HEADER_CONTENT);
        let first = message
            .get_first_named_header_value("X-header")
            .await
            .unwrap()
            .unwrap();
        k9::assert_equal!(first, "value");
    }

    #[tokio::test]
    async fn get_all_header() {
        let message = new_msg_body(MULTI_HEADER_CONTENT);
        let values = message
            .get_all_named_header_values("X-header")
            .await
            .unwrap();
        k9::assert_equal!(
            values,
            vec!["value".to_string(), "another value".to_string()]
        );
    }

    #[tokio::test]
    async fn remove_first() {
        let message = new_msg_body(MULTI_HEADER_CONTENT);
        message.remove_first_named_header("X-header").await.unwrap();

        let rendered = data_as_string(&message);
        k9::assert_equal!(
            rendered,
            "X-Hello: there\r\nSubject: Hello\r\nX-Header: another value\r\nFrom :Someone@somewhere\r\n\r\nBody"
        );
    }

    #[tokio::test]
    async fn remove_all() {
        let message = new_msg_body(MULTI_HEADER_CONTENT);
        message.remove_all_named_headers("X-header").await.unwrap();

        let rendered = data_as_string(&message);
        k9::assert_equal!(
            rendered,
            "X-Hello: there\r\nSubject: Hello\r\nFrom :Someone@somewhere\r\n\r\nBody"
        );
    }

    #[tokio::test]
    async fn append_text_plain() {
        let message = new_msg_body(MULTI_HEADER_CONTENT);
        message
            .append_text_plain("I am at the bottom")
            .await
            .unwrap();

        let rendered = data_as_string(&message);
        k9::assert_equal!(
            rendered,
            "X-Hello: there\r\n\
             X-Header: value\r\n\
             Subject: Hello\r\n\
             X-Header: another value\r\n\
             From :Someone@somewhere\r\n\
             Content-Type: text/plain;\r\n\
             \tcharset=\"us-ascii\"\r\n\
             \r\n\
             Body\r\n\
             I am at the bottom\r\n"
        );
    }

    const MIXED_CONTENT: &str = "Content-Type: multipart/mixed;\r\n\
\tboundary=\"my-boundary\"\r\n\
\r\n\
--my-boundary\r\n\
Content-Type: text/plain;\r\n\
\tcharset=\"us-ascii\"\r\n\
\r\n\
plain text\r\n\
--my-boundary\r\n\
Content-Type: text/html;\r\n\
\tcharset=\"us-ascii\"\r\n\
\r\n\
<b>rich</b> text\r\n\
--my-boundary\r\n\
Content-Type: application/octet-stream\r\n\
Content-Transfer-Encoding: base64\r\n\
Content-Disposition: attachment;\r\n\
\tfilename=\"woot.bin\"\r\n\
Content-ID: <woot.id@somewhere>\r\n\
\r\n\
AAECAw==\r\n\
--my-boundary--\r\n\
\r\n";

    const MIXED_CONTENT_ENCLOSING_BODY: &str = "Content-Type: multipart/mixed;\r\n\
\tboundary=\"my-boundary\"\r\n\
\r\n\
--my-boundary\r\n\
Content-Type: text/plain;\r\n\
\tcharset=\"us-ascii\"\r\n\
\r\n\
plain text\r\n\
--my-boundary\r\n\
Content-Type: text/html;\r\n\
\tcharset=\"us-ascii\"\r\n\
\r\n\
<BODY>\r\n\
<b>rich</b> text\r\n\
</BODY>\r\n\
--my-boundary\r\n\
Content-Type: application/octet-stream\r\n\
Content-Transfer-Encoding: base64\r\n\
Content-Disposition: attachment;\r\n\
\tfilename=\"woot.bin\"\r\n\
Content-ID: <woot.id>\r\n\
\r\n\
AAECAw==\r\n\
--my-boundary--\r\n\
\r\n";

    #[tokio::test]
    async fn append_text_html() {
        // No closing body tag: the addition lands at the end of the part.
        let message = new_msg_body(MIXED_CONTENT);
        message.append_text_html("bottom html").await.unwrap();
        let rendered = data_as_string(&message);
        k9::snapshot!(
            rendered,
            r#"
Content-Type: multipart/mixed;\r
\tboundary="my-boundary"\r
\r
--my-boundary\r
Content-Type: text/plain;\r
\tcharset="us-ascii"\r
\r
plain text\r
--my-boundary\r
Content-Type: text/html;\r
\tcharset="us-ascii"\r
\r
<b>rich</b> text\r
\r
bottom html\r
--my-boundary\r
Content-Type: application/octet-stream\r
Content-Transfer-Encoding: base64\r
Content-Disposition: attachment;\r
\tfilename="woot.bin"\r
Content-ID: <woot.id@somewhere>\r
\r
AAECAw==\r
--my-boundary--\r
\r

"#
        );

        // With a closing body tag: the addition goes just before it.
        let message = new_msg_body(MIXED_CONTENT_ENCLOSING_BODY);
        message.append_text_html("bottom html 👻").await.unwrap();
        let rendered = data_as_string(&message);
        k9::snapshot!(
            rendered,
            r#"
Content-Type: multipart/mixed;\r
\tboundary="my-boundary"\r
\r
--my-boundary\r
Content-Type: text/plain;\r
\tcharset="us-ascii"\r
\r
plain text\r
--my-boundary\r
Content-Type: text/html;\r
\tcharset="utf-8"\r
Content-Transfer-Encoding: quoted-printable\r
\r
<BODY>\r
<b>rich</b> text\r
\r
bottom html =F0=9F=91=BB</BODY>\r
--my-boundary\r
Content-Type: application/octet-stream\r
Content-Transfer-Encoding: base64\r
Content-Disposition: attachment;\r
\tfilename="woot.bin"\r
Content-ID: <woot.id>\r
\r
AAECAw==\r
--my-boundary--\r
\r

"#
        );
    }

    #[tokio::test]
    async fn append_text_plain_mixed() {
        let message = new_msg_body(MIXED_CONTENT);
        message.append_text_plain("bottom text 👾").await.unwrap();
        let rendered = data_as_string(&message);
        k9::snapshot!(
            rendered,
            r#"
Content-Type: multipart/mixed;\r
\tboundary="my-boundary"\r
\r
--my-boundary\r
Content-Type: text/plain;\r
\tcharset="utf-8"\r
Content-Transfer-Encoding: quoted-printable\r
\r
plain text\r
\r
bottom text =F0=9F=91=BE\r
--my-boundary\r
Content-Type: text/html;\r
\tcharset="us-ascii"\r
\r
<b>rich</b> text\r
--my-boundary\r
Content-Type: application/octet-stream\r
Content-Transfer-Encoding: base64\r
Content-Disposition: attachment;\r
\tfilename="woot.bin"\r
Content-ID: <woot.id@somewhere>\r
\r
AAECAw==\r
--my-boundary--\r
\r

"#
        );
    }

    #[tokio::test]
    async fn check_conformance_angle_msg_id() {
        const DOUBLE_ANGLE_ONLY: &str = "Subject: hello\r
Message-ID: <<1234@example.com>>\r
\r
Hello";
        let message = new_msg_body(DOUBLE_ANGLE_ONLY);

        // Checking without fixing reports the issue.
        let failure = message
            .check_fix_conformance(
                MessageConformance::MISSING_MESSAGE_ID_HEADER,
                MessageConformance::empty(),
                None,
            )
            .await
            .unwrap_err()
            .to_string();
        k9::snapshot!(
            failure,
            "Message has conformance issues: MISSING_MESSAGE_ID_HEADER"
        );

        // Checking with the matching fix enabled succeeds.
        message
            .check_fix_conformance(
                MessageConformance::MISSING_MESSAGE_ID_HEADER,
                MessageConformance::MISSING_MESSAGE_ID_HEADER,
                None,
            )
            .await
            .unwrap();

        const DOUBLE_ANGLE_AND_LONG_LINE: &str = "Subject: hello\r
Message-ID: <<1234@example.com>>\r
\r
Hello this is a really long line Hello this is a really long line \
Hello this is a really long line Hello this is a really long line \
Hello this is a really long line Hello this is a really long line \
Hello this is a really long line Hello this is a really long line \
Hello this is a really long line Hello this is a really long line \
Hello this is a really long line Hello this is a really long line \
Hello this is a really long line Hello this is a really long line
";
        let message = new_msg_body(DOUBLE_ANGLE_AND_LONG_LINE);
        let fixes =
            MessageConformance::MISSING_MESSAGE_ID_HEADER | MessageConformance::LINE_TOO_LONG;
        message
            .check_fix_conformance(MessageConformance::MISSING_COLON_VALUE, fixes, None)
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn check_conformance() {
        let message = new_msg_body(MULTI_HEADER_CONTENT);
        message
            .check_fix_conformance(
                MessageConformance::default(),
                MessageConformance::MISSING_MIME_VERSION,
                None,
            )
            .await
            .unwrap();
        let rendered = data_as_string(&message);
        k9::snapshot!(
            rendered,
            r#"
X-Hello: there\r
X-Header: value\r
Subject: Hello\r
X-Header: another value\r
From :Someone@somewhere\r
Mime-Version: 1.0\r
\r
Body
"#
        );

        let message = new_msg_body(MULTI_HEADER_CONTENT);
        let fixes =
            MessageConformance::MISSING_MIME_VERSION | MessageConformance::NAME_ENDS_WITH_SPACE;
        message
            .check_fix_conformance(MessageConformance::default(), fixes, None)
            .await
            .unwrap();
        let rendered = data_as_string(&message);
        k9::snapshot!(
            rendered,
            r#"
Content-Type: text/plain;\r
\tcharset="us-ascii"\r
X-Hello: there\r
X-Header: value\r
Subject: Hello\r
X-Header: another value\r
From: <Someone@somewhere>\r
Mime-Version: 1.0\r
\r
Body\r

"#
        );
    }

    #[tokio::test]
    async fn check_fix_latin_input() {
        // 0xa3 is the pound sign in iso-8859-1.
        const POUNDS: &[u8] = b"Subject: \xa3\r\n\r\nGBP\r\n";
        let message = new_msg_body(POUNDS);
        let settings = CheckFixSettings {
            detect_encoding: true,
            include_encodings: vec!["iso-8859-1".to_string()],
            ..Default::default()
        };
        message
            .check_fix_conformance(
                MessageConformance::default(),
                MessageConformance::NEEDS_TRANSFER_ENCODING,
                Some(&settings),
            )
            .await
            .unwrap();

        let subject = message
            .get_first_named_header_value("subject")
            .await
            .unwrap()
            .unwrap();
        assert_eq!(subject, "£");
    }

    #[tokio::test]
    async fn set_scheduling() -> anyhow::Result<()> {
        let message = new_msg_body(MULTI_HEADER_CONTENT);
        assert!(message.get_due().is_none(), "due is implicitly now");

        let now = Utc::now();
        let one_day = chrono::Duration::try_days(1).expect("1 day to be valid");

        let schedule = Scheduling {
            restriction: None,
            first_attempt: Some((now + one_day).into()),
            expires: None,
        };
        message.set_scheduling(Some(schedule)).await?;

        let due = message.get_due().expect("due to now be set");
        assert!(due - now >= one_day, "due time is at least 1 day away");

        Ok(())
    }

    /// Guards against accidental growth of the core message types.
    #[cfg(all(test, target_pointer_width = "64"))]
    #[test]
    fn sizes() {
        use std::mem::size_of;
        assert_eq!(size_of::<Message>(), 8);
        assert_eq!(size_of::<MessageInner>(), 32);
        assert_eq!(size_of::<MessageWithId>(), 72);
    }
}