kumo_log_types/
lib.rs

1use crate::rfc5965::ARFReport;
2use bounce_classify::BounceClass;
3use chrono::{DateTime, Utc};
4use kumo_address::host_or_socket::HostOrSocketAddress;
5use kumo_address::socket::SocketAddress;
6use rfc5321::Response;
7use serde::{Deserialize, Serialize};
8use serde_json::Value;
9use serde_with::formats::PreferOne;
10use serde_with::{serde_as, OneOrMany};
11use std::borrow::Cow;
12use std::collections::HashMap;
13use std::net::SocketAddr;
14use uuid::Uuid;
15
16pub mod rfc3464;
17pub mod rfc5965;
18
/// A peer address paired with the name we associate with that peer.
///
/// Displays as `name/addr`, or just `addr` when the name renders
/// identically to the address.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ResolvedAddress {
    /// Our sense of the peer's name (e.g. its hostname or EHLO domain).
    pub name: String,
    /// The peer's address. Per the type name this is presumably either a
    /// host or a socket address (possibly a unix domain socket path) —
    /// TODO confirm against `HostOrSocketAddress`.
    pub addr: HostOrSocketAddress,
}
24
25impl std::fmt::Display for ResolvedAddress {
26    fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
27        let addr = format!("{}", self.addr);
28        if addr == self.name {
29            // likely: unix domain socket path
30            write!(fmt, "{addr}")
31        } else {
32            write!(fmt, "{}/{addr}", self.name)
33        }
34    }
35}
36
/// The kind of event a log record describes; serialized as the `type`
/// field of [`JsonLogRecord`].
///
/// Note: the derived `Ord`/`PartialOrd` compare variants by declaration
/// order, so reordering the variants below changes how they sort.
#[derive(Serialize, Deserialize, Debug, Copy, Clone, Eq, PartialEq, Hash, Ord, PartialOrd)]
pub enum RecordType {
    /// Recorded by a receiving listener
    Reception,
    /// Recorded by the delivery side, most likely as a
    /// result of attempting a delivery to a remote host
    Delivery,
    /// A failed delivery outcome that is passed to the bounce classifier.
    /// NOTE(review): presumably a permanent (non-retryable) failure —
    /// confirm against the code that emits it.
    Bounce,
    /// A failed delivery outcome that is passed to the bounce classifier.
    /// NOTE(review): presumably a temporary failure after which delivery
    /// will be retried — confirm against the code that emits it.
    TransientFailure,
    /// Recorded when a message is expiring from the queue
    Expiration,
    /// Administratively failed
    AdminBounce,
    /// Contains information about an OOB bounce
    OOB,
    /// Contains a feedback report
    Feedback,

    /// SMTP Listener responded with a 4xx or 5xx
    Rejection,

    /// Administratively rebound from one queue to another
    AdminRebind,

    /// Moved from the special deferred injection queue
    /// and into some other queue
    DeferredInjectionRebind,

    /// Explains why a message was put into the scheduled queue
    Delayed,

    /// Message was transferred to another kumomta node,
    /// which is now responsible for delivery.
    XferOut,

    /// Message was received from another kumomta node,
    /// and we are now responsible for delivery.
    XferIn,

