kumo_log_types/lib.rs
1use crate::rfc5965::ARFReport;
2use bounce_classify::BounceClass;
3use chrono::{DateTime, Utc};
4use kumo_address::host_or_socket::HostOrSocketAddress;
5use kumo_address::socket::SocketAddress;
6use rfc5321::Response;
7use serde::{Deserialize, Serialize};
8use serde_json::Value;
9use std::borrow::Cow;
10use std::collections::HashMap;
11use std::net::SocketAddr;
12use uuid::Uuid;
13
14pub mod rfc3464;
15pub mod rfc5965;
16
17#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
18pub struct ResolvedAddress {
19 pub name: String,
20 pub addr: HostOrSocketAddress,
21}
22
23impl std::fmt::Display for ResolvedAddress {
24 fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
25 let addr = format!("{}", self.addr);
26 if addr == self.name {
27 // likely: unix domain socket path
28 write!(fmt, "{addr}")
29 } else {
30 write!(fmt, "{}/{addr}", self.name)
31 }
32 }
33}
34
35#[derive(Serialize, Deserialize, Debug, Copy, Clone, Eq, PartialEq, Hash, Ord, PartialOrd)]
36pub enum RecordType {
37 /// Recorded by a receiving listener
38 Reception,
39 /// Recorded by the delivery side, most likely as a
40 /// result of attempting a delivery to a remote host
41 Delivery,
42 Bounce,
43 TransientFailure,
44 /// Recorded when a message is expiring from the queue
45 Expiration,
46 /// Administratively failed
47 AdminBounce,
48 /// Contains information about an OOB bounce
49 OOB,
50 /// Contains a feedback report
51 Feedback,
52
53 /// SMTP Listener responded with a 4xx or 5xx
54 Rejection,
55
56 /// Administratively rebound from one queue to another
57 AdminRebind,
58
59 /// Moved from the special deferred injection queue
60 /// and into some other queue
61 DeferredInjectionRebind,
62
63 /// Explains why a message was put into the scheduled queue
64 Delayed,
65
66 /// Special for matching anything in the logging config
67 Any,
68}
69
70impl RecordType {
71 /// Returns true if it makes sense to run the corresponding record
72 /// through the bounce classifier module.
73 /// The rule of thumb for that is if the response came from the
74 /// destination when attempting delivery, but we also include
75 /// administrative bounces and message expirations.
76 pub const fn is_bounce_classifiable(&self) -> bool {
77 match self {
78 Self::Any
79 | Self::Reception
80 | Self::Delivery
81 | Self::DeferredInjectionRebind
82 | Self::AdminRebind
83 | Self::Delayed => false,
84 Self::Bounce
85 | Self::TransientFailure
86 | Self::Expiration
87 | Self::AdminBounce
88 | Self::OOB
89 | Self::Feedback
90 | Self::Rejection => true,
91 }
92 }
93}
94
/// When the `timestamp` and `created` fields of the log structure were
/// first defined, they were logged in the integer unix timestamp format,
/// which throws away the sub-second information that we hold in memory.
///
/// We now want the full value to appear in the serialized log record as
/// well, without bloating the in-memory representation and without
/// explicitly storing the same data twice just so that serde will emit it.
///
/// This macro makes that possible: the field is serialized via a proxy
/// type, which effectively causes serde to emit two different
/// serializations of the same value.
///
/// Usage is: `ts_serializer!(MODULE_NAME, STRUCT_NAME, SECONDS_FIELD, FULL_FIELD)`
///
/// * `MODULE_NAME` and `STRUCT_NAME` name the generated module and proxy
///   struct. They are not especially important and are really present
///   just for namespacing.
/// * `SECONDS_FIELD` is the name of the field to be emitted as the unix
///   timestamp (in seconds).
/// * `FULL_FIELD` is the name of the field to be emitted as the full
///   RFC 3339 datetime.
///
/// The macro defines a module containing the proxy struct along with
/// `serialize` and `deserialize` functions that route through it.
///
/// To actually use it, annotate the field in your struct:
///
/// ```ignore
/// #[serde(flatten, with = "ts_serializer")]
/// pub timestamp: DateTime<Utc>,
/// ```
///
/// `flatten` is important: without it serde would emit a nested/child
/// struct. The `with` attribute is what points the serialization at the
/// generated proxy module; it must reference the MODULE_NAME you defined.
macro_rules! ts_serializer {
    ($module:ident, $name:ident, $seconds:ident, $full:ident) => {
        mod $module {
            use super::*;
            use serde::{Deserializer, Serializer};

            /// Proxy carrying both serialized renderings of a single instant.
            #[derive(Debug, Default, Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
            struct $name {
                #[serde(with = "chrono::serde::ts_seconds")]
                pub $seconds: DateTime<Utc>,

                /// Optional for backwards compatibility: we don't
                /// expect the full-precision field to be present, but
                /// we'll take it if it is!
                #[serde(default)]
                pub $full: Option<DateTime<Utc>>,
            }

            impl std::ops::Deref for $name {
                type Target = DateTime<Utc>;

                /// Prefers the full-precision value, falling back to the
                /// whole-seconds value when the former is absent.
                fn deref(&self) -> &DateTime<Utc> {
                    match &self.$full {
                        Some(full) => full,
                        None => &self.$seconds,
                    }
                }
            }

            impl<T: chrono::TimeZone> From<DateTime<T>> for $name
            where
                DateTime<Utc>: From<DateTime<T>>,
            {
                /// Populates both fields of the proxy from the same instant.
                fn from(value: DateTime<T>) -> $name {
                    let instant = DateTime::<Utc>::from(value);
                    $name {
                        $seconds: instant,
                        $full: Some(instant),
                    }
                }
            }

            impl From<$name> for DateTime<Utc> {
                /// Extracts the best available value (see the `Deref` impl).
                fn from(value: $name) -> DateTime<Utc> {
                    *value
                }
            }

            /// Serializes `value` as both the seconds and the full field.
            pub fn serialize<S: Serializer>(
                value: &DateTime<Utc>,
                serializer: S,
            ) -> Result<S::Ok, S::Error> {
                $name::from(*value).serialize(serializer)
            }

            /// Deserializes the proxy and reduces it to a single datetime.
            pub fn deserialize<'de, D: Deserializer<'de>>(
                deserializer: D,
            ) -> Result<DateTime<Utc>, D::Error> {
                let proxy = $name::deserialize(deserializer)?;
                Ok(proxy.into())
            }
        }
    };
}
194
195ts_serializer!(ts_serializer, TimestampSerializer, timestamp, event_time);
196ts_serializer!(ct_serializer, CreationTimeSerializer, created, created_time);
197
198#[derive(Serialize, Deserialize, Debug, Clone)]
199pub struct JsonLogRecord {
200 /// What kind of record this is
201 #[serde(rename = "type")]
202 pub kind: RecordType,
203 /// The message id
204 pub id: String,
205 /// The envelope sender
206 pub sender: String,
207 /// The envelope recipient
208 pub recipient: String,
209 /// Which named queue the message was associated with
210 pub queue: String,
211 /// Which MX site the message was being delivered to
212 pub site: String,
213 /// The size of the message, in bytes
214 pub size: u64,
215 /// The response from/to the peer
216 pub response: Response,
217 /// The address of the peer, and our sense of its
218 /// hostname or EHLO domain
219 pub peer_address: Option<ResolvedAddress>,
220 /// The time at which we are logging this event
221 #[serde(flatten, with = "ts_serializer")]
222 pub timestamp: DateTime<Utc>,
223 /// The time at which the message was initially received and created
224 #[serde(flatten, with = "ct_serializer")]
225 pub created: DateTime<Utc>,
226 /// The number of delivery attempts that have been made.
227 /// Note that this may be approximate after a restart; use the
228 /// number of logged events to determine the true number
229 pub num_attempts: u16,
230
231 pub bounce_classification: BounceClass,
232
233 pub egress_pool: Option<String>,
234 pub egress_source: Option<String>,
235 pub source_address: Option<MaybeProxiedSourceAddress>,
236
237 pub feedback_report: Option<Box<ARFReport>>,
238
239 pub meta: HashMap<String, Value>,
240 pub headers: HashMap<String, Value>,
241
242 /// The protocol used to deliver, or attempt to deliver, this message
243 pub delivery_protocol: Option<String>,
244
245 /// The protocol used to receive this message
246 pub reception_protocol: Option<String>,
247
248 /// The id of the node on which the event occurred
249 pub nodeid: Uuid,
250
251 /// The TLS Cipher used, if applicable
252 #[serde(skip_serializing_if = "Option::is_none")]
253 pub tls_cipher: Option<String>,
254
255 /// The TLS protocol version used, if applicable
256 #[serde(skip_serializing_if = "Option::is_none")]
257 pub tls_protocol_version: Option<String>,
258
259 /// The Subject Name from the peer TLS certificate, if applicable
260 #[serde(skip_serializing_if = "Option::is_none")]
261 pub tls_peer_subject_name: Option<Vec<String>>,
262
263 /// The provider name, if any.
264 /// This is a way of grouping destination sites operated
265 /// by the same provider.
266 #[serde(skip_serializing_if = "Option::is_none")]
267 pub provider_name: Option<String>,
268
269 /// Uuid identifying a connection/session for either inbound
270 /// or outbound (depending on the type of the record).
271 /// This is useful when correlating a series of messages to
272 /// the same connection for either ingress or egress
273 #[serde(skip_serializing_if = "Option::is_none")]
274 pub session_id: Option<Uuid>,
275}
276
277#[derive(Debug, Clone, Serialize, Deserialize)]
278pub struct MaybeProxiedSourceAddress {
279 pub address: SocketAddress,
280 #[serde(skip_serializing_if = "Option::is_none")]
281 pub server: Option<SocketAddr>,
282 #[serde(skip_serializing_if = "Option::is_none")]
283 pub protocol: Option<Cow<'static, str>>,
284}
285
// Pins the in-memory size of `JsonLogRecord` so that any change to it is
// noticed. The expected value only holds for 64-bit pointers, hence the
// `target_pointer_width` gate.
#[cfg(all(test, target_pointer_width = "64"))]
#[test]
fn sizes() {
    let record_size = std::mem::size_of::<JsonLogRecord>();
    assert_eq!(record_size, 712);
}