kumo_server_common/
lib.rs

1use config::{
2    any_err, decorate_callback_name, from_lua_value, get_or_create_module, load_config,
3    CallbackSignature,
4};
5use kumo_server_runtime::available_parallelism;
6use mlua::{Function, Lua, LuaSerdeExt, Value, Variadic};
7use mod_redis::RedisConnKey;
8use serde::{Deserialize, Serialize};
9use std::sync::atomic::AtomicUsize;
10
11pub mod config_handle;
12pub mod diagnostic_logging;
13pub mod disk_space;
14pub mod http_server;
15pub mod nodeid;
16pub mod panic;
17pub mod start;
18pub mod tls_helpers;
19
20pub fn register(lua: &Lua) -> anyhow::Result<()> {
21    for func in [
22        mod_redis::register,
23        data_loader::register,
24        mod_digest::register,
25        mod_encode::register,
26        cidr_map::register,
27        domain_map::register,
28        mod_amqp::register,
29        mod_filesystem::register,
30        mod_http::register,
31        mod_regex::register,
32        mod_serde::register,
33        mod_sqlite::register,
34        mod_string::register,
35        mod_time::register,
36        mod_dns_resolver::register,
37        mod_kafka::register,
38        mod_memoize::register,
39        mod_mpsc::register,
40        mod_uuid::register,
41        kumo_api_types::shaping::register,
42        regex_set_map::register,
43    ] {
44        func(lua)?;
45    }
46
47    let kumo_mod = get_or_create_module(lua, "kumo")?;
48
49    fn event_registrar_name(name: &str) -> String {
50        format!("kumomta-event-registrars-{name}")
51    }
52
53    // Record the call stack of the code calling kumo.on so that
54    // kumo.get_event_registrars can retrieve it later
55    fn register_event_caller(lua: &Lua, name: &str) -> mlua::Result<()> {
56        let decorated_name = event_registrar_name(name);
57        let mut call_stack = vec![];
58        for n in 1.. {
59            match lua.inspect_stack(n) {
60                Some(info) => {
61                    let source = info.source();
62                    call_stack.push(format!(
63                        "{}:{}",
64                        source
65                            .short_src
66                            .as_ref()
67                            .map(|b| b.to_string())
68                            .unwrap_or_else(String::new),
69                        info.curr_line()
70                    ));
71                }
72                None => break,
73            }
74        }
75
76        let tbl: Value = lua.named_registry_value(&decorated_name)?;
77        match tbl {
78            Value::Nil => {
79                let tbl = lua.create_table()?;
80                tbl.set(1, call_stack)?;
81                lua.set_named_registry_value(&decorated_name, tbl)?;
82                Ok(())
83            }
84            Value::Table(tbl) => {
85                let len = tbl.raw_len();
86                tbl.set(len + 1, call_stack)?;
87                Ok(())
88            }
89            _ => Err(mlua::Error::external(format!(
90                "registry key for {decorated_name} has invalid type",
91            ))),
92        }
93    }
94
95    // Returns the list of call-stacks of the code that registered
96    // for a specific named event
97    kumo_mod.set(
98        "get_event_registrars",
99        lua.create_function(move |lua, name: String| {
100            let decorated_name = event_registrar_name(&name);
101            let value: Value = lua.named_registry_value(&decorated_name)?;
102            Ok(value)
103        })?,
104    )?;
105
106    kumo_mod.set(
107        "on",
108        lua.create_function(move |lua, (name, func): (String, Function)| {
109            let decorated_name = decorate_callback_name(&name);
110
111            if let Ok(current_event) = lua.globals().get::<String>("_KUMO_CURRENT_EVENT") {
112                if current_event != "main" {
113                    return Err(mlua::Error::external(format!(
114                        "Attempting to register an event handler via \
115                    `kumo.on('{name}', ...)` from within the event handler \
116                    '{current_event}'. You must move your event handler registration \
117                    so that it is setup directly when the policy is loaded \
118                    in order for it to consistently trigger and handle events."
119                    )));
120                }
121            }
122
123            register_event_caller(lua, &name)?;
124
125            if config::does_callback_allow_multiple(&name) {
126                let tbl: Value = lua.named_registry_value(&decorated_name)?;
127                return match tbl {
128                    Value::Nil => {
129                        let tbl = lua.create_table()?;
130                        tbl.set(1, func)?;
131                        lua.set_named_registry_value(&decorated_name, tbl)?;
132                        Ok(())
133                    }
134                    Value::Table(tbl) => {
135                        let len = tbl.raw_len();
136                        tbl.set(len + 1, func)?;
137                        Ok(())
138                    }
139                    _ => Err(mlua::Error::external(format!(
140                        "registry key for {decorated_name} has invalid type",
141                    ))),
142                };
143            }
144
145            let existing: Value = lua.named_registry_value(&decorated_name)?;
146            match existing {
147                Value::Nil => {}
148                Value::Function(func) => {
149                    let info = func.info();
150                    let src = info.source.unwrap_or_else(|| "?".into());
151                    let line = info.line_defined.unwrap_or(0);
152                    return Err(mlua::Error::external(format!(
153                        "{name} event already has a handler defined at {src}:{line}"
154                    )));
155                }
156                _ => {
157                    return Err(mlua::Error::external(format!(
158                        "{name} event already has a handler"
159                    )));
160                }
161            }
162
163            lua.set_named_registry_value(&decorated_name, func)?;
164            Ok(())
165        })?,
166    )?;
167
168    kumo_mod.set(
169        "set_diagnostic_log_filter",
170        lua.create_function(move |_, filter: String| {
171            diagnostic_logging::set_diagnostic_log_filter(&filter).map_err(any_err)
172        })?,
173    )?;
174
175    fn variadic_to_string(args: Variadic<Value>) -> String {
176        let mut output = String::new();
177        for (idx, item) in args.into_iter().enumerate() {
178            if idx > 0 {
179                output.push(' ');
180            }
181
182            match item {
183                Value::String(s) => match s.to_str() {
184                    Ok(s) => output.push_str(&s),
185                    Err(_) => {
186                        let item = s.to_string_lossy();
187                        output.push_str(&item);
188                    }
189                },
190                item => match item.to_string() {
191                    Ok(s) => output.push_str(&s),
192                    Err(_) => output.push_str(&format!("{item:?}")),
193                },
194            }
195        }
196        output
197    }
198
199    fn get_caller(lua: &Lua) -> String {
200        match lua.inspect_stack(1) {
201            Some(info) => {
202                let source = info.source();
203                let file_name = source
204                    .short_src
205                    .as_ref()
206                    .map(|b| b.to_string())
207                    .unwrap_or_else(String::new);
208                // Lua returns the somewhat obnoxious `[string "source.lua"]`
209                // Let's fix that up to be a bit nicer
210                let file_name = match file_name.strip_prefix("[string \"") {
211                    Some(name) => name.strip_suffix("\"]").unwrap_or(name),
212                    None => &file_name,
213                };
214
215                format!("{file_name}:{}", info.curr_line())
216            }
217            None => "?".to_string(),
218        }
219    }
220
221    kumo_mod.set(
222        "log_error",
223        lua.create_function(move |lua, args: Variadic<Value>| {
224            if tracing::event_enabled!(target: "lua", tracing::Level::ERROR) {
225                let src = get_caller(lua);
226                tracing::error!(target: "lua", "{src}: {}", variadic_to_string(args));
227            }
228            Ok(())
229        })?,
230    )?;
231    kumo_mod.set(
232        "log_info",
233        lua.create_function(move |lua, args: Variadic<Value>| {
234            if tracing::event_enabled!(target: "lua", tracing::Level::INFO) {
235                let src = get_caller(lua);
236                tracing::info!(target: "lua", "{src}: {}", variadic_to_string(args));
237            }
238            Ok(())
239        })?,
240    )?;
241    kumo_mod.set(
242        "log_warn",
243        lua.create_function(move |lua, args: Variadic<Value>| {
244            if tracing::event_enabled!(target: "lua", tracing::Level::WARN) {
245                let src = get_caller(lua);
246                tracing::warn!(target: "lua", "{src}: {}", variadic_to_string(args));
247            }
248            Ok(())
249        })?,
250    )?;
251    kumo_mod.set(
252        "log_debug",
253        lua.create_function(move |lua, args: Variadic<Value>| {
254            if tracing::event_enabled!(target: "lua", tracing::Level::DEBUG) {
255                let src = get_caller(lua);
256                tracing::debug!(target: "lua", "{src}: {}", variadic_to_string(args));
257            }
258            Ok(())
259        })?,
260    )?;
261
262    kumo_mod.set(
263        "set_max_spare_lua_contexts",
264        lua.create_function(move |_, limit: usize| {
265            config::set_max_spare(limit);
266            Ok(())
267        })?,
268    )?;
269
270    kumo_mod.set(
271        "set_max_lua_context_use_count",
272        lua.create_function(move |_, limit: usize| {
273            config::set_max_use(limit);
274            Ok(())
275        })?,
276    )?;
277
278    kumo_mod.set(
279        "set_max_lua_context_age",
280        lua.create_function(move |_, limit: usize| {
281            config::set_max_age(limit);
282            Ok(())
283        })?,
284    )?;
285
286    kumo_mod.set(
287        "set_lua_gc_on_put",
288        lua.create_function(move |_, enable: u8| {
289            config::set_gc_on_put(enable);
290            Ok(())
291        })?,
292    )?;
293
294    kumo_mod.set(
295        "set_lruttl_cache_capacity",
296        lua.create_function(move |_, (name, capacity): (String, usize)| {
297            if lruttl::set_cache_capacity(&name, capacity) {
298                Ok(())
299            } else {
300                Err(mlua::Error::external(format!(
301                    "could not set capacity for cache {name} \
302                    as that is not a pre-defined lruttl cache name"
303                )))
304            }
305        })?,
306    )?;
307
308    kumo_mod.set(
309        "set_config_monitor_globs",
310        lua.create_function(move |_, globs: Vec<String>| {
311            config::epoch::set_globs(globs).map_err(any_err)?;
312            Ok(())
313        })?,
314    )?;
315    kumo_mod.set(
316        "eval_config_monitor_globs",
317        lua.create_async_function(|_, _: ()| async move {
318            config::epoch::eval_globs().await.map_err(any_err)
319        })?,
320    )?;
321    kumo_mod.set(
322        "bump_config_epoch",
323        lua.create_function(move |_, _: ()| {
324            config::epoch::bump_current_epoch();
325            Ok(())
326        })?,
327    )?;
328
329    kumo_mod.set(
330        "available_parallelism",
331        lua.create_function(move |_, _: ()| available_parallelism().map_err(any_err))?,
332    )?;
333
334    kumo_mod.set(
335        "set_memory_hard_limit",
336        lua.create_function(move |_, limit: usize| {
337            kumo_server_memory::set_hard_limit(limit);
338            Ok(())
339        })?,
340    )?;
341
342    kumo_mod.set(
343        "set_memory_low_thresh",
344        lua.create_function(move |_, limit: usize| {
345            kumo_server_memory::set_low_memory_thresh(limit);
346            Ok(())
347        })?,
348    )?;
349
350    kumo_mod.set(
351        "set_memory_soft_limit",
352        lua.create_function(move |_, limit: usize| {
353            kumo_server_memory::set_soft_limit(limit);
354            Ok(())
355        })?,
356    )?;
357
358    kumo_mod.set(
359        "configure_redis_throttles",
360        lua.create_async_function(|lua, params: Value| async move {
361            let key: RedisConnKey = from_lua_value(&lua, params)?;
362            let conn = key.open().map_err(any_err)?;
363            conn.ping().await.map_err(any_err)?;
364            throttle::use_redis(conn).await.map_err(any_err)
365        })?,
366    )?;
367
368    kumo_mod.set(
369        "traceback",
370        lua.create_function(move |lua: &Lua, level: usize| {
371            #[derive(Debug, Serialize)]
372            struct Frame {
373                event: String,
374                name: Option<String>,
375                name_what: Option<String>,
376                source: Option<String>,
377                short_src: Option<String>,
378                line_defined: Option<usize>,
379                last_line_defined: Option<usize>,
380                what: &'static str,
381                curr_line: i32,
382                is_tail_call: bool,
383            }
384
385            let mut frames = vec![];
386            for n in level.. {
387                match lua.inspect_stack(n) {
388                    Some(info) => {
389                        let source = info.source();
390                        let names = info.names();
391                        frames.push(Frame {
392                            curr_line: info.curr_line(),
393                            is_tail_call: info.is_tail_call(),
394                            event: format!("{:?}", info.event()),
395                            last_line_defined: source.last_line_defined,
396                            line_defined: source.line_defined,
397                            name: names.name.as_ref().map(|b| b.to_string()),
398                            name_what: names.name_what.as_ref().map(|b| b.to_string()),
399                            source: source.source.as_ref().map(|b| b.to_string()),
400                            short_src: source.short_src.as_ref().map(|b| b.to_string()),
401                            what: source.what,
402                        });
403                    }
404                    None => break,
405                }
406            }
407
408            lua.to_value(&frames)
409        })?,
410    )?;
411
412    // TODO: options like restarting on error, delay between
413    // restarts and so on
414    #[derive(Deserialize, Debug)]
415    struct TaskParams {
416        event_name: String,
417        args: Vec<serde_json::Value>,
418    }
419
420    impl TaskParams {
421        async fn run(&self) -> anyhow::Result<()> {
422            let mut config = load_config().await?;
423
424            let sig = CallbackSignature::<Value, ()>::new(self.event_name.to_string());
425
426            config
427                .convert_args_and_call_callback(&sig, &self.args)
428                .await?;
429
430            config.put();
431
432            Ok(())
433        }
434    }
435
436    kumo_mod.set(
437        "spawn_task",
438        lua.create_function(|lua, params: Value| {
439            let params: TaskParams = lua.from_value(params)?;
440
441            if !config::is_validating() {
442                std::thread::Builder::new()
443                    .name(format!("spawned-task-{}", params.event_name))
444                    .spawn(move || {
445                        let runtime = tokio::runtime::Builder::new_current_thread()
446                            .enable_io()
447                            .enable_time()
448                            .on_thread_park(kumo_server_memory::purge_thread_cache)
449                            .build()
450                            .unwrap();
451                        let event_name = params.event_name.clone();
452
453                        let result = runtime.block_on(async move { params.run().await });
454                        if let Err(err) = result {
455                            tracing::error!("Error while dispatching {event_name}: {err:#}");
456                        }
457                    })?;
458            }
459
460            Ok(())
461        })?,
462    )?;
463
464    kumo_mod.set(
465        "spawn_thread_pool",
466        lua.create_function(|lua, params: Value| {
467            #[derive(Deserialize, Debug)]
468            struct ThreadPoolParams {
469                name: String,
470                num_threads: usize,
471            }
472
473            let params: ThreadPoolParams = lua.from_value(params)?;
474            let num_threads = AtomicUsize::new(params.num_threads);
475
476            if !config::is_validating() {
477                // Create the runtime. We don't need to hold on
478                // to it here, as it will be kept alive in the
479                // runtimes map in that crate
480                let _runtime = kumo_server_runtime::Runtime::new(
481                    &params.name,
482                    |_| params.num_threads,
483                    &num_threads,
484                )
485                .map_err(any_err)?;
486            }
487
488            Ok(())
489        })?,
490    )?;
491
492    kumo_mod.set(
493        "validation_failed",
494        lua.create_function(|_, ()| {
495            config::set_validation_failed();
496            Ok(())
497        })?,
498    )?;
499
500    kumo_mod.set(
501        "enable_memory_callstack_tracking",
502        lua.create_function(|_, enable: bool| {
503            kumo_server_memory::set_tracking_callstacks(enable);
504            Ok(())
505        })?,
506    )?;
507
508    Ok(())
509}