// kumo_jsonl/decompress.rs

use std::collections::VecDeque;
use std::io::{BufRead, BufReader};
use thiserror::Error;
use zstd_safe::{DCtx, InBuffer, OutBuffer};

/// Newtype around a raw zstd error code (`usize`).
///
/// The `Display` impl resolves the code to its human-readable name via
/// `zstd_safe::get_error_name`; in the thiserror format args, `.0` refers
/// to the tuple field and `*` dereferences the borrow the macro provides.
#[derive(Error, Debug)]
#[error("{}", zstd_safe::get_error_name(*.0))]
pub struct ZStdError(pub usize);
9
/// A line extracted from the decompressed stream, along with its
/// byte offset in the decompressed data.
///
/// Derives `Debug` (public types should be debuggable), plus
/// `Clone`/`PartialEq`/`Eq` so callers can buffer, compare, and
/// checkpoint lines without hand-written impls. All additions are
/// backward-compatible.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DecompressedLine {
    /// The line content, not including the terminating `\n`.
    pub text: String,
    /// Byte offset of the start of this line relative to the beginning
    /// of the decompressed output stream.
    pub byte_offset: u64,
}
16
/// State for incremental zstd decompression and line extraction from a single file.
pub struct FileDecompressor {
    /// Buffered reader over the compressed input file.
    file: BufReader<std::fs::File>,
    /// Streaming zstd decompression context, reused across calls.
    context: DCtx<'static>,
    /// Scratch buffer holding decompressed bytes that have not yet been
    /// split into complete lines.
    out_buffer: Vec<u8>,
    /// Start of the next unprocessed line in `out_buffer`.
    line_start: usize,
    /// Number of valid bytes in `out_buffer`.
    out_pos: usize,
    /// Total number of lines decompressed so far.
    lines_decompressed: usize,
    /// Global line index up to which lines have been consumed or skipped.
    /// This is the value that should be used for checkpointing.
    /// Equals skip_before + number of lines actually returned to caller.
    pub lines_consumed: usize,
    /// Buffered lines that have been extracted but not yet consumed.
    pending_lines: VecDeque<DecompressedLine>,
    /// Whether we've seen EOF on the compressed input.
    saw_eof: bool,
    /// Cumulative byte offset in the decompressed stream.
    /// Tracks the position of `line_start` relative to the start
    /// of the decompressed output.
    decompressed_offset: u64,
}
41
42impl FileDecompressor {
43    /// Open a file and prepare for incremental zstd decompression.
44    pub fn open(path: &std::path::Path) -> anyhow::Result<Self> {
45        let file = BufReader::new(
46            std::fs::File::open(path)
47                .map_err(|e| anyhow::anyhow!("opening {} for read: {e}", path.display()))?,
48        );
49        let mut context = DCtx::create();
50        context
51            .init()
52            .map_err(ZStdError)
53            .map_err(|e| anyhow::anyhow!("initialize zstd decompression context: {e}"))?;
54        context
55            .load_dictionary(&[])
56            .map_err(ZStdError)
57            .map_err(|e| anyhow::anyhow!("load empty dictionary: {e}"))?;
58
59        Ok(Self {
60            file,
61            context,
62            out_buffer: vec![0u8; DCtx::out_size()],
63            line_start: 0,
64            out_pos: 0,
65            lines_decompressed: 0,
66            lines_consumed: 0,
67            pending_lines: VecDeque::new(),
68            saw_eof: false,
69            decompressed_offset: 0,
70        })
71    }
72
73    /// Get the next line from this file.
74    ///
75    /// `skip_before`: lines with index < skip_before are discarded.
76    ///
77    /// Returns:
78    /// - `Ok(Some(line))` — a complete line was extracted.
79    /// - `Ok(None)` — no more data available right now. The caller should check
80    ///   if the file is done or retry later.
81    pub fn next_line(&mut self, skip_before: usize) -> anyhow::Result<Option<DecompressedLine>> {
82        // Return a buffered line if available
83        if let Some(line) = self.pending_lines.pop_front() {
84            self.lines_consumed += 1;
85            return Ok(Some(line));
86        }
87
88        // If we previously saw EOF and have no buffered lines, signal EOF
89        if self.saw_eof {
90            return Ok(None);
91        }
92
93        // Account for skipped lines in lines_consumed
94        if self.lines_consumed < skip_before {
95            self.lines_consumed = skip_before;
96        }
97
98        // Read and decompress more data
99        loop {
100            let in_buffer = self.file.fill_buf()?;
101            if in_buffer.is_empty() {
102                self.saw_eof = true;
103                // Return any buffered line
104                if let Some(line) = self.pending_lines.pop_front() {
105                    self.lines_consumed += 1;
106                    return Ok(Some(line));
107                }
108                return Ok(None);
109            }
110
111            let mut src = InBuffer::around(in_buffer);
112            let mut dest = OutBuffer::around_pos(&mut self.out_buffer, self.out_pos);
113
114            self.context
115                .decompress_stream(&mut dest, &mut src)
116                .map_err(ZStdError)
117                .map_err(|e| anyhow::anyhow!("zstd decompress: {e}"))?;
118
119            let bytes_read = {
120                let pos = src.pos();
121                drop(src);
122                pos
123            };
124            self.file.consume(bytes_read);
125            self.out_pos = dest.pos();
126
127            // Extract complete lines
128            while let Some(idx) =
129                memchr::memchr(b'\n', &self.out_buffer[self.line_start..self.out_pos])
130            {
131                let line_byte_offset = self.decompressed_offset;
132                if self.lines_decompressed >= skip_before {
133                    let this_line = &self.out_buffer[self.line_start..self.line_start + idx];
134                    let line = String::from_utf8_lossy(this_line).into_owned();
135                    self.pending_lines.push_back(DecompressedLine {
136                        text: line,
137                        byte_offset: line_byte_offset,
138                    });
139                }
140                // Advance past the line content + newline
141                let consumed = idx + 1;
142                self.decompressed_offset += consumed as u64;
143                self.line_start += consumed;
144                self.lines_decompressed += 1;
145            }
146
147            // Compact the output buffer
148            if self.line_start == self.out_pos {
149                self.out_pos = 0;
150                self.line_start = 0;
151            } else if self.line_start > 0 {
152                self.out_buffer
153                    .copy_within(self.line_start..self.out_pos, 0);
154                self.out_pos -= self.line_start;
155                self.line_start = 0;
156            }
157
158            // If we extracted any lines, return the first one
159            if let Some(line) = self.pending_lines.pop_front() {
160                self.lines_consumed += 1;
161                return Ok(Some(line));
162            }
163
164            // No complete lines yet; read more data
165        }
166    }
167
168    /// Reset the EOF flag so we can try reading more data
169    /// (useful when tailing a file that is still being written to).
170    pub fn reset_eof(&mut self) {
171        self.saw_eof = false;
172    }
173
174    /// Returns true if there is partial (incomplete line) data remaining
175    /// in the output buffer.
176    pub fn has_partial_data(&self) -> bool {
177        self.out_pos > 0
178    }
179}