1use crate::address::HeaderAddressList;
2#[cfg(feature = "impl")]
3use crate::dkim::Signer;
4#[cfg(feature = "impl")]
5use crate::dkim::SIGN_POOL;
6pub use crate::queue_name::QueueNameComponents;
7use crate::scheduling::Scheduling;
8use crate::EnvelopeAddress;
9use anyhow::Context;
10use chrono::{DateTime, Utc};
11#[cfg(feature = "impl")]
12use config::{any_err, from_lua_value, serialize_options};
13use futures::FutureExt;
14use intrusive_collections::{intrusive_adapter, LinkedList, LinkedListAtomicLink};
15use kumo_chrono_helper::*;
16use kumo_log_types::rfc3464::Report;
17use kumo_log_types::rfc5965::ARFReport;
18#[cfg(feature = "impl")]
19use mailparsing::{AuthenticationResult, AuthenticationResults, EncodeHeaderValue};
20use mailparsing::{DecodedBody, Header, HeaderParseResult, MessageConformance, MimePart};
21#[cfg(feature = "impl")]
22use mlua::{LuaSerdeExt, UserData, UserDataMethods};
23use parking_lot::Mutex;
24use prometheus::{Histogram, IntGauge};
25use serde::{Deserialize, Serialize};
26use spool::{get_data_spool, get_meta_spool, Spool, SpoolId};
27use std::hash::Hash;
28use std::sync::{Arc, LazyLock, Weak};
29use std::time::{Duration, Instant};
30use timeq::TimerEntryWithDelay;
31
32bitflags::bitflags! {
33 #[derive(Clone, Copy, Debug, PartialEq, Eq)]
34 struct MessageFlags: u8 {
35 const META_DIRTY = 1;
37 const DATA_DIRTY = 2;
39 const SCHEDULED = 4;
41 const FORCE_SYNC = 8;
43 }
44}
45
/// Gauge tracking the number of live `MessageInner` values: incremented by
/// the `Message` constructors and decremented when a `MessageInner` is dropped.
/// Registration failure panics on first use.
static MESSAGE_COUNT: LazyLock<IntGauge> = LazyLock::new(|| {
    prometheus::register_int_gauge!("message_count", "total number of Message objects").unwrap()
});
/// Gauge tracking how many messages currently have their metadata resident
/// in memory. Registration failure panics on first use.
static META_COUNT: LazyLock<IntGauge> = LazyLock::new(|| {
    prometheus::register_int_gauge!(
        "message_meta_resident_count",
        "total number of Message objects with metadata loaded"
    )
    .unwrap()
});
/// Gauge tracking how many messages currently have their body data resident
/// in memory. Registration failure panics on first use.
static DATA_COUNT: LazyLock<IntGauge> = LazyLock::new(|| {
    prometheus::register_int_gauge!(
        "message_data_resident_count",
        "total number of Message objects with body data loaded"
    )
    .unwrap()
});
/// Shared empty buffer used as the "body data is not loaded" placeholder, so
/// that unloading a message only needs to clone this `Arc`.
static NO_DATA: LazyLock<Arc<Box<[u8]>>> = LazyLock::new(|| Arc::new(vec![].into_boxed_slice()));
/// Histogram observed by `Message::save` for the duration of each save.
/// Registration failure panics on first use.
static SAVE_HIST: LazyLock<Histogram> = LazyLock::new(|| {
    prometheus::register_histogram!(
        "message_save_latency",
        "how long it takes to save a message to spool"
    )
    .unwrap()
});
/// Histogram observed by `Message::load_data` for the duration of each load.
/// Registration failure panics on first use.
static LOAD_DATA_HIST: LazyLock<Histogram> = LazyLock::new(|| {
    prometheus::register_histogram!(
        "message_data_load_latency",
        "how long it takes to load message data from spool"
    )
    .unwrap()
});
/// Histogram observed by `Message::load_meta` for the duration of each load.
/// Registration failure panics on first use.
static LOAD_META_HIST: LazyLock<Histogram> = LazyLock::new(|| {
    prometheus::register_histogram!(
        "message_meta_load_latency",
        "how long it takes to load message metadata from spool"
    )
    .unwrap()
});
85
/// The mutable state of a message, guarded by the mutex in `MessageWithId`.
#[derive(Debug)]
struct MessageInner {
    /// Envelope and metadata; `None` while the metadata is not resident.
    metadata: Option<Box<MetaData>>,
    /// Message body bytes; an empty buffer means the data is not resident.
    data: Arc<Box<[u8]>>,
    /// Dirty/scheduled/sync state bits.
    flags: MessageFlags,
    /// Attempt counter maintained via the `*_num_attempts` methods.
    num_attempts: u16,
    /// When the message is next due, if a due time has been set.
    due: Option<DateTime<Utc>>,
}
94
/// The shared allocation behind a `Message`: the immutable spool id, the
/// lock-protected mutable state, and the intrusive link used by `MessageList`.
#[derive(Debug)]
pub(crate) struct MessageWithId {
    /// Spool identifier; also used for equality and hashing of `Message`.
    id: SpoolId,
    /// Mutable message state.
    inner: Mutex<MessageInner>,
    /// Intrusive list hook that lets this value be a member of a `MessageList`.
    link: LinkedListAtomicLink,
}
101
// Generates `MessageWithIdAdapter`, which tells `intrusive_collections` that
// list nodes are `Arc<MessageWithId>` values linked through their `link` field.
intrusive_adapter!(
    pub(crate) MessageWithIdAdapter = Arc<MessageWithId>: MessageWithId { link: LinkedListAtomicLink }
);
105
/// A list of `Message`s built on an intrusive linked list, with the element
/// count tracked alongside so that `len()` does not need to walk the list.
pub struct MessageList {
    /// The underlying intrusive list of message allocations.
    list: LinkedList<MessageWithIdAdapter>,
    /// Number of entries currently in `list`.
    len: usize,
}
114
115impl Default for MessageList {
116 fn default() -> Self {
117 Self::new()
118 }
119}
120
121impl MessageList {
122 pub fn new() -> Self {
124 Self {
125 list: LinkedList::new(MessageWithIdAdapter::default()),
126 len: 0,
127 }
128 }
129
130 pub fn len(&self) -> usize {
132 self.len
133 }
134
135 pub fn is_empty(&self) -> bool {
136 self.len == 0
137 }
138
139 pub fn take(&mut self) -> Self {
142 let new_list = Self {
143 list: self.list.take(),
144 len: self.len,
145 };
146 self.len = 0;
147 new_list
148 }
149
150 pub fn push_back(&mut self, message: Message) {
152 self.list.push_back(message.msg_and_id);
153 self.len += 1;
154 }
155
156 pub fn pop_front(&mut self) -> Option<Message> {
158 self.list.pop_front().map(|msg_and_id| {
159 self.len -= 1;
160 Message { msg_and_id }
161 })
162 }
163
164 pub fn pop_back(&mut self) -> Option<Message> {
166 self.list.pop_back().map(|msg_and_id| {
167 self.len -= 1;
168 Message { msg_and_id }
169 })
170 }
171
172 pub fn drain(&mut self) -> Vec<Message> {
176 let mut messages = Vec::with_capacity(self.len);
177 while let Some(msg) = self.pop_front() {
178 messages.push(msg);
179 }
180 messages
181 }
182
183 pub fn extend_from_iter<I>(&mut self, iter: I)
184 where
185 I: Iterator<Item = Message>,
186 {
187 for msg in iter {
188 self.push_back(msg)
189 }
190 }
191}
192
193impl IntoIterator for MessageList {
194 type Item = Message;
195 type IntoIter = MessageListIter;
196 fn into_iter(self) -> MessageListIter {
197 MessageListIter { list: self.list }
198 }
199}
200
/// Owning iterator over a `MessageList`, produced by its `IntoIterator` impl.
pub struct MessageListIter {
    /// The remaining, not yet yielded, entries.
    list: LinkedList<MessageWithIdAdapter>,
}
204
205impl Iterator for MessageListIter {
206 type Item = Message;
207 fn next(&mut self) -> Option<Message> {
208 self.list
209 .pop_front()
210 .map(|msg_and_id| Message { msg_and_id })
211 }
212}
213
/// A cheaply clonable handle to a message. Clones share the same underlying
/// `MessageWithId` allocation; equality and hashing are based on the spool id.
#[derive(Clone, Debug)]
#[cfg_attr(feature = "impl", derive(mlua::FromLua))]
pub struct Message {
    /// The shared id + state allocation.
    pub(crate) msg_and_id: Arc<MessageWithId>,
}
219
220impl PartialEq for Message {
221 fn eq(&self, other: &Self) -> bool {
222 self.id() == other.id()
223 }
224}
225impl Eq for Message {}
226
227impl Hash for Message {
228 fn hash<H>(&self, hasher: &mut H)
229 where
230 H: std::hash::Hasher,
231 {
232 self.id().hash(hasher)
233 }
234}
235
/// A non-owning reference to a message, obtained from `Message::weak`.
/// It does not keep the message alive; use `upgrade` to get a `Message` back.
#[derive(Clone, Debug)]
pub struct WeakMessage {
    /// Weak pointer to the shared message allocation.
    weak: Weak<MessageWithId>,
}
240
241impl WeakMessage {
242 pub fn upgrade(&self) -> Option<Message> {
243 Some(Message {
244 msg_and_id: self.weak.upgrade()?,
245 })
246 }
247}
248
/// The portion of a message that is serialized as JSON to the meta spool.
#[derive(Debug, Serialize, Deserialize, Clone)]
struct MetaData {
    /// Envelope sender.
    sender: EnvelopeAddress,
    /// Envelope recipient.
    recipient: EnvelopeAddress,
    /// Free-form metadata; the accessors expect this to be a JSON object.
    meta: serde_json::Value,
    /// Optional scheduling constraint; absent in serialized form defaults to `None`.
    #[serde(default)]
    schedule: Option<Scheduling>,
}
257
258impl Drop for MessageInner {
259 fn drop(&mut self) {
260 if self.metadata.is_some() {
261 META_COUNT.dec();
262 }
263 if !self.data.is_empty() {
264 DATA_COUNT.dec();
265 }
266 MESSAGE_COUNT.dec();
267 }
268}
269
270impl Message {
271 pub fn new_dirty(
274 id: SpoolId,
275 sender: EnvelopeAddress,
276 recipient: EnvelopeAddress,
277 meta: serde_json::Value,
278 data: Arc<Box<[u8]>>,
279 ) -> anyhow::Result<Self> {
280 anyhow::ensure!(meta.is_object(), "metadata must be a json object");
281 MESSAGE_COUNT.inc();
282 DATA_COUNT.inc();
283 META_COUNT.inc();
284 Ok(Self {
285 msg_and_id: Arc::new(MessageWithId {
286 id,
287 inner: Mutex::new(MessageInner {
288 metadata: Some(Box::new(MetaData {
289 sender,
290 recipient,
291 meta,
292 schedule: None,
293 })),
294 data,
295 flags: MessageFlags::META_DIRTY | MessageFlags::DATA_DIRTY,
296 num_attempts: 0,
297 due: None,
298 }),
299 link: LinkedListAtomicLink::default(),
300 }),
301 })
302 }
303
304 pub fn weak(&self) -> WeakMessage {
305 WeakMessage {
306 weak: Arc::downgrade(&self.msg_and_id),
307 }
308 }
309
310 pub fn new_from_spool(id: SpoolId, metadata: Vec<u8>) -> anyhow::Result<Self> {
314 let metadata: MetaData = serde_json::from_slice(&metadata)?;
315 MESSAGE_COUNT.inc();
316 META_COUNT.inc();
317
318 let flags = if metadata.schedule.is_some() {
319 MessageFlags::SCHEDULED
320 } else {
321 MessageFlags::empty()
322 };
323
324 Ok(Self {
325 msg_and_id: Arc::new(MessageWithId {
326 id,
327 inner: Mutex::new(MessageInner {
328 metadata: Some(Box::new(metadata)),
329 data: NO_DATA.clone(),
330 flags,
331 num_attempts: 0,
332 due: None,
333 }),
334 link: LinkedListAtomicLink::default(),
335 }),
336 })
337 }
338
339 pub async fn new_with_id(id: SpoolId) -> anyhow::Result<Self> {
340 let meta_spool = get_meta_spool();
341 let data = meta_spool.load(id).await?;
342 Self::new_from_spool(id, data)
343 }
344
345 pub fn get_num_attempts(&self) -> u16 {
346 let inner = self.msg_and_id.inner.lock();
347 inner.num_attempts
348 }
349
350 pub fn set_num_attempts(&self, num_attempts: u16) {
351 let mut inner = self.msg_and_id.inner.lock();
352 inner.num_attempts = num_attempts;
353 }
354
355 pub fn increment_num_attempts(&self) {
356 let mut inner = self.msg_and_id.inner.lock();
357 inner.num_attempts += 1;
358 }
359
360 pub fn set_scheduling(
361 &self,
362 scheduling: Option<Scheduling>,
363 ) -> anyhow::Result<Option<Scheduling>> {
364 let mut inner = self.msg_and_id.inner.lock();
365 match &mut inner.metadata {
366 None => anyhow::bail!("set_scheduling: metadata must be loaded first"),
367 Some(meta) => {
368 meta.schedule = scheduling;
369 inner
370 .flags
371 .set(MessageFlags::SCHEDULED, scheduling.is_some());
372 if let Some(sched) = scheduling {
373 let due = inner.due.unwrap_or_else(Utc::now);
374 inner.due = Some(sched.adjust_for_schedule(due));
375 }
376 Ok(scheduling)
377 }
378 }
379 }
380
381 pub fn get_scheduling(&self) -> Option<Scheduling> {
382 let inner = self.msg_and_id.inner.lock();
383 inner.metadata.as_ref().and_then(|meta| meta.schedule)
384 }
385
386 pub fn get_due(&self) -> Option<DateTime<Utc>> {
387 let inner = self.msg_and_id.inner.lock();
388 inner.due
389 }
390
391 pub async fn delay_with_jitter(&self, limit: i64) -> anyhow::Result<Option<DateTime<Utc>>> {
392 let scale = rand::random::<f32>();
393 let value = (scale * limit as f32) as i64;
394 self.delay_by(seconds(value)?).await
395 }
396
397 pub async fn delay_by(
398 &self,
399 duration: chrono::Duration,
400 ) -> anyhow::Result<Option<DateTime<Utc>>> {
401 let due = Utc::now() + duration;
402 self.set_due(Some(due)).await
403 }
404
405 pub async fn delay_by_and_jitter(
407 &self,
408 duration: chrono::Duration,
409 ) -> anyhow::Result<Option<DateTime<Utc>>> {
410 let scale = rand::random::<f32>();
411 let value = (scale * 60.) as i64;
412 let due = Utc::now() + duration + seconds(value)?;
413 self.set_due(Some(due)).await
414 }
415
416 pub async fn set_due(
417 &self,
418 due: Option<DateTime<Utc>>,
419 ) -> anyhow::Result<Option<DateTime<Utc>>> {
420 let due = {
421 let mut inner = self.msg_and_id.inner.lock();
422
423 if !inner.flags.contains(MessageFlags::SCHEDULED) {
424 inner.due = due;
426 return Ok(inner.due);
427 }
428
429 let due = due.unwrap_or_else(Utc::now);
430
431 if let Some(meta) = &inner.metadata {
432 inner.due = match &meta.schedule {
433 Some(sched) => Some(sched.adjust_for_schedule(due)),
434 None => Some(due),
435 };
436 return Ok(inner.due);
437 }
438
439 due
442 };
443
444 self.load_meta().await?;
445
446 {
447 let mut inner = self.msg_and_id.inner.lock();
448 match &inner.metadata {
449 Some(meta) => {
450 inner.due = match &meta.schedule {
451 Some(sched) => Some(sched.adjust_for_schedule(due)),
452 None => Some(due),
453 };
454 Ok(inner.due)
455 }
456 None => anyhow::bail!("loaded metadata, but metadata is not set!?"),
457 }
458 }
459 }
460
461 fn get_data_if_dirty(&self) -> Option<Arc<Box<[u8]>>> {
462 let inner = self.msg_and_id.inner.lock();
463 if inner.flags.contains(MessageFlags::DATA_DIRTY) {
464 Some(Arc::clone(&inner.data))
465 } else {
466 None
467 }
468 }
469
470 fn get_meta_if_dirty(&self) -> Option<MetaData> {
471 let inner = self.msg_and_id.inner.lock();
472 if inner.flags.contains(MessageFlags::META_DIRTY) {
473 inner.metadata.as_ref().map(|md| (**md).clone())
474 } else {
475 None
476 }
477 }
478
479 pub fn set_force_sync(&self, force: bool) {
480 let mut inner = self.msg_and_id.inner.lock();
481 inner.flags.set(MessageFlags::FORCE_SYNC, force);
482 }
483
484 pub fn needs_save(&self) -> bool {
485 let inner = self.msg_and_id.inner.lock();
486 inner.flags.contains(MessageFlags::META_DIRTY)
487 || inner.flags.contains(MessageFlags::DATA_DIRTY)
488 }
489
490 pub async fn save(&self, deadline: Option<Instant>) -> anyhow::Result<()> {
491 let _timer = SAVE_HIST.start_timer();
492 self.save_to(&**get_meta_spool(), &**get_data_spool(), deadline)
493 .await
494 }
495
496 pub async fn save_to(
497 &self,
498 meta_spool: &(dyn Spool + Send + Sync),
499 data_spool: &(dyn Spool + Send + Sync),
500 deadline: Option<Instant>,
501 ) -> anyhow::Result<()> {
502 let force_sync = self
503 .msg_and_id
504 .inner
505 .lock()
506 .flags
507 .contains(MessageFlags::FORCE_SYNC);
508
509 let data_fut = if let Some(data) = self.get_data_if_dirty() {
510 anyhow::ensure!(!data.is_empty(), "message data must not be empty");
511 data_spool
512 .store(self.msg_and_id.id, data, force_sync, deadline)
513 .map(|_| true)
514 .boxed()
515 } else {
516 futures::future::ready(false).boxed()
517 };
518 let meta_fut = if let Some(meta) = self.get_meta_if_dirty() {
519 let meta = Arc::new(serde_json::to_vec(&meta)?.into_boxed_slice());
520 meta_spool
521 .store(self.msg_and_id.id, meta, force_sync, deadline)
522 .map(|_| true)
523 .boxed()
524 } else {
525 futures::future::ready(false).boxed()
526 };
527
528 let (data_res, meta_res) = tokio::join!(data_fut, meta_fut);
534
535 if data_res {
536 self.msg_and_id
537 .inner
538 .lock()
539 .flags
540 .remove(MessageFlags::DATA_DIRTY);
541 }
542 if meta_res {
543 self.msg_and_id
544 .inner
545 .lock()
546 .flags
547 .remove(MessageFlags::META_DIRTY);
548 }
549 Ok(())
550 }
551
552 pub fn id(&self) -> &SpoolId {
553 &self.msg_and_id.id
554 }
555
556 pub async fn save_and_shrink(&self) -> anyhow::Result<bool> {
558 self.save(None).await?;
559 self.shrink()
560 }
561
562 pub async fn save_and_shrink_data(&self) -> anyhow::Result<bool> {
564 self.save(None).await?;
565 self.shrink_data()
566 }
567
568 pub fn shrink_data(&self) -> anyhow::Result<bool> {
569 let mut inner = self.msg_and_id.inner.lock();
570 let mut did_shrink = false;
571 if inner.flags.contains(MessageFlags::DATA_DIRTY) {
572 anyhow::bail!("Cannot shrink message: DATA_DIRTY");
573 }
574 if !inner.data.is_empty() {
575 DATA_COUNT.dec();
576 did_shrink = true;
577 }
578 if !inner.data.is_empty() {
579 inner.data = NO_DATA.clone();
580 did_shrink = true;
581 }
582 Ok(did_shrink)
583 }
584
585 pub fn shrink(&self) -> anyhow::Result<bool> {
586 let mut inner = self.msg_and_id.inner.lock();
587 let mut did_shrink = false;
588 if inner.flags.contains(MessageFlags::DATA_DIRTY) {
589 anyhow::bail!("Cannot shrink message: DATA_DIRTY");
590 }
591 if inner.flags.contains(MessageFlags::META_DIRTY) {
592 anyhow::bail!("Cannot shrink message: META_DIRTY");
593 }
594 if inner.metadata.take().is_some() {
595 META_COUNT.dec();
596 did_shrink = true;
597 }
598 if !inner.data.is_empty() {
599 DATA_COUNT.dec();
600 did_shrink = true;
601 }
602 if !inner.data.is_empty() {
603 inner.data = NO_DATA.clone();
604 did_shrink = true;
605 }
606 Ok(did_shrink)
607 }
608
609 pub fn sender(&self) -> anyhow::Result<EnvelopeAddress> {
610 let inner = self.msg_and_id.inner.lock();
611 match &inner.metadata {
612 Some(meta) => Ok(meta.sender.clone()),
613 None => anyhow::bail!("metadata is not loaded"),
614 }
615 }
616
617 pub fn set_sender(&self, sender: EnvelopeAddress) -> anyhow::Result<()> {
618 let mut inner = self.msg_and_id.inner.lock();
619 match &mut inner.metadata {
620 Some(meta) => {
621 meta.sender = sender;
622 inner.flags.set(MessageFlags::DATA_DIRTY, true);
623 Ok(())
624 }
625 None => anyhow::bail!("metadata is not loaded"),
626 }
627 }
628
629 pub fn recipient(&self) -> anyhow::Result<EnvelopeAddress> {
630 let inner = self.msg_and_id.inner.lock();
631 match &inner.metadata {
632 Some(meta) => Ok(meta.recipient.clone()),
633 None => anyhow::bail!("metadata is not loaded"),
634 }
635 }
636
637 pub fn set_recipient(&self, recipient: EnvelopeAddress) -> anyhow::Result<()> {
638 let mut inner = self.msg_and_id.inner.lock();
639 match &mut inner.metadata {
640 Some(meta) => {
641 meta.recipient = recipient;
642 inner.flags.set(MessageFlags::DATA_DIRTY, true);
643 Ok(())
644 }
645 None => anyhow::bail!("metadata is not loaded"),
646 }
647 }
648
649 pub fn is_meta_loaded(&self) -> bool {
650 self.msg_and_id.inner.lock().metadata.is_some()
651 }
652
653 pub fn is_data_loaded(&self) -> bool {
654 !self.msg_and_id.inner.lock().data.is_empty()
655 }
656
657 pub async fn load_meta_if_needed(&self) -> anyhow::Result<()> {
658 if self.is_meta_loaded() {
659 return Ok(());
660 }
661 self.load_meta().await
662 }
663
664 pub async fn load_data_if_needed(&self) -> anyhow::Result<()> {
665 if self.is_data_loaded() {
666 return Ok(());
667 }
668 self.load_data().await
669 }
670
671 pub async fn load_meta(&self) -> anyhow::Result<()> {
672 let _timer = LOAD_META_HIST.start_timer();
673 self.load_meta_from(&**get_meta_spool()).await
674 }
675
676 pub async fn load_meta_from(
677 &self,
678 meta_spool: &(dyn Spool + Send + Sync),
679 ) -> anyhow::Result<()> {
680 let id = self.id();
681 let data = meta_spool.load(*id).await?;
682 let mut inner = self.msg_and_id.inner.lock();
683 let was_not_loaded = inner.metadata.is_none();
684 let metadata: MetaData = serde_json::from_slice(&data)?;
685 inner.metadata.replace(Box::new(metadata));
686 if was_not_loaded {
687 META_COUNT.inc();
688 }
689 Ok(())
690 }
691
692 pub async fn load_data(&self) -> anyhow::Result<()> {
693 let _timer = LOAD_DATA_HIST.start_timer();
694 self.load_data_from(&**get_data_spool()).await
695 }
696
697 pub async fn load_data_from(
698 &self,
699 data_spool: &(dyn Spool + Send + Sync),
700 ) -> anyhow::Result<()> {
701 let data = data_spool.load(*self.id()).await?;
702 let mut inner = self.msg_and_id.inner.lock();
703 let was_empty = inner.data.is_empty();
704 inner.data = Arc::new(data.into_boxed_slice());
705 if was_empty {
706 DATA_COUNT.inc();
707 }
708 Ok(())
709 }
710
711 pub fn assign_data(&self, data: Vec<u8>) {
712 let mut inner = self.msg_and_id.inner.lock();
713 let was_empty = inner.data.is_empty();
714 inner.data = Arc::new(data.into_boxed_slice());
715 inner.flags.set(MessageFlags::DATA_DIRTY, true);
716 if was_empty {
717 DATA_COUNT.inc();
718 }
719 }
720
721 pub fn get_data(&self) -> Arc<Box<[u8]>> {
722 let inner = self.msg_and_id.inner.lock();
723 inner.data.clone()
724 }
725
726 pub fn set_meta<S: AsRef<str>, V: Into<serde_json::Value>>(
727 &self,
728 key: S,
729 value: V,
730 ) -> anyhow::Result<()> {
731 let mut inner = self.msg_and_id.inner.lock();
732 match &mut inner.metadata {
733 None => anyhow::bail!("set_meta: metadata must be loaded first"),
734 Some(meta) => {
735 let key = key.as_ref();
736 let value = value.into();
737
738 match &mut meta.meta {
739 serde_json::Value::Object(map) => {
740 map.insert(key.to_string(), value);
741 }
742 _ => anyhow::bail!("metadata is somehow not a json object"),
743 }
744
745 inner.flags.set(MessageFlags::META_DIRTY, true);
746 Ok(())
747 }
748 }
749 }
750
751 pub fn get_meta_string<S: serde_json::value::Index + std::fmt::Display + Copy>(
753 &self,
754 key: S,
755 ) -> anyhow::Result<Option<String>> {
756 match self.get_meta(key) {
757 Ok(serde_json::Value::String(value)) => Ok(Some(value.to_string())),
758 Ok(serde_json::Value::Null) => Ok(None),
759 hmm => {
760 anyhow::bail!("expected '{key}' to be a string value, got {hmm:?}");
761 }
762 }
763 }
764
765 pub fn get_meta_obj(&self) -> anyhow::Result<serde_json::Value> {
766 let inner = self.msg_and_id.inner.lock();
767 match &inner.metadata {
768 None => anyhow::bail!("get_meta_obj: metadata must be loaded first"),
769 Some(meta) => Ok(meta.meta.clone()),
770 }
771 }
772
773 pub fn get_meta<S: serde_json::value::Index>(
774 &self,
775 key: S,
776 ) -> anyhow::Result<serde_json::Value> {
777 let inner = self.msg_and_id.inner.lock();
778 match &inner.metadata {
779 None => anyhow::bail!("get_meta: metadata must be loaded first"),
780 Some(meta) => match meta.meta.get(key) {
781 Some(value) => Ok(value.clone()),
782 None => Ok(serde_json::Value::Null),
783 },
784 }
785 }
786
787 pub fn age(&self, now: DateTime<Utc>) -> chrono::Duration {
788 self.msg_and_id.id.age(now)
789 }
790
791 pub fn get_queue_name(&self) -> anyhow::Result<String> {
792 Ok(match self.get_meta_string("queue")? {
793 Some(name) => name,
794 None => {
795 let name = QueueNameComponents::format(
796 self.get_meta_string("campaign")?,
797 self.get_meta_string("tenant")?,
798 self.recipient()?.domain().to_string().to_lowercase(),
799 self.get_meta_string("routing_domain")?,
800 );
801 name.to_string()
802 }
803 })
804 }
805
806 #[cfg(feature = "impl")]
807 pub async fn dkim_verify(&self) -> anyhow::Result<Vec<AuthenticationResult>> {
808 let resolver = dns_resolver::get_resolver();
809 let data = self.get_data();
810 let bytes = mailparsing::SharedString::try_from(data.as_ref().as_ref())?;
811
812 let parsed = mailparsing::Header::parse_headers(bytes.clone())?;
813 if parsed
814 .overall_conformance
815 .contains(MessageConformance::NON_CANONICAL_LINE_ENDINGS)
816 {
817 return Ok(vec![AuthenticationResult {
818 method: "dkim".to_string(),
819 method_version: None,
820 result: "permerror".to_string(),
821 reason: Some("message has non-canonical line endings".to_string()),
822 props: Default::default(),
823 }]);
824 }
825 let message = kumo_dkim::ParsedEmail::HeaderOnlyParse { bytes, parsed };
826
827 let from = message
828 .get_headers()
829 .from()
830 .map_err(any_err)?
831 .ok_or_else(|| anyhow::anyhow!("Missing or invalid From header"))?
832 .0;
833 if from.len() != 1 {
834 anyhow::bail!(
835 "From header must have a single sender, found {}",
836 from.len()
837 );
838 }
839 let from_domain = &from[0].address.domain;
840
841 let results =
842 kumo_dkim::verify_email_with_resolver(from_domain, &message, &**resolver).await?;
843 Ok(results)
844 }
845
846 pub fn parse_rfc3464(&self) -> anyhow::Result<Option<Report>> {
847 let data = self.get_data();
848 Report::parse(&data)
849 }
850
851 pub fn parse_rfc5965(&self) -> anyhow::Result<Option<ARFReport>> {
852 let data = self.get_data();
853 ARFReport::parse(&data)
854 }
855
856 pub fn prepend_header(&self, name: Option<&str>, value: &str) {
857 let data = self.get_data();
858 let mut new_data = Vec::with_capacity(size_header(name, value) + 2 + data.len());
859 emit_header(&mut new_data, name, value);
860 new_data.extend_from_slice(&data);
861 self.assign_data(new_data);
862 }
863
864 pub fn append_header(&self, name: Option<&str>, value: &str) {
865 let data = self.get_data();
866 let mut new_data = Vec::with_capacity(size_header(name, value) + 2 + data.len());
867 for (idx, window) in data.windows(4).enumerate() {
868 if window == b"\r\n\r\n" {
869 let headers = &data[0..idx + 2];
870 let body = &data[idx + 2..];
871
872 new_data.extend_from_slice(headers);
873 emit_header(&mut new_data, name, value);
874 new_data.extend_from_slice(body);
875 self.assign_data(new_data);
876 return;
877 }
878 }
879 }
880
881 pub fn get_address_header(
882 &self,
883 header_name: &str,
884 ) -> anyhow::Result<Option<HeaderAddressList>> {
885 let data = self.get_data();
886 let HeaderParseResult { headers, .. } =
887 mailparsing::Header::parse_headers(data.as_ref().as_ref())?;
888
889 match headers.get_first(header_name) {
890 Some(hdr) => {
891 let list = hdr.as_address_list()?;
892 let result: HeaderAddressList = list.into();
893 Ok(Some(result))
894 }
895 None => Ok(None),
896 }
897 }
898
899 pub fn get_first_named_header_value(&self, name: &str) -> anyhow::Result<Option<String>> {
900 let data = self.get_data();
901 let HeaderParseResult { headers, .. } = Header::parse_headers(data.as_ref().as_ref())?;
902
903 match headers.get_first(name) {
904 Some(hdr) => Ok(Some(hdr.as_unstructured()?)),
905 None => Ok(None),
906 }
907 }
908
909 pub fn get_all_named_header_values(&self, name: &str) -> anyhow::Result<Vec<String>> {
910 let data = self.get_data();
911 let HeaderParseResult { headers, .. } = Header::parse_headers(data.as_ref().as_ref())?;
912
913 let mut values = vec![];
914 for hdr in headers.iter_named(name) {
915 values.push(hdr.as_unstructured()?);
916 }
917 Ok(values)
918 }
919
920 pub fn get_all_headers(&self) -> anyhow::Result<Vec<(String, String)>> {
921 let data = self.get_data();
922 let HeaderParseResult { headers, .. } = Header::parse_headers(data.as_ref().as_ref())?;
923
924 let mut values = vec![];
925 for hdr in headers.iter() {
926 values.push((hdr.get_name().to_string(), hdr.as_unstructured()?));
927 }
928 Ok(values)
929 }
930
931 pub fn retain_headers<F: FnMut(&Header) -> bool>(&self, mut func: F) -> anyhow::Result<()> {
932 let data = self.get_data();
933 let mut new_data = Vec::with_capacity(data.len());
934 let HeaderParseResult {
935 headers,
936 body_offset,
937 ..
938 } = Header::parse_headers(data.as_ref().as_ref())?;
939 for hdr in headers.iter() {
940 let retain = (func)(hdr);
941 if !retain {
942 continue;
943 }
944 hdr.write_header(&mut new_data)?;
945 }
946 new_data.extend_from_slice(b"\r\n");
947 new_data.extend_from_slice(&data[body_offset..]);
948 self.assign_data(new_data);
949 Ok(())
950 }
951
952 pub fn remove_first_named_header(&self, name: &str) -> anyhow::Result<()> {
953 let mut removed = false;
954 self.retain_headers(|hdr| {
955 if hdr.get_name().eq_ignore_ascii_case(name) && !removed {
956 removed = true;
957 false
958 } else {
959 true
960 }
961 })
962 }
963
964 pub fn import_x_headers(&self, names: Vec<String>) -> anyhow::Result<()> {
965 let data = self.get_data();
966 let HeaderParseResult { headers, .. } = Header::parse_headers(data.as_ref().as_ref())?;
967
968 for hdr in headers.iter() {
969 let do_import = if names.is_empty() {
970 is_x_header(hdr.get_name())
971 } else {
972 is_header_in_names_list(hdr.get_name(), &names)
973 };
974 if do_import {
975 let name = imported_header_name(hdr.get_name());
976 self.set_meta(name, hdr.as_unstructured()?)?;
977 }
978 }
979
980 Ok(())
981 }
982
983 pub fn remove_x_headers(&self, names: Vec<String>) -> anyhow::Result<()> {
984 self.retain_headers(|hdr| {
985 if names.is_empty() {
986 !is_x_header(hdr.get_name())
987 } else {
988 !is_header_in_names_list(hdr.get_name(), &names)
989 }
990 })
991 }
992
993 pub fn remove_all_named_headers(&self, name: &str) -> anyhow::Result<()> {
994 self.retain_headers(|hdr| !hdr.get_name().eq_ignore_ascii_case(name))
995 }
996
997 #[cfg(feature = "impl")]
998 pub async fn dkim_sign(&self, signer: Signer) -> anyhow::Result<()> {
999 if let Some(runtime) = SIGN_POOL.get() {
1000 let msg = self.clone();
1001 runtime
1002 .spawn_blocking(move || {
1003 let data = msg.get_data();
1004 let header = signer.sign(&data)?;
1005 msg.prepend_header(None, &header);
1006 Ok::<(), anyhow::Error>(())
1007 })
1008 .await??;
1009 } else {
1010 let data = self.get_data();
1011 let header = signer.sign(&data)?;
1012 self.prepend_header(None, &header);
1013 }
1014 Ok(())
1015 }
1016
1017 pub fn import_scheduling_header(
1018 &self,
1019 header_name: &str,
1020 remove: bool,
1021 ) -> anyhow::Result<Option<Scheduling>> {
1022 if let Some(value) = self.get_first_named_header_value(header_name)? {
1023 let sched: Scheduling = serde_json::from_str(&value).with_context(|| {
1024 format!("{value} from header {header_name} is not a valid Scheduling header")
1025 })?;
1026 let result = self.set_scheduling(Some(sched))?;
1027
1028 if remove {
1029 self.remove_all_named_headers(header_name)?;
1030 }
1031
1032 Ok(result)
1033 } else {
1034 Ok(None)
1035 }
1036 }
1037
1038 pub fn append_text_plain(&self, content: &str) -> anyhow::Result<bool> {
1039 let data = self.get_data();
1040 let mut msg = MimePart::parse(data.as_ref().as_ref())?;
1041 let parts = msg.simplified_structure_pointers()?;
1042 if let Some(p) = parts.text_part.and_then(|p| msg.resolve_ptr_mut(p)) {
1043 match p.body()? {
1044 DecodedBody::Text(text) => {
1045 let mut text = text.as_str().to_string();
1046 text.push_str("\r\n");
1047 text.push_str(content);
1048 p.replace_text_body("text/plain", &text);
1049
1050 let new_data = msg.to_message_string();
1051 self.assign_data(new_data.into_bytes());
1052 Ok(true)
1053 }
1054 DecodedBody::Binary(_) => {
1055 anyhow::bail!("expected text/plain part to be text, but it is binary");
1056 }
1057 }
1058 } else {
1059 Ok(false)
1060 }
1061 }
1062
1063 pub fn append_text_html(&self, content: &str) -> anyhow::Result<bool> {
1064 let data = self.get_data();
1065 let mut msg = MimePart::parse(data.as_ref().as_ref())?;
1066 let parts = msg.simplified_structure_pointers()?;
1067 if let Some(p) = parts.html_part.and_then(|p| msg.resolve_ptr_mut(p)) {
1068 match p.body()? {
1069 DecodedBody::Text(text) => {
1070 let mut text = text.as_str().to_string();
1071
1072 match text.rfind("</body>").or_else(|| text.rfind("</BODY>")) {
1073 Some(idx) => {
1074 text.insert_str(idx, content);
1075 text.insert_str(idx, "\r\n");
1076 }
1077 None => {
1078 text.push_str("\r\n");
1080 text.push_str(content);
1081 }
1082 }
1083
1084 p.replace_text_body("text/html", &text);
1085
1086 let new_data = msg.to_message_string();
1087 self.assign_data(new_data.into_bytes());
1088 Ok(true)
1089 }
1090 DecodedBody::Binary(_) => {
1091 anyhow::bail!("expected text/html part to be text, but it is binary");
1092 }
1093 }
1094 } else {
1095 Ok(false)
1096 }
1097 }
1098
1099 pub fn check_fix_conformance(
1100 &self,
1101 check: MessageConformance,
1102 fix: MessageConformance,
1103 ) -> anyhow::Result<()> {
1104 let data = self.get_data();
1105 let mut msg = MimePart::parse(data.as_ref().as_ref())?;
1106
1107 let conformance = msg.conformance();
1108
1109 let check = check - fix;
1111
1112 if check.intersects(conformance) {
1113 let problems = check.intersection(conformance).to_string();
1114 anyhow::bail!("Message has conformance issues: {problems}");
1115 }
1116
1117 if fix.intersects(conformance) {
1118 let to_fix = fix.intersection(conformance);
1119 let problems = to_fix.to_string();
1120
1121 let missing_headers_only = to_fix
1122 .difference(
1123 MessageConformance::MISSING_DATE_HEADER
1124 | MessageConformance::MISSING_MIME_VERSION
1125 | MessageConformance::MISSING_MESSAGE_ID_HEADER,
1126 )
1127 .is_empty();
1128
1129 if !missing_headers_only {
1130 msg = msg.rebuild().with_context(|| {
1131 format!("Rebuilding message to correct conformance issues: {problems}")
1132 })?;
1133 }
1134
1135 if to_fix.contains(MessageConformance::MISSING_DATE_HEADER) {
1136 msg.headers_mut().set_date(Utc::now());
1137 }
1138
1139 if to_fix.contains(MessageConformance::MISSING_MIME_VERSION) {
1140 msg.headers_mut().set_mime_version("1.0");
1141 }
1142
1143 if to_fix.contains(MessageConformance::MISSING_MESSAGE_ID_HEADER) {
1144 let sender = self.sender()?;
1145 let domain = sender.domain();
1146 let id = *self.id();
1147 msg.headers_mut()
1148 .set_message_id(mailparsing::MessageID(format!("{id}@{domain}")));
1149 }
1150
1151 let new_data = msg.to_message_string();
1152 self.assign_data(new_data.into_bytes());
1153 }
1154
1155 Ok(())
1156 }
1157}
1158
/// Returns true when `hdr_name` equals any entry of `names`, comparing
/// ASCII case-insensitively.
fn is_header_in_names_list(hdr_name: &str, names: &[String]) -> bool {
    names
        .iter()
        .any(|candidate| hdr_name.eq_ignore_ascii_case(candidate))
}
1167
/// Converts a header name into the metadata key used when importing it:
/// ASCII letters are lowercased and every `-` becomes `_`.
fn imported_header_name(name: &str) -> String {
    name.to_ascii_lowercase().replace('-', "_")
}
1176
/// Returns true when `name` begins with `X-` or `x-`.
fn is_x_header(name: &str) -> bool {
    matches!(name.as_bytes(), [b'X' | b'x', b'-', ..])
}
1180
/// Returns the number of bytes `emit_header` writes for `name` and `value`,
/// not counting the trailing CRLF: the value length, plus the name and its
/// two-byte `": "` separator when a name is given.
fn size_header(name: Option<&str>, value: &str) -> usize {
    match name {
        Some(name) => name.len() + 2 + value.len(),
        None => value.len(),
    }
}
1184
/// Appends a header line to `dest`.
///
/// When `name` is given, `name: ` is written before `value`; otherwise
/// `value` is written on its own. A CRLF is added unless `value` already
/// ends with one.
fn emit_header(dest: &mut Vec<u8>, name: Option<&str>, value: &str) {
    const CRLF: &str = "\r\n";

    let prefix = name.map(|name| [name, ": "]);
    let terminator = if value.ends_with(CRLF) { "" } else { CRLF };

    let pieces = prefix
        .iter()
        .flatten()
        .copied()
        .chain([value, terminator]);
    for piece in pieces {
        dest.extend_from_slice(piece.as_bytes());
    }
}
1195
/// Lua bindings for `Message`.
///
/// Each method registered here is a thin wrapper around the `Message`
/// method of the same name; errors returned by those methods are converted
/// into Lua errors with `any_err`.
#[cfg(feature = "impl")]
impl UserData for Message {
    fn add_methods<M: UserDataMethods<Self>>(methods: &mut M) {
        // msg:set_meta(name, value): loads the metadata if needed, converts
        // the Lua value to JSON and stores it under `name`.
        methods.add_async_method(
            "set_meta",
            move |_, this, (name, value): (String, mlua::Value)| async move {
                this.load_meta_if_needed().await.map_err(any_err)?;
                let value = serde_json::value::to_value(value).map_err(any_err)?;
                this.set_meta(name, value).map_err(any_err)?;
                Ok(())
            },
        );
        // msg:get_meta(name): loads the metadata if needed and returns the
        // stored value converted to Lua using `serialize_options()`.
        methods.add_async_method("get_meta", move |lua, this, name: String| async move {
            this.load_meta_if_needed().await.map_err(any_err)?;
            let value = this.get_meta(name).map_err(any_err)?;
            Ok(Some(lua.to_value_with(&value, serialize_options())?))
        });
        // msg:get_data(): loads the body data if needed and returns it as a
        // Lua string.
        methods.add_async_method("get_data", move |lua, this, _: ()| async move {
            this.load_data_if_needed().await.map_err(any_err)?;
            let data = this.get_data();
            lua.create_string(&*data)
        });
        // msg:set_data(data): replaces the message data with a copy of the
        // bytes of the supplied Lua string.
        methods.add_method("set_data", move |_lua, this, data: mlua::String| {
            this.assign_data(data.as_bytes().to_vec());
            Ok(())
        });

        methods.add_method("append_text_plain", move |_lua, this, data: String| {
            this.append_text_plain(&data).map_err(any_err)
        });

        methods.add_method("append_text_html", move |_lua, this, data: String| {
            this.append_text_html(&data).map_err(any_err)
        });

        // msg:id(): the message id rendered as a string.
        methods.add_method("id", move |_, this, _: ()| Ok(this.id().to_string()));
        methods.add_method("sender", move |_, this, _: ()| {
            this.sender().map_err(any_err)
        });

        methods.add_method("num_attempts", move |_, this, _: ()| {
            Ok(this.get_num_attempts())
        });

        methods.add_method("queue_name", move |_, this, _: ()| {
            this.get_queue_name().map_err(any_err)
        });

        // msg:set_due(due): `due` is deserialized as an optional UTC
        // timestamp; the value reported back by `Message::set_due` is
        // returned to the caller.
        methods.add_async_method("set_due", move |lua, this, due: mlua::Value| async move {
            let due: Option<DateTime<Utc>> = lua.from_value(due)?;
            let revised_due = this.set_due(due).await.map_err(any_err)?;
            lua.to_value(&revised_due)
        });

        // msg:set_sender(value): a Lua string is parsed with
        // `EnvelopeAddress::parse`; any other value is deserialized directly
        // into an `EnvelopeAddress`.
        methods.add_method("set_sender", move |lua, this, value: mlua::Value| {
            let sender = match value {
                mlua::Value::String(s) => {
                    let s = s.to_str()?;
                    EnvelopeAddress::parse(&s).map_err(any_err)?
                }
                other => lua.from_value::<EnvelopeAddress>(other)?,
            };
            this.set_sender(sender).map_err(any_err)
        });

        methods.add_method("recipient", move |_, this, _: ()| {
            this.recipient().map_err(any_err)
        });

        // msg:set_recipient(value): accepts the same inputs as set_sender.
        methods.add_method("set_recipient", move |lua, this, value: mlua::Value| {
            let recipient = match value {
                mlua::Value::String(s) => {
                    let s = s.to_str()?;
                    EnvelopeAddress::parse(&s).map_err(any_err)?
                }
                other => lua.from_value::<EnvelopeAddress>(other)?,
            };
            this.set_recipient(recipient).map_err(any_err)
        });

        methods.add_async_method("dkim_sign", |_, this, signer: Signer| async move {
            this.dkim_sign(signer).await.map_err(any_err)
        });

        // msg:shrink(): saves the message first when it has unsaved
        // changes, then calls `Message::shrink`.
        methods.add_async_method("shrink", |_, this, _: ()| async move {
            if this.needs_save() {
                this.save(None).await.map_err(any_err)?;
            }
            this.shrink().map_err(any_err)
        });

        // msg:shrink_data(): same save-before-shrink sequence, but calls
        // `Message::shrink_data`.
        methods.add_async_method("shrink_data", |_, this, _: ()| async move {
            if this.needs_save() {
                this.save(None).await.map_err(any_err)?;
            }
            this.shrink_data().map_err(any_err)
        });

        // msg:add_authentication_results(serv_id, results): deserializes
        // `results` into a list of `AuthenticationResult`, wraps it in an
        // `AuthenticationResults` with no version, and prepends the encoded
        // value as an `Authentication-Results` header.
        methods.add_method(
            "add_authentication_results",
            move |lua, this, (serv_id, results): (String, mlua::Value)| {
                let results: Vec<AuthenticationResult> = lua.from_value(results)?;
                let results = AuthenticationResults {
                    serv_id,
                    version: None,
                    results,
                };

                this.prepend_header(Some("Authentication-Results"), &results.encode_value());

                Ok(())
            },
        );

        // msg:dkim_verify(): returns the verification results converted to
        // Lua using `serialize_options()`.
        methods.add_async_method("dkim_verify", |lua, this, ()| async move {
            let results = this.dkim_verify().await.map_err(any_err)?;
            lua.to_value_with(&results, serialize_options())
        });

        methods.add_method(
            "prepend_header",
            move |_, this, (name, value): (String, String)| {
                this.prepend_header(Some(&name), &value);
                Ok(())
            },
        );
        methods.add_method(
            "append_header",
            move |_, this, (name, value): (String, String)| {
                this.append_header(Some(&name), &value);
                Ok(())
            },
        );
        methods.add_method("get_address_header", move |_, this, name: String| {
            this.get_address_header(&name).map_err(any_err)
        });
        // Shortcuts for get_address_header("From") / get_address_header("To").
        methods.add_method("from_header", move |_, this, ()| {
            this.get_address_header("From").map_err(any_err)
        });
        methods.add_method("to_header", move |_, this, ()| {
            this.get_address_header("To").map_err(any_err)
        });

        methods.add_method(
            "get_first_named_header_value",
            move |_, this, name: String| this.get_first_named_header_value(&name).map_err(any_err),
        );
        methods.add_method(
            "get_all_named_header_values",
            move |_, this, name: String| this.get_all_named_header_values(&name).map_err(any_err),
        );
        // msg:get_all_headers(): returns a list of two-element lists, each
        // holding a header name followed by its value.
        methods.add_method("get_all_headers", move |_, this, _: ()| {
            Ok(this
                .get_all_headers()
                .map_err(any_err)?
                .into_iter()
                .map(|(name, value)| vec![name, value])
                .collect::<Vec<Vec<String>>>())
        });
        // msg:import_x_headers([names]): an omitted `names` argument is
        // treated as an empty list.
        methods.add_method(
            "import_x_headers",
            move |_, this, names: Option<Vec<String>>| {
                this.import_x_headers(names.unwrap_or_default())
                    .map_err(any_err)
            },
        );

        // msg:remove_x_headers([names]): an omitted `names` argument is
        // treated as an empty list.
        methods.add_method(
            "remove_x_headers",
            move |_, this, names: Option<Vec<String>>| {
                this.remove_x_headers(names.unwrap_or_default())
                    .map_err(any_err)
            },
        );
        methods.add_method("remove_all_named_headers", move |_, this, name: String| {
            this.remove_all_named_headers(&name).map_err(any_err)
        });

        // msg:import_scheduling_header(header_name, remove): returns the
        // value produced by `Message::import_scheduling_header`, converted
        // to Lua.
        methods.add_method(
            "import_scheduling_header",
            move |lua, this, (header_name, remove): (String, bool)| {
                let opt_schedule = this
                    .import_scheduling_header(&header_name, remove)
                    .map_err(any_err)?;
                lua.to_value(&opt_schedule)
            },
        );

        // msg:set_scheduling(params): `params` is decoded as an optional
        // `Scheduling`; the value returned by `Message::set_scheduling` is
        // passed back to Lua.
        methods.add_method("set_scheduling", move |lua, this, params: mlua::Value| {
            let sched: Option<Scheduling> = from_lua_value(lua, params)?;
            let opt_schedule = this.set_scheduling(sched).map_err(any_err)?;
            lua.to_value(&opt_schedule)
        });

        // msg:parse_rfc3464(): returns the parsed report, or nil when
        // `Message::parse_rfc3464` yields no report.
        methods.add_method("parse_rfc3464", move |lua, this, _: ()| {
            let report = this.parse_rfc3464().map_err(any_err)?;
            match report {
                Some(report) => lua.to_value_with(&report, serialize_options()),
                None => Ok(mlua::Value::Nil),
            }
        });

        // msg:parse_rfc5965(): returns the parsed report, or nil when
        // `Message::parse_rfc5965` yields no report.
        methods.add_method("parse_rfc5965", move |lua, this, _: ()| {
            let report = this.parse_rfc5965().map_err(any_err)?;
            match report {
                Some(report) => lua.to_value_with(&report, serialize_options()),
                None => Ok(mlua::Value::Nil),
            }
        });

        methods.add_async_method("save", |_, this, ()| async move {
            this.save(None).await.map_err(any_err)
        });

        methods.add_method("set_force_sync", move |_, this, force: bool| {
            this.set_force_sync(force);
            Ok(())
        });

        // msg:check_fix_conformance(check, fix): both arguments are parsed
        // from their string form into `MessageConformance` flags. A failure
        // of the check itself is NOT raised as a Lua error: it is returned
        // as a formatted error string, and nil is returned on success. Only
        // unparseable flag strings raise an error.
        methods.add_async_method(
            "check_fix_conformance",
            |_, this, (check, fix): (String, String)| async move {
                use std::str::FromStr;
                let check = MessageConformance::from_str(&check).map_err(any_err)?;
                let fix = MessageConformance::from_str(&fix).map_err(any_err)?;

                match this.check_fix_conformance(check, fix) {
                    Ok(_) => Ok(None),
                    Err(err) => Ok(Some(format!("{err:#}"))),
                }
            },
        );
    }
}
1440
1441impl TimerEntryWithDelay for WeakMessage {
1442 fn delay(&self) -> Duration {
1443 match self.upgrade() {
1444 None => {
1445 Duration::from_millis(0)
1447 }
1448 Some(msg) => msg.delay(),
1449 }
1450 }
1451}
1452
1453impl TimerEntryWithDelay for Message {
1454 fn delay(&self) -> Duration {
1455 let inner = self.msg_and_id.inner.lock();
1456 match inner.due {
1457 Some(time) => {
1458 let now = Utc::now();
1459 let delta = time - now;
1460 delta.to_std().unwrap_or(Duration::from_millis(0))
1461 }
1462 None => Duration::from_millis(0),
1463 }
1464 }
1465}
1466
// Unit tests for header manipulation, body appending, conformance fixing,
// scheduling and type sizes of `Message`.
#[cfg(test)]
mod test {
    use super::*;
    use serde_json::json;

