kumo_log_types/lib.rs
1use crate::rfc5965::ARFReport;
2use bounce_classify::BounceClass;
3use chrono::{DateTime, Utc};
4use kumo_address::host_or_socket::HostOrSocketAddress;
5use kumo_address::socket::SocketAddress;
6use rfc5321::Response;
7use serde::{Deserialize, Serialize};
8use serde_json::Value;
9use serde_with::formats::PreferOne;
10use serde_with::{serde_as, OneOrMany};
11use std::borrow::Cow;
12use std::collections::HashMap;
13use std::net::SocketAddr;
14use uuid::Uuid;
15
16pub mod rfc3464;
17pub mod rfc5965;
18
19#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
20pub struct ResolvedAddress {
21 pub name: String,
22 pub addr: HostOrSocketAddress,
23}
24
25impl std::fmt::Display for ResolvedAddress {
26 fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
27 let addr = format!("{}", self.addr);
28 if addr == self.name {
29 // likely: unix domain socket path
30 write!(fmt, "{addr}")
31 } else {
32 write!(fmt, "{}/{addr}", self.name)
33 }
34 }
35}
36
37#[derive(Serialize, Deserialize, Debug, Copy, Clone, Eq, PartialEq, Hash, Ord, PartialOrd)]
38pub enum RecordType {
39 /// Recorded by a receiving listener
40 Reception,
41 /// Recorded by the delivery side, most likely as a
42 /// result of attempting a delivery to a remote host
43 Delivery,
44 Bounce,
45 TransientFailure,
46 /// Recorded when a message is expiring from the queue
47 Expiration,
48 /// Administratively failed
49 AdminBounce,
50 /// Contains information about an OOB bounce
51 OOB,
52 /// Contains a feedback report
53 Feedback,
54
55 /// SMTP Listener responded with a 4xx or 5xx
56 Rejection,
57
58 /// Administratively rebound from one queue to another
59 AdminRebind,
60
61 /// Moved from the special deferred injection queue
62 /// and into some other queue
63 DeferredInjectionRebind,
64
65 /// Explains why a message was put into the scheduled queue
66 Delayed,
67
68 /// Message was transferred to another kumomta node,
69 /// which is now responsible for delivery.
70 XferOut,
71
72 /// Message was received from another kumomta node,
73 /// and we are now responsible for delivery.
74 XferIn,
75
76 /// Special for matching anything in the logging config
77 Any,
78}
79
80impl RecordType {
81 /// Returns true if it makes sense to run the corresponding record
82 /// through the bounce classifier module.
83 /// The rule of thumb for that is if the response came from the
84 /// destination when attempting delivery, but we also include
85 /// administrative bounces and message expirations.
86 pub const fn is_bounce_classifiable(&self) -> bool {
87 match self {
88 Self::Any
89 | Self::Reception
90 | Self::Delivery
91 | Self::DeferredInjectionRebind
92 | Self::AdminRebind
93 | Self::XferOut
94 | Self::XferIn
95 | Self::Delayed => false,
96 Self::Bounce
97 | Self::TransientFailure
98 | Self::Expiration
99 | Self::AdminBounce
100 | Self::OOB
101 | Self::Feedback
102 | Self::Rejection => true,
103 }
104 }
105}
106
107/// Unfortunately, when we defined the `timestamp` and `created` fields
108/// in the log structure, we made the decision to log as the integer
109/// unix timestamp format, which causes us to discard the sub-second
110/// information that we otherwise have available.
111///
112/// We'd like to now include the full info in the serialized log
113/// record, without bloating the in-memory representation, or otherwise
114/// explicitly duplicating data to arrange for serde to emit it for us.
115///
116/// That's where this macro comes in; it allows us to serialize those
117/// fields via a proxy type that effectively causes serde to emit two
118/// different serializations of the same value.
119///
120/// Usage is: `ts_serializer(MODULE_NAME, STRUCT_NAME, SECONDS_FIELD, FULL_FIELD)`
121///
122/// The MODULE_NAME and STRUCT_NAME are not especially important and
123/// are really present just for namespacing.
124///
125/// The SECONDS_FIELD defines the name of the field to be emitted
126/// as the unix timestamp (in seconds).
127///
128/// The FULL_FIELD defines the name of the field to be emitted
129/// as the full RFC 3339 datetime.
130///
131/// The macro defines a module and struct that can be used as a proxy
132/// for serialization.
133///
134/// To actually use it, you need to annotate the field in the struct;
135///
136/// ```norun
137/// #[serde(flatten, with = "ts_serializer")]
138/// pub timestamp: DateTime<Utc>,
139/// ```
140///
141/// It is important that `flatten` is used to avoid serde emitting
142/// a nested/child struct, and the `with` attribute is what points
143/// the serialization to the defined proxy module; it must
144/// reference the MODULE_NAME you defined.
145macro_rules! ts_serializer {
146 ($module:ident, $name:ident, $seconds:ident, $full:ident) => {
147 mod $module {
148 use super::*;
149 use serde::{Deserializer, Serializer};
150
151 #[derive(Serialize, Deserialize, Copy, Clone, Eq, Hash, Default, Debug, PartialEq)]
152 struct $name {
153 #[serde(with = "chrono::serde::ts_seconds")]
154 pub $seconds: DateTime<Utc>,
155
156 /// Optional for backwards compatibility: we don't
157 /// expect $full to be present, but we'll take it
158 /// if it is!
159 #[serde(default)]
160 pub $full: Option<DateTime<Utc>>,
161 }
162
163 impl std::ops::Deref for $name {
164 type Target = DateTime<Utc>;
165 fn deref(&self) -> &DateTime<Utc> {
166 self.$full.as_ref().unwrap_or(&self.$seconds)
167 }
168 }
169
170 impl<T: chrono::TimeZone> From<DateTime<T>> for $name
171 where
172 DateTime<Utc>: From<DateTime<T>>,
173 {
174 fn from(value: DateTime<T>) -> $name {
175 let timestamp: DateTime<Utc> = value.into();
176 $name {
177 $seconds: timestamp,
178 $full: Some(timestamp),
179 }
180 }
181 }
182
183 impl From<$name> for DateTime<Utc> {
184 fn from(value: $name) -> DateTime<Utc> {
185 *value
186 }
187 }
188
189 pub fn serialize<S>(d: &DateTime<Utc>, s: S) -> Result<S::Ok, S::Error>
190 where
191 S: Serializer,
192 {
193 let proxy: $name = (*d).into();
194 proxy.serialize(s)
195 }
196
197 pub fn deserialize<'a, D>(d: D) -> Result<DateTime<Utc>, D::Error>
198 where
199 D: Deserializer<'a>,
200 {
201 $name::deserialize(d).map(|p| p.into())
202 }
203 }
204 };
205}
206
207ts_serializer!(ts_serializer, TimestampSerializer, timestamp, event_time);
208ts_serializer!(ct_serializer, CreationTimeSerializer, created, created_time);
209
210#[serde_as]
211#[derive(Serialize, Deserialize, Debug, Clone)]
212pub struct JsonLogRecord {
213 /// What kind of record this is
214 #[serde(rename = "type")]
215 pub kind: RecordType,
216 /// The message id
217 pub id: String,
218 /// The envelope sender
219 pub sender: String,
220 /// The envelope recipient
221 #[serde_as(as = "OneOrMany<_, PreferOne>")]
222 pub recipient: Vec<String>,
223 /// Which named queue the message was associated with
224 pub queue: String,
225 /// Which MX site the message was being delivered to
226 pub site: String,
227 /// The size of the message, in bytes
228 pub size: u64,
229 /// The response from/to the peer
230 pub response: Response,
231 /// The address of the peer, and our sense of its
232 /// hostname or EHLO domain
233 pub peer_address: Option<ResolvedAddress>,
234 /// The time at which we are logging this event
235 #[serde(flatten, with = "ts_serializer")]
236 pub timestamp: DateTime<Utc>,
237 /// The time at which the message was initially received and created
238 #[serde(flatten, with = "ct_serializer")]
239 pub created: DateTime<Utc>,
240 /// The number of delivery attempts that have been made.
241 /// Note that this may be approximate after a restart; use the
242 /// number of logged events to determine the true number
243 pub num_attempts: u16,
244
245 pub bounce_classification: BounceClass,
246
247 pub egress_pool: Option<String>,
248 pub egress_source: Option<String>,
249 pub source_address: Option<MaybeProxiedSourceAddress>,
250
251 pub feedback_report: Option<Box<ARFReport>>,
252
253 pub meta: HashMap<String, Value>,
254 pub headers: HashMap<String, Value>,
255
256 /// The protocol used to deliver, or attempt to deliver, this message
257 pub delivery_protocol: Option<String>,
258
259 /// The protocol used to receive this message
260 pub reception_protocol: Option<String>,
261
262 /// The id of the node on which the event occurred
263 pub nodeid: Uuid,
264
265 /// The TLS Cipher used, if applicable
266 #[serde(skip_serializing_if = "Option::is_none")]
267 pub tls_cipher: Option<String>,
268
269 /// The TLS protocol version used, if applicable
270 #[serde(skip_serializing_if = "Option::is_none")]
271 pub tls_protocol_version: Option<String>,
272
273 /// The Subject Name from the peer TLS certificate, if applicable
274 #[serde(skip_serializing_if = "Option::is_none")]
275 pub tls_peer_subject_name: Option<Vec<String>>,
276
277 /// The provider name, if any.
278 /// This is a way of grouping destination sites operated
279 /// by the same provider.
280 #[serde(skip_serializing_if = "Option::is_none")]
281 pub provider_name: Option<String>,
282
283 /// Uuid identifying a connection/session for either inbound
284 /// or outbound (depending on the type of the record).
285 /// This is useful when correlating a series of messages to
286 /// the same connection for either ingress or egress
287 #[serde(skip_serializing_if = "Option::is_none")]
288 pub session_id: Option<Uuid>,
289}
290
291#[derive(Debug, Clone, Serialize, Deserialize)]
292pub struct MaybeProxiedSourceAddress {
293 pub address: SocketAddress,
294 #[serde(skip_serializing_if = "Option::is_none")]
295 pub server: Option<SocketAddr>,
296 #[serde(skip_serializing_if = "Option::is_none")]
297 pub protocol: Option<Cow<'static, str>>,
298}
299
#[cfg(all(test, target_pointer_width = "64"))]
#[test]
fn sizes() {
    // Pins the in-memory size of the record on 64-bit targets so that
    // any change to its footprint shows up as a test failure and has
    // to be acknowledged by updating the expected value.
    let record_size = std::mem::size_of::<JsonLogRecord>();
    assert_eq!(record_size, 712);
}