1use crate::batch::LogBatch;
2use crate::checkpoint::CheckpointData;
3use crate::decompress::FileDecompressor;
4use camino::Utf8PathBuf;
5use filenamegen::Glob;
6use futures::Stream;
7use notify::event::{CreateKind, ModifyKind};
8use notify::{Event, EventKind, Watcher};
9use serde::{Deserialize, Serialize};
10use std::sync::atomic::{AtomicBool, Ordering};
11use std::sync::Arc;
12use std::time::Duration;
13use tokio::sync::Notify;
14
/// Default glob pattern: match every file in the watched directory.
fn default_pattern() -> String {
    String::from("*")
}
22
/// Default maximum number of records collected into a single batch
/// before it is flushed to a consumer.
fn default_max_batch_size() -> usize {
    100
}
26
/// Default maximum time a non-empty batch may wait before being
/// flushed, even if it has not reached the size limit.
fn default_max_batch_latency() -> Duration {
    Duration::from_millis(1_000)
}
30
/// Per-consumer settings for a [`MultiConsumerTailer`].
pub struct ConsumerConfig {
    /// Name attached to the batches produced for this consumer.
    pub name: String,
    /// Flush a batch once it holds this many records.
    pub max_batch_size: usize,
    /// Flush a non-empty batch after this much time even if not full.
    pub max_batch_latency: Duration,
    /// When set, progress is persisted to a hidden `.{name}` file inside
    /// the watched directory so this consumer can resume after a restart.
    pub checkpoint_name: Option<String>,
    /// Optional predicate over each parsed JSON line; only lines for
    /// which it returns `Ok(true)` reach this consumer. An `Err` is
    /// propagated onto the stream.
    pub filter: Option<Box<dyn Fn(&serde_json::Value) -> anyhow::Result<bool> + Send>>,
}
51
52impl ConsumerConfig {
53 pub fn new(name: impl Into<String>) -> Self {
54 Self {
55 name: name.into(),
56 max_batch_size: default_max_batch_size(),
57 max_batch_latency: default_max_batch_latency(),
58 checkpoint_name: None,
59 filter: None,
60 }
61 }
62
63 pub fn max_batch_size(mut self, size: usize) -> Self {
64 self.max_batch_size = size;
65 self
66 }
67
68 pub fn max_batch_latency(mut self, latency: Duration) -> Self {
69 self.max_batch_latency = latency;
70 self
71 }
72
73 pub fn checkpoint_name(mut self, name: impl Into<String>) -> Self {
74 self.checkpoint_name = Some(name.into());
75 self
76 }
77
78 pub fn filter<F>(mut self, f: F) -> Self
79 where
80 F: Fn(&serde_json::Value) -> anyhow::Result<bool> + Send + 'static,
81 {
82 self.filter = Some(Box::new(f));
83 self
84 }
85}
86
/// Configuration for tailing one directory of JSON-lines log files and
/// fanning each line out to several consumers.
pub struct MultiConsumerTailerConfig {
    /// Directory to watch for log files.
    pub directory: Utf8PathBuf,
    /// Glob pattern selecting which files in `directory` are read.
    pub pattern: String,
    /// When set, use a polling file watcher with this interval instead of
    /// the platform-recommended notification backend.
    pub poll_watcher: Option<Duration>,
    /// When true, start at the newest matching file instead of replaying
    /// history or saved checkpoints.
    pub tail: bool,
    /// The consumers that batches are delivered to.
    pub consumers: Vec<ConsumerConfig>,
}
104
impl MultiConsumerTailerConfig {
    /// Create a config for `directory` feeding `consumers`, with the
    /// default pattern ("*"), the recommended watcher backend, and tail
    /// mode disabled.
    pub fn new(directory: Utf8PathBuf, consumers: Vec<ConsumerConfig>) -> Self {
        Self {
            directory,
            pattern: default_pattern(),
            poll_watcher: None,
            tail: false,
            consumers,
        }
    }

    /// Set the glob pattern used to select files in the directory.
    pub fn pattern(mut self, pattern: impl Into<String>) -> Self {
        self.pattern = pattern.into();
        self
    }

