1use crate::address::HeaderAddressList;
2#[cfg(feature = "impl")]
3use crate::dkim::Signer;
4#[cfg(feature = "impl")]
5use crate::dkim::SIGN_POOL;
6pub use crate::queue_name::QueueNameComponents;
7use crate::scheduling::Scheduling;
8use anyhow::Context;
9use bstr::{BString, ByteSlice, ByteVec};
10use chrono::{DateTime, Utc};
11#[cfg(feature = "impl")]
12use config::{any_err, from_lua_value, serialize_options};
13use futures::FutureExt;
14use intrusive_collections::{intrusive_adapter, LinkedList, LinkedListAtomicLink};
15use kumo_chrono_helper::*;
16#[cfg(feature = "impl")]
17use kumo_dkim::arc::ARC;
18use kumo_log_types::rfc3464::Report;
19use kumo_log_types::rfc5965::ARFReport;
20use kumo_prometheus::declare_metric;
21#[cfg(feature = "impl")]
22use mailparsing::{AuthenticationResult, AuthenticationResults, EncodeHeaderValue};
23use mailparsing::{
24 CheckFixSettings, DecodedBody, Header, HeaderParseResult, MessageConformance, MimePart,
25};
26#[cfg(feature = "impl")]
27use mlua::{IntoLua, LuaSerdeExt, UserData, UserDataMethods};
28#[cfg(feature = "impl")]
29use mod_dns_resolver::get_resolver_instance;
30use parking_lot::Mutex;
31use rfc5321::parser::EnvelopeAddress;
32use serde::{Deserialize, Serialize};
33use serde_with::formats::PreferOne;
34use serde_with::{serde_as, OneOrMany};
35use spool::{get_data_spool, get_meta_spool, Spool, SpoolId};
36use std::hash::Hash;
37use std::sync::{Arc, LazyLock, Weak};
38use std::time::{Duration, Instant};
39use timeq::TimerEntryWithDelay;
40
41bitflags::bitflags! {
42 #[derive(Clone, Copy, Debug, PartialEq, Eq)]
43 struct MessageFlags: u8 {
44 const META_DIRTY = 1;
46 const DATA_DIRTY = 2;
48 const SCHEDULED = 4;
50 const FORCE_SYNC = 8;
52 }
53}
54
55declare_metric! {
56static MESSAGE_COUNT: IntGauge("message_count");
63}
64
65declare_metric! {
66static META_COUNT: IntGauge("message_meta_resident_count");
73}
74
75declare_metric! {
76static DATA_COUNT: IntGauge("message_data_resident_count");
86}
87
88static NO_DATA: LazyLock<Arc<Box<[u8]>>> = LazyLock::new(|| Arc::new(vec![].into_boxed_slice()));
89
90declare_metric! {
91static SAVE_HIST: Histogram("message_save_latency");
101}
102
103declare_metric! {
104static LOAD_DATA_HIST: Histogram("message_data_load_latency");
115}
116
117declare_metric! {
118static LOAD_META_HIST: Histogram("message_meta_load_latency");
124}
125
126#[derive(Debug)]
127struct MessageInner {
128 metadata: Option<Box<MetaData>>,
129 data: Arc<Box<[u8]>>,
130 flags: MessageFlags,
131 num_attempts: u16,
132 due: Option<DateTime<Utc>>,
133}
134
135#[derive(Debug)]
136pub(crate) struct MessageWithId {
137 id: SpoolId,
138 inner: Mutex<MessageInner>,
139 link: LinkedListAtomicLink,
140}
141
142intrusive_adapter!(
143 pub(crate) MessageWithIdAdapter = Arc<MessageWithId>: MessageWithId { link: LinkedListAtomicLink }
144);
145
146pub struct MessageList {
151 list: LinkedList<MessageWithIdAdapter>,
152 len: usize,
153}
154
155impl Default for MessageList {
156 fn default() -> Self {
157 Self::new()
158 }
159}
160
161impl MessageList {
162 pub fn new() -> Self {
164 Self {
165 list: LinkedList::new(MessageWithIdAdapter::default()),
166 len: 0,
167 }
168 }
169
170 pub fn len(&self) -> usize {
172 self.len
173 }
174
175 pub fn is_empty(&self) -> bool {
176 self.len == 0
177 }
178
179 pub fn take(&mut self) -> Self {
182 let new_list = Self {
183 list: self.list.take(),
184 len: self.len,
185 };
186 self.len = 0;
187 new_list
188 }
189
190 pub fn push_back(&mut self, message: Message) {
192 self.list.push_back(message.msg_and_id);
193 self.len += 1;
194 }
195
196 pub fn pop_front(&mut self) -> Option<Message> {
198 self.list.pop_front().map(|msg_and_id| {
199 self.len -= 1;
200 Message { msg_and_id }
201 })
202 }
203
204 pub fn pop_back(&mut self) -> Option<Message> {
206 self.list.pop_back().map(|msg_and_id| {
207 self.len -= 1;
208 Message { msg_and_id }
209 })
210 }
211
212 pub fn drain(&mut self) -> Vec<Message> {
216 let mut messages = Vec::with_capacity(self.len);
217 while let Some(msg) = self.pop_front() {
218 messages.push(msg);
219 }
220 messages
221 }
222
223 pub fn extend_from_iter<I>(&mut self, iter: I)
224 where
225 I: Iterator<Item = Message>,
226 {
227 for msg in iter {
228 self.push_back(msg)
229 }
230 }
231}
232
233impl IntoIterator for MessageList {
234 type Item = Message;
235 type IntoIter = MessageListIter;
236 fn into_iter(self) -> MessageListIter {
237 MessageListIter { list: self.list }
238 }
239}
240
241pub struct MessageListIter {
242 list: LinkedList<MessageWithIdAdapter>,
243}
244
245impl Iterator for MessageListIter {
246 type Item = Message;
247 fn next(&mut self) -> Option<Message> {
248 self.list
249 .pop_front()
250 .map(|msg_and_id| Message { msg_and_id })
251 }
252}
253
254#[derive(Clone, Debug)]
255#[cfg_attr(feature = "impl", derive(mlua::FromLua))]
256pub struct Message {
257 pub(crate) msg_and_id: Arc<MessageWithId>,
258}
259
260impl PartialEq for Message {
261 fn eq(&self, other: &Self) -> bool {
262 self.id() == other.id()
263 }
264}
265impl Eq for Message {}
266
267impl Hash for Message {
268 fn hash<H>(&self, hasher: &mut H)
269 where
270 H: std::hash::Hasher,
271 {
272 self.id().hash(hasher)
273 }
274}
275
276#[derive(Clone, Debug)]
277pub struct WeakMessage {
278 weak: Weak<MessageWithId>,
279}
280
281impl WeakMessage {
282 pub fn upgrade(&self) -> Option<Message> {
283 Some(Message {
284 msg_and_id: self.weak.upgrade()?,
285 })
286 }
287}
288
289#[serde_as]
290#[derive(Debug, Serialize, Deserialize, Clone)]
291pub(crate) struct MetaData {
292 pub sender: EnvelopeAddress,
293 #[serde_as(as = "OneOrMany<_, PreferOne>")]
294 pub recipient: Vec<EnvelopeAddress>,
295 pub meta: serde_json::Value,
296 #[serde(default)]
297 pub schedule: Option<Scheduling>,
298}
299
300impl Drop for MessageInner {
301 fn drop(&mut self) {
302 if self.metadata.is_some() {
303 META_COUNT.dec();
304 }
305 if !self.data.is_empty() {
306 DATA_COUNT.dec();
307 }
308 MESSAGE_COUNT.dec();
309 }
310}
311
312impl Message {
313 pub fn new_dirty(
316 id: SpoolId,
317 sender: EnvelopeAddress,
318 recipient: Vec<EnvelopeAddress>,
319 meta: serde_json::Value,
320 data: Arc<Box<[u8]>>,
321 ) -> anyhow::Result<Self> {
322 anyhow::ensure!(meta.is_object(), "metadata must be a json object");
323 MESSAGE_COUNT.inc();
324 DATA_COUNT.inc();
325 META_COUNT.inc();
326 Ok(Self {
327 msg_and_id: Arc::new(MessageWithId {
328 id,
329 inner: Mutex::new(MessageInner {
330 metadata: Some(Box::new(MetaData {
331 sender,
332 recipient,
333 meta,
334 schedule: None,
335 })),
336 data,
337 flags: MessageFlags::META_DIRTY | MessageFlags::DATA_DIRTY,
338 num_attempts: 0,
339 due: None,
340 }),
341 link: LinkedListAtomicLink::default(),
342 }),
343 })
344 }
345
346 pub fn weak(&self) -> WeakMessage {
347 WeakMessage {
348 weak: Arc::downgrade(&self.msg_and_id),
349 }
350 }
351
352 pub fn new_from_spool(id: SpoolId, metadata: Vec<u8>) -> anyhow::Result<Self> {
356 let metadata: MetaData = serde_json::from_slice(&metadata)?;
357 MESSAGE_COUNT.inc();
358 META_COUNT.inc();
359
360 let flags = if metadata.schedule.is_some() {
361 MessageFlags::SCHEDULED
362 } else {
363 MessageFlags::empty()
364 };
365
366 Ok(Self {
367 msg_and_id: Arc::new(MessageWithId {
368 id,
369 inner: Mutex::new(MessageInner {
370 metadata: Some(Box::new(metadata)),
371 data: NO_DATA.clone(),
372 flags,
373 num_attempts: 0,
374 due: None,
375 }),
376 link: LinkedListAtomicLink::default(),
377 }),
378 })
379 }
380
381 pub(crate) fn new_from_parts(id: SpoolId, metadata: MetaData, data: Arc<Box<[u8]>>) -> Self {
382 MESSAGE_COUNT.inc();
383 META_COUNT.inc();
384
385 let flags = if metadata.schedule.is_some() {
386 MessageFlags::SCHEDULED
387 } else {
388 MessageFlags::empty()
389 };
390
391 Self {
392 msg_and_id: Arc::new(MessageWithId {
393 id,
394 inner: Mutex::new(MessageInner {
395 metadata: Some(Box::new(metadata)),
396 data,
397 flags: flags | MessageFlags::META_DIRTY | MessageFlags::DATA_DIRTY,
398 num_attempts: 0,
399 due: None,
400 }),
401 link: LinkedListAtomicLink::default(),
402 }),
403 }
404 }
405
406 pub async fn new_with_id(id: SpoolId) -> anyhow::Result<Self> {
407 let meta_spool = get_meta_spool();
408 let data = meta_spool.load(id).await?;
409 Self::new_from_spool(id, data)
410 }
411
412 pub fn get_num_attempts(&self) -> u16 {
413 let inner = self.msg_and_id.inner.lock();
414 inner.num_attempts
415 }
416
417 pub fn set_num_attempts(&self, num_attempts: u16) {
418 let mut inner = self.msg_and_id.inner.lock();
419 inner.num_attempts = num_attempts;
420 }
421
422 pub fn increment_num_attempts(&self) {
423 let mut inner = self.msg_and_id.inner.lock();
424 inner.num_attempts += 1;
425 }
426
427 pub async fn set_scheduling(
428 &self,
429 scheduling: Option<Scheduling>,
430 ) -> anyhow::Result<Option<Scheduling>> {
431 self.load_meta_if_needed().await?;
432 let mut inner = self.msg_and_id.inner.lock();
433 match &mut inner.metadata {
434 None => anyhow::bail!("set_scheduling: metadata must be loaded first"),
435 Some(meta) => {
436 meta.schedule = scheduling;
437 inner
438 .flags
439 .set(MessageFlags::SCHEDULED, scheduling.is_some());
440 if let Some(sched) = scheduling {
441 let due = inner.due.unwrap_or_else(Utc::now);
442 inner.due = Some(sched.adjust_for_schedule(due));
443 }
444 Ok(scheduling)
445 }
446 }
447 }
448
449 pub async fn get_scheduling(&self) -> anyhow::Result<Option<Scheduling>> {
450 self.load_meta_if_needed().await?;
451 let inner = self.msg_and_id.inner.lock();
452 Ok(inner.metadata.as_ref().and_then(|meta| meta.schedule))
453 }
454
455 pub fn get_due(&self) -> Option<DateTime<Utc>> {
456 let inner = self.msg_and_id.inner.lock();
457 inner.due
458 }
459
460 pub async fn delay_with_jitter(&self, limit: i64) -> anyhow::Result<Option<DateTime<Utc>>> {
461 let scale = rand::random::<f32>();
462 let value = (scale * limit as f32) as i64;
463 self.delay_by(seconds(value)?).await
464 }
465
466 pub async fn delay_by(
467 &self,
468 duration: chrono::Duration,
469 ) -> anyhow::Result<Option<DateTime<Utc>>> {
470 let due = Utc::now() + duration;
471 self.set_due(Some(due)).await
472 }
473
474 pub async fn delay_by_and_jitter(
476 &self,
477 duration: chrono::Duration,
478 ) -> anyhow::Result<Option<DateTime<Utc>>> {
479 let scale = rand::random::<f32>();
480 let value = (scale * 60.) as i64;
481 let due = Utc::now() + duration + seconds(value)?;
482 self.set_due(Some(due)).await
483 }
484
485 pub async fn set_due(
486 &self,
487 due: Option<DateTime<Utc>>,
488 ) -> anyhow::Result<Option<DateTime<Utc>>> {
489 let due = {
490 let mut inner = self.msg_and_id.inner.lock();
491
492 if !inner.flags.contains(MessageFlags::SCHEDULED) {
493 inner.due = due;
495 return Ok(inner.due);
496 }
497
498 let due = due.unwrap_or_else(Utc::now);
499
500 if let Some(meta) = &inner.metadata {
501 inner.due = match &meta.schedule {
502 Some(sched) => Some(sched.adjust_for_schedule(due)),
503 None => Some(due),
504 };
505 return Ok(inner.due);
506 }
507
508 due
511 };
512
513 self.load_meta().await?;
514
515 {
516 let mut inner = self.msg_and_id.inner.lock();
517 match &inner.metadata {
518 Some(meta) => {
519 inner.due = match &meta.schedule {
520 Some(sched) => Some(sched.adjust_for_schedule(due)),
521 None => Some(due),
522 };
523 Ok(inner.due)
524 }
525 None => anyhow::bail!("loaded metadata, but metadata is not set!?"),
526 }
527 }
528 }
529
530 fn get_data_if_dirty(&self) -> Option<Arc<Box<[u8]>>> {
531 let inner = self.msg_and_id.inner.lock();
532 if inner.flags.contains(MessageFlags::DATA_DIRTY) {
533 Some(Arc::clone(&inner.data))
534 } else {
535 None
536 }
537 }
538
539 fn get_meta_if_dirty(&self) -> Option<MetaData> {
540 let inner = self.msg_and_id.inner.lock();
541 if inner.flags.contains(MessageFlags::META_DIRTY) {
542 inner.metadata.as_ref().map(|md| (**md).clone())
543 } else {
544 None
545 }
546 }
547
548 pub(crate) async fn clone_meta_data(&self) -> anyhow::Result<MetaData> {
549 self.load_meta_if_needed().await?;
550 let inner = self.msg_and_id.inner.lock();
551 inner
552 .metadata
553 .as_ref()
554 .ok_or_else(|| anyhow::anyhow!("metadata not loaded even though we just loaded it"))
555 .map(|md| (**md).clone())
556 }
557
558 pub fn set_force_sync(&self, force: bool) {
559 let mut inner = self.msg_and_id.inner.lock();
560 inner.flags.set(MessageFlags::FORCE_SYNC, force);
561 }
562
563 pub fn needs_save(&self) -> bool {
564 let inner = self.msg_and_id.inner.lock();
565 inner.flags.contains(MessageFlags::META_DIRTY)
566 || inner.flags.contains(MessageFlags::DATA_DIRTY)
567 }
568
569 pub async fn save(&self, deadline: Option<Instant>) -> anyhow::Result<()> {
570 let _timer = SAVE_HIST.start_timer();
571 self.save_to(&**get_meta_spool(), &**get_data_spool(), deadline)
572 .await
573 }
574
575 pub async fn save_to(
576 &self,
577 meta_spool: &(dyn Spool + Send + Sync),
578 data_spool: &(dyn Spool + Send + Sync),
579 deadline: Option<Instant>,
580 ) -> anyhow::Result<()> {
581 let force_sync = self
582 .msg_and_id
583 .inner
584 .lock()
585 .flags
586 .contains(MessageFlags::FORCE_SYNC);
587
588 let data_fut = if let Some(data) = self.get_data_if_dirty() {
589 anyhow::ensure!(!data.is_empty(), "message data must not be empty");
590 data_spool
591 .store(self.msg_and_id.id, data, force_sync, deadline)
592 .map(|_| true)
593 .boxed()
594 } else {
595 futures::future::ready(false).boxed()
596 };
597 let meta_fut = if let Some(meta) = self.get_meta_if_dirty() {
598 let meta = Arc::new(serde_json::to_vec(&meta)?.into_boxed_slice());
599 meta_spool
600 .store(self.msg_and_id.id, meta, force_sync, deadline)
601 .map(|_| true)
602 .boxed()
603 } else {
604 futures::future::ready(false).boxed()
605 };
606
607 let (data_res, meta_res) = tokio::join!(data_fut, meta_fut);
613
614 if data_res {
615 self.msg_and_id
616 .inner
617 .lock()
618 .flags
619 .remove(MessageFlags::DATA_DIRTY);
620 }
621 if meta_res {
622 self.msg_and_id
623 .inner
624 .lock()
625 .flags
626 .remove(MessageFlags::META_DIRTY);
627 }
628 Ok(())
629 }
630
631 pub fn id(&self) -> &SpoolId {
632 &self.msg_and_id.id
633 }
634
635 pub async fn save_and_shrink(&self) -> anyhow::Result<bool> {
637 self.save(None).await?;
638 self.shrink()
639 }
640
641 pub async fn save_and_shrink_data(&self) -> anyhow::Result<bool> {
643 self.save(None).await?;
644 self.shrink_data()
645 }
646
647 pub fn shrink_data(&self) -> anyhow::Result<bool> {
648 let mut inner = self.msg_and_id.inner.lock();
649 let mut did_shrink = false;
650 if inner.flags.contains(MessageFlags::DATA_DIRTY) {
651 anyhow::bail!("Cannot shrink message: DATA_DIRTY");
652 }
653 if !inner.data.is_empty() {
654 DATA_COUNT.dec();
655 did_shrink = true;
656 }
657 if !inner.data.is_empty() {
658 inner.data = NO_DATA.clone();
659 did_shrink = true;
660 }
661 Ok(did_shrink)
662 }
663
664 pub fn shrink(&self) -> anyhow::Result<bool> {
665 let mut inner = self.msg_and_id.inner.lock();
666 let mut did_shrink = false;
667 if inner.flags.contains(MessageFlags::DATA_DIRTY) {
668 anyhow::bail!("Cannot shrink message: DATA_DIRTY");
669 }
670 if inner.flags.contains(MessageFlags::META_DIRTY) {
671 anyhow::bail!("Cannot shrink message: META_DIRTY");
672 }
673 if inner.metadata.take().is_some() {
674 META_COUNT.dec();
675 did_shrink = true;
676 }
677 if !inner.data.is_empty() {
678 DATA_COUNT.dec();
679 did_shrink = true;
680 }
681 if !inner.data.is_empty() {
682 inner.data = NO_DATA.clone();
683 did_shrink = true;
684 }
685 Ok(did_shrink)
686 }
687
688 pub async fn sender(&self) -> anyhow::Result<EnvelopeAddress> {
689 self.load_meta_if_needed().await?;
690 let inner = self.msg_and_id.inner.lock();
691 match &inner.metadata {
692 Some(meta) => Ok(meta.sender.clone()),
693 None => anyhow::bail!("Message::sender metadata is not loaded"),
694 }
695 }
696
697 pub async fn set_sender(&self, sender: EnvelopeAddress) -> anyhow::Result<()> {
698 self.load_meta_if_needed().await?;
699 let mut inner = self.msg_and_id.inner.lock();
700 match &mut inner.metadata {
701 Some(meta) => {
702 meta.sender = sender;
703 inner.flags.set(MessageFlags::DATA_DIRTY, true);
704 Ok(())
705 }
706 None => anyhow::bail!("Message::set_sender: metadata is not loaded"),
707 }
708 }
709
710 #[deprecated = "use recipient_list or first_recipient instead"]
711 pub async fn recipient(&self) -> anyhow::Result<EnvelopeAddress> {
712 self.first_recipient().await
713 }
714
715 pub async fn first_recipient(&self) -> anyhow::Result<EnvelopeAddress> {
716 self.load_meta_if_needed().await?;
717 let inner = self.msg_and_id.inner.lock();
718 match &inner.metadata {
719 Some(meta) => match meta.recipient.first() {
720 Some(recip) => Ok(recip.clone()),
721 None => anyhow::bail!("recipient list is empty!?"),
722 },
723 None => anyhow::bail!("Message::first_recipient: metadata is not loaded"),
724 }
725 }
726
727 pub async fn recipient_list(&self) -> anyhow::Result<Vec<EnvelopeAddress>> {
728 self.load_meta_if_needed().await?;
729 let inner = self.msg_and_id.inner.lock();
730 match &inner.metadata {
731 Some(meta) => Ok(meta.recipient.clone()),
732 None => anyhow::bail!("Message::recipient_list: metadata is not loaded"),
733 }
734 }
735
736 pub async fn recipient_list_string(&self) -> anyhow::Result<Vec<String>> {
737 self.load_meta_if_needed().await?;
738 let inner = self.msg_and_id.inner.lock();
739 match &inner.metadata {
740 Some(meta) => Ok(meta.recipient.iter().map(|a| a.to_string()).collect()),
741 None => anyhow::bail!("Message::recipient_list_string: metadata is not loaded"),
742 }
743 }
744
745 #[deprecated = "use set_recipient_list instead"]
746 pub async fn set_recipient(&self, recipient: EnvelopeAddress) -> anyhow::Result<()> {
747 self.set_recipient_list(vec![recipient]).await
748 }
749
750 pub async fn set_recipient_list(&self, recipient: Vec<EnvelopeAddress>) -> anyhow::Result<()> {
751 self.load_meta_if_needed().await?;
752
753 let mut inner = self.msg_and_id.inner.lock();
754 match &mut inner.metadata {
755 Some(meta) => {
756 meta.recipient = recipient;
757 inner.flags.set(MessageFlags::DATA_DIRTY, true);
758 Ok(())
759 }
760 None => anyhow::bail!("Message::set_recipient_list: metadata is not loaded"),
761 }
762 }
763
764 pub fn is_meta_loaded(&self) -> bool {
765 self.msg_and_id.inner.lock().metadata.is_some()
766 }
767
768 pub fn is_data_loaded(&self) -> bool {
769 !self.msg_and_id.inner.lock().data.is_empty()
770 }
771
772 pub async fn load_meta_if_needed(&self) -> anyhow::Result<()> {
773 if self.is_meta_loaded() {
774 return Ok(());
775 }
776 self.load_meta().await
777 }
778
779 pub async fn data(&self) -> anyhow::Result<Arc<Box<[u8]>>> {
780 self.load_data_if_needed().await
781 }
782
783 async fn load_data_if_needed(&self) -> anyhow::Result<Arc<Box<[u8]>>> {
784 if self.is_data_loaded() {
785 return Ok(self.get_data_maybe_not_loaded());
786 }
787 self.load_data().await
788 }
789
790 pub async fn load_meta(&self) -> anyhow::Result<()> {
791 let _timer = LOAD_META_HIST.start_timer();
792 self.load_meta_from(&**get_meta_spool()).await
793 }
794
795 async fn load_meta_from(&self, meta_spool: &(dyn Spool + Send + Sync)) -> anyhow::Result<()> {
796 let id = self.id();
797 let data = meta_spool.load(*id).await?;
798 let mut inner = self.msg_and_id.inner.lock();
799 let was_not_loaded = inner.metadata.is_none();
800 let metadata: MetaData = serde_json::from_slice(&data)?;
801 inner.metadata.replace(Box::new(metadata));
802 if was_not_loaded {
803 META_COUNT.inc();
804 }
805 Ok(())
806 }
807
808 pub async fn load_data(&self) -> anyhow::Result<Arc<Box<[u8]>>> {
809 let _timer = LOAD_DATA_HIST.start_timer();
810 self.load_data_from(&**get_data_spool()).await
811 }
812
813 async fn load_data_from(
814 &self,
815 data_spool: &(dyn Spool + Send + Sync),
816 ) -> anyhow::Result<Arc<Box<[u8]>>> {
817 let data = data_spool.load(*self.id()).await?;
818 let mut inner = self.msg_and_id.inner.lock();
819 let was_empty = inner.data.is_empty();
820 inner.data = Arc::new(data.into_boxed_slice());
821 if was_empty {
822 DATA_COUNT.inc();
823 }
824 Ok(inner.data.clone())
825 }
826
827 pub fn assign_data(&self, data: Vec<u8>) {
828 let mut inner = self.msg_and_id.inner.lock();
829 let was_empty = inner.data.is_empty();
830 inner.data = Arc::new(data.into_boxed_slice());
831 inner.flags.set(MessageFlags::DATA_DIRTY, true);
832 if was_empty {
833 DATA_COUNT.inc();
834 }
835 }
836
837 pub fn get_data_maybe_not_loaded(&self) -> Arc<Box<[u8]>> {
838 let inner = self.msg_and_id.inner.lock();
839 inner.data.clone()
840 }
841
842 pub async fn set_meta<S: AsRef<str>, V: Into<serde_json::Value>>(
843 &self,
844 key: S,
845 value: V,
846 ) -> anyhow::Result<()> {
847 self.load_meta_if_needed().await?;
848 let mut inner = self.msg_and_id.inner.lock();
849 match &mut inner.metadata {
850 None => anyhow::bail!("set_meta: metadata must be loaded first"),
851 Some(meta) => {
852 let key = key.as_ref();
853 let value = value.into();
854
855 match &mut meta.meta {
856 serde_json::Value::Object(map) => {
857 map.insert(key.to_string(), value);
858 }
859 _ => anyhow::bail!("metadata is somehow not a json object"),
860 }
861
862 inner.flags.set(MessageFlags::META_DIRTY, true);
863 Ok(())
864 }
865 }
866 }
867
868 pub async fn unset_meta<S: AsRef<str>>(&self, key: S) -> anyhow::Result<()> {
869 self.load_meta_if_needed().await?;
870 let mut inner = self.msg_and_id.inner.lock();
871 match &mut inner.metadata {
872 None => anyhow::bail!("set_meta: metadata must be loaded first"),
873 Some(meta) => {
874 let key = key.as_ref();
875
876 match &mut meta.meta {
877 serde_json::Value::Object(map) => {
878 map.remove(key);
879 }
880 _ => anyhow::bail!("metadata is somehow not a json object"),
881 }
882
883 inner.flags.set(MessageFlags::META_DIRTY, true);
884 Ok(())
885 }
886 }
887 }
888
889 pub async fn get_meta_string<S: serde_json::value::Index + std::fmt::Display + Copy>(
891 &self,
892 key: S,
893 ) -> anyhow::Result<Option<String>> {
894 match self.get_meta(key).await {
895 Ok(serde_json::Value::String(value)) => Ok(Some(value.to_string())),
896 Ok(serde_json::Value::Null) => Ok(None),
897 hmm => {
898 anyhow::bail!("expected '{key}' to be a string value, got {hmm:?}");
899 }
900 }
901 }
902
903 pub async fn get_meta_obj(&self) -> anyhow::Result<serde_json::Value> {
904 self.load_meta_if_needed().await?;
905 let inner = self.msg_and_id.inner.lock();
906 match &inner.metadata {
907 None => anyhow::bail!("get_meta_obj: metadata must be loaded first"),
908 Some(meta) => Ok(meta.meta.clone()),
909 }
910 }
911
912 pub async fn get_meta<S: serde_json::value::Index>(
913 &self,
914 key: S,
915 ) -> anyhow::Result<serde_json::Value> {
916 self.load_meta_if_needed().await?;
917 let inner = self.msg_and_id.inner.lock();
918 match &inner.metadata {
919 None => anyhow::bail!("get_meta: metadata must be loaded first"),
920 Some(meta) => match meta.meta.get(key) {
921 Some(value) => Ok(value.clone()),
922 None => Ok(serde_json::Value::Null),
923 },
924 }
925 }
926
927 pub fn age(&self, now: DateTime<Utc>) -> chrono::Duration {
928 self.msg_and_id.id.age(now)
929 }
930
931 pub async fn get_queue_name(&self) -> anyhow::Result<String> {
932 Ok(match self.get_meta_string("queue").await? {
933 Some(name) => name,
934 None => {
935 let name = QueueNameComponents::format(
936 self.get_meta_string("campaign").await?,
937 self.get_meta_string("tenant").await?,
938 self.first_recipient()
939 .await?
940 .domain()
941 .to_string()
942 .to_lowercase(),
943 self.get_meta_string("routing_domain").await?,
944 );
945 name.to_string()
946 }
947 })
948 }
949
950 #[cfg(feature = "impl")]
951 pub async fn arc_verify(
952 &self,
953 opt_resolver_name: Option<String>,
954 ) -> anyhow::Result<AuthenticationResult> {
955 let resolver = get_resolver_instance(&opt_resolver_name)?;
956 let data = self.data().await?;
957 let bytes = mailparsing::SharedString::try_from(data.as_ref().as_ref())?;
958
959 let parsed = mailparsing::Header::parse_headers(bytes.clone())?;
960 let message = kumo_dkim::ParsedEmail::HeaderOnlyParse { bytes, parsed };
961
962 let arc = ARC::verify(&message, &**resolver).await;
963 Ok(arc.authentication_result())
964 }
965
966 #[cfg(feature = "impl")]
967 pub async fn arc_seal(
968 &self,
969 signer: Signer,
970 auth_results: AuthenticationResults,
971 opt_resolver_name: Option<String>,
972 ) -> anyhow::Result<()> {
973 let resolver = get_resolver_instance(&opt_resolver_name)?;
974 let data = self.data().await?;
975 let bytes = mailparsing::SharedString::try_from(data.as_ref().as_ref())?;
976 let parsed = mailparsing::Header::parse_headers(bytes.clone())?;
977 let message = kumo_dkim::ParsedEmail::HeaderOnlyParse { bytes, parsed };
978 let arc = ARC::verify(&message, &**resolver).await;
979
980 let headers = arc.seal(&message, auth_results, signer.signer())?;
981 if !headers.is_empty() {
982 let mut new_data = Vec::<u8>::with_capacity(data.len() + 1024);
983
984 for hdr in headers {
985 hdr.write_header(&mut new_data).ok();
986 }
987 new_data.extend_from_slice(&data);
988 self.assign_data(new_data);
989 }
990
991 Ok(())
992 }
993
994 #[cfg(feature = "impl")]
995 pub async fn dkim_verify(
996 &self,
997 opt_resolver_name: Option<String>,
998 ) -> anyhow::Result<Vec<AuthenticationResult>> {
999 let resolver = get_resolver_instance(&opt_resolver_name)?;
1000 let data = self.data().await?;
1001 let bytes = mailparsing::SharedString::try_from(data.as_ref().as_ref())?;
1002
1003 let parsed = mailparsing::Header::parse_headers(bytes.clone())?;
1004 if parsed
1005 .overall_conformance
1006 .contains(MessageConformance::NON_CANONICAL_LINE_ENDINGS)
1007 {
1008 return Ok(vec![AuthenticationResult {
1009 method: "dkim".into(),
1010 method_version: None,
1011 result: "permerror".into(),
1012 reason: Some("message has non-canonical line endings".into()),
1013 props: Default::default(),
1014 }]);
1015 }
1016 let message = kumo_dkim::ParsedEmail::HeaderOnlyParse { bytes, parsed };
1017
1018 let results = kumo_dkim::verify_email_with_resolver(&message, &**resolver).await?;
1019 Ok(results)
1020 }
1021
1022 pub async fn parse(&self) -> anyhow::Result<MimePart<'static>> {
1027 let data = self.data().await?;
1028 let owned_data = String::from_utf8_lossy(data.as_ref().as_ref()).to_string();
1029 Ok(MimePart::parse(owned_data)?)
1030 }
1031
1032 pub async fn parse_rfc3464(&self) -> anyhow::Result<Option<Report>> {
1033 let data = self.data().await?;
1034 Report::parse(&data)
1035 }
1036
1037 pub async fn parse_rfc5965(&self) -> anyhow::Result<Option<ARFReport>> {
1038 let data = self.data().await?;
1039 ARFReport::parse(&data)
1040 }
1041
1042 pub async fn prepend_header(&self, name: Option<&str>, value: &[u8]) -> anyhow::Result<()> {
1043 let data = self.data().await?;
1044 let mut new_data = Vec::with_capacity(size_header(name, value) + 2 + data.len());
1045 emit_header(&mut new_data, name, value);
1046 new_data.extend_from_slice(&data);
1047 self.assign_data(new_data);
1048 Ok(())
1049 }
1050
1051 pub async fn append_header(&self, name: Option<&str>, value: &[u8]) -> anyhow::Result<()> {
1052 let data = self.data().await?;
1053 let mut new_data = Vec::with_capacity(size_header(name, value.as_bytes()) + 2 + data.len());
1054 for (idx, window) in data.windows(4).enumerate() {
1055 if window == b"\r\n\r\n" {
1056 let headers = &data[0..idx + 2];
1057 let body = &data[idx + 2..];
1058
1059 new_data.extend_from_slice(headers);
1060 emit_header(&mut new_data, name, value);
1061 new_data.extend_from_slice(body);
1062 self.assign_data(new_data);
1063 return Ok(());
1064 }
1065 }
1066
1067 anyhow::bail!("append_header could not find the end of the header block");
1068 }
1069
1070 pub async fn get_address_header(
1071 &self,
1072 header_name: &str,
1073 ) -> anyhow::Result<Option<HeaderAddressList>> {
1074 let data = self.data().await?;
1075 let HeaderParseResult { headers, .. } =
1076 mailparsing::Header::parse_headers(data.as_ref().as_ref())?;
1077
1078 match headers.get_first(header_name) {
1079 Some(hdr) => {
1080 let list = hdr.as_address_list()?;
1081 let result: HeaderAddressList = list.into();
1082 Ok(Some(result))
1083 }
1084 None => Ok(None),
1085 }
1086 }
1087
1088 pub async fn get_first_named_header_value(
1089 &self,
1090 name: &str,
1091 ) -> anyhow::Result<Option<BString>> {
1092 let data = self.data().await?;
1093 let HeaderParseResult { headers, .. } = Header::parse_headers(data.as_ref().as_ref())?;
1094
1095 match headers.get_first(name) {
1096 Some(hdr) => Ok(Some(
1097 hdr.as_unstructured()
1098 .unwrap_or_else(|_| hdr.get_raw_value().into()),
1099 )),
1100 None => Ok(None),
1101 }
1102 }
1103
1104 pub async fn get_all_named_header_values(&self, name: &str) -> anyhow::Result<Vec<BString>> {
1105 let data = self.data().await?;
1106 let HeaderParseResult { headers, .. } = Header::parse_headers(data.as_ref().as_ref())?;
1107
1108 let mut values = vec![];
1109 for hdr in headers.iter_named(name) {
1110 values.push(
1111 hdr.as_unstructured()
1112 .unwrap_or_else(|_| hdr.get_raw_value().into()),
1113 );
1114 }
1115 Ok(values)
1116 }
1117
1118 pub async fn get_all_headers(&self) -> anyhow::Result<Vec<(BString, BString)>> {
1119 let data = self.data().await?;
1120 let HeaderParseResult { headers, .. } = Header::parse_headers(data.as_ref().as_ref())?;
1121
1122 let mut values = vec![];
1123 for hdr in headers.iter() {
1124 values.push((
1125 hdr.get_name().into(),
1126 hdr.as_unstructured()
1127 .unwrap_or_else(|_| hdr.get_raw_value().into()),
1128 ));
1129 }
1130 Ok(values)
1131 }
1132
1133 pub async fn retain_headers<F: FnMut(&Header) -> bool>(
1134 &self,
1135 mut func: F,
1136 ) -> anyhow::Result<()> {
1137 let data = self.data().await?;
1138 let mut new_data = Vec::with_capacity(data.len());
1139 let HeaderParseResult {
1140 headers,
1141 body_offset,
1142 ..
1143 } = Header::parse_headers(data.as_ref().as_ref())?;
1144 for hdr in headers.iter() {
1145 let retain = (func)(hdr);
1146 if !retain {
1147 continue;
1148 }
1149 hdr.write_header(&mut new_data)?;
1150 }
1151 new_data.extend_from_slice(b"\r\n");
1152 new_data.extend_from_slice(&data[body_offset..]);
1153 self.assign_data(new_data);
1154 Ok(())
1155 }
1156
1157 pub async fn remove_first_named_header(&self, name: &str) -> anyhow::Result<()> {
1158 let mut removed = false;
1159 self.retain_headers(|hdr| {
1160 if hdr.get_name().eq_ignore_ascii_case(name.as_bytes()) && !removed {
1161 removed = true;
1162 false
1163 } else {
1164 true
1165 }
1166 })
1167 .await
1168 }
1169
1170 pub async fn import_x_headers(&self, names: Vec<String>) -> anyhow::Result<()> {
1171 let data = self.data().await?;
1172 let HeaderParseResult { headers, .. } = Header::parse_headers(data.as_ref().as_ref())?;
1173
1174 for hdr in headers.iter() {
1175 let do_import = if names.is_empty() {
1176 is_x_header(hdr.get_name())
1177 } else {
1178 is_header_in_names_list(hdr.get_name(), &names)
1179 };
1180 if do_import {
1181 let name = imported_header_name(&hdr.get_name_lossy());
1182 self.set_meta(name, hdr.as_unstructured()?.to_str_lossy())
1183 .await?;
1184 }
1185 }
1186
1187 Ok(())
1188 }
1189
1190 pub async fn remove_x_headers(&self, names: Vec<String>) -> anyhow::Result<()> {
1191 self.retain_headers(|hdr| {
1192 if names.is_empty() {
1193 !is_x_header(hdr.get_name())
1194 } else {
1195 !is_header_in_names_list(hdr.get_name(), &names)
1196 }
1197 })
1198 .await
1199 }
1200
1201 pub async fn remove_all_named_headers(&self, name: &str) -> anyhow::Result<()> {
1202 self.retain_headers(|hdr| !hdr.get_name().eq_ignore_ascii_case(name.as_bytes()))
1203 .await
1204 }
1205
1206 #[cfg(feature = "impl")]
1207 pub async fn dkim_sign(&self, signer: Signer) -> anyhow::Result<()> {
1208 let data = self.data().await?;
1209 let header = if let Some(runtime) = SIGN_POOL.get() {
1210 runtime.spawn_blocking(move || signer.sign(&data)).await??
1211 } else {
1212 signer.sign(&data)?
1213 };
1214 self.prepend_header(None, header.as_bytes()).await?;
1215 Ok(())
1216 }
1217
1218 pub async fn import_scheduling_header(
1219 &self,
1220 header_name: &str,
1221 remove: bool,
1222 ) -> anyhow::Result<Option<Scheduling>> {
1223 if let Some(value) = self.get_first_named_header_value(header_name).await? {
1224 let sched: Scheduling = serde_json::from_slice(&value).with_context(|| {
1225 format!("{value} from header {header_name} is not a valid Scheduling header")
1226 })?;
1227 let result = self.set_scheduling(Some(sched)).await?;
1228
1229 if remove {
1230 self.remove_all_named_headers(header_name).await?;
1231 }
1232
1233 Ok(result)
1234 } else {
1235 Ok(None)
1236 }
1237 }
1238
1239 pub async fn append_text_plain(&self, content: &str) -> anyhow::Result<bool> {
1240 let data = self.data().await?;
1241 let mut msg = MimePart::parse(data.as_ref().as_ref())?;
1242 let parts = msg.simplified_structure_pointers()?;
1243 if let Some(p) = parts.text_part.and_then(|p| msg.resolve_ptr_mut(p)) {
1244 match p.body()? {
1245 DecodedBody::Text(text) => {
1246 let mut text = text.as_bytes().to_vec();
1247 text.push_str("\r\n");
1248 text.push_str(content);
1249 p.replace_text_body("text/plain", &*text)?;
1250
1251 let new_data = msg.to_message_bytes();
1252 self.assign_data(new_data);
1253 Ok(true)
1254 }
1255 DecodedBody::Binary(_) => {
1256 anyhow::bail!("expected text/plain part to be text, but it is binary");
1257 }
1258 }
1259 } else {
1260 Ok(false)
1261 }
1262 }
1263
1264 pub async fn append_text_html(&self, content: &str) -> anyhow::Result<bool> {
1265 let data = self.data().await?;
1266 let mut msg = MimePart::parse(data.as_ref().as_ref())?;
1267 let parts = msg.simplified_structure_pointers()?;
1268 if let Some(p) = parts.html_part.and_then(|p| msg.resolve_ptr_mut(p)) {
1269 match p.body()? {
1270 DecodedBody::Text(text) => {
1271 let mut text = text.as_bytes().to_vec();
1272
1273 match text.rfind("</body>").or_else(|| text.rfind("</BODY>")) {
1274 Some(idx) => {
1275 text.insert_str(idx, content);
1276 text.insert_str(idx, "\r\n");
1277 }
1278 None => {
1279 text.push_str("\r\n");
1281 text.push_str(content);
1282 }
1283 }
1284
1285 p.replace_text_body("text/html", &*text)?;
1286
1287 let new_data = msg.to_message_bytes();
1288 self.assign_data(new_data);
1289 Ok(true)
1290 }
1291 DecodedBody::Binary(_) => {
1292 anyhow::bail!("expected text/html part to be text, but it is binary");
1293 }
1294 }
1295 } else {
1296 Ok(false)
1297 }
1298 }
1299
1300 pub async fn check_fix_conformance(
1301 &self,
1302 check: MessageConformance,
1303 fix: MessageConformance,
1304 settings: Option<&CheckFixSettings>,
1305 ) -> anyhow::Result<()> {
1306 let data = self.data().await?;
1307 let data_bytes = data.as_ref().as_ref();
1308 let msg = MimePart::parse(data_bytes)?;
1309
1310 let mut settings = settings.map(Clone::clone).unwrap_or_default();
1311
1312 if fix.contains(MessageConformance::MISSING_MESSAGE_ID_HEADER)
1313 && settings.message_id.is_none()
1314 && matches!(msg.headers().message_id(), Err(_) | Ok(None))
1315 {
1316 let sender = self.sender().await?;
1317 let domain = sender.domain();
1318 let id = *self.id();
1319 settings.message_id.replace(format!("{id}@{domain}"));
1320 }
1321
1322 if settings.detect_encoding {
1323 settings.data_bytes.replace(data.clone());
1324 }
1325
1326 let opt_msg = msg.check_fix_conformance(check, fix, settings)?;
1327
1328 if let Some(msg) = opt_msg {
1329 let new_data = msg.to_message_bytes();
1330 self.assign_data(new_data);
1331 }
1332
1333 Ok(())
1334 }
1335}
1336
1337fn is_header_in_names_list(hdr_name: &[u8], names: &[String]) -> bool {
1338 for name in names {
1339 if hdr_name.eq_ignore_ascii_case(name.as_bytes()) {
1340 return true;
1341 }
1342 }
1343 false
1344}
1345
1346fn imported_header_name(name: &str) -> String {
1347 name.chars()
1348 .map(|c| match c.to_ascii_lowercase() {
1349 '-' => '_',
1350 c => c,
1351 })
1352 .collect()
1353}
1354
1355fn is_x_header(name: &[u8]) -> bool {
1356 name.starts_with_str("X-") || name.starts_with_str("x-")
1357}
1358
1359fn size_header(name: Option<&str>, value: &[u8]) -> usize {
1360 name.map(|name| name.len() + 2).unwrap_or(0) + value.len()
1361}
1362
1363fn emit_header(dest: &mut Vec<u8>, name: Option<&str>, value: &[u8]) {
1364 if let Some(name) = name {
1365 dest.extend_from_slice(name.as_bytes());
1366 dest.extend_from_slice(b": ");
1367 }
1368 dest.extend_from_slice(value);
1369 if !value.ends_with_str("\r\n") {
1370 dest.extend_from_slice(b"\r\n");
1371 }
1372}
1373
1374#[cfg(feature = "impl")]
1375impl UserData for Message {
1376 fn add_methods<M: UserDataMethods<Self>>(methods: &mut M) {
1377 methods.add_async_method(
1378 "set_meta",
1379 move |_, this, (name, value): (String, mlua::Value)| async move {
1380 let value = serde_json::value::to_value(value).map_err(any_err)?;
1381 this.set_meta(name, value).await.map_err(any_err)?;
1382 Ok(())
1383 },
1384 );
1385 methods.add_async_method("get_meta", move |lua, this, name: String| async move {
1386 let value = this.get_meta(name).await.map_err(any_err)?;
1387 Ok(Some(lua.to_value_with(&value, serialize_options())?))
1388 });
1389 methods.add_async_method("get_data", |lua, this, _: ()| async move {
1390 let data = this.data().await.map_err(any_err)?;
1391 lua.create_string(&*data)
1392 });
1393 methods.add_method("set_data", move |_lua, this, data: mlua::String| {
1394 this.assign_data(data.as_bytes().to_vec());
1395 Ok(())
1396 });
1397
1398 methods.add_async_method("parse_mime", |_lua, this, _: ()| async move {
1399 let data = this.data().await.map_err(any_err)?;
1400 let owned_data = BString::new(data.as_ref().to_vec());
1401 let part = MimePart::parse(owned_data).map_err(any_err)?;
1402 Ok(mod_mimepart::PartRef::new(part))
1403 });
1404
1405 methods.add_async_method("append_text_plain", |_lua, this, data: String| async move {
1406 this.append_text_plain(&data).await.map_err(any_err)
1407 });
1408
1409 methods.add_async_method("append_text_html", |_lua, this, data: String| async move {
1410 this.append_text_html(&data).await.map_err(any_err)
1411 });
1412
1413 methods.add_method("id", move |_, this, _: ()| Ok(this.id().to_string()));
1414 methods.add_async_method("sender", move |_, this, _: ()| async move {
1415 this.sender().await.map_err(any_err)
1416 });
1417
1418 methods.add_method("num_attempts", move |_, this, _: ()| {
1419 Ok(this.get_num_attempts())
1420 });
1421 methods.add_method("increment_num_attempts", move |_, this, _: ()| {
1422 Ok(this.increment_num_attempts())
1423 });
1424
1425 methods.add_async_method("queue_name", move |_, this, _: ()| async move {
1426 this.get_queue_name().await.map_err(any_err)
1427 });
1428
1429 methods.add_async_method("set_due", move |lua, this, due: mlua::Value| async move {
1430 let due: Option<DateTime<Utc>> = lua.from_value(due)?;
1431 let revised_due = this.set_due(due).await.map_err(any_err)?;
1432 lua.to_value(&revised_due)
1433 });
1434
1435 methods.add_async_method(
1436 "set_sender",
1437 move |lua, this, value: mlua::Value| async move {
1438 let sender = match value {
1439 mlua::Value::String(s) => {
1440 let s = s.to_str()?;
1441 EnvelopeAddress::parse(&s).map_err(any_err)?
1442 }
1443 _ => lua.from_value::<EnvelopeAddress>(value.clone())?,
1444 };
1445 this.set_sender(sender).await.map_err(any_err)
1446 },
1447 );
1448
1449 methods.add_async_method("recipient", move |lua, this, _: ()| async move {
1450 let mut recipients = this.recipient_list().await.map_err(any_err)?;
1451 match recipients.len() {
1452 0 => Ok(mlua::Value::Nil),
1453 1 => {
1454 let recip: EnvelopeAddress = recipients.pop().expect("have 1");
1455 recip.into_lua(&lua)
1456 }
1457 _ => recipients.into_lua(&lua),
1458 }
1459 });
1460
1461 methods.add_async_method("recipient_list", move |lua, this, _: ()| async move {
1462 let recipients = this.recipient_list().await.map_err(any_err)?;
1463 recipients.into_lua(&lua)
1464 });
1465
1466 methods.add_async_method(
1467 "set_recipient",
1468 move |lua, this, value: mlua::Value| async move {
1469 let recipients = match value {
1470 mlua::Value::String(s) => {
1471 let s = s.to_str()?;
1472 vec![EnvelopeAddress::parse(&s).map_err(any_err)?]
1473 }
1474 _ => {
1475 if let Ok(recips) = lua.from_value::<Vec<EnvelopeAddress>>(value.clone()) {
1476 recips
1477 } else {
1478 vec![lua.from_value::<EnvelopeAddress>(value.clone())?]
1479 }
1480 }
1481 };
1482 this.set_recipient_list(recipients).await.map_err(any_err)
1483 },
1484 );
1485
1486 #[cfg(feature = "impl")]
1487 methods.add_async_method("dkim_sign", |_, this, signer: Signer| async move {
1488 this.dkim_sign(signer).await.map_err(any_err)
1489 });
1490
1491 methods.add_async_method("shrink", |_, this, _: ()| async move {
1492 if this.needs_save() {
1493 this.save(None).await.map_err(any_err)?;
1494 }
1495 this.shrink().map_err(any_err)
1496 });
1497
1498 methods.add_async_method("shrink_data", |_, this, _: ()| async move {
1499 if this.needs_save() {
1500 this.save(None).await.map_err(any_err)?;
1501 }
1502 this.shrink_data().map_err(any_err)
1503 });
1504
1505 methods.add_async_method(
1506 "add_authentication_results",
1507 |lua, this, (serv_id, results): (mlua::String, mlua::Value)| async move {
1508 let results: Vec<AuthenticationResult> = lua.from_value(results)?;
1509 let results = AuthenticationResults {
1510 serv_id: serv_id.as_bytes().as_ref().into(),
1511 version: None,
1512 results,
1513 };
1514
1515 this.prepend_header(
1516 Some("Authentication-Results"),
1517 results.encode_value().as_bytes(),
1518 )
1519 .await
1520 .map_err(any_err)?;
1521
1522 Ok(())
1523 },
1524 );
1525
1526 #[cfg(feature = "impl")]
1527 methods.add_async_method(
1528 "arc_verify",
1529 |lua, this, opt_resolver_name: Option<String>| async move {
1530 let results = this.arc_verify(opt_resolver_name).await.map_err(any_err)?;
1531 lua.to_value_with(&results, serialize_options())
1532 },
1533 );
1534
1535 #[cfg(feature = "impl")]
1536 methods.add_async_method(
1537 "arc_seal",
1538 |lua,
1539 this,
1540 (signer, serv_id, auth_res, opt_resolver_name): (
1541 Signer,
1542 mlua::String,
1543 mlua::Value,
1544 Option<String>,
1545 )| async move {
1546 let results: Vec<AuthenticationResult> = lua.from_value(auth_res)?;
1547 this.arc_seal(
1548 signer,
1549 AuthenticationResults {
1550 serv_id: serv_id.as_bytes().as_ref().into(),
1551 version: None,
1552 results,
1553 },
1554 opt_resolver_name,
1555 )
1556 .await
1557 .map_err(any_err)
1558 },
1559 );
1560
1561 #[cfg(feature = "impl")]
1562 methods.add_async_method(
1563 "dkim_verify",
1564 |lua, this, opt_resolver_name: Option<String>| async move {
1565 let results = this.dkim_verify(opt_resolver_name).await.map_err(any_err)?;
1566 lua.to_value_with(&results, serialize_options())
1567 },
1568 );
1569
1570 methods.add_async_method(
1571 "prepend_header",
1572 |_, this, (name, value, encode): (String, String, Option<bool>)| async move {
1573 let encode = encode.unwrap_or(false);
1574 if encode {
1575 let header = Header::new_unstructured(name.clone(), value);
1576 this.prepend_header(Some(&name), header.get_raw_value())
1577 .await
1578 .map_err(any_err)?;
1579 } else {
1580 this.prepend_header(Some(&name), value.as_bytes())
1581 .await
1582 .map_err(any_err)?;
1583 }
1584 Ok(())
1585 },
1586 );
1587 methods.add_async_method(
1588 "append_header",
1589 |_, this, (name, value, encode): (String, String, Option<bool>)| async move {
1590 let encode = encode.unwrap_or(false);
1591 if encode {
1592 let header = Header::new_unstructured(name.clone(), value);
1593 this.append_header(Some(&name), header.get_raw_value())
1594 .await
1595 .map_err(any_err)?;
1596 } else {
1597 this.append_header(Some(&name), value.as_bytes())
1598 .await
1599 .map_err(any_err)?;
1600 }
1601 Ok(())
1602 },
1603 );
1604 methods.add_async_method("get_address_header", |_, this, name: String| async move {
1605 this.get_address_header(&name).await.map_err(any_err)
1606 });
1607 methods.add_async_method("from_header", |_, this, ()| async move {
1608 this.get_address_header("From").await.map_err(any_err)
1609 });
1610 methods.add_async_method("to_header", |_, this, ()| async move {
1611 this.get_address_header("To").await.map_err(any_err)
1612 });
1613
1614 methods.add_async_method(
1615 "get_first_named_header_value",
1616 |_, this, name: String| async move {
1617 this.get_first_named_header_value(&name)
1618 .await
1619 .map_err(any_err)
1620 },
1621 );
1622 methods.add_async_method(
1623 "get_all_named_header_values",
1624 |_, this, name: String| async move {
1625 this.get_all_named_header_values(&name)
1626 .await
1627 .map_err(any_err)
1628 },
1629 );
1630 methods.add_async_method("get_all_headers", |_, this, _: ()| async move {
1631 Ok(this
1632 .get_all_headers()
1633 .await
1634 .map_err(any_err)?
1635 .into_iter()
1636 .map(|(name, value)| vec![name, value])
1637 .collect::<Vec<Vec<BString>>>())
1638 });
1639 methods.add_async_method(
1640 "import_x_headers",
1641 |_, this, names: Option<Vec<String>>| async move {
1642 this.import_x_headers(names.unwrap_or_default())
1643 .await
1644 .map_err(any_err)
1645 },
1646 );
1647
1648 methods.add_async_method(
1649 "remove_x_headers",
1650 |_, this, names: Option<Vec<String>>| async move {
1651 this.remove_x_headers(names.unwrap_or_default())
1652 .await
1653 .map_err(any_err)
1654 },
1655 );
1656 methods.add_async_method(
1657 "remove_all_named_headers",
1658 |_, this, name: String| async move {
1659 this.remove_all_named_headers(&name).await.map_err(any_err)
1660 },
1661 );
1662
1663 methods.add_async_method(
1664 "import_scheduling_header",
1665 |lua, this, (header_name, remove): (String, bool)| async move {
1666 let opt_schedule = this
1667 .import_scheduling_header(&header_name, remove)
1668 .await
1669 .map_err(any_err)?;
1670 lua.to_value(&opt_schedule)
1671 },
1672 );
1673
1674 methods.add_async_method(
1675 "set_scheduling",
1676 move |lua, this, params: mlua::Value| async move {
1677 let sched: Option<Scheduling> = from_lua_value(&lua, params)?;
1678 let opt_schedule = this.set_scheduling(sched).await.map_err(any_err)?;
1679 lua.to_value(&opt_schedule)
1680 },
1681 );
1682
1683 methods.add_async_method("parse_rfc3464", |lua, this, _: ()| async move {
1684 let report = this.parse_rfc3464().await.map_err(any_err)?;
1685 match report {
1686 Some(report) => lua.to_value_with(&report, serialize_options()),
1687 None => Ok(mlua::Value::Nil),
1688 }
1689 });
1690
1691 methods.add_async_method("parse_rfc5965", |lua, this, _: ()| async move {
1692 let report = this.parse_rfc5965().await.map_err(any_err)?;
1693 match report {
1694 Some(report) => lua.to_value_with(&report, serialize_options()),
1695 None => Ok(mlua::Value::Nil),
1696 }
1697 });
1698
1699 methods.add_async_method("save", |_, this, ()| async move {
1700 this.save(None).await.map_err(any_err)
1701 });
1702
1703 methods.add_method("set_force_sync", move |_, this, force: bool| {
1704 this.set_force_sync(force);
1705 Ok(())
1706 });
1707
1708 methods.add_async_method(
1709 "check_fix_conformance",
1710 |lua, this, (check, fix, settings): (String, String, Option<mlua::Value>)| async move {
1711 use std::str::FromStr;
1712 let check = MessageConformance::from_str(&check).map_err(any_err)?;
1713 let fix = MessageConformance::from_str(&fix).map_err(any_err)?;
1714
1715 let settings = match settings {
1716 Some(v) => Some(lua.from_value(v).map_err(any_err)?),
1717 None => None,
1718 };
1719
1720 match this
1721 .check_fix_conformance(check, fix, settings.as_ref())
1722 .await
1723 {
1724 Ok(_) => Ok(None),
1725 Err(err) => Ok(Some(format!("{err:#}"))),
1726 }
1727 },
1728 );
1729 }
1730}
1731
1732impl TimerEntryWithDelay for WeakMessage {
1733 fn delay(&self) -> Duration {
1734 match self.upgrade() {
1735 None => {
1736 Duration::from_millis(0)
1738 }
1739 Some(msg) => msg.delay(),
1740 }
1741 }
1742}
1743
1744impl TimerEntryWithDelay for Message {
1745 fn delay(&self) -> Duration {
1746 let inner = self.msg_and_id.inner.lock();
1747 match inner.due {
1748 Some(time) => {
1749 let now = Utc::now();
1750 let delta = time - now;
1751 delta.to_std().unwrap_or(Duration::from_millis(0))
1752 }
1753 None => Duration::from_millis(0),
1754 }
1755 }
1756}
1757
1758#[cfg(test)]
1759pub(crate) mod test {
1760 use super::*;
1761 use serde_json::json;
1762
1763 pub fn new_msg_body<S: AsRef<[u8]>>(s: S) -> Message {
1764 Message::new_dirty(
1765 SpoolId::new(),
1766 EnvelopeAddress::parse("sender@example.com").unwrap(),
1767 vec![EnvelopeAddress::parse("recip@example.com").unwrap()],
1768 serde_json::json!({}),
1769 Arc::new(s.as_ref().to_vec().into_boxed_slice()),
1770 )
1771 .unwrap()
1772 }
1773
1774 fn data_as_string(msg: &Message) -> String {
1775 String::from_utf8(msg.get_data_maybe_not_loaded().to_vec()).unwrap()
1776 }
1777
1778 const X_HDR_CONTENT: &str =
1779 "X-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nFrom :Someone\r\n\r\nBody";
1780
1781 #[tokio::test]
1782 async fn import_all_x_headers() {
1783 let msg = new_msg_body(X_HDR_CONTENT);
1784
1785 msg.import_x_headers(vec![]).await.unwrap();
1786 k9::assert_equal!(
1787 msg.get_meta_obj().await.unwrap(),
1788 json!({
1789 "x_hello": "there",
1790 "x_header": "value",
1791 })
1792 );
1793 }
1794
1795 #[tokio::test]
1796 async fn meta_and_nil() {
1797 let msg = new_msg_body(X_HDR_CONTENT);
1798 msg.set_meta("test", serde_json::Value::Null).await.unwrap();
1800 k9::assert_equal!(msg.get_meta("test").await.unwrap(), serde_json::Value::Null);
1801
1802 let lua = mlua::Lua::new();
1804 lua.globals().set("msg", msg).unwrap();
1805 lua.load("assert(msg:get_meta('test') == nil)")
1806 .exec()
1807 .unwrap();
1808 }
1809
1810 #[tokio::test]
1811 async fn import_some_x_headers() {
1812 let msg = new_msg_body(X_HDR_CONTENT);
1813
1814 msg.import_x_headers(vec!["x-hello".to_string()])
1815 .await
1816 .unwrap();
1817 k9::assert_equal!(
1818 msg.get_meta_obj().await.unwrap(),
1819 json!({
1820 "x_hello": "there",
1821 })
1822 );
1823 }
1824
1825 #[tokio::test]
1826 async fn remove_all_x_headers() {
1827 let msg = new_msg_body(X_HDR_CONTENT);
1828
1829 msg.remove_x_headers(vec![]).await.unwrap();
1830 k9::assert_equal!(
1831 data_as_string(&msg),
1832 "Subject: Hello\r\nFrom :Someone\r\n\r\nBody"
1833 );
1834 }
1835
1836 #[tokio::test]
1837 async fn prepend_header_2_params() {
1838 let msg = new_msg_body(X_HDR_CONTENT);
1839
1840 msg.prepend_header(Some("Date"), b"Today").await.unwrap();
1841 k9::assert_equal!(
1842 data_as_string(&msg),
1843 "Date: Today\r\nX-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nFrom :Someone\r\n\r\nBody"
1844 );
1845 }
1846
1847 #[tokio::test]
1848 async fn prepend_header_1_params() {
1849 let msg = new_msg_body(X_HDR_CONTENT);
1850
1851 msg.prepend_header(None, b"Date: Today").await.unwrap();
1852 k9::assert_equal!(
1853 data_as_string(&msg),
1854 "Date: Today\r\nX-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nFrom :Someone\r\n\r\nBody"
1855 );
1856 }
1857
1858 #[tokio::test]
1859 async fn append_header_2_params() {
1860 let msg = new_msg_body(X_HDR_CONTENT);
1861
1862 msg.append_header(Some("Date"), b"Today").await.unwrap();
1863 k9::assert_equal!(
1864 data_as_string(&msg),
1865 "X-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nFrom :Someone\r\nDate: Today\r\n\r\nBody"
1866 );
1867 }
1868
1869 #[tokio::test]
1870 async fn append_header_1_params() {
1871 let msg = new_msg_body(X_HDR_CONTENT);
1872
1873 msg.append_header(None, b"Date: Today").await.unwrap();
1874 k9::assert_equal!(
1875 data_as_string(&msg),
1876 "X-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nFrom :Someone\r\nDate: Today\r\n\r\nBody"
1877 );
1878 }
1879
1880 const MULTI_HEADER_CONTENT: &str =
1881 "X-Hello: there\r\nX-Header: value\r\nSubject: Hello\r\nX-Header: another value\r\nFrom :Someone@somewhere\r\n\r\nBody";
1882
1883 #[tokio::test]
1884 async fn get_first_header() {
1885 let msg = new_msg_body(MULTI_HEADER_CONTENT);
1886 k9::assert_equal!(
1887 msg.get_first_named_header_value("X-header")
1888 .await
1889 .unwrap()
1890 .unwrap(),
1891 "value"
1892 );
1893 }
1894
1895 #[tokio::test]
1896 async fn get_all_header() {
1897 let msg = new_msg_body(MULTI_HEADER_CONTENT);
1898 k9::assert_equal!(
1899 msg.get_all_named_header_values("X-header").await.unwrap(),
1900 vec!["value".to_string(), "another value".to_string()]
1901 );
1902 }
1903
1904 #[tokio::test]
1905 async fn remove_first() {
1906 let msg = new_msg_body(MULTI_HEADER_CONTENT);
1907 msg.remove_first_named_header("X-header").await.unwrap();
1908 k9::assert_equal!(
1909 data_as_string(&msg),
1910 "X-Hello: there\r\nSubject: Hello\r\nX-Header: another value\r\nFrom :Someone@somewhere\r\n\r\nBody"
1911 );
1912 }
1913
1914 #[tokio::test]
1915 async fn remove_all() {
1916 let msg = new_msg_body(MULTI_HEADER_CONTENT);
1917 msg.remove_all_named_headers("X-header").await.unwrap();
1918 k9::assert_equal!(
1919 data_as_string(&msg),
1920 "X-Hello: there\r\nSubject: Hello\r\nFrom :Someone@somewhere\r\n\r\nBody"
1921 );
1922 }
1923
1924 #[tokio::test]
1925 async fn append_text_plain() {
1926 let msg = new_msg_body(MULTI_HEADER_CONTENT);
1927 msg.append_text_plain("I am at the bottom").await.unwrap();
1928 k9::assert_equal!(
1929 data_as_string(&msg),
1930 "X-Hello: there\r\n\
1931 X-Header: value\r\n\
1932 Subject: Hello\r\n\
1933 X-Header: another value\r\n\
1934 From :Someone@somewhere\r\n\
1935 Content-Type: text/plain;\r\n\
1936 \tcharset=\"us-ascii\"\r\n\
1937 \r\n\
1938 Body\r\n\
1939 I am at the bottom\r\n"
1940 );
1941 }
1942
1943 const MIXED_CONTENT: &str = "Content-Type: multipart/mixed;\r\n\
1944\tboundary=\"my-boundary\"\r\n\
1945\r\n\
1946--my-boundary\r\n\
1947Content-Type: text/plain;\r\n\
1948\tcharset=\"us-ascii\"\r\n\
1949\r\n\
1950plain text\r\n\
1951--my-boundary\r\n\
1952Content-Type: text/html;\r\n\
1953\tcharset=\"us-ascii\"\r\n\
1954\r\n\
1955<b>rich</b> text\r\n\
1956--my-boundary\r\n\
1957Content-Type: application/octet-stream\r\n\
1958Content-Transfer-Encoding: base64\r\n\
1959Content-Disposition: attachment;\r\n\
1960\tfilename=\"woot.bin\"\r\n\
1961Content-ID: <woot.id@somewhere>\r\n\
1962\r\n\
1963AAECAw==\r\n\
1964--my-boundary--\r\n\
1965\r\n";
1966
1967 const MIXED_CONTENT_ENCLOSING_BODY: &str = "Content-Type: multipart/mixed;\r\n\
1968\tboundary=\"my-boundary\"\r\n\
1969\r\n\
1970--my-boundary\r\n\
1971Content-Type: text/plain;\r\n\
1972\tcharset=\"us-ascii\"\r\n\
1973\r\n\
1974plain text\r\n\
1975--my-boundary\r\n\
1976Content-Type: text/html;\r\n\
1977\tcharset=\"us-ascii\"\r\n\
1978\r\n\
1979<BODY>\r\n\
1980<b>rich</b> text\r\n\
1981</BODY>\r\n\
1982--my-boundary\r\n\
1983Content-Type: application/octet-stream\r\n\
1984Content-Transfer-Encoding: base64\r\n\
1985Content-Disposition: attachment;\r\n\
1986\tfilename=\"woot.bin\"\r\n\
1987Content-ID: <woot.id>\r\n\
1988\r\n\
1989AAECAw==\r\n\
1990--my-boundary--\r\n\
1991\r\n";
1992
1993 #[tokio::test]
1994 async fn append_text_html() {
1995 let msg = new_msg_body(MIXED_CONTENT);
1996 msg.append_text_html("bottom html").await.unwrap();
1997 k9::snapshot!(
1998 data_as_string(&msg),
1999 r#"
2000Content-Type: multipart/mixed;\r
2001\tboundary="my-boundary"\r
2002\r
2003--my-boundary\r
2004Content-Type: text/plain;\r
2005\tcharset="us-ascii"\r
2006\r
2007plain text\r
2008--my-boundary\r
2009Content-Type: text/html;\r
2010\tcharset="us-ascii"\r
2011\r
2012<b>rich</b> text\r
2013\r
2014bottom html\r
2015--my-boundary\r
2016Content-Type: application/octet-stream\r
2017Content-Transfer-Encoding: base64\r
2018Content-Disposition: attachment;\r
2019\tfilename="woot.bin"\r
2020Content-ID: <woot.id@somewhere>\r
2021\r
2022AAECAw==\r
2023--my-boundary--\r
2024\r
2025
2026"#
2027 );
2028
2029 let msg = new_msg_body(MIXED_CONTENT_ENCLOSING_BODY);
2030 msg.append_text_html("bottom html 👻").await.unwrap();
2031 k9::snapshot!(
2032 data_as_string(&msg),
2033 r#"
2034Content-Type: multipart/mixed;\r
2035\tboundary="my-boundary"\r
2036\r
2037--my-boundary\r
2038Content-Type: text/plain;\r
2039\tcharset="us-ascii"\r
2040\r
2041plain text\r
2042--my-boundary\r
2043Content-Type: text/html;\r
2044\tcharset="utf-8"\r
2045Content-Transfer-Encoding: quoted-printable\r
2046\r
2047<BODY>\r
2048<b>rich</b> text\r
2049\r
2050bottom html =F0=9F=91=BB</BODY>\r
2051--my-boundary\r
2052Content-Type: application/octet-stream\r
2053Content-Transfer-Encoding: base64\r
2054Content-Disposition: attachment;\r
2055\tfilename="woot.bin"\r
2056Content-ID: <woot.id>\r
2057\r
2058AAECAw==\r
2059--my-boundary--\r
2060\r
2061
2062"#
2063 );
2064 }
2065
2066 #[tokio::test]
2067 async fn append_text_plain_mixed() {
2068 let msg = new_msg_body(MIXED_CONTENT);
2069 msg.append_text_plain("bottom text 👾").await.unwrap();
2070 k9::snapshot!(
2071 data_as_string(&msg),
2072 r#"
2073Content-Type: multipart/mixed;\r
2074\tboundary="my-boundary"\r
2075\r
2076--my-boundary\r
2077Content-Type: text/plain;\r
2078\tcharset="utf-8"\r
2079Content-Transfer-Encoding: quoted-printable\r
2080\r
2081plain text\r
2082\r
2083bottom text =F0=9F=91=BE\r
2084--my-boundary\r
2085Content-Type: text/html;\r
2086\tcharset="us-ascii"\r
2087\r
2088<b>rich</b> text\r
2089--my-boundary\r
2090Content-Type: application/octet-stream\r
2091Content-Transfer-Encoding: base64\r
2092Content-Disposition: attachment;\r
2093\tfilename="woot.bin"\r
2094Content-ID: <woot.id@somewhere>\r
2095\r
2096AAECAw==\r
2097--my-boundary--\r
2098\r
2099
2100"#
2101 );
2102 }
2103
2104 #[tokio::test]
2105 async fn check_conformance_angle_msg_id() {
2106 const DOUBLE_ANGLE_ONLY: &str = "Subject: hello\r
2107Message-ID: <<1234@example.com>>\r
2108\r
2109Hello";
2110 let msg = new_msg_body(DOUBLE_ANGLE_ONLY);
2111 k9::snapshot!(
2112 msg.check_fix_conformance(
2113 MessageConformance::MISSING_MESSAGE_ID_HEADER,
2114 MessageConformance::empty(),
2115 None,
2116 )
2117 .await
2118 .unwrap_err()
2119 .to_string(),
2120 "Message has conformance issues: MISSING_MESSAGE_ID_HEADER"
2121 );
2122
2123 msg.check_fix_conformance(
2124 MessageConformance::MISSING_MESSAGE_ID_HEADER,
2125 MessageConformance::MISSING_MESSAGE_ID_HEADER,
2126 None,
2127 )
2128 .await
2129 .unwrap();
2130
2131 const DOUBLE_ANGLE_AND_LONG_LINE: &str = "Subject: hello\r
2146Message-ID: <<1234@example.com>>\r
2147\r
2148Hello this is a really long line Hello this is a really long line \
2149Hello this is a really long line Hello this is a really long line \
2150Hello this is a really long line Hello this is a really long line \
2151Hello this is a really long line Hello this is a really long line \
2152Hello this is a really long line Hello this is a really long line \
2153Hello this is a really long line Hello this is a really long line \
2154Hello this is a really long line Hello this is a really long line
2155";
2156 let msg = new_msg_body(DOUBLE_ANGLE_AND_LONG_LINE);
2157 msg.check_fix_conformance(
2158 MessageConformance::MISSING_COLON_VALUE,
2159 MessageConformance::MISSING_MESSAGE_ID_HEADER | MessageConformance::LINE_TOO_LONG,
2160 None,
2161 )
2162 .await
2163 .unwrap();
2164
2165 }
2189
2190 #[tokio::test]
2191 async fn check_conformance() {
2192 let msg = new_msg_body(MULTI_HEADER_CONTENT);
2193 msg.check_fix_conformance(
2194 MessageConformance::default(),
2195 MessageConformance::MISSING_MIME_VERSION,
2196 None,
2197 )
2198 .await
2199 .unwrap();
2200 k9::snapshot!(
2201 data_as_string(&msg),
2202 r#"
2203X-Hello: there\r
2204X-Header: value\r
2205Subject: Hello\r
2206X-Header: another value\r
2207From :Someone@somewhere\r
2208Mime-Version: 1.0\r
2209\r
2210Body
2211"#
2212 );
2213
2214 let msg = new_msg_body(MULTI_HEADER_CONTENT);
2215 msg.check_fix_conformance(
2216 MessageConformance::default(),
2217 MessageConformance::MISSING_MIME_VERSION | MessageConformance::NAME_ENDS_WITH_SPACE,
2218 None,
2219 )
2220 .await
2221 .unwrap();
2222 k9::snapshot!(
2223 data_as_string(&msg),
2224 r#"
2225Content-Type: text/plain;\r
2226\tcharset="us-ascii"\r
2227X-Hello: there\r
2228X-Header: value\r
2229Subject: Hello\r
2230X-Header: another value\r
2231From: <Someone@somewhere>\r
2232Mime-Version: 1.0\r
2233\r
2234Body\r
2235
2236"#
2237 );
2238 }
2239
2240 #[tokio::test]
2241 async fn check_fix_latin_input() {
2242 const POUNDS: &[u8] = b"Subject: \xa3\r\n\r\nGBP\r\n";
2243 let msg = new_msg_body(&*POUNDS);
2244 msg.check_fix_conformance(
2245 MessageConformance::default(),
2246 MessageConformance::NEEDS_TRANSFER_ENCODING,
2247 Some(&CheckFixSettings {
2248 detect_encoding: true,
2249 include_encodings: vec!["iso-8859-1".to_string()],
2250 ..Default::default()
2251 }),
2252 )
2253 .await
2254 .unwrap();
2255
2256 let subject = msg
2257 .get_first_named_header_value("subject")
2258 .await
2259 .unwrap()
2260 .unwrap();
2261 assert_eq!(subject, "£");
2262 }
2263
2264 #[tokio::test]
2265 async fn set_scheduling() -> anyhow::Result<()> {
2266 let msg = new_msg_body(MULTI_HEADER_CONTENT);
2267 assert!(msg.get_due().is_none(), "due is implicitly now");
2268
2269 let now = Utc::now();
2270 let one_day = chrono::Duration::try_days(1).expect("1 day to be valid");
2271
2272 msg.set_scheduling(Some(Scheduling {
2273 restriction: None,
2274 first_attempt: Some((now + one_day).into()),
2275 expires: None,
2276 }))
2277 .await?;
2278
2279 let due = msg.get_due().expect("due to now be set");
2280 assert!(due - now >= one_day, "due time is at least 1 day away");
2281
2282 Ok(())
2283 }
2284
2285 #[cfg(all(test, target_pointer_width = "64"))]
2286 #[test]
2287 fn sizes() {
2288 assert_eq!(std::mem::size_of::<Message>(), 8);
2289 assert_eq!(std::mem::size_of::<MessageInner>(), 32);
2290 assert_eq!(std::mem::size_of::<MessageWithId>(), 72);
2291 }
2292}