    // Builds a `Message` via `Message::new_dirty` with a fresh spool id,
    // fixed sender/recipient addresses, empty metadata and `s` as the data.
    fn new_msg_body<S: AsRef<str>>(s: S) -> Message {
        Message::new_dirty(
            SpoolId::new(),
            EnvelopeAddress::parse("sender@example.com").unwrap(),
            EnvelopeAddress::parse("recip@example.com").unwrap(),
            serde_json::json!({}),
            Arc::new(s.as_ref().as_bytes().to_vec().into_boxed_slice()),
        )
        .unwrap()
    }

    // Returns the message data as a `String`; panics if it is not UTF-8.
    fn data_as_string(msg: &Message) -> String {
        String::from_utf8(msg.get_data().to_vec()).unwrap()
    }

    // Fixture with two X- headers, a Subject, and a From header whose name
    // is followed by a space before the colon.
    const X_HDR_CONTENT: &str =
        "X-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nFrom :Someone\r\n\r\nBody";

    // An empty name list imports every X- header into the metadata.
    #[test]
    fn import_all_x_headers() {
        let msg = new_msg_body(X_HDR_CONTENT);

        msg.import_x_headers(vec![]).unwrap();
        k9::assert_equal!(
            msg.get_meta_obj().unwrap(),
            json!({
                "x_hello": "there",
                "x_header": "value",
            })
        );
    }

