1use crate::address::HeaderAddressList;
2#[cfg(feature = "impl")]
3use crate::dkim::Signer;
4#[cfg(feature = "impl")]
5use crate::dkim::SIGN_POOL;
6pub use crate::queue_name::QueueNameComponents;
7use crate::scheduling::Scheduling;
8use crate::EnvelopeAddress;
9use anyhow::Context;
10use chrono::{DateTime, Utc};
11#[cfg(feature = "impl")]
12use config::{any_err, from_lua_value, serialize_options};
13use futures::FutureExt;
14use intrusive_collections::{intrusive_adapter, LinkedList, LinkedListAtomicLink};
15use kumo_chrono_helper::*;
16use kumo_log_types::rfc3464::Report;
17use kumo_log_types::rfc5965::ARFReport;
18#[cfg(feature = "impl")]
19use mailparsing::{AuthenticationResult, AuthenticationResults, EncodeHeaderValue};
20use mailparsing::{DecodedBody, Header, HeaderParseResult, MessageConformance, MimePart};
21#[cfg(feature = "impl")]
22use mlua::{LuaSerdeExt, UserData, UserDataMethods};
23use parking_lot::Mutex;
24use prometheus::{Histogram, IntGauge};
25use serde::{Deserialize, Serialize};
26use spool::{get_data_spool, get_meta_spool, Spool, SpoolId};
27use std::hash::Hash;
28use std::sync::{Arc, LazyLock, Weak};
29use std::time::{Duration, Instant};
30use timeq::TimerEntryWithDelay;
31
32bitflags::bitflags! {
33 #[derive(Clone, Copy, Debug, PartialEq, Eq)]
34 struct MessageFlags: u8 {
35 const META_DIRTY = 1;
37 const DATA_DIRTY = 2;
39 const SCHEDULED = 4;
41 const FORCE_SYNC = 8;
43 }
44}
45
46static MESSAGE_COUNT: LazyLock<IntGauge> = LazyLock::new(|| {
47 prometheus::register_int_gauge!("message_count", "total number of Message objects").unwrap()
48});
49static META_COUNT: LazyLock<IntGauge> = LazyLock::new(|| {
50 prometheus::register_int_gauge!(
51 "message_meta_resident_count",
52 "total number of Message objects with metadata loaded"
53 )
54 .unwrap()
55});
56static DATA_COUNT: LazyLock<IntGauge> = LazyLock::new(|| {
57 prometheus::register_int_gauge!(
58 "message_data_resident_count",
59 "total number of Message objects with body data loaded"
60 )
61 .unwrap()
62});
/// Shared empty buffer used as the `data` of messages whose body is not loaded.
static NO_DATA: LazyLock<Arc<Box<[u8]>>> = LazyLock::new(|| Arc::new(Box::default()));
64static SAVE_HIST: LazyLock<Histogram> = LazyLock::new(|| {
65 prometheus::register_histogram!(
66 "message_save_latency",
67 "how long it takes to save a message to spool"
68 )
69 .unwrap()
70});
71static LOAD_DATA_HIST: LazyLock<Histogram> = LazyLock::new(|| {
72 prometheus::register_histogram!(
73 "message_data_load_latency",
74 "how long it takes to load message data from spool"
75 )
76 .unwrap()
77});
78static LOAD_META_HIST: LazyLock<Histogram> = LazyLock::new(|| {
79 prometheus::register_histogram!(
80 "message_meta_load_latency",
81 "how long it takes to load message metadata from spool"
82 )
83 .unwrap()
84});
85
86#[derive(Debug)]
87struct MessageInner {
88 metadata: Option<Box<MetaData>>,
89 data: Arc<Box<[u8]>>,
90 flags: MessageFlags,
91 num_attempts: u16,
92 due: Option<DateTime<Utc>>,
93}
94
95#[derive(Debug)]
96pub(crate) struct MessageWithId {
97 id: SpoolId,
98 inner: Mutex<MessageInner>,
99 link: LinkedListAtomicLink,
100}
101
102intrusive_adapter!(
103 pub(crate) MessageWithIdAdapter = Arc<MessageWithId>: MessageWithId { link: LinkedListAtomicLink }
104);
105
106pub struct MessageList {
111 list: LinkedList<MessageWithIdAdapter>,
112 len: usize,
113}
114
115impl Default for MessageList {
116 fn default() -> Self {
117 Self::new()
118 }
119}
120
121impl MessageList {
122 pub fn new() -> Self {
124 Self {
125 list: LinkedList::new(MessageWithIdAdapter::default()),
126 len: 0,
127 }
128 }
129
130 pub fn len(&self) -> usize {
132 self.len
133 }
134
135 pub fn is_empty(&self) -> bool {
136 self.len == 0
137 }
138
139 pub fn take(&mut self) -> Self {
142 let new_list = Self {
143 list: self.list.take(),
144 len: self.len,
145 };
146 self.len = 0;
147 new_list
148 }
149
150 pub fn push_back(&mut self, message: Message) {
152 self.list.push_back(message.msg_and_id);
153 self.len += 1;
154 }
155
156 pub fn pop_front(&mut self) -> Option<Message> {
158 self.list.pop_front().map(|msg_and_id| {
159 self.len -= 1;
160 Message { msg_and_id }
161 })
162 }
163
164 pub fn pop_back(&mut self) -> Option<Message> {
166 self.list.pop_back().map(|msg_and_id| {
167 self.len -= 1;
168 Message { msg_and_id }
169 })
170 }
171
172 pub fn drain(&mut self) -> Vec<Message> {
176 let mut messages = Vec::with_capacity(self.len);
177 while let Some(msg) = self.pop_front() {
178 messages.push(msg);
179 }
180 messages
181 }
182
183 pub fn extend_from_iter<I>(&mut self, iter: I)
184 where
185 I: Iterator<Item = Message>,
186 {
187 for msg in iter {
188 self.push_back(msg)
189 }
190 }
191}
192
193impl IntoIterator for MessageList {
194 type Item = Message;
195 type IntoIter = MessageListIter;
196 fn into_iter(self) -> MessageListIter {
197 MessageListIter { list: self.list }
198 }
199}
200
201pub struct MessageListIter {
202 list: LinkedList<MessageWithIdAdapter>,
203}
204
205impl Iterator for MessageListIter {
206 type Item = Message;
207 fn next(&mut self) -> Option<Message> {
208 self.list
209 .pop_front()
210 .map(|msg_and_id| Message { msg_and_id })
211 }
212}
213
214#[derive(Clone, Debug)]
215#[cfg_attr(feature = "impl", derive(mlua::FromLua))]
216pub struct Message {
217 pub(crate) msg_and_id: Arc<MessageWithId>,
218}
219
220impl PartialEq for Message {
221 fn eq(&self, other: &Self) -> bool {
222 self.id() == other.id()
223 }
224}
225impl Eq for Message {}
226
227impl Hash for Message {
228 fn hash<H>(&self, hasher: &mut H)
229 where
230 H: std::hash::Hasher,
231 {
232 self.id().hash(hasher)
233 }
234}
235
236#[derive(Clone, Debug)]
237pub struct WeakMessage {
238 weak: Weak<MessageWithId>,
239}
240
241impl WeakMessage {
242 pub fn upgrade(&self) -> Option<Message> {
243 Some(Message {
244 msg_and_id: self.weak.upgrade()?,
245 })
246 }
247}
248
249#[derive(Debug, Serialize, Deserialize, Clone)]
250struct MetaData {
251 sender: EnvelopeAddress,
252 recipient: EnvelopeAddress,
253 meta: serde_json::Value,
254 #[serde(default)]
255 schedule: Option<Scheduling>,
256}
257
258impl Drop for MessageInner {
259 fn drop(&mut self) {
260 if self.metadata.is_some() {
261 META_COUNT.dec();
262 }
263 if !self.data.is_empty() {
264 DATA_COUNT.dec();
265 }
266 MESSAGE_COUNT.dec();
267 }
268}
269
270impl Message {
271 pub fn new_dirty(
274 id: SpoolId,
275 sender: EnvelopeAddress,
276 recipient: EnvelopeAddress,
277 meta: serde_json::Value,
278 data: Arc<Box<[u8]>>,
279 ) -> anyhow::Result<Self> {
280 anyhow::ensure!(meta.is_object(), "metadata must be a json object");
281 MESSAGE_COUNT.inc();
282 DATA_COUNT.inc();
283 META_COUNT.inc();
284 Ok(Self {
285 msg_and_id: Arc::new(MessageWithId {
286 id,
287 inner: Mutex::new(MessageInner {
288 metadata: Some(Box::new(MetaData {
289 sender,
290 recipient,
291 meta,
292 schedule: None,
293 })),
294 data,
295 flags: MessageFlags::META_DIRTY | MessageFlags::DATA_DIRTY,
296 num_attempts: 0,
297 due: None,
298 }),
299 link: LinkedListAtomicLink::default(),
300 }),
301 })
302 }
303
304 pub fn weak(&self) -> WeakMessage {
305 WeakMessage {
306 weak: Arc::downgrade(&self.msg_and_id),
307 }
308 }
309
310 pub fn new_from_spool(id: SpoolId, metadata: Vec<u8>) -> anyhow::Result<Self> {
314 let metadata: MetaData = serde_json::from_slice(&metadata)?;
315 MESSAGE_COUNT.inc();
316 META_COUNT.inc();
317
318 let flags = if metadata.schedule.is_some() {
319 MessageFlags::SCHEDULED
320 } else {
321 MessageFlags::empty()
322 };
323
324 Ok(Self {
325 msg_and_id: Arc::new(MessageWithId {
326 id,
327 inner: Mutex::new(MessageInner {
328 metadata: Some(Box::new(metadata)),
329 data: NO_DATA.clone(),
330 flags,
331 num_attempts: 0,
332 due: None,
333 }),
334 link: LinkedListAtomicLink::default(),
335 }),
336 })
337 }
338
339 pub async fn new_with_id(id: SpoolId) -> anyhow::Result<Self> {
340 let meta_spool = get_meta_spool();
341 let data = meta_spool.load(id).await?;
342 Self::new_from_spool(id, data)
343 }
344
345 pub fn get_num_attempts(&self) -> u16 {
346 let inner = self.msg_and_id.inner.lock();
347 inner.num_attempts
348 }
349
350 pub fn set_num_attempts(&self, num_attempts: u16) {
351 let mut inner = self.msg_and_id.inner.lock();
352 inner.num_attempts = num_attempts;
353 }
354
355 pub fn increment_num_attempts(&self) {
356 let mut inner = self.msg_and_id.inner.lock();
357 inner.num_attempts += 1;
358 }
359
360 pub fn set_scheduling(
361 &self,
362 scheduling: Option<Scheduling>,
363 ) -> anyhow::Result<Option<Scheduling>> {
364 let mut inner = self.msg_and_id.inner.lock();
365 match &mut inner.metadata {
366 None => anyhow::bail!("set_scheduling: metadata must be loaded first"),
367 Some(meta) => {
368 meta.schedule = scheduling;
369 inner
370 .flags
371 .set(MessageFlags::SCHEDULED, scheduling.is_some());
372 if let Some(sched) = scheduling {
373 let due = inner.due.unwrap_or_else(Utc::now);
374 inner.due = Some(sched.adjust_for_schedule(due));
375 }
376 Ok(scheduling)
377 }
378 }
379 }
380
381 pub fn get_scheduling(&self) -> Option<Scheduling> {
382 let inner = self.msg_and_id.inner.lock();
383 inner.metadata.as_ref().and_then(|meta| meta.schedule)
384 }
385
386 pub fn get_due(&self) -> Option<DateTime<Utc>> {
387 let inner = self.msg_and_id.inner.lock();
388 inner.due
389 }
390
391 pub async fn delay_with_jitter(&self, limit: i64) -> anyhow::Result<Option<DateTime<Utc>>> {
392 let scale = rand::random::<f32>();
393 let value = (scale * limit as f32) as i64;
394 self.delay_by(seconds(value)?).await
395 }
396
397 pub async fn delay_by(
398 &self,
399 duration: chrono::Duration,
400 ) -> anyhow::Result<Option<DateTime<Utc>>> {
401 let due = Utc::now() + duration;
402 self.set_due(Some(due)).await
403 }
404
405 pub async fn delay_by_and_jitter(
407 &self,
408 duration: chrono::Duration,
409 ) -> anyhow::Result<Option<DateTime<Utc>>> {
410 let scale = rand::random::<f32>();
411 let value = (scale * 60.) as i64;
412 let due = Utc::now() + duration + seconds(value)?;
413 self.set_due(Some(due)).await
414 }
415
416 pub async fn set_due(
417 &self,
418 due: Option<DateTime<Utc>>,
419 ) -> anyhow::Result<Option<DateTime<Utc>>> {
420 let due = {
421 let mut inner = self.msg_and_id.inner.lock();
422
423 if !inner.flags.contains(MessageFlags::SCHEDULED) {
424 inner.due = due;
426 return Ok(inner.due);
427 }
428
429 let due = due.unwrap_or_else(Utc::now);
430
431 if let Some(meta) = &inner.metadata {
432 inner.due = match &meta.schedule {
433 Some(sched) => Some(sched.adjust_for_schedule(due)),
434 None => Some(due),
435 };
436 return Ok(inner.due);
437 }
438
439 due
442 };
443
444 self.load_meta().await?;
445
446 {
447 let mut inner = self.msg_and_id.inner.lock();
448 match &inner.metadata {
449 Some(meta) => {
450 inner.due = match &meta.schedule {
451 Some(sched) => Some(sched.adjust_for_schedule(due)),
452 None => Some(due),
453 };
454 Ok(inner.due)
455 }
456 None => anyhow::bail!("loaded metadata, but metadata is not set!?"),
457 }
458 }
459 }
460
461 fn get_data_if_dirty(&self) -> Option<Arc<Box<[u8]>>> {
462 let inner = self.msg_and_id.inner.lock();
463 if inner.flags.contains(MessageFlags::DATA_DIRTY) {
464 Some(Arc::clone(&inner.data))
465 } else {
466 None
467 }
468 }
469
470 fn get_meta_if_dirty(&self) -> Option<MetaData> {
471 let inner = self.msg_and_id.inner.lock();
472 if inner.flags.contains(MessageFlags::META_DIRTY) {
473 inner.metadata.as_ref().map(|md| (**md).clone())
474 } else {
475 None
476 }
477 }
478
479 pub fn set_force_sync(&self, force: bool) {
480 let mut inner = self.msg_and_id.inner.lock();
481 inner.flags.set(MessageFlags::FORCE_SYNC, force);
482 }
483
484 pub fn needs_save(&self) -> bool {
485 let inner = self.msg_and_id.inner.lock();
486 inner.flags.contains(MessageFlags::META_DIRTY)
487 || inner.flags.contains(MessageFlags::DATA_DIRTY)
488 }
489
490 pub async fn save(&self, deadline: Option<Instant>) -> anyhow::Result<()> {
491 let _timer = SAVE_HIST.start_timer();
492 self.save_to(&**get_meta_spool(), &**get_data_spool(), deadline)
493 .await
494 }
495
496 pub async fn save_to(
497 &self,
498 meta_spool: &(dyn Spool + Send + Sync),
499 data_spool: &(dyn Spool + Send + Sync),
500 deadline: Option<Instant>,
501 ) -> anyhow::Result<()> {
502 let force_sync = self
503 .msg_and_id
504 .inner
505 .lock()
506 .flags
507 .contains(MessageFlags::FORCE_SYNC);
508
509 let data_fut = if let Some(data) = self.get_data_if_dirty() {
510 anyhow::ensure!(!data.is_empty(), "message data must not be empty");
511 data_spool
512 .store(self.msg_and_id.id, data, force_sync, deadline)
513 .map(|_| true)
514 .boxed()
515 } else {
516 futures::future::ready(false).boxed()
517 };
518 let meta_fut = if let Some(meta) = self.get_meta_if_dirty() {
519 let meta = Arc::new(serde_json::to_vec(&meta)?.into_boxed_slice());
520 meta_spool
521 .store(self.msg_and_id.id, meta, force_sync, deadline)
522 .map(|_| true)
523 .boxed()
524 } else {
525 futures::future::ready(false).boxed()
526 };
527
528 let (data_res, meta_res) = tokio::join!(data_fut, meta_fut);
534
535 if data_res {
536 self.msg_and_id
537 .inner
538 .lock()
539 .flags
540 .remove(MessageFlags::DATA_DIRTY);
541 }
542 if meta_res {
543 self.msg_and_id
544 .inner
545 .lock()
546 .flags
547 .remove(MessageFlags::META_DIRTY);
548 }
549 Ok(())
550 }
551
552 pub fn id(&self) -> &SpoolId {
553 &self.msg_and_id.id
554 }
555
556 pub async fn save_and_shrink(&self) -> anyhow::Result<bool> {
558 self.save(None).await?;
559 self.shrink()
560 }
561
562 pub async fn save_and_shrink_data(&self) -> anyhow::Result<bool> {
564 self.save(None).await?;
565 self.shrink_data()
566 }
567
568 pub fn shrink_data(&self) -> anyhow::Result<bool> {
569 let mut inner = self.msg_and_id.inner.lock();
570 let mut did_shrink = false;
571 if inner.flags.contains(MessageFlags::DATA_DIRTY) {
572 anyhow::bail!("Cannot shrink message: DATA_DIRTY");
573 }
574 if !inner.data.is_empty() {
575 DATA_COUNT.dec();
576 did_shrink = true;
577 }
578 if !inner.data.is_empty() {
579 inner.data = NO_DATA.clone();
580 did_shrink = true;
581 }
582 Ok(did_shrink)
583 }
584
585 pub fn shrink(&self) -> anyhow::Result<bool> {
586 let mut inner = self.msg_and_id.inner.lock();
587 let mut did_shrink = false;
588 if inner.flags.contains(MessageFlags::DATA_DIRTY) {
589 anyhow::bail!("Cannot shrink message: DATA_DIRTY");
590 }
591 if inner.flags.contains(MessageFlags::META_DIRTY) {
592 anyhow::bail!("Cannot shrink message: META_DIRTY");
593 }
594 if inner.metadata.take().is_some() {
595 META_COUNT.dec();
596 did_shrink = true;
597 }
598 if !inner.data.is_empty() {
599 DATA_COUNT.dec();
600 did_shrink = true;
601 }
602 if !inner.data.is_empty() {
603 inner.data = NO_DATA.clone();
604 did_shrink = true;
605 }
606 Ok(did_shrink)
607 }
608
609 pub fn sender(&self) -> anyhow::Result<EnvelopeAddress> {
610 let inner = self.msg_and_id.inner.lock();
611 match &inner.metadata {
612 Some(meta) => Ok(meta.sender.clone()),
613 None => anyhow::bail!("metadata is not loaded"),
614 }
615 }
616
617 pub fn set_sender(&self, sender: EnvelopeAddress) -> anyhow::Result<()> {
618 let mut inner = self.msg_and_id.inner.lock();
619 match &mut inner.metadata {
620 Some(meta) => {
621 meta.sender = sender;
622 inner.flags.set(MessageFlags::DATA_DIRTY, true);
623 Ok(())
624 }
625 None => anyhow::bail!("metadata is not loaded"),
626 }
627 }
628
629 pub fn recipient(&self) -> anyhow::Result<EnvelopeAddress> {
630 let inner = self.msg_and_id.inner.lock();
631 match &inner.metadata {
632 Some(meta) => Ok(meta.recipient.clone()),
633 None => anyhow::bail!("metadata is not loaded"),
634 }
635 }
636
637 pub fn set_recipient(&self, recipient: EnvelopeAddress) -> anyhow::Result<()> {
638 let mut inner = self.msg_and_id.inner.lock();
639 match &mut inner.metadata {
640 Some(meta) => {
641 meta.recipient = recipient;
642 inner.flags.set(MessageFlags::DATA_DIRTY, true);
643 Ok(())
644 }
645 None => anyhow::bail!("metadata is not loaded"),
646 }
647 }
648
649 pub fn is_meta_loaded(&self) -> bool {
650 self.msg_and_id.inner.lock().metadata.is_some()
651 }
652
653 pub fn is_data_loaded(&self) -> bool {
654 !self.msg_and_id.inner.lock().data.is_empty()
655 }
656
657 pub async fn load_meta_if_needed(&self) -> anyhow::Result<()> {
658 if self.is_meta_loaded() {
659 return Ok(());
660 }
661 self.load_meta().await
662 }
663
664 pub async fn load_data_if_needed(&self) -> anyhow::Result<()> {
665 if self.is_data_loaded() {
666 return Ok(());
667 }
668 self.load_data().await
669 }
670
671 pub async fn load_meta(&self) -> anyhow::Result<()> {
672 let _timer = LOAD_META_HIST.start_timer();
673 self.load_meta_from(&**get_meta_spool()).await
674 }
675
676 pub async fn load_meta_from(
677 &self,
678 meta_spool: &(dyn Spool + Send + Sync),
679 ) -> anyhow::Result<()> {
680 let id = self.id();
681 let data = meta_spool.load(*id).await?;
682 let mut inner = self.msg_and_id.inner.lock();
683 let was_not_loaded = inner.metadata.is_none();
684 let metadata: MetaData = serde_json::from_slice(&data)?;
685 inner.metadata.replace(Box::new(metadata));
686 if was_not_loaded {
687 META_COUNT.inc();
688 }
689 Ok(())
690 }
691
692 pub async fn load_data(&self) -> anyhow::Result<()> {
693 let _timer = LOAD_DATA_HIST.start_timer();
694 self.load_data_from(&**get_data_spool()).await
695 }
696
697 pub async fn load_data_from(
698 &self,
699 data_spool: &(dyn Spool + Send + Sync),
700 ) -> anyhow::Result<()> {
701 let data = data_spool.load(*self.id()).await?;
702 let mut inner = self.msg_and_id.inner.lock();
703 let was_empty = inner.data.is_empty();
704 inner.data = Arc::new(data.into_boxed_slice());
705 if was_empty {
706 DATA_COUNT.inc();
707 }
708 Ok(())
709 }
710
711 pub fn assign_data(&self, data: Vec<u8>) {
712 let mut inner = self.msg_and_id.inner.lock();
713 let was_empty = inner.data.is_empty();
714 inner.data = Arc::new(data.into_boxed_slice());
715 inner.flags.set(MessageFlags::DATA_DIRTY, true);
716 if was_empty {
717 DATA_COUNT.inc();
718 }
719 }
720
721 pub fn get_data(&self) -> Arc<Box<[u8]>> {
722 let inner = self.msg_and_id.inner.lock();
723 inner.data.clone()
724 }
725
726 pub fn set_meta<S: AsRef<str>, V: Into<serde_json::Value>>(
727 &self,
728 key: S,
729 value: V,
730 ) -> anyhow::Result<()> {
731 let mut inner = self.msg_and_id.inner.lock();
732 match &mut inner.metadata {
733 None => anyhow::bail!("set_meta: metadata must be loaded first"),
734 Some(meta) => {
735 let key = key.as_ref();
736 let value = value.into();
737
738 match &mut meta.meta {
739 serde_json::Value::Object(map) => {
740 map.insert(key.to_string(), value);
741 }
742 _ => anyhow::bail!("metadata is somehow not a json object"),
743 }
744
745 inner.flags.set(MessageFlags::META_DIRTY, true);
746 Ok(())
747 }
748 }
749 }
750
751 pub fn get_meta_string<S: serde_json::value::Index + std::fmt::Display + Copy>(
753 &self,
754 key: S,
755 ) -> anyhow::Result<Option<String>> {
756 match self.get_meta(key) {
757 Ok(serde_json::Value::String(value)) => Ok(Some(value.to_string())),
758 Ok(serde_json::Value::Null) => Ok(None),
759 hmm => {
760 anyhow::bail!("expected '{key}' to be a string value, got {hmm:?}");
761 }
762 }
763 }
764
765 pub fn get_meta_obj(&self) -> anyhow::Result<serde_json::Value> {
766 let inner = self.msg_and_id.inner.lock();
767 match &inner.metadata {
768 None => anyhow::bail!("get_meta_obj: metadata must be loaded first"),
769 Some(meta) => Ok(meta.meta.clone()),
770 }
771 }
772
773 pub fn get_meta<S: serde_json::value::Index>(
774 &self,
775 key: S,
776 ) -> anyhow::Result<serde_json::Value> {
777 let inner = self.msg_and_id.inner.lock();
778 match &inner.metadata {
779 None => anyhow::bail!("get_meta: metadata must be loaded first"),
780 Some(meta) => match meta.meta.get(key) {
781 Some(value) => Ok(value.clone()),
782 None => Ok(serde_json::Value::Null),
783 },
784 }
785 }
786
787 pub fn age(&self, now: DateTime<Utc>) -> chrono::Duration {
788 self.msg_and_id.id.age(now)
789 }
790
791 pub fn get_queue_name(&self) -> anyhow::Result<String> {
792 Ok(match self.get_meta_string("queue")? {
793 Some(name) => name,
794 None => {
795 let name = QueueNameComponents::format(
796 self.get_meta_string("campaign")?,
797 self.get_meta_string("tenant")?,
798 self.recipient()?.domain().to_string().to_lowercase(),
799 self.get_meta_string("routing_domain")?,
800 );
801 name.to_string()
802 }
803 })
804 }
805
/// Verifies the DKIM signatures of the currently held message data against
/// the domain of its single `From` address.
///
/// Messages with non-canonical line endings are not verified; a single
/// `permerror` result is returned for them instead.
///
/// # Errors
/// Fails if the headers cannot be parsed, if the `From` header is missing,
/// invalid or does not hold exactly one address, or if verification fails.
#[cfg(feature = "impl")]
pub async fn dkim_verify(&self) -> anyhow::Result<Vec<AuthenticationResult>> {
    let resolver = dns_resolver::get_resolver();
    let data = self.get_data();
    let bytes = mailparsing::SharedString::try_from(data.as_ref().as_ref())?;

    let parsed = mailparsing::Header::parse_headers(bytes.clone())?;
    let non_canonical = parsed
        .overall_conformance
        .contains(MessageConformance::NON_CANONICAL_LINE_ENDINGS);
    if non_canonical {
        let failure = AuthenticationResult {
            method: "dkim".to_string(),
            method_version: None,
            result: "permerror".to_string(),
            reason: Some("message has non-canonical line endings".to_string()),
            props: Default::default(),
        };
        return Ok(vec![failure]);
    }
    let message = kumo_dkim::ParsedEmail::HeaderOnlyParse { bytes, parsed };

    let from = message
        .get_headers()
        .from()
        .map_err(any_err)?
        .ok_or_else(|| anyhow::anyhow!("Missing or invalid From header"))?
        .0;
    let sender_count = from.len();
    if sender_count != 1 {
        anyhow::bail!(
            "From header must have a single sender, found {}",
            sender_count
        );
    }
    let from_domain = &from[0].address.domain;

    Ok(kumo_dkim::verify_email_with_resolver(from_domain, &message, &**resolver).await?)
}
845
846 pub async fn parse(&self) -> anyhow::Result<MimePart<'static>> {
851 self.load_data_if_needed().await?;
852 let data = self.get_data();
853 let owned_data = String::from_utf8_lossy(data.as_ref().as_ref()).to_string();
854 Ok(MimePart::parse(owned_data)?)
855 }
856
857 pub fn parse_rfc3464(&self) -> anyhow::Result<Option<Report>> {
858 let data = self.get_data();
859 Report::parse(&data)
860 }
861
862 pub fn parse_rfc5965(&self) -> anyhow::Result<Option<ARFReport>> {
863 let data = self.get_data();
864 ARFReport::parse(&data)
865 }
866
867 pub fn prepend_header(&self, name: Option<&str>, value: &str) {
868 let data = self.get_data();
869 let mut new_data = Vec::with_capacity(size_header(name, value) + 2 + data.len());
870 emit_header(&mut new_data, name, value);
871 new_data.extend_from_slice(&data);
872 self.assign_data(new_data);
873 }
874
875 pub fn append_header(&self, name: Option<&str>, value: &str) {
876 let data = self.get_data();
877 let mut new_data = Vec::with_capacity(size_header(name, value) + 2 + data.len());
878 for (idx, window) in data.windows(4).enumerate() {
879 if window == b"\r\n\r\n" {
880 let headers = &data[0..idx + 2];
881 let body = &data[idx + 2..];
882
883 new_data.extend_from_slice(headers);
884 emit_header(&mut new_data, name, value);
885 new_data.extend_from_slice(body);
886 self.assign_data(new_data);
887 return;
888 }
889 }
890 }
891
892 pub fn get_address_header(
893 &self,
894 header_name: &str,
895 ) -> anyhow::Result<Option<HeaderAddressList>> {
896 let data = self.get_data();
897 let HeaderParseResult { headers, .. } =
898 mailparsing::Header::parse_headers(data.as_ref().as_ref())?;
899
900 match headers.get_first(header_name) {
901 Some(hdr) => {
902 let list = hdr.as_address_list()?;
903 let result: HeaderAddressList = list.into();
904 Ok(Some(result))
905 }
906 None => Ok(None),
907 }
908 }
909
910 pub fn get_first_named_header_value(&self, name: &str) -> anyhow::Result<Option<String>> {
911 let data = self.get_data();
912 let HeaderParseResult { headers, .. } = Header::parse_headers(data.as_ref().as_ref())?;
913
914 match headers.get_first(name) {
915 Some(hdr) => Ok(Some(hdr.as_unstructured()?)),
916 None => Ok(None),
917 }
918 }
919
920 pub fn get_all_named_header_values(&self, name: &str) -> anyhow::Result<Vec<String>> {
921 let data = self.get_data();
922 let HeaderParseResult { headers, .. } = Header::parse_headers(data.as_ref().as_ref())?;
923
924 let mut values = vec![];
925 for hdr in headers.iter_named(name) {
926 values.push(hdr.as_unstructured()?);
927 }
928 Ok(values)
929 }
930
931 pub fn get_all_headers(&self) -> anyhow::Result<Vec<(String, String)>> {
932 let data = self.get_data();
933 let HeaderParseResult { headers, .. } = Header::parse_headers(data.as_ref().as_ref())?;
934
935 let mut values = vec![];
936 for hdr in headers.iter() {
937 values.push((hdr.get_name().to_string(), hdr.as_unstructured()?));
938 }
939 Ok(values)
940 }
941
942 pub fn retain_headers<F: FnMut(&Header) -> bool>(&self, mut func: F) -> anyhow::Result<()> {
943 let data = self.get_data();
944 let mut new_data = Vec::with_capacity(data.len());
945 let HeaderParseResult {
946 headers,
947 body_offset,
948 ..
949 } = Header::parse_headers(data.as_ref().as_ref())?;
950 for hdr in headers.iter() {
951 let retain = (func)(hdr);
952 if !retain {
953 continue;
954 }
955 hdr.write_header(&mut new_data)?;
956 }
957 new_data.extend_from_slice(b"\r\n");
958 new_data.extend_from_slice(&data[body_offset..]);
959 self.assign_data(new_data);
960 Ok(())
961 }
962
963 pub fn remove_first_named_header(&self, name: &str) -> anyhow::Result<()> {
964 let mut removed = false;
965 self.retain_headers(|hdr| {
966 if hdr.get_name().eq_ignore_ascii_case(name) && !removed {
967 removed = true;
968 false
969 } else {
970 true
971 }
972 })
973 }
974
975 pub fn import_x_headers(&self, names: Vec<String>) -> anyhow::Result<()> {
976 let data = self.get_data();
977 let HeaderParseResult { headers, .. } = Header::parse_headers(data.as_ref().as_ref())?;
978
979 for hdr in headers.iter() {
980 let do_import = if names.is_empty() {
981 is_x_header(hdr.get_name())
982 } else {
983 is_header_in_names_list(hdr.get_name(), &names)
984 };
985 if do_import {
986 let name = imported_header_name(hdr.get_name());
987 self.set_meta(name, hdr.as_unstructured()?)?;
988 }
989 }
990
991 Ok(())
992 }
993
994 pub fn remove_x_headers(&self, names: Vec<String>) -> anyhow::Result<()> {
995 self.retain_headers(|hdr| {
996 if names.is_empty() {
997 !is_x_header(hdr.get_name())
998 } else {
999 !is_header_in_names_list(hdr.get_name(), &names)
1000 }
1001 })
1002 }
1003
1004 pub fn remove_all_named_headers(&self, name: &str) -> anyhow::Result<()> {
1005 self.retain_headers(|hdr| !hdr.get_name().eq_ignore_ascii_case(name))
1006 }
1007
/// Signs the currently held message data with `signer` and prepends the
/// resulting header to the message.
///
/// When `SIGN_POOL` has been set, the signing work is run on it via
/// `spawn_blocking`; otherwise it is performed inline.
#[cfg(feature = "impl")]
pub async fn dkim_sign(&self, signer: Signer) -> anyhow::Result<()> {
    let Some(runtime) = SIGN_POOL.get() else {
        let data = self.get_data();
        let header = signer.sign(&data)?;
        self.prepend_header(None, &header);
        return Ok(());
    };

    let msg = self.clone();
    runtime
        .spawn_blocking(move || -> anyhow::Result<()> {
            let data = msg.get_data();
            let header = signer.sign(&data)?;
            msg.prepend_header(None, &header);
            Ok(())
        })
        .await??;
    Ok(())
}
1027
1028 pub fn import_scheduling_header(
1029 &self,
1030 header_name: &str,
1031 remove: bool,
1032 ) -> anyhow::Result<Option<Scheduling>> {
1033 if let Some(value) = self.get_first_named_header_value(header_name)? {
1034 let sched: Scheduling = serde_json::from_str(&value).with_context(|| {
1035 format!("{value} from header {header_name} is not a valid Scheduling header")
1036 })?;
1037 let result = self.set_scheduling(Some(sched))?;
1038
1039 if remove {
1040 self.remove_all_named_headers(header_name)?;
1041 }
1042
1043 Ok(result)
1044 } else {
1045 Ok(None)
1046 }
1047 }
1048
1049 pub fn append_text_plain(&self, content: &str) -> anyhow::Result<bool> {
1050 let data = self.get_data();
1051 let mut msg = MimePart::parse(data.as_ref().as_ref())?;
1052 let parts = msg.simplified_structure_pointers()?;
1053 if let Some(p) = parts.text_part.and_then(|p| msg.resolve_ptr_mut(p)) {
1054 match p.body()? {
1055 DecodedBody::Text(text) => {
1056 let mut text = text.as_str().to_string();
1057 text.push_str("\r\n");
1058 text.push_str(content);
1059 p.replace_text_body("text/plain", &text)?;
1060
1061 let new_data = msg.to_message_string();
1062 self.assign_data(new_data.into_bytes());
1063 Ok(true)
1064 }
1065 DecodedBody::Binary(_) => {
1066 anyhow::bail!("expected text/plain part to be text, but it is binary");
1067 }
1068 }
1069 } else {
1070 Ok(false)
1071 }
1072 }
1073
1074 pub fn append_text_html(&self, content: &str) -> anyhow::Result<bool> {
1075 let data = self.get_data();
1076 let mut msg = MimePart::parse(data.as_ref().as_ref())?;
1077 let parts = msg.simplified_structure_pointers()?;
1078 if let Some(p) = parts.html_part.and_then(|p| msg.resolve_ptr_mut(p)) {
1079 match p.body()? {
1080 DecodedBody::Text(text) => {
1081 let mut text = text.as_str().to_string();
1082
1083 match text.rfind("</body>").or_else(|| text.rfind("</BODY>")) {
1084 Some(idx) => {
1085 text.insert_str(idx, content);
1086 text.insert_str(idx, "\r\n");
1087 }
1088 None => {
1089 text.push_str("\r\n");
1091 text.push_str(content);
1092 }
1093 }
1094
1095 p.replace_text_body("text/html", &text)?;
1096
1097 let new_data = msg.to_message_string();
1098 self.assign_data(new_data.into_bytes());
1099 Ok(true)
1100 }
1101 DecodedBody::Binary(_) => {
1102 anyhow::bail!("expected text/html part to be text, but it is binary");
1103 }
1104 }
1105 } else {
1106 Ok(false)
1107 }
1108 }
1109
1110 pub fn check_fix_conformance(
1111 &self,
1112 check: MessageConformance,
1113 fix: MessageConformance,
1114 ) -> anyhow::Result<()> {
1115 let data = self.get_data();
1116 let mut msg = MimePart::parse(data.as_ref().as_ref())?;
1117
1118 let conformance = msg.conformance();
1119
1120 let check = check - fix;
1122
1123 if check.intersects(conformance) {
1124 let problems = check.intersection(conformance).to_string();
1125 anyhow::bail!("Message has conformance issues: {problems}");
1126 }
1127
1128 if fix.intersects(conformance) {
1129 let to_fix = fix.intersection(conformance);
1130 let problems = to_fix.to_string();
1131
1132 let missing_headers_only = to_fix
1133 .difference(
1134 MessageConformance::MISSING_DATE_HEADER
1135 | MessageConformance::MISSING_MIME_VERSION
1136 | MessageConformance::MISSING_MESSAGE_ID_HEADER,
1137 )
1138 .is_empty();
1139
1140 if !missing_headers_only {
1141 msg = msg.rebuild().with_context(|| {
1142 format!("Rebuilding message to correct conformance issues: {problems}")
1143 })?;
1144 }
1145
1146 if to_fix.contains(MessageConformance::MISSING_DATE_HEADER) {
1147 msg.headers_mut().set_date(Utc::now())?;
1148 }
1149
1150 if to_fix.contains(MessageConformance::MISSING_MIME_VERSION) {
1151 msg.headers_mut().set_mime_version("1.0")?;
1152 }
1153
1154 if to_fix.contains(MessageConformance::MISSING_MESSAGE_ID_HEADER) {
1155 let sender = self.sender()?;
1156 let domain = sender.domain();
1157 let id = *self.id();
1158 msg.headers_mut()
1159 .set_message_id(mailparsing::MessageID(format!("{id}@{domain}")))?;
1160 }
1161
1162 let new_data = msg.to_message_string();
1163 self.assign_data(new_data.into_bytes());
1164 }
1165
1166 Ok(())
1167 }
1168}
1169
/// Returns true if `hdr_name` matches any entry of `names`, comparing
/// ASCII case-insensitively.
fn is_header_in_names_list(hdr_name: &str, names: &[String]) -> bool {
    names
        .iter()
        .any(|candidate| hdr_name.eq_ignore_ascii_case(candidate))
}
1178
/// Converts a header name into its metadata key form: ASCII letters are
/// lowercased and every `-` becomes `_`.
fn imported_header_name(name: &str) -> String {
    let mut key = String::with_capacity(name.len());
    for c in name.chars() {
        key.push(if c == '-' { '_' } else { c.to_ascii_lowercase() });
    }
    key
}
1187
/// Returns true if `name` begins with `X-` or `x-`.
fn is_x_header(name: &str) -> bool {
    matches!(name.as_bytes(), [b'X' | b'x', b'-', ..])
}
1191
/// Returns the number of bytes `emit_header` writes for `name` and `value`,
/// not counting any CRLF it may add; a present name contributes its length
/// plus two for the `": "` separator.
fn size_header(name: Option<&str>, value: &str) -> usize {
    let name_len = match name {
        Some(name) => name.len() + 2,
        None => 0,
    };
    name_len + value.len()
}
1195
/// Appends a header line to `dest`.
///
/// With a name, `name: value` is written; without one, `value` is written
/// on its own. A CRLF is added unless `value` already ends with one.
fn emit_header(dest: &mut Vec<u8>, name: Option<&str>, value: &str) {
    match name {
        Some(name) => {
            dest.extend_from_slice(name.as_bytes());
            dest.extend_from_slice(b": ");
        }
        None => {}
    }
    dest.extend_from_slice(value.as_bytes());
    let already_terminated = value.ends_with("\r\n");
    if !already_terminated {
        dest.extend_from_slice(b"\r\n");
    }
}
1206
/// Lua bindings for `Message`.
///
/// Registers the methods that Lua code can call on a message object. Each
/// binding forwards to the corresponding Rust method and converts failures
/// into Lua errors with `any_err`, except where noted below.
#[cfg(feature = "impl")]
impl UserData for Message {
    fn add_methods<M: UserDataMethods<Self>>(methods: &mut M) {
        // --- Metadata and body access -------------------------------------
        // The async accessors call `load_meta_if_needed` /
        // `load_data_if_needed` before touching the message.
        methods.add_async_method(
            "set_meta",
            move |_, this, (name, value): (String, mlua::Value)| async move {
                this.load_meta_if_needed().await.map_err(any_err)?;
                let value = serde_json::value::to_value(value).map_err(any_err)?;
                this.set_meta(name, value).map_err(any_err)?;
                Ok(())
            },
        );
        methods.add_async_method("get_meta", move |lua, this, name: String| async move {
            this.load_meta_if_needed().await.map_err(any_err)?;
            let value = this.get_meta(name).map_err(any_err)?;
            Ok(Some(lua.to_value_with(&value, serialize_options())?))
        });
        methods.add_async_method("get_data", move |lua, this, _: ()| async move {
            this.load_data_if_needed().await.map_err(any_err)?;
            let data = this.get_data();
            lua.create_string(&*data)
        });
        methods.add_method("set_data", move |_lua, this, data: mlua::String| {
            this.assign_data(data.as_bytes().to_vec());
            Ok(())
        });

        // Parses the current body as MIME. Invalid UTF-8 sequences are
        // replaced (lossy conversion) before parsing.
        methods.add_async_method("parse_mime", move |_lua, this, _: ()| async move {
            this.load_data_if_needed().await.map_err(any_err)?;
            let data = this.get_data();
            let owned_data = String::from_utf8_lossy(data.as_ref().as_ref()).to_string();
            let part = MimePart::parse(owned_data).map_err(any_err)?;
            Ok(mod_mimepart::PartRef::new(part))
        });

        methods.add_method("append_text_plain", move |_lua, this, data: String| {
            this.append_text_plain(&data).map_err(any_err)
        });

        methods.add_method("append_text_html", move |_lua, this, data: String| {
            this.append_text_html(&data).map_err(any_err)
        });

        // --- Envelope and queue information -------------------------------
        methods.add_method("id", move |_, this, _: ()| Ok(this.id().to_string()));
        methods.add_method("sender", move |_, this, _: ()| {
            this.sender().map_err(any_err)
        });

        methods.add_method("num_attempts", move |_, this, _: ()| {
            Ok(this.get_num_attempts())
        });

        methods.add_method("queue_name", move |_, this, _: ()| {
            this.get_queue_name().map_err(any_err)
        });

        // Returns the value produced by `Message::set_due`, converted to Lua.
        methods.add_async_method("set_due", move |lua, this, due: mlua::Value| async move {
            let due: Option<DateTime<Utc>> = lua.from_value(due)?;
            let revised_due = this.set_due(due).await.map_err(any_err)?;
            lua.to_value(&revised_due)
        });

        // `set_sender` / `set_recipient` accept either a string, which is
        // parsed as an envelope address, or any other Lua value that
        // deserializes into an `EnvelopeAddress`.
        methods.add_method("set_sender", move |lua, this, value: mlua::Value| {
            let sender = match value {
                mlua::Value::String(s) => {
                    let s = s.to_str()?;
                    EnvelopeAddress::parse(&s).map_err(any_err)?
                }
                _ => lua.from_value::<EnvelopeAddress>(value.clone())?,
            };
            this.set_sender(sender).map_err(any_err)
        });

        methods.add_method("recipient", move |_, this, _: ()| {
            this.recipient().map_err(any_err)
        });

        methods.add_method("set_recipient", move |lua, this, value: mlua::Value| {
            let recipient = match value {
                mlua::Value::String(s) => {
                    let s = s.to_str()?;
                    EnvelopeAddress::parse(&s).map_err(any_err)?
                }
                _ => lua.from_value::<EnvelopeAddress>(value.clone())?,
            };
            this.set_recipient(recipient).map_err(any_err)
        });

        methods.add_async_method("dkim_sign", |_, this, signer: Signer| async move {
            this.dkim_sign(signer).await.map_err(any_err)
        });

        // --- Memory management --------------------------------------------
        // Both shrink variants first save the message if `needs_save()`
        // reports that a save is required.
        methods.add_async_method("shrink", |_, this, _: ()| async move {
            if this.needs_save() {
                this.save(None).await.map_err(any_err)?;
            }
            this.shrink().map_err(any_err)
        });

        methods.add_async_method("shrink_data", |_, this, _: ()| async move {
            if this.needs_save() {
                this.save(None).await.map_err(any_err)?;
            }
            this.shrink_data().map_err(any_err)
        });

        // --- Header manipulation ------------------------------------------
        // Encodes the supplied results as an `Authentication-Results` header
        // and prepends it to the message.
        methods.add_method(
            "add_authentication_results",
            move |lua, this, (serv_id, results): (String, mlua::Value)| {
                let results: Vec<AuthenticationResult> = lua.from_value(results)?;
                let results = AuthenticationResults {
                    serv_id,
                    version: None,
                    results,
                };

                this.prepend_header(Some("Authentication-Results"), &results.encode_value());

                Ok(())
            },
        );

        methods.add_async_method("dkim_verify", |lua, this, ()| async move {
            let results = this.dkim_verify().await.map_err(any_err)?;
            lua.to_value_with(&results, serialize_options())
        });

        methods.add_method(
            "prepend_header",
            move |_, this, (name, value): (String, String)| {
                this.prepend_header(Some(&name), &value);
                Ok(())
            },
        );
        methods.add_method(
            "append_header",
            move |_, this, (name, value): (String, String)| {
                this.append_header(Some(&name), &value);
                Ok(())
            },
        );
        methods.add_method("get_address_header", move |_, this, name: String| {
            this.get_address_header(&name).map_err(any_err)
        });
        methods.add_method("from_header", move |_, this, ()| {
            this.get_address_header("From").map_err(any_err)
        });
        methods.add_method("to_header", move |_, this, ()| {
            this.get_address_header("To").map_err(any_err)
        });

        methods.add_method(
            "get_first_named_header_value",
            move |_, this, name: String| this.get_first_named_header_value(&name).map_err(any_err),
        );
        methods.add_method(
            "get_all_named_header_values",
            move |_, this, name: String| this.get_all_named_header_values(&name).map_err(any_err),
        );
        // Each header is returned to Lua as a two-element `{name, value}`
        // array. Registered exactly once.
        methods.add_method("get_all_headers", move |_, this, _: ()| {
            Ok(this
                .get_all_headers()
                .map_err(any_err)?
                .into_iter()
                .map(|(name, value)| vec![name, value])
                .collect::<Vec<Vec<String>>>())
        });
        // For the two `*_x_headers` methods an omitted (nil) list is passed
        // on as an empty list.
        methods.add_method(
            "import_x_headers",
            move |_, this, names: Option<Vec<String>>| {
                this.import_x_headers(names.unwrap_or_default())
                    .map_err(any_err)
            },
        );

        methods.add_method(
            "remove_x_headers",
            move |_, this, names: Option<Vec<String>>| {
                this.remove_x_headers(names.unwrap_or_default())
                    .map_err(any_err)
            },
        );
        methods.add_method("remove_all_named_headers", move |_, this, name: String| {
            this.remove_all_named_headers(&name).map_err(any_err)
        });

        // --- Scheduling ---------------------------------------------------
        methods.add_method(
            "import_scheduling_header",
            move |lua, this, (header_name, remove): (String, bool)| {
                let opt_schedule = this
                    .import_scheduling_header(&header_name, remove)
                    .map_err(any_err)?;
                lua.to_value(&opt_schedule)
            },
        );

        methods.add_method("set_scheduling", move |lua, this, params: mlua::Value| {
            let sched: Option<Scheduling> = from_lua_value(lua, params)?;
            let opt_schedule = this.set_scheduling(sched).map_err(any_err)?;
            lua.to_value(&opt_schedule)
        });

        // --- Report parsing -----------------------------------------------
        // Both parsers yield `nil` when the underlying method returns `None`.
        methods.add_method("parse_rfc3464", move |lua, this, _: ()| {
            let report = this.parse_rfc3464().map_err(any_err)?;
            match report {
                Some(report) => lua.to_value_with(&report, serialize_options()),
                None => Ok(mlua::Value::Nil),
            }
        });

        methods.add_method("parse_rfc5965", move |lua, this, _: ()| {
            let report = this.parse_rfc5965().map_err(any_err)?;
            match report {
                Some(report) => lua.to_value_with(&report, serialize_options()),
                None => Ok(mlua::Value::Nil),
            }
        });

        // --- Persistence --------------------------------------------------
        methods.add_async_method("save", |_, this, ()| async move {
            this.save(None).await.map_err(any_err)
        });

        methods.add_method("set_force_sync", move |_, this, force: bool| {
            this.set_force_sync(force);
            Ok(())
        });

        // --- Conformance --------------------------------------------------
        // `check` and `fix` are parsed from their string form. Unlike the
        // other bindings, a conformance failure is not raised as a Lua
        // error: the method returns `nil` on success and the formatted
        // error text on failure. Only unparseable flag strings raise.
        methods.add_async_method(
            "check_fix_conformance",
            |_, this, (check, fix): (String, String)| async move {
                use std::str::FromStr;
                let check = MessageConformance::from_str(&check).map_err(any_err)?;
                let fix = MessageConformance::from_str(&fix).map_err(any_err)?;

                match this.check_fix_conformance(check, fix) {
                    Ok(_) => Ok(None),
                    Err(err) => Ok(Some(format!("{err:#}"))),
                }
            },
        );
    }
}
1459
1460impl TimerEntryWithDelay for WeakMessage {
1461 fn delay(&self) -> Duration {
1462 match self.upgrade() {
1463 None => {
1464 Duration::from_millis(0)
1466 }
1467 Some(msg) => msg.delay(),
1468 }
1469 }
1470}
1471
1472impl TimerEntryWithDelay for Message {
1473 fn delay(&self) -> Duration {
1474 let inner = self.msg_and_id.inner.lock();
1475 match inner.due {
1476 Some(time) => {
1477 let now = Utc::now();
1478 let delta = time - now;
1479 delta.to_std().unwrap_or(Duration::from_millis(0))
1480 }
1481 None => Duration::from_millis(0),
1482 }
1483 }
1484}
1485
1486#[cfg(test)]
1487mod test {
1488 use super::*;
1489 use serde_json::json;
1490
1491 fn new_msg_body<S: AsRef<str>>(s: S) -> Message {
1492 Message::new_dirty(
1493 SpoolId::new(),
1494 EnvelopeAddress::parse("sender@example.com").unwrap(),
1495 EnvelopeAddress::parse("recip@example.com").unwrap(),
1496 serde_json::json!({}),
1497 Arc::new(s.as_ref().as_bytes().to_vec().into_boxed_slice()),
1498 )
1499 .unwrap()
1500 }
1501
1502 fn data_as_string(msg: &Message) -> String {
1503 String::from_utf8(msg.get_data().to_vec()).unwrap()
1504 }
1505
    // Fixture: two `X-` headers, a Subject, and a `From` header written with
    // a space before the colon, followed by a one-word body.
    const X_HDR_CONTENT: &str =
        "X-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nFrom :Someone\r\n\r\nBody";
1508
1509 #[test]
1510 fn import_all_x_headers() {
1511 let msg = new_msg_body(X_HDR_CONTENT);
1512
1513 msg.import_x_headers(vec![]).unwrap();
1514 k9::assert_equal!(
1515 msg.get_meta_obj().unwrap(),
1516 json!({
1517 "x_hello": "there",
1518 "x_header": "value",
1519 })
1520 );
1521 }
1522
1523 #[test]
1524 fn meta_and_nil() {
1525 let msg = new_msg_body(X_HDR_CONTENT);
1526 msg.set_meta("test", serde_json::Value::Null).unwrap();
1528 k9::assert_equal!(msg.get_meta("test").unwrap(), serde_json::Value::Null);
1529
1530 let lua = mlua::Lua::new();
1532 lua.globals().set("msg", msg).unwrap();
1533 lua.load("assert(msg:get_meta('test') == nil)")
1534 .exec()
1535 .unwrap();
1536 }
1537
1538 #[test]
1539 fn import_some_x_headers() {
1540 let msg = new_msg_body(X_HDR_CONTENT);
1541
1542 msg.import_x_headers(vec!["x-hello".to_string()]).unwrap();
1543 k9::assert_equal!(
1544 msg.get_meta_obj().unwrap(),
1545 json!({
1546 "x_hello": "there",
1547 })
1548 );
1549 }
1550
1551 #[test]
1552 fn remove_all_x_headers() {
1553 let msg = new_msg_body(X_HDR_CONTENT);
1554
1555 msg.remove_x_headers(vec![]).unwrap();
1556 k9::assert_equal!(
1557 data_as_string(&msg),
1558 "Subject: Hello\r\nFrom :Someone\r\n\r\nBody"
1559 );
1560 }
1561
1562 #[test]
1563 fn prepend_header_2_params() {
1564 let msg = new_msg_body(X_HDR_CONTENT);
1565
1566 msg.prepend_header(Some("Date"), "Today");
1567 k9::assert_equal!(
1568 data_as_string(&msg),
1569 "Date: Today\r\nX-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nFrom :Someone\r\n\r\nBody"
1570 );
1571 }
1572
1573 #[test]
1574 fn prepend_header_1_params() {
1575 let msg = new_msg_body(X_HDR_CONTENT);
1576
1577 msg.prepend_header(None, "Date: Today");
1578 k9::assert_equal!(
1579 data_as_string(&msg),
1580 "Date: Today\r\nX-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nFrom :Someone\r\n\r\nBody"
1581 );
1582 }
1583
1584 #[test]
1585 fn append_header_2_params() {
1586 let msg = new_msg_body(X_HDR_CONTENT);
1587
1588 msg.append_header(Some("Date"), "Today");
1589 k9::assert_equal!(
1590 data_as_string(&msg),
1591 "X-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nFrom :Someone\r\nDate: Today\r\n\r\nBody"
1592 );
1593 }
1594
1595 #[test]
1596 fn append_header_1_params() {
1597 let msg = new_msg_body(X_HDR_CONTENT);
1598
1599 msg.append_header(None, "Date: Today");
1600 k9::assert_equal!(
1601 data_as_string(&msg),
1602 "X-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nFrom :Someone\r\nDate: Today\r\n\r\nBody"
1603 );
1604 }
1605
    // Fixture: like `X_HDR_CONTENT`, but `X-Header` appears twice (with
    // different values) and the `From` value contains an `@`.
    const MULTI_HEADER_CONTENT: &str =
        "X-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nX-Header: another value\r\nFrom :Someone@somewhere\r\n\r\nBody";
1608
1609 #[test]
1610 fn get_first_header() {
1611 let msg = new_msg_body(MULTI_HEADER_CONTENT);
1612 k9::assert_equal!(
1613 msg.get_first_named_header_value("X-header")
1614 .unwrap()
1615 .unwrap(),
1616 "value"
1617 );
1618 }
1619
1620 #[test]
1621 fn get_all_header() {
1622 let msg = new_msg_body(MULTI_HEADER_CONTENT);
1623 k9::assert_equal!(
1624 msg.get_all_named_header_values("X-header").unwrap(),
1625 vec!["value".to_string(), "another value".to_string()]
1626 );
1627 }
1628
1629 #[test]
1630 fn remove_first() {
1631 let msg = new_msg_body(MULTI_HEADER_CONTENT);
1632 msg.remove_first_named_header("X-header").unwrap();
1633 k9::assert_equal!(
1634 data_as_string(&msg),
1635 "X-Hello: there\r\nSubject: Hello\r\nX-Header: another value\r\nFrom :Someone@somewhere\r\n\r\nBody"
1636 );
1637 }
1638
1639 #[test]
1640 fn remove_all() {
1641 let msg = new_msg_body(MULTI_HEADER_CONTENT);
1642 msg.remove_all_named_headers("X-header").unwrap();
1643 k9::assert_equal!(
1644 data_as_string(&msg),
1645 "X-Hello: there\r\nSubject: Hello\r\nFrom :Someone@somewhere\r\n\r\nBody"
1646 );
1647 }
1648
1649 #[test]
1650 fn append_text_plain() {
1651 let msg = new_msg_body(MULTI_HEADER_CONTENT);
1652 msg.append_text_plain("I am at the bottom").unwrap();
1653 k9::assert_equal!(
1654 data_as_string(&msg),
1655 "X-Hello: there\r\n\
1656 X-Header: value\r\n\
1657 Subject: Hello\r\n\
1658 X-Header: another value\r\n\
1659 From :Someone@somewhere\r\n\
1660 Content-Type: text/plain;\r\n\
1661 \tcharset=\"us-ascii\"\r\n\
1662 \r\n\
1663 Body\r\n\
1664 I am at the bottom\r\n"
1665 );
1666 }
1667
    // Fixture: a `multipart/mixed` message with three parts — text/plain,
    // text/html, and a base64-encoded `application/octet-stream` attachment.
    const MIXED_CONTENT: &str = "Content-Type: multipart/mixed;\r\n\
\tboundary=\"my-boundary\"\r\n\
\r\n\
--my-boundary\r\n\
Content-Type: text/plain;\r\n\
\tcharset=\"us-ascii\"\r\n\
\r\n\
plain text\r\n\
--my-boundary\r\n\
Content-Type: text/html;\r\n\
\tcharset=\"us-ascii\"\r\n\
\r\n\
<b>rich</b> text\r\n\
--my-boundary\r\n\
Content-Type: application/octet-stream\r\n\
Content-Transfer-Encoding: base64\r\n\
Content-Disposition: attachment;\r\n\
\tfilename=\"woot.bin\"\r\n\
Content-ID: <woot.id@somewhere>\r\n\
\r\n\
AAECAw==\r\n\
--my-boundary--\r\n\
\r\n";
1691
    // Fixture: same structure as `MIXED_CONTENT`, except that the text/html
    // part is wrapped in `<BODY>`/`</BODY>` tags and the attachment's
    // Content-ID is `<woot.id>`.
    const MIXED_CONTENT_ENCLOSING_BODY: &str = "Content-Type: multipart/mixed;\r\n\
\tboundary=\"my-boundary\"\r\n\
\r\n\
--my-boundary\r\n\
Content-Type: text/plain;\r\n\
\tcharset=\"us-ascii\"\r\n\
\r\n\
plain text\r\n\
--my-boundary\r\n\
Content-Type: text/html;\r\n\
\tcharset=\"us-ascii\"\r\n\
\r\n\
<BODY>\r\n\
<b>rich</b> text\r\n\
</BODY>\r\n\
--my-boundary\r\n\
Content-Type: application/octet-stream\r\n\
Content-Transfer-Encoding: base64\r\n\
Content-Disposition: attachment;\r\n\
\tfilename=\"woot.bin\"\r\n\
Content-ID: <woot.id>\r\n\
\r\n\
AAECAw==\r\n\
--my-boundary--\r\n\
\r\n";
1717
    // Appends HTML to the text/html part of a multipart message and
    // snapshots the resulting message text.
    #[test]
    fn append_text_html() {
        // Case 1: html part without an enclosing <BODY>; per the snapshot the
        // new content lands at the end of the html part and the other parts
        // are unchanged.
        let msg = new_msg_body(MIXED_CONTENT);
        msg.append_text_html("bottom html").unwrap();
        k9::snapshot!(
            data_as_string(&msg),
            r#"
Content-Type: multipart/mixed;\r
\tboundary="my-boundary"\r
\r
--my-boundary\r
Content-Type: text/plain;\r
\tcharset="us-ascii"\r
\r
plain text\r
--my-boundary\r
Content-Type: text/html;\r
\tcharset="us-ascii"\r
\r
<b>rich</b> text\r
\r
bottom html\r
--my-boundary\r
Content-Type: application/octet-stream\r
Content-Transfer-Encoding: base64\r
Content-Disposition: attachment;\r
\tfilename="woot.bin"\r
Content-ID: <woot.id@somewhere>\r
\r
AAECAw==\r
--my-boundary--\r
\r

"#
        );

        // Case 2: html part wrapped in <BODY>, appended text is non-ASCII.
        // Per the snapshot the text is inserted before the closing </BODY>
        // and the part switches to utf-8 with quoted-printable encoding.
        let msg = new_msg_body(MIXED_CONTENT_ENCLOSING_BODY);
        msg.append_text_html("bottom html 👻").unwrap();
        k9::snapshot!(
            data_as_string(&msg),
            r#"
Content-Type: multipart/mixed;\r
\tboundary="my-boundary"\r
\r
--my-boundary\r
Content-Type: text/plain;\r
\tcharset="us-ascii"\r
\r
plain text\r
--my-boundary\r
Content-Type: text/html;\r
\tcharset="utf-8"\r
Content-Transfer-Encoding: quoted-printable\r
\r
<BODY>\r
<b>rich</b> text\r
\r
bottom html =F0=9F=91=BB</BODY>\r
--my-boundary\r
Content-Type: application/octet-stream\r
Content-Transfer-Encoding: base64\r
Content-Disposition: attachment;\r
\tfilename="woot.bin"\r
Content-ID: <woot.id>\r
\r
AAECAw==\r
--my-boundary--\r
\r

"#
        );
    }
1790
    // Appends non-ASCII plain text to a multipart message. Per the snapshot
    // only the text/plain part changes: the text is added at its end and the
    // part switches to utf-8 with quoted-printable encoding.
    #[test]
    fn append_text_plain_mixed() {
        let msg = new_msg_body(MIXED_CONTENT);
        msg.append_text_plain("bottom text 👾").unwrap();
        k9::snapshot!(
            data_as_string(&msg),
            r#"
Content-Type: multipart/mixed;\r
\tboundary="my-boundary"\r
\r
--my-boundary\r
Content-Type: text/plain;\r
\tcharset="utf-8"\r
Content-Transfer-Encoding: quoted-printable\r
\r
plain text\r
\r
bottom text =F0=9F=91=BE\r
--my-boundary\r
Content-Type: text/html;\r
\tcharset="us-ascii"\r
\r
<b>rich</b> text\r
--my-boundary\r
Content-Type: application/octet-stream\r
Content-Transfer-Encoding: base64\r
Content-Disposition: attachment;\r
\tfilename="woot.bin"\r
Content-ID: <woot.id@somewhere>\r
\r
AAECAw==\r
--my-boundary--\r
\r

"#
        );
    }
1828
    // Conformance handling for a Message-ID wrapped in doubled angle
    // brackets.
    #[test]
    fn check_conformance_angle_msg_id() {
        const DOUBLE_ANGLE_ONLY: &str = "Subject: hello\r
Message-ID: <<1234@example.com>>\r
\r
Hello";
        let msg = new_msg_body(DOUBLE_ANGLE_ONLY);
        // Checking without permission to fix reports the doubled-bracket id
        // as MISSING_MESSAGE_ID_HEADER.
        k9::snapshot!(
            msg.check_fix_conformance(
                MessageConformance::MISSING_MESSAGE_ID_HEADER,
                MessageConformance::empty(),
            )
            .unwrap_err(),
            "Message has conformance issues: MISSING_MESSAGE_ID_HEADER"
        );

        // With the fix flag set the same check succeeds.
        msg.check_fix_conformance(
            MessageConformance::MISSING_MESSAGE_ID_HEADER,
            MessageConformance::MISSING_MESSAGE_ID_HEADER,
        )
        .unwrap();

        // Same header problem combined with a single very long body line.
        const DOUBLE_ANGLE_AND_LONG_LINE: &str = "Subject: hello\r
Message-ID: <<1234@example.com>>\r
\r
Hello this is a really long line Hello this is a really long line \
Hello this is a really long line Hello this is a really long line \
Hello this is a really long line Hello this is a really long line \
Hello this is a really long line Hello this is a really long line \
Hello this is a really long line Hello this is a really long line \
Hello this is a really long line Hello this is a really long line \
Hello this is a really long line Hello this is a really long line
";
        let msg = new_msg_body(DOUBLE_ANGLE_AND_LONG_LINE);
        // Only asserts that fixing both issues succeeds; the resulting
        // message text is not inspected here.
        msg.check_fix_conformance(
            MessageConformance::MISSING_COLON_VALUE,
            MessageConformance::MISSING_MESSAGE_ID_HEADER | MessageConformance::LINE_TOO_LONG,
        )
        .unwrap();

    }
1907
    // Exercises `check_fix_conformance` with two different fix masks and
    // snapshots the resulting message text.
    #[test]
    fn check_conformance() {
        // Fixing only the missing MIME version: per the snapshot the header
        // is added and the rest of the message (including the `From :`
        // spelling) is left as it was.
        let msg = new_msg_body(MULTI_HEADER_CONTENT);
        msg.check_fix_conformance(
            MessageConformance::default(),
            MessageConformance::MISSING_MIME_VERSION,
        )
        .unwrap();
        k9::snapshot!(
            data_as_string(&msg),
            r#"
X-Hello: there\r
X-Header: value\r
Subject: Hello\r
X-Header: another value\r
From :Someone@somewhere\r
Mime-Version: 1.0\r
\r
Body
"#
        );

        // Also fixing NAME_ENDS_WITH_SPACE: per the snapshot the output
        // gains a Content-Type header, the From header is rewritten as
        // `From: <Someone@somewhere>`, and the body ends with CRLF.
        let msg = new_msg_body(MULTI_HEADER_CONTENT);
        msg.check_fix_conformance(
            MessageConformance::default(),
            MessageConformance::MISSING_MIME_VERSION | MessageConformance::NAME_ENDS_WITH_SPACE,
        )
        .unwrap();
        k9::snapshot!(
            data_as_string(&msg),
            r#"
Content-Type: text/plain;\r
\tcharset="us-ascii"\r
X-Hello: there\r
X-Header: value\r
Subject: Hello\r
X-Header: another value\r
From: <Someone@somewhere>\r
Mime-Version: 1.0\r
\r
Body\r

"#
        );
    }
1953
1954 #[test]
1955 fn set_scheduling() -> anyhow::Result<()> {
1956 let msg = new_msg_body(MULTI_HEADER_CONTENT);
1957 assert!(msg.get_due().is_none(), "due is implicitly now");
1958
1959 let now = Utc::now();
1960 let one_day = chrono::Duration::try_days(1).expect("1 day to be valid");
1961
1962 msg.set_scheduling(Some(Scheduling {
1963 restriction: None,
1964 first_attempt: Some((now + one_day).into()),
1965 expires: None,
1966 }))?;
1967
1968 let due = msg.get_due().expect("due to now be set");
1969 assert!(due - now >= one_day, "due time is at least 1 day away");
1970
1971 Ok(())
1972 }
1973
1974 #[cfg(all(test, target_pointer_width = "64"))]
1975 #[test]
1976 fn sizes() {
1977 assert_eq!(std::mem::size_of::<Message>(), 8);
1978 assert_eq!(std::mem::size_of::<MessageInner>(), 32);
1979 assert_eq!(std::mem::size_of::<MessageWithId>(), 72);
1980 }
1981}