kumo_jsonl/
tailer.rs

1use crate::batch::LogBatch;
2use crate::checkpoint::CheckpointData;
3use crate::decompress::FileDecompressor;
4use camino::Utf8PathBuf;
5use filenamegen::Glob;
6use futures::Stream;
7use notify::event::{CreateKind, ModifyKind};
8use notify::{Event, EventKind, Watcher};
9use serde::{Deserialize, Serialize};
10use std::collections::HashSet;
11use std::sync::atomic::{AtomicBool, Ordering};
12use std::sync::Arc;
13use std::time::Duration;
14use tokio::sync::Notify;
15use tracing::warn;
16
17// ---------------------------------------------------------------------------
18// Default helpers
19// ---------------------------------------------------------------------------
20
/// Default glob pattern used to select log files: the bare wildcard `*`.
fn default_pattern() -> String {
    String::from("*")
}
24
/// Default upper bound on the number of records collected into one batch.
fn default_max_batch_size() -> usize {
    const DEFAULT_MAX_BATCH_SIZE: usize = 100;
    DEFAULT_MAX_BATCH_SIZE
}
28
/// Default time a partially filled batch may wait before it is yielded:
/// one second.
fn default_max_batch_latency() -> Duration {
    Duration::from_millis(1_000)
}
32
33// ---------------------------------------------------------------------------
34// ConsumerConfig
35// ---------------------------------------------------------------------------
36
37/// Per-consumer batching, checkpoint, and filter configuration.
38pub struct ConsumerConfig {
39    /// A name that identifies this consumer.  Returned by
40    /// [`LogBatch::consumer_name`].
41    pub name: String,
42    /// Maximum number of records per batch.
43    pub max_batch_size: usize,
44    /// Maximum time to wait for a partial batch to fill before yielding it.
45    pub max_batch_latency: Duration,
46    /// If set, enables checkpoint persistence with this name.
47    /// The checkpoint file will be stored as `.<name>` in the log directory.
48    pub checkpoint_name: Option<String>,
49    /// Optional filter applied to each record.  If the filter returns
50    /// `Ok(false)` the record is not added to this consumer's batch.
51    pub filter: Option<Box<dyn Fn(&serde_json::Value) -> anyhow::Result<bool> + Send>>,
52}
53
54impl ConsumerConfig {
55    pub fn new(name: impl Into<String>) -> Self {
56        Self {
57            name: name.into(),
58            max_batch_size: default_max_batch_size(),
59            max_batch_latency: default_max_batch_latency(),
60            checkpoint_name: None,
61            filter: None,
62        }
63    }
64
65    pub fn max_batch_size(mut self, size: usize) -> Self {
66        self.max_batch_size = size;
67        self
68    }
69
70    pub fn max_batch_latency(mut self, latency: Duration) -> Self {
71        self.max_batch_latency = latency;
72        self
73    }
74
75    pub fn checkpoint_name(mut self, name: impl Into<String>) -> Self {
76        self.checkpoint_name = Some(name.into());
77        self
78    }
79
80    pub fn filter<F>(mut self, f: F) -> Self
81    where
82        F: Fn(&serde_json::Value) -> anyhow::Result<bool> + Send + 'static,
83    {
84        self.filter = Some(Box::new(f));
85        self
86    }
87}
88
89// ---------------------------------------------------------------------------
90// MultiConsumerTailerConfig
91// ---------------------------------------------------------------------------
92
93/// Configuration for a tailer that fans out records to multiple consumers.
94pub struct MultiConsumerTailerConfig {
95    /// The directory containing zstd-compressed JSONL log files.
96    pub directory: Utf8PathBuf,
97    /// Glob pattern for matching log filenames.
98    pub pattern: String,
99    /// If set, use a polling-based filesystem watcher.
100    pub poll_watcher: Option<Duration>,
101    /// If true, ignore checkpoints and start from the most recent segment.
102    pub tail: bool,
103    /// The set of consumers that receive records.
104    pub consumers: Vec<ConsumerConfig>,
105}
106
107impl MultiConsumerTailerConfig {
108    pub fn new(directory: Utf8PathBuf, consumers: Vec<ConsumerConfig>) -> Self {
109        Self {
110            directory,
111            pattern: default_pattern(),
112            poll_watcher: None,
113            tail: false,
114            consumers,
115        }
116    }
117
118    pub fn pattern(mut self, pattern: impl Into<String>) -> Self {
119        self.pattern = pattern.into();
120        self
121    }
122
123    pub fn poll_watcher(mut self, interval: Duration) -> Self {
124        self.poll_watcher = Some(interval);
125        self
126    }
127
128    pub fn tail(mut self, enable: bool) -> Self {
129        self.tail = enable;
130        self
131    }
132
133    /// Build the multi-consumer tailer.
134    pub async fn build(self) -> anyhow::Result<MultiConsumerTailer> {
135        // Collect checkpoint paths without borrowing consumers across
136        // an await (consumers contains non-Sync filter closures).
137        let cp_paths: Vec<Option<Utf8PathBuf>> = self
138            .consumers
139            .iter()
140            .map(|c| {
141                c.checkpoint_name
142                    .as_ref()
143                    .map(|name| self.directory.join(format!(".{name}")))
144            })
145            .collect();
146
147        // Now load checkpoints (async) without borrowing consumers.
148        let mut consumer_checkpoints: Vec<Option<CheckpointData>> =
149            Vec::with_capacity(cp_paths.len());
150        let mut earliest_checkpoint: Option<CheckpointData> = None;
151
152        for cp_path in &cp_paths {
153            let cp = if self.tail {
154                resolve_tail_checkpoint(&self.directory, &self.pattern)?
155            } else if let Some(cp_path) = cp_path {
156                CheckpointData::load(cp_path).await?
157            } else {
158                None
159            };
160
161            match (&earliest_checkpoint, &cp) {
162                (None, Some(cp)) => {
163                    earliest_checkpoint = Some(cp.clone());
164                }
165                (Some(existing), Some(cp)) => {
166                    if cp.file < existing.file
167                        || (cp.file == existing.file && cp.line < existing.line)
168                    {
169                        earliest_checkpoint = Some(cp.clone());
170                    }
171                }
172                _ => {}
173            }
174
175            consumer_checkpoints.push(cp);
176        }
177
178        let closed = Arc::new(AtomicBool::new(false));
179        let close_notify = Arc::new(Notify::new());
180
181        let fs_notify = Arc::new(Notify::new());
182        let fs_notify_tx = fs_notify.clone();
183        let event_handler = move |res: Result<Event, _>| match res {
184            Ok(event) => match event.kind {
185                EventKind::Create(CreateKind::File) | EventKind::Modify(ModifyKind::Data(_)) => {
186                    fs_notify_tx.notify_one();
187                }
188                _ => {}
189            },
190            Err(_) => {}
191        };
192        let mut watcher: Box<dyn Watcher + Send> = if let Some(interval) = self.poll_watcher {
193            Box::new(notify::PollWatcher::new(
194                event_handler,
195                notify::Config::default().with_poll_interval(interval),
196            )?)
197        } else {
198            Box::new(notify::recommended_watcher(event_handler)?)
199        };
200        watcher.watch(
201            &self.directory.clone().into_std_path_buf(),
202            notify::RecursiveMode::NonRecursive,
203        )?;
204
205        let shared = Arc::new(TailerShared {
206            closed,
207            close_notify,
208        });
209
210        let stream = make_multi_stream(
211            self.directory,
212            self.pattern,
213            self.consumers,
214            earliest_checkpoint,
215            consumer_checkpoints,
216            cp_paths,
217            fs_notify,
218            shared.clone(),
219        );
220
221        Ok(MultiConsumerTailer {
222            close_handle: CloseHandle { shared },
223            _watcher: watcher,
224            stream: Box::pin(stream),
225        })
226    }
227}
228
229// ---------------------------------------------------------------------------
230// Shared internals
231// ---------------------------------------------------------------------------
232
/// Shutdown state shared between a tailer's stream and its
/// [`CloseHandle`]s.
struct TailerShared {
    /// Set to `true` by [`CloseHandle::close`]; the stream checks it to
    /// decide when to stop.
    closed: Arc<AtomicBool>,
    /// Signalled by [`CloseHandle::close`] so that a stream currently
    /// waiting for new data wakes up.
    close_notify: Arc<Notify>,
}
237
238/// A `Send + Sync` handle that can close a tailer from any context.
239#[derive(Clone)]
240pub struct CloseHandle {
241    shared: Arc<TailerShared>,
242}
243
244impl CloseHandle {
245    /// Signal the stream to terminate.
246    pub fn close(&self) {
247        self.shared.closed.store(true, Ordering::SeqCst);
248        self.shared.close_notify.notify_waiters();
249    }
250}
251
252// ---------------------------------------------------------------------------
253// MultiConsumerTailer
254// ---------------------------------------------------------------------------
255
256/// An async Stream that yields vectors of [`LogBatch`], one per consumer
257/// whose batch is ready.
258pub struct MultiConsumerTailer {
259    close_handle: CloseHandle,
260    _watcher: Box<dyn Watcher + Send>,
261    stream: std::pin::Pin<Box<dyn Stream<Item = anyhow::Result<Vec<LogBatch>>> + Send>>,
262}
263
264impl MultiConsumerTailer {
265    pub fn close_handle(&self) -> CloseHandle {
266        self.close_handle.clone()
267    }
268
269    pub fn close(&self) {
270        self.close_handle.close();
271    }
272}
273
274impl Stream for MultiConsumerTailer {
275    type Item = anyhow::Result<Vec<LogBatch>>;
276
277    fn poll_next(
278        mut self: std::pin::Pin<&mut Self>,
279        cx: &mut std::task::Context<'_>,
280    ) -> std::task::Poll<Option<Self::Item>> {
281        if self.close_handle.shared.closed.load(Ordering::SeqCst) {
282            return std::task::Poll::Ready(None);
283        }
284        self.stream.as_mut().poll_next(cx)
285    }
286}
287
288// ---------------------------------------------------------------------------
289// Single-consumer LogTailerConfig / LogTailer (delegates to multi-consumer)
290// ---------------------------------------------------------------------------
291
292/// Configuration for constructing a single-consumer [`LogTailer`].
293#[derive(Deserialize, Serialize)]
294pub struct LogTailerConfig {
295    pub directory: Utf8PathBuf,
296    #[serde(default = "default_pattern")]
297    pub pattern: String,
298    #[serde(default = "default_max_batch_size")]
299    pub max_batch_size: usize,
300    #[serde(default = "default_max_batch_latency", with = "duration_serde")]
301    pub max_batch_latency: Duration,
302    #[serde(default, skip_serializing_if = "Option::is_none")]
303    pub checkpoint_name: Option<String>,
304    #[serde(
305        default,
306        with = "duration_serde",
307        skip_serializing_if = "Option::is_none"
308    )]
309    pub poll_watcher: Option<Duration>,
310    #[serde(default)]
311    pub tail: bool,
312}
313
314impl LogTailerConfig {
315    pub fn new(directory: Utf8PathBuf) -> Self {
316        Self {
317            directory,
318            pattern: default_pattern(),
319            max_batch_size: default_max_batch_size(),
320            max_batch_latency: default_max_batch_latency(),
321            checkpoint_name: None,
322            poll_watcher: None,
323            tail: false,
324        }
325    }
326
327    pub fn pattern(mut self, pattern: impl Into<String>) -> Self {
328        self.pattern = pattern.into();
329        self
330    }
331
332    pub fn max_batch_size(mut self, size: usize) -> Self {
333        self.max_batch_size = size;
334        self
335    }
336
337    pub fn max_batch_latency(mut self, latency: Duration) -> Self {
338        self.max_batch_latency = latency;
339        self
340    }
341
342    pub fn checkpoint_name(mut self, name: impl Into<String>) -> Self {
343        self.checkpoint_name = Some(name.into());
344        self
345    }
346
347    pub fn poll_watcher(mut self, interval: Duration) -> Self {
348        self.poll_watcher = Some(interval);
349        self
350    }
351
352    pub fn tail(mut self, enable: bool) -> Self {
353        self.tail = enable;
354        self
355    }
356
357    /// Build a single-consumer tailer.
358    pub async fn build(self) -> anyhow::Result<LogTailer> {
359        self.build_with_filter(None::<fn(&serde_json::Value) -> anyhow::Result<bool>>)
360            .await
361    }
362
363    /// Build a single-consumer tailer with an optional record filter.
364    pub async fn build_with_filter<F>(self, filter: Option<F>) -> anyhow::Result<LogTailer>
365    where
366        F: Fn(&serde_json::Value) -> anyhow::Result<bool> + Send + 'static,
367    {
368        let mut consumer = ConsumerConfig::new("default")
369            .max_batch_size(self.max_batch_size)
370            .max_batch_latency(self.max_batch_latency);
371        if let Some(name) = self.checkpoint_name.clone() {
372            consumer = consumer.checkpoint_name(name);
373        }
374        if let Some(f) = filter {
375            consumer = consumer.filter(f);
376        }
377
378        let multi_config = MultiConsumerTailerConfig {
379            directory: self.directory,
380            pattern: self.pattern,
381            poll_watcher: self.poll_watcher,
382            tail: self.tail,
383            consumers: vec![consumer],
384        };
385
386        let multi = multi_config.build().await?;
387
388        Ok(LogTailer { inner: multi })
389    }
390}
391
392/// A single-consumer async Stream that yields one [`LogBatch`] at a time.
393///
394/// This is a convenience wrapper around [`MultiConsumerTailer`] with
395/// exactly one consumer.
396pub struct LogTailer {
397    inner: MultiConsumerTailer,
398}
399
400impl LogTailer {
401    pub fn close_handle(&self) -> CloseHandle {
402        self.inner.close_handle()
403    }
404
405    pub fn close(&self) {
406        self.inner.close();
407    }
408}
409
410impl Stream for LogTailer {
411    type Item = anyhow::Result<LogBatch>;
412
413    fn poll_next(
414        mut self: std::pin::Pin<&mut Self>,
415        cx: &mut std::task::Context<'_>,
416    ) -> std::task::Poll<Option<Self::Item>> {
417        use std::task::Poll;
418        // The inner multi-consumer stream yields Vec<LogBatch> with exactly
419        // one element.  Unwrap it.
420        match std::pin::Pin::new(&mut self.inner).poll_next(cx) {
421            Poll::Ready(Some(Ok(mut batches))) => Poll::Ready(Some(Ok(batches
422                .pop()
423                .expect("single consumer yields one batch")))),
424            Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
425            Poll::Ready(None) => Poll::Ready(None),
426            Poll::Pending => Poll::Pending,
427        }
428    }
429}
430
431// ---------------------------------------------------------------------------
432// Shared utilities
433// ---------------------------------------------------------------------------
434
435fn resolve_tail_checkpoint(
436    directory: &Utf8PathBuf,
437    pattern: &str,
438) -> anyhow::Result<Option<CheckpointData>> {
439    let glob = Glob::new(pattern)?;
440    let mut files = vec![];
441    for path in glob.walk(directory) {
442        let path = directory.join(Utf8PathBuf::try_from(path).map_err(|e| anyhow::anyhow!("{e}"))?);
443        if path.is_file() {
444            files.push(path);
445        }
446    }
447    files.sort();
448    Ok(files.last().map(|f| CheckpointData {
449        file: f.to_string(),
450        line: 0,
451    }))
452}
453
454/// Build the file plan: sorted list of matching files in the directory.
455fn build_plan(
456    directory: &Utf8PathBuf,
457    pattern: &str,
458    checkpoint_paths: &[Option<Utf8PathBuf>],
459    last_processed: &Option<Utf8PathBuf>,
460    checkpoint: &Option<CheckpointData>,
461    bad_files: &HashSet<Utf8PathBuf>,
462) -> anyhow::Result<Vec<Utf8PathBuf>> {
463    let glob = Glob::new(pattern)?;
464    let mut result = vec![];
465    for path in glob.walk(directory) {
466        let path = directory.join(Utf8PathBuf::try_from(path).map_err(|e| anyhow::anyhow!("{e}"))?);
467        // Skip checkpoint files
468        if checkpoint_paths.iter().any(|cp| cp.as_ref() == Some(&path)) {
469            continue;
470        }
471        // Skip files we've previously determined to be unreadable
472        // (foreign content or unrecoverable corruption).
473        if bad_files.contains(&path) {
474            continue;
475        }
476        if path.is_file() {
477            result.push(path);
478        }
479    }
480    result.sort();
481
482    if let Some(last) = last_processed {
483        result.retain(|item| item > last);
484    } else if let Some(cp) = checkpoint {
485        let cp_file = &cp.file;
486        result.retain(|item| item.as_str() >= cp_file.as_str());
487    }
488
489    Ok(result)
490}
491
492fn is_file_done(path: &Utf8PathBuf) -> bool {
493    path.metadata()
494        .map(|m| m.permissions().readonly())
495        .unwrap_or(false)
496}
497
498// ---------------------------------------------------------------------------
499// Per-consumer state used during stream construction
500// ---------------------------------------------------------------------------
501
502// ---------------------------------------------------------------------------
503// Multi-consumer stream
504// ---------------------------------------------------------------------------
505
506fn make_multi_stream(
507    directory: Utf8PathBuf,
508    pattern: String,
509    consumers: Vec<ConsumerConfig>,
510    earliest_checkpoint: Option<CheckpointData>,
511    mut consumer_checkpoints: Vec<Option<CheckpointData>>,
512    cp_paths: Vec<Option<Utf8PathBuf>>,
513    fs_notify: Arc<Notify>,
514    shared: Arc<TailerShared>,
515) -> impl Stream<Item = anyhow::Result<Vec<LogBatch>>> + Send {
516    let num_consumers = consumers.len();
517
518    // Extract per-consumer config into parallel vecs
519    let consumer_names: Vec<String> = consumers.iter().map(|c| c.name.clone()).collect();
520    let max_batch_sizes: Vec<usize> = consumers.iter().map(|c| c.max_batch_size).collect();
521    let max_batch_latencies: Vec<Duration> =
522        consumers.iter().map(|c| c.max_batch_latency).collect();
523    let filters: Vec<Option<Box<dyn Fn(&serde_json::Value) -> anyhow::Result<bool> + Send>>> =
524        consumers.into_iter().map(|c| c.filter).collect();
525
526    async_stream::try_stream! {
527        let mut last_processed: Option<Utf8PathBuf> = None;
528        let mut global_checkpoint = earliest_checkpoint;
529        let mut skip_lines: usize;
530        let retry_delay = Duration::from_millis(200);
531        // Files that produced unrecoverable read errors (foreign content,
532        // corruption, etc).  Kept in-memory for the lifetime of the tailer
533        // so we don't re-attempt them on every directory rescan.  Unlike
534        // `last_processed`, this does not interact with file ordering, so
535        // a bad file whose name sorts after future legitimate segments
536        // does not hide them.
537        let mut bad_files: HashSet<Utf8PathBuf> = HashSet::new();
538
539        // Per-consumer skip lines (for the first file only, when
540        // resuming from checkpoint).  The global skip_lines is the
541        // minimum across all consumers for that file, and individual
542        // consumers that are further ahead will have their records
543        // filtered out by index comparison.
544        let mut consumer_skip: Vec<usize> = vec![0; num_consumers];
545
546        'outer: loop {
547            if shared.closed.load(Ordering::SeqCst) {
548                break;
549            }
550
551            let plan = build_plan(
552                &directory,
553                &pattern,
554                &cp_paths,
555                &last_processed,
556                &global_checkpoint,
557                &bad_files,
558            )?;
559
560            if plan.is_empty() {
561                tokio::select! {
562                    _ = shared.close_notify.notified() => break,
563                    _ = fs_notify.notified() => continue,
564                }
565            }
566
567            // Determine skip_lines from the earliest checkpoint
568            if let Some(cp) = &global_checkpoint {
569                if plan.first().map(|p| p.as_str()) == Some(cp.file.as_str()) {
570                    skip_lines = cp.line;
571                } else {
572                    skip_lines = 0;
573                }
574            } else {
575                skip_lines = 0;
576            }
577
578            // Determine per-consumer skip lines
579            for i in 0..num_consumers {
580                if let Some(cp) = &consumer_checkpoints[i] {
581                    if plan.first().map(|p| p.as_str()) == Some(cp.file.as_str()) {
582                        consumer_skip[i] = cp.line;
583                    } else {
584                        consumer_skip[i] = 0;
585                    }
586                } else {
587                    consumer_skip[i] = 0;
588                }
589            }
590            global_checkpoint.take();
591            for cp in consumer_checkpoints.iter_mut() {
592                cp.take();
593            }
594
595            let mut plan_index = 0;
596            let mut decomp: Option<FileDecompressor> = None;
597            let mut current_path: Option<&Utf8PathBuf> = None;
598            let mut last_lines_consumed: usize = 0;
599            // Track the global line number for the current file so we
600            // can apply per-consumer skip logic.
601            let mut global_line_in_file: usize = skip_lines;
602
603            if let Some(path) = plan.get(plan_index) {
604                let path_std = path.as_std_path().to_owned();
605                decomp = Some(FileDecompressor::open(&path_std)?);
606                current_path = Some(path);
607            }
608
609            // Per-consumer batches and deadlines persist across
610            // fill/yield cycles.  A consumer's batch is "ready" when
611            // it is full or its deadline has expired.  Only ready
612            // batches are yielded; others keep accumulating.
613            let mut batches: Vec<LogBatch> = (0..num_consumers)
614                .map(|i| LogBatch::with_consumer_name(consumer_names[i].clone()))
615                .collect();
616            let mut deadlines: Vec<Option<tokio::time::Instant>> = vec![None; num_consumers];
617
618            while decomp.is_some() {
619                if shared.closed.load(Ordering::SeqCst) {
620                    break 'outer;
621                }
622
623                // Fill batches until at least one is ready
624                'fill: loop {
625                    if shared.closed.load(Ordering::SeqCst) {
626                        break 'outer;
627                    }
628
629                    // Check if any consumer already has a ready batch
630                    let now = tokio::time::Instant::now();
631                    let any_ready = (0..num_consumers).any(|i| {
632                        !batches[i].is_empty()
633                            && (batches[i].len() >= max_batch_sizes[i]
634                                || deadlines[i].map_or(false, |d| now >= d))
635                    });
636                    if any_ready {
637                        break 'fill;
638                    }
639
640                    let d = decomp.as_mut().expect("checked above");
641                    let path = current_path.expect("set with decomp");
642
643                    // When set after processing the current decompressor
644                    // call, advance to the next file in the plan.
645                    // `Some(true)`  -> mark the file as bad (foreign or
646                    //                  unrecoverable corruption); do not
647                    //                  update `last_processed`.
648                    // `Some(false)` -> file completed normally; update
649                    //                  `last_processed`.
650                    let mut advance_file: Option<bool> = None;
651
652                    match d.next_line(skip_lines) {
653                        Ok(Some(line)) => {
654                            match serde_json::from_str::<serde_json::Value>(&line.text) {
655                                Ok(value) => {
656                                    for i in 0..num_consumers {
657                                        if global_line_in_file < consumer_skip[i] {
658                                            continue;
659                                        }
660                                        if let Some(ref f) = filters[i] {
661                                            if !f(&value)? {
662                                                continue;
663                                            }
664                                        }
665                                        batches[i].push_value(
666                                            value.clone(),
667                                            path,
668                                            line.byte_offset,
669                                        );
670                                        // Start the deadline timer on first record
671                                        if deadlines[i].is_none() {
672                                            deadlines[i] = Some(
673                                                tokio::time::Instant::now() + max_batch_latencies[i],
674                                            );
675                                        }
676                                    }
677                                    global_line_in_file += 1;
678                                }
679                                Err(err) => {
680                                    warn!(
681                                        "Failed to parse a line from {path} (byte offset {}) \
682                                         as json: {err}. Skipping remainder of this file; \
683                                         move it aside if it is not a kumo-jsonl segment.",
684                                        line.byte_offset
685                                    );
686                                    advance_file = Some(true);
687                                }
688                            }
689                        }
690                        Ok(None) => {
691                            // EOF on current file
692                            if is_file_done(path) {
693                                if d.has_partial_data() {
694                                    // The writer was killed before it
695                                    // could finish the zstd stream and the
696                                    // segment was later marked done by the
697                                    // producer's startup sweep over
698                                    // abandoned segments.  Discard the
699                                    // partial trailing line and treat the
700                                    // file as complete.
701                                    warn!(
702                                        "unexpected EOF for {} with partial line data remaining; \
703                                         the writer exited without flushing the zstd stream. \
704                                         Discarding partial trailing data and advancing to next \
705                                         segment.",
706                                        path
707                                    );
708                                }
709                                advance_file = Some(false);
710                            } else {
711                                // File not done; find the earliest deadline
712                                // among non-empty batches to bound the wait.
713                                let earliest_deadline = (0..num_consumers)
714                                    .filter(|&i| !batches[i].is_empty())
715                                    .filter_map(|i| deadlines[i])
716                                    .min();
717
718                                if let Some(deadline) = earliest_deadline {
719                                    let remaining = deadline.saturating_duration_since(
720                                        tokio::time::Instant::now(),
721                                    );
722                                    if remaining.is_zero() {
723                                        break 'fill;
724                                    }
725                                    tokio::select! {
726                                        _ = shared.close_notify.notified() => break 'outer,
727                                        _ = tokio::time::sleep(remaining.min(retry_delay)) => {},
728                                        _ = fs_notify.notified() => {},
729                                    }
730                                } else {
731                                    // All batches empty, file not done — wait
732                                    tokio::select! {
733                                        _ = shared.close_notify.notified() => break 'outer,
734                                        _ = tokio::time::sleep(retry_delay) => {},
735                                        _ = fs_notify.notified() => {},
736                                    }
737                                }
738                                d.reset_eof();
739                            }
740                        }
741                        Err(e) => {
742                            // Any decompression error means this file is
743                            // not consumable -- either it is foreign
744                            // content that was dropped into the directory,
745                            // or its bytes are corrupt in a way that
746                            // waiting cannot resolve (an incomplete-but-
747                            // well-formed stream surfaces as Ok(None), not
748                            // Err).  Log and skip.
749                            warn!(
750                                "Error decompressing {path}: {e:#}. Skipping this file; \
751                                 move it aside if it is not a kumo-jsonl segment."
752                            );
753                            advance_file = Some(true);
754                        }
755                    }
756
757                    if let Some(mark_bad) = advance_file {
758                        if mark_bad {
759                            bad_files.insert(path.clone());
760                        } else {
761                            last_lines_consumed = d.lines_consumed;
762                            last_processed = Some(path.clone());
763                        }
764                        skip_lines = 0;
765                        global_line_in_file = 0;
766                        for cs in consumer_skip.iter_mut() {
767                            *cs = 0;
768                        }
769                        plan_index += 1;
770                        if let Some(next_path) = plan.get(plan_index) {
771                            let path_std = next_path.as_std_path().to_owned();
772                            decomp = Some(FileDecompressor::open(&path_std)?);
773                            current_path = Some(next_path);
774                            continue 'fill;
775                        } else {
776                            decomp = None;
777                            current_path = None;
778                            break 'fill;
779                        }
780                    }
781                }
782
783                // Determine which batches are ready to yield
784                let now = tokio::time::Instant::now();
785                let mut ready: Vec<LogBatch> = Vec::new();
786                for i in 0..num_consumers {
787                    let is_ready = !batches[i].is_empty()
788                        && (batches[i].len() >= max_batch_sizes[i]
789                            || deadlines[i].map_or(false, |d| now >= d)
790                            || decomp.is_none()); // end of plan: flush all
791
792                    if !is_ready {
793                        continue;
794                    }
795
796                    // Swap out the ready batch, replace with a fresh one
797                    let mut batch = std::mem::replace(
798                        &mut batches[i],
799                        LogBatch::with_consumer_name(consumer_names[i].clone()),
800                    );
801                    deadlines[i] = None;
802
803                    // Set the commit callback
804                    if let Some(ref cp_path) = cp_paths[i] {
805                        let (cp_file, cp_line) = if let Some(d) = &decomp {
806                            let path = current_path.expect("set with decomp");
807                            (path.clone(), d.lines_consumed)
808                        } else if let Some(last) = &last_processed {
809                            (last.clone(), last_lines_consumed)
810                        } else {
811                            unreachable!("non-empty batch without a source");
812                        };
813                        let cp_path = cp_path.clone();
814                        batch.set_commit_fn(Box::new(move || {
815                            CheckpointData::save_atomic(&cp_path, &cp_file, cp_line)
816                        }));
817                    }
818                    ready.push(batch);
819                }
820
821                if !ready.is_empty() {
822                    skip_lines = 0;
823                    yield ready;
824                }
825            }
826        }
827    }
828}