    /// Special for matching anything in the logging config
    Any,
}
79
80impl RecordType {
81    /// Returns true if it makes sense to run the corresponding record
82    /// through the bounce classifier module.
83    /// The rule of thumb for that is if the response came from the
84    /// destination when attempting delivery, but we also include
85    /// administrative bounces and message expirations.
86    pub const fn is_bounce_classifiable(&self) -> bool {
87        match self {
88            Self::Any
89            | Self::Reception
90            | Self::Delivery
91            | Self::DeferredInjectionRebind
92            | Self::AdminRebind
93            | Self::XferOut
94            | Self::XferIn
95            | Self::Delayed => false,
96            Self::Bounce
97            | Self::TransientFailure
98            | Self::Expiration
99            | Self::AdminBounce
100            | Self::OOB
101            | Self::Feedback
102            | Self::Rejection => true,
103        }
104    }
105}
106
/// Unfortunately, when we defined the `timestamp` and `created` fields
/// in the log structure, we made the decision to log as the integer
/// unix timestamp format, which causes us to discard the sub-second
/// information that we otherwise have available.
///
/// We'd like to now include the full info in the serialized log
/// record, without bloating the in-memory representation, or otherwise
/// explicitly duplicating data to arrange for serde to emit it for us.
///
/// That's where this macro comes in; it allows us to serialize those
/// fields via a proxy type that effectively causes serde to emit two
/// different serializations of the same value.
///
/// Usage is: `ts_serializer!(MODULE_NAME, STRUCT_NAME, SECONDS_FIELD, FULL_FIELD);`
///
/// The MODULE_NAME and STRUCT_NAME are not especially important and
/// are really present just for namespacing.
///
/// The SECONDS_FIELD defines the name of the field to be emitted
/// as the unix timestamp (in seconds).
///
/// The FULL_FIELD defines the name of the field to be emitted
/// as the full RFC 3339 datetime.
///
/// The macro defines a module and struct that can be used as a proxy
/// for serialization. When deserializing, FULL_FIELD is optional (older
/// records only carry SECONDS_FIELD); when it is present it takes
/// precedence, so sub-second precision survives a round trip.
///
/// To actually use it, you need to annotate the field in the struct;
///
/// ```text
/// #[serde(flatten, with = "ts_serializer")]
/// pub timestamp: DateTime<Utc>,
/// ```
///
/// It is important that `flatten` is used to avoid serde emitting
/// a nested/child struct, and the `with` attribute is what points
/// the serialization to the defined proxy module; it must
/// reference the MODULE_NAME you defined. (In the example above the
/// MODULE_NAME happens to be `ts_serializer`, the same as this macro's
/// name; that is fine because macros and modules live in separate
/// namespaces.)
macro_rules! ts_serializer {
    ($module:ident, $name:ident, $seconds:ident, $full:ident) => {
        mod $module {
            use super::*;
            use serde::{Deserializer, Serializer};

            // Serialization proxy carrying the same instant twice: once in
            // the whole-seconds field and once in the full-precision field.
            #[derive(Serialize, Deserialize, Copy, Clone, Eq, Hash, Default, Debug, PartialEq)]
            struct $name {
                #[serde(with = "chrono::serde::ts_seconds")]
                pub $seconds: DateTime<Utc>,

                // Optional for backwards compatibility: records written
                // before this field existed only carry the whole-seconds
                // field, so we don't expect the full-precision field to be
                // present, but we'll take it if it is!
                #[serde(default)]
                pub $full: Option<DateTime<Utc>>,
            }

            impl $name {
                // The most precise instant available: the full-precision
                // field when present, otherwise the whole-seconds field.
                fn best(&self) -> DateTime<Utc> {
                    self.$full.unwrap_or(self.$seconds)
                }
            }

            impl<T: chrono::TimeZone> From<DateTime<T>> for $name
            where
                DateTime<Utc>: From<DateTime<T>>,
            {
                fn from(value: DateTime<T>) -> $name {
                    let timestamp: DateTime<Utc> = value.into();
                    // Populate both fields so that both are emitted.
                    $name {
                        $seconds: timestamp,
                        $full: Some(timestamp),
                    }
                }
            }

            impl From<$name> for DateTime<Utc> {
                fn from(value: $name) -> DateTime<Utc> {
                    value.best()
                }
            }

            /// Serializes `d` as both the whole-seconds and full-precision fields.
            pub fn serialize<S>(d: &DateTime<Utc>, s: S) -> Result<S::Ok, S::Error>
            where
                S: Serializer,
            {
                $name::from(*d).serialize(s)
            }

            /// Deserializes the proxy, preferring the full-precision field
            /// and falling back to the whole-seconds field.
            pub fn deserialize<'a, D>(d: D) -> Result<DateTime<Utc>, D::Error>
            where
                D: Deserializer<'a>,
            {
                $name::deserialize(d).map(DateTime::<Utc>::from)
            }
        }
    };
}
206
207ts_serializer!(ts_serializer, TimestampSerializer, timestamp, event_time);
208ts_serializer!(ct_serializer, CreationTimeSerializer, created, created_time);
209
/// A structured log record, as serialized to JSON.
///
/// `timestamp` and `created` are each flattened through a proxy module
/// (`ts_serializer` / `ct_serializer`) so that each is emitted twice:
/// as integer unix seconds (`timestamp` / `created`) and at full
/// precision (`event_time` / `created_time`).
#[serde_as]
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct JsonLogRecord {
    /// What kind of record this is
    #[serde(rename = "type")]
    pub kind: RecordType,
    /// The message id
    pub id: String,
    /// The envelope sender
    pub sender: String,
    /// The envelope recipient
    #[serde_as(as = "OneOrMany<_, PreferOne>")]
    pub recipient: Vec<String>,
    /// Which named queue the message was associated with
    pub queue: String,
    /// Which MX site the message was being delivered to
    pub site: String,
    /// The size of the message, in bytes
    pub size: u64,
    /// The response from/to the peer
    pub response: Response,
    /// The address of the peer, and our sense of its
    /// hostname or EHLO domain
    pub peer_address: Option<ResolvedAddress>,
    /// The time at which we are logging this event
    #[serde(flatten, with = "ts_serializer")]
    pub timestamp: DateTime<Utc>,
    /// The time at which the message was initially received and created
    #[serde(flatten, with = "ct_serializer")]
    pub created: DateTime<Utc>,
    /// The number of delivery attempts that have been made.
    /// Note that this may be approximate after a restart; use the
    /// number of logged events to determine the true number
    pub num_attempts: u16,

    /// The bounce classification for this record. Presumably only
    /// meaningful when `kind.is_bounce_classifiable()` is true — TODO
    /// confirm what value is stored for other record types.
    pub bounce_classification: BounceClass,

    /// The egress pool associated with this event, if any
    pub egress_pool: Option<String>,
    /// The egress source associated with this event, if any
    pub egress_source: Option<String>,
    /// The local source address used for the connection, if known,
    /// possibly with proxy details
    pub source_address: Option<MaybeProxiedSourceAddress>,

    /// The parsed ARF (RFC 5965) feedback report, if any.
    /// NOTE(review): presumably boxed to keep the in-memory record small.
    pub feedback_report: Option<Box<ARFReport>>,

    /// Metadata values keyed by name (presumably the message's meta
    /// fields selected for logging — TODO confirm)
    pub meta: HashMap<String, Value>,
    /// Header values keyed by header name (presumably only the headers
    /// configured for logging — TODO confirm)
    pub headers: HashMap<String, Value>,

    /// The protocol used to deliver, or attempt to deliver, this message
    pub delivery_protocol: Option<String>,

    /// The protocol used to receive this message
    pub reception_protocol: Option<String>,

    /// The id of the node on which the event occurred
    pub nodeid: Uuid,

    /// The TLS Cipher used, if applicable
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tls_cipher: Option<String>,

    /// The TLS protocol version used, if applicable
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tls_protocol_version: Option<String>,

    /// The Subject Name from the peer TLS certificate, if applicable
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tls_peer_subject_name: Option<Vec<String>>,

    /// The provider name, if any.
    /// This is a way of grouping destination sites operated
    /// by the same provider.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub provider_name: Option<String>,

    /// Uuid identifying a connection/session for either inbound
    /// or outbound (depending on the type of the record).
    /// This is useful when correlating a series of messages to
    /// the same connection for either ingress or egress
    #[serde(skip_serializing_if = "Option::is_none")]
    pub session_id: Option<Uuid>,
}
290
/// A source address recorded for a connection, optionally accompanied by
/// extra details.
/// NOTE(review): the proxy interpretation of `server`/`protocol` below is
/// inferred from the type name — confirm against the code that fills it in.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MaybeProxiedSourceAddress {
    /// The source address.
    pub address: SocketAddress,
    /// Presumably the address of the proxy server used, if any.
    /// Omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub server: Option<SocketAddr>,
    /// Presumably the name of the proxy protocol used, if any.
    /// Omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub protocol: Option<Cow<'static, str>>,
}
299
/// Pins the in-memory size of [`JsonLogRecord`] on 64-bit targets, so that
/// any change in its footprint is noticed and accepted deliberately.
#[cfg(all(test, target_pointer_width = "64"))]
#[test]
fn sizes() {
    let record_size = std::mem::size_of::<JsonLogRecord>();
    assert_eq!(record_size, 712);
}