kumo_jsonl/
writer.rs

1use camino::Utf8PathBuf;
2use chrono::{DateTime, Utc};
3use chrono_tz::Tz;
4use serde::{Deserialize, Serialize};
5use std::io::Write;
6use std::time::Duration;
7use zstd::stream::write::Encoder;
8
9/// Configuration for constructing a [`LogWriter`].
10#[derive(Deserialize, Serialize)]
11pub struct LogWriterConfig {
12    /// Directory where log segment files will be created.
13    pub log_dir: Utf8PathBuf,
14    /// Maximum number of uncompressed bytes written per segment
15    /// before rolling to a new file.
16    #[serde(default = "default_max_file_size")]
17    pub max_file_size: u64,
18    /// Zstd compression level.
19    #[serde(default = "default_compression_level")]
20    pub compression_level: i32,
21    /// If set, the segment will be closed after this duration
22    /// even if max_file_size has not been reached.
23    #[serde(
24        default,
25        with = "duration_serde",
26        skip_serializing_if = "Option::is_none"
27    )]
28    pub max_segment_duration: Option<Duration>,
29    /// Optional suffix appended to segment file names.
30    #[serde(default, skip_serializing_if = "Option::is_none")]
31    pub suffix: Option<String>,
32    /// Timezone used when computing the segment file name.
33    /// Defaults to UTC.
34    #[serde(default, skip_serializing_if = "Option::is_none")]
35    pub tz: Option<Tz>,
36}
37
/// Default per-segment size limit: 128 MiB of uncompressed data.
fn default_max_file_size() -> u64 {
    const MIB: u64 = 1 << 20;
    128 * MIB
}

