1use std::collections::VecDeque;
2use std::io::{BufRead, BufReader};
3use thiserror::Error;
4use zstd_safe::{DCtx, InBuffer, OutBuffer};
5
/// A raw zstd error code as returned by `zstd_safe` calls.
///
/// Its `Display` output is zstd's human-readable name for the code
/// (via `zstd_safe::get_error_name`), so it can be embedded directly in
/// higher-level error messages.
#[derive(Error, Debug)]
#[error("{}", zstd_safe::get_error_name(*.0))]
pub struct ZStdError(pub usize);
9
/// One `\n`-terminated line of text extracted from the decompressed stream.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DecompressedLine {
    /// The line's contents, without the terminating `\n`. Invalid UTF-8 is
    /// replaced with U+FFFD by the producer (`String::from_utf8_lossy`).
    pub text: String,
    /// Offset, in bytes of *decompressed* data, at which this line starts.
    pub byte_offset: u64,
}
16
/// Reads a zstd-compressed file incrementally and splits the decompressed
/// bytes into `\n`-terminated lines.
pub struct FileDecompressor {
    /// Buffered reader over the compressed input file.
    file: BufReader<std::fs::File>,
    /// zstd streaming decompression context.
    context: DCtx<'static>,
    /// Decompressed bytes that have not yet been split into lines.
    out_buffer: Vec<u8>,
    /// Index in `out_buffer` where the current, not-yet-terminated line begins.
    line_start: usize,
    /// Number of valid decompressed bytes at the front of `out_buffer`.
    out_pos: usize,
    /// Count of `\n`-terminated lines found so far, including lines that were
    /// skipped because of the `skip_before` argument to `next_line`.
    lines_decompressed: usize,
    /// Count of lines returned to the caller. `next_line` also raises it to at
    /// least its `skip_before` argument, so skipped lines are included.
    pub lines_consumed: usize,
    /// Lines already split out of `out_buffer` but not yet returned.
    pending_lines: VecDeque<DecompressedLine>,
    /// Set when the reader reports no more compressed input; cleared by `reset_eof`.
    saw_eof: bool,
    /// Offset, in decompressed bytes, at which the next line to be split out starts.
    decompressed_offset: u64,
}
41
42impl FileDecompressor {
43 pub fn open(path: &std::path::Path) -> anyhow::Result<Self> {
45 let file = BufReader::new(
46 std::fs::File::open(path)
47 .map_err(|e| anyhow::anyhow!("opening {} for read: {e}", path.display()))?,
48 );
49 let mut context = DCtx::create();
50 context
51 .init()
52 .map_err(ZStdError)
53 .map_err(|e| anyhow::anyhow!("initialize zstd decompression context: {e}"))?;
54 context
55 .load_dictionary(&[])
56 .map_err(ZStdError)
57 .map_err(|e| anyhow::anyhow!("load empty dictionary: {e}"))?;
58
59 Ok(Self {
60 file,
61 context,
62 out_buffer: vec![0u8; DCtx::out_size()],
63 line_start: 0,
64 out_pos: 0,
65 lines_decompressed: 0,
66 lines_consumed: 0,
67 pending_lines: VecDeque::new(),
68 saw_eof: false,
69 decompressed_offset: 0,
70 })
71 }
72
73 pub fn next_line(&mut self, skip_before: usize) -> anyhow::Result<Option<DecompressedLine>> {
82 if let Some(line) = self.pending_lines.pop_front() {
84 self.lines_consumed += 1;
85 return Ok(Some(line));
86 }
87
88 if self.saw_eof {
90 return Ok(None);
91 }
92
93 if self.lines_consumed < skip_before {
95 self.lines_consumed = skip_before;
96 }
97
98 loop {
100 let in_buffer = self.file.fill_buf()?;
101 if in_buffer.is_empty() {
102 self.saw_eof = true;
103 if let Some(line) = self.pending_lines.pop_front() {
105 self.lines_consumed += 1;
106 return Ok(Some(line));
107 }
108 return Ok(None);
109 }
110
111 let mut src = InBuffer::around(in_buffer);
112 let mut dest = OutBuffer::around_pos(&mut self.out_buffer, self.out_pos);
113
114 self.context
115 .decompress_stream(&mut dest, &mut src)
116 .map_err(ZStdError)
117 .map_err(|e| anyhow::anyhow!("zstd decompress: {e}"))?;
118
119 let bytes_read = {
120 let pos = src.pos();
121 drop(src);
122 pos
123 };
124 self.file.consume(bytes_read);
125 self.out_pos = dest.pos();
126
127 while let Some(idx) =
129 memchr::memchr(b'\n', &self.out_buffer[self.line_start..self.out_pos])
130 {
131 let line_byte_offset = self.decompressed_offset;
132 if self.lines_decompressed >= skip_before {
133 let this_line = &self.out_buffer[self.line_start..self.line_start + idx];
134 let line = String::from_utf8_lossy(this_line).into_owned();
135 self.pending_lines.push_back(DecompressedLine {
136 text: line,
137 byte_offset: line_byte_offset,
138 });
139 }
140 let consumed = idx + 1;
142 self.decompressed_offset += consumed as u64;
143 self.line_start += consumed;
144 self.lines_decompressed += 1;
145 }
146
147 if self.line_start == self.out_pos {
149 self.out_pos = 0;
150 self.line_start = 0;
151 } else if self.line_start > 0 {
152 self.out_buffer
153 .copy_within(self.line_start..self.out_pos, 0);
154 self.out_pos -= self.line_start;
155 self.line_start = 0;
156 }
157
158 if let Some(line) = self.pending_lines.pop_front() {
160 self.lines_consumed += 1;
161 return Ok(Some(line));
162 }
163
164 }
166 }
167
168 pub fn reset_eof(&mut self) {
171 self.saw_eof = false;
172 }
173
174 pub fn has_partial_data(&self) -> bool {
177 self.out_pos > 0
178 }
179}