    // A JSON null stored in the metadata reads back as null from Rust and
    // as nil from Lua.
    // NOTE(review): this test relies on the Lua `UserData` impl, which is
    // gated on feature "impl" — confirm tests always build with it enabled.
    #[test]
    fn meta_and_nil() {
        let msg = new_msg_body(X_HDR_CONTENT);
        msg.set_meta("test", serde_json::Value::Null).unwrap();
        k9::assert_equal!(msg.get_meta("test").unwrap(), serde_json::Value::Null);

        let lua = mlua::Lua::new();
        lua.globals().set("msg", msg).unwrap();
        lua.load("assert(msg:get_meta('test') == nil)")
            .exec()
            .unwrap();
    }

    // Only the named header is imported; the name is given in lower case
    // while the header in the fixture is mixed case.
    #[test]
    fn import_some_x_headers() {
        let msg = new_msg_body(X_HDR_CONTENT);

        msg.import_x_headers(vec!["x-hello".to_string()]).unwrap();
        k9::assert_equal!(
            msg.get_meta_obj().unwrap(),
            json!({
                "x_hello": "there",
            })
        );
    }

    // An empty name list removes every X- header and leaves the remaining
    // headers and body untouched.
    #[test]
    fn remove_all_x_headers() {
        let msg = new_msg_body(X_HDR_CONTENT);

        msg.remove_x_headers(vec![]).unwrap();
        k9::assert_equal!(
            data_as_string(&msg),
            "Subject: Hello\r\nFrom :Someone\r\n\r\nBody"
        );
    }

