kumo_log_types/
lib.rs

1use crate::rfc5965::ARFReport;
2use bounce_classify::BounceClass;
3use chrono::{DateTime, Utc};
4use kumo_address::host_or_socket::HostOrSocketAddress;
5use kumo_address::socket::SocketAddress;
6use rfc5321::Response;
7use serde::{Deserialize, Serialize};
8use serde_json::Value;
9use std::borrow::Cow;
10use std::collections::HashMap;
11use std::net::SocketAddr;
12use uuid::Uuid;
13
14pub mod rfc3464;
15pub mod rfc5965;
16
17#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
18pub struct ResolvedAddress {
19    pub name: String,
20    pub addr: HostOrSocketAddress,
21}
22
23impl std::fmt::Display for ResolvedAddress {
24    fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
25        let addr = format!("{}", self.addr);
26        if addr == self.name {
27            // likely: unix domain socket path
28            write!(fmt, "{addr}")
29        } else {
30            write!(fmt, "{}/{addr}", self.name)
31        }
32    }
33}
34
/// The kind of event that a `JsonLogRecord` describes (its `kind` field,
/// serialized under the `type` key).
///
/// Note: `Ord`/`PartialOrd` are derived, so the declaration order of the
/// variants below defines how they compare; reordering them changes
/// comparison results. No serde rename attributes are applied, so each
/// variant is serialized by its Rust name and renaming a variant changes
/// the logged format.
#[derive(Serialize, Deserialize, Debug, Copy, Clone, Eq, PartialEq, Hash, Ord, PartialOrd)]
pub enum RecordType {
    /// Recorded by a receiving listener
    Reception,
    /// Recorded by the delivery side, most likely as a
    /// result of attempting a delivery to a remote host
    Delivery,
    /// A bounce; presumably a permanent delivery failure reported by
    /// the destination (TODO confirm). Bounce-classifiable.
    Bounce,
    /// Presumably a temporary (retryable) delivery failure reported by
    /// the destination (TODO confirm). Bounce-classifiable.
    TransientFailure,
    /// Recorded when a message is expiring from the queue
    Expiration,
    /// Administratively failed
    AdminBounce,
    /// Contains information about an OOB bounce
    OOB,
    /// Contains a feedback report
    Feedback,

    /// SMTP Listener responded with a 4xx or 5xx
    Rejection,

    /// Administratively rebound from one queue to another
    AdminRebind,

    /// Moved from the special deferred injection queue
    /// and into some other queue
    DeferredInjectionRebind,

    /// Explains why a message was put into the scheduled queue
    Delayed,

