kumo_server_common/
lib.rs

1use bstr::ByteSlice;
2use config::{
3    any_err, decorate_callback_name, from_lua_value, get_or_create_module, load_config,
4    serialize_options, CallbackSignature,
5};
6use kumo_server_runtime::available_parallelism;
7use mlua::{Function, Lua, LuaSerdeExt, Value, Variadic};
8use mod_redis::RedisConnKey;
9use serde::{Deserialize, Serialize};
10use std::sync::atomic::AtomicUsize;
11
12pub mod acct;
13pub mod authn_authz;
14pub mod config_handle;
15pub mod diagnostic_logging;
16pub mod disk_space;
17pub mod hashable_weak;
18pub mod http_server;
19pub mod log;
20pub mod nodeid;
21pub mod panic;
22pub mod start;
23pub mod tls_helpers;
24
25pub fn register(lua: &Lua) -> anyhow::Result<()> {
26    for func in [
27        mod_redis::register,
28        data_loader::register,
29        mod_digest::register,
30        mod_encode::register,
31        mod_aws_sigv4::register,
32        cidr_map::register,
33        domain_map::register,
34        mod_amqp::register,
35        mod_filesystem::register,
36        mod_file_type::register,
37        mod_http::register,
38        mod_regex::register,
39        mod_serde::register,
40        mod_sqlite::register,
41        mod_crypto::register,
42        mod_smtp_response_normalize::register,
43        kumo_jsonl::lua::register,
44        mod_counter_series::register,
45        mod_string::register,
46        mod_time::register,
47        mod_dns_resolver::register,
48        mod_kafka::register,
49        mod_memoize::register,
50        mod_mimepart::register,
51        mod_mpsc::register,
52        mod_nats::register,
53        mod_uuid::register,
54        kumo_api_types::shaping::register,
55        regex_set_map::register,
56        crate::authn_authz::register,
57    ] {
58        func(lua)?;
59    }
60
61    let kumo_mod = get_or_create_module(lua, "kumo")?;
62    kumo_mod.set("version", version_info::kumo_version())?;
63
64    fn event_registrar_name(name: &str) -> String {
65        format!("kumomta-event-registrars-{name}")
66    }
67
68    // Record the call stack of the code calling kumo.on so that
69    // kumo.get_event_registrars can retrieve it later
70    fn register_event_caller(lua: &Lua, name: &str) -> mlua::Result<()> {
71        let decorated_name = event_registrar_name(name);
72        let mut call_stack = vec![];
73        for n in 1.. {
74            match lua.inspect_stack(n, |info| {
75                let source = info.source();
76                format!(
77                    "{}:{}",
78                    source
79                        .short_src
80                        .as_ref()
81                        .map(|b| b.to_string())
82                        .unwrap_or_else(String::new),
83                    info.current_line().unwrap_or(0)
84                )
85            }) {
86                Some(info) => {
87                    call_stack.push(info);
88                }
89                None => break,
90            }
91        }
92
93        let tbl: Value = lua.named_registry_value(&decorated_name)?;
94        match tbl {
95            Value::Nil => {
96                let tbl = lua.create_table()?;
97                tbl.set(1, call_stack)?;
98                lua.set_named_registry_value(&decorated_name, tbl)?;
99                Ok(())
100            }
101            Value::Table(tbl) => {
102                let len = tbl.raw_len();
103                tbl.set(len + 1, call_stack)?;
104                Ok(())
105            }
106            _ => Err(mlua::Error::external(format!(
107                "registry key for {decorated_name} has invalid type",
108            ))),
109        }
110    }
111
112    // Returns the list of call-stacks of the code that registered
113    // for a specific named event
114    kumo_mod.set(
115        "get_event_registrars",
116        lua.create_function(move |lua, name: String| {
117            let decorated_name = event_registrar_name(&name);
118            let value: Value = lua.named_registry_value(&decorated_name)?;
119            Ok(value)
120        })?,
121    )?;
122
123    kumo_mod.set(
124        "on",
125        lua.create_function(move |lua, (name, func): (String, Function)| {
126            let decorated_name = decorate_callback_name(&name);
127
128            if let Ok(current_event) = lua.globals().get::<String>("_KUMO_CURRENT_EVENT") {
129                if current_event != "main" {
130                    return Err(mlua::Error::external(format!(
131                        "Attempting to register an event handler via \
132                    `kumo.on('{name}', ...)` from within the event handler \
133                    '{current_event}'. You must move your event handler registration \
134                    so that it is setup directly when the policy is loaded \
135                    in order for it to consistently trigger and handle events."
136                    )));
137                }
138            }
139
140            register_event_caller(lua, &name)?;
141
142            if config::does_callback_allow_multiple(&name) {
143                let tbl: Value = lua.named_registry_value(&decorated_name)?;
144                return match tbl {
145                    Value::Nil => {
146                        let tbl = lua.create_table()?;
147                        tbl.set(1, func)?;
148                        lua.set_named_registry_value(&decorated_name, tbl)?;
149                        Ok(())
150                    }
151                    Value::Table(tbl) => {
152                        let len = tbl.raw_len();
153                        tbl.set(len + 1, func)?;
154                        Ok(())
155                    }
156                    _ => Err(mlua::Error::external(format!(
157                        "registry key for {decorated_name} has invalid type",
158                    ))),
159                };
160            }
161
162            let existing: Value = lua.named_registry_value(&decorated_name)?;
163            match existing {
164                Value::Nil => {}
165                Value::Function(func) => {
166                    let info = func.info();
167                    let src = info.source.unwrap_or_else(|| "?".into());
168                    let line = info.line_defined.unwrap_or(0);
169                    return Err(mlua::Error::external(format!(
170                        "{name} event already has a handler defined at {src}:{line}"
171                    )));
172                }
173                _ => {
174                    return Err(mlua::Error::external(format!(
175                        "{name} event already has a handler"
176                    )));
177                }
178            }
179
180            lua.set_named_registry_value(&decorated_name, func)?;
181            Ok(())
182        })?,
183    )?;
184
185    kumo_mod.set(
186        "set_diagnostic_log_filter",
187        lua.create_function(move |_, filter: String| {
188            diagnostic_logging::set_diagnostic_log_filter(&filter).map_err(any_err)
189        })?,
190    )?;
191
192    fn variadic_to_string(args: Variadic<Value>) -> String {
193        let mut output = String::new();
194        for (idx, item) in args.into_iter().enumerate() {
195            if idx > 0 {
196                output.push(' ');
197            }
198
199            match item {
200                Value::String(s) => {
201                    let bytes = s.as_bytes();
202                    for (start, end, c) in bytes.char_indices() {
203                        if c == std::char::REPLACEMENT_CHARACTER {
204                            let c_slice = &bytes[start..end];
205                            for &b in c_slice.iter() {
206                                output.push_str(&format!("\\x{b:02X}"));
207                            }
208                        } else {
209                            output.push(c);
210                        }
211                    }
212                }
213                item => match item.to_string() {
214                    Ok(s) => output.push_str(&s),
215                    Err(_) => output.push_str(&format!("{item:?}")),
216                },
217            }
218        }
219        output
220    }
221
222    fn get_caller(lua: &Lua) -> String {
223        match lua.inspect_stack(1, |info| {
224            let source = info.source();
225            let file_name = source
226                .short_src
227                .as_ref()
228                .map(|b| b.to_string())
229                .unwrap_or_else(String::new);
230            // Lua returns the somewhat obnoxious `[string "source.lua"]`
231            // Let's fix that up to be a bit nicer
232            let file_name = match file_name.strip_prefix("[string \"") {
233                Some(name) => name.strip_suffix("\"]").unwrap_or(name),
234                None => &file_name,
235            };
236
237            format!("{file_name}:{}", info.current_line().unwrap_or(0))
238        }) {
239            Some(info) => info,
240            None => "?".to_string(),
241        }
242    }
243
244    kumo_mod.set(
245        "log_error",
246        lua.create_function(move |lua, args: Variadic<Value>| {
247            if tracing::event_enabled!(target: "lua", tracing::Level::ERROR) {
248                let src = get_caller(lua);
249                tracing::error!(target: "lua", "{src}: {}", variadic_to_string(args));
250            }
251            Ok(())
252        })?,
253    )?;
254    kumo_mod.set(
255        "log_info",
256        lua.create_function(move |lua, args: Variadic<Value>| {
257            if tracing::event_enabled!(target: "lua", tracing::Level::INFO) {
258                let src = get_caller(lua);
259                tracing::info!(target: "lua", "{src}: {}", variadic_to_string(args));
260            }
261            Ok(())
262        })?,
263    )?;
264    kumo_mod.set(
265        "log_warn",
266        lua.create_function(move |lua, args: Variadic<Value>| {
267            if tracing::event_enabled!(target: "lua", tracing::Level::WARN) {
268                let src = get_caller(lua);
269                tracing::warn!(target: "lua", "{src}: {}", variadic_to_string(args));
270            }
271            Ok(())
272        })?,
273    )?;
274    kumo_mod.set(
275        "log_debug",
276        lua.create_function(move |lua, args: Variadic<Value>| {
277            if tracing::event_enabled!(target: "lua", tracing::Level::DEBUG) {
278                let src = get_caller(lua);
279                tracing::debug!(target: "lua", "{src}: {}", variadic_to_string(args));
280            }
281            Ok(())
282        })?,
283    )?;
284
285    kumo_mod.set(
286        "set_max_spare_lua_contexts",
287        lua.create_function(move |_, limit: usize| {
288            config::set_max_spare(limit);
289            Ok(())
290        })?,
291    )?;
292
293    kumo_mod.set(
294        "set_max_lua_context_use_count",
295        lua.create_function(move |_, limit: usize| {
296            config::set_max_use(limit);
297            Ok(())
298        })?,
299    )?;
300
301    kumo_mod.set(
302        "set_max_lua_context_age",
303        lua.create_function(move |_, limit: usize| {
304            config::set_max_age(limit);
305            Ok(())
306        })?,
307    )?;
308
309    kumo_mod.set(
310        "set_lua_gc_on_put",
311        lua.create_function(move |_, enable: u8| {
312            config::set_gc_on_put(enable);
313            Ok(())
314        })?,
315    )?;
316
317    kumo_mod.set(
318        "set_lruttl_cache_capacity",
319        lua.create_function(move |_, (name, capacity): (String, usize)| {
320            if lruttl::set_cache_capacity(&name, capacity) {
321                Ok(())
322            } else {
323                Err(mlua::Error::external(format!(
324                    "could not set capacity for cache {name} \
325                    as that is not a pre-defined lruttl cache name"
326                )))
327            }
328        })?,
329    )?;
330
331    kumo_mod.set(
332        "set_config_monitor_globs",
333        lua.create_function(move |_, globs: Vec<String>| {
334            config::epoch::set_globs(globs).map_err(any_err)?;
335            Ok(())
336        })?,
337    )?;
338    kumo_mod.set(
339        "eval_config_monitor_globs",
340        lua.create_async_function(|_, _: ()| async move {
341            config::epoch::eval_globs().await.map_err(any_err)
342        })?,
343    )?;
344    kumo_mod.set(
345        "bump_config_epoch",
346        lua.create_function(move |_, _: ()| {
347            config::epoch::bump_current_epoch();
348            Ok(())
349        })?,
350    )?;
351
352    kumo_mod.set(
353        "available_parallelism",
354        lua.create_function(move |_, _: ()| available_parallelism().map_err(any_err))?,
355    )?;
356
357    kumo_mod.set(
358        "set_memory_hard_limit",
359        lua.create_function(move |_, limit: usize| {
360            kumo_server_memory::set_hard_limit(limit);
361            Ok(())
362        })?,
363    )?;
364
365    kumo_mod.set(
366        "set_memory_low_thresh",
367        lua.create_function(move |_, limit: usize| {
368            kumo_server_memory::set_low_memory_thresh(limit);
369            Ok(())
370        })?,
371    )?;
372
373    kumo_mod.set(
374        "set_memory_soft_limit",
375        lua.create_function(move |_, limit: usize| {
376            kumo_server_memory::set_soft_limit(limit);
377            Ok(())
378        })?,
379    )?;
380
381    kumo_mod.set(
382        "get_memory_hard_limit",
383        lua.create_function(move |_, _: ()| Ok(kumo_server_memory::get_hard_limit()))?,
384    )?;
385
386    kumo_mod.set(
387        "get_memory_soft_limit",
388        lua.create_function(move |_, _: ()| Ok(kumo_server_memory::get_soft_limit()))?,
389    )?;
390
391    kumo_mod.set(
392        "get_memory_low_thresh",
393        lua.create_function(move |_, _: ()| Ok(kumo_server_memory::get_low_memory_thresh()))?,
394    )?;
395
396    kumo_mod.set(
397        "configure_redis_throttles",
398        lua.create_async_function(|lua, params: Value| async move {
399            let key: RedisConnKey = from_lua_value(&lua, params)?;
400            let conn = key.open().map_err(any_err)?;
401            conn.ping().await.map_err(any_err)?;
402            throttle::use_redis(conn).await.map_err(any_err)
403        })?,
404    )?;
405
406    kumo_mod.set(
407        "traceback",
408        lua.create_function(move |lua: &Lua, level: usize| {
409            #[derive(Debug, Serialize)]
410            struct Frame {
411                event: String,
412                name: Option<String>,
413                name_what: Option<String>,
414                source: Option<String>,
415                short_src: Option<String>,
416                line_defined: Option<usize>,
417                last_line_defined: Option<usize>,
418                what: &'static str,
419                curr_line: Option<usize>,
420                is_tail_call: bool,
421            }
422
423            let mut frames = vec![];
424            for n in level.. {
425                match lua.inspect_stack(n, |info| {
426                    let source = info.source();
427                    let names = info.names();
428                    Frame {
429                        curr_line: info.current_line(),
430                        is_tail_call: info.is_tail_call(),
431                        event: format!("{:?}", info.event()),
432                        last_line_defined: source.last_line_defined,
433                        line_defined: source.line_defined,
434                        name: names.name.as_ref().map(|b| b.to_string()),
435                        name_what: names.name_what.as_ref().map(|b| b.to_string()),
436                        source: source.source.as_ref().map(|b| b.to_string()),
437                        short_src: source.short_src.as_ref().map(|b| b.to_string()),
438                        what: source.what,
439                    }
440                }) {
441                    Some(frame) => {
442                        frames.push(frame);
443                    }
444                    None => break,
445                }
446            }
447
448            lua.to_value(&frames)
449        })?,
450    )?;
451
452    // TODO: options like restarting on error, delay between
453    // restarts and so on
454    #[derive(Deserialize, Debug)]
455    struct TaskParams {
456        event_name: String,
457        args: Vec<serde_json::Value>,
458    }
459
460    impl TaskParams {
461        async fn run(&self) -> anyhow::Result<()> {
462            let mut config = load_config().await?;
463
464            let sig = CallbackSignature::<Value, ()>::new(self.event_name.to_string());
465
466            config
467                .convert_args_and_call_callback(&sig, &self.args)
468                .await?;
469
470            config.put();
471
472            Ok(())
473        }
474    }
475
476    kumo_mod.set(
477        "spawn_task",
478        lua.create_function(|lua, params: Value| {
479            let params: TaskParams = lua.from_value(params)?;
480
481            if !config::is_validating() {
482                std::thread::Builder::new()
483                    .name(format!("spawned-task-{}", params.event_name))
484                    .spawn(move || {
485                        let runtime = tokio::runtime::Builder::new_current_thread()
486                            .enable_io()
487                            .enable_time()
488                            .on_thread_park(kumo_server_memory::purge_thread_cache)
489                            .build()
490                            .unwrap();
491                        let event_name = params.event_name.clone();
492
493                        let result = runtime.block_on(async move { params.run().await });
494                        if let Err(err) = result {
495                            tracing::error!("Error while dispatching {event_name}: {err:#}");
496                        }
497                    })?;
498            }
499
500            Ok(())
501        })?,
502    )?;
503
504    kumo_mod.set(
505        "spawn_thread_pool",
506        lua.create_function(|lua, params: Value| {
507            #[derive(Deserialize, Debug)]
508            struct ThreadPoolParams {
509                name: String,
510                num_threads: usize,
511            }
512
513            let params: ThreadPoolParams = lua.from_value(params)?;
514            let num_threads = AtomicUsize::new(params.num_threads);
515
516            if !config::is_validating() {
517                // Create the runtime. We don't need to hold on
518                // to it here, as it will be kept alive in the
519                // runtimes map in that crate
520                let _runtime = kumo_server_runtime::Runtime::new(
521                    &params.name,
522                    |_| params.num_threads,
523                    &num_threads,
524                )
525                .map_err(any_err)?;
526            }
527
528            Ok(())
529        })?,
530    )?;
531
532    kumo_mod.set(
533        "validation_failed",
534        lua.create_function(|_, ()| {
535            config::set_validation_failed();
536            Ok(())
537        })?,
538    )?;
539
540    kumo_mod.set(
541        "enable_memory_callstack_tracking",
542        lua.create_function(|_, enable: bool| {
543            kumo_server_memory::set_tracking_callstacks(enable);
544            Ok(())
545        })?,
546    )?;
547
548    // This function is intended for debugging and testing purposes only.
549    // It is potentially very expensive on a production system with many
550    // thousands of queues.
551    kumo_mod.set(
552        "prometheus_metrics",
553        lua.create_async_function(|lua, ()| async move {
554            use tokio_stream::StreamExt;
555            let mut json_text = String::new();
556            let mut stream = kumo_prometheus::registry::Registry::stream_json();
557            while let Some(text) = stream.next().await {
558                json_text.push_str(&text);
559            }
560            let value: serde_json::Value = serde_json::from_str(&json_text).map_err(any_err)?;
561            lua.to_value_with(&value, serialize_options())
562        })?,
563    )?;
564
565    Ok(())
566}