kumo_server_common/
lib.rs

1use config::{
2    any_err, decorate_callback_name, from_lua_value, get_or_create_module, load_config,
3    CallbackSignature,
4};
5use kumo_server_runtime::available_parallelism;
6use mlua::{Function, Lua, LuaSerdeExt, Value, Variadic};
7use mod_redis::RedisConnKey;
8use serde::{Deserialize, Serialize};
9use std::sync::atomic::AtomicUsize;
10
11pub mod config_handle;
12pub mod diagnostic_logging;
13pub mod disk_space;
14pub mod http_server;
15pub mod nodeid;
16pub mod panic;
17pub mod start;
18pub mod tls_helpers;
19
20pub fn register(lua: &Lua) -> anyhow::Result<()> {
21    for func in [
22        mod_redis::register,
23        data_loader::register,
24        mod_digest::register,
25        mod_encode::register,
26        cidr_map::register,
27        domain_map::register,
28        mod_amqp::register,
29        mod_filesystem::register,
30        mod_http::register,
31        mod_regex::register,
32        mod_serde::register,
33        mod_sqlite::register,
34        mod_string::register,
35        mod_time::register,
36        mod_dns_resolver::register,
37        mod_kafka::register,
38        mod_memoize::register,
39        mod_mimepart::register,
40        mod_mpsc::register,
41        mod_uuid::register,
42        kumo_api_types::shaping::register,
43        regex_set_map::register,
44    ] {
45        func(lua)?;
46    }
47
48    let kumo_mod = get_or_create_module(lua, "kumo")?;
49
50    fn event_registrar_name(name: &str) -> String {
51        format!("kumomta-event-registrars-{name}")
52    }
53
54    // Record the call stack of the code calling kumo.on so that
55    // kumo.get_event_registrars can retrieve it later
56    fn register_event_caller(lua: &Lua, name: &str) -> mlua::Result<()> {
57        let decorated_name = event_registrar_name(name);
58        let mut call_stack = vec![];
59        for n in 1.. {
60            match lua.inspect_stack(n) {
61                Some(info) => {
62                    let source = info.source();
63                    call_stack.push(format!(
64                        "{}:{}",
65                        source
66                            .short_src
67                            .as_ref()
68                            .map(|b| b.to_string())
69                            .unwrap_or_else(String::new),
70                        info.curr_line()
71                    ));
72                }
73                None => break,
74            }
75        }
76
77        let tbl: Value = lua.named_registry_value(&decorated_name)?;
78        match tbl {
79            Value::Nil => {
80                let tbl = lua.create_table()?;
81                tbl.set(1, call_stack)?;
82                lua.set_named_registry_value(&decorated_name, tbl)?;
83                Ok(())
84            }
85            Value::Table(tbl) => {
86                let len = tbl.raw_len();
87                tbl.set(len + 1, call_stack)?;
88                Ok(())
89            }
90            _ => Err(mlua::Error::external(format!(
91                "registry key for {decorated_name} has invalid type",
92            ))),
93        }
94    }
95
96    // Returns the list of call-stacks of the code that registered
97    // for a specific named event
98    kumo_mod.set(
99        "get_event_registrars",
100        lua.create_function(move |lua, name: String| {
101            let decorated_name = event_registrar_name(&name);
102            let value: Value = lua.named_registry_value(&decorated_name)?;
103            Ok(value)
104        })?,
105    )?;
106
107    kumo_mod.set(
108        "on",
109        lua.create_function(move |lua, (name, func): (String, Function)| {
110            let decorated_name = decorate_callback_name(&name);
111
112            if let Ok(current_event) = lua.globals().get::<String>("_KUMO_CURRENT_EVENT") {
113                if current_event != "main" {
114                    return Err(mlua::Error::external(format!(
115                        "Attempting to register an event handler via \
116                    `kumo.on('{name}', ...)` from within the event handler \
117                    '{current_event}'. You must move your event handler registration \
118                    so that it is setup directly when the policy is loaded \
119                    in order for it to consistently trigger and handle events."
120                    )));
121                }
122            }
123
124            register_event_caller(lua, &name)?;
125
126            if config::does_callback_allow_multiple(&name) {
127                let tbl: Value = lua.named_registry_value(&decorated_name)?;
128                return match tbl {
129                    Value::Nil => {
130                        let tbl = lua.create_table()?;
131                        tbl.set(1, func)?;
132                        lua.set_named_registry_value(&decorated_name, tbl)?;
133                        Ok(())
134                    }
135                    Value::Table(tbl) => {
136                        let len = tbl.raw_len();
137                        tbl.set(len + 1, func)?;
138                        Ok(())
139                    }
140                    _ => Err(mlua::Error::external(format!(
141                        "registry key for {decorated_name} has invalid type",
142                    ))),
143                };
144            }
145
146            let existing: Value = lua.named_registry_value(&decorated_name)?;
147            match existing {
148                Value::Nil => {}
149                Value::Function(func) => {
150                    let info = func.info();
151                    let src = info.source.unwrap_or_else(|| "?".into());
152                    let line = info.line_defined.unwrap_or(0);
153                    return Err(mlua::Error::external(format!(
154                        "{name} event already has a handler defined at {src}:{line}"
155                    )));
156                }
157                _ => {
158                    return Err(mlua::Error::external(format!(
159                        "{name} event already has a handler"
160                    )));
161                }
162            }
163
164            lua.set_named_registry_value(&decorated_name, func)?;
165            Ok(())
166        })?,
167    )?;
168
169    kumo_mod.set(
170        "set_diagnostic_log_filter",
171        lua.create_function(move |_, filter: String| {
172            diagnostic_logging::set_diagnostic_log_filter(&filter).map_err(any_err)
173        })?,
174    )?;
175
176    fn variadic_to_string(args: Variadic<Value>) -> String {
177        let mut output = String::new();
178        for (idx, item) in args.into_iter().enumerate() {
179            if idx > 0 {
180                output.push(' ');
181            }
182
183            match item {
184                Value::String(s) => match s.to_str() {
185                    Ok(s) => output.push_str(&s),
186                    Err(_) => {
187                        let item = s.to_string_lossy();
188                        output.push_str(&item);
189                    }
190                },
191                item => match item.to_string() {
192                    Ok(s) => output.push_str(&s),
193                    Err(_) => output.push_str(&format!("{item:?}")),
194                },
195            }
196        }
197        output
198    }
199
200    fn get_caller(lua: &Lua) -> String {
201        match lua.inspect_stack(1) {
202            Some(info) => {
203                let source = info.source();
204                let file_name = source
205                    .short_src
206                    .as_ref()
207                    .map(|b| b.to_string())
208                    .unwrap_or_else(String::new);
209                // Lua returns the somewhat obnoxious `[string "source.lua"]`
210                // Let's fix that up to be a bit nicer
211                let file_name = match file_name.strip_prefix("[string \"") {
212                    Some(name) => name.strip_suffix("\"]").unwrap_or(name),
213                    None => &file_name,
214                };
215
216                format!("{file_name}:{}", info.curr_line())
217            }
218            None => "?".to_string(),
219        }
220    }
221
222    kumo_mod.set(
223        "log_error",
224        lua.create_function(move |lua, args: Variadic<Value>| {
225            if tracing::event_enabled!(target: "lua", tracing::Level::ERROR) {
226                let src = get_caller(lua);
227                tracing::error!(target: "lua", "{src}: {}", variadic_to_string(args));
228            }
229            Ok(())
230        })?,
231    )?;
232    kumo_mod.set(
233        "log_info",
234        lua.create_function(move |lua, args: Variadic<Value>| {
235            if tracing::event_enabled!(target: "lua", tracing::Level::INFO) {
236                let src = get_caller(lua);
237                tracing::info!(target: "lua", "{src}: {}", variadic_to_string(args));
238            }
239            Ok(())
240        })?,
241    )?;
242    kumo_mod.set(
243        "log_warn",
244        lua.create_function(move |lua, args: Variadic<Value>| {
245            if tracing::event_enabled!(target: "lua", tracing::Level::WARN) {
246                let src = get_caller(lua);
247                tracing::warn!(target: "lua", "{src}: {}", variadic_to_string(args));
248            }
249            Ok(())
250        })?,
251    )?;
252    kumo_mod.set(
253        "log_debug",
254        lua.create_function(move |lua, args: Variadic<Value>| {
255            if tracing::event_enabled!(target: "lua", tracing::Level::DEBUG) {
256                let src = get_caller(lua);
257                tracing::debug!(target: "lua", "{src}: {}", variadic_to_string(args));
258            }
259            Ok(())
260        })?,
261    )?;
262
263    kumo_mod.set(
264        "set_max_spare_lua_contexts",
265        lua.create_function(move |_, limit: usize| {
266            config::set_max_spare(limit);
267            Ok(())
268        })?,
269    )?;
270
271    kumo_mod.set(
272        "set_max_lua_context_use_count",
273        lua.create_function(move |_, limit: usize| {
274            config::set_max_use(limit);
275            Ok(())
276        })?,
277    )?;
278
279    kumo_mod.set(
280        "set_max_lua_context_age",
281        lua.create_function(move |_, limit: usize| {
282            config::set_max_age(limit);
283            Ok(())
284        })?,
285    )?;
286
287    kumo_mod.set(
288        "set_lua_gc_on_put",
289        lua.create_function(move |_, enable: u8| {
290            config::set_gc_on_put(enable);
291            Ok(())
292        })?,
293    )?;
294
295    kumo_mod.set(
296        "set_lruttl_cache_capacity",
297        lua.create_function(move |_, (name, capacity): (String, usize)| {
298            if lruttl::set_cache_capacity(&name, capacity) {
299                Ok(())
300            } else {
301                Err(mlua::Error::external(format!(
302                    "could not set capacity for cache {name} \
303                    as that is not a pre-defined lruttl cache name"
304                )))
305            }
306        })?,
307    )?;
308
309    kumo_mod.set(
310        "set_config_monitor_globs",
311        lua.create_function(move |_, globs: Vec<String>| {
312            config::epoch::set_globs(globs).map_err(any_err)?;
313            Ok(())
314        })?,
315    )?;
316    kumo_mod.set(
317        "eval_config_monitor_globs",
318        lua.create_async_function(|_, _: ()| async move {
319            config::epoch::eval_globs().await.map_err(any_err)
320        })?,
321    )?;
322    kumo_mod.set(
323        "bump_config_epoch",
324        lua.create_function(move |_, _: ()| {
325            config::epoch::bump_current_epoch();
326            Ok(())
327        })?,
328    )?;
329
330    kumo_mod.set(
331        "available_parallelism",
332        lua.create_function(move |_, _: ()| available_parallelism().map_err(any_err))?,
333    )?;
334
335    kumo_mod.set(
336        "set_memory_hard_limit",
337        lua.create_function(move |_, limit: usize| {
338            kumo_server_memory::set_hard_limit(limit);
339            Ok(())
340        })?,
341    )?;
342
343    kumo_mod.set(
344        "set_memory_low_thresh",
345        lua.create_function(move |_, limit: usize| {
346            kumo_server_memory::set_low_memory_thresh(limit);
347            Ok(())
348        })?,
349    )?;
350
351    kumo_mod.set(
352        "set_memory_soft_limit",
353        lua.create_function(move |_, limit: usize| {
354            kumo_server_memory::set_soft_limit(limit);
355            Ok(())
356        })?,
357    )?;
358
359    kumo_mod.set(
360        "configure_redis_throttles",
361        lua.create_async_function(|lua, params: Value| async move {
362            let key: RedisConnKey = from_lua_value(&lua, params)?;
363            let conn = key.open().map_err(any_err)?;
364            conn.ping().await.map_err(any_err)?;
365            throttle::use_redis(conn).await.map_err(any_err)
366        })?,
367    )?;
368
369    kumo_mod.set(
370        "traceback",
371        lua.create_function(move |lua: &Lua, level: usize| {
372            #[derive(Debug, Serialize)]
373            struct Frame {
374                event: String,
375                name: Option<String>,
376                name_what: Option<String>,
377                source: Option<String>,
378                short_src: Option<String>,
379                line_defined: Option<usize>,
380                last_line_defined: Option<usize>,
381                what: &'static str,
382                curr_line: i32,
383                is_tail_call: bool,
384            }
385
386            let mut frames = vec![];
387            for n in level.. {
388                match lua.inspect_stack(n) {
389                    Some(info) => {
390                        let source = info.source();
391                        let names = info.names();
392                        frames.push(Frame {
393                            curr_line: info.curr_line(),
394                            is_tail_call: info.is_tail_call(),
395                            event: format!("{:?}", info.event()),
396                            last_line_defined: source.last_line_defined,
397                            line_defined: source.line_defined,
398                            name: names.name.as_ref().map(|b| b.to_string()),
399                            name_what: names.name_what.as_ref().map(|b| b.to_string()),
400                            source: source.source.as_ref().map(|b| b.to_string()),
401                            short_src: source.short_src.as_ref().map(|b| b.to_string()),
402                            what: source.what,
403                        });
404                    }
405                    None => break,
406                }
407            }
408
409            lua.to_value(&frames)
410        })?,
411    )?;
412
413    // TODO: options like restarting on error, delay between
414    // restarts and so on
415    #[derive(Deserialize, Debug)]
416    struct TaskParams {
417        event_name: String,
418        args: Vec<serde_json::Value>,
419    }
420
421    impl TaskParams {
422        async fn run(&self) -> anyhow::Result<()> {
423            let mut config = load_config().await?;
424
425            let sig = CallbackSignature::<Value, ()>::new(self.event_name.to_string());
426
427            config
428                .convert_args_and_call_callback(&sig, &self.args)
429                .await?;
430
431            config.put();
432
433            Ok(())
434        }
435    }
436
437    kumo_mod.set(
438        "spawn_task",
439        lua.create_function(|lua, params: Value| {
440            let params: TaskParams = lua.from_value(params)?;
441
442            if !config::is_validating() {
443                std::thread::Builder::new()
444                    .name(format!("spawned-task-{}", params.event_name))
445                    .spawn(move || {
446                        let runtime = tokio::runtime::Builder::new_current_thread()
447                            .enable_io()
448                            .enable_time()
449                            .on_thread_park(kumo_server_memory::purge_thread_cache)
450                            .build()
451                            .unwrap();
452                        let event_name = params.event_name.clone();
453
454                        let result = runtime.block_on(async move { params.run().await });
455                        if let Err(err) = result {
456                            tracing::error!("Error while dispatching {event_name}: {err:#}");
457                        }
458                    })?;
459            }
460
461            Ok(())
462        })?,
463    )?;
464
465    kumo_mod.set(
466        "spawn_thread_pool",
467        lua.create_function(|lua, params: Value| {
468            #[derive(Deserialize, Debug)]
469            struct ThreadPoolParams {
470                name: String,
471                num_threads: usize,
472            }
473
474            let params: ThreadPoolParams = lua.from_value(params)?;
475            let num_threads = AtomicUsize::new(params.num_threads);
476
477            if !config::is_validating() {
478                // Create the runtime. We don't need to hold on
479                // to it here, as it will be kept alive in the
480                // runtimes map in that crate
481                let _runtime = kumo_server_runtime::Runtime::new(
482                    &params.name,
483                    |_| params.num_threads,
484                    &num_threads,
485                )
486                .map_err(any_err)?;
487            }
488
489            Ok(())
490        })?,
491    )?;
492
493    kumo_mod.set(
494        "validation_failed",
495        lua.create_function(|_, ()| {
496            config::set_validation_failed();
497            Ok(())
498        })?,
499    )?;
500
501    kumo_mod.set(
502        "enable_memory_callstack_tracking",
503        lua.create_function(|_, enable: bool| {
504            kumo_server_memory::set_tracking_callstacks(enable);
505            Ok(())
506        })?,
507    )?;
508
509    Ok(())
510}