    // Prepending with a separate name and value.
    #[test]
    fn prepend_header_2_params() {
        let msg = new_msg_body(X_HDR_CONTENT);

        msg.prepend_header(Some("Date"), "Today");
        k9::assert_equal!(
            data_as_string(&msg),
            "Date: Today\r\nX-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nFrom :Someone\r\n\r\nBody"
        );
    }

    // Prepending a pre-formatted "Name: value" line with no separate name
    // yields the same result as the two-parameter form.
    #[test]
    fn prepend_header_1_params() {
        let msg = new_msg_body(X_HDR_CONTENT);

        msg.prepend_header(None, "Date: Today");
        k9::assert_equal!(
            data_as_string(&msg),
            "Date: Today\r\nX-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nFrom :Someone\r\n\r\nBody"
        );
    }

    // Appending places the new header after the existing headers and
    // before the blank line that separates headers from the body.
    #[test]
    fn append_header_2_params() {
        let msg = new_msg_body(X_HDR_CONTENT);

        msg.append_header(Some("Date"), "Today");
        k9::assert_equal!(
            data_as_string(&msg),
            "X-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nFrom :Someone\r\nDate: Today\r\n\r\nBody"
        );
    }

    // Appending a pre-formatted "Name: value" line with no separate name
    // yields the same result as the two-parameter form.
    #[test]
    fn append_header_1_params() {
        let msg = new_msg_body(X_HDR_CONTENT);

        msg.append_header(None, "Date: Today");
        k9::assert_equal!(
            data_as_string(&msg),
            "X-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nFrom :Someone\r\nDate: Today\r\n\r\nBody"
        );
    }

