1#![allow(dead_code)]
2
3use crate::types::date_range::DateRange;
4use crate::types::feedback::Feedback;
5use crate::types::identifier::Identifier;
6use crate::types::mode::Mode;
7use crate::types::policy::Policy;
8use crate::types::policy_published::PolicyPublished;
9use crate::types::record::Record;
10use crate::types::report_failure::ReportFailure;
11use crate::types::report_metadata::ReportMetadata;
12use crate::types::results::{AuthResults, DmarcResult, PolicyEvaluated, Results, Row};
13pub use crate::types::results::{Disposition, DispositionWithContext};
14use bstr::BString;
15use chrono::{DateTime, Utc};
16use dns_resolver::Resolver;
17use mailparsing::AuthenticationResult;
18use serde::{Deserialize, Serialize};
19use std::collections::{BTreeMap, HashMap};
20use std::fs::File;
21use std::io::{BufRead, BufReader, Write};
22use std::net::IpAddr;
23use std::str::FromStr;
24use std::time::SystemTime;
25use uuid::Uuid;
26
27mod types;
28
29#[cfg(test)]
30mod tests;
31
/// Path of the newline-delimited JSON log of DMARC failure records.
///
/// `DmarcContext::report_error` appends one serialized `ErrorRecord` per line
/// and `DmarcContext::aggregate` reads the file back line by line.
// `'static` is implied for references in `const` items, so it is not spelled out.
const DMARC_REPORT_LOG_FILEPATH: &str = "/var/log/kumomta/dmarc.log";
33
34pub struct DmarcPassContext {
35 pub from_domain: String,
37
38 pub mail_from_domain: Option<String>,
42
43 pub recipient_domain_list: Vec<String>,
45
46 pub received_from: String,
48
49 pub dkim_results: Vec<AuthenticationResult>,
51
52 pub spf_result: AuthenticationResult,
54
55 pub reporting_info: Option<ReportingInfo>,
57}
58
59impl DmarcPassContext {
60 pub async fn check(self, resolver: &dyn Resolver) -> DispositionWithContext {
61 let Self {
62 from_domain,
63 mail_from_domain,
64 recipient_domain_list: recipient_list,
65 received_from,
66 dkim_results,
67 spf_result,
68 reporting_info,
69 } = self;
70
71 let mut dmarc_context = DmarcContext::new(
72 &from_domain,
73 mail_from_domain.as_ref().map(|x| x.as_str()),
74 &recipient_list[..],
75 received_from.as_str(),
76 &dkim_results[..],
77 &spf_result,
78 reporting_info.as_ref(),
79 );
80
81 dmarc_context.check(resolver).await
82 }
83}
84
/// Records which lookup produced the DMARC record being evaluated.
#[derive(Copy, Clone)]
pub(crate) enum SenderDomainAlignment {
    /// The record was found at `_dmarc.<from domain>` itself.
    Exact,

