kumo_dmarc/
lib.rs

1#![allow(dead_code)]
2
3use crate::types::date_range::DateRange;
4use crate::types::feedback::Feedback;
5use crate::types::identifier::Identifier;
6use crate::types::mode::Mode;
7use crate::types::policy::Policy;
8use crate::types::policy_published::PolicyPublished;
9use crate::types::record::Record;
10use crate::types::report_failure::ReportFailure;
11use crate::types::report_metadata::ReportMetadata;
12use crate::types::results::{AuthResults, DmarcResult, PolicyEvaluated, Results, Row};
13pub use crate::types::results::{Disposition, DispositionWithContext};
14use bstr::BString;
15use chrono::{DateTime, Utc};
16use dns_resolver::Resolver;
17use mailparsing::AuthenticationResult;
18use serde::{Deserialize, Serialize};
19use std::collections::{BTreeMap, HashMap};
20use std::fs::File;
21use std::io::{BufRead, BufReader, Write};
22use std::net::IpAddr;
23use std::str::FromStr;
24use std::time::SystemTime;
25use uuid::Uuid;
26
27mod types;
28
29#[cfg(test)]
30mod tests;
31
/// Path of the line-delimited JSON log that DMARC error records are appended
/// to and that the aggregation step later reads back.
const DMARC_REPORT_LOG_FILEPATH: &str = "/var/log/kumomta/dmarc.log";
33
/// Owned inputs for a single DMARC evaluation.
///
/// The fields are handed by reference to the internal `DmarcContext`
/// when [`DmarcPassContext::check`] is called.
pub struct DmarcPassContext {
    /// Domain of the sender taken from the "From:" header.
    ///
    /// The DMARC record is looked up at `_dmarc.<from_domain>`.
    pub from_domain: String,

    /// Domain that provides the sought-after authorization information.
    ///
    /// The domain of the "MAIL FROM" email address, if available.
    pub mail_from_domain: Option<String>,

    /// The envelope recipients ("RCPT TO"); recorded as `envelope_to`
    /// when an error record is written.
    pub recipient_domain_list: Vec<String>,

    /// The source IP address, in textual form.
    ///
    /// It is parsed into an `IpAddr` when an error record is written.
    pub received_from: String,

    /// The results of the DKIM part of the checks
    pub dkim_results: Vec<AuthenticationResult>,

    /// The results of the SPF part of the checks
    pub spf_result: AuthenticationResult,

    /// The additional information needed to perform reporting.
    ///
    /// When `None`, no error records are written.
    pub reporting_info: Option<ReportingInfo>,
}
58
59impl DmarcPassContext {
60    pub async fn check(self, resolver: &dyn Resolver) -> DispositionWithContext {
61        let Self {
62            from_domain,
63            mail_from_domain,
64            recipient_domain_list: recipient_list,
65            received_from,
66            dkim_results,
67            spf_result,
68            reporting_info,
69        } = self;
70
71        let mut dmarc_context = DmarcContext::new(
72            &from_domain,
73            mail_from_domain.as_ref().map(|x| x.as_str()),
74            &recipient_list[..],
75            received_from.as_str(),
76            &dkim_results[..],
77            &spf_result,
78            reporting_info.as_ref(),
79        );
80
81        dmarc_context.check(resolver).await
82    }
83}
84
/// Describes which domain supplied the DMARC record that is being evaluated.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum SenderDomainAlignment {
    /// Sender domain is an exact match to the dmarc record
    Exact,

    /// Sender domain has no exact matching dmarc record
    /// but its organizational domain does
    OrganizationalDomain,
}
94
/// Outcome of looking up the DMARC TXT records for a domain.
pub(crate) enum DmarcRecordResolution {
    /// DNS could not be resolved at this time
    /// (the resolver returned an error).
    TempError,

    /// DNS was resolved, but no DMARC record was found:
    /// the answer was empty or NXDOMAIN, or none of the TXT
    /// entries parsed as a DMARC record.
    PermError,