    // Fixture in which `X-Header` appears twice, with different values.
    const MULTI_HEADER_CONTENT: &str =
        "X-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nX-Header: another value\r\nFrom :Someone@somewhere\r\n\r\nBody";

    // Lookup uses a different case than the fixture and returns the first
    // occurrence only.
    #[test]
    fn get_first_header() {
        let msg = new_msg_body(MULTI_HEADER_CONTENT);
        k9::assert_equal!(
            msg.get_first_named_header_value("X-header")
                .unwrap()
                .unwrap(),
            "value"
        );
    }

    // All occurrences are returned, in message order.
    #[test]
    fn get_all_header() {
        let msg = new_msg_body(MULTI_HEADER_CONTENT);
        k9::assert_equal!(
            msg.get_all_named_header_values("X-header").unwrap(),
            vec!["value".to_string(), "another value".to_string()]
        );
    }

    // Only the first `X-Header` is removed; the second one remains.
    #[test]
    fn remove_first() {
        let msg = new_msg_body(MULTI_HEADER_CONTENT);
        msg.remove_first_named_header("X-header").unwrap();
        k9::assert_equal!(
            data_as_string(&msg),
            "X-Hello: there\r\nSubject: Hello\r\nX-Header: another value\r\nFrom :Someone@somewhere\r\n\r\nBody"
        );
    }

