kumo_server_common/
lib.rs

1use config::{
2    any_err, decorate_callback_name, from_lua_value, get_or_create_module, load_config,
3    CallbackSignature,
4};
5use kumo_server_runtime::available_parallelism;
6use mlua::{Function, Lua, LuaSerdeExt, Value, Variadic};
7use mod_redis::RedisConnKey;
8use serde::{Deserialize, Serialize};
9use std::sync::atomic::AtomicUsize;
10
11pub mod config_handle;
12pub mod diagnostic_logging;
13pub mod disk_space;
14pub mod http_server;
15pub mod nodeid;
16pub mod panic;
17pub mod start;
18pub mod tls_helpers;
19
20pub fn register(lua: &Lua) -> anyhow::Result<()> {
21    for func in [
22        mod_redis::register,
23        data_loader::register,
24        mod_digest::register,
25        mod_encode::register,
26        cidr_map::register,
27        domain_map::register,
28        mod_amqp::register,
29        mod_filesystem::register,
30        mod_file_type::register,
31        mod_http::register,
32        mod_regex::register,
33        mod_serde::register,
34        mod_sqlite::register,
35        mod_string::register,
36        mod_time::register,
37        mod_dns_resolver::register,
38        mod_kafka::register,
39        mod_memoize::register,
40        mod_mimepart::register,
41        mod_mpsc::register,
42        mod_uuid::register,
43        kumo_api_types::shaping::register,
44        regex_set_map::register,
45    ] {
46        func(lua)?;
47    }
48
49    let kumo_mod = get_or_create_module(lua, "kumo")?;
50
51    fn event_registrar_name(name: &str) -> String {
52        format!("kumomta-event-registrars-{name}")
53    }
54
55    // Record the call stack of the code calling kumo.on so that
56    // kumo.get_event_registrars can retrieve it later
57    fn register_event_caller(lua: &Lua, name: &str) -> mlua::Result<()> {
58        let decorated_name = event_registrar_name(name);
59        let mut call_stack = vec![];
60        for n in 1.. {
61            match lua.inspect_stack(n) {
62                Some(info) => {
63                    let source = info.source();
64                    call_stack.push(format!(
65                        "{}:{}",
66                        source
67                            .short_src
68                            .as_ref()
69                            .map(|b| b.to_string())
70                            .unwrap_or_else(String::new),
71                        info.curr_line()
72                    ));
73                }
74                None => break,
75            }
76        }
77
78        let tbl: Value = lua.named_registry_value(&decorated_name)?;
79        match tbl {
80            Value::Nil => {
81                let tbl = lua.create_table()?;
82                tbl.set(1, call_stack)?;
83                lua.set_named_registry_value(&decorated_name, tbl)?;
84                Ok(())
85            }
86            Value::Table(tbl) => {
87                let len = tbl.raw_len();
88                tbl.set(len + 1, call_stack)?;
89                Ok(())
90            }
91            _ => Err(mlua::Error::external(format!(
92                "registry key for {decorated_name} has invalid type",
93            ))),
94        }
95    }
96
97    // Returns the list of call-stacks of the code that registered
98    // for a specific named event
99    kumo_mod.set(
100        "get_event_registrars",
101        lua.create_function(move |lua, name: String| {
102            let decorated_name = event_registrar_name(&name);
103            let value: Value = lua.named_registry_value(&decorated_name)?;
104            Ok(value)
105        })?,
106    )?;
107
108    kumo_mod.set(
109        "on",
110        lua.create_function(move |lua, (name, func): (String, Function)| {
111            let decorated_name = decorate_callback_name(&name);
112
113            if let Ok(current_event) = lua.globals().get::<String>("_KUMO_CURRENT_EVENT") {
114                if current_event != "main" {
115                    return Err(mlua::Error::external(format!(
116                        "Attempting to register an event handler via \
117                    `kumo.on('{name}', ...)` from within the event handler \
118                    '{current_event}'. You must move your event handler registration \
119                    so that it is setup directly when the policy is loaded \
120                    in order for it to consistently trigger and handle events."
121                    )));
122                }
123            }
124
125            register_event_caller(lua, &name)?;
126
127            if config::does_callback_allow_multiple(&name) {
128                let tbl: Value = lua.named_registry_value(&decorated_name)?;
129                return match tbl {
130                    Value::Nil => {
131                        let tbl = lua.create_table()?;
132                        tbl.set(1, func)?;
133                        lua.set_named_registry_value(&decorated_name, tbl)?;
134                        Ok(())
135                    }
136                    Value::Table(tbl) => {
137                        let len = tbl.raw_len();
138                        tbl.set(len + 1, func)?;
139                        Ok(())
140                    }
141                    _ => Err(mlua::Error::external(format!(
142                        "registry key for {decorated_name} has invalid type",
143                    ))),
144                };
145            }
146
147            let existing: Value = lua.named_registry_value(&decorated_name)?;
148            match existing {
149                Value::Nil => {}
150                Value::Function(func) => {
151                    let info = func.info();
152                    let src = info.source.unwrap_or_else(|| "?".into());
153                    let line = info.line_defined.unwrap_or(0);
154                    return Err(mlua::Error::external(format!(
155                        "{name} event already has a handler defined at {src}:{line}"
156                    )));
157                }
158                _ => {
159                    return Err(mlua::Error::external(format!(
160                        "{name} event already has a handler"
161                    )));
162                }
163            }
164
165            lua.set_named_registry_value(&decorated_name, func)?;
166            Ok(())
167        })?,
168    )?;
169
170    kumo_mod.set(
171        "set_diagnostic_log_filter",
172        lua.create_function(move |_, filter: String| {
173            diagnostic_logging::set_diagnostic_log_filter(&filter).map_err(any_err)
174        })?,
175    )?;
176
177    fn variadic_to_string(args: Variadic<Value>) -> String {
178        let mut output = String::new();
179        for (idx, item) in args.into_iter().enumerate() {
180            if idx > 0 {
181                output.push(' ');
182            }
183
184            match item {
185                Value::String(s) => match s.to_str() {
186                    Ok(s) => output.push_str(&s),
187                    Err(_) => {
188                        let item = s.to_string_lossy();
189                        output.push_str(&item);
190                    }
191                },
192                item => match item.to_string() {
193                    Ok(s) => output.push_str(&s),
194                    Err(_) => output.push_str(&format!("{item:?}")),
195                },
196            }
197        }
198        output
199    }
200
201    fn get_caller(lua: &Lua) -> String {
202        match lua.inspect_stack(1) {
203            Some(info) => {
204                let source = info.source();
205                let file_name = source
206                    .short_src
207                    .as_ref()
208                    .map(|b| b.to_string())
209                    .unwrap_or_else(String::new);
210                // Lua returns the somewhat obnoxious `[string "source.lua"]`
211                // Let's fix that up to be a bit nicer
212                let file_name = match file_name.strip_prefix("[string \"") {
213                    Some(name) => name.strip_suffix("\"]").unwrap_or(name),
214                    None => &file_name,
215                };
216
217                format!("{file_name}:{}", info.curr_line())
218            }
219            None => "?".to_string(),
220        }
221    }
222
223    kumo_mod.set(
224        "log_error",
225        lua.create_function(move |lua, args: Variadic<Value>| {
226            if tracing::event_enabled!(target: "lua", tracing::Level::ERROR) {
227                let src = get_caller(lua);
228                tracing::error!(target: "lua", "{src}: {}", variadic_to_string(args));
229            }
230            Ok(())
231        })?,
232    )?;
233    kumo_mod.set(
234        "log_info",
235        lua.create_function(move |lua, args: Variadic<Value>| {
236            if tracing::event_enabled!(target: "lua", tracing::Level::INFO) {
237                let src = get_caller(lua);
238                tracing::info!(target: "lua", "{src}: {}", variadic_to_string(args));
239            }
240            Ok(())
241        })?,
242    )?;
243    kumo_mod.set(
244        "log_warn",
245        lua.create_function(move |lua, args: Variadic<Value>| {
246            if tracing::event_enabled!(target: "lua", tracing::Level::WARN) {
247                let src = get_caller(lua);
248                tracing::warn!(target: "lua", "{src}: {}", variadic_to_string(args));
249            }
250            Ok(())
251        })?,
252    )?;
253    kumo_mod.set(
254        "log_debug",
255        lua.create_function(move |lua, args: Variadic<Value>| {
256            if tracing::event_enabled!(target: "lua", tracing::Level::DEBUG) {
257                let src = get_caller(lua);
258                tracing::debug!(target: "lua", "{src}: {}", variadic_to_string(args));
259            }
260            Ok(())
261        })?,
262    )?;
263
264    kumo_mod.set(
265        "set_max_spare_lua_contexts",
266        lua.create_function(move |_, limit: usize| {
267            config::set_max_spare(limit);
268            Ok(())
269        })?,
270    )?;
271
272    kumo_mod.set(
273        "set_max_lua_context_use_count",
274        lua.create_function(move |_, limit: usize| {
275            config::set_max_use(limit);
276            Ok(())
277        })?,
278    )?;
279
280    kumo_mod.set(
281        "set_max_lua_context_age",
282        lua.create_function(move |_, limit: usize| {
283            config::set_max_age(limit);
284            Ok(())
285        })?,
286    )?;
287
288    kumo_mod.set(
289        "set_lua_gc_on_put",
290        lua.create_function(move |_, enable: u8| {
291            config::set_gc_on_put(enable);
292            Ok(())
293        })?,
294    )?;
295
296    kumo_mod.set(
297        "set_lruttl_cache_capacity",
298        lua.create_function(move |_, (name, capacity): (String, usize)| {
299            if lruttl::set_cache_capacity(&name, capacity) {
300                Ok(())
301            } else {
302                Err(mlua::Error::external(format!(
303                    "could not set capacity for cache {name} \
304                    as that is not a pre-defined lruttl cache name"
305                )))
306            }
307        })?,
308    )?;
309
310    kumo_mod.set(
311        "set_config_monitor_globs",
312        lua.create_function(move |_, globs: Vec<String>| {
313            config::epoch::set_globs(globs).map_err(any_err)?;
314            Ok(())
315        })?,
316    )?;
317    kumo_mod.set(
318        "eval_config_monitor_globs",
319        lua.create_async_function(|_, _: ()| async move {
320            config::epoch::eval_globs().await.map_err(any_err)
321        })?,
322    )?;
323    kumo_mod.set(
324        "bump_config_epoch",
325        lua.create_function(move |_, _: ()| {
326            config::epoch::bump_current_epoch();
327            Ok(())
328        })?,
329    )?;
330
331    kumo_mod.set(
332        "available_parallelism",
333        lua.create_function(move |_, _: ()| available_parallelism().map_err(any_err))?,
334    )?;
335
336    kumo_mod.set(
337        "set_memory_hard_limit",
338        lua.create_function(move |_, limit: usize| {
339            kumo_server_memory::set_hard_limit(limit);
340            Ok(())
341        })?,
342    )?;
343
344    kumo_mod.set(
345        "set_memory_low_thresh",
346        lua.create_function(move |_, limit: usize| {
347            kumo_server_memory::set_low_memory_thresh(limit);
348            Ok(())
349        })?,
350    )?;
351
352    kumo_mod.set(
353        "set_memory_soft_limit",
354        lua.create_function(move |_, limit: usize| {
355            kumo_server_memory::set_soft_limit(limit);
356            Ok(())
357        })?,
358    )?;
359
360    kumo_mod.set(
361        "configure_redis_throttles",
362        lua.create_async_function(|lua, params: Value| async move {
363            let key: RedisConnKey = from_lua_value(&lua, params)?;
364            let conn = key.open().map_err(any_err)?;
365            conn.ping().await.map_err(any_err)?;
366            throttle::use_redis(conn).await.map_err(any_err)
367        })?,
368    )?;
369
370    kumo_mod.set(
371        "traceback",
372        lua.create_function(move |lua: &Lua, level: usize| {
373            #[derive(Debug, Serialize)]
374            struct Frame {
375                event: String,
376                name: Option<String>,
377                name_what: Option<String>,
378                source: Option<String>,
379                short_src: Option<String>,
380                line_defined: Option<usize>,
381                last_line_defined: Option<usize>,
382                what: &'static str,
383                curr_line: i32,
384                is_tail_call: bool,
385            }
386
387            let mut frames = vec![];
388            for n in level.. {
389                match lua.inspect_stack(n) {
390                    Some(info) => {
391                        let source = info.source();
392                        let names = info.names();
393                        frames.push(Frame {
394                            curr_line: info.curr_line(),
395                            is_tail_call: info.is_tail_call(),
396                            event: format!("{:?}", info.event()),
397                            last_line_defined: source.last_line_defined,
398                            line_defined: source.line_defined,
399                            name: names.name.as_ref().map(|b| b.to_string()),
400                            name_what: names.name_what.as_ref().map(|b| b.to_string()),
401                            source: source.source.as_ref().map(|b| b.to_string()),
402                            short_src: source.short_src.as_ref().map(|b| b.to_string()),
403                            what: source.what,
404                        });
405                    }
406                    None => break,
407                }
408            }
409
410            lua.to_value(&frames)
411        })?,
412    )?;
413
414    // TODO: options like restarting on error, delay between
415    // restarts and so on
416    #[derive(Deserialize, Debug)]
417    struct TaskParams {
418        event_name: String,
419        args: Vec<serde_json::Value>,
420    }
421
422    impl TaskParams {
423        async fn run(&self) -> anyhow::Result<()> {
424            let mut config = load_config().await?;
425
426            let sig = CallbackSignature::<Value, ()>::new(self.event_name.to_string());
427
428            config
429                .convert_args_and_call_callback(&sig, &self.args)
430                .await?;
431
432            config.put();
433
434            Ok(())
435        }
436    }
437
438    kumo_mod.set(
439        "spawn_task",
440        lua.create_function(|lua, params: Value| {
441            let params: TaskParams = lua.from_value(params)?;
442
443            if !config::is_validating() {
444                std::thread::Builder::new()
445                    .name(format!("spawned-task-{}", params.event_name))
446                    .spawn(move || {
447                        let runtime = tokio::runtime::Builder::new_current_thread()
448                            .enable_io()
449                            .enable_time()
450                            .on_thread_park(kumo_server_memory::purge_thread_cache)
451                            .build()
452                            .unwrap();
453                        let event_name = params.event_name.clone();
454
455                        let result = runtime.block_on(async move { params.run().await });
456                        if let Err(err) = result {
457                            tracing::error!("Error while dispatching {event_name}: {err:#}");
458                        }
459                    })?;
460            }
461
462            Ok(())
463        })?,
464    )?;
465
466    kumo_mod.set(
467        "spawn_thread_pool",
468        lua.create_function(|lua, params: Value| {
469            #[derive(Deserialize, Debug)]
470            struct ThreadPoolParams {
471                name: String,
472                num_threads: usize,
473            }
474
475            let params: ThreadPoolParams = lua.from_value(params)?;
476            let num_threads = AtomicUsize::new(params.num_threads);
477
478            if !config::is_validating() {
479                // Create the runtime. We don't need to hold on
480                // to it here, as it will be kept alive in the
481                // runtimes map in that crate
482                let _runtime = kumo_server_runtime::Runtime::new(
483                    &params.name,
484                    |_| params.num_threads,
485                    &num_threads,
486                )
487                .map_err(any_err)?;
488            }
489
490            Ok(())
491        })?,
492    )?;
493
494    kumo_mod.set(
495        "validation_failed",
496        lua.create_function(|_, ()| {
497            config::set_validation_failed();
498            Ok(())
499        })?,
500    )?;
501
502    kumo_mod.set(
503        "enable_memory_callstack_tracking",
504        lua.create_function(|_, enable: bool| {
505            kumo_server_memory::set_tracking_callstacks(enable);
506            Ok(())
507        })?,
508    )?;
509
510    Ok(())
511}