kumo_jsonl/batch.rs
1use camino::Utf8PathBuf;
2
3/// Callback that writes a checkpoint when invoked.
4type CommitFn = Box<dyn FnOnce() -> anyhow::Result<()> + Send>;
5
6/// A batch of log records yielded by the tailer stream.
7///
8/// Each record is a parsed JSON value. A batch may contain records
9/// from multiple segment files when a file boundary is crossed while
10/// filling the batch.
11///
12/// Call [`LogBatch::commit`] after processing the batch to advance
13/// the checkpoint. If `commit` is not called the checkpoint remains
14/// at its prior position, so the records in this batch will be
15/// re-read on the next run.
16pub struct LogBatch {
17 /// The name of the consumer this batch belongs to.
18 consumer_name: String,
19 /// The parsed JSON records that comprise the batch.
20 records: Vec<serde_json::Value>,
21 /// The list of unique segment file names that were
22 /// the source of data for the records.
23 file_names: Vec<Utf8PathBuf>,
24 /// The indices of this vec correspond to the indices of `records`.
25 /// The elements in the vec are the indices into `file_names`
26 /// of the file name from which the record was read.
27 line_to_file_name: Vec<usize>,
28 /// The indices of this vec correspond to the indices of `records`.
29 /// The elements in the vec are the byte offset within the
30 /// decompressed stream of the start of that record.
31 byte_offsets: Vec<u64>,
32 /// Callback that writes the checkpoint for this batch.
33 /// Set by the stream before yielding. Consumed by `commit()`.
34 commit_fn: Option<CommitFn>,
35}
36
37impl LogBatch {
38 /// Create a new empty batch with a default consumer name.
39 pub fn new() -> Self {
40 Self::with_consumer_name(String::new())
41 }
42
43 /// Create a new empty batch for the named consumer.
44 pub fn with_consumer_name(name: String) -> Self {
45 Self {
46 consumer_name: name,
47 records: Vec::new(),
48 file_names: Vec::new(),
49 line_to_file_name: Vec::new(),
50 byte_offsets: Vec::new(),
51 commit_fn: None,
52 }
53 }
54
55 /// Add a pre-parsed JSON value to the batch.
56 pub(crate) fn push_value(
57 &mut self,
58 value: serde_json::Value,
59 segment: &Utf8PathBuf,
60 byte_offset: u64,
61 ) {
62 let file_idx = match self.file_names.iter().rposition(|f| f == segment) {
63 Some(idx) => idx,
64 None => {
65 self.file_names.push(segment.clone());
66 self.file_names.len() - 1
67 }
68 };
69 self.records.push(value);
70 self.line_to_file_name.push(file_idx);
71 self.byte_offsets.push(byte_offset);
72 }
73
74 /// Set the commit callback. Called by the stream before yielding.
75 pub(crate) fn set_commit_fn(&mut self, f: CommitFn) {
76 self.commit_fn = Some(f);
77 }
78
79 /// The name of the consumer this batch belongs to.
80 pub fn consumer_name(&self) -> &str {
81 &self.consumer_name
82 }
83
84 /// Advance the checkpoint to the end of this batch.
85 ///
86 /// This confirms that the caller has successfully processed
87 /// the batch. If checkpointing is not enabled, or if this
88 /// batch has already been committed, this is a no-op.
89 pub fn commit(&mut self) -> anyhow::Result<()> {
90 if let Some(f) = self.commit_fn.take() {
91 f()?;
92 }
93 Ok(())
94 }
95
96 /// The number of records in this batch.
97 pub fn len(&self) -> usize {
98 self.records.len()
99 }
100
101 /// Whether the batch is empty.
102 pub fn is_empty(&self) -> bool {
103 self.records.is_empty()
104 }
105
106 /// The parsed JSON records.
107 pub fn records(&self) -> &[serde_json::Value] {
108 &self.records
109 }
110
111 /// The list of unique segment file names that contributed records
112 /// to this batch.
113 pub fn file_names(&self) -> &[Utf8PathBuf] {
114 &self.file_names
115 }
116
117 /// Return the segment file name for the record at `index`.
118 pub fn file_name_for_line(&self, index: usize) -> &Utf8PathBuf {
119 &self.file_names[self.line_to_file_name[index]]
120 }
121
122 /// Return the byte offset in the decompressed stream for the record
123 /// at `index`.
124 pub fn byte_offset_for_line(&self, index: usize) -> u64 {
125 self.byte_offsets[index]
126 }
127}
128
129impl Default for LogBatch {
130 fn default() -> Self {
131 Self::new()
132 }
133}
134
135impl AsRef<[serde_json::Value]> for LogBatch {
136 fn as_ref(&self) -> &[serde_json::Value] {
137 &self.records
138 }
139}