kumo_jsonl/
tailer.rs

1use crate::batch::LogBatch;
2use crate::checkpoint::CheckpointData;
3use crate::decompress::FileDecompressor;
4use camino::Utf8PathBuf;
5use filenamegen::Glob;
6use futures::Stream;
7use notify::event::{CreateKind, ModifyKind};
8use notify::{Event, EventKind, Watcher};
9use serde::{Deserialize, Serialize};
10use std::sync::atomic::{AtomicBool, Ordering};
11use std::sync::Arc;
12use std::time::Duration;
13use tokio::sync::Notify;
14
15// ---------------------------------------------------------------------------
16// Default helpers
17// ---------------------------------------------------------------------------
18
/// Default filename glob: match every file in the log directory.
fn default_pattern() -> String {
    String::from("*")
}
22
/// Default cap on the number of records in a single batch.
fn default_max_batch_size() -> usize {
    100
}
26
/// Default patience for a partially-filled batch: one second.
fn default_max_batch_latency() -> Duration {
    Duration::from_millis(1_000)
}
30
31// ---------------------------------------------------------------------------
32// ConsumerConfig
33// ---------------------------------------------------------------------------
34
/// Per-consumer batching, checkpoint, and filter configuration.
///
/// Construct with [`ConsumerConfig::new`] (which applies the defaults
/// below) and customize via the builder methods.
pub struct ConsumerConfig {
    /// A name that identifies this consumer.  Returned by
    /// [`LogBatch::consumer_name`].
    pub name: String,
    /// Maximum number of records per batch.  Defaults to 100.
    pub max_batch_size: usize,
    /// Maximum time to wait for a partial batch to fill before yielding it.
    /// Defaults to one second.
    pub max_batch_latency: Duration,
    /// If set, enables checkpoint persistence with this name.
    /// The checkpoint file will be stored as `.<name>` in the log directory.
    pub checkpoint_name: Option<String>,
    /// Optional filter applied to each record.  If the filter returns
    /// `Ok(false)` the record is not added to this consumer's batch.
    /// Note: the boxed closure is required to be `Send` but not `Sync`.
    pub filter: Option<Box<dyn Fn(&serde_json::Value) -> anyhow::Result<bool> + Send>>,
}
51
52impl ConsumerConfig {
53    pub fn new(name: impl Into<String>) -> Self {
54        Self {
55            name: name.into(),
56            max_batch_size: default_max_batch_size(),
57            max_batch_latency: default_max_batch_latency(),
58            checkpoint_name: None,
59            filter: None,
60        }
61    }
62
63    pub fn max_batch_size(mut self, size: usize) -> Self {
64        self.max_batch_size = size;
65        self
66    }
67
68    pub fn max_batch_latency(mut self, latency: Duration) -> Self {
69        self.max_batch_latency = latency;
70        self
71    }
72
73    pub fn checkpoint_name(mut self, name: impl Into<String>) -> Self {
74        self.checkpoint_name = Some(name.into());
75        self
76    }
77
78    pub fn filter<F>(mut self, f: F) -> Self
79    where
80        F: Fn(&serde_json::Value) -> anyhow::Result<bool> + Send + 'static,
81    {
82        self.filter = Some(Box::new(f));
83        self
84    }
85}
86
87// ---------------------------------------------------------------------------
88// MultiConsumerTailerConfig
89// ---------------------------------------------------------------------------
90
/// Configuration for a tailer that fans out records to multiple consumers.
///
/// Construct with [`MultiConsumerTailerConfig::new`], adjust via the
/// builder methods, then call `build()` to obtain a
/// [`MultiConsumerTailer`].
pub struct MultiConsumerTailerConfig {
    /// The directory containing zstd-compressed JSONL log files.
    pub directory: Utf8PathBuf,
    /// Glob pattern for matching log filenames.  Defaults to `*`.
    pub pattern: String,
    /// If set, use a polling-based filesystem watcher with this poll
    /// interval; otherwise the OS-recommended event watcher is used.
    pub poll_watcher: Option<Duration>,
    /// If true, ignore checkpoints and start from the most recent segment.
    pub tail: bool,
    /// The set of consumers that receive records.
    pub consumers: Vec<ConsumerConfig>,
}
104
105impl MultiConsumerTailerConfig {
106    pub fn new(directory: Utf8PathBuf, consumers: Vec<ConsumerConfig>) -> Self {
107        Self {
108            directory,
109            pattern: default_pattern(),
110            poll_watcher: None,
111            tail: false,
112            consumers,
113        }
114    }
115
116    pub fn pattern(mut self, pattern: impl Into<String>) -> Self {
117        self.pattern = pattern.into();
118        self
119    }
120
121    pub fn poll_watcher(mut self, interval: Duration) -> Self {
122        self.poll_watcher = Some(interval);
123        self
124    }
125
126    pub fn tail(mut self, enable: bool) -> Self {
127        self.tail = enable;
128        self
129    }
130
131    /// Build the multi-consumer tailer.
132    pub async fn build(self) -> anyhow::Result<MultiConsumerTailer> {
133        // Collect checkpoint paths without borrowing consumers across
134        // an await (consumers contains non-Sync filter closures).
135        let cp_paths: Vec<Option<Utf8PathBuf>> = self
136            .consumers
137            .iter()
138            .map(|c| {
139                c.checkpoint_name
140                    .as_ref()
141                    .map(|name| self.directory.join(format!(".{name}")))
142            })
143            .collect();
144
145        // Now load checkpoints (async) without borrowing consumers.
146        let mut consumer_checkpoints: Vec<Option<CheckpointData>> =
147            Vec::with_capacity(cp_paths.len());
148        let mut earliest_checkpoint: Option<CheckpointData> = None;
149
150        for cp_path in &cp_paths {
151            let cp = if self.tail {
152                resolve_tail_checkpoint(&self.directory, &self.pattern)?
153            } else if let Some(cp_path) = cp_path {
154                CheckpointData::load(cp_path).await?
155            } else {
156                None
157            };
158
159            match (&earliest_checkpoint, &cp) {
160                (None, Some(cp)) => {
161                    earliest_checkpoint = Some(cp.clone());
162                }
163                (Some(existing), Some(cp)) => {
164                    if cp.file < existing.file
165                        || (cp.file == existing.file && cp.line < existing.line)
166                    {
167                        earliest_checkpoint = Some(cp.clone());
168                    }
169                }
170                _ => {}
171            }
172
173            consumer_checkpoints.push(cp);
174        }
175
176        let closed = Arc::new(AtomicBool::new(false));
177        let close_notify = Arc::new(Notify::new());
178
179        let fs_notify = Arc::new(Notify::new());
180        let fs_notify_tx = fs_notify.clone();
181        let event_handler = move |res: Result<Event, _>| match res {
182            Ok(event) => match event.kind {
183                EventKind::Create(CreateKind::File) | EventKind::Modify(ModifyKind::Data(_)) => {
184                    fs_notify_tx.notify_one();
185                }
186                _ => {}
187            },
188            Err(_) => {}
189        };
190        let mut watcher: Box<dyn Watcher + Send> = if let Some(interval) = self.poll_watcher {
191            Box::new(notify::PollWatcher::new(
192                event_handler,
193                notify::Config::default().with_poll_interval(interval),
194            )?)
195        } else {
196            Box::new(notify::recommended_watcher(event_handler)?)
197        };
198        watcher.watch(
199            &self.directory.clone().into_std_path_buf(),
200            notify::RecursiveMode::NonRecursive,
201        )?;
202
203        let shared = Arc::new(TailerShared {
204            closed,
205            close_notify,
206        });
207
208        let stream = make_multi_stream(
209            self.directory,
210            self.pattern,
211            self.consumers,
212            earliest_checkpoint,
213            consumer_checkpoints,
214            cp_paths,
215            fs_notify,
216            shared.clone(),
217        );
218
219        Ok(MultiConsumerTailer {
220            close_handle: CloseHandle { shared },
221            _watcher: watcher,
222            stream: Box::pin(stream),
223        })
224    }
225}
226
227// ---------------------------------------------------------------------------
228// Shared internals
229// ---------------------------------------------------------------------------
230
/// State shared between a tailer's stream loop and its [`CloseHandle`]s.
struct TailerShared {
    // Set to true by CloseHandle::close(); polled by the stream loop
    // and by MultiConsumerTailer::poll_next to terminate the stream.
    closed: Arc<AtomicBool>,
    // Notified by close() to wake the stream out of its select! waits.
    close_notify: Arc<Notify>,
}
235
/// A `Send + Sync` handle that can close a tailer from any context.
#[derive(Clone)]
pub struct CloseHandle {
    // Shared with the tailer's stream; cloning a handle only bumps
    // the Arc refcount.
    shared: Arc<TailerShared>,
}
241
242impl CloseHandle {
243    /// Signal the stream to terminate.
244    pub fn close(&self) {
245        self.shared.closed.store(true, Ordering::SeqCst);
246        self.shared.close_notify.notify_waiters();
247    }
248}
249
250// ---------------------------------------------------------------------------
251// MultiConsumerTailer
252// ---------------------------------------------------------------------------
253
/// An async Stream that yields vectors of [`LogBatch`], one per consumer
/// whose batch is ready.
pub struct MultiConsumerTailer {
    // Retained so close()/close_handle() work on the tailer itself.
    close_handle: CloseHandle,
    // Held only to keep the filesystem watcher alive for the tailer's
    // lifetime (leading underscore: never read after construction).
    _watcher: Box<dyn Watcher + Send>,
    // The fan-out stream built by make_multi_stream.
    stream: std::pin::Pin<Box<dyn Stream<Item = anyhow::Result<Vec<LogBatch>>> + Send>>,
}
261
262impl MultiConsumerTailer {
263    pub fn close_handle(&self) -> CloseHandle {
264        self.close_handle.clone()
265    }
266
267    pub fn close(&self) {
268        self.close_handle.close();
269    }
270}
271
272impl Stream for MultiConsumerTailer {
273    type Item = anyhow::Result<Vec<LogBatch>>;
274
275    fn poll_next(
276        mut self: std::pin::Pin<&mut Self>,
277        cx: &mut std::task::Context<'_>,
278    ) -> std::task::Poll<Option<Self::Item>> {
279        if self.close_handle.shared.closed.load(Ordering::SeqCst) {
280            return std::task::Poll::Ready(None);
281        }
282        self.stream.as_mut().poll_next(cx)
283    }
284}
285
286// ---------------------------------------------------------------------------
287// Single-consumer LogTailerConfig / LogTailer (delegates to multi-consumer)
288// ---------------------------------------------------------------------------
289
/// Configuration for constructing a single-consumer [`LogTailer`].
///
/// Combines the per-consumer knobs of [`ConsumerConfig`] with the
/// directory/pattern/watcher settings of [`MultiConsumerTailerConfig`],
/// and is (de)serializable for use in configuration files.
#[derive(Deserialize, Serialize)]
pub struct LogTailerConfig {
    /// The directory containing the log files to tail.
    pub directory: Utf8PathBuf,
    /// Glob pattern for matching log filenames.  Defaults to `*`.
    #[serde(default = "default_pattern")]
    pub pattern: String,
    /// Maximum number of records per batch.  Defaults to 100.
    #[serde(default = "default_max_batch_size")]
    pub max_batch_size: usize,
    /// Maximum time to wait for a partial batch to fill.  Defaults to
    /// one second.
    #[serde(default = "default_max_batch_latency", with = "duration_serde")]
    pub max_batch_latency: Duration,
    /// If set, enables checkpoint persistence with this name (stored as
    /// `.<name>` in the log directory).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub checkpoint_name: Option<String>,
    /// If set, use a polling-based filesystem watcher with this interval.
    // NOTE(review): `duration_serde` is used for both the plain
    // `Duration` field above and this `Option<Duration>` field —
    // presumably the module handles both shapes; confirm.
    #[serde(
        default,
        with = "duration_serde",
        skip_serializing_if = "Option::is_none"
    )]
    pub poll_watcher: Option<Duration>,
    /// If true, ignore checkpoints and start from the most recent segment.
    #[serde(default)]
    pub tail: bool,
}
311
312impl LogTailerConfig {
313    pub fn new(directory: Utf8PathBuf) -> Self {
314        Self {
315            directory,
316            pattern: default_pattern(),
317            max_batch_size: default_max_batch_size(),
318            max_batch_latency: default_max_batch_latency(),
319            checkpoint_name: None,
320            poll_watcher: None,
321            tail: false,
322        }
323    }
324
325    pub fn pattern(mut self, pattern: impl Into<String>) -> Self {
326        self.pattern = pattern.into();
327        self
328    }
329
330    pub fn max_batch_size(mut self, size: usize) -> Self {
331        self.max_batch_size = size;
332        self
333    }
334
335    pub fn max_batch_latency(mut self, latency: Duration) -> Self {
336        self.max_batch_latency = latency;
337        self
338    }
339
340    pub fn checkpoint_name(mut self, name: impl Into<String>) -> Self {
341        self.checkpoint_name = Some(name.into());
342        self
343    }
344
345    pub fn poll_watcher(mut self, interval: Duration) -> Self {
346        self.poll_watcher = Some(interval);
347        self
348    }
349
350    pub fn tail(mut self, enable: bool) -> Self {
351        self.tail = enable;
352        self
353    }
354
355    /// Build a single-consumer tailer.
356    pub async fn build(self) -> anyhow::Result<LogTailer> {
357        self.build_with_filter(None::<fn(&serde_json::Value) -> anyhow::Result<bool>>)
358            .await
359    }
360
361    /// Build a single-consumer tailer with an optional record filter.
362    pub async fn build_with_filter<F>(self, filter: Option<F>) -> anyhow::Result<LogTailer>
363    where
364        F: Fn(&serde_json::Value) -> anyhow::Result<bool> + Send + 'static,
365    {
366        let mut consumer = ConsumerConfig::new("default")
367            .max_batch_size(self.max_batch_size)
368            .max_batch_latency(self.max_batch_latency);
369        if let Some(name) = self.checkpoint_name.clone() {
370            consumer = consumer.checkpoint_name(name);
371        }
372        if let Some(f) = filter {
373            consumer = consumer.filter(f);
374        }
375
376        let multi_config = MultiConsumerTailerConfig {
377            directory: self.directory,
378            pattern: self.pattern,
379            poll_watcher: self.poll_watcher,
380            tail: self.tail,
381            consumers: vec![consumer],
382        };
383
384        let multi = multi_config.build().await?;
385
386        Ok(LogTailer { inner: multi })
387    }
388}
389
/// A single-consumer async Stream that yields one [`LogBatch`] at a time.
///
/// This is a convenience wrapper around [`MultiConsumerTailer`] with
/// exactly one consumer.
pub struct LogTailer {
    // The wrapped multi-consumer tailer; since it was built with a
    // single consumer, each item it yields is a one-element Vec that
    // poll_next unwraps.
    inner: MultiConsumerTailer,
}
397
398impl LogTailer {
399    pub fn close_handle(&self) -> CloseHandle {
400        self.inner.close_handle()
401    }
402
403    pub fn close(&self) {
404        self.inner.close();
405    }
406}
407
408impl Stream for LogTailer {
409    type Item = anyhow::Result<LogBatch>;
410
411    fn poll_next(
412        mut self: std::pin::Pin<&mut Self>,
413        cx: &mut std::task::Context<'_>,
414    ) -> std::task::Poll<Option<Self::Item>> {
415        use std::task::Poll;
416        // The inner multi-consumer stream yields Vec<LogBatch> with exactly
417        // one element.  Unwrap it.
418        match std::pin::Pin::new(&mut self.inner).poll_next(cx) {
419            Poll::Ready(Some(Ok(mut batches))) => Poll::Ready(Some(Ok(batches
420                .pop()
421                .expect("single consumer yields one batch")))),
422            Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
423            Poll::Ready(None) => Poll::Ready(None),
424            Poll::Pending => Poll::Pending,
425        }
426    }
427}
428
429// ---------------------------------------------------------------------------
430// Shared utilities
431// ---------------------------------------------------------------------------
432
433fn resolve_tail_checkpoint(
434    directory: &Utf8PathBuf,
435    pattern: &str,
436) -> anyhow::Result<Option<CheckpointData>> {
437    let glob = Glob::new(pattern)?;
438    let mut files = vec![];
439    for path in glob.walk(directory) {
440        let path = directory.join(Utf8PathBuf::try_from(path).map_err(|e| anyhow::anyhow!("{e}"))?);
441        if path.is_file() {
442            files.push(path);
443        }
444    }
445    files.sort();
446    Ok(files.last().map(|f| CheckpointData {
447        file: f.to_string(),
448        line: 0,
449    }))
450}
451
452/// Build the file plan: sorted list of matching files in the directory.
453fn build_plan(
454    directory: &Utf8PathBuf,
455    pattern: &str,
456    checkpoint_paths: &[Option<Utf8PathBuf>],
457    last_processed: &Option<Utf8PathBuf>,
458    checkpoint: &Option<CheckpointData>,
459) -> anyhow::Result<Vec<Utf8PathBuf>> {
460    let glob = Glob::new(pattern)?;
461    let mut result = vec![];
462    for path in glob.walk(directory) {
463        let path = directory.join(Utf8PathBuf::try_from(path).map_err(|e| anyhow::anyhow!("{e}"))?);
464        // Skip checkpoint files
465        if checkpoint_paths.iter().any(|cp| cp.as_ref() == Some(&path)) {
466            continue;
467        }
468        if path.is_file() {
469            result.push(path);
470        }
471    }
472    result.sort();
473
474    if let Some(last) = last_processed {
475        result.retain(|item| item > last);
476    } else if let Some(cp) = checkpoint {
477        let cp_file = &cp.file;
478        result.retain(|item| item.as_str() >= cp_file.as_str());
479    }
480
481    Ok(result)
482}
483
484fn is_file_done(path: &Utf8PathBuf) -> bool {
485    path.metadata()
486        .map(|m| m.permissions().readonly())
487        .unwrap_or(false)
488}
489
490// ---------------------------------------------------------------------------
491// Per-consumer state used during stream construction
492// ---------------------------------------------------------------------------
493
494// ---------------------------------------------------------------------------
495// Multi-consumer stream
496// ---------------------------------------------------------------------------
497
/// Construct the fan-out stream shared by all consumers.
///
/// A single reader walks the planned files in sorted order, parses each
/// JSONL record once, and offers it to every consumer (subject to that
/// consumer's filter and per-consumer skip offset).  The stream yields
/// a `Vec<LogBatch>` holding every batch that is ready: full, past its
/// latency deadline, or flushed because the current plan is exhausted.
fn make_multi_stream(
    directory: Utf8PathBuf,
    pattern: String,
    consumers: Vec<ConsumerConfig>,
    earliest_checkpoint: Option<CheckpointData>,
    mut consumer_checkpoints: Vec<Option<CheckpointData>>,
    cp_paths: Vec<Option<Utf8PathBuf>>,
    fs_notify: Arc<Notify>,
    shared: Arc<TailerShared>,
) -> impl Stream<Item = anyhow::Result<Vec<LogBatch>>> + Send {
    let num_consumers = consumers.len();

    // Extract per-consumer config into parallel vecs (the boxed filter
    // closures are moved out because they cannot be cloned).
    let consumer_names: Vec<String> = consumers.iter().map(|c| c.name.clone()).collect();
    let max_batch_sizes: Vec<usize> = consumers.iter().map(|c| c.max_batch_size).collect();
    let max_batch_latencies: Vec<Duration> =
        consumers.iter().map(|c| c.max_batch_latency).collect();
    let filters: Vec<Option<Box<dyn Fn(&serde_json::Value) -> anyhow::Result<bool> + Send>>> =
        consumers.into_iter().map(|c| c.filter).collect();

    async_stream::try_stream! {
        let mut last_processed: Option<Utf8PathBuf> = None;
        let mut global_checkpoint = earliest_checkpoint;
        let mut skip_lines: usize;
        // How long to wait before re-polling a file that hit EOF but is
        // not yet marked done.
        let retry_delay = Duration::from_millis(200);

        // Per-consumer skip lines (for the first file only, when
        // resuming from checkpoint).  The global skip_lines is the
        // minimum across all consumers for that file, and individual
        // consumers that are further ahead will have their records
        // filtered out by index comparison.
        let mut consumer_skip: Vec<usize> = vec![0; num_consumers];

        'outer: loop {
            if shared.closed.load(Ordering::SeqCst) {
                break;
            }

            let plan = build_plan(
                &directory,
                &pattern,
                &cp_paths,
                &last_processed,
                &global_checkpoint,
            )?;

            // Nothing to read yet: block until the watcher reports
            // filesystem activity or the tailer is closed.
            if plan.is_empty() {
                tokio::select! {
                    _ = shared.close_notify.notified() => break,
                    _ = fs_notify.notified() => continue,
                }
            }

            // Determine skip_lines from the earliest checkpoint
            if let Some(cp) = &global_checkpoint {
                if plan.first().map(|p| p.as_str()) == Some(cp.file.as_str()) {
                    skip_lines = cp.line;
                } else {
                    skip_lines = 0;
                }
            } else {
                skip_lines = 0;
            }

            // Determine per-consumer skip lines
            for i in 0..num_consumers {
                if let Some(cp) = &consumer_checkpoints[i] {
                    if plan.first().map(|p| p.as_str()) == Some(cp.file.as_str()) {
                        consumer_skip[i] = cp.line;
                    } else {
                        consumer_skip[i] = 0;
                    }
                } else {
                    consumer_skip[i] = 0;
                }
            }
            // Checkpoints only seed the first plan; later plans resume
            // from last_processed, so clear them now.
            global_checkpoint.take();
            for cp in consumer_checkpoints.iter_mut() {
                cp.take();
            }

            let mut plan_index = 0;
            let mut decomp: Option<FileDecompressor> = None;
            let mut current_path: Option<&Utf8PathBuf> = None;
            // Line count of the most recently finished file, used for
            // the commit checkpoint when the plan is exhausted.
            let mut last_lines_consumed: usize = 0;
            // Track the global line number for the current file so we
            // can apply per-consumer skip logic.
            let mut global_line_in_file: usize = skip_lines;

            // Open the first file of the plan.
            if let Some(path) = plan.get(plan_index) {
                let path_std = path.as_std_path().to_owned();
                decomp = Some(FileDecompressor::open(&path_std)?);
                current_path = Some(path);
            }

            // Per-consumer batches and deadlines persist across
            // fill/yield cycles.  A consumer's batch is "ready" when
            // it is full or its deadline has expired.  Only ready
            // batches are yielded; others keep accumulating.
            let mut batches: Vec<LogBatch> = (0..num_consumers)
                .map(|i| LogBatch::with_consumer_name(consumer_names[i].clone()))
                .collect();
            let mut deadlines: Vec<Option<tokio::time::Instant>> = vec![None; num_consumers];

            while decomp.is_some() {
                if shared.closed.load(Ordering::SeqCst) {
                    break 'outer;
                }

                // Fill batches until at least one is ready
                'fill: loop {
                    if shared.closed.load(Ordering::SeqCst) {
                        break 'outer;
                    }

                    // Check if any consumer already has a ready batch
                    let now = tokio::time::Instant::now();
                    let any_ready = (0..num_consumers).any(|i| {
                        !batches[i].is_empty()
                            && (batches[i].len() >= max_batch_sizes[i]
                                || deadlines[i].map_or(false, |d| now >= d))
                    });
                    if any_ready {
                        break 'fill;
                    }

                    let d = decomp.as_mut().expect("checked above");
                    let path = current_path.expect("set with decomp");

                    // NOTE(review): assumes FileDecompressor::next_line
                    // skips `skip_lines` already-checkpointed lines at
                    // the start of the file — confirm against the
                    // decompress module.
                    match d.next_line(skip_lines) {
                        Ok(Some(line)) => {
                            let value: serde_json::Value = serde_json::from_str(&line.text)
                                .map_err(|err| {
                                    anyhow::anyhow!(
                                        "Failed to parse a line from {path} (byte offset {}) \
                                         as json: {err}. Is the file corrupt? You may need \
                                         to move the file aside to make progress",
                                        line.byte_offset
                                    )
                                })?;

                            for i in 0..num_consumers {
                                // Skip records this consumer already saw
                                // per its own (further ahead) checkpoint.
                                if global_line_in_file < consumer_skip[i] {
                                    continue;
                                }
                                if let Some(ref f) = filters[i] {
                                    if !f(&value)? {
                                        continue;
                                    }
                                }
                                batches[i].push_value(
                                    value.clone(),
                                    path,
                                    line.byte_offset,
                                );
                                // Start the deadline timer on first record
                                if deadlines[i].is_none() {
                                    deadlines[i] = Some(
                                        tokio::time::Instant::now() + max_batch_latencies[i],
                                    );
                                }
                            }
                            global_line_in_file += 1;
                        }
                        Ok(None) => {
                            // EOF on current file
                            if is_file_done(path) {
                                // A sealed file must end on a line
                                // boundary; leftover bytes mean corruption.
                                if d.has_partial_data() {
                                    Err(anyhow::anyhow!(
                                        "unexpected EOF for {} with partial line data remaining",
                                        path
                                    ))?;
                                }
                                last_lines_consumed = d.lines_consumed;
                                last_processed = Some(path.clone());
                                skip_lines = 0;
                                global_line_in_file = 0;
                                for cs in consumer_skip.iter_mut() {
                                    *cs = 0;
                                }

                                // Advance to the next file in the plan,
                                // or leave the fill loop when exhausted.
                                plan_index += 1;
                                if let Some(next_path) = plan.get(plan_index) {
                                    let path_std = next_path.as_std_path().to_owned();
                                    decomp = Some(FileDecompressor::open(&path_std)?);
                                    current_path = Some(next_path);
                                    continue 'fill;
                                } else {
                                    decomp = None;
                                    current_path = None;
                                    break 'fill;
                                }
                            }

                            // File not done; find the earliest deadline
                            // among non-empty batches to bound the wait.
                            let earliest_deadline = (0..num_consumers)
                                .filter(|&i| !batches[i].is_empty())
                                .filter_map(|i| deadlines[i])
                                .min();

                            if let Some(deadline) = earliest_deadline {
                                let remaining = deadline.saturating_duration_since(
                                    tokio::time::Instant::now(),
                                );
                                if remaining.is_zero() {
                                    break 'fill;
                                }
                                tokio::select! {
                                    _ = shared.close_notify.notified() => break 'outer,
                                    _ = tokio::time::sleep(remaining.min(retry_delay)) => {},
                                    _ = fs_notify.notified() => {},
                                }
                            } else {
                                // All batches empty, file not done — wait
                                tokio::select! {
                                    _ = shared.close_notify.notified() => break 'outer,
                                    _ = tokio::time::sleep(retry_delay) => {},
                                    _ = fs_notify.notified() => {},
                                }
                            }
                            d.reset_eof();
                        }
                        Err(e) => {
                            Err(e)?;
                        }
                    }
                }

                // Determine which batches are ready to yield
                let now = tokio::time::Instant::now();
                let mut ready: Vec<LogBatch> = Vec::new();
                for i in 0..num_consumers {
                    let is_ready = !batches[i].is_empty()
                        && (batches[i].len() >= max_batch_sizes[i]
                            || deadlines[i].map_or(false, |d| now >= d)
                            || decomp.is_none()); // end of plan: flush all

                    if !is_ready {
                        continue;
                    }

                    // Swap out the ready batch, replace with a fresh one
                    let mut batch = std::mem::replace(
                        &mut batches[i],
                        LogBatch::with_consumer_name(consumer_names[i].clone()),
                    );
                    deadlines[i] = None;

                    // Set the commit callback
                    if let Some(ref cp_path) = cp_paths[i] {
                        // Checkpoint position: the current file and the
                        // number of lines consumed from it so far (or
                        // the last finished file when the plan is done).
                        let (cp_file, cp_line) = if let Some(d) = &decomp {
                            let path = current_path.expect("set with decomp");
                            (path.clone(), d.lines_consumed)
                        } else if let Some(last) = &last_processed {
                            (last.clone(), last_lines_consumed)
                        } else {
                            unreachable!("non-empty batch without a source");
                        };
                        let cp_path = cp_path.clone();
                        batch.set_commit_fn(Box::new(move || {
                            CheckpointData::save_atomic(&cp_path, &cp_file, cp_line)
                        }));
                    }
                    ready.push(batch);
                }

                if !ready.is_empty() {
                    skip_lines = 0;
                    yield ready;
                }
            }
        }
    }
}