    // Both `X-Header` occurrences are removed.
    #[test]
    fn remove_all() {
        let msg = new_msg_body(MULTI_HEADER_CONTENT);
        msg.remove_all_named_headers("X-header").unwrap();
        k9::assert_equal!(
            data_as_string(&msg),
            "X-Hello: there\r\nSubject: Hello\r\nFrom :Someone@somewhere\r\n\r\nBody"
        );
    }

    // Appending plain text to a message without a Content-Type: the
    // expected output gains a text/plain Content-Type header and the new
    // text follows the original body on its own CRLF-terminated line.
    #[test]
    fn append_text_plain() {
        let msg = new_msg_body(MULTI_HEADER_CONTENT);
        msg.append_text_plain("I am at the bottom").unwrap();
        k9::assert_equal!(
            data_as_string(&msg),
            "X-Hello: there\r\n\
             X-Header: value\r\n\
             Subject: Hello\r\n\
             X-Header: another value\r\n\
             From :Someone@somewhere\r\n\
             Content-Type: text/plain;\r\n\
             \tcharset=\"us-ascii\"\r\n\
             \r\n\
             Body\r\n\
             I am at the bottom\r\n"
        );
    }

    // multipart/mixed fixture with a text/plain part, a text/html part and
    // a base64 attachment.
    const MIXED_CONTENT: &str = "Content-Type: multipart/mixed;\r\n\
\tboundary=\"my-boundary\"\r\n\
\r\n\
--my-boundary\r\n\
Content-Type: text/plain;\r\n\
\tcharset=\"us-ascii\"\r\n\
\r\n\
plain text\r\n\
--my-boundary\r\n\
Content-Type: text/html;\r\n\
\tcharset=\"us-ascii\"\r\n\
\r\n\
<b>rich</b> text\r\n\
--my-boundary\r\n\
Content-Type: application/octet-stream\r\n\
Content-Transfer-Encoding: base64\r\n\
Content-Disposition: attachment;\r\n\
\tfilename=\"woot.bin\"\r\n\
Content-ID: <woot.id@somewhere>\r\n\
\r\n\
AAECAw==\r\n\
--my-boundary--\r\n\
\r\n";

