kumo_jsonl/
batch.rs

1use camino::Utf8PathBuf;
2
/// Callback that writes a checkpoint when invoked.
///
/// The callback is a boxed `FnOnce`, so it can be invoked at most once,
/// and it is `Send`, so the box may be moved to another thread.  It
/// reports the outcome of the write as an `anyhow::Result<()>`.
type CommitFn = Box<dyn FnOnce() -> anyhow::Result<()> + Send>;
5
/// A batch of log records yielded by the tailer stream.
///
/// Each record is a parsed JSON value.  A batch may contain records
/// from multiple segment files when a file boundary is crossed while
/// filling the batch.
///
/// Call [`LogBatch::commit`] after processing the batch to advance
/// the checkpoint.  If `commit` is not called the checkpoint remains
/// at its prior position, so the records in this batch will be
/// re-read on the next run.
///
/// `records`, `line_to_file_name` and `byte_offsets` are parallel
/// vectors: every record pushed into the batch appends exactly one
/// element to each of them, so they always have the same length and
/// share the same indices.
pub struct LogBatch {
    /// The name of the consumer this batch belongs to.
    /// Empty when the batch was created via `new()`/`default()`.
    consumer_name: String,
    /// The parsed JSON records that comprise the batch, in the order
    /// in which they were pushed.
    records: Vec<serde_json::Value>,
    /// The list of unique segment file names that were
    /// the source of data for the records.  A name is appended the
    /// first time a record from that segment is pushed.
    file_names: Vec<Utf8PathBuf>,
    /// The indices of this vec correspond to the indices of `records`.
    /// The elements in the vec are the indices into `file_names`
    /// of the file name from which the record was read.
    line_to_file_name: Vec<usize>,
    /// The indices of this vec correspond to the indices of `records`.
    /// The elements in the vec are the byte offset within the
    /// decompressed stream of the start of that record.
    byte_offsets: Vec<u64>,
    /// Callback that writes the checkpoint for this batch.
    /// Set by the stream before yielding.  Consumed by `commit()`.
    /// `None` until a callback is installed, and again after `commit()`
    /// has taken it.
    commit_fn: Option<CommitFn>,
}
36
37impl LogBatch {
38    /// Create a new empty batch with a default consumer name.
39    pub fn new() -> Self {
40        Self::with_consumer_name(String::new())
41    }
42
43    /// Create a new empty batch for the named consumer.
44    pub fn with_consumer_name(name: String) -> Self {
45        Self {
46            consumer_name: name,
47            records: Vec::new(),
48            file_names: Vec::new(),
49            line_to_file_name: Vec::new(),
50            byte_offsets: Vec::new(),
51            commit_fn: None,
52        }
53    }
54
55    /// Add a pre-parsed JSON value to the batch.
56    pub(crate) fn push_value(
57        &mut self,
58        value: serde_json::Value,
59        segment: &Utf8PathBuf,
60        byte_offset: u64,
61    ) {
62        let file_idx = match self.file_names.iter().rposition(|f| f == segment) {
63            Some(idx) => idx,
64            None => {
65                self.file_names.push(segment.clone());
66                self.file_names.len() - 1
67            }
68        };
69        self.records.push(value);
70        self.line_to_file_name.push(file_idx);
71        self.byte_offsets.push(byte_offset);
72    }
73
74    /// Set the commit callback.  Called by the stream before yielding.
75    pub(crate) fn set_commit_fn(&mut self, f: CommitFn) {
76        self.commit_fn = Some(f);
77    }
78
79    /// The name of the consumer this batch belongs to.
80    pub fn consumer_name(&self) -> &str {
81        &self.consumer_name
82    }
83
84    /// Advance the checkpoint to the end of this batch.
85    ///
86    /// This confirms that the caller has successfully processed
87    /// the batch.  If checkpointing is not enabled, or if this
88    /// batch has already been committed, this is a no-op.
89    pub fn commit(&mut self) -> anyhow::Result<()> {
90        if let Some(f) = self.commit_fn.take() {
91            f()?;
92        }
93        Ok(())
94    }
95
96    /// The number of records in this batch.
97    pub fn len(&self) -> usize {
98        self.records.len()
99    }
100
101    /// Whether the batch is empty.
102    pub fn is_empty(&self) -> bool {
103        self.records.is_empty()
104    }
105
106    /// The parsed JSON records.
107    pub fn records(&self) -> &[serde_json::Value] {
108        &self.records
109    }
110
111    /// The list of unique segment file names that contributed records
112    /// to this batch.
113    pub fn file_names(&self) -> &[Utf8PathBuf] {
114        &self.file_names
115    }
116
117    /// Return the segment file name for the record at `index`.
118    pub fn file_name_for_line(&self, index: usize) -> &Utf8PathBuf {
119        &self.file_names[self.line_to_file_name[index]]
120    }
121
122    /// Return the byte offset in the decompressed stream for the record
123    /// at `index`.
124    pub fn byte_offset_for_line(&self, index: usize) -> u64 {
125        self.byte_offsets[index]
126    }
127}
128
129impl Default for LogBatch {
130    fn default() -> Self {
131        Self::new()
132    }
133}
134
135impl AsRef<[serde_json::Value]> for LogBatch {
136    fn as_ref(&self) -> &[serde_json::Value] {
137        &self.records
138    }
139}