    /// DNS was resolved, and at least one DMARC record was found
    /// and parsed.
    Records(Vec<Record>),
}
105
106impl From<DmarcRecordResolution> for Disposition {
107    fn from(value: DmarcRecordResolution) -> Self {
108        match value {
109            DmarcRecordResolution::TempError => Disposition::TempError,
110            DmarcRecordResolution::PermError => Disposition::PermError,
111            DmarcRecordResolution::Records(_) => {
112                panic!("records must be parsed before being used in disposition")
113            }
114        }
115    }
116}
117
/// Identity of the reporting organization.
///
/// These values are copied verbatim into every error record that is
/// written to the DMARC report log. Unknown fields are rejected when
/// deserializing.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct ReportingInfo {
    // Name of the organization generating the report.
    org_name: String,
    // Contact email; aggregation groups error records by this value.
    email: String,
    // Optional additional contact information.
    extra_contact_info: Option<String>,
}
125
/// The individual error records that are then aggregated for output in the report
///
/// One record is serialized as a single JSON line in the DMARC report log.
#[derive(Serialize, Deserialize, Clone)]
pub(crate) struct ErrorRecord {
    // Record format version; written as "1.0".
    pub(crate) version: String,
    // Reporter identity, copied from `ReportingInfo`.
    pub(crate) org_name: String,
    pub(crate) email: String,
    pub(crate) extra_contact_info: Option<String>,
    // Time at which the record was created.
    pub(crate) when: DateTime<Utc>,
    // Human readable description of the error.
    pub(crate) error: String,
    // The DMARC domain that was passed to `report_error`.
    pub(crate) domain: String,
    // Published policy values, copied from the DMARC record.
    pub(crate) align_dkim: Option<Mode>,
    pub(crate) align_spf: Option<Mode>,
    pub(crate) policy: Policy,
    // Falls back to `policy` when the record has no subdomain policy.
    pub(crate) subdomain_policy: Policy,
    pub(crate) rate: u8,
    pub(crate) report_failure: ReportFailure,
    // Parsed from the textual `received_from` of the evaluation context.
    pub(crate) source_ip: IpAddr,
    // Disposition plus the DKIM/SPF alignment results for this message.
    pub(crate) policy_evaluated: PolicyEvaluated,
    // Envelope and header identities of the message.
    pub(crate) identifier: Identifier,
    // The raw DKIM and SPF authentication results.
    pub(crate) auth_results: AuthResults,
}
147
/// Borrowed working state for one DMARC evaluation.
///
/// Everything with lifetime `'a` is lent by the caller; the remaining
/// fields are owned by the context.
struct DmarcContext<'a> {
    // Domain from the "From:" header.
    pub(crate) from_domain: &'a str,
    // Domain of the "MAIL FROM" identity, if any.
    pub(crate) mail_from_domain: Option<&'a str>,
    // Envelope recipients.
    pub(crate) recipient_list: &'a [String],
    // Textual IP address of the sending client.
    pub(crate) received_from: &'a str,
    // Time at which the context was created.
    pub(crate) now: SystemTime,
    pub(crate) dkim_results: &'a [AuthenticationResult],
    pub(crate) spf_result: &'a AuthenticationResult,
    // Alignment results; both start out as `Pass` and are copied into the
    // `policy_evaluated` section of error records.
    // NOTE(review): presumably updated during record evaluation — confirm.
    pub(crate) dkim_aligned: DmarcResult,
    pub(crate) spf_aligned: DmarcResult,
    // Reporter identity; `None` disables writing error records.
    pub(crate) reporting_info: Option<&'a ReportingInfo>,
}
160
161impl<'a> DmarcContext<'a> {
162    /// Create a new evaluation context.
163    ///
164    /// - `from_domain` is the domain of the "From:" header
165    /// - `mail_from_domain` is the domain portion of the "MAIL FROM" identity
166    /// - `client_ip` is the IP address of the SMTP client that is emitting the mail
167    fn new(
168        from_domain: &'a str,
169        mail_from_domain: Option<&'a str>,
170        recipient_list: &'a [String],
171        received_from: &'a str,
172        dkim_results: &'a [AuthenticationResult],
173        spf_result: &'a AuthenticationResult,
174        reporting_info: Option<&'a ReportingInfo>,
175    ) -> DmarcContext<'a> {
176        Self {
177            from_domain,
178            mail_from_domain,
179            recipient_list,
180            received_from,
181            now: SystemTime::now(),
182            dkim_results,
183            spf_result,
184            dkim_aligned: DmarcResult::Pass,
185            spf_aligned: DmarcResult::Pass,
186            reporting_info,
187        }
188    }
189
190    pub async fn report_error(
191        &self,
192        record: &Record,
193        dmarc_domain: &str,
194        sender_domain_alignment: SenderDomainAlignment,
195        error: &str,
196    ) -> std::io::Result<()> {
197        let source_ip = self
198            .received_from
199            .parse()
200            .map_err(|x: std::net::AddrParseError| {
201                std::io::Error::new(std::io::ErrorKind::AddrNotAvailable, x.to_string())
202            })?;
203
204        if let Some(reporting_info) = self.reporting_info {
205            let error_record = ErrorRecord {
206                version: "1.0".to_string(),
207                org_name: reporting_info.org_name.to_string(),
208                email: reporting_info.email.to_string(),
209                extra_contact_info: reporting_info.extra_contact_info.to_owned(),
210                when: Utc::now(),
211                error: error.to_string(),
212                domain: dmarc_domain.to_string(),
213                align_dkim: Some(record.align_dkim),
214                align_spf: Some(record.align_spf),
215                policy: record.policy,
216                subdomain_policy: record.subdomain_policy.unwrap_or(record.policy),
217                rate: record.rate,
218                report_failure: record.report_failure,
219                source_ip,
220                policy_evaluated: PolicyEvaluated {
221                    disposition: record.policy_result(sender_domain_alignment),
222                    dkim: self.dkim_aligned,
223                    spf: self.spf_aligned,
224                    reason: vec![],
225                },
226                identifier: Identifier {
227                    envelope_to: self.recipient_list.into(),
228                    envelope_from: if let Some(mail_from_domain) = self.mail_from_domain {
229                        vec![mail_from_domain.into()]
230                    } else {
231                        vec![]
232                    },
233                    header_from: self.from_domain.into(),
234                },
235                auth_results: AuthResults {
236                    dkim: self.dkim_results.iter().map(|x| x.clone().into()).collect(),
237                    spf: vec![self.spf_result.clone().into()],
238                },
239            };
240
241            let result = serde_json::to_string(&error_record)?;
242
243            let mut f = File::options()
244                .append(true)
245                .open(DMARC_REPORT_LOG_FILEPATH)?;
246
247            writeln!(f, "{result}")?;
248        }
249
250        Ok(())
251    }
252
253    pub async fn aggregate(&self) -> anyhow::Result<()> {
254        let mut input_records = vec![];
255        let file = File::open(DMARC_REPORT_LOG_FILEPATH)?;
256        let lines = BufReader::new(file).lines();
257
258        for line in lines.map_while(Result::ok) {
259            let result: anyhow::Result<ErrorRecord> = serde_json::from_str::<ErrorRecord>(&line)
260                .map_err(|error| {
261                    anyhow::Error::new(error).context(format!(
262                        "Failed to decode a line from the DMARC report file \
263           {DMARC_REPORT_LOG_FILEPATH}. \
264           The line was: {line}. \
265           Is the file corrupt?"
266                    ))
267                })
268                .into();
269
270            input_records.push(result?);
271        }
272
273        let mut errors_grouped_by_email: HashMap<String, BTreeMap<IpAddr, Vec<ErrorRecord>>> =
274            HashMap::new();
275
276        for record in input_records {
277            let entry = errors_grouped_by_email.entry(record.email.clone());
278            let record_source_ip = record.source_ip.clone();
279
280            entry
281                .and_modify(|entry| {
282                    entry
283                        .entry(record.source_ip)
284                        .and_modify(|x| x.push(record.clone()))
285                        .or_insert_with(|| vec![record.clone()]);
286                })
287                .or_insert({
288                    let mut new_group = BTreeMap::new();
289
290                    new_group.insert(record_source_ip, vec![record]);
291
292                    new_group
293                });
294        }
295
296        for (email, errors_grouped_by_ip) in errors_grouped_by_email.iter_mut() {
297            let mut errors = vec![];
298            let mut record = vec![];
299
300            //we know this is safe to do because for this list to be present, we will have found it earlier
301            let (_, first_records) = errors_grouped_by_ip
302                .iter()
303                .next()
304                .expect("guaranteed to not be empty by the logic above");
305
306            let first_record = &first_records[0];
307
308            let version = first_record.version.clone();
309            let org_name = first_record.org_name.clone();
310            let email = email.clone();
311            let extra_contact_info = first_record.extra_contact_info.clone();
312
313            let mut date_range = DateRange::new(first_record.when, first_record.when);
314
315            let report_id = Uuid::new_v4().to_string();
316
317            let domain = first_record.domain.clone();
318            let align_dkim = first_record.align_dkim;
319            let align_spf = first_record.align_spf;
320            let policy = first_record.policy;
321            let subdomain_policy = first_record.subdomain_policy;
322            let rate = first_record.rate;
323            let report_failure = first_record.report_failure;
324
325            for (ip, error_group_for_ip) in errors_grouped_by_ip.iter_mut() {
326                let row = Row {
327                    source_ip: *ip,
328                    count: error_group_for_ip.len() as u64,
329                    policy_evaluated: error_group_for_ip[0].policy_evaluated.clone(),
330                };
331
332                let mut results = Results {
333                    row,
334                    identifiers: Identifier {
335                        envelope_to: vec![],
336                        envelope_from: vec![],
337                        header_from: String::new(),
338                    },
339                    auth_results: AuthResults {
340                        dkim: vec![],
341                        spf: vec![],
342                    },
343                };
344
345                for group_error in error_group_for_ip.iter() {
346                    errors.push(group_error.error.clone());
347
348                    date_range.begin = std::cmp::min(date_range.begin, group_error.when);
349                    date_range.end = std::cmp::max(date_range.end, group_error.when);
350
351                    results
352                        .identifiers
353                        .envelope_from
354                        .extend_from_slice(&group_error.identifier.envelope_from);
355                    results
356                        .identifiers
357                        .envelope_to
358                        .extend_from_slice(&group_error.identifier.envelope_to);
359
360                    results
361                        .auth_results
362                        .dkim
363                        .extend_from_slice(&group_error.auth_results.dkim);
364                    results
365                        .auth_results
366                        .spf
367                        .extend_from_slice(&group_error.auth_results.spf);
368                }
369
370                record.push(results);
371            }
372
373            let _feedback = Feedback {
374                version,
375                metadata: ReportMetadata {
376                    org_name,
377                    email,
378                    extra_contact_info,
379                    report_id,
380                    date_range,
381                    error: errors,
382                },
383                policy: PolicyPublished::new(
384                    domain,
385                    align_dkim,
386                    align_spf,
387                    policy,
388                    subdomain_policy,
389                    rate,
390                    report_failure,
391                ),
392                record,
393            };
394
395            // if let Ok(result) = instant_xml::to_string(&feedback) {
396            //     println!("log: {}", result);
397            // }
398        }
399
400        Ok(())
401    }
402
403    pub async fn check(&mut self, resolver: &dyn Resolver) -> DispositionWithContext {
404        let dmarc_domain = format!("_dmarc.{}", self.from_domain);
405        match fetch_dmarc_records(&dmarc_domain, resolver).await {
406            DmarcRecordResolution::Records(records) => {
407                for record in records {
408                    let mut result = record
409                        .evaluate(self, &dmarc_domain, SenderDomainAlignment::Exact)
410                        .await;
411                    result.props.extend(policy_tags(&record));
412                    return result;
413                }
414            }
415            x => {
416                let normalized_from = psl_utils::normalize_domain(self.from_domain);
417                if let Some(organizational_domain) = psl_utils::domain_str(&normalized_from) {
418                    if organizational_domain != normalized_from {
419                        let address = format!("_dmarc.{}", organizational_domain);
420                        match fetch_dmarc_records(&address, resolver).await {
421                            DmarcRecordResolution::TempError => {
422                                return DispositionWithContext {
423                                    result: Disposition::TempError,
424                                    context: format!(
425                                        "DNS records could not be resolved for {}",
426                                        address
427                                    ),
428                                    props: BTreeMap::new(),
429                                }
430                            }
431                            DmarcRecordResolution::PermError => {
432                                return DispositionWithContext {
433                                    result: Disposition::PermError,
434                                    context: format!("no DMARC records found for {}", address),
435                                    props: BTreeMap::new(),
436                                }
437                            }
438                            DmarcRecordResolution::Records(records) => {
439                                for record in records {
440                                    let mut result = record
441                                        .evaluate(
442                                            self,
443                                            &address,
444                                            SenderDomainAlignment::OrganizationalDomain,
445                                        )
446                                        .await;
447                                    result.props.extend(policy_tags(&record));
448                                    return result;
449                                }
450                            }
451                        }
452                    } else {
453                        return DispositionWithContext {
454                            result: x.into(),
455                            context: format!("no DMARC records found for {}", &self.from_domain),
456                            props: BTreeMap::new(),
457                        };
458                    }
459                }
460            }
461        }
462
463        DispositionWithContext {
464            result: Disposition::None,
465            context: format!("no DMARC records found for {}", &self.from_domain),
466            props: BTreeMap::new(),
467        }
468    }
469}
470
471fn policy_tags(record: &Record) -> BTreeMap<String, BString> {
472    let mut props = BTreeMap::new();
473
474    for (tag, value) in record.tags() {
475        props.insert(format!("policy.{tag}").into(), value.as_str().into());
476    }
477
478    props
479}
480
/// Open `filename` and return an iterator over its lines.
///
/// The iterator is wrapped in a `Result` so that a failure to open the file
/// can be matched on; each yielded item is itself a `Result` because reading
/// an individual line may fail.
fn read_lines<P: AsRef<std::path::Path>>(
    filename: P,
) -> std::io::Result<std::io::Lines<std::io::BufReader<File>>> {
    File::open(filename).map(|handle| std::io::BufReader::new(handle).lines())
}
490
491pub(crate) async fn fetch_dmarc_records(
492    address: &str,
493    resolver: &dyn Resolver,
494) -> DmarcRecordResolution {
495    let initial_txt = match resolver.resolve_txt(address).await {
496        Ok(answer) => {
497            if answer.records.is_empty() || answer.nxdomain {
498                return DmarcRecordResolution::PermError;
499            } else {
500                answer.as_txt()
501            }
502        }
503        Err(_) => {
504            return DmarcRecordResolution::TempError;
505        }
506    };
507
508    let mut records = vec![];
509
510    // TXT records can contain all sorts of stuff, let's walk through
511    // the set that we retrieved and take the first one that parses
512    for txt in initial_txt {
513        if txt.starts_with("v=DMARC1;") {
514            if let Ok(record) = Record::from_str(&txt) {
515                records.push(record);
516            }
517        }
518    }
519
520    if records.is_empty() {
521        return DmarcRecordResolution::PermError;
522    }
523
524    DmarcRecordResolution::Records(records)
525}