    /// The record was found at `_dmarc.<organizational domain>` after the
    /// lookup on the exact from-domain yielded no records.
    OrganizationalDomain,
}
94
95pub(crate) enum DmarcRecordResolution {
96 TempError,
98
99 PermError,
101
102 Records(Vec<Record>),
104}
105
106impl From<DmarcRecordResolution> for Disposition {
107 fn from(value: DmarcRecordResolution) -> Self {
108 match value {
109 DmarcRecordResolution::TempError => Disposition::TempError,
110 DmarcRecordResolution::PermError => Disposition::PermError,
111 DmarcRecordResolution::Records(_) => {
112 panic!("records must be parsed before being used in disposition")
113 }
114 }
115 }
116}
117
118#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
119#[serde(deny_unknown_fields)]
120pub struct ReportingInfo {
121 org_name: String,
122 email: String,
123 extra_contact_info: Option<String>,
124}
125
126#[derive(Serialize, Deserialize, Clone)]
128pub(crate) struct ErrorRecord {
129 pub(crate) version: String,
130 pub(crate) org_name: String,
131 pub(crate) email: String,
132 pub(crate) extra_contact_info: Option<String>,
133 pub(crate) when: DateTime<Utc>,
134 pub(crate) error: String,
135 pub(crate) domain: String,
136 pub(crate) align_dkim: Option<Mode>,
137 pub(crate) align_spf: Option<Mode>,
138 pub(crate) policy: Policy,
139 pub(crate) subdomain_policy: Policy,
140 pub(crate) rate: u8,
141 pub(crate) report_failure: ReportFailure,
142 pub(crate) source_ip: IpAddr,
143 pub(crate) policy_evaluated: PolicyEvaluated,
144 pub(crate) identifier: Identifier,
145 pub(crate) auth_results: AuthResults,
146}
147
148struct DmarcContext<'a> {
149 pub(crate) from_domain: &'a str,
150 pub(crate) mail_from_domain: Option<&'a str>,
151 pub(crate) recipient_list: &'a [String],
152 pub(crate) received_from: &'a str,
153 pub(crate) now: SystemTime,
154 pub(crate) dkim_results: &'a [AuthenticationResult],
155 pub(crate) spf_result: &'a AuthenticationResult,
156 pub(crate) dkim_aligned: DmarcResult,
157 pub(crate) spf_aligned: DmarcResult,
158 pub(crate) reporting_info: Option<&'a ReportingInfo>,
159}
160
161impl<'a> DmarcContext<'a> {
162 fn new(
168 from_domain: &'a str,
169 mail_from_domain: Option<&'a str>,
170 recipient_list: &'a [String],
171 received_from: &'a str,
172 dkim_results: &'a [AuthenticationResult],
173 spf_result: &'a AuthenticationResult,
174 reporting_info: Option<&'a ReportingInfo>,
175 ) -> DmarcContext<'a> {
176 Self {
177 from_domain,
178 mail_from_domain,
179 recipient_list,
180 received_from,
181 now: SystemTime::now(),
182 dkim_results,
183 spf_result,
184 dkim_aligned: DmarcResult::Pass,
185 spf_aligned: DmarcResult::Pass,
186 reporting_info,
187 }
188 }
189
190 pub async fn report_error(
191 &self,
192 record: &Record,
193 dmarc_domain: &str,
194 sender_domain_alignment: SenderDomainAlignment,
195 error: &str,
196 ) -> std::io::Result<()> {
197 let source_ip = self
198 .received_from
199 .parse()
200 .map_err(|x: std::net::AddrParseError| {
201 std::io::Error::new(std::io::ErrorKind::AddrNotAvailable, x.to_string())
202 })?;
203
204 if let Some(reporting_info) = self.reporting_info {
205 let error_record = ErrorRecord {
206 version: "1.0".to_string(),
207 org_name: reporting_info.org_name.to_string(),
208 email: reporting_info.email.to_string(),
209 extra_contact_info: reporting_info.extra_contact_info.to_owned(),
210 when: Utc::now(),
211 error: error.to_string(),
212 domain: dmarc_domain.to_string(),
213 align_dkim: Some(record.align_dkim),
214 align_spf: Some(record.align_spf),
215 policy: record.policy,
216 subdomain_policy: record.subdomain_policy.unwrap_or(record.policy),
217 rate: record.rate,
218 report_failure: record.report_failure,
219 source_ip,
220 policy_evaluated: PolicyEvaluated {
221 disposition: record.policy_result(sender_domain_alignment),
222 dkim: self.dkim_aligned,
223 spf: self.spf_aligned,
224 reason: vec![],
225 },
226 identifier: Identifier {
227 envelope_to: self.recipient_list.into(),
228 envelope_from: if let Some(mail_from_domain) = self.mail_from_domain {
229 vec![mail_from_domain.into()]
230 } else {
231 vec![]
232 },
233 header_from: self.from_domain.into(),
234 },
235 auth_results: AuthResults {
236 dkim: self.dkim_results.iter().map(|x| x.clone().into()).collect(),
237 spf: vec![self.spf_result.clone().into()],
238 },
239 };
240
241 let result = serde_json::to_string(&error_record)?;
242
243 let mut f = File::options()
244 .append(true)
245 .open(DMARC_REPORT_LOG_FILEPATH)?;
246
247 writeln!(f, "{result}")?;
248 }
249
250 Ok(())
251 }
252
253 pub async fn aggregate(&self) -> anyhow::Result<()> {
254 let mut input_records = vec![];
255 let file = File::open(DMARC_REPORT_LOG_FILEPATH)?;
256 let lines = BufReader::new(file).lines();
257
258 for line in lines.map_while(Result::ok) {
259 let result: anyhow::Result<ErrorRecord> = serde_json::from_str::<ErrorRecord>(&line)
260 .map_err(|error| {
261 anyhow::Error::new(error).context(format!(
262 "Failed to decode a line from the DMARC report file \
263 {DMARC_REPORT_LOG_FILEPATH}. \
264 The line was: {line}. \
265 Is the file corrupt?"
266 ))
267 })
268 .into();
269
270 input_records.push(result?);
271 }
272
273 let mut errors_grouped_by_email: HashMap<String, BTreeMap<IpAddr, Vec<ErrorRecord>>> =
274 HashMap::new();
275
276 for record in input_records {
277 let entry = errors_grouped_by_email.entry(record.email.clone());
278 let record_source_ip = record.source_ip.clone();
279
280 entry
281 .and_modify(|entry| {
282 entry
283 .entry(record.source_ip)
284 .and_modify(|x| x.push(record.clone()))
285 .or_insert_with(|| vec![record.clone()]);
286 })
287 .or_insert({
288 let mut new_group = BTreeMap::new();
289
290 new_group.insert(record_source_ip, vec![record]);
291
292 new_group
293 });
294 }
295
296 for (email, errors_grouped_by_ip) in errors_grouped_by_email.iter_mut() {
297 let mut errors = vec![];
298 let mut record = vec![];
299
300 let (_, first_records) = errors_grouped_by_ip
302 .iter()
303 .next()
304 .expect("guaranteed to not be empty by the logic above");
305
306 let first_record = &first_records[0];
307
308 let version = first_record.version.clone();
309 let org_name = first_record.org_name.clone();
310 let email = email.clone();
311 let extra_contact_info = first_record.extra_contact_info.clone();
312
313 let mut date_range = DateRange::new(first_record.when, first_record.when);
314
315 let report_id = Uuid::new_v4().to_string();
316
317 let domain = first_record.domain.clone();
318 let align_dkim = first_record.align_dkim;
319 let align_spf = first_record.align_spf;
320 let policy = first_record.policy;
321 let subdomain_policy = first_record.subdomain_policy;
322 let rate = first_record.rate;
323 let report_failure = first_record.report_failure;
324
325 for (ip, error_group_for_ip) in errors_grouped_by_ip.iter_mut() {
326 let row = Row {
327 source_ip: *ip,
328 count: error_group_for_ip.len() as u64,
329 policy_evaluated: error_group_for_ip[0].policy_evaluated.clone(),
330 };
331
332 let mut results = Results {
333 row,
334 identifiers: Identifier {
335 envelope_to: vec![],
336 envelope_from: vec![],
337 header_from: String::new(),
338 },
339 auth_results: AuthResults {
340 dkim: vec![],
341 spf: vec![],
342 },
343 };
344
345 for group_error in error_group_for_ip.iter() {
346 errors.push(group_error.error.clone());
347
348 date_range.begin = std::cmp::min(date_range.begin, group_error.when);
349 date_range.end = std::cmp::max(date_range.end, group_error.when);
350
351 results
352 .identifiers
353 .envelope_from
354 .extend_from_slice(&group_error.identifier.envelope_from);
355 results
356 .identifiers
357 .envelope_to
358 .extend_from_slice(&group_error.identifier.envelope_to);
359
360 results
361 .auth_results
362 .dkim
363 .extend_from_slice(&group_error.auth_results.dkim);
364 results
365 .auth_results
366 .spf
367 .extend_from_slice(&group_error.auth_results.spf);
368 }
369
370 record.push(results);
371 }
372
373 let _feedback = Feedback {
374 version,
375 metadata: ReportMetadata {
376 org_name,
377 email,
378 extra_contact_info,
379 report_id,
380 date_range,
381 error: errors,
382 },
383 policy: PolicyPublished::new(
384 domain,
385 align_dkim,
386 align_spf,
387 policy,
388 subdomain_policy,
389 rate,
390 report_failure,
391 ),
392 record,
393 };
394
395 }
399
400 Ok(())
401 }
402
403 pub async fn check(&mut self, resolver: &dyn Resolver) -> DispositionWithContext {
404 let dmarc_domain = format!("_dmarc.{}", self.from_domain);
405 match fetch_dmarc_records(&dmarc_domain, resolver).await {
406 DmarcRecordResolution::Records(records) => {
407 for record in records {
408 let mut result = record
409 .evaluate(self, &dmarc_domain, SenderDomainAlignment::Exact)
410 .await;
411 result.props.extend(policy_tags(&record));
412 return result;
413 }
414 }
415 x => {
416 let normalized_from = psl_utils::normalize_domain(self.from_domain);
417 if let Some(organizational_domain) = psl_utils::domain_str(&normalized_from) {
418 if organizational_domain != normalized_from {
419 let address = format!("_dmarc.{}", organizational_domain);
420 match fetch_dmarc_records(&address, resolver).await {
421 DmarcRecordResolution::TempError => {
422 return DispositionWithContext {
423 result: Disposition::TempError,
424 context: format!(
425 "DNS records could not be resolved for {}",
426 address
427 ),
428 props: BTreeMap::new(),
429 }
430 }
431 DmarcRecordResolution::PermError => {
432 return DispositionWithContext {
433 result: Disposition::PermError,
434 context: format!("no DMARC records found for {}", address),
435 props: BTreeMap::new(),
436 }
437 }
438 DmarcRecordResolution::Records(records) => {
439 for record in records {
440 let mut result = record
441 .evaluate(
442 self,
443 &address,
444 SenderDomainAlignment::OrganizationalDomain,
445 )
446 .await;
447 result.props.extend(policy_tags(&record));
448 return result;
449 }
450 }
451 }
452 } else {
453 return DispositionWithContext {
454 result: x.into(),
455 context: format!("no DMARC records found for {}", &self.from_domain),
456 props: BTreeMap::new(),
457 };
458 }
459 }
460 }
461 }
462
463 DispositionWithContext {
464 result: Disposition::None,
465 context: format!("no DMARC records found for {}", &self.from_domain),
466 props: BTreeMap::new(),
467 }
468 }
469}
470
471fn policy_tags(record: &Record) -> BTreeMap<String, BString> {
472 let mut props = BTreeMap::new();
473
474 for (tag, value) in record.tags() {
475 props.insert(format!("policy.{tag}").into(), value.as_str().into());
476 }
477
478 props
479}
480
/// Opens `filename` and returns a buffered iterator over its lines.
///
/// # Errors
///
/// Returns the I/O error from opening the file. Errors that occur while
/// reading are yielded by the iterator's individual items.
fn read_lines<P>(filename: P) -> std::io::Result<std::io::Lines<std::io::BufReader<File>>>
where
    P: AsRef<std::path::Path>,
{
    File::open(filename).map(|handle| std::io::BufReader::new(handle).lines())
}
490
491pub(crate) async fn fetch_dmarc_records(
492 address: &str,
493 resolver: &dyn Resolver,
494) -> DmarcRecordResolution {
495 let initial_txt = match resolver.resolve_txt(address).await {
496 Ok(answer) => {
497 if answer.records.is_empty() || answer.nxdomain {
498 return DmarcRecordResolution::PermError;
499 } else {
500 answer.as_txt()
501 }
502 }
503 Err(_) => {
504 return DmarcRecordResolution::TempError;
505 }
506 };
507
508 let mut records = vec![];
509
510 for txt in initial_txt {
513 if txt.starts_with("v=DMARC1;") {
514 if let Ok(record) = Record::from_str(&txt) {
515 records.push(record);
516 }
517 }
518 }
519
520 if records.is_empty() {
521 return DmarcRecordResolution::PermError;
522 }
523
524 DmarcRecordResolution::Records(records)
525}