kumo_server_common/
lib.rs

1use config::{
2    any_err, decorate_callback_name, from_lua_value, get_or_create_module, load_config,
3    serialize_options, CallbackSignature,
4};
5use kumo_server_runtime::available_parallelism;
6use mlua::{Function, Lua, LuaSerdeExt, Value, Variadic};
7use mod_redis::RedisConnKey;
8use serde::{Deserialize, Serialize};
9use std::sync::atomic::AtomicUsize;
10
11pub mod acct;
12pub mod authn_authz;
13pub mod config_handle;
14pub mod diagnostic_logging;
15pub mod disk_space;
16pub mod http_server;
17pub mod log;
18pub mod nodeid;
19pub mod panic;
20pub mod start;
21pub mod tls_helpers;
22
23pub fn register(lua: &Lua) -> anyhow::Result<()> {
24    for func in [
25        mod_redis::register,
26        data_loader::register,
27        mod_digest::register,
28        mod_encode::register,
29        mod_aws_sigv4::register,
30        cidr_map::register,
31        domain_map::register,
32        mod_amqp::register,
33        mod_filesystem::register,
34        mod_file_type::register,
35        mod_http::register,
36        mod_regex::register,
37        mod_serde::register,
38        mod_sqlite::register,
39        mod_crypto::register,
40        mod_smtp_response_normalize::register,
41        mod_string::register,
42        mod_time::register,
43        mod_dns_resolver::register,
44        mod_kafka::register,
45        mod_memoize::register,
46        mod_mimepart::register,
47        mod_mpsc::register,
48        mod_nats::register,
49        mod_uuid::register,
50        kumo_api_types::shaping::register,
51        regex_set_map::register,
52        crate::authn_authz::register,
53    ] {
54        func(lua)?;
55    }
56
57    let kumo_mod = get_or_create_module(lua, "kumo")?;
58    kumo_mod.set("version", version_info::kumo_version())?;
59
60    fn event_registrar_name(name: &str) -> String {
61        format!("kumomta-event-registrars-{name}")
62    }
63
64    // Record the call stack of the code calling kumo.on so that
65    // kumo.get_event_registrars can retrieve it later
66    fn register_event_caller(lua: &Lua, name: &str) -> mlua::Result<()> {
67        let decorated_name = event_registrar_name(name);
68        let mut call_stack = vec![];
69        for n in 1.. {
70            match lua.inspect_stack(n, |info| {
71                let source = info.source();
72                format!(
73                    "{}:{}",
74                    source
75                        .short_src
76                        .as_ref()
77                        .map(|b| b.to_string())
78                        .unwrap_or_else(String::new),
79                    info.current_line().unwrap_or(0)
80                )
81            }) {
82                Some(info) => {
83                    call_stack.push(info);
84                }
85                None => break,
86            }
87        }
88
89        let tbl: Value = lua.named_registry_value(&decorated_name)?;
90        match tbl {
91            Value::Nil => {
92                let tbl = lua.create_table()?;
93                tbl.set(1, call_stack)?;
94                lua.set_named_registry_value(&decorated_name, tbl)?;
95                Ok(())
96            }
97            Value::Table(tbl) => {
98                let len = tbl.raw_len();
99                tbl.set(len + 1, call_stack)?;
100                Ok(())
101            }
102            _ => Err(mlua::Error::external(format!(
103                "registry key for {decorated_name} has invalid type",
104            ))),
105        }
106    }
107
108    // Returns the list of call-stacks of the code that registered
109    // for a specific named event
110    kumo_mod.set(
111        "get_event_registrars",
112        lua.create_function(move |lua, name: String| {
113            let decorated_name = event_registrar_name(&name);
114            let value: Value = lua.named_registry_value(&decorated_name)?;
115            Ok(value)
116        })?,
117    )?;
118
119    kumo_mod.set(
120        "on",
121        lua.create_function(move |lua, (name, func): (String, Function)| {
122            let decorated_name = decorate_callback_name(&name);
123
124            if let Ok(current_event) = lua.globals().get::<String>("_KUMO_CURRENT_EVENT") {
125                if current_event != "main" {
126                    return Err(mlua::Error::external(format!(
127                        "Attempting to register an event handler via \
128                    `kumo.on('{name}', ...)` from within the event handler \
129                    '{current_event}'. You must move your event handler registration \
130                    so that it is setup directly when the policy is loaded \
131                    in order for it to consistently trigger and handle events."
132                    )));
133                }
134            }
135
136            register_event_caller(lua, &name)?;
137
138            if config::does_callback_allow_multiple(&name) {
139                let tbl: Value = lua.named_registry_value(&decorated_name)?;
140                return match tbl {
141                    Value::Nil => {
142                        let tbl = lua.create_table()?;
143                        tbl.set(1, func)?;
144                        lua.set_named_registry_value(&decorated_name, tbl)?;
145                        Ok(())
146                    }
147                    Value::Table(tbl) => {
148                        let len = tbl.raw_len();
149                        tbl.set(len + 1, func)?;
150                        Ok(())
151                    }
152                    _ => Err(mlua::Error::external(format!(
153                        "registry key for {decorated_name} has invalid type",
154                    ))),
155                };
156            }
157
158            let existing: Value = lua.named_registry_value(&decorated_name)?;
159            match existing {
160                Value::Nil => {}
161                Value::Function(func) => {
162                    let info = func.info();
163                    let src = info.source.unwrap_or_else(|| "?".into());
164                    let line = info.line_defined.unwrap_or(0);
165                    return Err(mlua::Error::external(format!(
166                        "{name} event already has a handler defined at {src}:{line}"
167                    )));
168                }
169                _ => {
170                    return Err(mlua::Error::external(format!(
171                        "{name} event already has a handler"
172                    )));
173                }
174            }
175
176            lua.set_named_registry_value(&decorated_name, func)?;
177            Ok(())
178        })?,
179    )?;
180
181    kumo_mod.set(
182        "set_diagnostic_log_filter",
183        lua.create_function(move |_, filter: String| {
184            diagnostic_logging::set_diagnostic_log_filter(&filter).map_err(any_err)
185        })?,
186    )?;
187
188    fn variadic_to_string(args: Variadic<Value>) -> String {
189        let mut output = String::new();
190        for (idx, item) in args.into_iter().enumerate() {
191            if idx > 0 {
192                output.push(' ');
193            }
194
195            match item {
196                Value::String(s) => match s.to_str() {
197                    Ok(s) => output.push_str(&s),
198                    Err(_) => {
199                        let item = s.to_string_lossy();
200                        output.push_str(&item);
201                    }
202                },
203                item => match item.to_string() {
204                    Ok(s) => output.push_str(&s),
205                    Err(_) => output.push_str(&format!("{item:?}")),
206                },
207            }
208        }
209        output
210    }
211
212    fn get_caller(lua: &Lua) -> String {
213        match lua.inspect_stack(1, |info| {
214            let source = info.source();
215            let file_name = source
216                .short_src
217                .as_ref()
218                .map(|b| b.to_string())
219                .unwrap_or_else(String::new);
220            // Lua returns the somewhat obnoxious `[string "source.lua"]`
221            // Let's fix that up to be a bit nicer
222            let file_name = match file_name.strip_prefix("[string \"") {
223                Some(name) => name.strip_suffix("\"]").unwrap_or(name),
224                None => &file_name,
225            };
226
227            format!("{file_name}:{}", info.current_line().unwrap_or(0))
228        }) {
229            Some(info) => info,
230            None => "?".to_string(),
231        }
232    }
233
234    kumo_mod.set(
235        "log_error",
236        lua.create_function(move |lua, args: Variadic<Value>| {
237            if tracing::event_enabled!(target: "lua", tracing::Level::ERROR) {
238                let src = get_caller(lua);
239                tracing::error!(target: "lua", "{src}: {}", variadic_to_string(args));
240            }
241            Ok(())
242        })?,
243    )?;
244    kumo_mod.set(
245        "log_info",
246        lua.create_function(move |lua, args: Variadic<Value>| {
247            if tracing::event_enabled!(target: "lua", tracing::Level::INFO) {
248                let src = get_caller(lua);
249                tracing::info!(target: "lua", "{src}: {}", variadic_to_string(args));
250            }
251            Ok(())
252        })?,
253    )?;
254    kumo_mod.set(
255        "log_warn",
256        lua.create_function(move |lua, args: Variadic<Value>| {
257            if tracing::event_enabled!(target: "lua", tracing::Level::WARN) {
258                let src = get_caller(lua);
259                tracing::warn!(target: "lua", "{src}: {}", variadic_to_string(args));
260            }
261            Ok(())
262        })?,
263    )?;
264    kumo_mod.set(
265        "log_debug",
266        lua.create_function(move |lua, args: Variadic<Value>| {
267            if tracing::event_enabled!(target: "lua", tracing::Level::DEBUG) {
268                let src = get_caller(lua);
269                tracing::debug!(target: "lua", "{src}: {}", variadic_to_string(args));
270            }
271            Ok(())
272        })?,
273    )?;
274
275    kumo_mod.set(
276        "set_max_spare_lua_contexts",
277        lua.create_function(move |_, limit: usize| {
278            config::set_max_spare(limit);
279            Ok(())
280        })?,
281    )?;
282
283    kumo_mod.set(
284        "set_max_lua_context_use_count",
285        lua.create_function(move |_, limit: usize| {
286            config::set_max_use(limit);
287            Ok(())
288        })?,
289    )?;
290
291    kumo_mod.set(
292        "set_max_lua_context_age",
293        lua.create_function(move |_, limit: usize| {
294            config::set_max_age(limit);
295            Ok(())
296        })?,
297    )?;
298
299    kumo_mod.set(
300        "set_lua_gc_on_put",
301        lua.create_function(move |_, enable: u8| {
302            config::set_gc_on_put(enable);
303            Ok(())
304        })?,
305    )?;
306
307    kumo_mod.set(
308        "set_lruttl_cache_capacity",
309        lua.create_function(move |_, (name, capacity): (String, usize)| {
310            if lruttl::set_cache_capacity(&name, capacity) {
311                Ok(())
312            } else {
313                Err(mlua::Error::external(format!(
314                    "could not set capacity for cache {name} \
315                    as that is not a pre-defined lruttl cache name"
316                )))
317            }
318        })?,
319    )?;
320
321    kumo_mod.set(
322        "set_config_monitor_globs",
323        lua.create_function(move |_, globs: Vec<String>| {
324            config::epoch::set_globs(globs).map_err(any_err)?;
325            Ok(())
326        })?,
327    )?;
328    kumo_mod.set(
329        "eval_config_monitor_globs",
330        lua.create_async_function(|_, _: ()| async move {
331            config::epoch::eval_globs().await.map_err(any_err)
332        })?,
333    )?;
334    kumo_mod.set(
335        "bump_config_epoch",
336        lua.create_function(move |_, _: ()| {
337            config::epoch::bump_current_epoch();
338            Ok(())
339        })?,
340    )?;
341
342    kumo_mod.set(
343        "available_parallelism",
344        lua.create_function(move |_, _: ()| available_parallelism().map_err(any_err))?,
345    )?;
346
347    kumo_mod.set(
348        "set_memory_hard_limit",
349        lua.create_function(move |_, limit: usize| {
350            kumo_server_memory::set_hard_limit(limit);
351            Ok(())
352        })?,
353    )?;
354
355    kumo_mod.set(
356        "set_memory_low_thresh",
357        lua.create_function(move |_, limit: usize| {
358            kumo_server_memory::set_low_memory_thresh(limit);
359            Ok(())
360        })?,
361    )?;
362
363    kumo_mod.set(
364        "set_memory_soft_limit",
365        lua.create_function(move |_, limit: usize| {
366            kumo_server_memory::set_soft_limit(limit);
367            Ok(())
368        })?,
369    )?;
370
371    kumo_mod.set(
372        "get_memory_hard_limit",
373        lua.create_function(move |_, _: ()| Ok(kumo_server_memory::get_hard_limit()))?,
374    )?;
375
376    kumo_mod.set(
377        "get_memory_soft_limit",
378        lua.create_function(move |_, _: ()| Ok(kumo_server_memory::get_soft_limit()))?,
379    )?;
380
381    kumo_mod.set(
382        "get_memory_low_thresh",
383        lua.create_function(move |_, _: ()| Ok(kumo_server_memory::get_low_memory_thresh()))?,
384    )?;
385
386    kumo_mod.set(
387        "configure_redis_throttles",
388        lua.create_async_function(|lua, params: Value| async move {
389            let key: RedisConnKey = from_lua_value(&lua, params)?;
390            let conn = key.open().map_err(any_err)?;
391            conn.ping().await.map_err(any_err)?;
392            throttle::use_redis(conn).await.map_err(any_err)
393        })?,
394    )?;
395
396    kumo_mod.set(
397        "traceback",
398        lua.create_function(move |lua: &Lua, level: usize| {
399            #[derive(Debug, Serialize)]
400            struct Frame {
401                event: String,
402                name: Option<String>,
403                name_what: Option<String>,
404                source: Option<String>,
405                short_src: Option<String>,
406                line_defined: Option<usize>,
407                last_line_defined: Option<usize>,
408                what: &'static str,
409                curr_line: Option<usize>,
410                is_tail_call: bool,
411            }
412
413            let mut frames = vec![];
414            for n in level.. {
415                match lua.inspect_stack(n, |info| {
416                    let source = info.source();
417                    let names = info.names();
418                    Frame {
419                        curr_line: info.current_line(),
420                        is_tail_call: info.is_tail_call(),
421                        event: format!("{:?}", info.event()),
422                        last_line_defined: source.last_line_defined,
423                        line_defined: source.line_defined,
424                        name: names.name.as_ref().map(|b| b.to_string()),
425                        name_what: names.name_what.as_ref().map(|b| b.to_string()),
426                        source: source.source.as_ref().map(|b| b.to_string()),
427                        short_src: source.short_src.as_ref().map(|b| b.to_string()),
428                        what: source.what,
429                    }
430                }) {
431                    Some(frame) => {
432                        frames.push(frame);
433                    }
434                    None => break,
435                }
436            }
437
438            lua.to_value(&frames)
439        })?,
440    )?;
441
442    // TODO: options like restarting on error, delay between
443    // restarts and so on
444    #[derive(Deserialize, Debug)]
445    struct TaskParams {
446        event_name: String,
447        args: Vec<serde_json::Value>,
448    }
449
450    impl TaskParams {
451        async fn run(&self) -> anyhow::Result<()> {
452            let mut config = load_config().await?;
453
454            let sig = CallbackSignature::<Value, ()>::new(self.event_name.to_string());
455
456            config
457                .convert_args_and_call_callback(&sig, &self.args)
458                .await?;
459
460            config.put();
461
462            Ok(())
463        }
464    }
465
466    kumo_mod.set(
467        "spawn_task",
468        lua.create_function(|lua, params: Value| {
469            let params: TaskParams = lua.from_value(params)?;
470
471            if !config::is_validating() {
472                std::thread::Builder::new()
473                    .name(format!("spawned-task-{}", params.event_name))
474                    .spawn(move || {
475                        let runtime = tokio::runtime::Builder::new_current_thread()
476                            .enable_io()
477                            .enable_time()
478                            .on_thread_park(kumo_server_memory::purge_thread_cache)
479                            .build()
480                            .unwrap();
481                        let event_name = params.event_name.clone();
482
483                        let result = runtime.block_on(async move { params.run().await });
484                        if let Err(err) = result {
485                            tracing::error!("Error while dispatching {event_name}: {err:#}");
486                        }
487                    })?;
488            }
489
490            Ok(())
491        })?,
492    )?;
493
494    kumo_mod.set(
495        "spawn_thread_pool",
496        lua.create_function(|lua, params: Value| {
497            #[derive(Deserialize, Debug)]
498            struct ThreadPoolParams {
499                name: String,
500                num_threads: usize,
501            }
502
503            let params: ThreadPoolParams = lua.from_value(params)?;
504            let num_threads = AtomicUsize::new(params.num_threads);
505
506            if !config::is_validating() {
507                // Create the runtime. We don't need to hold on
508                // to it here, as it will be kept alive in the
509                // runtimes map in that crate
510                let _runtime = kumo_server_runtime::Runtime::new(
511                    &params.name,
512                    |_| params.num_threads,
513                    &num_threads,
514                )
515                .map_err(any_err)?;
516            }
517
518            Ok(())
519        })?,
520    )?;
521
522    kumo_mod.set(
523        "validation_failed",
524        lua.create_function(|_, ()| {
525            config::set_validation_failed();
526            Ok(())
527        })?,
528    )?;
529
530    kumo_mod.set(
531        "enable_memory_callstack_tracking",
532        lua.create_function(|_, enable: bool| {
533            kumo_server_memory::set_tracking_callstacks(enable);
534            Ok(())
535        })?,
536    )?;
537
538    // This function is intended for debugging and testing purposes only.
539    // It is potentially very expensive on a production system with many
540    // thousands of queues.
541    kumo_mod.set(
542        "prometheus_metrics",
543        lua.create_async_function(|lua, ()| async move {
544            use tokio_stream::StreamExt;
545            let mut json_text = String::new();
546            let mut stream = kumo_prometheus::registry::Registry::stream_json();
547            while let Some(text) = stream.next().await {
548                json_text.push_str(&text);
549            }
550            let value: serde_json::Value = serde_json::from_str(&json_text).map_err(any_err)?;
551            lua.to_value_with(&value, serialize_options())
552        })?,
553    )?;
554
555    Ok(())
556}