    /// Use a polling file watcher with the given interval instead of the
    /// platform notification backend.
    pub fn poll_watcher(mut self, interval: Duration) -> Self {
        self.poll_watcher = Some(interval);
        self
    }

    /// Enable or disable tail mode (start at the newest matching file
    /// rather than replaying history or saved checkpoints).
    pub fn tail(mut self, enable: bool) -> Self {
        self.tail = enable;
        self
    }

    /// Resolve each consumer's starting checkpoint, attach a filesystem
    /// watcher to the directory, and start the batch-producing stream.
    ///
    /// # Errors
    /// Fails if a checkpoint cannot be loaded, the glob pattern is
    /// invalid, or the watcher cannot be created or attached.
    pub async fn build(self) -> anyhow::Result<MultiConsumerTailer> {
        // Checkpoints are persisted as hidden ".{name}" files inside the
        // watched directory itself.
        let cp_paths: Vec<Option<Utf8PathBuf>> = self
            .consumers
            .iter()
            .map(|c| {
                c.checkpoint_name
                    .as_ref()
                    .map(|name| self.directory.join(format!(".{name}")))
            })
            .collect();

        // Per-consumer resume positions, plus the earliest one overall:
        // the shared reader must begin at the earliest position so that
        // every consumer's resume point is covered.
        let mut consumer_checkpoints: Vec<Option<CheckpointData>> =
            Vec::with_capacity(cp_paths.len());
        let mut earliest_checkpoint: Option<CheckpointData> = None;

        for cp_path in &cp_paths {
            let cp = if self.tail {
                // Tail mode ignores saved checkpoints and starts at the
                // newest matching file.
                // NOTE(review): this rescans the directory once per
                // consumer with an identical result; could be computed
                // once outside the loop.
                resolve_tail_checkpoint(&self.directory, &self.pattern)?
            } else if let Some(cp_path) = cp_path {
                CheckpointData::load(cp_path).await?
            } else {
                None
            };

            // Keep the minimum (file, line) position seen so far.
            match (&earliest_checkpoint, &cp) {
                (None, Some(cp)) => {
                    earliest_checkpoint = Some(cp.clone());
                }
                (Some(existing), Some(cp)) => {
                    if cp.file < existing.file
                        || (cp.file == existing.file && cp.line < existing.line)
                    {
                        earliest_checkpoint = Some(cp.clone());
                    }
                }
                _ => {}
            }

            consumer_checkpoints.push(cp);
        }

        let closed = Arc::new(AtomicBool::new(false));
        let close_notify = Arc::new(Notify::new());

        // The watcher wakes the stream only for events that can add data:
        // file creation and data modification. Other events (metadata,
        // renames) and watcher errors are ignored.
        let fs_notify = Arc::new(Notify::new());
        let fs_notify_tx = fs_notify.clone();
        let event_handler = move |res: Result<Event, _>| match res {
            Ok(event) => match event.kind {
                EventKind::Create(CreateKind::File) | EventKind::Modify(ModifyKind::Data(_)) => {
                    fs_notify_tx.notify_one();
                }
                _ => {}
            },
            Err(_) => {}
        };
        let mut watcher: Box<dyn Watcher + Send> = if let Some(interval) = self.poll_watcher {
            Box::new(notify::PollWatcher::new(
                event_handler,
                notify::Config::default().with_poll_interval(interval),
            )?)
        } else {
            Box::new(notify::recommended_watcher(event_handler)?)
        };
        watcher.watch(
            &self.directory.clone().into_std_path_buf(),
            notify::RecursiveMode::NonRecursive,
        )?;

        let shared = Arc::new(TailerShared {
            closed,
            close_notify,
        });

        let stream = make_multi_stream(
            self.directory,
            self.pattern,
            self.consumers,
            earliest_checkpoint,
            consumer_checkpoints,
            cp_paths,
            fs_notify,
            shared.clone(),
        );

        Ok(MultiConsumerTailer {
            close_handle: CloseHandle { shared },
            // Kept alive inside the tailer; dropping the watcher would
            // stop filesystem wakeups.
            _watcher: watcher,
            stream: Box::pin(stream),
        })
    }
}
226
/// State shared between a tailer's stream and its [`CloseHandle`]s.
struct TailerShared {
    /// Set once close has been requested; checked at each loop step of
    /// the driver stream.
    // NOTE(review): `TailerShared` is itself always held in an `Arc`, so
    // these inner `Arc`s look redundant — confirm before simplifying.
    closed: Arc<AtomicBool>,
    /// Wakes any pending waits (sleep / fs-notify selects) on close.
    close_notify: Arc<Notify>,
}
235
/// Cloneable handle that signals the associated tailer to shut down.
#[derive(Clone)]
pub struct CloseHandle {
    shared: Arc<TailerShared>,
}
241
impl CloseHandle {
    /// Request shutdown: set the closed flag first, then wake all
    /// waiters, so any task blocked in a `select!` observes the flag
    /// when it re-checks. (The order of these two calls matters.)
    pub fn close(&self) {
        self.shared.closed.store(true, Ordering::SeqCst);
        self.shared.close_notify.notify_waiters();
    }
}
249
/// A stream of per-consumer [`LogBatch`]es produced by tailing a
/// directory; each yielded item holds the batches that became ready at
/// the same moment, one per ready consumer.
pub struct MultiConsumerTailer {
    close_handle: CloseHandle,
    /// Kept alive for the tailer's lifetime so the directory watcher
    /// keeps feeding wakeups; never read after construction.
    _watcher: Box<dyn Watcher + Send>,
    stream: std::pin::Pin<Box<dyn Stream<Item = anyhow::Result<Vec<LogBatch>>> + Send>>,
}
261
262impl MultiConsumerTailer {
263 pub fn close_handle(&self) -> CloseHandle {
264 self.close_handle.clone()
265 }
266
267 pub fn close(&self) {
268 self.close_handle.close();
269 }
270}
271
impl Stream for MultiConsumerTailer {
    type Item = anyhow::Result<Vec<LogBatch>>;

    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        // Fast path: once closed, end the stream without polling the
        // inner generator again (the generator also checks this flag at
        // every loop step).
        if self.close_handle.shared.closed.load(Ordering::SeqCst) {
            return std::task::Poll::Ready(None);
        }
        self.stream.as_mut().poll_next(cx)
    }
}
285
/// Serde-friendly configuration for a single-consumer [`LogTailer`];
/// mirrors [`MultiConsumerTailerConfig`] plus the batching knobs of its
/// one implicit consumer.
#[derive(Deserialize, Serialize)]
pub struct LogTailerConfig {
    /// Directory to watch for log files.
    pub directory: Utf8PathBuf,
    /// Glob pattern selecting files to read (default "*").
    #[serde(default = "default_pattern")]
    pub pattern: String,
    /// Flush a batch once it holds this many records (default 100).
    #[serde(default = "default_max_batch_size")]
    pub max_batch_size: usize,
    /// Flush a non-empty batch after this long (default 1s).
    #[serde(default = "default_max_batch_latency", with = "duration_serde")]
    pub max_batch_latency: Duration,
    /// When set, progress is checkpointed under this name.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub checkpoint_name: Option<String>,
    /// Optional polling interval; when set, a polling watcher replaces
    /// the platform notification backend.
    // NOTE(review): `duration_serde` is also used for the non-optional
    // field above — confirm it handles `Option<Duration>` here.
    #[serde(
        default,
        with = "duration_serde",
        skip_serializing_if = "Option::is_none"
    )]
    pub poll_watcher: Option<Duration>,
    /// Start at the newest matching file instead of replaying history.
    #[serde(default)]
    pub tail: bool,
}
311
312impl LogTailerConfig {
313 pub fn new(directory: Utf8PathBuf) -> Self {
314 Self {
315 directory,
316 pattern: default_pattern(),
317 max_batch_size: default_max_batch_size(),
318 max_batch_latency: default_max_batch_latency(),
319 checkpoint_name: None,
320 poll_watcher: None,
321 tail: false,
322 }
323 }
324
325 pub fn pattern(mut self, pattern: impl Into<String>) -> Self {
326 self.pattern = pattern.into();
327 self
328 }
329
330 pub fn max_batch_size(mut self, size: usize) -> Self {
331 self.max_batch_size = size;
332 self
333 }
334
335 pub fn max_batch_latency(mut self, latency: Duration) -> Self {
336 self.max_batch_latency = latency;
337 self
338 }
339
340 pub fn checkpoint_name(mut self, name: impl Into<String>) -> Self {
341 self.checkpoint_name = Some(name.into());
342 self
343 }
344
345 pub fn poll_watcher(mut self, interval: Duration) -> Self {
346 self.poll_watcher = Some(interval);
347 self
348 }
349
350 pub fn tail(mut self, enable: bool) -> Self {
351 self.tail = enable;
352 self
353 }
354
355 pub async fn build(self) -> anyhow::Result<LogTailer> {
357 self.build_with_filter(None::<fn(&serde_json::Value) -> anyhow::Result<bool>>)
358 .await
359 }
360
361 pub async fn build_with_filter<F>(self, filter: Option<F>) -> anyhow::Result<LogTailer>
363 where
364 F: Fn(&serde_json::Value) -> anyhow::Result<bool> + Send + 'static,
365 {
366 let mut consumer = ConsumerConfig::new("default")
367 .max_batch_size(self.max_batch_size)
368 .max_batch_latency(self.max_batch_latency);
369 if let Some(name) = self.checkpoint_name.clone() {
370 consumer = consumer.checkpoint_name(name);
371 }
372 if let Some(f) = filter {
373 consumer = consumer.filter(f);
374 }
375
376 let multi_config = MultiConsumerTailerConfig {
377 directory: self.directory,
378 pattern: self.pattern,
379 poll_watcher: self.poll_watcher,
380 tail: self.tail,
381 consumers: vec![consumer],
382 };
383
384 let multi = multi_config.build().await?;
385
386 Ok(LogTailer { inner: multi })
387 }
388}
389
/// Single-consumer convenience wrapper around [`MultiConsumerTailer`];
/// yields one [`LogBatch`] per stream item instead of a `Vec`.
pub struct LogTailer {
    inner: MultiConsumerTailer,
}
397
398impl LogTailer {
399 pub fn close_handle(&self) -> CloseHandle {
400 self.inner.close_handle()
401 }
402
403 pub fn close(&self) {
404 self.inner.close();
405 }
406}
407
408impl Stream for LogTailer {
409 type Item = anyhow::Result<LogBatch>;
410
411 fn poll_next(
412 mut self: std::pin::Pin<&mut Self>,
413 cx: &mut std::task::Context<'_>,
414 ) -> std::task::Poll<Option<Self::Item>> {
415 use std::task::Poll;
416 match std::pin::Pin::new(&mut self.inner).poll_next(cx) {
419 Poll::Ready(Some(Ok(mut batches))) => Poll::Ready(Some(Ok(batches
420 .pop()
421 .expect("single consumer yields one batch")))),
422 Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
423 Poll::Ready(None) => Poll::Ready(None),
424 Poll::Pending => Poll::Pending,
425 }
426 }
427}
428
429fn resolve_tail_checkpoint(
434 directory: &Utf8PathBuf,
435 pattern: &str,
436) -> anyhow::Result<Option<CheckpointData>> {
437 let glob = Glob::new(pattern)?;
438 let mut files = vec![];
439 for path in glob.walk(directory) {
440 let path = directory.join(Utf8PathBuf::try_from(path).map_err(|e| anyhow::anyhow!("{e}"))?);
441 if path.is_file() {
442 files.push(path);
443 }
444 }
445 files.sort();
446 Ok(files.last().map(|f| CheckpointData {
447 file: f.to_string(),
448 line: 0,
449 }))
450}
451
/// Compute the ordered list of files that still need to be read.
///
/// Walks `directory` with `pattern`, excludes the consumers' checkpoint
/// files themselves, sorts ascending, then trims the front: after a
/// fully-processed file (`last_processed`) only strictly later files are
/// kept; otherwise, when resuming from `checkpoint`, the checkpointed
/// file itself is kept (it may still hold unread lines) along with
/// everything after it.
// NOTE(review): the `last_processed` branch compares `Utf8PathBuf`s
// (component-wise path ordering) while the checkpoint branch compares raw
// strings; these agree for flat same-directory names but are not
// identical orderings in general — confirm intended.
fn build_plan(
    directory: &Utf8PathBuf,
    pattern: &str,
    checkpoint_paths: &[Option<Utf8PathBuf>],
    last_processed: &Option<Utf8PathBuf>,
    checkpoint: &Option<CheckpointData>,
) -> anyhow::Result<Vec<Utf8PathBuf>> {
    let glob = Glob::new(pattern)?;
    let mut result = vec![];
    for path in glob.walk(directory) {
        let path = directory.join(Utf8PathBuf::try_from(path).map_err(|e| anyhow::anyhow!("{e}"))?);
        // Never treat a consumer's checkpoint file as input, even if it
        // happens to match the pattern.
        if checkpoint_paths.iter().any(|cp| cp.as_ref() == Some(&path)) {
            continue;
        }
        if path.is_file() {
            result.push(path);
        }
    }
    result.sort();

    if let Some(last) = last_processed {
        result.retain(|item| item > last);
    } else if let Some(cp) = checkpoint {
        let cp_file = &cp.file;
        result.retain(|item| item.as_str() >= cp_file.as_str());
    }

    Ok(result)
}
483
484fn is_file_done(path: &Utf8PathBuf) -> bool {
485 path.metadata()
486 .map(|m| m.permissions().readonly())
487 .unwrap_or(false)
488}
489
/// Build the core driver stream shared by all tailers.
///
/// Reads the files matching `pattern` under `directory` in ascending
/// order, parses each line as JSON, fans every line out to one batch per
/// consumer (honoring per-consumer skip counts and filters), and yields
/// whichever batches become ready — full, past their latency deadline,
/// or flushed because the current plan of files is exhausted.
///
/// `earliest_checkpoint` positions the shared reader; `consumer_checkpoints`
/// hold each consumer's resume line within the first file; `cp_paths` are
/// where each checkpointing consumer persists progress on commit.
fn make_multi_stream(
    directory: Utf8PathBuf,
    pattern: String,
    consumers: Vec<ConsumerConfig>,
    earliest_checkpoint: Option<CheckpointData>,
    mut consumer_checkpoints: Vec<Option<CheckpointData>>,
    cp_paths: Vec<Option<Utf8PathBuf>>,
    fs_notify: Arc<Notify>,
    shared: Arc<TailerShared>,
) -> impl Stream<Item = anyhow::Result<Vec<LogBatch>>> + Send {
    let num_consumers = consumers.len();

    // Split the consumer configs into parallel per-field vectors so the
    // generator below can index each property independently.
    let consumer_names: Vec<String> = consumers.iter().map(|c| c.name.clone()).collect();
    let max_batch_sizes: Vec<usize> = consumers.iter().map(|c| c.max_batch_size).collect();
    let max_batch_latencies: Vec<Duration> =
        consumers.iter().map(|c| c.max_batch_latency).collect();
    let filters: Vec<Option<Box<dyn Fn(&serde_json::Value) -> anyhow::Result<bool> + Send>>> =
        consumers.into_iter().map(|c| c.filter).collect();

    async_stream::try_stream! {
        // Last file fully read; the next plan keeps only later files.
        let mut last_processed: Option<Utf8PathBuf> = None;
        let mut global_checkpoint = earliest_checkpoint;
        // Lines to skip at the start of the first planned file.
        let mut skip_lines: usize;
        // Poll cadence while waiting at EOF for a file to grow.
        let retry_delay = Duration::from_millis(200);

        // Per-consumer line skip within the first planned file.
        let mut consumer_skip: Vec<usize> = vec![0; num_consumers];

        'outer: loop {
            if shared.closed.load(Ordering::SeqCst) {
                break;
            }

            // Recompute the ordered list of files still to be read.
            let plan = build_plan(
                &directory,
                &pattern,
                &cp_paths,
                &last_processed,
                &global_checkpoint,
            )?;

            if plan.is_empty() {
                // Nothing to read: park until a file event or shutdown.
                tokio::select! {
                    _ = shared.close_notify.notified() => break,
                    _ = fs_notify.notified() => continue,
                }
            }

            // The checkpoint skip applies only if the plan still starts
            // at the checkpointed file; otherwise that file is gone and
            // the first file is read from its beginning.
            if let Some(cp) = &global_checkpoint {
                if plan.first().map(|p| p.as_str()) == Some(cp.file.as_str()) {
                    skip_lines = cp.line;
                } else {
                    skip_lines = 0;
                }
            } else {
                skip_lines = 0;
            }

            // Same resolution, per consumer.
            for i in 0..num_consumers {
                if let Some(cp) = &consumer_checkpoints[i] {
                    if plan.first().map(|p| p.as_str()) == Some(cp.file.as_str()) {
                        consumer_skip[i] = cp.line;
                    } else {
                        consumer_skip[i] = 0;
                    }
                } else {
                    consumer_skip[i] = 0;
                }
            }
            // Checkpoints position only the very first plan; later plans
            // continue from live progress.
            global_checkpoint.take();
            for cp in consumer_checkpoints.iter_mut() {
                cp.take();
            }

            let mut plan_index = 0;
            let mut decomp: Option<FileDecompressor> = None;
            let mut current_path: Option<&Utf8PathBuf> = None;
            // Line count of the last fully-read file, used for commit
            // callbacks after that file's decompressor is dropped.
            let mut last_lines_consumed: usize = 0;
            // Absolute line index within the current file, compared
            // against the per-consumer skip counts.
            let mut global_line_in_file: usize = skip_lines;

            // Open the first file of the plan, if any.
            if let Some(path) = plan.get(plan_index) {
                let path_std = path.as_std_path().to_owned();
                decomp = Some(FileDecompressor::open(&path_std)?);
                current_path = Some(path);
            }

            // One accumulating batch and one latency deadline per
            // consumer.
            let mut batches: Vec<LogBatch> = (0..num_consumers)
                .map(|i| LogBatch::with_consumer_name(consumer_names[i].clone()))
                .collect();
            let mut deadlines: Vec<Option<tokio::time::Instant>> = vec![None; num_consumers];

            while decomp.is_some() {
                if shared.closed.load(Ordering::SeqCst) {
                    break 'outer;
                }

                // Fill batches until at least one becomes deliverable.
                'fill: loop {
                    if shared.closed.load(Ordering::SeqCst) {
                        break 'outer;
                    }

                    // Stop filling as soon as any non-empty batch is
                    // full or past its deadline.
                    let now = tokio::time::Instant::now();
                    let any_ready = (0..num_consumers).any(|i| {
                        !batches[i].is_empty()
                            && (batches[i].len() >= max_batch_sizes[i]
                                || deadlines[i].map_or(false, |d| now >= d))
                    });
                    if any_ready {
                        break 'fill;
                    }

                    let d = decomp.as_mut().expect("checked above");
                    let path = current_path.expect("set with decomp");

                    match d.next_line(skip_lines) {
                        Ok(Some(line)) => {
                            // Every line must be valid JSON; a corrupt
                            // file aborts the stream with context.
                            let value: serde_json::Value = serde_json::from_str(&line.text)
                                .map_err(|err| {
                                    anyhow::anyhow!(
                                        "Failed to parse a line from {path} (byte offset {}) \
                                         as json: {err}. Is the file corrupt? You may need \
                                         to move the file aside to make progress",
                                        line.byte_offset
                                    )
                                })?;

                            for i in 0..num_consumers {
                                // Respect this consumer's resume point.
                                if global_line_in_file < consumer_skip[i] {
                                    continue;
                                }
                                // Apply the consumer's filter, if any.
                                if let Some(ref f) = filters[i] {
                                    if !f(&value)? {
                                        continue;
                                    }
                                }
                                batches[i].push_value(
                                    value.clone(),
                                    path,
                                    line.byte_offset,
                                );
                                // Start the latency clock on the first
                                // record of a fresh batch.
                                if deadlines[i].is_none() {
                                    deadlines[i] = Some(
                                        tokio::time::Instant::now() + max_batch_latencies[i],
                                    );
                                }
                            }
                            global_line_in_file += 1;
                        }
                        Ok(None) => {
                            // EOF: a read-only file is considered final.
                            if is_file_done(path) {
                                if d.has_partial_data() {
                                    Err(anyhow::anyhow!(
                                        "unexpected EOF for {} with partial line data remaining",
                                        path
                                    ))?;
                                }
                                last_lines_consumed = d.lines_consumed;
                                last_processed = Some(path.clone());
                                // Skips only ever apply to the first
                                // file of a plan.
                                skip_lines = 0;
                                global_line_in_file = 0;
                                for cs in consumer_skip.iter_mut() {
                                    *cs = 0;
                                }

                                // Advance to the next planned file, or
                                // finish the plan.
                                plan_index += 1;
                                if let Some(next_path) = plan.get(plan_index) {
                                    let path_std = next_path.as_std_path().to_owned();
                                    decomp = Some(FileDecompressor::open(&path_std)?);
                                    current_path = Some(next_path);
                                    continue 'fill;
                                } else {
                                    decomp = None;
                                    current_path = None;
                                    break 'fill;
                                }
                            }

                            // The file may still grow: wait for the
                            // earliest pending deadline, new data, or
                            // shutdown — whichever comes first.
                            let earliest_deadline = (0..num_consumers)
                                .filter(|&i| !batches[i].is_empty())
                                .filter_map(|i| deadlines[i])
                                .min();

                            if let Some(deadline) = earliest_deadline {
                                let remaining = deadline.saturating_duration_since(
                                    tokio::time::Instant::now(),
                                );
                                if remaining.is_zero() {
                                    break 'fill;
                                }
                                // Cap the sleep at retry_delay so EOF is
                                // re-checked periodically even without
                                // filesystem events.
                                tokio::select! {
                                    _ = shared.close_notify.notified() => break 'outer,
                                    _ = tokio::time::sleep(remaining.min(retry_delay)) => {},
                                    _ = fs_notify.notified() => {},
                                }
                            } else {
                                tokio::select! {
                                    _ = shared.close_notify.notified() => break 'outer,
                                    _ = tokio::time::sleep(retry_delay) => {},
                                    _ = fs_notify.notified() => {},
                                }
                            }
                            // Let the decompressor retry reading past the
                            // previously observed EOF.
                            d.reset_eof();
                        }
                        Err(e) => {
                            Err(e)?;
                        }
                    }
                }

                // Cut and deliver every batch that is ready. When the
                // plan is exhausted (decomp is None) all non-empty
                // batches are flushed.
                let now = tokio::time::Instant::now();
                let mut ready: Vec<LogBatch> = Vec::new();
                for i in 0..num_consumers {
                    let is_ready = !batches[i].is_empty()
                        && (batches[i].len() >= max_batch_sizes[i]
                            || deadlines[i].map_or(false, |d| now >= d)
                            || decomp.is_none());
                    if !is_ready {
                        continue;
                    }

                    // Swap in a fresh batch and clear the deadline.
                    let mut batch = std::mem::replace(
                        &mut batches[i],
                        LogBatch::with_consumer_name(consumer_names[i].clone()),
                    );
                    deadlines[i] = None;

                    // Attach a commit callback that persists this
                    // consumer's position as of the moment the batch was
                    // cut.
                    if let Some(ref cp_path) = cp_paths[i] {
                        let (cp_file, cp_line) = if let Some(d) = &decomp {
                            let path = current_path.expect("set with decomp");
                            (path.clone(), d.lines_consumed)
                        } else if let Some(last) = &last_processed {
                            (last.clone(), last_lines_consumed)
                        } else {
                            unreachable!("non-empty batch without a source");
                        };
                        let cp_path = cp_path.clone();
                        batch.set_commit_fn(Box::new(move || {
                            CheckpointData::save_atomic(&cp_path, &cp_file, cp_line)
                        }));
                    }
                    ready.push(batch);
                }

                if !ready.is_empty() {
                    // NOTE(review): presumably FileDecompressor applies
                    // the skip only before its first unskipped line, so
                    // zeroing here is a no-op safeguard — confirm.
                    skip_lines = 0;
                    yield ready;
                }
            }
        }
    }
}