kumo_server_common/
lib.rs

1use config::{
2    any_err, decorate_callback_name, from_lua_value, get_or_create_module, load_config,
3    serialize_options, CallbackSignature,
4};
5use kumo_server_runtime::available_parallelism;
6use mlua::{Function, Lua, LuaSerdeExt, Value, Variadic};
7use mod_redis::RedisConnKey;
8use serde::{Deserialize, Serialize};
9use std::sync::atomic::AtomicUsize;
10
11pub mod config_handle;
12pub mod diagnostic_logging;
13pub mod disk_space;
14pub mod http_server;
15pub mod nodeid;
16pub mod panic;
17pub mod start;
18pub mod tls_helpers;
19
20pub fn register(lua: &Lua) -> anyhow::Result<()> {
21    for func in [
22        mod_redis::register,
23        data_loader::register,
24        mod_digest::register,
25        mod_encode::register,
26        cidr_map::register,
27        domain_map::register,
28        mod_amqp::register,
29        mod_filesystem::register,
30        mod_file_type::register,
31        mod_http::register,
32        mod_regex::register,
33        mod_serde::register,
34        mod_sqlite::register,
35        mod_crypto::register,
36        mod_smtp_response_normalize::register,
37        mod_string::register,
38        mod_time::register,
39        mod_dns_resolver::register,
40        mod_kafka::register,
41        mod_memoize::register,
42        mod_mimepart::register,
43        mod_mpsc::register,
44        mod_uuid::register,
45        kumo_api_types::shaping::register,
46        regex_set_map::register,
47    ] {
48        func(lua)?;
49    }
50
51    let kumo_mod = get_or_create_module(lua, "kumo")?;
52    kumo_mod.set("version", version_info::kumo_version())?;
53
54    fn event_registrar_name(name: &str) -> String {
55        format!("kumomta-event-registrars-{name}")
56    }
57
58    // Record the call stack of the code calling kumo.on so that
59    // kumo.get_event_registrars can retrieve it later
60    fn register_event_caller(lua: &Lua, name: &str) -> mlua::Result<()> {
61        let decorated_name = event_registrar_name(name);
62        let mut call_stack = vec![];
63        for n in 1.. {
64            match lua.inspect_stack(n, |info| {
65                let source = info.source();
66                format!(
67                    "{}:{}",
68                    source
69                        .short_src
70                        .as_ref()
71                        .map(|b| b.to_string())
72                        .unwrap_or_else(String::new),
73                    info.current_line().unwrap_or(0)
74                )
75            }) {
76                Some(info) => {
77                    call_stack.push(info);
78                }
79                None => break,
80            }
81        }
82
83        let tbl: Value = lua.named_registry_value(&decorated_name)?;
84        match tbl {
85            Value::Nil => {
86                let tbl = lua.create_table()?;
87                tbl.set(1, call_stack)?;
88                lua.set_named_registry_value(&decorated_name, tbl)?;
89                Ok(())
90            }
91            Value::Table(tbl) => {
92                let len = tbl.raw_len();
93                tbl.set(len + 1, call_stack)?;
94                Ok(())
95            }
96            _ => Err(mlua::Error::external(format!(
97                "registry key for {decorated_name} has invalid type",
98            ))),
99        }
100    }
101
102    // Returns the list of call-stacks of the code that registered
103    // for a specific named event
104    kumo_mod.set(
105        "get_event_registrars",
106        lua.create_function(move |lua, name: String| {
107            let decorated_name = event_registrar_name(&name);
108            let value: Value = lua.named_registry_value(&decorated_name)?;
109            Ok(value)
110        })?,
111    )?;
112
113    kumo_mod.set(
114        "on",
115        lua.create_function(move |lua, (name, func): (String, Function)| {
116            let decorated_name = decorate_callback_name(&name);
117
118            if let Ok(current_event) = lua.globals().get::<String>("_KUMO_CURRENT_EVENT") {
119                if current_event != "main" {
120                    return Err(mlua::Error::external(format!(
121                        "Attempting to register an event handler via \
122                    `kumo.on('{name}', ...)` from within the event handler \
123                    '{current_event}'. You must move your event handler registration \
124                    so that it is setup directly when the policy is loaded \
125                    in order for it to consistently trigger and handle events."
126                    )));
127                }
128            }
129
130            register_event_caller(lua, &name)?;
131
132            if config::does_callback_allow_multiple(&name) {
133                let tbl: Value = lua.named_registry_value(&decorated_name)?;
134                return match tbl {
135                    Value::Nil => {
136                        let tbl = lua.create_table()?;
137                        tbl.set(1, func)?;
138                        lua.set_named_registry_value(&decorated_name, tbl)?;
139                        Ok(())
140                    }
141                    Value::Table(tbl) => {
142                        let len = tbl.raw_len();
143                        tbl.set(len + 1, func)?;
144                        Ok(())
145                    }
146                    _ => Err(mlua::Error::external(format!(
147                        "registry key for {decorated_name} has invalid type",
148                    ))),
149                };
150            }
151
152            let existing: Value = lua.named_registry_value(&decorated_name)?;
153            match existing {
154                Value::Nil => {}
155                Value::Function(func) => {
156                    let info = func.info();
157                    let src = info.source.unwrap_or_else(|| "?".into());
158                    let line = info.line_defined.unwrap_or(0);
159                    return Err(mlua::Error::external(format!(
160                        "{name} event already has a handler defined at {src}:{line}"
161                    )));
162                }
163                _ => {
164                    return Err(mlua::Error::external(format!(
165                        "{name} event already has a handler"
166                    )));
167                }
168            }
169
170            lua.set_named_registry_value(&decorated_name, func)?;
171            Ok(())
172        })?,
173    )?;
174
175    kumo_mod.set(
176        "set_diagnostic_log_filter",
177        lua.create_function(move |_, filter: String| {
178            diagnostic_logging::set_diagnostic_log_filter(&filter).map_err(any_err)
179        })?,
180    )?;
181
182    fn variadic_to_string(args: Variadic<Value>) -> String {
183        let mut output = String::new();
184        for (idx, item) in args.into_iter().enumerate() {
185            if idx > 0 {
186                output.push(' ');
187            }
188
189            match item {
190                Value::String(s) => match s.to_str() {
191                    Ok(s) => output.push_str(&s),
192                    Err(_) => {
193                        let item = s.to_string_lossy();
194                        output.push_str(&item);
195                    }
196                },
197                item => match item.to_string() {
198                    Ok(s) => output.push_str(&s),
199                    Err(_) => output.push_str(&format!("{item:?}")),
200                },
201            }
202        }
203        output
204    }
205
206    fn get_caller(lua: &Lua) -> String {
207        match lua.inspect_stack(1, |info| {
208            let source = info.source();
209            let file_name = source
210                .short_src
211                .as_ref()
212                .map(|b| b.to_string())
213                .unwrap_or_else(String::new);
214            // Lua returns the somewhat obnoxious `[string "source.lua"]`
215            // Let's fix that up to be a bit nicer
216            let file_name = match file_name.strip_prefix("[string \"") {
217                Some(name) => name.strip_suffix("\"]").unwrap_or(name),
218                None => &file_name,
219            };
220
221            format!("{file_name}:{}", info.current_line().unwrap_or(0))
222        }) {
223            Some(info) => info,
224            None => "?".to_string(),
225        }
226    }
227
228    kumo_mod.set(
229        "log_error",
230        lua.create_function(move |lua, args: Variadic<Value>| {
231            if tracing::event_enabled!(target: "lua", tracing::Level::ERROR) {
232                let src = get_caller(lua);
233                tracing::error!(target: "lua", "{src}: {}", variadic_to_string(args));
234            }
235            Ok(())
236        })?,
237    )?;
238    kumo_mod.set(
239        "log_info",
240        lua.create_function(move |lua, args: Variadic<Value>| {
241            if tracing::event_enabled!(target: "lua", tracing::Level::INFO) {
242                let src = get_caller(lua);
243                tracing::info!(target: "lua", "{src}: {}", variadic_to_string(args));
244            }
245            Ok(())
246        })?,
247    )?;
248    kumo_mod.set(
249        "log_warn",
250        lua.create_function(move |lua, args: Variadic<Value>| {
251            if tracing::event_enabled!(target: "lua", tracing::Level::WARN) {
252                let src = get_caller(lua);
253                tracing::warn!(target: "lua", "{src}: {}", variadic_to_string(args));
254            }
255            Ok(())
256        })?,
257    )?;
258    kumo_mod.set(
259        "log_debug",
260        lua.create_function(move |lua, args: Variadic<Value>| {
261            if tracing::event_enabled!(target: "lua", tracing::Level::DEBUG) {
262                let src = get_caller(lua);
263                tracing::debug!(target: "lua", "{src}: {}", variadic_to_string(args));
264            }
265            Ok(())
266        })?,
267    )?;
268
269    kumo_mod.set(
270        "set_max_spare_lua_contexts",
271        lua.create_function(move |_, limit: usize| {
272            config::set_max_spare(limit);
273            Ok(())
274        })?,
275    )?;
276
277    kumo_mod.set(
278        "set_max_lua_context_use_count",
279        lua.create_function(move |_, limit: usize| {
280            config::set_max_use(limit);
281            Ok(())
282        })?,
283    )?;
284
285    kumo_mod.set(
286        "set_max_lua_context_age",
287        lua.create_function(move |_, limit: usize| {
288            config::set_max_age(limit);
289            Ok(())
290        })?,
291    )?;
292
293    kumo_mod.set(
294        "set_lua_gc_on_put",
295        lua.create_function(move |_, enable: u8| {
296            config::set_gc_on_put(enable);
297            Ok(())
298        })?,
299    )?;
300
301    kumo_mod.set(
302        "set_lruttl_cache_capacity",
303        lua.create_function(move |_, (name, capacity): (String, usize)| {
304            if lruttl::set_cache_capacity(&name, capacity) {
305                Ok(())
306            } else {
307                Err(mlua::Error::external(format!(
308                    "could not set capacity for cache {name} \
309                    as that is not a pre-defined lruttl cache name"
310                )))
311            }
312        })?,
313    )?;
314
315    kumo_mod.set(
316        "set_config_monitor_globs",
317        lua.create_function(move |_, globs: Vec<String>| {
318            config::epoch::set_globs(globs).map_err(any_err)?;
319            Ok(())
320        })?,
321    )?;
322    kumo_mod.set(
323        "eval_config_monitor_globs",
324        lua.create_async_function(|_, _: ()| async move {
325            config::epoch::eval_globs().await.map_err(any_err)
326        })?,
327    )?;
328    kumo_mod.set(
329        "bump_config_epoch",
330        lua.create_function(move |_, _: ()| {
331            config::epoch::bump_current_epoch();
332            Ok(())
333        })?,
334    )?;
335
336    kumo_mod.set(
337        "available_parallelism",
338        lua.create_function(move |_, _: ()| available_parallelism().map_err(any_err))?,
339    )?;
340
341    kumo_mod.set(
342        "set_memory_hard_limit",
343        lua.create_function(move |_, limit: usize| {
344            kumo_server_memory::set_hard_limit(limit);
345            Ok(())
346        })?,
347    )?;
348
349    kumo_mod.set(
350        "set_memory_low_thresh",
351        lua.create_function(move |_, limit: usize| {
352            kumo_server_memory::set_low_memory_thresh(limit);
353            Ok(())
354        })?,
355    )?;
356
357    kumo_mod.set(
358        "set_memory_soft_limit",
359        lua.create_function(move |_, limit: usize| {
360            kumo_server_memory::set_soft_limit(limit);
361            Ok(())
362        })?,
363    )?;
364
365    kumo_mod.set(
366        "configure_redis_throttles",
367        lua.create_async_function(|lua, params: Value| async move {
368            let key: RedisConnKey = from_lua_value(&lua, params)?;
369            let conn = key.open().map_err(any_err)?;
370            conn.ping().await.map_err(any_err)?;
371            throttle::use_redis(conn).await.map_err(any_err)
372        })?,
373    )?;
374
375    kumo_mod.set(
376        "traceback",
377        lua.create_function(move |lua: &Lua, level: usize| {
378            #[derive(Debug, Serialize)]
379            struct Frame {
380                event: String,
381                name: Option<String>,
382                name_what: Option<String>,
383                source: Option<String>,
384                short_src: Option<String>,
385                line_defined: Option<usize>,
386                last_line_defined: Option<usize>,
387                what: &'static str,
388                curr_line: Option<usize>,
389                is_tail_call: bool,
390            }
391
392            let mut frames = vec![];
393            for n in level.. {
394                match lua.inspect_stack(n, |info| {
395                    let source = info.source();
396                    let names = info.names();
397                    Frame {
398                        curr_line: info.current_line(),
399                        is_tail_call: info.is_tail_call(),
400                        event: format!("{:?}", info.event()),
401                        last_line_defined: source.last_line_defined,
402                        line_defined: source.line_defined,
403                        name: names.name.as_ref().map(|b| b.to_string()),
404                        name_what: names.name_what.as_ref().map(|b| b.to_string()),
405                        source: source.source.as_ref().map(|b| b.to_string()),
406                        short_src: source.short_src.as_ref().map(|b| b.to_string()),
407                        what: source.what,
408                    }
409                }) {
410                    Some(frame) => {
411                        frames.push(frame);
412                    }
413                    None => break,
414                }
415            }
416
417            lua.to_value(&frames)
418        })?,
419    )?;
420
421    // TODO: options like restarting on error, delay between
422    // restarts and so on
423    #[derive(Deserialize, Debug)]
424    struct TaskParams {
425        event_name: String,
426        args: Vec<serde_json::Value>,
427    }
428
429    impl TaskParams {
430        async fn run(&self) -> anyhow::Result<()> {
431            let mut config = load_config().await?;
432
433            let sig = CallbackSignature::<Value, ()>::new(self.event_name.to_string());
434
435            config
436                .convert_args_and_call_callback(&sig, &self.args)
437                .await?;
438
439            config.put();
440
441            Ok(())
442        }
443    }
444
445    kumo_mod.set(
446        "spawn_task",
447        lua.create_function(|lua, params: Value| {
448            let params: TaskParams = lua.from_value(params)?;
449
450            if !config::is_validating() {
451                std::thread::Builder::new()
452                    .name(format!("spawned-task-{}", params.event_name))
453                    .spawn(move || {
454                        let runtime = tokio::runtime::Builder::new_current_thread()
455                            .enable_io()
456                            .enable_time()
457                            .on_thread_park(kumo_server_memory::purge_thread_cache)
458                            .build()
459                            .unwrap();
460                        let event_name = params.event_name.clone();
461
462                        let result = runtime.block_on(async move { params.run().await });
463                        if let Err(err) = result {
464                            tracing::error!("Error while dispatching {event_name}: {err:#}");
465                        }
466                    })?;
467            }
468
469            Ok(())
470        })?,
471    )?;
472
473    kumo_mod.set(
474        "spawn_thread_pool",
475        lua.create_function(|lua, params: Value| {
476            #[derive(Deserialize, Debug)]
477            struct ThreadPoolParams {
478                name: String,
479                num_threads: usize,
480            }
481
482            let params: ThreadPoolParams = lua.from_value(params)?;
483            let num_threads = AtomicUsize::new(params.num_threads);
484
485            if !config::is_validating() {
486                // Create the runtime. We don't need to hold on
487                // to it here, as it will be kept alive in the
488                // runtimes map in that crate
489                let _runtime = kumo_server_runtime::Runtime::new(
490                    &params.name,
491                    |_| params.num_threads,
492                    &num_threads,
493                )
494                .map_err(any_err)?;
495            }
496
497            Ok(())
498        })?,
499    )?;
500
501    kumo_mod.set(
502        "validation_failed",
503        lua.create_function(|_, ()| {
504            config::set_validation_failed();
505            Ok(())
506        })?,
507    )?;
508
509    kumo_mod.set(
510        "enable_memory_callstack_tracking",
511        lua.create_function(|_, enable: bool| {
512            kumo_server_memory::set_tracking_callstacks(enable);
513            Ok(())
514        })?,
515    )?;
516
517    // This function is intended for debugging and testing purposes only.
518    // It is potentially very expensive on a production system with many
519    // thousands of queues.
520    kumo_mod.set(
521        "prometheus_metrics",
522        lua.create_async_function(|lua, ()| async move {
523            use tokio_stream::StreamExt;
524            let mut json_text = String::new();
525            let mut stream = kumo_prometheus::registry::Registry::stream_json();
526            while let Some(text) = stream.next().await {
527                json_text.push_str(&text);
528            }
529            let value: serde_json::Value = serde_json::from_str(&json_text).map_err(any_err)?;
530            lua.to_value_with(&value, serialize_options())
531        })?,
532    )?;
533
534    Ok(())
535}