    // Same structure as MIXED_CONTENT, but the HTML part is wrapped in
    // <BODY>...</BODY> and the attachment Content-ID differs.
    const MIXED_CONTENT_ENCLOSING_BODY: &str = "Content-Type: multipart/mixed;\r\n\
\tboundary=\"my-boundary\"\r\n\
\r\n\
--my-boundary\r\n\
Content-Type: text/plain;\r\n\
\tcharset=\"us-ascii\"\r\n\
\r\n\
plain text\r\n\
--my-boundary\r\n\
Content-Type: text/html;\r\n\
\tcharset=\"us-ascii\"\r\n\
\r\n\
<BODY>\r\n\
<b>rich</b> text\r\n\
</BODY>\r\n\
--my-boundary\r\n\
Content-Type: application/octet-stream\r\n\
Content-Transfer-Encoding: base64\r\n\
Content-Disposition: attachment;\r\n\
\tfilename=\"woot.bin\"\r\n\
Content-ID: <woot.id>\r\n\
\r\n\
AAECAw==\r\n\
--my-boundary--\r\n\
\r\n";

    // Appending HTML only changes the text/html part. In the first case the
    // text is added at the end of that part. In the second case the text is
    // non-ASCII and the part has a closing </BODY>: the snapshot shows the
    // text placed before </BODY> and the part re-encoded as utf-8
    // quoted-printable.
    #[test]
    fn append_text_html() {
        let msg = new_msg_body(MIXED_CONTENT);
        msg.append_text_html("bottom html").unwrap();
        k9::snapshot!(
            data_as_string(&msg),
            r#"
Content-Type: multipart/mixed;\r
\tboundary="my-boundary"\r
\r
--my-boundary\r
Content-Type: text/plain;\r
\tcharset="us-ascii"\r
\r
plain text\r
--my-boundary\r
Content-Type: text/html;\r
\tcharset="us-ascii"\r
\r
<b>rich</b> text\r
\r
bottom html\r
--my-boundary\r
Content-Type: application/octet-stream\r
Content-Transfer-Encoding: base64\r
Content-Disposition: attachment;\r
\tfilename="woot.bin"\r
Content-ID: <woot.id@somewhere>\r
\r
AAECAw==\r
--my-boundary--\r
\r

"#
        );

        let msg = new_msg_body(MIXED_CONTENT_ENCLOSING_BODY);
        msg.append_text_html("bottom html 👻").unwrap();
        k9::snapshot!(
            data_as_string(&msg),
            r#"
Content-Type: multipart/mixed;\r
\tboundary="my-boundary"\r
\r
--my-boundary\r
Content-Type: text/plain;\r
\tcharset="us-ascii"\r
\r
plain text\r
--my-boundary\r
Content-Type: text/html;\r
\tcharset="utf-8"\r
Content-Transfer-Encoding: quoted-printable\r
\r
<BODY>\r
<b>rich</b> text\r
\r
bottom html =F0=9F=91=BB</BODY>\r
--my-boundary\r
Content-Type: application/octet-stream\r
Content-Transfer-Encoding: base64\r
Content-Disposition: attachment;\r
\tfilename="woot.bin"\r
Content-ID: <woot.id>\r
\r
AAECAw==\r
--my-boundary--\r
\r

"#
        );
    }