/// Default zstd compression level used when none is configured.
fn default_compression_level() -> i32 {
    const DEFAULT_LEVEL: i32 = 3;
    DEFAULT_LEVEL
}
45
46impl LogWriterConfig {
47    pub fn new(log_dir: Utf8PathBuf) -> Self {
48        Self {
49            log_dir,
50            max_file_size: default_max_file_size(),
51            compression_level: default_compression_level(),
52            max_segment_duration: None,
53            suffix: None,
54            tz: None,
55        }
56    }
57
58    pub fn max_file_size(mut self, size: u64) -> Self {
59        self.max_file_size = size;
60        self
61    }
62
63    pub fn compression_level(mut self, level: i32) -> Self {
64        self.compression_level = level;
65        self
66    }
67
68    pub fn max_segment_duration(mut self, duration: Duration) -> Self {
69        self.max_segment_duration = Some(duration);
70        self
71    }
72
73    pub fn suffix(mut self, suffix: impl Into<String>) -> Self {
74        self.suffix = Some(suffix.into());
75        self
76    }
77
78    pub fn tz(mut self, tz: Tz) -> Self {
79        self.tz = Some(tz);
80        self
81    }
82
83    /// Build the [`LogWriter`].
84    pub fn build(self) -> LogWriter {
85        LogWriter {
86            log_dir: self.log_dir,
87            max_file_size: self.max_file_size,
88            compression_level: self.compression_level,
89            max_segment_duration: self.max_segment_duration,
90            suffix: self.suffix,
91            tz: self.tz,
92            current: None,
93        }
94    }
95}
96
97/// State for the currently open segment file.
98struct OpenSegment {
99    encoder: Encoder<'static, std::fs::File>,
100    path: Utf8PathBuf,
101    bytes_written: u64,
102    opened_at: std::time::Instant,
103}
104
105/// A log writer that produces zstd-compressed JSONL segment files
106/// in a directory, compatible with [`LogTailer`](crate::LogTailer).
107pub struct LogWriter {
108    log_dir: Utf8PathBuf,
109    max_file_size: u64,
110    compression_level: i32,
111    max_segment_duration: Option<Duration>,
112    suffix: Option<String>,
113    tz: Option<Tz>,
114    current: Option<OpenSegment>,
115}
116
117impl LogWriter {
118    /// Write a JSONL line (record) to the current segment.
119    ///
120    /// If no segment is open, one will be created.  After writing,
121    /// if the segment has exceeded `max_file_size` or
122    /// `max_segment_duration` it will be closed and a new segment
123    /// will be opened on the next write.
124    pub fn write_line(&mut self, line: &str) -> anyhow::Result<()> {
125        if self.current.is_none() {
126            self.open_segment()?;
127        }
128
129        let seg = self.current.as_mut().expect("just opened");
130
131        seg.encoder.write_all(line.as_bytes())?;
132        seg.bytes_written += line.len() as u64;
133
134        // Ensure the record ends with a newline
135        if !line.ends_with('\n') {
136            seg.encoder.write_all(b"\n")?;
137            seg.bytes_written += 1;
138        }
139
140        // Check if we need to roll to a new segment
141        if self.should_roll() {
142            self.close_segment()?;
143        }
144
145        Ok(())
146    }
147
148    /// Serialize `value` to JSON and write it as a JSONL line.
149    pub fn write_value<S: serde::Serialize>(&mut self, value: &S) -> anyhow::Result<()> {
150        let json = serde_json::to_string(value)?;
151        self.write_line(&json)
152    }
153
154    /// Flush and close the current segment if it has exceeded
155    /// `max_segment_duration`.  This is a no-op if no segment is
156    /// open or if the duration has not been exceeded.
157    pub fn maintain(&mut self) -> anyhow::Result<()> {
158        if self.current.is_some() && self.duration_exceeded() {
159            self.close_segment()?;
160        }
161        Ok(())
162    }
163
164    /// Flush and close the current segment, regardless of whether
165    /// it has exceeded any configured constraints.
166    pub fn close(&mut self) -> anyhow::Result<()> {
167        if self.current.is_some() {
168            self.close_segment()?;
169        }
170        Ok(())
171    }
172
173    /// Finish the zstd stream so the data is readable, but do NOT
174    /// mark the file as done (readonly).  This is useful for tests
175    /// that need to simulate an in-progress segment.
176    pub fn flush_without_marking_done(&mut self) -> anyhow::Result<()> {
177        if let Some(seg) = self.current.take() {
178            seg.encoder.finish()?;
179        }
180        Ok(())
181    }
182
183    fn should_roll(&self) -> bool {
184        let Some(seg) = &self.current else {
185            return false;
186        };
187        if seg.bytes_written >= self.max_file_size {
188            return true;
189        }
190        self.duration_exceeded()
191    }
192
193    fn duration_exceeded(&self) -> bool {
194        let Some(seg) = &self.current else {
195            return false;
196        };
197        if let Some(max_dur) = self.max_segment_duration {
198            if seg.opened_at.elapsed() >= max_dur {
199                return true;
200            }
201        }
202        false
203    }
204
205    fn open_segment(&mut self) -> anyhow::Result<()> {
206        let now: DateTime<Utc> = Utc::now();
207        let mut base_name = match &self.tz {
208            Some(tz) => now.with_timezone(tz).format("%Y%m%d-%H%M%S%.f").to_string(),
209            None => now.format("%Y%m%d-%H%M%S%.f").to_string(),
210        };
211        if let Some(suffix) = &self.suffix {
212            base_name.push_str(suffix);
213        }
214        let path = self.log_dir.join(base_name);
215
216        std::fs::create_dir_all(&self.log_dir)?;
217
218        let file = std::fs::File::create(path.as_std_path())?;
219        let encoder = Encoder::new(file, self.compression_level)?;
220
221        self.current = Some(OpenSegment {
222            encoder,
223            path,
224            bytes_written: 0,
225            opened_at: std::time::Instant::now(),
226        });
227
228        Ok(())
229    }
230
231    fn close_segment(&mut self) -> anyhow::Result<()> {
232        if let Some(seg) = self.current.take() {
233            // Finish the zstd stream (flushes and writes the end frame)
234            seg.encoder.finish()?;
235            // Mark the file as done (readonly) to signal to the tailer
236            // that this segment is complete
237            mark_segment_done(&seg.path)?;
238        }
239        Ok(())
240    }
241}
242
243impl Drop for LogWriter {
244    fn drop(&mut self) {
245        // Best-effort close on drop
246        let _ = self.close();
247    }
248}
249
/// Mark a segment file as done by removing its write permissions.
///
/// The read-only state is the signal that tells the tailer this segment
/// is complete. Accepts any path-like value (`&Utf8PathBuf`, `&Path`,
/// `PathBuf`, ...).
///
/// # Errors
///
/// Returns the underlying I/O error if the file's metadata cannot be read
/// (e.g. it does not exist) or its permissions cannot be changed.
fn mark_segment_done(path: impl AsRef<std::path::Path>) -> std::io::Result<()> {
    let path = path.as_ref();
    let mut perms = std::fs::metadata(path)?.permissions();
    perms.set_readonly(true);
    std::fs::set_permissions(path, perms)
}