1use crate::address::HeaderAddressList;
2#[cfg(feature = "impl")]
3use crate::dkim::Signer;
4#[cfg(feature = "impl")]
5use crate::dkim::SIGN_POOL;
6pub use crate::queue_name::QueueNameComponents;
7use crate::scheduling::Scheduling;
8use crate::EnvelopeAddress;
9use anyhow::Context;
10use chrono::{DateTime, Utc};
11#[cfg(feature = "impl")]
12use config::{any_err, from_lua_value, serialize_options};
13use futures::FutureExt;
14use intrusive_collections::{intrusive_adapter, LinkedList, LinkedListAtomicLink};
15use kumo_chrono_helper::*;
16#[cfg(feature = "impl")]
17use kumo_dkim::arc::ARC;
18use kumo_log_types::rfc3464::Report;
19use kumo_log_types::rfc5965::ARFReport;
20#[cfg(feature = "impl")]
21use mailparsing::{AuthenticationResult, AuthenticationResults, EncodeHeaderValue};
22use mailparsing::{DecodedBody, Header, HeaderParseResult, MessageConformance, MimePart};
23#[cfg(feature = "impl")]
24use mlua::{IntoLua, LuaSerdeExt, UserData, UserDataMethods};
25#[cfg(feature = "impl")]
26use mod_dns_resolver::get_resolver_instance;
27use parking_lot::Mutex;
28use prometheus::{Histogram, IntGauge};
29use serde::{Deserialize, Serialize};
30use serde_with::formats::PreferOne;
31use serde_with::{serde_as, OneOrMany};
32use spool::{get_data_spool, get_meta_spool, Spool, SpoolId};
33use std::hash::Hash;
34use std::sync::{Arc, LazyLock, Weak};
35use std::time::{Duration, Instant};
36use timeq::TimerEntryWithDelay;
37
38bitflags::bitflags! {
39 #[derive(Clone, Copy, Debug, PartialEq, Eq)]
40 struct MessageFlags: u8 {
41 const META_DIRTY = 1;
43 const DATA_DIRTY = 2;
45 const SCHEDULED = 4;
47 const FORCE_SYNC = 8;
49 }
50}
51
52static MESSAGE_COUNT: LazyLock<IntGauge> = LazyLock::new(|| {
53 prometheus::register_int_gauge!("message_count", "total number of Message objects").unwrap()
54});
55static META_COUNT: LazyLock<IntGauge> = LazyLock::new(|| {
56 prometheus::register_int_gauge!(
57 "message_meta_resident_count",
58 "total number of Message objects with metadata loaded"
59 )
60 .unwrap()
61});
62static DATA_COUNT: LazyLock<IntGauge> = LazyLock::new(|| {
63 prometheus::register_int_gauge!(
64 "message_data_resident_count",
65 "total number of Message objects with body data loaded"
66 )
67 .unwrap()
68});
69static NO_DATA: LazyLock<Arc<Box<[u8]>>> = LazyLock::new(|| Arc::new(vec![].into_boxed_slice()));
70static SAVE_HIST: LazyLock<Histogram> = LazyLock::new(|| {
71 prometheus::register_histogram!(
72 "message_save_latency",
73 "how long it takes to save a message to spool"
74 )
75 .unwrap()
76});
77static LOAD_DATA_HIST: LazyLock<Histogram> = LazyLock::new(|| {
78 prometheus::register_histogram!(
79 "message_data_load_latency",
80 "how long it takes to load message data from spool"
81 )
82 .unwrap()
83});
84static LOAD_META_HIST: LazyLock<Histogram> = LazyLock::new(|| {
85 prometheus::register_histogram!(
86 "message_meta_load_latency",
87 "how long it takes to load message metadata from spool"
88 )
89 .unwrap()
90});
91
92#[derive(Debug)]
93struct MessageInner {
94 metadata: Option<Box<MetaData>>,
95 data: Arc<Box<[u8]>>,
96 flags: MessageFlags,
97 num_attempts: u16,
98 due: Option<DateTime<Utc>>,
99}
100
101#[derive(Debug)]
102pub(crate) struct MessageWithId {
103 id: SpoolId,
104 inner: Mutex<MessageInner>,
105 link: LinkedListAtomicLink,
106}
107
108intrusive_adapter!(
109 pub(crate) MessageWithIdAdapter = Arc<MessageWithId>: MessageWithId { link: LinkedListAtomicLink }
110);
111
112pub struct MessageList {
117 list: LinkedList<MessageWithIdAdapter>,
118 len: usize,
119}
120
121impl Default for MessageList {
122 fn default() -> Self {
123 Self::new()
124 }
125}
126
127impl MessageList {
128 pub fn new() -> Self {
130 Self {
131 list: LinkedList::new(MessageWithIdAdapter::default()),
132 len: 0,
133 }
134 }
135
136 pub fn len(&self) -> usize {
138 self.len
139 }
140
141 pub fn is_empty(&self) -> bool {
142 self.len == 0
143 }
144
145 pub fn take(&mut self) -> Self {
148 let new_list = Self {
149 list: self.list.take(),
150 len: self.len,
151 };
152 self.len = 0;
153 new_list
154 }
155
156 pub fn push_back(&mut self, message: Message) {
158 self.list.push_back(message.msg_and_id);
159 self.len += 1;
160 }
161
162 pub fn pop_front(&mut self) -> Option<Message> {
164 self.list.pop_front().map(|msg_and_id| {
165 self.len -= 1;
166 Message { msg_and_id }
167 })
168 }
169
170 pub fn pop_back(&mut self) -> Option<Message> {
172 self.list.pop_back().map(|msg_and_id| {
173 self.len -= 1;
174 Message { msg_and_id }
175 })
176 }
177
178 pub fn drain(&mut self) -> Vec<Message> {
182 let mut messages = Vec::with_capacity(self.len);
183 while let Some(msg) = self.pop_front() {
184 messages.push(msg);
185 }
186 messages
187 }
188
189 pub fn extend_from_iter<I>(&mut self, iter: I)
190 where
191 I: Iterator<Item = Message>,
192 {
193 for msg in iter {
194 self.push_back(msg)
195 }
196 }
197}
198
199impl IntoIterator for MessageList {
200 type Item = Message;
201 type IntoIter = MessageListIter;
202 fn into_iter(self) -> MessageListIter {
203 MessageListIter { list: self.list }
204 }
205}
206
207pub struct MessageListIter {
208 list: LinkedList<MessageWithIdAdapter>,
209}
210
211impl Iterator for MessageListIter {
212 type Item = Message;
213 fn next(&mut self) -> Option<Message> {
214 self.list
215 .pop_front()
216 .map(|msg_and_id| Message { msg_and_id })
217 }
218}
219
220#[derive(Clone, Debug)]
221#[cfg_attr(feature = "impl", derive(mlua::FromLua))]
222pub struct Message {
223 pub(crate) msg_and_id: Arc<MessageWithId>,
224}
225
226impl PartialEq for Message {
227 fn eq(&self, other: &Self) -> bool {
228 self.id() == other.id()
229 }
230}
231impl Eq for Message {}
232
233impl Hash for Message {
234 fn hash<H>(&self, hasher: &mut H)
235 where
236 H: std::hash::Hasher,
237 {
238 self.id().hash(hasher)
239 }
240}
241
242#[derive(Clone, Debug)]
243pub struct WeakMessage {
244 weak: Weak<MessageWithId>,
245}
246
247impl WeakMessage {
248 pub fn upgrade(&self) -> Option<Message> {
249 Some(Message {
250 msg_and_id: self.weak.upgrade()?,
251 })
252 }
253}
254
255#[serde_as]
256#[derive(Debug, Serialize, Deserialize, Clone)]
257pub(crate) struct MetaData {
258 pub sender: EnvelopeAddress,
259 #[serde_as(as = "OneOrMany<_, PreferOne>")]
260 pub recipient: Vec<EnvelopeAddress>,
261 pub meta: serde_json::Value,
262 #[serde(default)]
263 pub schedule: Option<Scheduling>,
264}
265
266impl Drop for MessageInner {
267 fn drop(&mut self) {
268 if self.metadata.is_some() {
269 META_COUNT.dec();
270 }
271 if !self.data.is_empty() {
272 DATA_COUNT.dec();
273 }
274 MESSAGE_COUNT.dec();
275 }
276}
277
278impl Message {
279 pub fn new_dirty(
282 id: SpoolId,
283 sender: EnvelopeAddress,
284 recipient: Vec<EnvelopeAddress>,
285 meta: serde_json::Value,
286 data: Arc<Box<[u8]>>,
287 ) -> anyhow::Result<Self> {
288 anyhow::ensure!(meta.is_object(), "metadata must be a json object");
289 MESSAGE_COUNT.inc();
290 DATA_COUNT.inc();
291 META_COUNT.inc();
292 Ok(Self {
293 msg_and_id: Arc::new(MessageWithId {
294 id,
295 inner: Mutex::new(MessageInner {
296 metadata: Some(Box::new(MetaData {
297 sender,
298 recipient,
299 meta,
300 schedule: None,
301 })),
302 data,
303 flags: MessageFlags::META_DIRTY | MessageFlags::DATA_DIRTY,
304 num_attempts: 0,
305 due: None,
306 }),
307 link: LinkedListAtomicLink::default(),
308 }),
309 })
310 }
311
312 pub fn weak(&self) -> WeakMessage {
313 WeakMessage {
314 weak: Arc::downgrade(&self.msg_and_id),
315 }
316 }
317
318 pub fn new_from_spool(id: SpoolId, metadata: Vec<u8>) -> anyhow::Result<Self> {
322 let metadata: MetaData = serde_json::from_slice(&metadata)?;
323 MESSAGE_COUNT.inc();
324 META_COUNT.inc();
325
326 let flags = if metadata.schedule.is_some() {
327 MessageFlags::SCHEDULED
328 } else {
329 MessageFlags::empty()
330 };
331
332 Ok(Self {
333 msg_and_id: Arc::new(MessageWithId {
334 id,
335 inner: Mutex::new(MessageInner {
336 metadata: Some(Box::new(metadata)),
337 data: NO_DATA.clone(),
338 flags,
339 num_attempts: 0,
340 due: None,
341 }),
342 link: LinkedListAtomicLink::default(),
343 }),
344 })
345 }
346
347 pub(crate) fn new_from_parts(id: SpoolId, metadata: MetaData, data: Arc<Box<[u8]>>) -> Self {
348 MESSAGE_COUNT.inc();
349 META_COUNT.inc();
350
351 let flags = if metadata.schedule.is_some() {
352 MessageFlags::SCHEDULED
353 } else {
354 MessageFlags::empty()
355 };
356
357 Self {
358 msg_and_id: Arc::new(MessageWithId {
359 id,
360 inner: Mutex::new(MessageInner {
361 metadata: Some(Box::new(metadata)),
362 data,
363 flags: flags | MessageFlags::META_DIRTY | MessageFlags::DATA_DIRTY,
364 num_attempts: 0,
365 due: None,
366 }),
367 link: LinkedListAtomicLink::default(),
368 }),
369 }
370 }
371
372 pub async fn new_with_id(id: SpoolId) -> anyhow::Result<Self> {
373 let meta_spool = get_meta_spool();
374 let data = meta_spool.load(id).await?;
375 Self::new_from_spool(id, data)
376 }
377
378 pub fn get_num_attempts(&self) -> u16 {
379 let inner = self.msg_and_id.inner.lock();
380 inner.num_attempts
381 }
382
383 pub fn set_num_attempts(&self, num_attempts: u16) {
384 let mut inner = self.msg_and_id.inner.lock();
385 inner.num_attempts = num_attempts;
386 }
387
388 pub fn increment_num_attempts(&self) {
389 let mut inner = self.msg_and_id.inner.lock();
390 inner.num_attempts += 1;
391 }
392
393 pub async fn set_scheduling(
394 &self,
395 scheduling: Option<Scheduling>,
396 ) -> anyhow::Result<Option<Scheduling>> {
397 self.load_meta_if_needed().await?;
398 let mut inner = self.msg_and_id.inner.lock();
399 match &mut inner.metadata {
400 None => anyhow::bail!("set_scheduling: metadata must be loaded first"),
401 Some(meta) => {
402 meta.schedule = scheduling;
403 inner
404 .flags
405 .set(MessageFlags::SCHEDULED, scheduling.is_some());
406 if let Some(sched) = scheduling {
407 let due = inner.due.unwrap_or_else(Utc::now);
408 inner.due = Some(sched.adjust_for_schedule(due));
409 }
410 Ok(scheduling)
411 }
412 }
413 }
414
415 pub async fn get_scheduling(&self) -> anyhow::Result<Option<Scheduling>> {
416 self.load_meta_if_needed().await?;
417 let inner = self.msg_and_id.inner.lock();
418 Ok(inner.metadata.as_ref().and_then(|meta| meta.schedule))
419 }
420
421 pub fn get_due(&self) -> Option<DateTime<Utc>> {
422 let inner = self.msg_and_id.inner.lock();
423 inner.due
424 }
425
426 pub async fn delay_with_jitter(&self, limit: i64) -> anyhow::Result<Option<DateTime<Utc>>> {
427 let scale = rand::random::<f32>();
428 let value = (scale * limit as f32) as i64;
429 self.delay_by(seconds(value)?).await
430 }
431
432 pub async fn delay_by(
433 &self,
434 duration: chrono::Duration,
435 ) -> anyhow::Result<Option<DateTime<Utc>>> {
436 let due = Utc::now() + duration;
437 self.set_due(Some(due)).await
438 }
439
440 pub async fn delay_by_and_jitter(
442 &self,
443 duration: chrono::Duration,
444 ) -> anyhow::Result<Option<DateTime<Utc>>> {
445 let scale = rand::random::<f32>();
446 let value = (scale * 60.) as i64;
447 let due = Utc::now() + duration + seconds(value)?;
448 self.set_due(Some(due)).await
449 }
450
451 pub async fn set_due(
452 &self,
453 due: Option<DateTime<Utc>>,
454 ) -> anyhow::Result<Option<DateTime<Utc>>> {
455 let due = {
456 let mut inner = self.msg_and_id.inner.lock();
457
458 if !inner.flags.contains(MessageFlags::SCHEDULED) {
459 inner.due = due;
461 return Ok(inner.due);
462 }
463
464 let due = due.unwrap_or_else(Utc::now);
465
466 if let Some(meta) = &inner.metadata {
467 inner.due = match &meta.schedule {
468 Some(sched) => Some(sched.adjust_for_schedule(due)),
469 None => Some(due),
470 };
471 return Ok(inner.due);
472 }
473
474 due
477 };
478
479 self.load_meta().await?;
480
481 {
482 let mut inner = self.msg_and_id.inner.lock();
483 match &inner.metadata {
484 Some(meta) => {
485 inner.due = match &meta.schedule {
486 Some(sched) => Some(sched.adjust_for_schedule(due)),
487 None => Some(due),
488 };
489 Ok(inner.due)
490 }
491 None => anyhow::bail!("loaded metadata, but metadata is not set!?"),
492 }
493 }
494 }
495
496 fn get_data_if_dirty(&self) -> Option<Arc<Box<[u8]>>> {
497 let inner = self.msg_and_id.inner.lock();
498 if inner.flags.contains(MessageFlags::DATA_DIRTY) {
499 Some(Arc::clone(&inner.data))
500 } else {
501 None
502 }
503 }
504
505 fn get_meta_if_dirty(&self) -> Option<MetaData> {
506 let inner = self.msg_and_id.inner.lock();
507 if inner.flags.contains(MessageFlags::META_DIRTY) {
508 inner.metadata.as_ref().map(|md| (**md).clone())
509 } else {
510 None
511 }
512 }
513
514 pub(crate) async fn clone_meta_data(&self) -> anyhow::Result<MetaData> {
515 self.load_meta_if_needed().await?;
516 let inner = self.msg_and_id.inner.lock();
517 inner
518 .metadata
519 .as_ref()
520 .ok_or_else(|| anyhow::anyhow!("metadata not loaded even though we just loaded it"))
521 .map(|md| (**md).clone())
522 }
523
524 pub fn set_force_sync(&self, force: bool) {
525 let mut inner = self.msg_and_id.inner.lock();
526 inner.flags.set(MessageFlags::FORCE_SYNC, force);
527 }
528
529 pub fn needs_save(&self) -> bool {
530 let inner = self.msg_and_id.inner.lock();
531 inner.flags.contains(MessageFlags::META_DIRTY)
532 || inner.flags.contains(MessageFlags::DATA_DIRTY)
533 }
534
535 pub async fn save(&self, deadline: Option<Instant>) -> anyhow::Result<()> {
536 let _timer = SAVE_HIST.start_timer();
537 self.save_to(&**get_meta_spool(), &**get_data_spool(), deadline)
538 .await
539 }
540
541 pub async fn save_to(
542 &self,
543 meta_spool: &(dyn Spool + Send + Sync),
544 data_spool: &(dyn Spool + Send + Sync),
545 deadline: Option<Instant>,
546 ) -> anyhow::Result<()> {
547 let force_sync = self
548 .msg_and_id
549 .inner
550 .lock()
551 .flags
552 .contains(MessageFlags::FORCE_SYNC);
553
554 let data_fut = if let Some(data) = self.get_data_if_dirty() {
555 anyhow::ensure!(!data.is_empty(), "message data must not be empty");
556 data_spool
557 .store(self.msg_and_id.id, data, force_sync, deadline)
558 .map(|_| true)
559 .boxed()
560 } else {
561 futures::future::ready(false).boxed()
562 };
563 let meta_fut = if let Some(meta) = self.get_meta_if_dirty() {
564 let meta = Arc::new(serde_json::to_vec(&meta)?.into_boxed_slice());
565 meta_spool
566 .store(self.msg_and_id.id, meta, force_sync, deadline)
567 .map(|_| true)
568 .boxed()
569 } else {
570 futures::future::ready(false).boxed()
571 };
572
573 let (data_res, meta_res) = tokio::join!(data_fut, meta_fut);
579
580 if data_res {
581 self.msg_and_id
582 .inner
583 .lock()
584 .flags
585 .remove(MessageFlags::DATA_DIRTY);
586 }
587 if meta_res {
588 self.msg_and_id
589 .inner
590 .lock()
591 .flags
592 .remove(MessageFlags::META_DIRTY);
593 }
594 Ok(())
595 }
596
597 pub fn id(&self) -> &SpoolId {
598 &self.msg_and_id.id
599 }
600
601 pub async fn save_and_shrink(&self) -> anyhow::Result<bool> {
603 self.save(None).await?;
604 self.shrink()
605 }
606
607 pub async fn save_and_shrink_data(&self) -> anyhow::Result<bool> {
609 self.save(None).await?;
610 self.shrink_data()
611 }
612
613 pub fn shrink_data(&self) -> anyhow::Result<bool> {
614 let mut inner = self.msg_and_id.inner.lock();
615 let mut did_shrink = false;
616 if inner.flags.contains(MessageFlags::DATA_DIRTY) {
617 anyhow::bail!("Cannot shrink message: DATA_DIRTY");
618 }
619 if !inner.data.is_empty() {
620 DATA_COUNT.dec();
621 did_shrink = true;
622 }
623 if !inner.data.is_empty() {
624 inner.data = NO_DATA.clone();
625 did_shrink = true;
626 }
627 Ok(did_shrink)
628 }
629
630 pub fn shrink(&self) -> anyhow::Result<bool> {
631 let mut inner = self.msg_and_id.inner.lock();
632 let mut did_shrink = false;
633 if inner.flags.contains(MessageFlags::DATA_DIRTY) {
634 anyhow::bail!("Cannot shrink message: DATA_DIRTY");
635 }
636 if inner.flags.contains(MessageFlags::META_DIRTY) {
637 anyhow::bail!("Cannot shrink message: META_DIRTY");
638 }
639 if inner.metadata.take().is_some() {
640 META_COUNT.dec();
641 did_shrink = true;
642 }
643 if !inner.data.is_empty() {
644 DATA_COUNT.dec();
645 did_shrink = true;
646 }
647 if !inner.data.is_empty() {
648 inner.data = NO_DATA.clone();
649 did_shrink = true;
650 }
651 Ok(did_shrink)
652 }
653
654 pub async fn sender(&self) -> anyhow::Result<EnvelopeAddress> {
655 self.load_meta_if_needed().await?;
656 let inner = self.msg_and_id.inner.lock();
657 match &inner.metadata {
658 Some(meta) => Ok(meta.sender.clone()),
659 None => anyhow::bail!("Message::sender metadata is not loaded"),
660 }
661 }
662
663 pub async fn set_sender(&self, sender: EnvelopeAddress) -> anyhow::Result<()> {
664 self.load_meta_if_needed().await?;
665 let mut inner = self.msg_and_id.inner.lock();
666 match &mut inner.metadata {
667 Some(meta) => {
668 meta.sender = sender;
669 inner.flags.set(MessageFlags::DATA_DIRTY, true);
670 Ok(())
671 }
672 None => anyhow::bail!("Message::set_sender: metadata is not loaded"),
673 }
674 }
675
676 #[deprecated = "use recipient_list or first_recipient instead"]
677 pub async fn recipient(&self) -> anyhow::Result<EnvelopeAddress> {
678 self.first_recipient().await
679 }
680
681 pub async fn first_recipient(&self) -> anyhow::Result<EnvelopeAddress> {
682 self.load_meta_if_needed().await?;
683 let inner = self.msg_and_id.inner.lock();
684 match &inner.metadata {
685 Some(meta) => match meta.recipient.first() {
686 Some(recip) => Ok(recip.clone()),
687 None => anyhow::bail!("recipient list is empty!?"),
688 },
689 None => anyhow::bail!("Message::first_recipient: metadata is not loaded"),
690 }
691 }
692
693 pub async fn recipient_list(&self) -> anyhow::Result<Vec<EnvelopeAddress>> {
694 self.load_meta_if_needed().await?;
695 let inner = self.msg_and_id.inner.lock();
696 match &inner.metadata {
697 Some(meta) => Ok(meta.recipient.clone()),
698 None => anyhow::bail!("Message::recipient_list: metadata is not loaded"),
699 }
700 }
701
702 pub async fn recipient_list_string(&self) -> anyhow::Result<Vec<String>> {
703 self.load_meta_if_needed().await?;
704 let inner = self.msg_and_id.inner.lock();
705 match &inner.metadata {
706 Some(meta) => Ok(meta.recipient.iter().map(|a| a.to_string()).collect()),
707 None => anyhow::bail!("Message::recipient_list_string: metadata is not loaded"),
708 }
709 }
710
711 #[deprecated = "use set_recipient_list instead"]
712 pub async fn set_recipient(&self, recipient: EnvelopeAddress) -> anyhow::Result<()> {
713 self.set_recipient_list(vec![recipient]).await
714 }
715
716 pub async fn set_recipient_list(&self, recipient: Vec<EnvelopeAddress>) -> anyhow::Result<()> {
717 self.load_meta_if_needed().await?;
718
719 let mut inner = self.msg_and_id.inner.lock();
720 match &mut inner.metadata {
721 Some(meta) => {
722 meta.recipient = recipient;
723 inner.flags.set(MessageFlags::DATA_DIRTY, true);
724 Ok(())
725 }
726 None => anyhow::bail!("Message::set_recipient_list: metadata is not loaded"),
727 }
728 }
729
730 pub fn is_meta_loaded(&self) -> bool {
731 self.msg_and_id.inner.lock().metadata.is_some()
732 }
733
734 pub fn is_data_loaded(&self) -> bool {
735 !self.msg_and_id.inner.lock().data.is_empty()
736 }
737
738 pub async fn load_meta_if_needed(&self) -> anyhow::Result<()> {
739 if self.is_meta_loaded() {
740 return Ok(());
741 }
742 self.load_meta().await
743 }
744
745 pub async fn data(&self) -> anyhow::Result<Arc<Box<[u8]>>> {
746 self.load_data_if_needed().await
747 }
748
749 async fn load_data_if_needed(&self) -> anyhow::Result<Arc<Box<[u8]>>> {
750 if self.is_data_loaded() {
751 return Ok(self.get_data_maybe_not_loaded());
752 }
753 self.load_data().await
754 }
755
756 pub async fn load_meta(&self) -> anyhow::Result<()> {
757 let _timer = LOAD_META_HIST.start_timer();
758 self.load_meta_from(&**get_meta_spool()).await
759 }
760
761 async fn load_meta_from(&self, meta_spool: &(dyn Spool + Send + Sync)) -> anyhow::Result<()> {
762 let id = self.id();
763 let data = meta_spool.load(*id).await?;
764 let mut inner = self.msg_and_id.inner.lock();
765 let was_not_loaded = inner.metadata.is_none();
766 let metadata: MetaData = serde_json::from_slice(&data)?;
767 inner.metadata.replace(Box::new(metadata));
768 if was_not_loaded {
769 META_COUNT.inc();
770 }
771 Ok(())
772 }
773
774 pub async fn load_data(&self) -> anyhow::Result<Arc<Box<[u8]>>> {
775 let _timer = LOAD_DATA_HIST.start_timer();
776 self.load_data_from(&**get_data_spool()).await
777 }
778
779 async fn load_data_from(
780 &self,
781 data_spool: &(dyn Spool + Send + Sync),
782 ) -> anyhow::Result<Arc<Box<[u8]>>> {
783 let data = data_spool.load(*self.id()).await?;
784 let mut inner = self.msg_and_id.inner.lock();
785 let was_empty = inner.data.is_empty();
786 inner.data = Arc::new(data.into_boxed_slice());
787 if was_empty {
788 DATA_COUNT.inc();
789 }
790 Ok(inner.data.clone())
791 }
792
793 pub fn assign_data(&self, data: Vec<u8>) {
794 let mut inner = self.msg_and_id.inner.lock();
795 let was_empty = inner.data.is_empty();
796 inner.data = Arc::new(data.into_boxed_slice());
797 inner.flags.set(MessageFlags::DATA_DIRTY, true);
798 if was_empty {
799 DATA_COUNT.inc();
800 }
801 }
802
803 pub fn get_data_maybe_not_loaded(&self) -> Arc<Box<[u8]>> {
804 let inner = self.msg_and_id.inner.lock();
805 inner.data.clone()
806 }
807
808 pub async fn set_meta<S: AsRef<str>, V: Into<serde_json::Value>>(
809 &self,
810 key: S,
811 value: V,
812 ) -> anyhow::Result<()> {
813 self.load_meta_if_needed().await?;
814 let mut inner = self.msg_and_id.inner.lock();
815 match &mut inner.metadata {
816 None => anyhow::bail!("set_meta: metadata must be loaded first"),
817 Some(meta) => {
818 let key = key.as_ref();
819 let value = value.into();
820
821 match &mut meta.meta {
822 serde_json::Value::Object(map) => {
823 map.insert(key.to_string(), value);
824 }
825 _ => anyhow::bail!("metadata is somehow not a json object"),
826 }
827
828 inner.flags.set(MessageFlags::META_DIRTY, true);
829 Ok(())
830 }
831 }
832 }
833
834 pub async fn unset_meta<S: AsRef<str>>(&self, key: S) -> anyhow::Result<()> {
835 self.load_meta_if_needed().await?;
836 let mut inner = self.msg_and_id.inner.lock();
837 match &mut inner.metadata {
838 None => anyhow::bail!("set_meta: metadata must be loaded first"),
839 Some(meta) => {
840 let key = key.as_ref();
841
842 match &mut meta.meta {
843 serde_json::Value::Object(map) => {
844 map.remove(key);
845 }
846 _ => anyhow::bail!("metadata is somehow not a json object"),
847 }
848
849 inner.flags.set(MessageFlags::META_DIRTY, true);
850 Ok(())
851 }
852 }
853 }
854
855 pub async fn get_meta_string<S: serde_json::value::Index + std::fmt::Display + Copy>(
857 &self,
858 key: S,
859 ) -> anyhow::Result<Option<String>> {
860 match self.get_meta(key).await {
861 Ok(serde_json::Value::String(value)) => Ok(Some(value.to_string())),
862 Ok(serde_json::Value::Null) => Ok(None),
863 hmm => {
864 anyhow::bail!("expected '{key}' to be a string value, got {hmm:?}");
865 }
866 }
867 }
868
869 pub async fn get_meta_obj(&self) -> anyhow::Result<serde_json::Value> {
870 self.load_meta_if_needed().await?;
871 let inner = self.msg_and_id.inner.lock();
872 match &inner.metadata {
873 None => anyhow::bail!("get_meta_obj: metadata must be loaded first"),
874 Some(meta) => Ok(meta.meta.clone()),
875 }
876 }
877
878 pub async fn get_meta<S: serde_json::value::Index>(
879 &self,
880 key: S,
881 ) -> anyhow::Result<serde_json::Value> {
882 self.load_meta_if_needed().await?;
883 let inner = self.msg_and_id.inner.lock();
884 match &inner.metadata {
885 None => anyhow::bail!("get_meta: metadata must be loaded first"),
886 Some(meta) => match meta.meta.get(key) {
887 Some(value) => Ok(value.clone()),
888 None => Ok(serde_json::Value::Null),
889 },
890 }
891 }
892
893 pub fn age(&self, now: DateTime<Utc>) -> chrono::Duration {
894 self.msg_and_id.id.age(now)
895 }
896
897 pub async fn get_queue_name(&self) -> anyhow::Result<String> {
898 Ok(match self.get_meta_string("queue").await? {
899 Some(name) => name,
900 None => {
901 let name = QueueNameComponents::format(
902 self.get_meta_string("campaign").await?,
903 self.get_meta_string("tenant").await?,
904 self.first_recipient()
905 .await?
906 .domain()
907 .to_string()
908 .to_lowercase(),
909 self.get_meta_string("routing_domain").await?,
910 );
911 name.to_string()
912 }
913 })
914 }
915
916 #[cfg(feature = "impl")]
917 pub async fn arc_verify(
918 &self,
919 opt_resolver_name: Option<String>,
920 ) -> anyhow::Result<AuthenticationResult> {
921 let resolver = get_resolver_instance(&opt_resolver_name)?;
922 let data = self.data().await?;
923 let bytes = mailparsing::SharedString::try_from(data.as_ref().as_ref())?;
924
925 let parsed = mailparsing::Header::parse_headers(bytes.clone())?;
926 let message = kumo_dkim::ParsedEmail::HeaderOnlyParse { bytes, parsed };
927
928 let arc = ARC::verify(&message, &**resolver).await;
929 Ok(arc.authentication_result())
930 }
931
932 #[cfg(feature = "impl")]
933 pub async fn arc_seal(
934 &self,
935 signer: Signer,
936 auth_results: AuthenticationResults,
937 opt_resolver_name: Option<String>,
938 ) -> anyhow::Result<()> {
939 let resolver = get_resolver_instance(&opt_resolver_name)?;
940 let data = self.data().await?;
941 let bytes = mailparsing::SharedString::try_from(data.as_ref().as_ref())?;
942 let parsed = mailparsing::Header::parse_headers(bytes.clone())?;
943 let message = kumo_dkim::ParsedEmail::HeaderOnlyParse { bytes, parsed };
944 let arc = ARC::verify(&message, &**resolver).await;
945
946 let headers = arc.seal(&message, auth_results, signer.signer())?;
947 if !headers.is_empty() {
948 let mut new_data = Vec::<u8>::with_capacity(data.len() + 1024);
949
950 for hdr in headers {
951 hdr.write_header(&mut new_data).ok();
952 }
953 new_data.extend_from_slice(&data);
954 self.assign_data(new_data);
955 }
956
957 Ok(())
958 }
959
960 #[cfg(feature = "impl")]
961 pub async fn dkim_verify(
962 &self,
963 opt_resolver_name: Option<String>,
964 ) -> anyhow::Result<Vec<AuthenticationResult>> {
965 let resolver = get_resolver_instance(&opt_resolver_name)?;
966 let data = self.data().await?;
967 let bytes = mailparsing::SharedString::try_from(data.as_ref().as_ref())?;
968
969 let parsed = mailparsing::Header::parse_headers(bytes.clone())?;
970 if parsed
971 .overall_conformance
972 .contains(MessageConformance::NON_CANONICAL_LINE_ENDINGS)
973 {
974 return Ok(vec![AuthenticationResult {
975 method: "dkim".to_string(),
976 method_version: None,
977 result: "permerror".to_string(),
978 reason: Some("message has non-canonical line endings".to_string()),
979 props: Default::default(),
980 }]);
981 }
982 let message = kumo_dkim::ParsedEmail::HeaderOnlyParse { bytes, parsed };
983
984 let from = match message.get_headers().from() {
985 Ok(Some(from)) if from.0.len() == 1 => from.0,
986 Ok(Some(from)) => {
987 return Ok(vec![AuthenticationResult {
988 method: "dkim".to_string(),
989 method_version: None,
990 result: "permerror".to_string(),
991 reason: Some(format!(
992 "From header must have a single sender, found {}",
993 from.len()
994 )),
995 props: Default::default(),
996 }]);
997 }
998 Ok(None) => {
999 return Ok(vec![AuthenticationResult {
1000 method: "dkim".to_string(),
1001 method_version: None,
1002 result: "permerror".to_string(),
1003 reason: Some("From header is missing".to_string()),
1004 props: Default::default(),
1005 }]);
1006 }
1007 Err(err) => {
1008 return Ok(vec![AuthenticationResult {
1009 method: "dkim".to_string(),
1010 method_version: None,
1011 result: "permerror".to_string(),
1012 reason: Some(format!("From header is invalid: {err:#}")),
1013 props: Default::default(),
1014 }]);
1015 }
1016 };
1017 let from_domain = &from[0].address.domain;
1018
1019 let results =
1020 kumo_dkim::verify_email_with_resolver(from_domain, &message, &**resolver).await?;
1021 Ok(results)
1022 }
1023
1024 pub async fn parse(&self) -> anyhow::Result<MimePart<'static>> {
1029 let data = self.data().await?;
1030 let owned_data = String::from_utf8_lossy(data.as_ref().as_ref()).to_string();
1031 Ok(MimePart::parse(owned_data)?)
1032 }
1033
1034 pub async fn parse_rfc3464(&self) -> anyhow::Result<Option<Report>> {
1035 let data = self.data().await?;
1036 Report::parse(&data)
1037 }
1038
1039 pub async fn parse_rfc5965(&self) -> anyhow::Result<Option<ARFReport>> {
1040 let data = self.data().await?;
1041 ARFReport::parse(&data)
1042 }
1043
1044 pub async fn prepend_header(&self, name: Option<&str>, value: &str) -> anyhow::Result<()> {
1045 let data = self.data().await?;
1046 let mut new_data = Vec::with_capacity(size_header(name, value) + 2 + data.len());
1047 emit_header(&mut new_data, name, value);
1048 new_data.extend_from_slice(&data);
1049 self.assign_data(new_data);
1050 Ok(())
1051 }
1052
1053 pub async fn append_header(&self, name: Option<&str>, value: &str) -> anyhow::Result<()> {
1054 let data = self.data().await?;
1055 let mut new_data = Vec::with_capacity(size_header(name, value) + 2 + data.len());
1056 for (idx, window) in data.windows(4).enumerate() {
1057 if window == b"\r\n\r\n" {
1058 let headers = &data[0..idx + 2];
1059 let body = &data[idx + 2..];
1060
1061 new_data.extend_from_slice(headers);
1062 emit_header(&mut new_data, name, value);
1063 new_data.extend_from_slice(body);
1064 self.assign_data(new_data);
1065 return Ok(());
1066 }
1067 }
1068
1069 anyhow::bail!("append_header could not find the end of the header block");
1070 }
1071
1072 pub async fn get_address_header(
1073 &self,
1074 header_name: &str,
1075 ) -> anyhow::Result<Option<HeaderAddressList>> {
1076 let data = self.data().await?;
1077 let HeaderParseResult { headers, .. } =
1078 mailparsing::Header::parse_headers(data.as_ref().as_ref())?;
1079
1080 match headers.get_first(header_name) {
1081 Some(hdr) => {
1082 let list = hdr.as_address_list()?;
1083 let result: HeaderAddressList = list.into();
1084 Ok(Some(result))
1085 }
1086 None => Ok(None),
1087 }
1088 }
1089
1090 pub async fn get_first_named_header_value(&self, name: &str) -> anyhow::Result<Option<String>> {
1091 let data = self.data().await?;
1092 let HeaderParseResult { headers, .. } = Header::parse_headers(data.as_ref().as_ref())?;
1093
1094 match headers.get_first(name) {
1095 Some(hdr) => Ok(Some(hdr.as_unstructured()?)),
1096 None => Ok(None),
1097 }
1098 }
1099
1100 pub async fn get_all_named_header_values(&self, name: &str) -> anyhow::Result<Vec<String>> {
1101 let data = self.data().await?;
1102 let HeaderParseResult { headers, .. } = Header::parse_headers(data.as_ref().as_ref())?;
1103
1104 let mut values = vec![];
1105 for hdr in headers.iter_named(name) {
1106 values.push(hdr.as_unstructured()?);
1107 }
1108 Ok(values)
1109 }
1110
1111 pub async fn get_all_headers(&self) -> anyhow::Result<Vec<(String, String)>> {
1112 let data = self.data().await?;
1113 let HeaderParseResult { headers, .. } = Header::parse_headers(data.as_ref().as_ref())?;
1114
1115 let mut values = vec![];
1116 for hdr in headers.iter() {
1117 values.push((hdr.get_name().to_string(), hdr.as_unstructured()?));
1118 }
1119 Ok(values)
1120 }
1121
1122 pub async fn retain_headers<F: FnMut(&Header) -> bool>(
1123 &self,
1124 mut func: F,
1125 ) -> anyhow::Result<()> {
1126 let data = self.data().await?;
1127 let mut new_data = Vec::with_capacity(data.len());
1128 let HeaderParseResult {
1129 headers,
1130 body_offset,
1131 ..
1132 } = Header::parse_headers(data.as_ref().as_ref())?;
1133 for hdr in headers.iter() {
1134 let retain = (func)(hdr);
1135 if !retain {
1136 continue;
1137 }
1138 hdr.write_header(&mut new_data)?;
1139 }
1140 new_data.extend_from_slice(b"\r\n");
1141 new_data.extend_from_slice(&data[body_offset..]);
1142 self.assign_data(new_data);
1143 Ok(())
1144 }
1145
1146 pub async fn remove_first_named_header(&self, name: &str) -> anyhow::Result<()> {
1147 let mut removed = false;
1148 self.retain_headers(|hdr| {
1149 if hdr.get_name().eq_ignore_ascii_case(name) && !removed {
1150 removed = true;
1151 false
1152 } else {
1153 true
1154 }
1155 })
1156 .await
1157 }
1158
1159 pub async fn import_x_headers(&self, names: Vec<String>) -> anyhow::Result<()> {
1160 let data = self.data().await?;
1161 let HeaderParseResult { headers, .. } = Header::parse_headers(data.as_ref().as_ref())?;
1162
1163 for hdr in headers.iter() {
1164 let do_import = if names.is_empty() {
1165 is_x_header(hdr.get_name())
1166 } else {
1167 is_header_in_names_list(hdr.get_name(), &names)
1168 };
1169 if do_import {
1170 let name = imported_header_name(hdr.get_name());
1171 self.set_meta(name, hdr.as_unstructured()?).await?;
1172 }
1173 }
1174
1175 Ok(())
1176 }
1177
1178 pub async fn remove_x_headers(&self, names: Vec<String>) -> anyhow::Result<()> {
1179 self.retain_headers(|hdr| {
1180 if names.is_empty() {
1181 !is_x_header(hdr.get_name())
1182 } else {
1183 !is_header_in_names_list(hdr.get_name(), &names)
1184 }
1185 })
1186 .await
1187 }
1188
1189 pub async fn remove_all_named_headers(&self, name: &str) -> anyhow::Result<()> {
1190 self.retain_headers(|hdr| !hdr.get_name().eq_ignore_ascii_case(name))
1191 .await
1192 }
1193
1194 #[cfg(feature = "impl")]
1195 pub async fn dkim_sign(&self, signer: Signer) -> anyhow::Result<()> {
1196 let data = self.data().await?;
1197 let header = if let Some(runtime) = SIGN_POOL.get() {
1198 runtime.spawn_blocking(move || signer.sign(&data)).await??
1199 } else {
1200 signer.sign(&data)?
1201 };
1202 self.prepend_header(None, &header).await?;
1203 Ok(())
1204 }
1205
1206 pub async fn import_scheduling_header(
1207 &self,
1208 header_name: &str,
1209 remove: bool,
1210 ) -> anyhow::Result<Option<Scheduling>> {
1211 if let Some(value) = self.get_first_named_header_value(header_name).await? {
1212 let sched: Scheduling = serde_json::from_str(&value).with_context(|| {
1213 format!("{value} from header {header_name} is not a valid Scheduling header")
1214 })?;
1215 let result = self.set_scheduling(Some(sched)).await?;
1216
1217 if remove {
1218 self.remove_all_named_headers(header_name).await?;
1219 }
1220
1221 Ok(result)
1222 } else {
1223 Ok(None)
1224 }
1225 }
1226
1227 pub async fn append_text_plain(&self, content: &str) -> anyhow::Result<bool> {
1228 let data = self.data().await?;
1229 let mut msg = MimePart::parse(data.as_ref().as_ref())?;
1230 let parts = msg.simplified_structure_pointers()?;
1231 if let Some(p) = parts.text_part.and_then(|p| msg.resolve_ptr_mut(p)) {
1232 match p.body()? {
1233 DecodedBody::Text(text) => {
1234 let mut text = text.as_str().to_string();
1235 text.push_str("\r\n");
1236 text.push_str(content);
1237 p.replace_text_body("text/plain", &text)?;
1238
1239 let new_data = msg.to_message_string();
1240 self.assign_data(new_data.into_bytes());
1241 Ok(true)
1242 }
1243 DecodedBody::Binary(_) => {
1244 anyhow::bail!("expected text/plain part to be text, but it is binary");
1245 }
1246 }
1247 } else {
1248 Ok(false)
1249 }
1250 }
1251
1252 pub async fn append_text_html(&self, content: &str) -> anyhow::Result<bool> {
1253 let data = self.data().await?;
1254 let mut msg = MimePart::parse(data.as_ref().as_ref())?;
1255 let parts = msg.simplified_structure_pointers()?;
1256 if let Some(p) = parts.html_part.and_then(|p| msg.resolve_ptr_mut(p)) {
1257 match p.body()? {
1258 DecodedBody::Text(text) => {
1259 let mut text = text.as_str().to_string();
1260
1261 match text.rfind("</body>").or_else(|| text.rfind("</BODY>")) {
1262 Some(idx) => {
1263 text.insert_str(idx, content);
1264 text.insert_str(idx, "\r\n");
1265 }
1266 None => {
1267 text.push_str("\r\n");
1269 text.push_str(content);
1270 }
1271 }
1272
1273 p.replace_text_body("text/html", &text)?;
1274
1275 let new_data = msg.to_message_string();
1276 self.assign_data(new_data.into_bytes());
1277 Ok(true)
1278 }
1279 DecodedBody::Binary(_) => {
1280 anyhow::bail!("expected text/html part to be text, but it is binary");
1281 }
1282 }
1283 } else {
1284 Ok(false)
1285 }
1286 }
1287
1288 pub async fn check_fix_conformance(
1289 &self,
1290 check: MessageConformance,
1291 fix: MessageConformance,
1292 settings: Option<&CheckFixSettings>,
1293 ) -> anyhow::Result<()> {
1294 let data = self.data().await?;
1295 let data_bytes = data.as_ref().as_ref();
1296 let mut msg = MimePart::parse(data_bytes)?;
1297
1298 let conformance = msg.conformance();
1299
1300 let check = check - fix;
1302
1303 if check.intersects(conformance) {
1304 let problems = check.intersection(conformance).to_string();
1305 anyhow::bail!("Message has conformance issues: {problems}");
1306 }
1307
1308 if fix.intersects(conformance) {
1309 let to_fix = fix.intersection(conformance);
1310 let problems = to_fix.to_string();
1311
1312 let missing_headers_only = to_fix
1313 .difference(
1314 MessageConformance::MISSING_DATE_HEADER
1315 | MessageConformance::MISSING_MIME_VERSION
1316 | MessageConformance::MISSING_MESSAGE_ID_HEADER,
1317 )
1318 .is_empty();
1319
1320 if !missing_headers_only {
1321 if to_fix.contains(MessageConformance::NEEDS_TRANSFER_ENCODING) {
1322 if let Some(settings) = settings {
1331 if settings.detect_encoding {
1332 use charset_normalizer_rs::entity::NormalizerSettings;
1333
1334 let norm_settings = NormalizerSettings {
1335 include_encodings: settings.include_encodings.clone(),
1336 exclude_encodings: settings.exclude_encodings.clone(),
1337 ..Default::default()
1338 };
1339
1340 let guess = charset_normalizer_rs::from_bytes(
1341 &data_bytes.to_vec(),
1342 Some(norm_settings),
1343 )
1344 .map_err(|err| anyhow::anyhow!("{err}"))?;
1345 if let Some(best) = guess.get_best() {
1346 if let Some(decoded) = best.decoded_payload() {
1347 msg = MimePart::parse(decoded.to_string())?;
1348 }
1349 }
1350 }
1351 }
1352 }
1353
1354 msg = msg.rebuild().with_context(|| {
1355 format!("Rebuilding message to correct conformance issues: {problems}")
1356 })?;
1357 }
1358
1359 if to_fix.contains(MessageConformance::MISSING_DATE_HEADER) {
1360 msg.headers_mut().set_date(Utc::now())?;
1361 }
1362
1363 if to_fix.contains(MessageConformance::MISSING_MIME_VERSION) {
1364 msg.headers_mut().set_mime_version("1.0")?;
1365 }
1366
1367 if to_fix.contains(MessageConformance::MISSING_MESSAGE_ID_HEADER) {
1368 let sender = self.sender().await?;
1369 let domain = sender.domain();
1370 let id = *self.id();
1371 msg.headers_mut()
1372 .set_message_id(mailparsing::MessageID(format!("{id}@{domain}")))?;
1373 }
1374
1375 let new_data = msg.to_message_string();
1376 self.assign_data(new_data.into_bytes());
1377 }
1378
1379 Ok(())
1380 }
1381}
1382
1383fn is_header_in_names_list(hdr_name: &str, names: &[String]) -> bool {
1384 for name in names {
1385 if hdr_name.eq_ignore_ascii_case(name) {
1386 return true;
1387 }
1388 }
1389 false
1390}
1391
1392fn imported_header_name(name: &str) -> String {
1393 name.chars()
1394 .map(|c| match c.to_ascii_lowercase() {
1395 '-' => '_',
1396 c => c,
1397 })
1398 .collect()
1399}
1400
1401fn is_x_header(name: &str) -> bool {
1402 name.starts_with("X-") || name.starts_with("x-")
1403}
1404
1405fn size_header(name: Option<&str>, value: &str) -> usize {
1406 name.map(|name| name.len() + 2).unwrap_or(0) + value.len()
1407}
1408
1409fn emit_header(dest: &mut Vec<u8>, name: Option<&str>, value: &str) {
1410 if let Some(name) = name {
1411 dest.extend_from_slice(name.as_bytes());
1412 dest.extend_from_slice(b": ");
1413 }
1414 dest.extend_from_slice(value.as_bytes());
1415 if !value.ends_with("\r\n") {
1416 dest.extend_from_slice(b"\r\n");
1417 }
1418}
1419
1420#[cfg(feature = "impl")]
1421impl UserData for Message {
1422 fn add_methods<M: UserDataMethods<Self>>(methods: &mut M) {
1423 methods.add_async_method(
1424 "set_meta",
1425 move |_, this, (name, value): (String, mlua::Value)| async move {
1426 let value = serde_json::value::to_value(value).map_err(any_err)?;
1427 this.set_meta(name, value).await.map_err(any_err)?;
1428 Ok(())
1429 },
1430 );
1431 methods.add_async_method("get_meta", move |lua, this, name: String| async move {
1432 let value = this.get_meta(name).await.map_err(any_err)?;
1433 Ok(Some(lua.to_value_with(&value, serialize_options())?))
1434 });
1435 methods.add_async_method("get_data", |lua, this, _: ()| async move {
1436 let data = this.data().await.map_err(any_err)?;
1437 lua.create_string(&*data)
1438 });
1439 methods.add_method("set_data", move |_lua, this, data: mlua::String| {
1440 this.assign_data(data.as_bytes().to_vec());
1441 Ok(())
1442 });
1443
1444 methods.add_async_method("parse_mime", |_lua, this, _: ()| async move {
1445 let data = this.data().await.map_err(any_err)?;
1446 let owned_data = String::from_utf8_lossy(data.as_ref().as_ref()).to_string();
1447 let part = MimePart::parse(owned_data).map_err(any_err)?;
1448 Ok(mod_mimepart::PartRef::new(part))
1449 });
1450
1451 methods.add_async_method("append_text_plain", |_lua, this, data: String| async move {
1452 this.append_text_plain(&data).await.map_err(any_err)
1453 });
1454
1455 methods.add_async_method("append_text_html", |_lua, this, data: String| async move {
1456 this.append_text_html(&data).await.map_err(any_err)
1457 });
1458
1459 methods.add_method("id", move |_, this, _: ()| Ok(this.id().to_string()));
1460 methods.add_async_method("sender", move |_, this, _: ()| async move {
1461 this.sender().await.map_err(any_err)
1462 });
1463
1464 methods.add_method("num_attempts", move |_, this, _: ()| {
1465 Ok(this.get_num_attempts())
1466 });
1467
1468 methods.add_async_method("queue_name", move |_, this, _: ()| async move {
1469 this.get_queue_name().await.map_err(any_err)
1470 });
1471
1472 methods.add_async_method("set_due", move |lua, this, due: mlua::Value| async move {
1473 let due: Option<DateTime<Utc>> = lua.from_value(due)?;
1474 let revised_due = this.set_due(due).await.map_err(any_err)?;
1475 lua.to_value(&revised_due)
1476 });
1477
1478 methods.add_async_method(
1479 "set_sender",
1480 move |lua, this, value: mlua::Value| async move {
1481 let sender = match value {
1482 mlua::Value::String(s) => {
1483 let s = s.to_str()?;
1484 EnvelopeAddress::parse(&s).map_err(any_err)?
1485 }
1486 _ => lua.from_value::<EnvelopeAddress>(value.clone())?,
1487 };
1488 this.set_sender(sender).await.map_err(any_err)
1489 },
1490 );
1491
1492 methods.add_async_method("recipient", move |lua, this, _: ()| async move {
1493 let mut recipients = this.recipient_list().await.map_err(any_err)?;
1494 match recipients.len() {
1495 0 => Ok(mlua::Value::Nil),
1496 1 => {
1497 let recip: EnvelopeAddress = recipients.pop().expect("have 1");
1498 recip.into_lua(&lua)
1499 }
1500 _ => recipients.into_lua(&lua),
1501 }
1502 });
1503
1504 methods.add_async_method("recipient_list", move |lua, this, _: ()| async move {
1505 let recipients = this.recipient_list().await.map_err(any_err)?;
1506 recipients.into_lua(&lua)
1507 });
1508
1509 methods.add_async_method(
1510 "set_recipient",
1511 move |lua, this, value: mlua::Value| async move {
1512 let recipients = match value {
1513 mlua::Value::String(s) => {
1514 let s = s.to_str()?;
1515 vec![EnvelopeAddress::parse(&s).map_err(any_err)?]
1516 }
1517 _ => {
1518 if let Ok(recips) = lua.from_value::<Vec<EnvelopeAddress>>(value.clone()) {
1519 recips
1520 } else {
1521 vec![lua.from_value::<EnvelopeAddress>(value.clone())?]
1522 }
1523 }
1524 };
1525 this.set_recipient_list(recipients).await.map_err(any_err)
1526 },
1527 );
1528
1529 #[cfg(feature = "impl")]
1530 methods.add_async_method("dkim_sign", |_, this, signer: Signer| async move {
1531 this.dkim_sign(signer).await.map_err(any_err)
1532 });
1533
1534 methods.add_async_method("shrink", |_, this, _: ()| async move {
1535 if this.needs_save() {
1536 this.save(None).await.map_err(any_err)?;
1537 }
1538 this.shrink().map_err(any_err)
1539 });
1540
1541 methods.add_async_method("shrink_data", |_, this, _: ()| async move {
1542 if this.needs_save() {
1543 this.save(None).await.map_err(any_err)?;
1544 }
1545 this.shrink_data().map_err(any_err)
1546 });
1547
1548 methods.add_async_method(
1549 "add_authentication_results",
1550 |lua, this, (serv_id, results): (String, mlua::Value)| async move {
1551 let results: Vec<AuthenticationResult> = lua.from_value(results)?;
1552 let results = AuthenticationResults {
1553 serv_id,
1554 version: None,
1555 results,
1556 };
1557
1558 this.prepend_header(Some("Authentication-Results"), &results.encode_value())
1559 .await
1560 .map_err(any_err)?;
1561
1562 Ok(())
1563 },
1564 );
1565
1566 #[cfg(feature = "impl")]
1567 methods.add_async_method(
1568 "arc_verify",
1569 |lua, this, opt_resolver_name: Option<String>| async move {
1570 let results = this.arc_verify(opt_resolver_name).await.map_err(any_err)?;
1571 lua.to_value_with(&results, serialize_options())
1572 },
1573 );
1574
1575 #[cfg(feature = "impl")]
1576 methods.add_async_method(
1577 "arc_seal",
1578 |lua,
1579 this,
1580 (signer, serv_id, auth_res, opt_resolver_name): (
1581 Signer,
1582 String,
1583 mlua::Value,
1584 Option<String>,
1585 )| async move {
1586 let results: Vec<AuthenticationResult> = lua.from_value(auth_res)?;
1587 this.arc_seal(
1588 signer,
1589 AuthenticationResults {
1590 serv_id,
1591 version: None,
1592 results,
1593 },
1594 opt_resolver_name,
1595 )
1596 .await
1597 .map_err(any_err)
1598 },
1599 );
1600
1601 #[cfg(feature = "impl")]
1602 methods.add_async_method(
1603 "dkim_verify",
1604 |lua, this, opt_resolver_name: Option<String>| async move {
1605 let results = this.dkim_verify(opt_resolver_name).await.map_err(any_err)?;
1606 lua.to_value_with(&results, serialize_options())
1607 },
1608 );
1609
1610 methods.add_async_method(
1611 "prepend_header",
1612 |_, this, (name, value, encode): (String, String, Option<bool>)| async move {
1613 let encode = encode.unwrap_or(false);
1614 if encode {
1615 let header = Header::new_unstructured(name.clone(), value);
1616 this.prepend_header(Some(&name), header.get_raw_value())
1617 .await
1618 .map_err(any_err)?;
1619 } else {
1620 this.prepend_header(Some(&name), &value)
1621 .await
1622 .map_err(any_err)?;
1623 }
1624 Ok(())
1625 },
1626 );
1627 methods.add_async_method(
1628 "append_header",
1629 |_, this, (name, value, encode): (String, String, Option<bool>)| async move {
1630 let encode = encode.unwrap_or(false);
1631 if encode {
1632 let header = Header::new_unstructured(name.clone(), value);
1633 this.append_header(Some(&name), header.get_raw_value())
1634 .await
1635 .map_err(any_err)?;
1636 } else {
1637 this.append_header(Some(&name), &value)
1638 .await
1639 .map_err(any_err)?;
1640 }
1641 Ok(())
1642 },
1643 );
1644 methods.add_async_method("get_address_header", |_, this, name: String| async move {
1645 this.get_address_header(&name).await.map_err(any_err)
1646 });
1647 methods.add_async_method("from_header", |_, this, ()| async move {
1648 this.get_address_header("From").await.map_err(any_err)
1649 });
1650 methods.add_async_method("to_header", |_, this, ()| async move {
1651 this.get_address_header("To").await.map_err(any_err)
1652 });
1653
1654 methods.add_async_method(
1655 "get_first_named_header_value",
1656 |_, this, name: String| async move {
1657 this.get_first_named_header_value(&name)
1658 .await
1659 .map_err(any_err)
1660 },
1661 );
1662 methods.add_async_method(
1663 "get_all_named_header_values",
1664 |_, this, name: String| async move {
1665 this.get_all_named_header_values(&name)
1666 .await
1667 .map_err(any_err)
1668 },
1669 );
1670 methods.add_async_method("get_all_headers", |_, this, _: ()| async move {
1671 Ok(this
1672 .get_all_headers()
1673 .await
1674 .map_err(any_err)?
1675 .into_iter()
1676 .map(|(name, value)| vec![name, value])
1677 .collect::<Vec<Vec<String>>>())
1678 });
1679 methods.add_async_method(
1680 "import_x_headers",
1681 |_, this, names: Option<Vec<String>>| async move {
1682 this.import_x_headers(names.unwrap_or_default())
1683 .await
1684 .map_err(any_err)
1685 },
1686 );
1687
1688 methods.add_async_method(
1689 "remove_x_headers",
1690 |_, this, names: Option<Vec<String>>| async move {
1691 this.remove_x_headers(names.unwrap_or_default())
1692 .await
1693 .map_err(any_err)
1694 },
1695 );
1696 methods.add_async_method(
1697 "remove_all_named_headers",
1698 |_, this, name: String| async move {
1699 this.remove_all_named_headers(&name).await.map_err(any_err)
1700 },
1701 );
1702
1703 methods.add_async_method(
1704 "import_scheduling_header",
1705 |lua, this, (header_name, remove): (String, bool)| async move {
1706 let opt_schedule = this
1707 .import_scheduling_header(&header_name, remove)
1708 .await
1709 .map_err(any_err)?;
1710 lua.to_value(&opt_schedule)
1711 },
1712 );
1713
1714 methods.add_async_method(
1715 "set_scheduling",
1716 move |lua, this, params: mlua::Value| async move {
1717 let sched: Option<Scheduling> = from_lua_value(&lua, params)?;
1718 let opt_schedule = this.set_scheduling(sched).await.map_err(any_err)?;
1719 lua.to_value(&opt_schedule)
1720 },
1721 );
1722
1723 methods.add_async_method("parse_rfc3464", |lua, this, _: ()| async move {
1724 let report = this.parse_rfc3464().await.map_err(any_err)?;
1725 match report {
1726 Some(report) => lua.to_value_with(&report, serialize_options()),
1727 None => Ok(mlua::Value::Nil),
1728 }
1729 });
1730
1731 methods.add_async_method("parse_rfc5965", |lua, this, _: ()| async move {
1732 let report = this.parse_rfc5965().await.map_err(any_err)?;
1733 match report {
1734 Some(report) => lua.to_value_with(&report, serialize_options()),
1735 None => Ok(mlua::Value::Nil),
1736 }
1737 });
1738
1739 methods.add_async_method("save", |_, this, ()| async move {
1740 this.save(None).await.map_err(any_err)
1741 });
1742
1743 methods.add_method("set_force_sync", move |_, this, force: bool| {
1744 this.set_force_sync(force);
1745 Ok(())
1746 });
1747
1748 methods.add_async_method(
1749 "check_fix_conformance",
1750 |lua, this, (check, fix, settings): (String, String, Option<mlua::Value>)| async move {
1751 use std::str::FromStr;
1752 let check = MessageConformance::from_str(&check).map_err(any_err)?;
1753 let fix = MessageConformance::from_str(&fix).map_err(any_err)?;
1754
1755 let settings = match settings {
1756 Some(v) => Some(lua.from_value(v).map_err(any_err)?),
1757 None => None,
1758 };
1759
1760 match this
1761 .check_fix_conformance(check, fix, settings.as_ref())
1762 .await
1763 {
1764 Ok(_) => Ok(None),
1765 Err(err) => Ok(Some(format!("{err:#}"))),
1766 }
1767 },
1768 );
1769 }
1770}
1771
1772#[derive(Default, Debug, Clone)]
1773#[cfg_attr(feature = "impl", derive(Deserialize))]
1774pub struct CheckFixSettings {
1775 pub detect_encoding: bool,
1776 pub include_encodings: Vec<String>,
1777 pub exclude_encodings: Vec<String>,
1778}
1779
1780impl TimerEntryWithDelay for WeakMessage {
1781 fn delay(&self) -> Duration {
1782 match self.upgrade() {
1783 None => {
1784 Duration::from_millis(0)
1786 }
1787 Some(msg) => msg.delay(),
1788 }
1789 }
1790}
1791
1792impl TimerEntryWithDelay for Message {
1793 fn delay(&self) -> Duration {
1794 let inner = self.msg_and_id.inner.lock();
1795 match inner.due {
1796 Some(time) => {
1797 let now = Utc::now();
1798 let delta = time - now;
1799 delta.to_std().unwrap_or(Duration::from_millis(0))
1800 }
1801 None => Duration::from_millis(0),
1802 }
1803 }
1804}
1805
1806#[cfg(test)]
1807pub(crate) mod test {
1808 use super::*;
1809 use serde_json::json;
1810
1811 pub fn new_msg_body<S: AsRef<[u8]>>(s: S) -> Message {
1812 Message::new_dirty(
1813 SpoolId::new(),
1814 EnvelopeAddress::parse("sender@example.com").unwrap(),
1815 vec![EnvelopeAddress::parse("recip@example.com").unwrap()],
1816 serde_json::json!({}),
1817 Arc::new(s.as_ref().to_vec().into_boxed_slice()),
1818 )
1819 .unwrap()
1820 }
1821
1822 fn data_as_string(msg: &Message) -> String {
1823 String::from_utf8(msg.get_data_maybe_not_loaded().to_vec()).unwrap()
1824 }
1825
1826 const X_HDR_CONTENT: &str =
1827 "X-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nFrom :Someone\r\n\r\nBody";
1828
1829 #[tokio::test]
1830 async fn import_all_x_headers() {
1831 let msg = new_msg_body(X_HDR_CONTENT);
1832
1833 msg.import_x_headers(vec![]).await.unwrap();
1834 k9::assert_equal!(
1835 msg.get_meta_obj().await.unwrap(),
1836 json!({
1837 "x_hello": "there",
1838 "x_header": "value",
1839 })
1840 );
1841 }
1842
1843 #[tokio::test]
1844 async fn meta_and_nil() {
1845 let msg = new_msg_body(X_HDR_CONTENT);
1846 msg.set_meta("test", serde_json::Value::Null).await.unwrap();
1848 k9::assert_equal!(msg.get_meta("test").await.unwrap(), serde_json::Value::Null);
1849
1850 let lua = mlua::Lua::new();
1852 lua.globals().set("msg", msg).unwrap();
1853 lua.load("assert(msg:get_meta('test') == nil)")
1854 .exec()
1855 .unwrap();
1856 }
1857
1858 #[tokio::test]
1859 async fn import_some_x_headers() {
1860 let msg = new_msg_body(X_HDR_CONTENT);
1861
1862 msg.import_x_headers(vec!["x-hello".to_string()])
1863 .await
1864 .unwrap();
1865 k9::assert_equal!(
1866 msg.get_meta_obj().await.unwrap(),
1867 json!({
1868 "x_hello": "there",
1869 })
1870 );
1871 }
1872
1873 #[tokio::test]
1874 async fn remove_all_x_headers() {
1875 let msg = new_msg_body(X_HDR_CONTENT);
1876
1877 msg.remove_x_headers(vec![]).await.unwrap();
1878 k9::assert_equal!(
1879 data_as_string(&msg),
1880 "Subject: Hello\r\nFrom :Someone\r\n\r\nBody"
1881 );
1882 }
1883
1884 #[tokio::test]
1885 async fn prepend_header_2_params() {
1886 let msg = new_msg_body(X_HDR_CONTENT);
1887
1888 msg.prepend_header(Some("Date"), "Today").await.unwrap();
1889 k9::assert_equal!(
1890 data_as_string(&msg),
1891 "Date: Today\r\nX-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nFrom :Someone\r\n\r\nBody"
1892 );
1893 }
1894
1895 #[tokio::test]
1896 async fn prepend_header_1_params() {
1897 let msg = new_msg_body(X_HDR_CONTENT);
1898
1899 msg.prepend_header(None, "Date: Today").await.unwrap();
1900 k9::assert_equal!(
1901 data_as_string(&msg),
1902 "Date: Today\r\nX-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nFrom :Someone\r\n\r\nBody"
1903 );
1904 }
1905
1906 #[tokio::test]
1907 async fn append_header_2_params() {
1908 let msg = new_msg_body(X_HDR_CONTENT);
1909
1910 msg.append_header(Some("Date"), "Today").await.unwrap();
1911 k9::assert_equal!(
1912 data_as_string(&msg),
1913 "X-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nFrom :Someone\r\nDate: Today\r\n\r\nBody"
1914 );
1915 }
1916
1917 #[tokio::test]
1918 async fn append_header_1_params() {
1919 let msg = new_msg_body(X_HDR_CONTENT);
1920
1921 msg.append_header(None, "Date: Today").await.unwrap();
1922 k9::assert_equal!(
1923 data_as_string(&msg),
1924 "X-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nFrom :Someone\r\nDate: Today\r\n\r\nBody"
1925 );
1926 }
1927
1928 const MULTI_HEADER_CONTENT: &str =
1929 "X-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nX-Header: another value\r\nFrom :Someone@somewhere\r\n\r\nBody";
1930
1931 #[tokio::test]
1932 async fn get_first_header() {
1933 let msg = new_msg_body(MULTI_HEADER_CONTENT);
1934 k9::assert_equal!(
1935 msg.get_first_named_header_value("X-header")
1936 .await
1937 .unwrap()
1938 .unwrap(),
1939 "value"
1940 );
1941 }
1942
1943 #[tokio::test]
1944 async fn get_all_header() {
1945 let msg = new_msg_body(MULTI_HEADER_CONTENT);
1946 k9::assert_equal!(
1947 msg.get_all_named_header_values("X-header").await.unwrap(),
1948 vec!["value".to_string(), "another value".to_string()]
1949 );
1950 }
1951
1952 #[tokio::test]
1953 async fn remove_first() {
1954 let msg = new_msg_body(MULTI_HEADER_CONTENT);
1955 msg.remove_first_named_header("X-header").await.unwrap();
1956 k9::assert_equal!(
1957 data_as_string(&msg),
1958 "X-Hello: there\r\nSubject: Hello\r\nX-Header: another value\r\nFrom :Someone@somewhere\r\n\r\nBody"
1959 );
1960 }
1961
1962 #[tokio::test]
1963 async fn remove_all() {
1964 let msg = new_msg_body(MULTI_HEADER_CONTENT);
1965 msg.remove_all_named_headers("X-header").await.unwrap();
1966 k9::assert_equal!(
1967 data_as_string(&msg),
1968 "X-Hello: there\r\nSubject: Hello\r\nFrom :Someone@somewhere\r\n\r\nBody"
1969 );
1970 }
1971
1972 #[tokio::test]
1973 async fn append_text_plain() {
1974 let msg = new_msg_body(MULTI_HEADER_CONTENT);
1975 msg.append_text_plain("I am at the bottom").await.unwrap();
1976 k9::assert_equal!(
1977 data_as_string(&msg),
1978 "X-Hello: there\r\n\
1979 X-Header: value\r\n\
1980 Subject: Hello\r\n\
1981 X-Header: another value\r\n\
1982 From :Someone@somewhere\r\n\
1983 Content-Type: text/plain;\r\n\
1984 \tcharset=\"us-ascii\"\r\n\
1985 \r\n\
1986 Body\r\n\
1987 I am at the bottom\r\n"
1988 );
1989 }
1990
1991 const MIXED_CONTENT: &str = "Content-Type: multipart/mixed;\r\n\
1992\tboundary=\"my-boundary\"\r\n\
1993\r\n\
1994--my-boundary\r\n\
1995Content-Type: text/plain;\r\n\
1996\tcharset=\"us-ascii\"\r\n\
1997\r\n\
1998plain text\r\n\
1999--my-boundary\r\n\
2000Content-Type: text/html;\r\n\
2001\tcharset=\"us-ascii\"\r\n\
2002\r\n\
2003<b>rich</b> text\r\n\
2004--my-boundary\r\n\
2005Content-Type: application/octet-stream\r\n\
2006Content-Transfer-Encoding: base64\r\n\
2007Content-Disposition: attachment;\r\n\
2008\tfilename=\"woot.bin\"\r\n\
2009Content-ID: <woot.id@somewhere>\r\n\
2010\r\n\
2011AAECAw==\r\n\
2012--my-boundary--\r\n\
2013\r\n";
2014
2015 const MIXED_CONTENT_ENCLOSING_BODY: &str = "Content-Type: multipart/mixed;\r\n\
2016\tboundary=\"my-boundary\"\r\n\
2017\r\n\
2018--my-boundary\r\n\
2019Content-Type: text/plain;\r\n\
2020\tcharset=\"us-ascii\"\r\n\
2021\r\n\
2022plain text\r\n\
2023--my-boundary\r\n\
2024Content-Type: text/html;\r\n\
2025\tcharset=\"us-ascii\"\r\n\
2026\r\n\
2027<BODY>\r\n\
2028<b>rich</b> text\r\n\
2029</BODY>\r\n\
2030--my-boundary\r\n\
2031Content-Type: application/octet-stream\r\n\
2032Content-Transfer-Encoding: base64\r\n\
2033Content-Disposition: attachment;\r\n\
2034\tfilename=\"woot.bin\"\r\n\
2035Content-ID: <woot.id>\r\n\
2036\r\n\
2037AAECAw==\r\n\
2038--my-boundary--\r\n\
2039\r\n";
2040
2041 #[tokio::test]
2042 async fn append_text_html() {
2043 let msg = new_msg_body(MIXED_CONTENT);
2044 msg.append_text_html("bottom html").await.unwrap();
2045 k9::snapshot!(
2046 data_as_string(&msg),
2047 r#"
2048Content-Type: multipart/mixed;\r
2049\tboundary="my-boundary"\r
2050\r
2051--my-boundary\r
2052Content-Type: text/plain;\r
2053\tcharset="us-ascii"\r
2054\r
2055plain text\r
2056--my-boundary\r
2057Content-Type: text/html;\r
2058\tcharset="us-ascii"\r
2059\r
2060<b>rich</b> text\r
2061\r
2062bottom html\r
2063--my-boundary\r
2064Content-Type: application/octet-stream\r
2065Content-Transfer-Encoding: base64\r
2066Content-Disposition: attachment;\r
2067\tfilename="woot.bin"\r
2068Content-ID: <woot.id@somewhere>\r
2069\r
2070AAECAw==\r
2071--my-boundary--\r
2072\r
2073
2074"#
2075 );
2076
2077 let msg = new_msg_body(MIXED_CONTENT_ENCLOSING_BODY);
2078 msg.append_text_html("bottom html 👻").await.unwrap();
2079 k9::snapshot!(
2080 data_as_string(&msg),
2081 r#"
2082Content-Type: multipart/mixed;\r
2083\tboundary="my-boundary"\r
2084\r
2085--my-boundary\r
2086Content-Type: text/plain;\r
2087\tcharset="us-ascii"\r
2088\r
2089plain text\r
2090--my-boundary\r
2091Content-Type: text/html;\r
2092\tcharset="utf-8"\r
2093Content-Transfer-Encoding: quoted-printable\r
2094\r
2095<BODY>\r
2096<b>rich</b> text\r
2097\r
2098bottom html =F0=9F=91=BB</BODY>\r
2099--my-boundary\r
2100Content-Type: application/octet-stream\r
2101Content-Transfer-Encoding: base64\r
2102Content-Disposition: attachment;\r
2103\tfilename="woot.bin"\r
2104Content-ID: <woot.id>\r
2105\r
2106AAECAw==\r
2107--my-boundary--\r
2108\r
2109
2110"#
2111 );
2112 }
2113
2114 #[tokio::test]
2115 async fn append_text_plain_mixed() {
2116 let msg = new_msg_body(MIXED_CONTENT);
2117 msg.append_text_plain("bottom text 👾").await.unwrap();
2118 k9::snapshot!(
2119 data_as_string(&msg),
2120 r#"
2121Content-Type: multipart/mixed;\r
2122\tboundary="my-boundary"\r
2123\r
2124--my-boundary\r
2125Content-Type: text/plain;\r
2126\tcharset="utf-8"\r
2127Content-Transfer-Encoding: quoted-printable\r
2128\r
2129plain text\r
2130\r
2131bottom text =F0=9F=91=BE\r
2132--my-boundary\r
2133Content-Type: text/html;\r
2134\tcharset="us-ascii"\r
2135\r
2136<b>rich</b> text\r
2137--my-boundary\r
2138Content-Type: application/octet-stream\r
2139Content-Transfer-Encoding: base64\r
2140Content-Disposition: attachment;\r
2141\tfilename="woot.bin"\r
2142Content-ID: <woot.id@somewhere>\r
2143\r
2144AAECAw==\r
2145--my-boundary--\r
2146\r
2147
2148"#
2149 );
2150 }
2151
2152 #[tokio::test]
2153 async fn check_conformance_angle_msg_id() {
2154 const DOUBLE_ANGLE_ONLY: &str = "Subject: hello\r
2155Message-ID: <<1234@example.com>>\r
2156\r
2157Hello";
2158 let msg = new_msg_body(DOUBLE_ANGLE_ONLY);
2159 k9::snapshot!(
2160 msg.check_fix_conformance(
2161 MessageConformance::MISSING_MESSAGE_ID_HEADER,
2162 MessageConformance::empty(),
2163 None,
2164 )
2165 .await
2166 .unwrap_err(),
2167 "Message has conformance issues: MISSING_MESSAGE_ID_HEADER"
2168 );
2169
2170 msg.check_fix_conformance(
2171 MessageConformance::MISSING_MESSAGE_ID_HEADER,
2172 MessageConformance::MISSING_MESSAGE_ID_HEADER,
2173 None,
2174 )
2175 .await
2176 .unwrap();
2177
2178 const DOUBLE_ANGLE_AND_LONG_LINE: &str = "Subject: hello\r
2193Message-ID: <<1234@example.com>>\r
2194\r
2195Hello this is a really long line Hello this is a really long line \
2196Hello this is a really long line Hello this is a really long line \
2197Hello this is a really long line Hello this is a really long line \
2198Hello this is a really long line Hello this is a really long line \
2199Hello this is a really long line Hello this is a really long line \
2200Hello this is a really long line Hello this is a really long line \
2201Hello this is a really long line Hello this is a really long line
2202";
2203 let msg = new_msg_body(DOUBLE_ANGLE_AND_LONG_LINE);
2204 msg.check_fix_conformance(
2205 MessageConformance::MISSING_COLON_VALUE,
2206 MessageConformance::MISSING_MESSAGE_ID_HEADER | MessageConformance::LINE_TOO_LONG,
2207 None,
2208 )
2209 .await
2210 .unwrap();
2211
2212 }
2236
2237 #[tokio::test]
2238 async fn check_conformance() {
2239 let msg = new_msg_body(MULTI_HEADER_CONTENT);
2240 msg.check_fix_conformance(
2241 MessageConformance::default(),
2242 MessageConformance::MISSING_MIME_VERSION,
2243 None,
2244 )
2245 .await
2246 .unwrap();
2247 k9::snapshot!(
2248 data_as_string(&msg),
2249 r#"
2250X-Hello: there\r
2251X-Header: value\r
2252Subject: Hello\r
2253X-Header: another value\r
2254From :Someone@somewhere\r
2255Mime-Version: 1.0\r
2256\r
2257Body
2258"#
2259 );
2260
2261 let msg = new_msg_body(MULTI_HEADER_CONTENT);
2262 msg.check_fix_conformance(
2263 MessageConformance::default(),
2264 MessageConformance::MISSING_MIME_VERSION | MessageConformance::NAME_ENDS_WITH_SPACE,
2265 None,
2266 )
2267 .await
2268 .unwrap();
2269 k9::snapshot!(
2270 data_as_string(&msg),
2271 r#"
2272Content-Type: text/plain;\r
2273\tcharset="us-ascii"\r
2274X-Hello: there\r
2275X-Header: value\r
2276Subject: Hello\r
2277X-Header: another value\r
2278From: <Someone@somewhere>\r
2279Mime-Version: 1.0\r
2280\r
2281Body\r
2282
2283"#
2284 );
2285 }
2286
2287 #[tokio::test]
2288 async fn check_fix_latin_input() {
2289 const POUNDS: &str = "Subject: £\r\n\r\nGBP\r\n";
2291 let (content, _, _) = encoding_rs::WINDOWS_1252.encode(POUNDS);
2292 let msg = new_msg_body(&*content);
2293 msg.check_fix_conformance(
2294 MessageConformance::default(),
2295 MessageConformance::NEEDS_TRANSFER_ENCODING,
2296 Some(&CheckFixSettings {
2297 detect_encoding: true,
2298 include_encodings: vec!["iso-8859-1".to_string()],
2299 ..Default::default()
2300 }),
2301 )
2302 .await
2303 .unwrap();
2304
2305 let subject = msg
2306 .get_first_named_header_value("subject")
2307 .await
2308 .unwrap()
2309 .unwrap();
2310 assert_eq!(subject, "£");
2311 }
2312
2313 #[tokio::test]
2314 async fn set_scheduling() -> anyhow::Result<()> {
2315 let msg = new_msg_body(MULTI_HEADER_CONTENT);
2316 assert!(msg.get_due().is_none(), "due is implicitly now");
2317
2318 let now = Utc::now();
2319 let one_day = chrono::Duration::try_days(1).expect("1 day to be valid");
2320
2321 msg.set_scheduling(Some(Scheduling {
2322 restriction: None,
2323 first_attempt: Some((now + one_day).into()),
2324 expires: None,
2325 }))
2326 .await?;
2327
2328 let due = msg.get_due().expect("due to now be set");
2329 assert!(due - now >= one_day, "due time is at least 1 day away");
2330
2331 Ok(())
2332 }
2333
2334 #[cfg(all(test, target_pointer_width = "64"))]
2335 #[test]
2336 fn sizes() {
2337 assert_eq!(std::mem::size_of::<Message>(), 8);
2338 assert_eq!(std::mem::size_of::<MessageInner>(), 32);
2339 assert_eq!(std::mem::size_of::<MessageWithId>(), 72);
2340 }
2341}