kumo_server_common/
acct.rs1use crate::authn_authz::{Access, AuditRecord, AuthInfo, Identity};
2use crate::disk_space::{MinFree, MonitoredPath};
3use crate::log::{mark_existing_logs_as_done_in_dir, OpenedFile};
4use anyhow::Context;
5use chrono::{DateTime, Utc};
6use serde::{Deserialize, Serialize};
7use std::io::Write;
8use std::path::PathBuf;
9use std::sync::OnceLock;
10use std::time::{Duration, Instant};
11use tokio::sync::mpsc::{channel, Receiver, Sender};
12use zstd::Encoder;
13
14static LOGGER: OnceLock<Sender<AcctLogRecord>> = OnceLock::new();
15
16#[derive(Deserialize, Clone, Debug)]
17#[serde(deny_unknown_fields)]
18pub struct AuditLogParams {
19 pub log_dir: PathBuf,
21 #[serde(default = "AuditLogParams::default_max_file_size")]
23 pub max_file_size: u64,
24 #[serde(default = "AuditLogParams::default_back_pressure")]
28 pub back_pressure: usize,
29
30 #[serde(default = "AuditLogParams::default_compression_level")]
34 pub compression_level: i32,
35
36 #[serde(default, with = "duration_serde")]
37 pub max_segment_duration: Option<Duration>,
38
39 #[serde(default)]
40 pub min_free_space: MinFree,
41 #[serde(default)]
42 pub min_free_inodes: MinFree,
43
44 #[serde(default = "AuditLogParams::default_true")]
46 pub log_authz_allow: bool,
47
48 #[serde(default = "AuditLogParams::default_true")]
50 pub log_authz_deny: bool,
51
52 #[serde(default = "AuditLogParams::default_true")]
54 pub log_authn_ok: bool,
55
56 #[serde(default = "AuditLogParams::default_true")]
58 pub log_authn_fail: bool,
59}
60
61impl AuditLogParams {
62 pub fn default_max_file_size() -> u64 {
63 1_000_000_000
64 }
65 pub fn default_back_pressure() -> usize {
66 128_000
67 }
68 pub fn default_compression_level() -> i32 {
69 0 }
71
72 pub fn default_true() -> bool {
73 true
74 }
75
76 pub async fn init(&self) -> anyhow::Result<()> {
77 let (tx, rx) = channel(self.back_pressure);
78 let params = self.clone();
79 tokio::spawn(async move {
80 if let Err(err) = params.acct_logger(rx).await {
81 tracing::error!(
82 "AuditLogParams::acct_logger returned with error: {err}, \
83 audit logging will cease to function until the next restart!"
84 );
85 }
86 });
87
88 if LOGGER.set(tx).is_err() {
89 anyhow::bail!("AuditLogParams::init must not be called more than once");
90 }
91
92 Ok(())
93 }
94
95 async fn acct_logger(self, mut rx: Receiver<AcctLogRecord>) -> anyhow::Result<()> {
96 if !config::is_validating() {
97 if let Err(err) = mark_existing_logs_as_done_in_dir(&self.log_dir) {
98 tracing::error!("{err}");
99 }
100 std::fs::create_dir_all(&self.log_dir).ok();
101 MonitoredPath {
102 name: format!("log dir {}", self.log_dir.display()),
103 path: self.log_dir.clone(),
104 min_free_space: self.min_free_space,
105 min_free_inodes: self.min_free_inodes,
106 }
107 .register();
108 }
109
110 let mut expire_counter = 0u16;
111 let mut current_file: Option<OpenedFile> = None;
112
113 loop {
114 let deadline = current_file.as_ref().and_then(|f| f.expires);
115
116 let mut check_expire = false;
117 let record = match deadline {
118 Some(deadline) => tokio::select! {
119 r = rx.recv() => {
120 match r {
121 Some(r) => Some(r),
122 None => return Ok(()),
123 }
124 }
125 _ = tokio::time::sleep_until(deadline.into()) => {
126 check_expire = true;
127 None
128 }
129 },
130 None => match rx.recv().await {
131 Some(r) => Some(r),
132 None => return Ok(()),
133 },
134 };
135
136 expire_counter = expire_counter.wrapping_add(1);
137 if expire_counter == 10_000 {
138 check_expire = true;
139 }
140
141 if check_expire {
142 let now = Instant::now();
143 let do_expire = current_file
144 .as_ref()
145 .and_then(|f| f.expires.map(|exp| exp <= now))
146 .unwrap_or(false);
147
148 if do_expire {
149 current_file.take();
150 }
151 expire_counter = 0;
152 }
153
154 if let Some(record) = record {
155 let should_log = match &record {
156 AcctLogRecord::Authentication(authn) => {
157 if authn.success {
158 self.log_authn_ok
159 } else {
160 self.log_authn_fail
161 }
162 }
163 AcctLogRecord::Authorization(authz) => match authz.access {
164 Access::Allow => self.log_authz_allow,
165 Access::Deny => self.log_authz_deny,
166 },
167 };
168
169 if !should_log {
170 continue;
171 }
172
173 if let Err(err) = self.open_file_if_needed(&mut current_file) {
174 tracing::error!("{err}");
175 continue;
176 }
177
178 if let Err(err) = self.do_record(&mut current_file, record) {
179 tracing::error!("{err}");
180 continue;
181 }
182 }
183 }
184 }
185
186 fn open_file_if_needed(&self, current_file: &mut Option<OpenedFile>) -> anyhow::Result<()> {
187 if current_file.is_some() {
188 return Ok(());
189 }
190 let now = Utc::now();
191
192 let base_name = now.format("acct-%Y%m%d-%H%M%S%.f").to_string();
193
194 let name = self.log_dir.join(base_name);
195 let log_dir = name
199 .parent()
200 .expect("log_dir.join ensures we always have a parent");
201
202 let f = match std::fs::OpenOptions::new()
203 .append(true)
204 .create(true)
205 .open(&name)
206 {
207 Ok(f) => f,
208 Err(err) => {
209 if err.kind() == std::io::ErrorKind::NotFound {
210 match std::fs::create_dir_all(log_dir) {
211 Ok(_) => {
212 std::fs::OpenOptions::new()
214 .append(true)
215 .create(true)
216 .open(&name)
217 .with_context(|| format!("open log file {name:?}"))?
218 }
219 Err(dir_err) => {
220 anyhow::bail!(
221 "open log file {name:?}: failed: {err:#?}. \
222 Additionally, attempting to create dir {} failed: {dir_err:#?}",
223 log_dir.display()
224 );
225 }
226 }
227 } else {
228 anyhow::bail!("open log file {name:?}: failed: {err:#?}");
229 }
230 }
231 };
232
233 let file = OpenedFile {
234 file: Encoder::new(f, self.compression_level).context("set up zstd encoder")?,
235 name,
236 written: 0,
237 expires: self
238 .max_segment_duration
239 .map(|duration| Instant::now() + duration),
240 };
241
242 current_file.replace(file);
243 Ok(())
244 }
245
246 fn do_record(
247 &self,
248 current_file: &mut Option<OpenedFile>,
249 record: AcctLogRecord,
250 ) -> anyhow::Result<()> {
251 if let Some(file) = current_file {
252 let mut record_text = Vec::new();
253 serde_json::to_writer(&mut record_text, &record).context("serializing record")?;
254 record_text.push(b'\n');
255
256 file.file
257 .write_all(&record_text)
258 .with_context(|| format!("writing record to {}", file.name.display()))?;
259
260 let need_rotate = file.written >= self.max_file_size
261 || file
262 .expires
263 .map(|exp| exp <= Instant::now())
264 .unwrap_or(false);
265
266 if need_rotate {
267 current_file.take();
268 }
269 }
270 Ok(())
271 }
272}
273
274#[derive(Serialize, Deserialize, Debug)]
275#[serde(tag = "type")]
276pub enum AcctLogRecord {
277 Authentication(AuthnAuditRecord),
278 Authorization(AuditRecord),
279}
280
281impl AcctLogRecord {
282 pub fn timestamp(&self) -> &DateTime<Utc> {
283 match self {
284 Self::Authentication(authn) => &authn.timestamp,
285 Self::Authorization(authz) => &authz.timestamp,
286 }
287 }
288
289 pub fn is_allow(&self) -> bool {
290 match self {
291 Self::Authentication(authn) => authn.success,
292 Self::Authorization(authz) => authz.access == Access::Allow,
293 }
294 }
295}
296
297async fn log_acct(record: AcctLogRecord) -> anyhow::Result<()> {
298 tracing::trace!("Audit: {record:?}");
299 if let Some(sender) = LOGGER.get() {
300 sender.send(record).await?;
301 }
302 Ok(())
303}
304
305pub async fn log_authz(record: AuditRecord) -> anyhow::Result<()> {
306 log_acct(AcctLogRecord::Authorization(record)).await
307}
308
309#[derive(Serialize, Deserialize, Debug)]
310pub struct AuthnAuditRecord {
311 pub timestamp: DateTime<Utc>,
313 pub attempted_identity: Identity,
315 pub success: bool,
317 pub auth_info: AuthInfo,
319}
320
321pub async fn log_authn(record: AuthnAuditRecord) -> anyhow::Result<()> {
322 log_acct(AcctLogRecord::Authentication(record)).await
323}