1use crate::batch::LogBatch;
2use crate::checkpoint::CheckpointData;
3use crate::decompress::FileDecompressor;
4use camino::Utf8PathBuf;
5use filenamegen::Glob;
6use futures::Stream;
7use notify::event::{CreateKind, ModifyKind};
8use notify::{Event, EventKind, Watcher};
9use serde::{Deserialize, Serialize};
10use std::collections::HashSet;
11use std::sync::atomic::{AtomicBool, Ordering};
12use std::sync::Arc;
13use std::time::Duration;
14use tokio::sync::Notify;
15use tracing::warn;
16
/// Default glob (`*`) used to select segment files inside the tailed directory.
fn default_pattern() -> String {
    String::from("*")
}
24
/// Default upper bound on the number of records collected into one batch.
fn default_max_batch_size() -> usize {
    const DEFAULT_MAX_BATCH_SIZE: usize = 100;
    DEFAULT_MAX_BATCH_SIZE
}
28
/// Default time (one second) after which a non-empty batch is released even if
/// it has not reached its size limit.
fn default_max_batch_latency() -> Duration {
    Duration::from_millis(1_000)
}
32
33pub struct ConsumerConfig {
39 pub name: String,
42 pub max_batch_size: usize,
44 pub max_batch_latency: Duration,
46 pub checkpoint_name: Option<String>,
49 pub filter: Option<Box<dyn Fn(&serde_json::Value) -> anyhow::Result<bool> + Send>>,
52}
53
54impl ConsumerConfig {
55 pub fn new(name: impl Into<String>) -> Self {
56 Self {
57 name: name.into(),
58 max_batch_size: default_max_batch_size(),
59 max_batch_latency: default_max_batch_latency(),
60 checkpoint_name: None,
61 filter: None,
62 }
63 }
64
65 pub fn max_batch_size(mut self, size: usize) -> Self {
66 self.max_batch_size = size;
67 self
68 }
69
70 pub fn max_batch_latency(mut self, latency: Duration) -> Self {
71 self.max_batch_latency = latency;
72 self
73 }
74
75 pub fn checkpoint_name(mut self, name: impl Into<String>) -> Self {
76 self.checkpoint_name = Some(name.into());
77 self
78 }
79
80 pub fn filter<F>(mut self, f: F) -> Self
81 where
82 F: Fn(&serde_json::Value) -> anyhow::Result<bool> + Send + 'static,
83 {
84 self.filter = Some(Box::new(f));
85 self
86 }
87}
88
89pub struct MultiConsumerTailerConfig {
95 pub directory: Utf8PathBuf,
97 pub pattern: String,
99 pub poll_watcher: Option<Duration>,
101 pub tail: bool,
103 pub consumers: Vec<ConsumerConfig>,
105}
106
107impl MultiConsumerTailerConfig {
108 pub fn new(directory: Utf8PathBuf, consumers: Vec<ConsumerConfig>) -> Self {
109 Self {
110 directory,
111 pattern: default_pattern(),
112 poll_watcher: None,
113 tail: false,
114 consumers,
115 }
116 }
117
118 pub fn pattern(mut self, pattern: impl Into<String>) -> Self {
119 self.pattern = pattern.into();
120 self
121 }
122
123 pub fn poll_watcher(mut self, interval: Duration) -> Self {
124 self.poll_watcher = Some(interval);
125 self
126 }
127
128 pub fn tail(mut self, enable: bool) -> Self {
129 self.tail = enable;
130 self
131 }
132
133 pub async fn build(self) -> anyhow::Result<MultiConsumerTailer> {
135 let cp_paths: Vec<Option<Utf8PathBuf>> = self
138 .consumers
139 .iter()
140 .map(|c| {
141 c.checkpoint_name
142 .as_ref()
143 .map(|name| self.directory.join(format!(".{name}")))
144 })
145 .collect();
146
147 let mut consumer_checkpoints: Vec<Option<CheckpointData>> =
149 Vec::with_capacity(cp_paths.len());
150 let mut earliest_checkpoint: Option<CheckpointData> = None;
151
152 for cp_path in &cp_paths {
153 let cp = if self.tail {
154 resolve_tail_checkpoint(&self.directory, &self.pattern)?
155 } else if let Some(cp_path) = cp_path {
156 CheckpointData::load(cp_path).await?
157 } else {
158 None
159 };
160
161 match (&earliest_checkpoint, &cp) {
162 (None, Some(cp)) => {
163 earliest_checkpoint = Some(cp.clone());
164 }
165 (Some(existing), Some(cp)) => {
166 if cp.file < existing.file
167 || (cp.file == existing.file && cp.line < existing.line)
168 {
169 earliest_checkpoint = Some(cp.clone());
170 }
171 }
172 _ => {}
173 }
174
175 consumer_checkpoints.push(cp);
176 }
177
178 let closed = Arc::new(AtomicBool::new(false));
179 let close_notify = Arc::new(Notify::new());
180
181 let fs_notify = Arc::new(Notify::new());
182 let fs_notify_tx = fs_notify.clone();
183 let event_handler = move |res: Result<Event, _>| match res {
184 Ok(event) => match event.kind {
185 EventKind::Create(CreateKind::File) | EventKind::Modify(ModifyKind::Data(_)) => {
186 fs_notify_tx.notify_one();
187 }
188 _ => {}
189 },
190 Err(_) => {}
191 };
192 let mut watcher: Box<dyn Watcher + Send> = if let Some(interval) = self.poll_watcher {
193 Box::new(notify::PollWatcher::new(
194 event_handler,
195 notify::Config::default().with_poll_interval(interval),
196 )?)
197 } else {
198 Box::new(notify::recommended_watcher(event_handler)?)
199 };
200 watcher.watch(
201 &self.directory.clone().into_std_path_buf(),
202 notify::RecursiveMode::NonRecursive,
203 )?;
204
205 let shared = Arc::new(TailerShared {
206 closed,
207 close_notify,
208 });
209
210 let stream = make_multi_stream(
211 self.directory,
212 self.pattern,
213 self.consumers,
214 earliest_checkpoint,
215 consumer_checkpoints,
216 cp_paths,
217 fs_notify,
218 shared.clone(),
219 );
220
221 Ok(MultiConsumerTailer {
222 close_handle: CloseHandle { shared },
223 _watcher: watcher,
224 stream: Box::pin(stream),
225 })
226 }
227}
228
229struct TailerShared {
234 closed: Arc<AtomicBool>,
235 close_notify: Arc<Notify>,
236}
237
238#[derive(Clone)]
240pub struct CloseHandle {
241 shared: Arc<TailerShared>,
242}
243
244impl CloseHandle {
245 pub fn close(&self) {
247 self.shared.closed.store(true, Ordering::SeqCst);
248 self.shared.close_notify.notify_waiters();
249 }
250}
251
252pub struct MultiConsumerTailer {
259 close_handle: CloseHandle,
260 _watcher: Box<dyn Watcher + Send>,
261 stream: std::pin::Pin<Box<dyn Stream<Item = anyhow::Result<Vec<LogBatch>>> + Send>>,
262}
263
264impl MultiConsumerTailer {
265 pub fn close_handle(&self) -> CloseHandle {
266 self.close_handle.clone()
267 }
268
269 pub fn close(&self) {
270 self.close_handle.close();
271 }
272}
273
274impl Stream for MultiConsumerTailer {
275 type Item = anyhow::Result<Vec<LogBatch>>;
276
277 fn poll_next(
278 mut self: std::pin::Pin<&mut Self>,
279 cx: &mut std::task::Context<'_>,
280 ) -> std::task::Poll<Option<Self::Item>> {
281 if self.close_handle.shared.closed.load(Ordering::SeqCst) {
282 return std::task::Poll::Ready(None);
283 }
284 self.stream.as_mut().poll_next(cx)
285 }
286}
287
288#[derive(Deserialize, Serialize)]
294pub struct LogTailerConfig {
295 pub directory: Utf8PathBuf,
296 #[serde(default = "default_pattern")]
297 pub pattern: String,
298 #[serde(default = "default_max_batch_size")]
299 pub max_batch_size: usize,
300 #[serde(default = "default_max_batch_latency", with = "duration_serde")]
301 pub max_batch_latency: Duration,
302 #[serde(default, skip_serializing_if = "Option::is_none")]
303 pub checkpoint_name: Option<String>,
304 #[serde(
305 default,
306 with = "duration_serde",
307 skip_serializing_if = "Option::is_none"
308 )]
309 pub poll_watcher: Option<Duration>,
310 #[serde(default)]
311 pub tail: bool,
312}
313
314impl LogTailerConfig {
315 pub fn new(directory: Utf8PathBuf) -> Self {
316 Self {
317 directory,
318 pattern: default_pattern(),
319 max_batch_size: default_max_batch_size(),
320 max_batch_latency: default_max_batch_latency(),
321 checkpoint_name: None,
322 poll_watcher: None,
323 tail: false,
324 }
325 }
326
327 pub fn pattern(mut self, pattern: impl Into<String>) -> Self {
328 self.pattern = pattern.into();
329 self
330 }
331
332 pub fn max_batch_size(mut self, size: usize) -> Self {
333 self.max_batch_size = size;
334 self
335 }
336
337 pub fn max_batch_latency(mut self, latency: Duration) -> Self {
338 self.max_batch_latency = latency;
339 self
340 }
341
342 pub fn checkpoint_name(mut self, name: impl Into<String>) -> Self {
343 self.checkpoint_name = Some(name.into());
344 self
345 }
346
347 pub fn poll_watcher(mut self, interval: Duration) -> Self {
348 self.poll_watcher = Some(interval);
349 self
350 }
351
352 pub fn tail(mut self, enable: bool) -> Self {
353 self.tail = enable;
354 self
355 }
356
357 pub async fn build(self) -> anyhow::Result<LogTailer> {
359 self.build_with_filter(None::<fn(&serde_json::Value) -> anyhow::Result<bool>>)
360 .await
361 }
362
363 pub async fn build_with_filter<F>(self, filter: Option<F>) -> anyhow::Result<LogTailer>
365 where
366 F: Fn(&serde_json::Value) -> anyhow::Result<bool> + Send + 'static,
367 {
368 let mut consumer = ConsumerConfig::new("default")
369 .max_batch_size(self.max_batch_size)
370 .max_batch_latency(self.max_batch_latency);
371 if let Some(name) = self.checkpoint_name.clone() {
372 consumer = consumer.checkpoint_name(name);
373 }
374 if let Some(f) = filter {
375 consumer = consumer.filter(f);
376 }
377
378 let multi_config = MultiConsumerTailerConfig {
379 directory: self.directory,
380 pattern: self.pattern,
381 poll_watcher: self.poll_watcher,
382 tail: self.tail,
383 consumers: vec![consumer],
384 };
385
386 let multi = multi_config.build().await?;
387
388 Ok(LogTailer { inner: multi })
389 }
390}
391
392pub struct LogTailer {
397 inner: MultiConsumerTailer,
398}
399
400impl LogTailer {
401 pub fn close_handle(&self) -> CloseHandle {
402 self.inner.close_handle()
403 }
404
405 pub fn close(&self) {
406 self.inner.close();
407 }
408}
409
410impl Stream for LogTailer {
411 type Item = anyhow::Result<LogBatch>;
412
413 fn poll_next(
414 mut self: std::pin::Pin<&mut Self>,
415 cx: &mut std::task::Context<'_>,
416 ) -> std::task::Poll<Option<Self::Item>> {
417 use std::task::Poll;
418 match std::pin::Pin::new(&mut self.inner).poll_next(cx) {
421 Poll::Ready(Some(Ok(mut batches))) => Poll::Ready(Some(Ok(batches
422 .pop()
423 .expect("single consumer yields one batch")))),
424 Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
425 Poll::Ready(None) => Poll::Ready(None),
426 Poll::Pending => Poll::Pending,
427 }
428 }
429}
430
431fn resolve_tail_checkpoint(
436 directory: &Utf8PathBuf,
437 pattern: &str,
438) -> anyhow::Result<Option<CheckpointData>> {
439 let glob = Glob::new(pattern)?;
440 let mut files = vec![];
441 for path in glob.walk(directory) {
442 let path = directory.join(Utf8PathBuf::try_from(path).map_err(|e| anyhow::anyhow!("{e}"))?);
443 if path.is_file() {
444 files.push(path);
445 }
446 }
447 files.sort();
448 Ok(files.last().map(|f| CheckpointData {
449 file: f.to_string(),
450 line: 0,
451 }))
452}
453
454fn build_plan(
456 directory: &Utf8PathBuf,
457 pattern: &str,
458 checkpoint_paths: &[Option<Utf8PathBuf>],
459 last_processed: &Option<Utf8PathBuf>,
460 checkpoint: &Option<CheckpointData>,
461 bad_files: &HashSet<Utf8PathBuf>,
462) -> anyhow::Result<Vec<Utf8PathBuf>> {
463 let glob = Glob::new(pattern)?;
464 let mut result = vec![];
465 for path in glob.walk(directory) {
466 let path = directory.join(Utf8PathBuf::try_from(path).map_err(|e| anyhow::anyhow!("{e}"))?);
467 if checkpoint_paths.iter().any(|cp| cp.as_ref() == Some(&path)) {
469 continue;
470 }
471 if bad_files.contains(&path) {
474 continue;
475 }
476 if path.is_file() {
477 result.push(path);
478 }
479 }
480 result.sort();
481
482 if let Some(last) = last_processed {
483 result.retain(|item| item > last);
484 } else if let Some(cp) = checkpoint {
485 let cp_file = &cp.file;
486 result.retain(|item| item.as_str() >= cp_file.as_str());
487 }
488
489 Ok(result)
490}
491
492fn is_file_done(path: &Utf8PathBuf) -> bool {
493 path.metadata()
494 .map(|m| m.permissions().readonly())
495 .unwrap_or(false)
496}
497
498fn make_multi_stream(
507 directory: Utf8PathBuf,
508 pattern: String,
509 consumers: Vec<ConsumerConfig>,
510 earliest_checkpoint: Option<CheckpointData>,
511 mut consumer_checkpoints: Vec<Option<CheckpointData>>,
512 cp_paths: Vec<Option<Utf8PathBuf>>,
513 fs_notify: Arc<Notify>,
514 shared: Arc<TailerShared>,
515) -> impl Stream<Item = anyhow::Result<Vec<LogBatch>>> + Send {
516 let num_consumers = consumers.len();
517
518 let consumer_names: Vec<String> = consumers.iter().map(|c| c.name.clone()).collect();
520 let max_batch_sizes: Vec<usize> = consumers.iter().map(|c| c.max_batch_size).collect();
521 let max_batch_latencies: Vec<Duration> =
522 consumers.iter().map(|c| c.max_batch_latency).collect();
523 let filters: Vec<Option<Box<dyn Fn(&serde_json::Value) -> anyhow::Result<bool> + Send>>> =
524 consumers.into_iter().map(|c| c.filter).collect();
525
526 async_stream::try_stream! {
527 let mut last_processed: Option<Utf8PathBuf> = None;
528 let mut global_checkpoint = earliest_checkpoint;
529 let mut skip_lines: usize;
530 let retry_delay = Duration::from_millis(200);
531 let mut bad_files: HashSet<Utf8PathBuf> = HashSet::new();
538
539 let mut consumer_skip: Vec<usize> = vec![0; num_consumers];
545
546 'outer: loop {
547 if shared.closed.load(Ordering::SeqCst) {
548 break;
549 }
550
551 let plan = build_plan(
552 &directory,
553 &pattern,
554 &cp_paths,
555 &last_processed,
556 &global_checkpoint,
557 &bad_files,
558 )?;
559
560 if plan.is_empty() {
561 tokio::select! {
562 _ = shared.close_notify.notified() => break,
563 _ = fs_notify.notified() => continue,
564 }
565 }
566
567 if let Some(cp) = &global_checkpoint {
569 if plan.first().map(|p| p.as_str()) == Some(cp.file.as_str()) {
570 skip_lines = cp.line;
571 } else {
572 skip_lines = 0;
573 }
574 } else {
575 skip_lines = 0;
576 }
577
578 for i in 0..num_consumers {
580 if let Some(cp) = &consumer_checkpoints[i] {
581 if plan.first().map(|p| p.as_str()) == Some(cp.file.as_str()) {
582 consumer_skip[i] = cp.line;
583 } else {
584 consumer_skip[i] = 0;
585 }
586 } else {
587 consumer_skip[i] = 0;
588 }
589 }
590 global_checkpoint.take();
591 for cp in consumer_checkpoints.iter_mut() {
592 cp.take();
593 }
594
595 let mut plan_index = 0;
596 let mut decomp: Option<FileDecompressor> = None;
597 let mut current_path: Option<&Utf8PathBuf> = None;
598 let mut last_lines_consumed: usize = 0;
599 let mut global_line_in_file: usize = skip_lines;
602
603 if let Some(path) = plan.get(plan_index) {
604 let path_std = path.as_std_path().to_owned();
605 decomp = Some(FileDecompressor::open(&path_std)?);
606 current_path = Some(path);
607 }
608
609 let mut batches: Vec<LogBatch> = (0..num_consumers)
614 .map(|i| LogBatch::with_consumer_name(consumer_names[i].clone()))
615 .collect();
616 let mut deadlines: Vec<Option<tokio::time::Instant>> = vec![None; num_consumers];
617
618 while decomp.is_some() {
619 if shared.closed.load(Ordering::SeqCst) {
620 break 'outer;
621 }
622
623 'fill: loop {
625 if shared.closed.load(Ordering::SeqCst) {
626 break 'outer;
627 }
628
629 let now = tokio::time::Instant::now();
631 let any_ready = (0..num_consumers).any(|i| {
632 !batches[i].is_empty()
633 && (batches[i].len() >= max_batch_sizes[i]
634 || deadlines[i].map_or(false, |d| now >= d))
635 });
636 if any_ready {
637 break 'fill;
638 }
639
640 let d = decomp.as_mut().expect("checked above");
641 let path = current_path.expect("set with decomp");
642
643 let mut advance_file: Option<bool> = None;
651
652 match d.next_line(skip_lines) {
653 Ok(Some(line)) => {
654 match serde_json::from_str::<serde_json::Value>(&line.text) {
655 Ok(value) => {
656 for i in 0..num_consumers {
657 if global_line_in_file < consumer_skip[i] {
658 continue;
659 }
660 if let Some(ref f) = filters[i] {
661 if !f(&value)? {
662 continue;
663 }
664 }
665 batches[i].push_value(
666 value.clone(),
667 path,
668 line.byte_offset,
669 );
670 if deadlines[i].is_none() {
672 deadlines[i] = Some(
673 tokio::time::Instant::now() + max_batch_latencies[i],
674 );
675 }
676 }
677 global_line_in_file += 1;
678 }
679 Err(err) => {
680 warn!(
681 "Failed to parse a line from {path} (byte offset {}) \
682 as json: {err}. Skipping remainder of this file; \
683 move it aside if it is not a kumo-jsonl segment.",
684 line.byte_offset
685 );
686 advance_file = Some(true);
687 }
688 }
689 }
690 Ok(None) => {
691 if is_file_done(path) {
693 if d.has_partial_data() {
694 warn!(
702 "unexpected EOF for {} with partial line data remaining; \
703 the writer exited without flushing the zstd stream. \
704 Discarding partial trailing data and advancing to next \
705 segment.",
706 path
707 );
708 }
709 advance_file = Some(false);
710 } else {
711 let earliest_deadline = (0..num_consumers)
714 .filter(|&i| !batches[i].is_empty())
715 .filter_map(|i| deadlines[i])
716 .min();
717
718 if let Some(deadline) = earliest_deadline {
719 let remaining = deadline.saturating_duration_since(
720 tokio::time::Instant::now(),
721 );
722 if remaining.is_zero() {
723 break 'fill;
724 }
725 tokio::select! {
726 _ = shared.close_notify.notified() => break 'outer,
727 _ = tokio::time::sleep(remaining.min(retry_delay)) => {},
728 _ = fs_notify.notified() => {},
729 }
730 } else {
731 tokio::select! {
733 _ = shared.close_notify.notified() => break 'outer,
734 _ = tokio::time::sleep(retry_delay) => {},
735 _ = fs_notify.notified() => {},
736 }
737 }
738 d.reset_eof();
739 }
740 }
741 Err(e) => {
742 warn!(
750 "Error decompressing {path}: {e:#}. Skipping this file; \
751 move it aside if it is not a kumo-jsonl segment."
752 );
753 advance_file = Some(true);
754 }
755 }
756
757 if let Some(mark_bad) = advance_file {
758 if mark_bad {
759 bad_files.insert(path.clone());
760 } else {
761 last_lines_consumed = d.lines_consumed;
762 last_processed = Some(path.clone());
763 }
764 skip_lines = 0;
765 global_line_in_file = 0;
766 for cs in consumer_skip.iter_mut() {
767 *cs = 0;
768 }
769 plan_index += 1;
770 if let Some(next_path) = plan.get(plan_index) {
771 let path_std = next_path.as_std_path().to_owned();
772 decomp = Some(FileDecompressor::open(&path_std)?);
773 current_path = Some(next_path);
774 continue 'fill;
775 } else {
776 decomp = None;
777 current_path = None;
778 break 'fill;
779 }
780 }
781 }
782
783 let now = tokio::time::Instant::now();
785 let mut ready: Vec<LogBatch> = Vec::new();
786 for i in 0..num_consumers {
787 let is_ready = !batches[i].is_empty()
788 && (batches[i].len() >= max_batch_sizes[i]
789 || deadlines[i].map_or(false, |d| now >= d)
790 || decomp.is_none()); if !is_ready {
793 continue;
794 }
795
796 let mut batch = std::mem::replace(
798 &mut batches[i],
799 LogBatch::with_consumer_name(consumer_names[i].clone()),
800 );
801 deadlines[i] = None;
802
803 if let Some(ref cp_path) = cp_paths[i] {
805 let (cp_file, cp_line) = if let Some(d) = &decomp {
806 let path = current_path.expect("set with decomp");
807 (path.clone(), d.lines_consumed)
808 } else if let Some(last) = &last_processed {
809 (last.clone(), last_lines_consumed)
810 } else {
811 unreachable!("non-empty batch without a source");
812 };
813 let cp_path = cp_path.clone();
814 batch.set_commit_fn(Box::new(move || {
815 CheckpointData::save_atomic(&cp_path, &cp_file, cp_line)
816 }));
817 }
818 ready.push(batch);
819 }
820
821 if !ready.is_empty() {
822 skip_lines = 0;
823 yield ready;
824 }
825 }
826 }
827 }
828}