1use camino::Utf8PathBuf;
2use chrono::{DateTime, Utc};
3use chrono_tz::Tz;
4use serde::{Deserialize, Serialize};
5use std::io::Write;
6use std::time::Duration;
7use zstd::stream::write::Encoder;
8
9#[derive(Deserialize, Serialize)]
11pub struct LogWriterConfig {
12 pub log_dir: Utf8PathBuf,
14 #[serde(default = "default_max_file_size")]
17 pub max_file_size: u64,
18 #[serde(default = "default_compression_level")]
20 pub compression_level: i32,
21 #[serde(
24 default,
25 with = "duration_serde",
26 skip_serializing_if = "Option::is_none"
27 )]
28 pub max_segment_duration: Option<Duration>,
29 #[serde(default, skip_serializing_if = "Option::is_none")]
31 pub suffix: Option<String>,
32 #[serde(default, skip_serializing_if = "Option::is_none")]
35 pub tz: Option<Tz>,
36}
37
38fn default_max_file_size() -> u64 {
39 128 * 1024 * 1024
40}
41
42fn default_compression_level() -> i32 {
43 3
44}
45
46impl LogWriterConfig {
47 pub fn new(log_dir: Utf8PathBuf) -> Self {
48 Self {
49 log_dir,
50 max_file_size: default_max_file_size(),
51 compression_level: default_compression_level(),
52 max_segment_duration: None,
53 suffix: None,
54 tz: None,
55 }
56 }
57
58 pub fn max_file_size(mut self, size: u64) -> Self {
59 self.max_file_size = size;
60 self
61 }
62
63 pub fn compression_level(mut self, level: i32) -> Self {
64 self.compression_level = level;
65 self
66 }
67
68 pub fn max_segment_duration(mut self, duration: Duration) -> Self {
69 self.max_segment_duration = Some(duration);
70 self
71 }
72
73 pub fn suffix(mut self, suffix: impl Into<String>) -> Self {
74 self.suffix = Some(suffix.into());
75 self
76 }
77
78 pub fn tz(mut self, tz: Tz) -> Self {
79 self.tz = Some(tz);
80 self
81 }
82
83 pub fn build(self) -> LogWriter {
85 LogWriter {
86 log_dir: self.log_dir,
87 max_file_size: self.max_file_size,
88 compression_level: self.compression_level,
89 max_segment_duration: self.max_segment_duration,
90 suffix: self.suffix,
91 tz: self.tz,
92 current: None,
93 }
94 }
95}
96
97struct OpenSegment {
99 encoder: Encoder<'static, std::fs::File>,
100 path: Utf8PathBuf,
101 bytes_written: u64,
102 opened_at: std::time::Instant,
103}
104
105pub struct LogWriter {
108 log_dir: Utf8PathBuf,
109 max_file_size: u64,
110 compression_level: i32,
111 max_segment_duration: Option<Duration>,
112 suffix: Option<String>,
113 tz: Option<Tz>,
114 current: Option<OpenSegment>,
115}
116
117impl LogWriter {
118 pub fn write_line(&mut self, line: &str) -> anyhow::Result<()> {
125 if self.current.is_none() {
126 self.open_segment()?;
127 }
128
129 let seg = self.current.as_mut().expect("just opened");
130
131 seg.encoder.write_all(line.as_bytes())?;
132 seg.bytes_written += line.len() as u64;
133
134 if !line.ends_with('\n') {
136 seg.encoder.write_all(b"\n")?;
137 seg.bytes_written += 1;
138 }
139
140 if self.should_roll() {
142 self.close_segment()?;
143 }
144
145 Ok(())
146 }
147
148 pub fn write_value<S: serde::Serialize>(&mut self, value: &S) -> anyhow::Result<()> {
150 let json = serde_json::to_string(value)?;
151 self.write_line(&json)
152 }
153
154 pub fn maintain(&mut self) -> anyhow::Result<()> {
158 if self.current.is_some() && self.duration_exceeded() {
159 self.close_segment()?;
160 }
161 Ok(())
162 }
163
164 pub fn close(&mut self) -> anyhow::Result<()> {
167 if self.current.is_some() {
168 self.close_segment()?;
169 }
170 Ok(())
171 }
172
173 pub fn flush_without_marking_done(&mut self) -> anyhow::Result<()> {
177 if let Some(seg) = self.current.take() {
178 seg.encoder.finish()?;
179 }
180 Ok(())
181 }
182
183 fn should_roll(&self) -> bool {
184 let Some(seg) = &self.current else {
185 return false;
186 };
187 if seg.bytes_written >= self.max_file_size {
188 return true;
189 }
190 self.duration_exceeded()
191 }
192
193 fn duration_exceeded(&self) -> bool {
194 let Some(seg) = &self.current else {
195 return false;
196 };
197 if let Some(max_dur) = self.max_segment_duration {
198 if seg.opened_at.elapsed() >= max_dur {
199 return true;
200 }
201 }
202 false
203 }
204
205 fn open_segment(&mut self) -> anyhow::Result<()> {
206 let now: DateTime<Utc> = Utc::now();
207 let mut base_name = match &self.tz {
208 Some(tz) => now.with_timezone(tz).format("%Y%m%d-%H%M%S%.f").to_string(),
209 None => now.format("%Y%m%d-%H%M%S%.f").to_string(),
210 };
211 if let Some(suffix) = &self.suffix {
212 base_name.push_str(suffix);
213 }
214 let path = self.log_dir.join(base_name);
215
216 std::fs::create_dir_all(&self.log_dir)?;
217
218 let file = std::fs::File::create(path.as_std_path())?;
219 let encoder = Encoder::new(file, self.compression_level)?;
220
221 self.current = Some(OpenSegment {
222 encoder,
223 path,
224 bytes_written: 0,
225 opened_at: std::time::Instant::now(),
226 });
227
228 Ok(())
229 }
230
231 fn close_segment(&mut self) -> anyhow::Result<()> {
232 if let Some(seg) = self.current.take() {
233 seg.encoder.finish()?;
235 mark_segment_done(&seg.path)?;
238 }
239 Ok(())
240 }
241}
242
243impl Drop for LogWriter {
244 fn drop(&mut self) {
245 let _ = self.close();
247 }
248}
249
250fn mark_segment_done(path: &Utf8PathBuf) -> std::io::Result<()> {
252 let meta = std::fs::metadata(path.as_std_path())?;
253 let mut perms = meta.permissions();
254 perms.set_readonly(true);
255 std::fs::set_permissions(path.as_std_path(), perms)
256}