kumo_server_common/
lib.rs

1use config::{
2    any_err, decorate_callback_name, from_lua_value, get_or_create_module, load_config,
3    serialize_options, CallbackSignature,
4};
5use kumo_server_runtime::available_parallelism;
6use mlua::{Function, Lua, LuaSerdeExt, Value, Variadic};
7use mod_redis::RedisConnKey;
8use serde::{Deserialize, Serialize};
9use std::sync::atomic::AtomicUsize;
10
11pub mod config_handle;
12pub mod diagnostic_logging;
13pub mod disk_space;
14pub mod http_server;
15pub mod nodeid;
16pub mod panic;
17pub mod start;
18pub mod tls_helpers;
19
20pub fn register(lua: &Lua) -> anyhow::Result<()> {
21    for func in [
22        mod_redis::register,
23        data_loader::register,
24        mod_digest::register,
25        mod_encode::register,
26        cidr_map::register,
27        domain_map::register,
28        mod_amqp::register,
29        mod_filesystem::register,
30        mod_file_type::register,
31        mod_http::register,
32        mod_regex::register,
33        mod_serde::register,
34        mod_sqlite::register,
35        mod_crypto::register,
36        mod_string::register,
37        mod_time::register,
38        mod_dns_resolver::register,
39        mod_kafka::register,
40        mod_memoize::register,
41        mod_mimepart::register,
42        mod_mpsc::register,
43        mod_uuid::register,
44        kumo_api_types::shaping::register,
45        regex_set_map::register,
46    ] {
47        func(lua)?;
48    }
49
50    let kumo_mod = get_or_create_module(lua, "kumo")?;
51
/// Builds the named-registry key under which the registrar call stacks for
/// the event `name` are stored.
fn event_registrar_name(name: &str) -> String {
    format!("kumomta-event-registrars-{name}")
}
55
56    // Record the call stack of the code calling kumo.on so that
57    // kumo.get_event_registrars can retrieve it later
58    fn register_event_caller(lua: &Lua, name: &str) -> mlua::Result<()> {
59        let decorated_name = event_registrar_name(name);
60        let mut call_stack = vec![];
61        for n in 1.. {
62            match lua.inspect_stack(n, |info| {
63                let source = info.source();
64                format!(
65                    "{}:{}",
66                    source
67                        .short_src
68                        .as_ref()
69                        .map(|b| b.to_string())
70                        .unwrap_or_else(String::new),
71                    info.current_line().unwrap_or(0)
72                )
73            }) {
74                Some(info) => {
75                    call_stack.push(info);
76                }
77                None => break,
78            }
79        }
80
81        let tbl: Value = lua.named_registry_value(&decorated_name)?;
82        match tbl {
83            Value::Nil => {
84                let tbl = lua.create_table()?;
85                tbl.set(1, call_stack)?;
86                lua.set_named_registry_value(&decorated_name, tbl)?;
87                Ok(())
88            }
89            Value::Table(tbl) => {
90                let len = tbl.raw_len();
91                tbl.set(len + 1, call_stack)?;
92                Ok(())
93            }
94            _ => Err(mlua::Error::external(format!(
95                "registry key for {decorated_name} has invalid type",
96            ))),
97        }
98    }
99
100    // Returns the list of call-stacks of the code that registered
101    // for a specific named event
102    kumo_mod.set(
103        "get_event_registrars",
104        lua.create_function(move |lua, name: String| {
105            let decorated_name = event_registrar_name(&name);
106            let value: Value = lua.named_registry_value(&decorated_name)?;
107            Ok(value)
108        })?,
109    )?;
110
111    kumo_mod.set(
112        "on",
113        lua.create_function(move |lua, (name, func): (String, Function)| {
114            let decorated_name = decorate_callback_name(&name);
115
116            if let Ok(current_event) = lua.globals().get::<String>("_KUMO_CURRENT_EVENT") {
117                if current_event != "main" {
118                    return Err(mlua::Error::external(format!(
119                        "Attempting to register an event handler via \
120                    `kumo.on('{name}', ...)` from within the event handler \
121                    '{current_event}'. You must move your event handler registration \
122                    so that it is setup directly when the policy is loaded \
123                    in order for it to consistently trigger and handle events."
124                    )));
125                }
126            }
127
128            register_event_caller(lua, &name)?;
129
130            if config::does_callback_allow_multiple(&name) {
131                let tbl: Value = lua.named_registry_value(&decorated_name)?;
132                return match tbl {
133                    Value::Nil => {
134                        let tbl = lua.create_table()?;
135                        tbl.set(1, func)?;
136                        lua.set_named_registry_value(&decorated_name, tbl)?;
137                        Ok(())
138                    }
139                    Value::Table(tbl) => {
140                        let len = tbl.raw_len();
141                        tbl.set(len + 1, func)?;
142                        Ok(())
143                    }
144                    _ => Err(mlua::Error::external(format!(
145                        "registry key for {decorated_name} has invalid type",
146                    ))),
147                };
148            }
149
150            let existing: Value = lua.named_registry_value(&decorated_name)?;
151            match existing {
152                Value::Nil => {}
153                Value::Function(func) => {
154                    let info = func.info();
155                    let src = info.source.unwrap_or_else(|| "?".into());
156                    let line = info.line_defined.unwrap_or(0);
157                    return Err(mlua::Error::external(format!(
158                        "{name} event already has a handler defined at {src}:{line}"
159                    )));
160                }
161                _ => {
162                    return Err(mlua::Error::external(format!(
163                        "{name} event already has a handler"
164                    )));
165                }
166            }
167
168            lua.set_named_registry_value(&decorated_name, func)?;
169            Ok(())
170        })?,
171    )?;
172
173    kumo_mod.set(
174        "set_diagnostic_log_filter",
175        lua.create_function(move |_, filter: String| {
176            diagnostic_logging::set_diagnostic_log_filter(&filter).map_err(any_err)
177        })?,
178    )?;
179
180    fn variadic_to_string(args: Variadic<Value>) -> String {
181        let mut output = String::new();
182        for (idx, item) in args.into_iter().enumerate() {
183            if idx > 0 {
184                output.push(' ');
185            }
186
187            match item {
188                Value::String(s) => match s.to_str() {
189                    Ok(s) => output.push_str(&s),
190                    Err(_) => {
191                        let item = s.to_string_lossy();
192                        output.push_str(&item);
193                    }
194                },
195                item => match item.to_string() {
196                    Ok(s) => output.push_str(&s),
197                    Err(_) => output.push_str(&format!("{item:?}")),
198                },
199            }
200        }
201        output
202    }
203
204    fn get_caller(lua: &Lua) -> String {
205        match lua.inspect_stack(1, |info| {
206            let source = info.source();
207            let file_name = source
208                .short_src
209                .as_ref()
210                .map(|b| b.to_string())
211                .unwrap_or_else(String::new);
212            // Lua returns the somewhat obnoxious `[string "source.lua"]`
213            // Let's fix that up to be a bit nicer
214            let file_name = match file_name.strip_prefix("[string \"") {
215                Some(name) => name.strip_suffix("\"]").unwrap_or(name),
216                None => &file_name,
217            };
218
219            format!("{file_name}:{}", info.current_line().unwrap_or(0))
220        }) {
221            Some(info) => info,
222            None => "?".to_string(),
223        }
224    }
225
226    kumo_mod.set(
227        "log_error",
228        lua.create_function(move |lua, args: Variadic<Value>| {
229            if tracing::event_enabled!(target: "lua", tracing::Level::ERROR) {
230                let src = get_caller(lua);
231                tracing::error!(target: "lua", "{src}: {}", variadic_to_string(args));
232            }
233            Ok(())
234        })?,
235    )?;
236    kumo_mod.set(
237        "log_info",
238        lua.create_function(move |lua, args: Variadic<Value>| {
239            if tracing::event_enabled!(target: "lua", tracing::Level::INFO) {
240                let src = get_caller(lua);
241                tracing::info!(target: "lua", "{src}: {}", variadic_to_string(args));
242            }
243            Ok(())
244        })?,
245    )?;
246    kumo_mod.set(
247        "log_warn",
248        lua.create_function(move |lua, args: Variadic<Value>| {
249            if tracing::event_enabled!(target: "lua", tracing::Level::WARN) {
250                let src = get_caller(lua);
251                tracing::warn!(target: "lua", "{src}: {}", variadic_to_string(args));
252            }
253            Ok(())
254        })?,
255    )?;
256    kumo_mod.set(
257        "log_debug",
258        lua.create_function(move |lua, args: Variadic<Value>| {
259            if tracing::event_enabled!(target: "lua", tracing::Level::DEBUG) {
260                let src = get_caller(lua);
261                tracing::debug!(target: "lua", "{src}: {}", variadic_to_string(args));
262            }
263            Ok(())
264        })?,
265    )?;
266
267    kumo_mod.set(
268        "set_max_spare_lua_contexts",
269        lua.create_function(move |_, limit: usize| {
270            config::set_max_spare(limit);
271            Ok(())
272        })?,
273    )?;
274
275    kumo_mod.set(
276        "set_max_lua_context_use_count",
277        lua.create_function(move |_, limit: usize| {
278            config::set_max_use(limit);
279            Ok(())
280        })?,
281    )?;
282
283    kumo_mod.set(
284        "set_max_lua_context_age",
285        lua.create_function(move |_, limit: usize| {
286            config::set_max_age(limit);
287            Ok(())
288        })?,
289    )?;
290
291    kumo_mod.set(
292        "set_lua_gc_on_put",
293        lua.create_function(move |_, enable: u8| {
294            config::set_gc_on_put(enable);
295            Ok(())
296        })?,
297    )?;
298
299    kumo_mod.set(
300        "set_lruttl_cache_capacity",
301        lua.create_function(move |_, (name, capacity): (String, usize)| {
302            if lruttl::set_cache_capacity(&name, capacity) {
303                Ok(())
304            } else {
305                Err(mlua::Error::external(format!(
306                    "could not set capacity for cache {name} \
307                    as that is not a pre-defined lruttl cache name"
308                )))
309            }
310        })?,
311    )?;
312
313    kumo_mod.set(
314        "set_config_monitor_globs",
315        lua.create_function(move |_, globs: Vec<String>| {
316            config::epoch::set_globs(globs).map_err(any_err)?;
317            Ok(())
318        })?,
319    )?;
320    kumo_mod.set(
321        "eval_config_monitor_globs",
322        lua.create_async_function(|_, _: ()| async move {
323            config::epoch::eval_globs().await.map_err(any_err)
324        })?,
325    )?;
326    kumo_mod.set(
327        "bump_config_epoch",
328        lua.create_function(move |_, _: ()| {
329            config::epoch::bump_current_epoch();
330            Ok(())
331        })?,
332    )?;
333
334    kumo_mod.set(
335        "available_parallelism",
336        lua.create_function(move |_, _: ()| available_parallelism().map_err(any_err))?,
337    )?;
338
339    kumo_mod.set(
340        "set_memory_hard_limit",
341        lua.create_function(move |_, limit: usize| {
342            kumo_server_memory::set_hard_limit(limit);
343            Ok(())
344        })?,
345    )?;
346
347    kumo_mod.set(
348        "set_memory_low_thresh",
349        lua.create_function(move |_, limit: usize| {
350            kumo_server_memory::set_low_memory_thresh(limit);
351            Ok(())
352        })?,
353    )?;
354
355    kumo_mod.set(
356        "set_memory_soft_limit",
357        lua.create_function(move |_, limit: usize| {
358            kumo_server_memory::set_soft_limit(limit);
359            Ok(())
360        })?,
361    )?;
362
363    kumo_mod.set(
364        "configure_redis_throttles",
365        lua.create_async_function(|lua, params: Value| async move {
366            let key: RedisConnKey = from_lua_value(&lua, params)?;
367            let conn = key.open().map_err(any_err)?;
368            conn.ping().await.map_err(any_err)?;
369            throttle::use_redis(conn).await.map_err(any_err)
370        })?,
371    )?;
372
373    kumo_mod.set(
374        "traceback",
375        lua.create_function(move |lua: &Lua, level: usize| {
376            #[derive(Debug, Serialize)]
377            struct Frame {
378                event: String,
379                name: Option<String>,
380                name_what: Option<String>,
381                source: Option<String>,
382                short_src: Option<String>,
383                line_defined: Option<usize>,
384                last_line_defined: Option<usize>,
385                what: &'static str,
386                curr_line: Option<usize>,
387                is_tail_call: bool,
388            }
389
390            let mut frames = vec![];
391            for n in level.. {
392                match lua.inspect_stack(n, |info| {
393                    let source = info.source();
394                    let names = info.names();
395                    Frame {
396                        curr_line: info.current_line(),
397                        is_tail_call: info.is_tail_call(),
398                        event: format!("{:?}", info.event()),
399                        last_line_defined: source.last_line_defined,
400                        line_defined: source.line_defined,
401                        name: names.name.as_ref().map(|b| b.to_string()),
402                        name_what: names.name_what.as_ref().map(|b| b.to_string()),
403                        source: source.source.as_ref().map(|b| b.to_string()),
404                        short_src: source.short_src.as_ref().map(|b| b.to_string()),
405                        what: source.what,
406                    }
407                }) {
408                    Some(frame) => {
409                        frames.push(frame);
410                    }
411                    None => break,
412                }
413            }
414
415            lua.to_value(&frames)
416        })?,
417    )?;
418
419    // TODO: options like restarting on error, delay between
420    // restarts and so on
421    #[derive(Deserialize, Debug)]
422    struct TaskParams {
423        event_name: String,
424        args: Vec<serde_json::Value>,
425    }
426
427    impl TaskParams {
428        async fn run(&self) -> anyhow::Result<()> {
429            let mut config = load_config().await?;
430
431            let sig = CallbackSignature::<Value, ()>::new(self.event_name.to_string());
432
433            config
434                .convert_args_and_call_callback(&sig, &self.args)
435                .await?;
436
437            config.put();
438
439            Ok(())
440        }
441    }
442
443    kumo_mod.set(
444        "spawn_task",
445        lua.create_function(|lua, params: Value| {
446            let params: TaskParams = lua.from_value(params)?;
447
448            if !config::is_validating() {
449                std::thread::Builder::new()
450                    .name(format!("spawned-task-{}", params.event_name))
451                    .spawn(move || {
452                        let runtime = tokio::runtime::Builder::new_current_thread()
453                            .enable_io()
454                            .enable_time()
455                            .on_thread_park(kumo_server_memory::purge_thread_cache)
456                            .build()
457                            .unwrap();
458                        let event_name = params.event_name.clone();
459
460                        let result = runtime.block_on(async move { params.run().await });
461                        if let Err(err) = result {
462                            tracing::error!("Error while dispatching {event_name}: {err:#}");
463                        }
464                    })?;
465            }
466
467            Ok(())
468        })?,
469    )?;
470
471    kumo_mod.set(
472        "spawn_thread_pool",
473        lua.create_function(|lua, params: Value| {
474            #[derive(Deserialize, Debug)]
475            struct ThreadPoolParams {
476                name: String,
477                num_threads: usize,
478            }
479
480            let params: ThreadPoolParams = lua.from_value(params)?;
481            let num_threads = AtomicUsize::new(params.num_threads);
482
483            if !config::is_validating() {
484                // Create the runtime. We don't need to hold on
485                // to it here, as it will be kept alive in the
486                // runtimes map in that crate
487                let _runtime = kumo_server_runtime::Runtime::new(
488                    &params.name,
489                    |_| params.num_threads,
490                    &num_threads,
491                )
492                .map_err(any_err)?;
493            }
494
495            Ok(())
496        })?,
497    )?;
498
499    kumo_mod.set(
500        "validation_failed",
501        lua.create_function(|_, ()| {
502            config::set_validation_failed();
503            Ok(())
504        })?,
505    )?;
506
507    kumo_mod.set(
508        "enable_memory_callstack_tracking",
509        lua.create_function(|_, enable: bool| {
510            kumo_server_memory::set_tracking_callstacks(enable);
511            Ok(())
512        })?,
513    )?;
514
515    // This function is intended for debugging and testing purposes only.
516    // It is potentially very expensive on a production system with many
517    // thousands of queues.
518    kumo_mod.set(
519        "prometheus_metrics",
520        lua.create_async_function(|lua, ()| async move {
521            use tokio_stream::StreamExt;
522            let mut json_text = String::new();
523            let mut stream = kumo_prometheus::registry::Registry::stream_json();
524            while let Some(text) = stream.next().await {
525                json_text.push_str(&text);
526            }
527            let value: serde_json::Value = serde_json::from_str(&json_text).map_err(any_err)?;
528            lua.to_value_with(&value, serialize_options())
529        })?,
530    )?;
531
532    Ok(())
533}