    /// Special for matching anything in the logging config
    Any,
}
69
70impl RecordType {
71    /// Returns true if it makes sense to run the corresponding record
72    /// through the bounce classifier module.
73    /// The rule of thumb for that is if the response came from the
74    /// destination when attempting delivery, but we also include
75    /// administrative bounces and message expirations.
76    pub const fn is_bounce_classifiable(&self) -> bool {
77        match self {
78            Self::Any
79            | Self::Reception
80            | Self::Delivery
81            | Self::DeferredInjectionRebind
82            | Self::AdminRebind
83            | Self::Delayed => false,
84            Self::Bounce
85            | Self::TransientFailure
86            | Self::Expiration
87            | Self::AdminBounce
88            | Self::OOB
89            | Self::Feedback
90            | Self::Rejection => true,
91        }
92    }
93}
94
/// Unfortunately, when we defined the `timestamp` and `created` fields
/// in the log structure, we made the decision to log as the integer
/// unix timestamp format, which causes us to discard the sub-second
/// information that we otherwise have available.
///
/// We'd like to now include the full info in the serialized log
/// record, without bloating the in-memory representation, or otherwise
/// explicitly duplicating data to arrange for serde to emit it for us.
///
/// That's where this macro comes in; it allows us to serialize those
/// fields via a proxy type that effectively causes serde to emit two
/// different serializations of the same value.
///
/// Usage is: `ts_serializer!(MODULE_NAME, STRUCT_NAME, SECONDS_FIELD, FULL_FIELD)`
///
/// The MODULE_NAME and STRUCT_NAME values are arbitrary and exist
/// just for namespacing. MODULE_NAME must, however, match the `with`
/// attribute described below.
///
/// The SECONDS_FIELD defines the name of the field to be emitted
/// as the unix timestamp (in seconds).
///
/// The FULL_FIELD defines the name of the field to be emitted
/// as the full RFC 3339 datetime.
///
/// On deserialization, FULL_FIELD is preferred when present. Otherwise
/// the whole-second SECONDS_FIELD value is used, which keeps records
/// written before FULL_FIELD existed readable.
///
/// The macro defines a module and struct that can be used as a proxy
/// for serialization.
///
/// To actually use it, you need to annotate the field in the struct:
///
/// ```ignore
/// #[serde(flatten, with = "ts_serializer")]
/// pub timestamp: DateTime<Utc>,
/// ```
///
/// It is important that `flatten` is used to avoid serde emitting
/// a nested/child struct, and the `with` attribute is what points
/// the serialization to the defined proxy module; it must
/// reference the MODULE_NAME you defined.
macro_rules! ts_serializer {
    ($module:ident, $name:ident, $seconds:ident, $full:ident) => {
        mod $module {
            use super::*;
            use serde::{Deserializer, Serializer};

            /// Serialization proxy: carries the same instant as whole unix
            /// seconds and, when available, as a full-precision datetime.
            #[derive(Serialize, Deserialize, Copy, Clone, Eq, Hash, Default, Debug, PartialEq)]
            struct $name {
                #[serde(with = "chrono::serde::ts_seconds")]
                pub $seconds: DateTime<Utc>,

                /// Optional for backwards compatibility: older records only
                /// carry the whole-second field, so a missing full-precision
                /// value deserializes as `None`. We take it when it is present.
                #[serde(default)]
                pub $full: Option<DateTime<Utc>>,
            }

            impl $name {
                /// The most precise instant available: the full-precision
                /// value when present, otherwise the whole-second value.
                fn resolve(&self) -> DateTime<Utc> {
                    self.$full.unwrap_or(self.$seconds)
                }
            }

            impl<T: chrono::TimeZone> From<DateTime<T>> for $name
            where
                DateTime<Utc>: From<DateTime<T>>,
            {
                fn from(value: DateTime<T>) -> $name {
                    let timestamp: DateTime<Utc> = value.into();
                    $name {
                        $seconds: timestamp,
                        $full: Some(timestamp),
                    }
                }
            }

            impl From<$name> for DateTime<Utc> {
                fn from(value: $name) -> DateTime<Utc> {
                    value.resolve()
                }
            }

            /// `serde(with)` entry point: emits both representations.
            pub fn serialize<S>(d: &DateTime<Utc>, s: S) -> Result<S::Ok, S::Error>
            where
                S: Serializer,
            {
                let proxy: $name = (*d).into();
                proxy.serialize(s)
            }

            /// `serde(with)` entry point: reads both representations and
            /// keeps the most precise one.
            pub fn deserialize<'a, D>(d: D) -> Result<DateTime<Utc>, D::Error>
            where
                D: Deserializer<'a>,
            {
                $name::deserialize(d).map(|p| p.into())
            }
        }
    };
}
194
195ts_serializer!(ts_serializer, TimestampSerializer, timestamp, event_time);
196ts_serializer!(ct_serializer, CreationTimeSerializer, created, created_time);
197
/// A structured log record describing a single event, serializable via
/// serde (its `meta`/`headers` values are `serde_json::Value`).
///
/// Serialized keys appear in field declaration order. Fields marked
/// `skip_serializing_if = "Option::is_none"` are omitted when `None`. The
/// other `Option` fields serialize as `null` when `None`. The in-memory
/// size of this type is pinned by the `sizes` test at the end of this file.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct JsonLogRecord {
    /// What kind of record this is (serialized under the key `type`)
    #[serde(rename = "type")]
    pub kind: RecordType,
    /// The message id
    pub id: String,
    /// The envelope sender
    pub sender: String,
    /// The envelope recipient
    pub recipient: String,
    /// Which named queue the message was associated with
    pub queue: String,
    /// Which MX site the message was being delivered to
    pub site: String,
    /// The size of the message, in bytes
    pub size: u64,
    /// The response from/to the peer
    pub response: Response,
    /// The address of the peer, and our sense of its
    /// hostname or EHLO domain
    pub peer_address: Option<ResolvedAddress>,
    /// The time at which we are logging this event.
    /// Serialized through the `ts_serializer` proxy as two keys:
    /// `timestamp` (whole unix seconds) and `event_time` (full datetime).
    #[serde(flatten, with = "ts_serializer")]
    pub timestamp: DateTime<Utc>,
    /// The time at which the message was initially received and created.
    /// Serialized through the `ct_serializer` proxy as two keys:
    /// `created` (whole unix seconds) and `created_time` (full datetime).
    #[serde(flatten, with = "ct_serializer")]
    pub created: DateTime<Utc>,
    /// The number of delivery attempts that have been made.
    /// Note that this may be approximate after a restart; use the
    /// number of logged events to determine the true number
    pub num_attempts: u16,

    /// The bounce classification for this record. Presumably only
    /// meaningful when `kind.is_bounce_classifiable()` is true (TODO confirm).
    pub bounce_classification: BounceClass,

    /// Name of the egress pool involved, if any (presumably delivery-side
    /// records only; TODO confirm)
    pub egress_pool: Option<String>,
    /// Name of the egress source involved, if any (presumably delivery-side
    /// records only; TODO confirm)
    pub egress_source: Option<String>,
    /// The source address used, possibly annotated with proxy details;
    /// see `MaybeProxiedSourceAddress`
    pub source_address: Option<MaybeProxiedSourceAddress>,

    /// The parsed feedback report, if any (see `RecordType::Feedback`).
    /// Boxed, presumably to keep the in-memory record small.
    pub feedback_report: Option<Box<ARFReport>>,

    /// Additional metadata values keyed by name (NOTE(review): presumably
    /// the message metadata selected by the logging config; confirm)
    pub meta: HashMap<String, Value>,
    /// Message header values keyed by header name (NOTE(review): presumably
    /// only the headers selected by the logging config; confirm)
    pub headers: HashMap<String, Value>,

    /// The protocol used to deliver, or attempt to deliver, this message
    pub delivery_protocol: Option<String>,

    /// The protocol used to receive this message
    pub reception_protocol: Option<String>,

    /// The id of the node on which the event occurred
    pub nodeid: Uuid,

    /// The TLS Cipher used, if applicable
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tls_cipher: Option<String>,

    /// The TLS protocol version used, if applicable
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tls_protocol_version: Option<String>,

    /// The Subject Name from the peer TLS certificate, if applicable
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tls_peer_subject_name: Option<Vec<String>>,

    /// The provider name, if any.
    /// This is a way of grouping destination sites operated
    /// by the same provider.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub provider_name: Option<String>,

    /// Uuid identifying a connection/session for either inbound
    /// or outbound (depending on the type of the record).
    /// This is useful when correlating a series of messages to
    /// the same connection for either ingress or egress
    #[serde(skip_serializing_if = "Option::is_none")]
    pub session_id: Option<Uuid>,
}
276
277#[derive(Debug, Clone, Serialize, Deserialize)]
278pub struct MaybeProxiedSourceAddress {
279    pub address: SocketAddress,
280    #[serde(skip_serializing_if = "Option::is_none")]
281    pub server: Option<SocketAddr>,
282    #[serde(skip_serializing_if = "Option::is_none")]
283    pub protocol: Option<Cow<'static, str>>,
284}
285
/// Pins the in-memory size of `JsonLogRecord` on 64-bit targets, so that any
/// change which grows (or shrinks) the struct is noticed and reviewed.
#[cfg(all(test, target_pointer_width = "64"))]
#[test]
fn sizes() {
    let record_size = std::mem::size_of::<JsonLogRecord>();
    assert_eq!(record_size, 712);
}