kumo_server_common/
acct.rs

1use crate::authn_authz::{Access, AuditRecord, AuthInfo, Identity};
2use crate::disk_space::{MinFree, MonitoredPath};
3use crate::log::{mark_existing_logs_as_done_in_dir, OpenedFile};
4use anyhow::Context;
5use chrono::{DateTime, Utc};
6use serde::{Deserialize, Serialize};
7use std::io::Write;
8use std::path::PathBuf;
9use std::sync::OnceLock;
10use std::time::{Duration, Instant};
11use tokio::sync::mpsc::{channel, Receiver, Sender};
12use zstd::Encoder;
13
/// Process-wide sender used to hand records to the audit logger task.
/// It is set at most once, by `AuditLogParams::init`; while it is unset,
/// `log_acct` returns without submitting the record.
static LOGGER: OnceLock<Sender<AcctLogRecord>> = OnceLock::new();
15
/// Configuration for the audit (accounting) log.
///
/// Deserialized with `deny_unknown_fields`, so unrecognized keys are
/// rejected. `init` uses a clone of these parameters to drive the
/// background task that writes zstd-compressed log segments into `log_dir`.
#[derive(Deserialize, Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct AuditLogParams {
    /// Where to place the log files
    pub log_dir: PathBuf,
    /// How many uncompressed bytes to allow per file segment
    #[serde(default = "AuditLogParams::default_max_file_size")]
    pub max_file_size: u64,
    /// Maximum number of outstanding items to be logged before
    /// the submission will block; helps to avoid runaway issues
    /// spiralling out of control.
    #[serde(default = "AuditLogParams::default_back_pressure")]
    pub back_pressure: usize,

    /// The level of compression.
    /// 0 - use the zstd default level (probably 3).
    /// 1-21 are the explicitly configurable levels
    #[serde(default = "AuditLogParams::default_compression_level")]
    pub compression_level: i32,

    /// Optional maximum lifetime of a file segment. When set, a newly
    /// opened segment is given an expiry of "now + this duration" and the
    /// logger closes the segment once that instant has passed.
    /// When unset, segments never expire based on time.
    #[serde(default, with = "duration_serde")]
    pub max_segment_duration: Option<Duration>,

    /// Free space threshold handed to the `MonitoredPath` that is
    /// registered for `log_dir`.
    #[serde(default)]
    pub min_free_space: MinFree,
    /// Free inode threshold handed to the `MonitoredPath` that is
    /// registered for `log_dir`.
    #[serde(default)]
    pub min_free_inodes: MinFree,

    /// Log records for successfully granted authz
    #[serde(default = "AuditLogParams::default_true")]
    pub log_authz_allow: bool,

    /// Log records for denied authz
    #[serde(default = "AuditLogParams::default_true")]
    pub log_authz_deny: bool,

    /// Log records for successful authn
    #[serde(default = "AuditLogParams::default_true")]
    pub log_authn_ok: bool,

    /// Log records for failed authn
    #[serde(default = "AuditLogParams::default_true")]
    pub log_authn_fail: bool,
}
60
61impl AuditLogParams {
62    pub fn default_max_file_size() -> u64 {
63        1_000_000_000
64    }
65    pub fn default_back_pressure() -> usize {
66        128_000
67    }
68    pub fn default_compression_level() -> i32 {
69        0 // use the zstd default
70    }
71
72    pub fn default_true() -> bool {
73        true
74    }
75
76    pub async fn init(&self) -> anyhow::Result<()> {
77        let (tx, rx) = channel(self.back_pressure);
78        let params = self.clone();
79        tokio::spawn(async move {
80            if let Err(err) = params.acct_logger(rx).await {
81                tracing::error!(
82                    "AuditLogParams::acct_logger returned with error: {err}, \
83                    audit logging will cease to function until the next restart!"
84                );
85            }
86        });
87
88        if LOGGER.set(tx).is_err() {
89            anyhow::bail!("AuditLogParams::init must not be called more than once");
90        }
91
92        Ok(())
93    }
94
95    async fn acct_logger(self, mut rx: Receiver<AcctLogRecord>) -> anyhow::Result<()> {
96        if !config::is_validating() {
97            if let Err(err) = mark_existing_logs_as_done_in_dir(&self.log_dir) {
98                tracing::error!("{err}");
99            }
100            std::fs::create_dir_all(&self.log_dir).ok();
101            MonitoredPath {
102                name: format!("log dir {}", self.log_dir.display()),
103                path: self.log_dir.clone(),
104                min_free_space: self.min_free_space,
105                min_free_inodes: self.min_free_inodes,
106            }
107            .register();
108        }
109
110        let mut expire_counter = 0u16;
111        let mut current_file: Option<OpenedFile> = None;
112
113        loop {
114            let deadline = current_file.as_ref().and_then(|f| f.expires);
115
116            let mut check_expire = false;
117            let record = match deadline {
118                Some(deadline) => tokio::select! {
119                    r = rx.recv() => {
120                        match r {
121                            Some(r) => Some(r),
122                            None => return Ok(()),
123                        }
124                    }
125                    _ = tokio::time::sleep_until(deadline.into()) => {
126                        check_expire = true;
127                        None
128                    }
129                },
130                None => match rx.recv().await {
131                    Some(r) => Some(r),
132                    None => return Ok(()),
133                },
134            };
135
136            expire_counter = expire_counter.wrapping_add(1);
137            if expire_counter == 10_000 {
138                check_expire = true;
139            }
140
141            if check_expire {
142                let now = Instant::now();
143                let do_expire = current_file
144                    .as_ref()
145                    .and_then(|f| f.expires.map(|exp| exp <= now))
146                    .unwrap_or(false);
147
148                if do_expire {
149                    current_file.take();
150                }
151                expire_counter = 0;
152            }
153
154            if let Some(record) = record {
155                let should_log = match &record {
156                    AcctLogRecord::Authentication(authn) => {
157                        if authn.success {
158                            self.log_authn_ok
159                        } else {
160                            self.log_authn_fail
161                        }
162                    }
163                    AcctLogRecord::Authorization(authz) => match authz.access {
164                        Access::Allow => self.log_authz_allow,
165                        Access::Deny => self.log_authz_deny,
166                    },
167                };
168
169                if !should_log {
170                    continue;
171                }
172
173                if let Err(err) = self.open_file_if_needed(&mut current_file) {
174                    tracing::error!("{err}");
175                    continue;
176                }
177
178                if let Err(err) = self.do_record(&mut current_file, record) {
179                    tracing::error!("{err}");
180                    continue;
181                }
182            }
183        }
184    }
185
186    fn open_file_if_needed(&self, current_file: &mut Option<OpenedFile>) -> anyhow::Result<()> {
187        if current_file.is_some() {
188            return Ok(());
189        }
190        let now = Utc::now();
191
192        let base_name = now.format("acct-%Y%m%d-%H%M%S%.f").to_string();
193
194        let name = self.log_dir.join(base_name);
195        // They might be trying to use multiple directories below
196        // the configured log_dir, so adjust our idea of its parent
197        // dir
198        let log_dir = name
199            .parent()
200            .expect("log_dir.join ensures we always have a parent");
201
202        let f = match std::fs::OpenOptions::new()
203            .append(true)
204            .create(true)
205            .open(&name)
206        {
207            Ok(f) => f,
208            Err(err) => {
209                if err.kind() == std::io::ErrorKind::NotFound {
210                    match std::fs::create_dir_all(log_dir) {
211                        Ok(_) => {
212                            // Try opening it again now
213                            std::fs::OpenOptions::new()
214                                .append(true)
215                                .create(true)
216                                .open(&name)
217                                .with_context(|| format!("open log file {name:?}"))?
218                        }
219                        Err(dir_err) => {
220                            anyhow::bail!(
221                                "open log file {name:?}: failed: {err:#?}. \
222                                    Additionally, attempting to create dir {} failed: {dir_err:#?}",
223                                log_dir.display()
224                            );
225                        }
226                    }
227                } else {
228                    anyhow::bail!("open log file {name:?}: failed: {err:#?}");
229                }
230            }
231        };
232
233        let file = OpenedFile {
234            file: Encoder::new(f, self.compression_level).context("set up zstd encoder")?,
235            name,
236            written: 0,
237            expires: self
238                .max_segment_duration
239                .map(|duration| Instant::now() + duration),
240        };
241
242        current_file.replace(file);
243        Ok(())
244    }
245
246    fn do_record(
247        &self,
248        current_file: &mut Option<OpenedFile>,
249        record: AcctLogRecord,
250    ) -> anyhow::Result<()> {
251        if let Some(file) = current_file {
252            let mut record_text = Vec::new();
253            serde_json::to_writer(&mut record_text, &record).context("serializing record")?;
254            record_text.push(b'\n');
255
256            file.file
257                .write_all(&record_text)
258                .with_context(|| format!("writing record to {}", file.name.display()))?;
259
260            let need_rotate = file.written >= self.max_file_size
261                || file
262                    .expires
263                    .map(|exp| exp <= Instant::now())
264                    .unwrap_or(false);
265
266            if need_rotate {
267                current_file.take();
268            }
269        }
270        Ok(())
271    }
272}
273
/// A single entry in the audit log.
///
/// Serialized with a `type` field naming the variant (serde's internally
/// tagged representation), alongside the fields of the wrapped record.
#[derive(Serialize, Deserialize, Debug)]
#[serde(tag = "type")]
pub enum AcctLogRecord {
    /// The outcome of an authentication attempt
    Authentication(AuthnAuditRecord),
    /// The outcome of an authorization check
    Authorization(AuditRecord),
}
280
281impl AcctLogRecord {
282    pub fn timestamp(&self) -> &DateTime<Utc> {
283        match self {
284            Self::Authentication(authn) => &authn.timestamp,
285            Self::Authorization(authz) => &authz.timestamp,
286        }
287    }
288
289    pub fn is_allow(&self) -> bool {
290        match self {
291            Self::Authentication(authn) => authn.success,
292            Self::Authorization(authz) => authz.access == Access::Allow,
293        }
294    }
295}
296
297async fn log_acct(record: AcctLogRecord) -> anyhow::Result<()> {
298    tracing::trace!("Audit: {record:?}");
299    if let Some(sender) = LOGGER.get() {
300        sender.send(record).await?;
301    }
302    Ok(())
303}
304
305pub async fn log_authz(record: AuditRecord) -> anyhow::Result<()> {
306    log_acct(AcctLogRecord::Authorization(record)).await
307}
308
/// Audit record describing a single authentication attempt.
/// It is carried in the audit log as `AcctLogRecord::Authentication`.
#[derive(Serialize, Deserialize, Debug)]
pub struct AuthnAuditRecord {
    /// When the event occurred
    pub timestamp: DateTime<Utc>,
    /// Who they were attempting to authenticate as
    pub attempted_identity: Identity,
    /// Whether the attempt was successful
    pub success: bool,
    /// The resulting authentication info
    pub auth_info: AuthInfo,
}
320
321pub async fn log_authn(record: AuthnAuditRecord) -> anyhow::Result<()> {
322    log_acct(AcctLogRecord::Authentication(record)).await
323}