    // Appending non-ASCII plain text to a multipart message only changes
    // the text/plain part, which the snapshot shows re-encoded as utf-8
    // quoted-printable.
    #[test]
    fn append_text_plain_mixed() {
        let msg = new_msg_body(MIXED_CONTENT);
        msg.append_text_plain("bottom text 👾").unwrap();
        k9::snapshot!(
            data_as_string(&msg),
            r#"
Content-Type: multipart/mixed;\r
\tboundary="my-boundary"\r
\r
--my-boundary\r
Content-Type: text/plain;\r
\tcharset="utf-8"\r
Content-Transfer-Encoding: quoted-printable\r
\r
plain text\r
\r
bottom text =F0=9F=91=BE\r
--my-boundary\r
Content-Type: text/html;\r
\tcharset="us-ascii"\r
\r
<b>rich</b> text\r
--my-boundary\r
Content-Type: application/octet-stream\r
Content-Transfer-Encoding: base64\r
Content-Disposition: attachment;\r
\tfilename="woot.bin"\r
Content-ID: <woot.id@somewhere>\r
\r
AAECAw==\r
--my-boundary--\r
\r

"#
        );
    }

    // A Message-ID wrapped in double angle brackets is reported as
    // MISSING_MESSAGE_ID_HEADER when checked without fixing, and the call
    // succeeds once that flag is also allowed to be fixed.
    #[test]
    fn check_conformance_angle_msg_id() {
        const DOUBLE_ANGLE_ONLY: &str = "Subject: hello\r
Message-ID: <<1234@example.com>>\r
\r
Hello";
        let msg = new_msg_body(DOUBLE_ANGLE_ONLY);
        k9::snapshot!(
            msg.check_fix_conformance(
                MessageConformance::MISSING_MESSAGE_ID_HEADER,
                MessageConformance::empty(),
            )
            .unwrap_err(),
            "Message has conformance issues: MISSING_MESSAGE_ID_HEADER"
        );

        msg.check_fix_conformance(
            MessageConformance::MISSING_MESSAGE_ID_HEADER,
            MessageConformance::MISSING_MESSAGE_ID_HEADER,
        )
        .unwrap();

        // Same Message-ID problem combined with a very long body line;
        // here only success of the call is asserted.
        const DOUBLE_ANGLE_AND_LONG_LINE: &str = "Subject: hello\r
Message-ID: <<1234@example.com>>\r
\r
Hello this is a really long line Hello this is a really long line \
Hello this is a really long line Hello this is a really long line \
Hello this is a really long line Hello this is a really long line \
Hello this is a really long line Hello this is a really long line \
Hello this is a really long line Hello this is a really long line \
Hello this is a really long line Hello this is a really long line \
Hello this is a really long line Hello this is a really long line
";
        let msg = new_msg_body(DOUBLE_ANGLE_AND_LONG_LINE);
        msg.check_fix_conformance(
            MessageConformance::MISSING_COLON_VALUE,
            MessageConformance::MISSING_MESSAGE_ID_HEADER | MessageConformance::LINE_TOO_LONG,
        )
        .unwrap();

    }

    // Fixing only MISSING_MIME_VERSION appends a Mime-Version header and
    // leaves the rest of the message as it was. Additionally fixing
    // NAME_ENDS_WITH_SPACE produces the rewritten message in the second
    // snapshot: a Content-Type header appears, the From header is
    // normalized, and the body gains a trailing CRLF.
    #[test]
    fn check_conformance() {
        let msg = new_msg_body(MULTI_HEADER_CONTENT);
        msg.check_fix_conformance(
            MessageConformance::default(),
            MessageConformance::MISSING_MIME_VERSION,
        )
        .unwrap();
        k9::snapshot!(
            data_as_string(&msg),
            r#"
X-Hello: there\r
X-Header: value\r
Subject: Hello\r
X-Header: another value\r
From :Someone@somewhere\r
Mime-Version: 1.0\r
\r
Body
"#
        );

        let msg = new_msg_body(MULTI_HEADER_CONTENT);
        msg.check_fix_conformance(
            MessageConformance::default(),
            MessageConformance::MISSING_MIME_VERSION | MessageConformance::NAME_ENDS_WITH_SPACE,
        )
        .unwrap();
        k9::snapshot!(
            data_as_string(&msg),
            r#"
Content-Type: text/plain;\r
\tcharset="us-ascii"\r
X-Hello: there\r
X-Header: value\r
Subject: Hello\r
X-Header: another value\r
From: <Someone@somewhere>\r
Mime-Version: 1.0\r
\r
Body\r

"#
        );
    }

    // A fresh message has no due time; scheduling a first attempt one day
    // ahead sets a due time at least one day after `now`.
    #[test]
    fn set_scheduling() -> anyhow::Result<()> {
        let msg = new_msg_body(MULTI_HEADER_CONTENT);
        assert!(msg.get_due().is_none(), "due is implicitly now");

        let now = Utc::now();
        let one_day = chrono::Duration::try_days(1).expect("1 day to be valid");

        msg.set_scheduling(Some(Scheduling {
            restriction: None,
            first_attempt: Some((now + one_day).into()),
            expires: None,
        }))?;

        let due = msg.get_due().expect("due to now be set");
        assert!(due - now >= one_day, "due time is at least 1 day away");

        Ok(())
    }

    // Pins the in-memory size of the message types on 64-bit targets so
    // that a change in their size is noticed.
    #[cfg(all(test, target_pointer_width = "64"))]
    #[test]
    fn sizes() {
        assert_eq!(std::mem::size_of::<Message>(), 8);
        assert_eq!(std::mem::size_of::<MessageInner>(), 32);
        assert_eq!(std::mem::size_of::<MessageWithId>(), 72);
    }
}