kumo_jsonl/
lua.rs

1use crate::{
2    CloseHandle, ConsumerConfig, LogBatch, LogTailerConfig, LogWriterConfig,
3    MultiConsumerTailerConfig,
4};
5use config::{any_err, from_lua_value, get_or_create_sub_module, SerdeWrappedValue};
6use futures::StreamExt;
7use mlua::{Lua, LuaSerdeExt, MetaMethod, UserData, UserDataMethods, UserDataRef, UserDataRefMut};
8use serde::Deserialize;
9use std::pin::Pin;
10use std::sync::Arc;
11use std::time::Duration;
12use tokio::sync::Mutex;
13
14// ---------------------------------------------------------------------------
15// LuaLogBatch
16// ---------------------------------------------------------------------------
17
18/// Lua wrapper around [`LogBatch`].
19///
20/// Exposes `:records()` to iterate over the parsed JSON records,
21/// `:iter_records()` for lazy iteration, `:consumer_name()` to
22/// identify which consumer the batch belongs to, and `:commit()`
23/// to advance the checkpoint.
24struct LuaLogBatch {
25    inner: Option<LogBatch>,
26}
27
28impl LuaLogBatch {
29    /// Return the consumer name for this batch.
30    /// Returns an empty string if the batch has already been committed.
31    fn consumer_name(_lua: &Lua, this: &Self, _: ()) -> mlua::Result<String> {
32        Ok(this
33            .inner
34            .as_ref()
35            .map(|b| b.consumer_name().to_string())
36            .unwrap_or_default())
37    }
38
39    /// Return the records as a lua table (sequence of parsed JSON values).
40    /// Returns an empty table if the batch has already been committed.
41    fn records(lua: &Lua, this: &Self, _: ()) -> mlua::Result<mlua::Value> {
42        let table = lua.create_table()?;
43        if let Some(batch) = &this.inner {
44            let options = config::serialize_options();
45            for (i, value) in batch.records().iter().enumerate() {
46                let lua_value = lua.to_value_with(value, options)?;
47                table.raw_set(i + 1, lua_value)?;
48            }
49        }
50        Ok(mlua::Value::Table(table))
51    }
52
53    /// Advance the checkpoint to the end of this batch.
54    /// No-op if the batch has already been committed.
55    fn commit(_lua: &Lua, this: &mut Self, _: ()) -> mlua::Result<()> {
56        if let Some(mut batch) = this.inner.take() {
57            batch.commit().map_err(any_err)?;
58        }
59        Ok(())
60    }
61
62    /// Return an iterator function that yields one record at a time,
63    /// converting each JSON value to a lua value lazily on demand.
64    /// Returns an iterator that immediately yields nil if the batch
65    /// has already been committed.
66    fn iter_records(lua: &Lua, this: &Self, _: ()) -> mlua::Result<mlua::Function> {
67        let records = match &this.inner {
68            Some(batch) => batch.records().to_vec(),
69            None => Vec::new(),
70        };
71        let records = std::sync::Arc::new(records);
72        let idx = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
73        lua.create_function(move |lua, ()| {
74            let i = idx.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
75            if let Some(value) = records.get(i) {
76                let options = config::serialize_options();
77                let lua_value = lua.to_value_with(value, options)?;
78                Ok(lua_value)
79            } else {
80                Ok(mlua::Value::Nil)
81            }
82        })
83    }
84}
85
86impl UserData for LuaLogBatch {
87    fn add_methods<M: UserDataMethods<Self>>(methods: &mut M) {
88        methods.add_method("consumer_name", Self::consumer_name);
89        methods.add_method("records", Self::records);
90        methods.add_method("iter_records", Self::iter_records);
91        methods.add_method_mut("commit", Self::commit);
92    }
93}
94
95// ---------------------------------------------------------------------------
96// LuaLogTailer (single-consumer)
97// ---------------------------------------------------------------------------
98
99struct LuaLogTailer {
100    stream: Arc<Mutex<Pin<Box<crate::LogTailer>>>>,
101    close_handle: CloseHandle,
102}
103
104impl LuaLogTailer {
105    async fn close(_lua: Lua, this: UserDataRefMut<Self>, _: ()) -> mlua::Result<()> {
106        this.close_handle.close();
107        Ok(())
108    }
109
110    async fn batches(lua: Lua, this: UserDataRef<Self>, _: ()) -> mlua::Result<mlua::Function> {
111        let stream = this.stream.clone();
112        lua.create_async_function(move |lua, ()| {
113            let stream = stream.clone();
114            async move {
115                let mut guard = stream.lock().await;
116                match guard.next().await {
117                    Some(Ok(batch)) => Ok(mlua::Value::UserData(
118                        lua.create_userdata(LuaLogBatch { inner: Some(batch) })?,
119                    )),
120                    Some(Err(e)) => Err(any_err(e)),
121                    None => Ok(mlua::Value::Nil),
122                }
123            }
124        })
125    }
126}
127
128impl UserData for LuaLogTailer {
129    fn add_methods<M: UserDataMethods<Self>>(methods: &mut M) {
130        methods.add_async_method("batches", Self::batches);
131        methods.add_async_method_mut("close", Self::close);
132        methods.add_async_meta_method_mut(MetaMethod::Close, Self::close);
133    }
134}
135
136// ---------------------------------------------------------------------------
137// LuaMultiConsumerTailer
138// ---------------------------------------------------------------------------
139
140struct LuaMultiConsumerTailer {
141    stream: Arc<Mutex<Pin<Box<crate::MultiConsumerTailer>>>>,
142    close_handle: CloseHandle,
143}
144
145impl LuaMultiConsumerTailer {
146    async fn close(_lua: Lua, this: UserDataRefMut<Self>, _: ()) -> mlua::Result<()> {
147        this.close_handle.close();
148        Ok(())
149    }
150
151    /// Returns an async iterator function that yields a lua table
152    /// (sequence) of [`LuaLogBatch`] userdata values on each call,
153    /// or nil when the stream is exhausted.
154    async fn batches(lua: Lua, this: UserDataRef<Self>, _: ()) -> mlua::Result<mlua::Function> {
155        let stream = this.stream.clone();
156        lua.create_async_function(move |lua, ()| {
157            let stream = stream.clone();
158            async move {
159                let mut guard = stream.lock().await;
160                match guard.next().await {
161                    Some(Ok(batch_vec)) => {
162                        let table = lua.create_table()?;
163                        for (i, batch) in batch_vec.into_iter().enumerate() {
164                            let ud = lua.create_userdata(LuaLogBatch { inner: Some(batch) })?;
165                            table.raw_set(i + 1, ud)?;
166                        }
167                        Ok(mlua::Value::Table(table))
168                    }
169                    Some(Err(e)) => Err(any_err(e)),
170                    None => Ok(mlua::Value::Nil),
171                }
172            }
173        })
174    }
175}
176
177impl UserData for LuaMultiConsumerTailer {
178    fn add_methods<M: UserDataMethods<Self>>(methods: &mut M) {
179        methods.add_async_method("batches", Self::batches);
180        methods.add_async_method_mut("close", Self::close);
181        methods.add_async_meta_method_mut(MetaMethod::Close, Self::close);
182    }
183}
184
185// ---------------------------------------------------------------------------
186// Lua-facing config for multi-consumer (deserialized from a lua table)
187// ---------------------------------------------------------------------------
188
189#[derive(Deserialize)]
190struct LuaConsumerConfig {
191    name: String,
192    #[serde(default = "default_max_batch_size")]
193    max_batch_size: usize,
194    #[serde(default = "default_max_batch_latency", with = "duration_serde")]
195    max_batch_latency: Duration,
196    #[serde(default)]
197    checkpoint_name: Option<String>,
198}
199
200fn default_max_batch_size() -> usize {
201    100
202}
203
204fn default_max_batch_latency() -> Duration {
205    Duration::from_secs(1)
206}
207
208#[derive(Deserialize)]
209struct LuaMultiConsumerTailerConfig {
210    directory: String,
211    #[serde(default = "default_pattern")]
212    pattern: String,
213    #[serde(
214        default,
215        with = "duration_serde",
216        skip_serializing_if = "Option::is_none"
217    )]
218    poll_watcher: Option<Duration>,
219    #[serde(default)]
220    tail: bool,
221    // consumers is extracted manually from the lua table so that
222    // we can handle the filter function field which serde can't
223    // deserialize.
224}
225
226fn default_pattern() -> String {
227    "*".to_string()
228}
229
230/// Helper to build a Rust filter closure from a lua function.
231fn make_rust_filter(
232    lua: &Lua,
233    func: mlua::Function,
234) -> Box<dyn Fn(&serde_json::Value) -> anyhow::Result<bool> + Send> {
235    let lua = lua.clone();
236    Box::new(move |value: &serde_json::Value| -> anyhow::Result<bool> {
237        let options = config::serialize_options();
238        let lua_value = lua.to_value_with(value, options)?;
239        let result: bool = func.call(lua_value.clone())?;
240        Ok(result)
241    })
242}
243
244// ---------------------------------------------------------------------------
245// Registration
246// ---------------------------------------------------------------------------
247
248pub fn register(lua: &Lua) -> anyhow::Result<()> {
249    let tailer_mod = get_or_create_sub_module(lua, "jsonl")?;
250
251    // kumo.jsonl.new_tailer — single-consumer tailer
252    tailer_mod.set(
253        "new_tailer",
254        lua.create_async_function(
255            |lua,
256             (cfg, lua_filter): (
257                SerdeWrappedValue<LogTailerConfig>,
258                Option<mlua::Function>,
259            )| async move {
260                let cfg = cfg.0;
261                let filter = lua_filter.map(|f| make_rust_filter(&lua, f));
262                let tailer = cfg.build_with_filter(filter).await.map_err(any_err)?;
263                let close_handle = tailer.close_handle();
264
265                Ok(LuaLogTailer {
266                    stream: Arc::new(Mutex::new(Box::pin(tailer))),
267                    close_handle,
268                })
269            },
270        )?,
271    )?;
272
273    // kumo.jsonl.new_multi_tailer — multi-consumer tailer
274    //
275    // Usage:
276    //   local tailer <close> = kumo.jsonl.new_multi_tailer {
277    //     directory = '/var/log/kumomta',
278    //     consumers = {
279    //       { name = 'deliveries', checkpoint_name = 'cp-del',
280    //         filter = function(record) return record.type == 'Delivery' end },
281    //       { name = 'bounces', checkpoint_name = 'cp-bounce',
282    //         filter = function(record) return record.type == 'Bounce' end },
283    //     },
284    //   }
285    //
286    //   for batches in tailer:batches() do
287    //     for _, batch in ipairs(batches) do
288    //       print(batch:consumer_name())
289    //       for record in batch:iter_records() do ... end
290    //       batch:commit()
291    //     end
292    //   end
293    tailer_mod.set(
294        "new_multi_tailer",
295        lua.create_async_function(|lua, cfg_value: mlua::Value| async move {
296            let cfg_table = match &cfg_value {
297                mlua::Value::Table(t) => t,
298                _ => return Err(any_err("expected a table")),
299            };
300
301            // Remove consumers before serde deserialization because
302            // the consumer entries may contain lua function values
303            // that serde cannot handle.
304            let consumers_table: mlua::Table = cfg_table.get("consumers")?;
305            cfg_table.set("consumers", mlua::Value::Nil)?;
306
307            // Deserialize the top-level non-function fields via serde
308            let cfg: LuaMultiConsumerTailerConfig = from_lua_value(&lua, cfg_value)?;
309
310            let num_consumers = consumers_table.len()? as usize;
311
312            let mut consumers = Vec::with_capacity(num_consumers);
313            for idx in 0..num_consumers {
314                let entry: mlua::Table = consumers_table.get(idx + 1)?;
315                // Extract the filter function before serde sees the table
316                let filter_func: Option<mlua::Function> =
317                    entry.get::<mlua::Function>("filter").ok();
318                entry.set("filter", mlua::Value::Nil)?;
319                let lc: LuaConsumerConfig = from_lua_value(&lua, mlua::Value::Table(entry))?;
320
321                let mut consumer = ConsumerConfig::new(&lc.name)
322                    .max_batch_size(lc.max_batch_size)
323                    .max_batch_latency(lc.max_batch_latency);
324                if let Some(cp) = lc.checkpoint_name {
325                    consumer = consumer.checkpoint_name(cp);
326                }
327                if let Some(func) = filter_func {
328                    consumer = consumer.filter(make_rust_filter(&lua, func));
329                }
330                consumers.push(consumer);
331            }
332
333            let multi_cfg = MultiConsumerTailerConfig::new(cfg.directory.into(), consumers)
334                .pattern(cfg.pattern)
335                .tail(cfg.tail);
336            let multi_cfg = match cfg.poll_watcher {
337                Some(interval) => multi_cfg.poll_watcher(interval),
338                None => multi_cfg,
339            };
340
341            let tailer = multi_cfg.build().await.map_err(any_err)?;
342            let close_handle = tailer.close_handle();
343
344            Ok(LuaMultiConsumerTailer {
345                stream: Arc::new(Mutex::new(Box::pin(tailer))),
346                close_handle,
347            })
348        })?,
349    )?;
350
351    // kumo.jsonl.new_writer — log file writer
352    tailer_mod.set(
353        "new_writer",
354        lua.create_function(|lua, cfg: SerdeWrappedValue<LogWriterConfig>| {
355            let _ = lua;
356            Ok(LuaLogWriter {
357                inner: Some(cfg.0.build()),
358            })
359        })?,
360    )?;
361
362    Ok(())
363}
364
365// ---------------------------------------------------------------------------
366// LuaLogWriter
367// ---------------------------------------------------------------------------
368
369/// Lua wrapper around [`crate::writer::LogWriter`].
370struct LuaLogWriter {
371    inner: Option<crate::writer::LogWriter>,
372}
373
374impl LuaLogWriter {
375    fn write_line(_lua: &Lua, this: &mut Self, line: String) -> mlua::Result<()> {
376        let writer = this
377            .inner
378            .as_mut()
379            .ok_or_else(|| mlua::Error::external("writer has been closed"))?;
380        writer.write_line(&line).map_err(any_err)
381    }
382
383    fn write_record(lua: &Lua, this: &mut Self, value: mlua::Value) -> mlua::Result<()> {
384        let writer = this
385            .inner
386            .as_mut()
387            .ok_or_else(|| mlua::Error::external("writer has been closed"))?;
388        // Convert lua value to serde_json::Value, then serialize to JSON string
389        let json_value: serde_json::Value = lua.from_value(value)?;
390        writer.write_value(&json_value).map_err(any_err)
391    }
392
393    fn close(_lua: &Lua, this: &mut Self, _: ()) -> mlua::Result<()> {
394        if let Some(mut writer) = this.inner.take() {
395            writer.close().map_err(any_err)?;
396        }
397        Ok(())
398    }
399}
400
401impl UserData for LuaLogWriter {
402    fn add_methods<M: UserDataMethods<Self>>(methods: &mut M) {
403        methods.add_method_mut("write_line", Self::write_line);
404        methods.add_method_mut("write_record", Self::write_record);
405        methods.add_method_mut("close", Self::close);
406        methods.add_meta_method_mut(MetaMethod::Close, Self::close);
407    }
408}