1use crate::{
2 CloseHandle, ConsumerConfig, LogBatch, LogTailerConfig, LogWriterConfig,
3 MultiConsumerTailerConfig,
4};
5use config::{any_err, from_lua_value, get_or_create_sub_module, SerdeWrappedValue};
6use futures::StreamExt;
7use mlua::{Lua, LuaSerdeExt, MetaMethod, UserData, UserDataMethods, UserDataRef, UserDataRefMut};
8use serde::Deserialize;
9use std::pin::Pin;
10use std::sync::Arc;
11use std::time::Duration;
12use tokio::sync::Mutex;
13
14struct LuaLogBatch {
25 inner: Option<LogBatch>,
26}
27
28impl LuaLogBatch {
29 fn consumer_name(_lua: &Lua, this: &Self, _: ()) -> mlua::Result<String> {
32 Ok(this
33 .inner
34 .as_ref()
35 .map(|b| b.consumer_name().to_string())
36 .unwrap_or_default())
37 }
38
39 fn records(lua: &Lua, this: &Self, _: ()) -> mlua::Result<mlua::Value> {
42 let table = lua.create_table()?;
43 if let Some(batch) = &this.inner {
44 let options = config::serialize_options();
45 for (i, value) in batch.records().iter().enumerate() {
46 let lua_value = lua.to_value_with(value, options)?;
47 table.raw_set(i + 1, lua_value)?;
48 }
49 }
50 Ok(mlua::Value::Table(table))
51 }
52
53 fn commit(_lua: &Lua, this: &mut Self, _: ()) -> mlua::Result<()> {
56 if let Some(mut batch) = this.inner.take() {
57 batch.commit().map_err(any_err)?;
58 }
59 Ok(())
60 }
61
62 fn iter_records(lua: &Lua, this: &Self, _: ()) -> mlua::Result<mlua::Function> {
67 let records = match &this.inner {
68 Some(batch) => batch.records().to_vec(),
69 None => Vec::new(),
70 };
71 let records = std::sync::Arc::new(records);
72 let idx = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
73 lua.create_function(move |lua, ()| {
74 let i = idx.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
75 if let Some(value) = records.get(i) {
76 let options = config::serialize_options();
77 let lua_value = lua.to_value_with(value, options)?;
78 Ok(lua_value)
79 } else {
80 Ok(mlua::Value::Nil)
81 }
82 })
83 }
84}
85
86impl UserData for LuaLogBatch {
87 fn add_methods<M: UserDataMethods<Self>>(methods: &mut M) {
88 methods.add_method("consumer_name", Self::consumer_name);
89 methods.add_method("records", Self::records);
90 methods.add_method("iter_records", Self::iter_records);
91 methods.add_method_mut("commit", Self::commit);
92 }
93}
94
95struct LuaLogTailer {
100 stream: Arc<Mutex<Pin<Box<crate::LogTailer>>>>,
101 close_handle: CloseHandle,
102}
103
104impl LuaLogTailer {
105 async fn close(_lua: Lua, this: UserDataRefMut<Self>, _: ()) -> mlua::Result<()> {
106 this.close_handle.close();
107 Ok(())
108 }
109
110 async fn batches(lua: Lua, this: UserDataRef<Self>, _: ()) -> mlua::Result<mlua::Function> {
111 let stream = this.stream.clone();
112 lua.create_async_function(move |lua, ()| {
113 let stream = stream.clone();
114 async move {
115 let mut guard = stream.lock().await;
116 match guard.next().await {
117 Some(Ok(batch)) => Ok(mlua::Value::UserData(
118 lua.create_userdata(LuaLogBatch { inner: Some(batch) })?,
119 )),
120 Some(Err(e)) => Err(any_err(e)),
121 None => Ok(mlua::Value::Nil),
122 }
123 }
124 })
125 }
126}
127
128impl UserData for LuaLogTailer {
129 fn add_methods<M: UserDataMethods<Self>>(methods: &mut M) {
130 methods.add_async_method("batches", Self::batches);
131 methods.add_async_method_mut("close", Self::close);
132 methods.add_async_meta_method_mut(MetaMethod::Close, Self::close);
133 }
134}
135
136struct LuaMultiConsumerTailer {
141 stream: Arc<Mutex<Pin<Box<crate::MultiConsumerTailer>>>>,
142 close_handle: CloseHandle,
143}
144
145impl LuaMultiConsumerTailer {
146 async fn close(_lua: Lua, this: UserDataRefMut<Self>, _: ()) -> mlua::Result<()> {
147 this.close_handle.close();
148 Ok(())
149 }
150
151 async fn batches(lua: Lua, this: UserDataRef<Self>, _: ()) -> mlua::Result<mlua::Function> {
155 let stream = this.stream.clone();
156 lua.create_async_function(move |lua, ()| {
157 let stream = stream.clone();
158 async move {
159 let mut guard = stream.lock().await;
160 match guard.next().await {
161 Some(Ok(batch_vec)) => {
162 let table = lua.create_table()?;
163 for (i, batch) in batch_vec.into_iter().enumerate() {
164 let ud = lua.create_userdata(LuaLogBatch { inner: Some(batch) })?;
165 table.raw_set(i + 1, ud)?;
166 }
167 Ok(mlua::Value::Table(table))
168 }
169 Some(Err(e)) => Err(any_err(e)),
170 None => Ok(mlua::Value::Nil),
171 }
172 }
173 })
174 }
175}
176
177impl UserData for LuaMultiConsumerTailer {
178 fn add_methods<M: UserDataMethods<Self>>(methods: &mut M) {
179 methods.add_async_method("batches", Self::batches);
180 methods.add_async_method_mut("close", Self::close);
181 methods.add_async_meta_method_mut(MetaMethod::Close, Self::close);
182 }
183}
184
185#[derive(Deserialize)]
190struct LuaConsumerConfig {
191 name: String,
192 #[serde(default = "default_max_batch_size")]
193 max_batch_size: usize,
194 #[serde(default = "default_max_batch_latency", with = "duration_serde")]
195 max_batch_latency: Duration,
196 #[serde(default)]
197 checkpoint_name: Option<String>,
198}
199
/// Serde default for `max_batch_size`: 100.
fn default_max_batch_size() -> usize {
    const DEFAULT_MAX_BATCH_SIZE: usize = 100;
    DEFAULT_MAX_BATCH_SIZE
}
203
/// Serde default for `max_batch_latency`: one second.
fn default_max_batch_latency() -> Duration {
    const DEFAULT_MAX_BATCH_LATENCY: Duration = Duration::from_secs(1);
    DEFAULT_MAX_BATCH_LATENCY
}
207
208#[derive(Deserialize)]
209struct LuaMultiConsumerTailerConfig {
210 directory: String,
211 #[serde(default = "default_pattern")]
212 pattern: String,
213 #[serde(
214 default,
215 with = "duration_serde",
216 skip_serializing_if = "Option::is_none"
217 )]
218 poll_watcher: Option<Duration>,
219 #[serde(default)]
220 tail: bool,
221 }
225
/// Serde default for `pattern`: the single-character string `*`.
fn default_pattern() -> String {
    String::from("*")
}
229
230fn make_rust_filter(
232 lua: &Lua,
233 func: mlua::Function,
234) -> Box<dyn Fn(&serde_json::Value) -> anyhow::Result<bool> + Send> {
235 let lua = lua.clone();
236 Box::new(move |value: &serde_json::Value| -> anyhow::Result<bool> {
237 let options = config::serialize_options();
238 let lua_value = lua.to_value_with(value, options)?;
239 let result: bool = func.call(lua_value.clone())?;
240 Ok(result)
241 })
242}
243
244pub fn register(lua: &Lua) -> anyhow::Result<()> {
249 let tailer_mod = get_or_create_sub_module(lua, "jsonl")?;
250
251 tailer_mod.set(
253 "new_tailer",
254 lua.create_async_function(
255 |lua,
256 (cfg, lua_filter): (
257 SerdeWrappedValue<LogTailerConfig>,
258 Option<mlua::Function>,
259 )| async move {
260 let cfg = cfg.0;
261 let filter = lua_filter.map(|f| make_rust_filter(&lua, f));
262 let tailer = cfg.build_with_filter(filter).await.map_err(any_err)?;
263 let close_handle = tailer.close_handle();
264
265 Ok(LuaLogTailer {
266 stream: Arc::new(Mutex::new(Box::pin(tailer))),
267 close_handle,
268 })
269 },
270 )?,
271 )?;
272
273 tailer_mod.set(
294 "new_multi_tailer",
295 lua.create_async_function(|lua, cfg_value: mlua::Value| async move {
296 let cfg_table = match &cfg_value {
297 mlua::Value::Table(t) => t,
298 _ => return Err(any_err("expected a table")),
299 };
300
301 let consumers_table: mlua::Table = cfg_table.get("consumers")?;
305 cfg_table.set("consumers", mlua::Value::Nil)?;
306
307 let cfg: LuaMultiConsumerTailerConfig = from_lua_value(&lua, cfg_value)?;
309
310 let num_consumers = consumers_table.len()? as usize;
311
312 let mut consumers = Vec::with_capacity(num_consumers);
313 for idx in 0..num_consumers {
314 let entry: mlua::Table = consumers_table.get(idx + 1)?;
315 let filter_func: Option<mlua::Function> =
317 entry.get::<mlua::Function>("filter").ok();
318 entry.set("filter", mlua::Value::Nil)?;
319 let lc: LuaConsumerConfig = from_lua_value(&lua, mlua::Value::Table(entry))?;
320
321 let mut consumer = ConsumerConfig::new(&lc.name)
322 .max_batch_size(lc.max_batch_size)
323 .max_batch_latency(lc.max_batch_latency);
324 if let Some(cp) = lc.checkpoint_name {
325 consumer = consumer.checkpoint_name(cp);
326 }
327 if let Some(func) = filter_func {
328 consumer = consumer.filter(make_rust_filter(&lua, func));
329 }
330 consumers.push(consumer);
331 }
332
333 let multi_cfg = MultiConsumerTailerConfig::new(cfg.directory.into(), consumers)
334 .pattern(cfg.pattern)
335 .tail(cfg.tail);
336 let multi_cfg = match cfg.poll_watcher {
337 Some(interval) => multi_cfg.poll_watcher(interval),
338 None => multi_cfg,
339 };
340
341 let tailer = multi_cfg.build().await.map_err(any_err)?;
342 let close_handle = tailer.close_handle();
343
344 Ok(LuaMultiConsumerTailer {
345 stream: Arc::new(Mutex::new(Box::pin(tailer))),
346 close_handle,
347 })
348 })?,
349 )?;
350
351 tailer_mod.set(
353 "new_writer",
354 lua.create_function(|lua, cfg: SerdeWrappedValue<LogWriterConfig>| {
355 let _ = lua;
356 Ok(LuaLogWriter {
357 inner: Some(cfg.0.build()),
358 })
359 })?,
360 )?;
361
362 Ok(())
363}
364
365struct LuaLogWriter {
371 inner: Option<crate::writer::LogWriter>,
372}
373
374impl LuaLogWriter {
375 fn write_line(_lua: &Lua, this: &mut Self, line: String) -> mlua::Result<()> {
376 let writer = this
377 .inner
378 .as_mut()
379 .ok_or_else(|| mlua::Error::external("writer has been closed"))?;
380 writer.write_line(&line).map_err(any_err)
381 }
382
383 fn write_record(lua: &Lua, this: &mut Self, value: mlua::Value) -> mlua::Result<()> {
384 let writer = this
385 .inner
386 .as_mut()
387 .ok_or_else(|| mlua::Error::external("writer has been closed"))?;
388 let json_value: serde_json::Value = lua.from_value(value)?;
390 writer.write_value(&json_value).map_err(any_err)
391 }
392
393 fn close(_lua: &Lua, this: &mut Self, _: ()) -> mlua::Result<()> {
394 if let Some(mut writer) = this.inner.take() {
395 writer.close().map_err(any_err)?;
396 }
397 Ok(())
398 }
399}
400
401impl UserData for LuaLogWriter {
402 fn add_methods<M: UserDataMethods<Self>>(methods: &mut M) {
403 methods.add_method_mut("write_line", Self::write_line);
404 methods.add_method_mut("write_record", Self::write_record);
405 methods.add_method_mut("close", Self::close);
406 methods.add_meta_method_mut(MetaMethod::Close, Self::close);
407 }
408}