1use crate::address::HeaderAddressList;
2#[cfg(feature = "impl")]
3use crate::dkim::Signer;
4#[cfg(feature = "impl")]
5use crate::dkim::SIGN_POOL;
6pub use crate::queue_name::QueueNameComponents;
7use crate::scheduling::Scheduling;
8use crate::EnvelopeAddress;
9use anyhow::Context;
10use chrono::{DateTime, Utc};
11#[cfg(feature = "impl")]
12use config::{any_err, from_lua_value, serialize_options};
13use futures::FutureExt;
14use intrusive_collections::{intrusive_adapter, LinkedList, LinkedListAtomicLink};
15use kumo_chrono_helper::*;
16#[cfg(feature = "impl")]
17use kumo_dkim::arc::ARC;
18use kumo_log_types::rfc3464::Report;
19use kumo_log_types::rfc5965::ARFReport;
20#[cfg(feature = "impl")]
21use mailparsing::{AuthenticationResult, AuthenticationResults, EncodeHeaderValue};
22use mailparsing::{
23 CheckFixSettings, DecodedBody, Header, HeaderParseResult, MessageConformance, MimePart,
24};
25#[cfg(feature = "impl")]
26use mlua::{IntoLua, LuaSerdeExt, UserData, UserDataMethods};
27#[cfg(feature = "impl")]
28use mod_dns_resolver::get_resolver_instance;
29use parking_lot::Mutex;
30use prometheus::{Histogram, IntGauge};
31use serde::{Deserialize, Serialize};
32use serde_with::formats::PreferOne;
33use serde_with::{serde_as, OneOrMany};
34use spool::{get_data_spool, get_meta_spool, Spool, SpoolId};
35use std::hash::Hash;
36use std::sync::{Arc, LazyLock, Weak};
37use std::time::{Duration, Instant};
38use timeq::TimerEntryWithDelay;
39
40bitflags::bitflags! {
41 #[derive(Clone, Copy, Debug, PartialEq, Eq)]
42 struct MessageFlags: u8 {
43 const META_DIRTY = 1;
45 const DATA_DIRTY = 2;
47 const SCHEDULED = 4;
49 const FORCE_SYNC = 8;
51 }
52}
53
54static MESSAGE_COUNT: LazyLock<IntGauge> = LazyLock::new(|| {
55 prometheus::register_int_gauge!("message_count", "total number of Message objects").unwrap()
56});
57static META_COUNT: LazyLock<IntGauge> = LazyLock::new(|| {
58 prometheus::register_int_gauge!(
59 "message_meta_resident_count",
60 "total number of Message objects with metadata loaded"
61 )
62 .unwrap()
63});
64static DATA_COUNT: LazyLock<IntGauge> = LazyLock::new(|| {
65 prometheus::register_int_gauge!(
66 "message_data_resident_count",
67 "total number of Message objects with body data loaded"
68 )
69 .unwrap()
70});
71static NO_DATA: LazyLock<Arc<Box<[u8]>>> = LazyLock::new(|| Arc::new(vec![].into_boxed_slice()));
72static SAVE_HIST: LazyLock<Histogram> = LazyLock::new(|| {
73 prometheus::register_histogram!(
74 "message_save_latency",
75 "how long it takes to save a message to spool"
76 )
77 .unwrap()
78});
79static LOAD_DATA_HIST: LazyLock<Histogram> = LazyLock::new(|| {
80 prometheus::register_histogram!(
81 "message_data_load_latency",
82 "how long it takes to load message data from spool"
83 )
84 .unwrap()
85});
86static LOAD_META_HIST: LazyLock<Histogram> = LazyLock::new(|| {
87 prometheus::register_histogram!(
88 "message_meta_load_latency",
89 "how long it takes to load message metadata from spool"
90 )
91 .unwrap()
92});
93
94#[derive(Debug)]
95struct MessageInner {
96 metadata: Option<Box<MetaData>>,
97 data: Arc<Box<[u8]>>,
98 flags: MessageFlags,
99 num_attempts: u16,
100 due: Option<DateTime<Utc>>,
101}
102
103#[derive(Debug)]
104pub(crate) struct MessageWithId {
105 id: SpoolId,
106 inner: Mutex<MessageInner>,
107 link: LinkedListAtomicLink,
108}
109
110intrusive_adapter!(
111 pub(crate) MessageWithIdAdapter = Arc<MessageWithId>: MessageWithId { link: LinkedListAtomicLink }
112);
113
114pub struct MessageList {
119 list: LinkedList<MessageWithIdAdapter>,
120 len: usize,
121}
122
123impl Default for MessageList {
124 fn default() -> Self {
125 Self::new()
126 }
127}
128
129impl MessageList {
130 pub fn new() -> Self {
132 Self {
133 list: LinkedList::new(MessageWithIdAdapter::default()),
134 len: 0,
135 }
136 }
137
138 pub fn len(&self) -> usize {
140 self.len
141 }
142
143 pub fn is_empty(&self) -> bool {
144 self.len == 0
145 }
146
147 pub fn take(&mut self) -> Self {
150 let new_list = Self {
151 list: self.list.take(),
152 len: self.len,
153 };
154 self.len = 0;
155 new_list
156 }
157
158 pub fn push_back(&mut self, message: Message) {
160 self.list.push_back(message.msg_and_id);
161 self.len += 1;
162 }
163
164 pub fn pop_front(&mut self) -> Option<Message> {
166 self.list.pop_front().map(|msg_and_id| {
167 self.len -= 1;
168 Message { msg_and_id }
169 })
170 }
171
172 pub fn pop_back(&mut self) -> Option<Message> {
174 self.list.pop_back().map(|msg_and_id| {
175 self.len -= 1;
176 Message { msg_and_id }
177 })
178 }
179
180 pub fn drain(&mut self) -> Vec<Message> {
184 let mut messages = Vec::with_capacity(self.len);
185 while let Some(msg) = self.pop_front() {
186 messages.push(msg);
187 }
188 messages
189 }
190
191 pub fn extend_from_iter<I>(&mut self, iter: I)
192 where
193 I: Iterator<Item = Message>,
194 {
195 for msg in iter {
196 self.push_back(msg)
197 }
198 }
199}
200
201impl IntoIterator for MessageList {
202 type Item = Message;
203 type IntoIter = MessageListIter;
204 fn into_iter(self) -> MessageListIter {
205 MessageListIter { list: self.list }
206 }
207}
208
209pub struct MessageListIter {
210 list: LinkedList<MessageWithIdAdapter>,
211}
212
213impl Iterator for MessageListIter {
214 type Item = Message;
215 fn next(&mut self) -> Option<Message> {
216 self.list
217 .pop_front()
218 .map(|msg_and_id| Message { msg_and_id })
219 }
220}
221
222#[derive(Clone, Debug)]
223#[cfg_attr(feature = "impl", derive(mlua::FromLua))]
224pub struct Message {
225 pub(crate) msg_and_id: Arc<MessageWithId>,
226}
227
228impl PartialEq for Message {
229 fn eq(&self, other: &Self) -> bool {
230 self.id() == other.id()
231 }
232}
233impl Eq for Message {}
234
235impl Hash for Message {
236 fn hash<H>(&self, hasher: &mut H)
237 where
238 H: std::hash::Hasher,
239 {
240 self.id().hash(hasher)
241 }
242}
243
244#[derive(Clone, Debug)]
245pub struct WeakMessage {
246 weak: Weak<MessageWithId>,
247}
248
249impl WeakMessage {
250 pub fn upgrade(&self) -> Option<Message> {
251 Some(Message {
252 msg_and_id: self.weak.upgrade()?,
253 })
254 }
255}
256
257#[serde_as]
258#[derive(Debug, Serialize, Deserialize, Clone)]
259pub(crate) struct MetaData {
260 pub sender: EnvelopeAddress,
261 #[serde_as(as = "OneOrMany<_, PreferOne>")]
262 pub recipient: Vec<EnvelopeAddress>,
263 pub meta: serde_json::Value,
264 #[serde(default)]
265 pub schedule: Option<Scheduling>,
266}
267
268impl Drop for MessageInner {
269 fn drop(&mut self) {
270 if self.metadata.is_some() {
271 META_COUNT.dec();
272 }
273 if !self.data.is_empty() {
274 DATA_COUNT.dec();
275 }
276 MESSAGE_COUNT.dec();
277 }
278}
279
280impl Message {
281 pub fn new_dirty(
284 id: SpoolId,
285 sender: EnvelopeAddress,
286 recipient: Vec<EnvelopeAddress>,
287 meta: serde_json::Value,
288 data: Arc<Box<[u8]>>,
289 ) -> anyhow::Result<Self> {
290 anyhow::ensure!(meta.is_object(), "metadata must be a json object");
291 MESSAGE_COUNT.inc();
292 DATA_COUNT.inc();
293 META_COUNT.inc();
294 Ok(Self {
295 msg_and_id: Arc::new(MessageWithId {
296 id,
297 inner: Mutex::new(MessageInner {
298 metadata: Some(Box::new(MetaData {
299 sender,
300 recipient,
301 meta,
302 schedule: None,
303 })),
304 data,
305 flags: MessageFlags::META_DIRTY | MessageFlags::DATA_DIRTY,
306 num_attempts: 0,
307 due: None,
308 }),
309 link: LinkedListAtomicLink::default(),
310 }),
311 })
312 }
313
314 pub fn weak(&self) -> WeakMessage {
315 WeakMessage {
316 weak: Arc::downgrade(&self.msg_and_id),
317 }
318 }
319
320 pub fn new_from_spool(id: SpoolId, metadata: Vec<u8>) -> anyhow::Result<Self> {
324 let metadata: MetaData = serde_json::from_slice(&metadata)?;
325 MESSAGE_COUNT.inc();
326 META_COUNT.inc();
327
328 let flags = if metadata.schedule.is_some() {
329 MessageFlags::SCHEDULED
330 } else {
331 MessageFlags::empty()
332 };
333
334 Ok(Self {
335 msg_and_id: Arc::new(MessageWithId {
336 id,
337 inner: Mutex::new(MessageInner {
338 metadata: Some(Box::new(metadata)),
339 data: NO_DATA.clone(),
340 flags,
341 num_attempts: 0,
342 due: None,
343 }),
344 link: LinkedListAtomicLink::default(),
345 }),
346 })
347 }
348
349 pub(crate) fn new_from_parts(id: SpoolId, metadata: MetaData, data: Arc<Box<[u8]>>) -> Self {
350 MESSAGE_COUNT.inc();
351 META_COUNT.inc();
352
353 let flags = if metadata.schedule.is_some() {
354 MessageFlags::SCHEDULED
355 } else {
356 MessageFlags::empty()
357 };
358
359 Self {
360 msg_and_id: Arc::new(MessageWithId {
361 id,
362 inner: Mutex::new(MessageInner {
363 metadata: Some(Box::new(metadata)),
364 data,
365 flags: flags | MessageFlags::META_DIRTY | MessageFlags::DATA_DIRTY,
366 num_attempts: 0,
367 due: None,
368 }),
369 link: LinkedListAtomicLink::default(),
370 }),
371 }
372 }
373
374 pub async fn new_with_id(id: SpoolId) -> anyhow::Result<Self> {
375 let meta_spool = get_meta_spool();
376 let data = meta_spool.load(id).await?;
377 Self::new_from_spool(id, data)
378 }
379
380 pub fn get_num_attempts(&self) -> u16 {
381 let inner = self.msg_and_id.inner.lock();
382 inner.num_attempts
383 }
384
385 pub fn set_num_attempts(&self, num_attempts: u16) {
386 let mut inner = self.msg_and_id.inner.lock();
387 inner.num_attempts = num_attempts;
388 }
389
390 pub fn increment_num_attempts(&self) {
391 let mut inner = self.msg_and_id.inner.lock();
392 inner.num_attempts += 1;
393 }
394
395 pub async fn set_scheduling(
396 &self,
397 scheduling: Option<Scheduling>,
398 ) -> anyhow::Result<Option<Scheduling>> {
399 self.load_meta_if_needed().await?;
400 let mut inner = self.msg_and_id.inner.lock();
401 match &mut inner.metadata {
402 None => anyhow::bail!("set_scheduling: metadata must be loaded first"),
403 Some(meta) => {
404 meta.schedule = scheduling;
405 inner
406 .flags
407 .set(MessageFlags::SCHEDULED, scheduling.is_some());
408 if let Some(sched) = scheduling {
409 let due = inner.due.unwrap_or_else(Utc::now);
410 inner.due = Some(sched.adjust_for_schedule(due));
411 }
412 Ok(scheduling)
413 }
414 }
415 }
416
417 pub async fn get_scheduling(&self) -> anyhow::Result<Option<Scheduling>> {
418 self.load_meta_if_needed().await?;
419 let inner = self.msg_and_id.inner.lock();
420 Ok(inner.metadata.as_ref().and_then(|meta| meta.schedule))
421 }
422
423 pub fn get_due(&self) -> Option<DateTime<Utc>> {
424 let inner = self.msg_and_id.inner.lock();
425 inner.due
426 }
427
428 pub async fn delay_with_jitter(&self, limit: i64) -> anyhow::Result<Option<DateTime<Utc>>> {
429 let scale = rand::random::<f32>();
430 let value = (scale * limit as f32) as i64;
431 self.delay_by(seconds(value)?).await
432 }
433
434 pub async fn delay_by(
435 &self,
436 duration: chrono::Duration,
437 ) -> anyhow::Result<Option<DateTime<Utc>>> {
438 let due = Utc::now() + duration;
439 self.set_due(Some(due)).await
440 }
441
442 pub async fn delay_by_and_jitter(
444 &self,
445 duration: chrono::Duration,
446 ) -> anyhow::Result<Option<DateTime<Utc>>> {
447 let scale = rand::random::<f32>();
448 let value = (scale * 60.) as i64;
449 let due = Utc::now() + duration + seconds(value)?;
450 self.set_due(Some(due)).await
451 }
452
453 pub async fn set_due(
454 &self,
455 due: Option<DateTime<Utc>>,
456 ) -> anyhow::Result<Option<DateTime<Utc>>> {
457 let due = {
458 let mut inner = self.msg_and_id.inner.lock();
459
460 if !inner.flags.contains(MessageFlags::SCHEDULED) {
461 inner.due = due;
463 return Ok(inner.due);
464 }
465
466 let due = due.unwrap_or_else(Utc::now);
467
468 if let Some(meta) = &inner.metadata {
469 inner.due = match &meta.schedule {
470 Some(sched) => Some(sched.adjust_for_schedule(due)),
471 None => Some(due),
472 };
473 return Ok(inner.due);
474 }
475
476 due
479 };
480
481 self.load_meta().await?;
482
483 {
484 let mut inner = self.msg_and_id.inner.lock();
485 match &inner.metadata {
486 Some(meta) => {
487 inner.due = match &meta.schedule {
488 Some(sched) => Some(sched.adjust_for_schedule(due)),
489 None => Some(due),
490 };
491 Ok(inner.due)
492 }
493 None => anyhow::bail!("loaded metadata, but metadata is not set!?"),
494 }
495 }
496 }
497
498 fn get_data_if_dirty(&self) -> Option<Arc<Box<[u8]>>> {
499 let inner = self.msg_and_id.inner.lock();
500 if inner.flags.contains(MessageFlags::DATA_DIRTY) {
501 Some(Arc::clone(&inner.data))
502 } else {
503 None
504 }
505 }
506
507 fn get_meta_if_dirty(&self) -> Option<MetaData> {
508 let inner = self.msg_and_id.inner.lock();
509 if inner.flags.contains(MessageFlags::META_DIRTY) {
510 inner.metadata.as_ref().map(|md| (**md).clone())
511 } else {
512 None
513 }
514 }
515
516 pub(crate) async fn clone_meta_data(&self) -> anyhow::Result<MetaData> {
517 self.load_meta_if_needed().await?;
518 let inner = self.msg_and_id.inner.lock();
519 inner
520 .metadata
521 .as_ref()
522 .ok_or_else(|| anyhow::anyhow!("metadata not loaded even though we just loaded it"))
523 .map(|md| (**md).clone())
524 }
525
526 pub fn set_force_sync(&self, force: bool) {
527 let mut inner = self.msg_and_id.inner.lock();
528 inner.flags.set(MessageFlags::FORCE_SYNC, force);
529 }
530
531 pub fn needs_save(&self) -> bool {
532 let inner = self.msg_and_id.inner.lock();
533 inner.flags.contains(MessageFlags::META_DIRTY)
534 || inner.flags.contains(MessageFlags::DATA_DIRTY)
535 }
536
537 pub async fn save(&self, deadline: Option<Instant>) -> anyhow::Result<()> {
538 let _timer = SAVE_HIST.start_timer();
539 self.save_to(&**get_meta_spool(), &**get_data_spool(), deadline)
540 .await
541 }
542
543 pub async fn save_to(
544 &self,
545 meta_spool: &(dyn Spool + Send + Sync),
546 data_spool: &(dyn Spool + Send + Sync),
547 deadline: Option<Instant>,
548 ) -> anyhow::Result<()> {
549 let force_sync = self
550 .msg_and_id
551 .inner
552 .lock()
553 .flags
554 .contains(MessageFlags::FORCE_SYNC);
555
556 let data_fut = if let Some(data) = self.get_data_if_dirty() {
557 anyhow::ensure!(!data.is_empty(), "message data must not be empty");
558 data_spool
559 .store(self.msg_and_id.id, data, force_sync, deadline)
560 .map(|_| true)
561 .boxed()
562 } else {
563 futures::future::ready(false).boxed()
564 };
565 let meta_fut = if let Some(meta) = self.get_meta_if_dirty() {
566 let meta = Arc::new(serde_json::to_vec(&meta)?.into_boxed_slice());
567 meta_spool
568 .store(self.msg_and_id.id, meta, force_sync, deadline)
569 .map(|_| true)
570 .boxed()
571 } else {
572 futures::future::ready(false).boxed()
573 };
574
575 let (data_res, meta_res) = tokio::join!(data_fut, meta_fut);
581
582 if data_res {
583 self.msg_and_id
584 .inner
585 .lock()
586 .flags
587 .remove(MessageFlags::DATA_DIRTY);
588 }
589 if meta_res {
590 self.msg_and_id
591 .inner
592 .lock()
593 .flags
594 .remove(MessageFlags::META_DIRTY);
595 }
596 Ok(())
597 }
598
599 pub fn id(&self) -> &SpoolId {
600 &self.msg_and_id.id
601 }
602
603 pub async fn save_and_shrink(&self) -> anyhow::Result<bool> {
605 self.save(None).await?;
606 self.shrink()
607 }
608
609 pub async fn save_and_shrink_data(&self) -> anyhow::Result<bool> {
611 self.save(None).await?;
612 self.shrink_data()
613 }
614
615 pub fn shrink_data(&self) -> anyhow::Result<bool> {
616 let mut inner = self.msg_and_id.inner.lock();
617 let mut did_shrink = false;
618 if inner.flags.contains(MessageFlags::DATA_DIRTY) {
619 anyhow::bail!("Cannot shrink message: DATA_DIRTY");
620 }
621 if !inner.data.is_empty() {
622 DATA_COUNT.dec();
623 did_shrink = true;
624 }
625 if !inner.data.is_empty() {
626 inner.data = NO_DATA.clone();
627 did_shrink = true;
628 }
629 Ok(did_shrink)
630 }
631
632 pub fn shrink(&self) -> anyhow::Result<bool> {
633 let mut inner = self.msg_and_id.inner.lock();
634 let mut did_shrink = false;
635 if inner.flags.contains(MessageFlags::DATA_DIRTY) {
636 anyhow::bail!("Cannot shrink message: DATA_DIRTY");
637 }
638 if inner.flags.contains(MessageFlags::META_DIRTY) {
639 anyhow::bail!("Cannot shrink message: META_DIRTY");
640 }
641 if inner.metadata.take().is_some() {
642 META_COUNT.dec();
643 did_shrink = true;
644 }
645 if !inner.data.is_empty() {
646 DATA_COUNT.dec();
647 did_shrink = true;
648 }
649 if !inner.data.is_empty() {
650 inner.data = NO_DATA.clone();
651 did_shrink = true;
652 }
653 Ok(did_shrink)
654 }
655
656 pub async fn sender(&self) -> anyhow::Result<EnvelopeAddress> {
657 self.load_meta_if_needed().await?;
658 let inner = self.msg_and_id.inner.lock();
659 match &inner.metadata {
660 Some(meta) => Ok(meta.sender.clone()),
661 None => anyhow::bail!("Message::sender metadata is not loaded"),
662 }
663 }
664
665 pub async fn set_sender(&self, sender: EnvelopeAddress) -> anyhow::Result<()> {
666 self.load_meta_if_needed().await?;
667 let mut inner = self.msg_and_id.inner.lock();
668 match &mut inner.metadata {
669 Some(meta) => {
670 meta.sender = sender;
671 inner.flags.set(MessageFlags::DATA_DIRTY, true);
672 Ok(())
673 }
674 None => anyhow::bail!("Message::set_sender: metadata is not loaded"),
675 }
676 }
677
678 #[deprecated = "use recipient_list or first_recipient instead"]
679 pub async fn recipient(&self) -> anyhow::Result<EnvelopeAddress> {
680 self.first_recipient().await
681 }
682
683 pub async fn first_recipient(&self) -> anyhow::Result<EnvelopeAddress> {
684 self.load_meta_if_needed().await?;
685 let inner = self.msg_and_id.inner.lock();
686 match &inner.metadata {
687 Some(meta) => match meta.recipient.first() {
688 Some(recip) => Ok(recip.clone()),
689 None => anyhow::bail!("recipient list is empty!?"),
690 },
691 None => anyhow::bail!("Message::first_recipient: metadata is not loaded"),
692 }
693 }
694
695 pub async fn recipient_list(&self) -> anyhow::Result<Vec<EnvelopeAddress>> {
696 self.load_meta_if_needed().await?;
697 let inner = self.msg_and_id.inner.lock();
698 match &inner.metadata {
699 Some(meta) => Ok(meta.recipient.clone()),
700 None => anyhow::bail!("Message::recipient_list: metadata is not loaded"),
701 }
702 }
703
704 pub async fn recipient_list_string(&self) -> anyhow::Result<Vec<String>> {
705 self.load_meta_if_needed().await?;
706 let inner = self.msg_and_id.inner.lock();
707 match &inner.metadata {
708 Some(meta) => Ok(meta.recipient.iter().map(|a| a.to_string()).collect()),
709 None => anyhow::bail!("Message::recipient_list_string: metadata is not loaded"),
710 }
711 }
712
713 #[deprecated = "use set_recipient_list instead"]
714 pub async fn set_recipient(&self, recipient: EnvelopeAddress) -> anyhow::Result<()> {
715 self.set_recipient_list(vec![recipient]).await
716 }
717
718 pub async fn set_recipient_list(&self, recipient: Vec<EnvelopeAddress>) -> anyhow::Result<()> {
719 self.load_meta_if_needed().await?;
720
721 let mut inner = self.msg_and_id.inner.lock();
722 match &mut inner.metadata {
723 Some(meta) => {
724 meta.recipient = recipient;
725 inner.flags.set(MessageFlags::DATA_DIRTY, true);
726 Ok(())
727 }
728 None => anyhow::bail!("Message::set_recipient_list: metadata is not loaded"),
729 }
730 }
731
732 pub fn is_meta_loaded(&self) -> bool {
733 self.msg_and_id.inner.lock().metadata.is_some()
734 }
735
736 pub fn is_data_loaded(&self) -> bool {
737 !self.msg_and_id.inner.lock().data.is_empty()
738 }
739
740 pub async fn load_meta_if_needed(&self) -> anyhow::Result<()> {
741 if self.is_meta_loaded() {
742 return Ok(());
743 }
744 self.load_meta().await
745 }
746
747 pub async fn data(&self) -> anyhow::Result<Arc<Box<[u8]>>> {
748 self.load_data_if_needed().await
749 }
750
751 async fn load_data_if_needed(&self) -> anyhow::Result<Arc<Box<[u8]>>> {
752 if self.is_data_loaded() {
753 return Ok(self.get_data_maybe_not_loaded());
754 }
755 self.load_data().await
756 }
757
758 pub async fn load_meta(&self) -> anyhow::Result<()> {
759 let _timer = LOAD_META_HIST.start_timer();
760 self.load_meta_from(&**get_meta_spool()).await
761 }
762
763 async fn load_meta_from(&self, meta_spool: &(dyn Spool + Send + Sync)) -> anyhow::Result<()> {
764 let id = self.id();
765 let data = meta_spool.load(*id).await?;
766 let mut inner = self.msg_and_id.inner.lock();
767 let was_not_loaded = inner.metadata.is_none();
768 let metadata: MetaData = serde_json::from_slice(&data)?;
769 inner.metadata.replace(Box::new(metadata));
770 if was_not_loaded {
771 META_COUNT.inc();
772 }
773 Ok(())
774 }
775
776 pub async fn load_data(&self) -> anyhow::Result<Arc<Box<[u8]>>> {
777 let _timer = LOAD_DATA_HIST.start_timer();
778 self.load_data_from(&**get_data_spool()).await
779 }
780
781 async fn load_data_from(
782 &self,
783 data_spool: &(dyn Spool + Send + Sync),
784 ) -> anyhow::Result<Arc<Box<[u8]>>> {
785 let data = data_spool.load(*self.id()).await?;
786 let mut inner = self.msg_and_id.inner.lock();
787 let was_empty = inner.data.is_empty();
788 inner.data = Arc::new(data.into_boxed_slice());
789 if was_empty {
790 DATA_COUNT.inc();
791 }
792 Ok(inner.data.clone())
793 }
794
795 pub fn assign_data(&self, data: Vec<u8>) {
796 let mut inner = self.msg_and_id.inner.lock();
797 let was_empty = inner.data.is_empty();
798 inner.data = Arc::new(data.into_boxed_slice());
799 inner.flags.set(MessageFlags::DATA_DIRTY, true);
800 if was_empty {
801 DATA_COUNT.inc();
802 }
803 }
804
805 pub fn get_data_maybe_not_loaded(&self) -> Arc<Box<[u8]>> {
806 let inner = self.msg_and_id.inner.lock();
807 inner.data.clone()
808 }
809
810 pub async fn set_meta<S: AsRef<str>, V: Into<serde_json::Value>>(
811 &self,
812 key: S,
813 value: V,
814 ) -> anyhow::Result<()> {
815 self.load_meta_if_needed().await?;
816 let mut inner = self.msg_and_id.inner.lock();
817 match &mut inner.metadata {
818 None => anyhow::bail!("set_meta: metadata must be loaded first"),
819 Some(meta) => {
820 let key = key.as_ref();
821 let value = value.into();
822
823 match &mut meta.meta {
824 serde_json::Value::Object(map) => {
825 map.insert(key.to_string(), value);
826 }
827 _ => anyhow::bail!("metadata is somehow not a json object"),
828 }
829
830 inner.flags.set(MessageFlags::META_DIRTY, true);
831 Ok(())
832 }
833 }
834 }
835
836 pub async fn unset_meta<S: AsRef<str>>(&self, key: S) -> anyhow::Result<()> {
837 self.load_meta_if_needed().await?;
838 let mut inner = self.msg_and_id.inner.lock();
839 match &mut inner.metadata {
840 None => anyhow::bail!("set_meta: metadata must be loaded first"),
841 Some(meta) => {
842 let key = key.as_ref();
843
844 match &mut meta.meta {
845 serde_json::Value::Object(map) => {
846 map.remove(key);
847 }
848 _ => anyhow::bail!("metadata is somehow not a json object"),
849 }
850
851 inner.flags.set(MessageFlags::META_DIRTY, true);
852 Ok(())
853 }
854 }
855 }
856
857 pub async fn get_meta_string<S: serde_json::value::Index + std::fmt::Display + Copy>(
859 &self,
860 key: S,
861 ) -> anyhow::Result<Option<String>> {
862 match self.get_meta(key).await {
863 Ok(serde_json::Value::String(value)) => Ok(Some(value.to_string())),
864 Ok(serde_json::Value::Null) => Ok(None),
865 hmm => {
866 anyhow::bail!("expected '{key}' to be a string value, got {hmm:?}");
867 }
868 }
869 }
870
871 pub async fn get_meta_obj(&self) -> anyhow::Result<serde_json::Value> {
872 self.load_meta_if_needed().await?;
873 let inner = self.msg_and_id.inner.lock();
874 match &inner.metadata {
875 None => anyhow::bail!("get_meta_obj: metadata must be loaded first"),
876 Some(meta) => Ok(meta.meta.clone()),
877 }
878 }
879
880 pub async fn get_meta<S: serde_json::value::Index>(
881 &self,
882 key: S,
883 ) -> anyhow::Result<serde_json::Value> {
884 self.load_meta_if_needed().await?;
885 let inner = self.msg_and_id.inner.lock();
886 match &inner.metadata {
887 None => anyhow::bail!("get_meta: metadata must be loaded first"),
888 Some(meta) => match meta.meta.get(key) {
889 Some(value) => Ok(value.clone()),
890 None => Ok(serde_json::Value::Null),
891 },
892 }
893 }
894
895 pub fn age(&self, now: DateTime<Utc>) -> chrono::Duration {
896 self.msg_and_id.id.age(now)
897 }
898
899 pub async fn get_queue_name(&self) -> anyhow::Result<String> {
900 Ok(match self.get_meta_string("queue").await? {
901 Some(name) => name,
902 None => {
903 let name = QueueNameComponents::format(
904 self.get_meta_string("campaign").await?,
905 self.get_meta_string("tenant").await?,
906 self.first_recipient()
907 .await?
908 .domain()
909 .to_string()
910 .to_lowercase(),
911 self.get_meta_string("routing_domain").await?,
912 );
913 name.to_string()
914 }
915 })
916 }
917
918 #[cfg(feature = "impl")]
919 pub async fn arc_verify(
920 &self,
921 opt_resolver_name: Option<String>,
922 ) -> anyhow::Result<AuthenticationResult> {
923 let resolver = get_resolver_instance(&opt_resolver_name)?;
924 let data = self.data().await?;
925 let bytes = mailparsing::SharedString::try_from(data.as_ref().as_ref())?;
926
927 let parsed = mailparsing::Header::parse_headers(bytes.clone())?;
928 let message = kumo_dkim::ParsedEmail::HeaderOnlyParse { bytes, parsed };
929
930 let arc = ARC::verify(&message, &**resolver).await;
931 Ok(arc.authentication_result())
932 }
933
934 #[cfg(feature = "impl")]
935 pub async fn arc_seal(
936 &self,
937 signer: Signer,
938 auth_results: AuthenticationResults,
939 opt_resolver_name: Option<String>,
940 ) -> anyhow::Result<()> {
941 let resolver = get_resolver_instance(&opt_resolver_name)?;
942 let data = self.data().await?;
943 let bytes = mailparsing::SharedString::try_from(data.as_ref().as_ref())?;
944 let parsed = mailparsing::Header::parse_headers(bytes.clone())?;
945 let message = kumo_dkim::ParsedEmail::HeaderOnlyParse { bytes, parsed };
946 let arc = ARC::verify(&message, &**resolver).await;
947
948 let headers = arc.seal(&message, auth_results, signer.signer())?;
949 if !headers.is_empty() {
950 let mut new_data = Vec::<u8>::with_capacity(data.len() + 1024);
951
952 for hdr in headers {
953 hdr.write_header(&mut new_data).ok();
954 }
955 new_data.extend_from_slice(&data);
956 self.assign_data(new_data);
957 }
958
959 Ok(())
960 }
961
962 #[cfg(feature = "impl")]
963 pub async fn dkim_verify(
964 &self,
965 opt_resolver_name: Option<String>,
966 ) -> anyhow::Result<Vec<AuthenticationResult>> {
967 let resolver = get_resolver_instance(&opt_resolver_name)?;
968 let data = self.data().await?;
969 let bytes = mailparsing::SharedString::try_from(data.as_ref().as_ref())?;
970
971 let parsed = mailparsing::Header::parse_headers(bytes.clone())?;
972 if parsed
973 .overall_conformance
974 .contains(MessageConformance::NON_CANONICAL_LINE_ENDINGS)
975 {
976 return Ok(vec![AuthenticationResult {
977 method: "dkim".to_string(),
978 method_version: None,
979 result: "permerror".to_string(),
980 reason: Some("message has non-canonical line endings".to_string()),
981 props: Default::default(),
982 }]);
983 }
984 let message = kumo_dkim::ParsedEmail::HeaderOnlyParse { bytes, parsed };
985
986 let from = match message.get_headers().from() {
987 Ok(Some(from)) if from.0.len() == 1 => from.0,
988 Ok(Some(from)) => {
989 return Ok(vec![AuthenticationResult {
990 method: "dkim".to_string(),
991 method_version: None,
992 result: "permerror".to_string(),
993 reason: Some(format!(
994 "From header must have a single sender, found {}",
995 from.len()
996 )),
997 props: Default::default(),
998 }]);
999 }
1000 Ok(None) => {
1001 return Ok(vec![AuthenticationResult {
1002 method: "dkim".to_string(),
1003 method_version: None,
1004 result: "permerror".to_string(),
1005 reason: Some("From header is missing".to_string()),
1006 props: Default::default(),
1007 }]);
1008 }
1009 Err(err) => {
1010 return Ok(vec![AuthenticationResult {
1011 method: "dkim".to_string(),
1012 method_version: None,
1013 result: "permerror".to_string(),
1014 reason: Some(format!("From header is invalid: {err:#}")),
1015 props: Default::default(),
1016 }]);
1017 }
1018 };
1019 let from_domain = &from[0].address.domain;
1020
1021 let results =
1022 kumo_dkim::verify_email_with_resolver(from_domain, &message, &**resolver).await?;
1023 Ok(results)
1024 }
1025
1026 pub async fn parse(&self) -> anyhow::Result<MimePart<'static>> {
1031 let data = self.data().await?;
1032 let owned_data = String::from_utf8_lossy(data.as_ref().as_ref()).to_string();
1033 Ok(MimePart::parse(owned_data)?)
1034 }
1035
1036 pub async fn parse_rfc3464(&self) -> anyhow::Result<Option<Report>> {
1037 let data = self.data().await?;
1038 Report::parse(&data)
1039 }
1040
1041 pub async fn parse_rfc5965(&self) -> anyhow::Result<Option<ARFReport>> {
1042 let data = self.data().await?;
1043 ARFReport::parse(&data)
1044 }
1045
1046 pub async fn prepend_header(&self, name: Option<&str>, value: &str) -> anyhow::Result<()> {
1047 let data = self.data().await?;
1048 let mut new_data = Vec::with_capacity(size_header(name, value) + 2 + data.len());
1049 emit_header(&mut new_data, name, value);
1050 new_data.extend_from_slice(&data);
1051 self.assign_data(new_data);
1052 Ok(())
1053 }
1054
1055 pub async fn append_header(&self, name: Option<&str>, value: &str) -> anyhow::Result<()> {
1056 let data = self.data().await?;
1057 let mut new_data = Vec::with_capacity(size_header(name, value) + 2 + data.len());
1058 for (idx, window) in data.windows(4).enumerate() {
1059 if window == b"\r\n\r\n" {
1060 let headers = &data[0..idx + 2];
1061 let body = &data[idx + 2..];
1062
1063 new_data.extend_from_slice(headers);
1064 emit_header(&mut new_data, name, value);
1065 new_data.extend_from_slice(body);
1066 self.assign_data(new_data);
1067 return Ok(());
1068 }
1069 }
1070
1071 anyhow::bail!("append_header could not find the end of the header block");
1072 }
1073
1074 pub async fn get_address_header(
1075 &self,
1076 header_name: &str,
1077 ) -> anyhow::Result<Option<HeaderAddressList>> {
1078 let data = self.data().await?;
1079 let HeaderParseResult { headers, .. } =
1080 mailparsing::Header::parse_headers(data.as_ref().as_ref())?;
1081
1082 match headers.get_first(header_name) {
1083 Some(hdr) => {
1084 let list = hdr.as_address_list()?;
1085 let result: HeaderAddressList = list.into();
1086 Ok(Some(result))
1087 }
1088 None => Ok(None),
1089 }
1090 }
1091
1092 pub async fn get_first_named_header_value(&self, name: &str) -> anyhow::Result<Option<String>> {
1093 let data = self.data().await?;
1094 let HeaderParseResult { headers, .. } = Header::parse_headers(data.as_ref().as_ref())?;
1095
1096 match headers.get_first(name) {
1097 Some(hdr) => Ok(Some(hdr.as_unstructured()?)),
1098 None => Ok(None),
1099 }
1100 }
1101
1102 pub async fn get_all_named_header_values(&self, name: &str) -> anyhow::Result<Vec<String>> {
1103 let data = self.data().await?;
1104 let HeaderParseResult { headers, .. } = Header::parse_headers(data.as_ref().as_ref())?;
1105
1106 let mut values = vec![];
1107 for hdr in headers.iter_named(name) {
1108 values.push(hdr.as_unstructured()?);
1109 }
1110 Ok(values)
1111 }
1112
1113 pub async fn get_all_headers(&self) -> anyhow::Result<Vec<(String, String)>> {
1114 let data = self.data().await?;
1115 let HeaderParseResult { headers, .. } = Header::parse_headers(data.as_ref().as_ref())?;
1116
1117 let mut values = vec![];
1118 for hdr in headers.iter() {
1119 values.push((hdr.get_name().to_string(), hdr.as_unstructured()?));
1120 }
1121 Ok(values)
1122 }
1123
1124 pub async fn retain_headers<F: FnMut(&Header) -> bool>(
1125 &self,
1126 mut func: F,
1127 ) -> anyhow::Result<()> {
1128 let data = self.data().await?;
1129 let mut new_data = Vec::with_capacity(data.len());
1130 let HeaderParseResult {
1131 headers,
1132 body_offset,
1133 ..
1134 } = Header::parse_headers(data.as_ref().as_ref())?;
1135 for hdr in headers.iter() {
1136 let retain = (func)(hdr);
1137 if !retain {
1138 continue;
1139 }
1140 hdr.write_header(&mut new_data)?;
1141 }
1142 new_data.extend_from_slice(b"\r\n");
1143 new_data.extend_from_slice(&data[body_offset..]);
1144 self.assign_data(new_data);
1145 Ok(())
1146 }
1147
1148 pub async fn remove_first_named_header(&self, name: &str) -> anyhow::Result<()> {
1149 let mut removed = false;
1150 self.retain_headers(|hdr| {
1151 if hdr.get_name().eq_ignore_ascii_case(name) && !removed {
1152 removed = true;
1153 false
1154 } else {
1155 true
1156 }
1157 })
1158 .await
1159 }
1160
1161 pub async fn import_x_headers(&self, names: Vec<String>) -> anyhow::Result<()> {
1162 let data = self.data().await?;
1163 let HeaderParseResult { headers, .. } = Header::parse_headers(data.as_ref().as_ref())?;
1164
1165 for hdr in headers.iter() {
1166 let do_import = if names.is_empty() {
1167 is_x_header(hdr.get_name())
1168 } else {
1169 is_header_in_names_list(hdr.get_name(), &names)
1170 };
1171 if do_import {
1172 let name = imported_header_name(hdr.get_name());
1173 self.set_meta(name, hdr.as_unstructured()?).await?;
1174 }
1175 }
1176
1177 Ok(())
1178 }
1179
1180 pub async fn remove_x_headers(&self, names: Vec<String>) -> anyhow::Result<()> {
1181 self.retain_headers(|hdr| {
1182 if names.is_empty() {
1183 !is_x_header(hdr.get_name())
1184 } else {
1185 !is_header_in_names_list(hdr.get_name(), &names)
1186 }
1187 })
1188 .await
1189 }
1190
1191 pub async fn remove_all_named_headers(&self, name: &str) -> anyhow::Result<()> {
1192 self.retain_headers(|hdr| !hdr.get_name().eq_ignore_ascii_case(name))
1193 .await
1194 }
1195
1196 #[cfg(feature = "impl")]
1197 pub async fn dkim_sign(&self, signer: Signer) -> anyhow::Result<()> {
1198 let data = self.data().await?;
1199 let header = if let Some(runtime) = SIGN_POOL.get() {
1200 runtime.spawn_blocking(move || signer.sign(&data)).await??
1201 } else {
1202 signer.sign(&data)?
1203 };
1204 self.prepend_header(None, &header).await?;
1205 Ok(())
1206 }
1207
1208 pub async fn import_scheduling_header(
1209 &self,
1210 header_name: &str,
1211 remove: bool,
1212 ) -> anyhow::Result<Option<Scheduling>> {
1213 if let Some(value) = self.get_first_named_header_value(header_name).await? {
1214 let sched: Scheduling = serde_json::from_str(&value).with_context(|| {
1215 format!("{value} from header {header_name} is not a valid Scheduling header")
1216 })?;
1217 let result = self.set_scheduling(Some(sched)).await?;
1218
1219 if remove {
1220 self.remove_all_named_headers(header_name).await?;
1221 }
1222
1223 Ok(result)
1224 } else {
1225 Ok(None)
1226 }
1227 }
1228
1229 pub async fn append_text_plain(&self, content: &str) -> anyhow::Result<bool> {
1230 let data = self.data().await?;
1231 let mut msg = MimePart::parse(data.as_ref().as_ref())?;
1232 let parts = msg.simplified_structure_pointers()?;
1233 if let Some(p) = parts.text_part.and_then(|p| msg.resolve_ptr_mut(p)) {
1234 match p.body()? {
1235 DecodedBody::Text(text) => {
1236 let mut text = text.as_str().to_string();
1237 text.push_str("\r\n");
1238 text.push_str(content);
1239 p.replace_text_body("text/plain", &text)?;
1240
1241 let new_data = msg.to_message_string();
1242 self.assign_data(new_data.into_bytes());
1243 Ok(true)
1244 }
1245 DecodedBody::Binary(_) => {
1246 anyhow::bail!("expected text/plain part to be text, but it is binary");
1247 }
1248 }
1249 } else {
1250 Ok(false)
1251 }
1252 }
1253
1254 pub async fn append_text_html(&self, content: &str) -> anyhow::Result<bool> {
1255 let data = self.data().await?;
1256 let mut msg = MimePart::parse(data.as_ref().as_ref())?;
1257 let parts = msg.simplified_structure_pointers()?;
1258 if let Some(p) = parts.html_part.and_then(|p| msg.resolve_ptr_mut(p)) {
1259 match p.body()? {
1260 DecodedBody::Text(text) => {
1261 let mut text = text.as_str().to_string();
1262
1263 match text.rfind("</body>").or_else(|| text.rfind("</BODY>")) {
1264 Some(idx) => {
1265 text.insert_str(idx, content);
1266 text.insert_str(idx, "\r\n");
1267 }
1268 None => {
1269 text.push_str("\r\n");
1271 text.push_str(content);
1272 }
1273 }
1274
1275 p.replace_text_body("text/html", &text)?;
1276
1277 let new_data = msg.to_message_string();
1278 self.assign_data(new_data.into_bytes());
1279 Ok(true)
1280 }
1281 DecodedBody::Binary(_) => {
1282 anyhow::bail!("expected text/html part to be text, but it is binary");
1283 }
1284 }
1285 } else {
1286 Ok(false)
1287 }
1288 }
1289
1290 pub async fn check_fix_conformance(
1291 &self,
1292 check: MessageConformance,
1293 fix: MessageConformance,
1294 settings: Option<&CheckFixSettings>,
1295 ) -> anyhow::Result<()> {
1296 let data = self.data().await?;
1297 let data_bytes = data.as_ref().as_ref();
1298 let msg = MimePart::parse(data_bytes)?;
1299
1300 let mut settings = settings.map(Clone::clone).unwrap_or_default();
1301
1302 if fix.contains(MessageConformance::MISSING_MESSAGE_ID_HEADER)
1303 && settings.message_id.is_none()
1304 && matches!(msg.headers().message_id(), Err(_) | Ok(None))
1305 {
1306 let sender = self.sender().await?;
1307 let domain = sender.domain();
1308 let id = *self.id();
1309 settings.message_id.replace(format!("{id}@{domain}"));
1310 }
1311
1312 if settings.detect_encoding {
1313 settings.data_bytes.replace(data.clone());
1314 }
1315
1316 let opt_msg = msg.check_fix_conformance(check, fix, settings)?;
1317
1318 if let Some(msg) = opt_msg {
1319 let new_data = msg.to_message_string();
1320 self.assign_data(new_data.into_bytes());
1321 }
1322
1323 Ok(())
1324 }
1325}
1326
1327fn is_header_in_names_list(hdr_name: &str, names: &[String]) -> bool {
1328 for name in names {
1329 if hdr_name.eq_ignore_ascii_case(name) {
1330 return true;
1331 }
1332 }
1333 false
1334}
1335
1336fn imported_header_name(name: &str) -> String {
1337 name.chars()
1338 .map(|c| match c.to_ascii_lowercase() {
1339 '-' => '_',
1340 c => c,
1341 })
1342 .collect()
1343}
1344
1345fn is_x_header(name: &str) -> bool {
1346 name.starts_with("X-") || name.starts_with("x-")
1347}
1348
1349fn size_header(name: Option<&str>, value: &str) -> usize {
1350 name.map(|name| name.len() + 2).unwrap_or(0) + value.len()
1351}
1352
1353fn emit_header(dest: &mut Vec<u8>, name: Option<&str>, value: &str) {
1354 if let Some(name) = name {
1355 dest.extend_from_slice(name.as_bytes());
1356 dest.extend_from_slice(b": ");
1357 }
1358 dest.extend_from_slice(value.as_bytes());
1359 if !value.ends_with("\r\n") {
1360 dest.extend_from_slice(b"\r\n");
1361 }
1362}
1363
1364#[cfg(feature = "impl")]
1365impl UserData for Message {
1366 fn add_methods<M: UserDataMethods<Self>>(methods: &mut M) {
1367 methods.add_async_method(
1368 "set_meta",
1369 move |_, this, (name, value): (String, mlua::Value)| async move {
1370 let value = serde_json::value::to_value(value).map_err(any_err)?;
1371 this.set_meta(name, value).await.map_err(any_err)?;
1372 Ok(())
1373 },
1374 );
1375 methods.add_async_method("get_meta", move |lua, this, name: String| async move {
1376 let value = this.get_meta(name).await.map_err(any_err)?;
1377 Ok(Some(lua.to_value_with(&value, serialize_options())?))
1378 });
1379 methods.add_async_method("get_data", |lua, this, _: ()| async move {
1380 let data = this.data().await.map_err(any_err)?;
1381 lua.create_string(&*data)
1382 });
1383 methods.add_method("set_data", move |_lua, this, data: mlua::String| {
1384 this.assign_data(data.as_bytes().to_vec());
1385 Ok(())
1386 });
1387
1388 methods.add_async_method("parse_mime", |_lua, this, _: ()| async move {
1389 let data = this.data().await.map_err(any_err)?;
1390 let owned_data = String::from_utf8_lossy(data.as_ref().as_ref()).to_string();
1391 let part = MimePart::parse(owned_data).map_err(any_err)?;
1392 Ok(mod_mimepart::PartRef::new(part))
1393 });
1394
1395 methods.add_async_method("append_text_plain", |_lua, this, data: String| async move {
1396 this.append_text_plain(&data).await.map_err(any_err)
1397 });
1398
1399 methods.add_async_method("append_text_html", |_lua, this, data: String| async move {
1400 this.append_text_html(&data).await.map_err(any_err)
1401 });
1402
1403 methods.add_method("id", move |_, this, _: ()| Ok(this.id().to_string()));
1404 methods.add_async_method("sender", move |_, this, _: ()| async move {
1405 this.sender().await.map_err(any_err)
1406 });
1407
1408 methods.add_method("num_attempts", move |_, this, _: ()| {
1409 Ok(this.get_num_attempts())
1410 });
1411
1412 methods.add_async_method("queue_name", move |_, this, _: ()| async move {
1413 this.get_queue_name().await.map_err(any_err)
1414 });
1415
1416 methods.add_async_method("set_due", move |lua, this, due: mlua::Value| async move {
1417 let due: Option<DateTime<Utc>> = lua.from_value(due)?;
1418 let revised_due = this.set_due(due).await.map_err(any_err)?;
1419 lua.to_value(&revised_due)
1420 });
1421
1422 methods.add_async_method(
1423 "set_sender",
1424 move |lua, this, value: mlua::Value| async move {
1425 let sender = match value {
1426 mlua::Value::String(s) => {
1427 let s = s.to_str()?;
1428 EnvelopeAddress::parse(&s).map_err(any_err)?
1429 }
1430 _ => lua.from_value::<EnvelopeAddress>(value.clone())?,
1431 };
1432 this.set_sender(sender).await.map_err(any_err)
1433 },
1434 );
1435
1436 methods.add_async_method("recipient", move |lua, this, _: ()| async move {
1437 let mut recipients = this.recipient_list().await.map_err(any_err)?;
1438 match recipients.len() {
1439 0 => Ok(mlua::Value::Nil),
1440 1 => {
1441 let recip: EnvelopeAddress = recipients.pop().expect("have 1");
1442 recip.into_lua(&lua)
1443 }
1444 _ => recipients.into_lua(&lua),
1445 }
1446 });
1447
1448 methods.add_async_method("recipient_list", move |lua, this, _: ()| async move {
1449 let recipients = this.recipient_list().await.map_err(any_err)?;
1450 recipients.into_lua(&lua)
1451 });
1452
1453 methods.add_async_method(
1454 "set_recipient",
1455 move |lua, this, value: mlua::Value| async move {
1456 let recipients = match value {
1457 mlua::Value::String(s) => {
1458 let s = s.to_str()?;
1459 vec![EnvelopeAddress::parse(&s).map_err(any_err)?]
1460 }
1461 _ => {
1462 if let Ok(recips) = lua.from_value::<Vec<EnvelopeAddress>>(value.clone()) {
1463 recips
1464 } else {
1465 vec![lua.from_value::<EnvelopeAddress>(value.clone())?]
1466 }
1467 }
1468 };
1469 this.set_recipient_list(recipients).await.map_err(any_err)
1470 },
1471 );
1472
1473 #[cfg(feature = "impl")]
1474 methods.add_async_method("dkim_sign", |_, this, signer: Signer| async move {
1475 this.dkim_sign(signer).await.map_err(any_err)
1476 });
1477
1478 methods.add_async_method("shrink", |_, this, _: ()| async move {
1479 if this.needs_save() {
1480 this.save(None).await.map_err(any_err)?;
1481 }
1482 this.shrink().map_err(any_err)
1483 });
1484
1485 methods.add_async_method("shrink_data", |_, this, _: ()| async move {
1486 if this.needs_save() {
1487 this.save(None).await.map_err(any_err)?;
1488 }
1489 this.shrink_data().map_err(any_err)
1490 });
1491
1492 methods.add_async_method(
1493 "add_authentication_results",
1494 |lua, this, (serv_id, results): (String, mlua::Value)| async move {
1495 let results: Vec<AuthenticationResult> = lua.from_value(results)?;
1496 let results = AuthenticationResults {
1497 serv_id,
1498 version: None,
1499 results,
1500 };
1501
1502 this.prepend_header(Some("Authentication-Results"), &results.encode_value())
1503 .await
1504 .map_err(any_err)?;
1505
1506 Ok(())
1507 },
1508 );
1509
1510 #[cfg(feature = "impl")]
1511 methods.add_async_method(
1512 "arc_verify",
1513 |lua, this, opt_resolver_name: Option<String>| async move {
1514 let results = this.arc_verify(opt_resolver_name).await.map_err(any_err)?;
1515 lua.to_value_with(&results, serialize_options())
1516 },
1517 );
1518
1519 #[cfg(feature = "impl")]
1520 methods.add_async_method(
1521 "arc_seal",
1522 |lua,
1523 this,
1524 (signer, serv_id, auth_res, opt_resolver_name): (
1525 Signer,
1526 String,
1527 mlua::Value,
1528 Option<String>,
1529 )| async move {
1530 let results: Vec<AuthenticationResult> = lua.from_value(auth_res)?;
1531 this.arc_seal(
1532 signer,
1533 AuthenticationResults {
1534 serv_id,
1535 version: None,
1536 results,
1537 },
1538 opt_resolver_name,
1539 )
1540 .await
1541 .map_err(any_err)
1542 },
1543 );
1544
1545 #[cfg(feature = "impl")]
1546 methods.add_async_method(
1547 "dkim_verify",
1548 |lua, this, opt_resolver_name: Option<String>| async move {
1549 let results = this.dkim_verify(opt_resolver_name).await.map_err(any_err)?;
1550 lua.to_value_with(&results, serialize_options())
1551 },
1552 );
1553
1554 methods.add_async_method(
1555 "prepend_header",
1556 |_, this, (name, value, encode): (String, String, Option<bool>)| async move {
1557 let encode = encode.unwrap_or(false);
1558 if encode {
1559 let header = Header::new_unstructured(name.clone(), value);
1560 this.prepend_header(Some(&name), header.get_raw_value())
1561 .await
1562 .map_err(any_err)?;
1563 } else {
1564 this.prepend_header(Some(&name), &value)
1565 .await
1566 .map_err(any_err)?;
1567 }
1568 Ok(())
1569 },
1570 );
1571 methods.add_async_method(
1572 "append_header",
1573 |_, this, (name, value, encode): (String, String, Option<bool>)| async move {
1574 let encode = encode.unwrap_or(false);
1575 if encode {
1576 let header = Header::new_unstructured(name.clone(), value);
1577 this.append_header(Some(&name), header.get_raw_value())
1578 .await
1579 .map_err(any_err)?;
1580 } else {
1581 this.append_header(Some(&name), &value)
1582 .await
1583 .map_err(any_err)?;
1584 }
1585 Ok(())
1586 },
1587 );
1588 methods.add_async_method("get_address_header", |_, this, name: String| async move {
1589 this.get_address_header(&name).await.map_err(any_err)
1590 });
1591 methods.add_async_method("from_header", |_, this, ()| async move {
1592 this.get_address_header("From").await.map_err(any_err)
1593 });
1594 methods.add_async_method("to_header", |_, this, ()| async move {
1595 this.get_address_header("To").await.map_err(any_err)
1596 });
1597
1598 methods.add_async_method(
1599 "get_first_named_header_value",
1600 |_, this, name: String| async move {
1601 this.get_first_named_header_value(&name)
1602 .await
1603 .map_err(any_err)
1604 },
1605 );
1606 methods.add_async_method(
1607 "get_all_named_header_values",
1608 |_, this, name: String| async move {
1609 this.get_all_named_header_values(&name)
1610 .await
1611 .map_err(any_err)
1612 },
1613 );
1614 methods.add_async_method("get_all_headers", |_, this, _: ()| async move {
1615 Ok(this
1616 .get_all_headers()
1617 .await
1618 .map_err(any_err)?
1619 .into_iter()
1620 .map(|(name, value)| vec![name, value])
1621 .collect::<Vec<Vec<String>>>())
1622 });
1623 methods.add_async_method(
1624 "import_x_headers",
1625 |_, this, names: Option<Vec<String>>| async move {
1626 this.import_x_headers(names.unwrap_or_default())
1627 .await
1628 .map_err(any_err)
1629 },
1630 );
1631
1632 methods.add_async_method(
1633 "remove_x_headers",
1634 |_, this, names: Option<Vec<String>>| async move {
1635 this.remove_x_headers(names.unwrap_or_default())
1636 .await
1637 .map_err(any_err)
1638 },
1639 );
1640 methods.add_async_method(
1641 "remove_all_named_headers",
1642 |_, this, name: String| async move {
1643 this.remove_all_named_headers(&name).await.map_err(any_err)
1644 },
1645 );
1646
1647 methods.add_async_method(
1648 "import_scheduling_header",
1649 |lua, this, (header_name, remove): (String, bool)| async move {
1650 let opt_schedule = this
1651 .import_scheduling_header(&header_name, remove)
1652 .await
1653 .map_err(any_err)?;
1654 lua.to_value(&opt_schedule)
1655 },
1656 );
1657
1658 methods.add_async_method(
1659 "set_scheduling",
1660 move |lua, this, params: mlua::Value| async move {
1661 let sched: Option<Scheduling> = from_lua_value(&lua, params)?;
1662 let opt_schedule = this.set_scheduling(sched).await.map_err(any_err)?;
1663 lua.to_value(&opt_schedule)
1664 },
1665 );
1666
1667 methods.add_async_method("parse_rfc3464", |lua, this, _: ()| async move {
1668 let report = this.parse_rfc3464().await.map_err(any_err)?;
1669 match report {
1670 Some(report) => lua.to_value_with(&report, serialize_options()),
1671 None => Ok(mlua::Value::Nil),
1672 }
1673 });
1674
1675 methods.add_async_method("parse_rfc5965", |lua, this, _: ()| async move {
1676 let report = this.parse_rfc5965().await.map_err(any_err)?;
1677 match report {
1678 Some(report) => lua.to_value_with(&report, serialize_options()),
1679 None => Ok(mlua::Value::Nil),
1680 }
1681 });
1682
1683 methods.add_async_method("save", |_, this, ()| async move {
1684 this.save(None).await.map_err(any_err)
1685 });
1686
1687 methods.add_method("set_force_sync", move |_, this, force: bool| {
1688 this.set_force_sync(force);
1689 Ok(())
1690 });
1691
1692 methods.add_async_method(
1693 "check_fix_conformance",
1694 |lua, this, (check, fix, settings): (String, String, Option<mlua::Value>)| async move {
1695 use std::str::FromStr;
1696 let check = MessageConformance::from_str(&check).map_err(any_err)?;
1697 let fix = MessageConformance::from_str(&fix).map_err(any_err)?;
1698
1699 let settings = match settings {
1700 Some(v) => Some(lua.from_value(v).map_err(any_err)?),
1701 None => None,
1702 };
1703
1704 match this
1705 .check_fix_conformance(check, fix, settings.as_ref())
1706 .await
1707 {
1708 Ok(_) => Ok(None),
1709 Err(err) => Ok(Some(format!("{err:#}"))),
1710 }
1711 },
1712 );
1713 }
1714}
1715
1716impl TimerEntryWithDelay for WeakMessage {
1717 fn delay(&self) -> Duration {
1718 match self.upgrade() {
1719 None => {
1720 Duration::from_millis(0)
1722 }
1723 Some(msg) => msg.delay(),
1724 }
1725 }
1726}
1727
1728impl TimerEntryWithDelay for Message {
1729 fn delay(&self) -> Duration {
1730 let inner = self.msg_and_id.inner.lock();
1731 match inner.due {
1732 Some(time) => {
1733 let now = Utc::now();
1734 let delta = time - now;
1735 delta.to_std().unwrap_or(Duration::from_millis(0))
1736 }
1737 None => Duration::from_millis(0),
1738 }
1739 }
1740}
1741
1742#[cfg(test)]
1743pub(crate) mod test {
1744 use super::*;
1745 use serde_json::json;
1746
1747 pub fn new_msg_body<S: AsRef<[u8]>>(s: S) -> Message {
1748 Message::new_dirty(
1749 SpoolId::new(),
1750 EnvelopeAddress::parse("sender@example.com").unwrap(),
1751 vec![EnvelopeAddress::parse("recip@example.com").unwrap()],
1752 serde_json::json!({}),
1753 Arc::new(s.as_ref().to_vec().into_boxed_slice()),
1754 )
1755 .unwrap()
1756 }
1757
1758 fn data_as_string(msg: &Message) -> String {
1759 String::from_utf8(msg.get_data_maybe_not_loaded().to_vec()).unwrap()
1760 }
1761
1762 const X_HDR_CONTENT: &str =
1763 "X-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nFrom :Someone\r\n\r\nBody";
1764
1765 #[tokio::test]
1766 async fn import_all_x_headers() {
1767 let msg = new_msg_body(X_HDR_CONTENT);
1768
1769 msg.import_x_headers(vec![]).await.unwrap();
1770 k9::assert_equal!(
1771 msg.get_meta_obj().await.unwrap(),
1772 json!({
1773 "x_hello": "there",
1774 "x_header": "value",
1775 })
1776 );
1777 }
1778
1779 #[tokio::test]
1780 async fn meta_and_nil() {
1781 let msg = new_msg_body(X_HDR_CONTENT);
1782 msg.set_meta("test", serde_json::Value::Null).await.unwrap();
1784 k9::assert_equal!(msg.get_meta("test").await.unwrap(), serde_json::Value::Null);
1785
1786 let lua = mlua::Lua::new();
1788 lua.globals().set("msg", msg).unwrap();
1789 lua.load("assert(msg:get_meta('test') == nil)")
1790 .exec()
1791 .unwrap();
1792 }
1793
1794 #[tokio::test]
1795 async fn import_some_x_headers() {
1796 let msg = new_msg_body(X_HDR_CONTENT);
1797
1798 msg.import_x_headers(vec!["x-hello".to_string()])
1799 .await
1800 .unwrap();
1801 k9::assert_equal!(
1802 msg.get_meta_obj().await.unwrap(),
1803 json!({
1804 "x_hello": "there",
1805 })
1806 );
1807 }
1808
1809 #[tokio::test]
1810 async fn remove_all_x_headers() {
1811 let msg = new_msg_body(X_HDR_CONTENT);
1812
1813 msg.remove_x_headers(vec![]).await.unwrap();
1814 k9::assert_equal!(
1815 data_as_string(&msg),
1816 "Subject: Hello\r\nFrom :Someone\r\n\r\nBody"
1817 );
1818 }
1819
1820 #[tokio::test]
1821 async fn prepend_header_2_params() {
1822 let msg = new_msg_body(X_HDR_CONTENT);
1823
1824 msg.prepend_header(Some("Date"), "Today").await.unwrap();
1825 k9::assert_equal!(
1826 data_as_string(&msg),
1827 "Date: Today\r\nX-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nFrom :Someone\r\n\r\nBody"
1828 );
1829 }
1830
1831 #[tokio::test]
1832 async fn prepend_header_1_params() {
1833 let msg = new_msg_body(X_HDR_CONTENT);
1834
1835 msg.prepend_header(None, "Date: Today").await.unwrap();
1836 k9::assert_equal!(
1837 data_as_string(&msg),
1838 "Date: Today\r\nX-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nFrom :Someone\r\n\r\nBody"
1839 );
1840 }
1841
1842 #[tokio::test]
1843 async fn append_header_2_params() {
1844 let msg = new_msg_body(X_HDR_CONTENT);
1845
1846 msg.append_header(Some("Date"), "Today").await.unwrap();
1847 k9::assert_equal!(
1848 data_as_string(&msg),
1849 "X-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nFrom :Someone\r\nDate: Today\r\n\r\nBody"
1850 );
1851 }
1852
1853 #[tokio::test]
1854 async fn append_header_1_params() {
1855 let msg = new_msg_body(X_HDR_CONTENT);
1856
1857 msg.append_header(None, "Date: Today").await.unwrap();
1858 k9::assert_equal!(
1859 data_as_string(&msg),
1860 "X-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nFrom :Someone\r\nDate: Today\r\n\r\nBody"
1861 );
1862 }
1863
1864 const MULTI_HEADER_CONTENT: &str =
1865 "X-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nX-Header: another value\r\nFrom :Someone@somewhere\r\n\r\nBody";
1866
1867 #[tokio::test]
1868 async fn get_first_header() {
1869 let msg = new_msg_body(MULTI_HEADER_CONTENT);
1870 k9::assert_equal!(
1871 msg.get_first_named_header_value("X-header")
1872 .await
1873 .unwrap()
1874 .unwrap(),
1875 "value"
1876 );
1877 }
1878
1879 #[tokio::test]
1880 async fn get_all_header() {
1881 let msg = new_msg_body(MULTI_HEADER_CONTENT);
1882 k9::assert_equal!(
1883 msg.get_all_named_header_values("X-header").await.unwrap(),
1884 vec!["value".to_string(), "another value".to_string()]
1885 );
1886 }
1887
1888 #[tokio::test]
1889 async fn remove_first() {
1890 let msg = new_msg_body(MULTI_HEADER_CONTENT);
1891 msg.remove_first_named_header("X-header").await.unwrap();
1892 k9::assert_equal!(
1893 data_as_string(&msg),
1894 "X-Hello: there\r\nSubject: Hello\r\nX-Header: another value\r\nFrom :Someone@somewhere\r\n\r\nBody"
1895 );
1896 }
1897
1898 #[tokio::test]
1899 async fn remove_all() {
1900 let msg = new_msg_body(MULTI_HEADER_CONTENT);
1901 msg.remove_all_named_headers("X-header").await.unwrap();
1902 k9::assert_equal!(
1903 data_as_string(&msg),
1904 "X-Hello: there\r\nSubject: Hello\r\nFrom :Someone@somewhere\r\n\r\nBody"
1905 );
1906 }
1907
1908 #[tokio::test]
1909 async fn append_text_plain() {
1910 let msg = new_msg_body(MULTI_HEADER_CONTENT);
1911 msg.append_text_plain("I am at the bottom").await.unwrap();
1912 k9::assert_equal!(
1913 data_as_string(&msg),
1914 "X-Hello: there\r\n\
1915 X-Header: value\r\n\
1916 Subject: Hello\r\n\
1917 X-Header: another value\r\n\
1918 From :Someone@somewhere\r\n\
1919 Content-Type: text/plain;\r\n\
1920 \tcharset=\"us-ascii\"\r\n\
1921 \r\n\
1922 Body\r\n\
1923 I am at the bottom\r\n"
1924 );
1925 }
1926
1927 const MIXED_CONTENT: &str = "Content-Type: multipart/mixed;\r\n\
1928\tboundary=\"my-boundary\"\r\n\
1929\r\n\
1930--my-boundary\r\n\
1931Content-Type: text/plain;\r\n\
1932\tcharset=\"us-ascii\"\r\n\
1933\r\n\
1934plain text\r\n\
1935--my-boundary\r\n\
1936Content-Type: text/html;\r\n\
1937\tcharset=\"us-ascii\"\r\n\
1938\r\n\
1939<b>rich</b> text\r\n\
1940--my-boundary\r\n\
1941Content-Type: application/octet-stream\r\n\
1942Content-Transfer-Encoding: base64\r\n\
1943Content-Disposition: attachment;\r\n\
1944\tfilename=\"woot.bin\"\r\n\
1945Content-ID: <woot.id@somewhere>\r\n\
1946\r\n\
1947AAECAw==\r\n\
1948--my-boundary--\r\n\
1949\r\n";
1950
1951 const MIXED_CONTENT_ENCLOSING_BODY: &str = "Content-Type: multipart/mixed;\r\n\
1952\tboundary=\"my-boundary\"\r\n\
1953\r\n\
1954--my-boundary\r\n\
1955Content-Type: text/plain;\r\n\
1956\tcharset=\"us-ascii\"\r\n\
1957\r\n\
1958plain text\r\n\
1959--my-boundary\r\n\
1960Content-Type: text/html;\r\n\
1961\tcharset=\"us-ascii\"\r\n\
1962\r\n\
1963<BODY>\r\n\
1964<b>rich</b> text\r\n\
1965</BODY>\r\n\
1966--my-boundary\r\n\
1967Content-Type: application/octet-stream\r\n\
1968Content-Transfer-Encoding: base64\r\n\
1969Content-Disposition: attachment;\r\n\
1970\tfilename=\"woot.bin\"\r\n\
1971Content-ID: <woot.id>\r\n\
1972\r\n\
1973AAECAw==\r\n\
1974--my-boundary--\r\n\
1975\r\n";
1976
1977 #[tokio::test]
1978 async fn append_text_html() {
1979 let msg = new_msg_body(MIXED_CONTENT);
1980 msg.append_text_html("bottom html").await.unwrap();
1981 k9::snapshot!(
1982 data_as_string(&msg),
1983 r#"
1984Content-Type: multipart/mixed;\r
1985\tboundary="my-boundary"\r
1986\r
1987--my-boundary\r
1988Content-Type: text/plain;\r
1989\tcharset="us-ascii"\r
1990\r
1991plain text\r
1992--my-boundary\r
1993Content-Type: text/html;\r
1994\tcharset="us-ascii"\r
1995\r
1996<b>rich</b> text\r
1997\r
1998bottom html\r
1999--my-boundary\r
2000Content-Type: application/octet-stream\r
2001Content-Transfer-Encoding: base64\r
2002Content-Disposition: attachment;\r
2003\tfilename="woot.bin"\r
2004Content-ID: <woot.id@somewhere>\r
2005\r
2006AAECAw==\r
2007--my-boundary--\r
2008\r
2009
2010"#
2011 );
2012
2013 let msg = new_msg_body(MIXED_CONTENT_ENCLOSING_BODY);
2014 msg.append_text_html("bottom html 👻").await.unwrap();
2015 k9::snapshot!(
2016 data_as_string(&msg),
2017 r#"
2018Content-Type: multipart/mixed;\r
2019\tboundary="my-boundary"\r
2020\r
2021--my-boundary\r
2022Content-Type: text/plain;\r
2023\tcharset="us-ascii"\r
2024\r
2025plain text\r
2026--my-boundary\r
2027Content-Type: text/html;\r
2028\tcharset="utf-8"\r
2029Content-Transfer-Encoding: quoted-printable\r
2030\r
2031<BODY>\r
2032<b>rich</b> text\r
2033\r
2034bottom html =F0=9F=91=BB</BODY>\r
2035--my-boundary\r
2036Content-Type: application/octet-stream\r
2037Content-Transfer-Encoding: base64\r
2038Content-Disposition: attachment;\r
2039\tfilename="woot.bin"\r
2040Content-ID: <woot.id>\r
2041\r
2042AAECAw==\r
2043--my-boundary--\r
2044\r
2045
2046"#
2047 );
2048 }
2049
2050 #[tokio::test]
2051 async fn append_text_plain_mixed() {
2052 let msg = new_msg_body(MIXED_CONTENT);
2053 msg.append_text_plain("bottom text 👾").await.unwrap();
2054 k9::snapshot!(
2055 data_as_string(&msg),
2056 r#"
2057Content-Type: multipart/mixed;\r
2058\tboundary="my-boundary"\r
2059\r
2060--my-boundary\r
2061Content-Type: text/plain;\r
2062\tcharset="utf-8"\r
2063Content-Transfer-Encoding: quoted-printable\r
2064\r
2065plain text\r
2066\r
2067bottom text =F0=9F=91=BE\r
2068--my-boundary\r
2069Content-Type: text/html;\r
2070\tcharset="us-ascii"\r
2071\r
2072<b>rich</b> text\r
2073--my-boundary\r
2074Content-Type: application/octet-stream\r
2075Content-Transfer-Encoding: base64\r
2076Content-Disposition: attachment;\r
2077\tfilename="woot.bin"\r
2078Content-ID: <woot.id@somewhere>\r
2079\r
2080AAECAw==\r
2081--my-boundary--\r
2082\r
2083
2084"#
2085 );
2086 }
2087
2088 #[tokio::test]
2089 async fn check_conformance_angle_msg_id() {
2090 const DOUBLE_ANGLE_ONLY: &str = "Subject: hello\r
2091Message-ID: <<1234@example.com>>\r
2092\r
2093Hello";
2094 let msg = new_msg_body(DOUBLE_ANGLE_ONLY);
2095 k9::snapshot!(
2096 msg.check_fix_conformance(
2097 MessageConformance::MISSING_MESSAGE_ID_HEADER,
2098 MessageConformance::empty(),
2099 None,
2100 )
2101 .await
2102 .unwrap_err()
2103 .to_string(),
2104 "Message has conformance issues: MISSING_MESSAGE_ID_HEADER"
2105 );
2106
2107 msg.check_fix_conformance(
2108 MessageConformance::MISSING_MESSAGE_ID_HEADER,
2109 MessageConformance::MISSING_MESSAGE_ID_HEADER,
2110 None,
2111 )
2112 .await
2113 .unwrap();
2114
2115 const DOUBLE_ANGLE_AND_LONG_LINE: &str = "Subject: hello\r
2130Message-ID: <<1234@example.com>>\r
2131\r
2132Hello this is a really long line Hello this is a really long line \
2133Hello this is a really long line Hello this is a really long line \
2134Hello this is a really long line Hello this is a really long line \
2135Hello this is a really long line Hello this is a really long line \
2136Hello this is a really long line Hello this is a really long line \
2137Hello this is a really long line Hello this is a really long line \
2138Hello this is a really long line Hello this is a really long line
2139";
2140 let msg = new_msg_body(DOUBLE_ANGLE_AND_LONG_LINE);
2141 msg.check_fix_conformance(
2142 MessageConformance::MISSING_COLON_VALUE,
2143 MessageConformance::MISSING_MESSAGE_ID_HEADER | MessageConformance::LINE_TOO_LONG,
2144 None,
2145 )
2146 .await
2147 .unwrap();
2148
2149 }
2173
2174 #[tokio::test]
2175 async fn check_conformance() {
2176 let msg = new_msg_body(MULTI_HEADER_CONTENT);
2177 msg.check_fix_conformance(
2178 MessageConformance::default(),
2179 MessageConformance::MISSING_MIME_VERSION,
2180 None,
2181 )
2182 .await
2183 .unwrap();
2184 k9::snapshot!(
2185 data_as_string(&msg),
2186 r#"
2187X-Hello: there\r
2188X-Header: value\r
2189Subject: Hello\r
2190X-Header: another value\r
2191From :Someone@somewhere\r
2192Mime-Version: 1.0\r
2193\r
2194Body
2195"#
2196 );
2197
2198 let msg = new_msg_body(MULTI_HEADER_CONTENT);
2199 msg.check_fix_conformance(
2200 MessageConformance::default(),
2201 MessageConformance::MISSING_MIME_VERSION | MessageConformance::NAME_ENDS_WITH_SPACE,
2202 None,
2203 )
2204 .await
2205 .unwrap();
2206 k9::snapshot!(
2207 data_as_string(&msg),
2208 r#"
2209Content-Type: text/plain;\r
2210\tcharset="us-ascii"\r
2211X-Hello: there\r
2212X-Header: value\r
2213Subject: Hello\r
2214X-Header: another value\r
2215From: <Someone@somewhere>\r
2216Mime-Version: 1.0\r
2217\r
2218Body\r
2219
2220"#
2221 );
2222 }
2223
2224 #[tokio::test]
2225 async fn check_fix_latin_input() {
2226 const POUNDS: &[u8] = b"Subject: \xa3\r\n\r\nGBP\r\n";
2227 let msg = new_msg_body(&*POUNDS);
2228 msg.check_fix_conformance(
2229 MessageConformance::default(),
2230 MessageConformance::NEEDS_TRANSFER_ENCODING,
2231 Some(&CheckFixSettings {
2232 detect_encoding: true,
2233 include_encodings: vec!["iso-8859-1".to_string()],
2234 ..Default::default()
2235 }),
2236 )
2237 .await
2238 .unwrap();
2239
2240 let subject = msg
2241 .get_first_named_header_value("subject")
2242 .await
2243 .unwrap()
2244 .unwrap();
2245 assert_eq!(subject, "£");
2246 }
2247
2248 #[tokio::test]
2249 async fn set_scheduling() -> anyhow::Result<()> {
2250 let msg = new_msg_body(MULTI_HEADER_CONTENT);
2251 assert!(msg.get_due().is_none(), "due is implicitly now");
2252
2253 let now = Utc::now();
2254 let one_day = chrono::Duration::try_days(1).expect("1 day to be valid");
2255
2256 msg.set_scheduling(Some(Scheduling {
2257 restriction: None,
2258 first_attempt: Some((now + one_day).into()),
2259 expires: None,
2260 }))
2261 .await?;
2262
2263 let due = msg.get_due().expect("due to now be set");
2264 assert!(due - now >= one_day, "due time is at least 1 day away");
2265
2266 Ok(())
2267 }
2268
2269 #[cfg(all(test, target_pointer_width = "64"))]
2270 #[test]
2271 fn sizes() {
2272 assert_eq!(std::mem::size_of::<Message>(), 8);
2273 assert_eq!(std::mem::size_of::<MessageInner>(), 32);
2274 assert_eq!(std::mem::size_of::<MessageWithId>(), 72);
2275 }
2276}