kumo_server_common/
lib.rs

1use config::{
2    any_err, decorate_callback_name, from_lua_value, get_or_create_module, load_config,
3    serialize_options, CallbackSignature,
4};
5use kumo_server_runtime::available_parallelism;
6use mlua::{Function, Lua, LuaSerdeExt, Value, Variadic};
7use mod_redis::RedisConnKey;
8use serde::{Deserialize, Serialize};
9use std::sync::atomic::AtomicUsize;
10
11pub mod config_handle;
12pub mod diagnostic_logging;
13pub mod disk_space;
14pub mod http_server;
15pub mod nodeid;
16pub mod panic;
17pub mod start;
18pub mod tls_helpers;
19
/// Installs the shared `kumo` Lua module into `lua`.
///
/// Every bundled extension module's `register` hook is run first, in the
/// order listed below; the first one that fails aborts the whole setup.
/// The `kumo` module table is then fetched (or created) and its `version`
/// field is populated. The remaining helper functions are attached to that
/// same table further down in this function.
///
/// # Errors
///
/// Propagates the first error reported by an extension registrar or by any
/// of the Lua table/function operations performed here.
pub fn register(lua: &Lua) -> anyhow::Result<()> {
    // The order of this list is the order in which the modules are set up.
    let module_registrars = [
        mod_redis::register,
        data_loader::register,
        mod_digest::register,
        mod_encode::register,
        mod_aws_sigv4::register,
        cidr_map::register,
        domain_map::register,
        mod_amqp::register,
        mod_filesystem::register,
        mod_file_type::register,
        mod_http::register,
        mod_regex::register,
        mod_serde::register,
        mod_sqlite::register,
        mod_crypto::register,
        mod_smtp_response_normalize::register,
        mod_string::register,
        mod_time::register,
        mod_dns_resolver::register,
        mod_kafka::register,
        mod_memoize::register,
        mod_mimepart::register,
        mod_mpsc::register,
        mod_uuid::register,
        kumo_api_types::shaping::register,
        regex_set_map::register,
    ];
    for register_module in module_registrars {
        register_module(lua)?;
    }

    let kumo_mod = get_or_create_module(lua, "kumo")?;
    let version = version_info::kumo_version();
    kumo_mod.set("version", version)?;
54
// Builds the Lua registry key under which the registrar call stacks
// for the event called `name` are stored.
fn event_registrar_name(name: &str) -> String {
    let mut key = String::from("kumomta-event-registrars-");
    key.push_str(name);
    key
}
58
59    // Record the call stack of the code calling kumo.on so that
60    // kumo.get_event_registrars can retrieve it later
61    fn register_event_caller(lua: &Lua, name: &str) -> mlua::Result<()> {
62        let decorated_name = event_registrar_name(name);
63        let mut call_stack = vec![];
64        for n in 1.. {
65            match lua.inspect_stack(n, |info| {
66                let source = info.source();
67                format!(
68                    "{}:{}",
69                    source
70                        .short_src
71                        .as_ref()
72                        .map(|b| b.to_string())
73                        .unwrap_or_else(String::new),
74                    info.current_line().unwrap_or(0)
75                )
76            }) {
77                Some(info) => {
78                    call_stack.push(info);
79                }
80                None => break,
81            }
82        }
83
84        let tbl: Value = lua.named_registry_value(&decorated_name)?;
85        match tbl {
86            Value::Nil => {
87                let tbl = lua.create_table()?;
88                tbl.set(1, call_stack)?;
89                lua.set_named_registry_value(&decorated_name, tbl)?;
90                Ok(())
91            }
92            Value::Table(tbl) => {
93                let len = tbl.raw_len();
94                tbl.set(len + 1, call_stack)?;
95                Ok(())
96            }
97            _ => Err(mlua::Error::external(format!(
98                "registry key for {decorated_name} has invalid type",
99            ))),
100        }
101    }
102
103    // Returns the list of call-stacks of the code that registered
104    // for a specific named event
105    kumo_mod.set(
106        "get_event_registrars",
107        lua.create_function(move |lua, name: String| {
108            let decorated_name = event_registrar_name(&name);
109            let value: Value = lua.named_registry_value(&decorated_name)?;
110            Ok(value)
111        })?,
112    )?;
113
114    kumo_mod.set(
115        "on",
116        lua.create_function(move |lua, (name, func): (String, Function)| {
117            let decorated_name = decorate_callback_name(&name);
118
119            if let Ok(current_event) = lua.globals().get::<String>("_KUMO_CURRENT_EVENT") {
120                if current_event != "main" {
121                    return Err(mlua::Error::external(format!(
122                        "Attempting to register an event handler via \
123                    `kumo.on('{name}', ...)` from within the event handler \
124                    '{current_event}'. You must move your event handler registration \
125                    so that it is setup directly when the policy is loaded \
126                    in order for it to consistently trigger and handle events."
127                    )));
128                }
129            }
130
131            register_event_caller(lua, &name)?;
132
133            if config::does_callback_allow_multiple(&name) {
134                let tbl: Value = lua.named_registry_value(&decorated_name)?;
135                return match tbl {
136                    Value::Nil => {
137                        let tbl = lua.create_table()?;
138                        tbl.set(1, func)?;
139                        lua.set_named_registry_value(&decorated_name, tbl)?;
140                        Ok(())
141                    }
142                    Value::Table(tbl) => {
143                        let len = tbl.raw_len();
144                        tbl.set(len + 1, func)?;
145                        Ok(())
146                    }
147                    _ => Err(mlua::Error::external(format!(
148                        "registry key for {decorated_name} has invalid type",
149                    ))),
150                };
151            }
152
153            let existing: Value = lua.named_registry_value(&decorated_name)?;
154            match existing {
155                Value::Nil => {}
156                Value::Function(func) => {
157                    let info = func.info();
158                    let src = info.source.unwrap_or_else(|| "?".into());
159                    let line = info.line_defined.unwrap_or(0);
160                    return Err(mlua::Error::external(format!(
161                        "{name} event already has a handler defined at {src}:{line}"
162                    )));
163                }
164                _ => {
165                    return Err(mlua::Error::external(format!(
166                        "{name} event already has a handler"
167                    )));
168                }
169            }
170
171            lua.set_named_registry_value(&decorated_name, func)?;
172            Ok(())
173        })?,
174    )?;
175
176    kumo_mod.set(
177        "set_diagnostic_log_filter",
178        lua.create_function(move |_, filter: String| {
179            diagnostic_logging::set_diagnostic_log_filter(&filter).map_err(any_err)
180        })?,
181    )?;
182
183    fn variadic_to_string(args: Variadic<Value>) -> String {
184        let mut output = String::new();
185        for (idx, item) in args.into_iter().enumerate() {
186            if idx > 0 {
187                output.push(' ');
188            }
189
190            match item {
191                Value::String(s) => match s.to_str() {
192                    Ok(s) => output.push_str(&s),
193                    Err(_) => {
194                        let item = s.to_string_lossy();
195                        output.push_str(&item);
196                    }
197                },
198                item => match item.to_string() {
199                    Ok(s) => output.push_str(&s),
200                    Err(_) => output.push_str(&format!("{item:?}")),
201                },
202            }
203        }
204        output
205    }
206
207    fn get_caller(lua: &Lua) -> String {
208        match lua.inspect_stack(1, |info| {
209            let source = info.source();
210            let file_name = source
211                .short_src
212                .as_ref()
213                .map(|b| b.to_string())
214                .unwrap_or_else(String::new);
215            // Lua returns the somewhat obnoxious `[string "source.lua"]`
216            // Let's fix that up to be a bit nicer
217            let file_name = match file_name.strip_prefix("[string \"") {
218                Some(name) => name.strip_suffix("\"]").unwrap_or(name),
219                None => &file_name,
220            };
221
222            format!("{file_name}:{}", info.current_line().unwrap_or(0))
223        }) {
224            Some(info) => info,
225            None => "?".to_string(),
226        }
227    }
228
229    kumo_mod.set(
230        "log_error",
231        lua.create_function(move |lua, args: Variadic<Value>| {
232            if tracing::event_enabled!(target: "lua", tracing::Level::ERROR) {
233                let src = get_caller(lua);
234                tracing::error!(target: "lua", "{src}: {}", variadic_to_string(args));
235            }
236            Ok(())
237        })?,
238    )?;
239    kumo_mod.set(
240        "log_info",
241        lua.create_function(move |lua, args: Variadic<Value>| {
242            if tracing::event_enabled!(target: "lua", tracing::Level::INFO) {
243                let src = get_caller(lua);
244                tracing::info!(target: "lua", "{src}: {}", variadic_to_string(args));
245            }
246            Ok(())
247        })?,
248    )?;
249    kumo_mod.set(
250        "log_warn",
251        lua.create_function(move |lua, args: Variadic<Value>| {
252            if tracing::event_enabled!(target: "lua", tracing::Level::WARN) {
253                let src = get_caller(lua);
254                tracing::warn!(target: "lua", "{src}: {}", variadic_to_string(args));
255            }
256            Ok(())
257        })?,
258    )?;
259    kumo_mod.set(
260        "log_debug",
261        lua.create_function(move |lua, args: Variadic<Value>| {
262            if tracing::event_enabled!(target: "lua", tracing::Level::DEBUG) {
263                let src = get_caller(lua);
264                tracing::debug!(target: "lua", "{src}: {}", variadic_to_string(args));
265            }
266            Ok(())
267        })?,
268    )?;
269
270    kumo_mod.set(
271        "set_max_spare_lua_contexts",
272        lua.create_function(move |_, limit: usize| {
273            config::set_max_spare(limit);
274            Ok(())
275        })?,
276    )?;
277
278    kumo_mod.set(
279        "set_max_lua_context_use_count",
280        lua.create_function(move |_, limit: usize| {
281            config::set_max_use(limit);
282            Ok(())
283        })?,
284    )?;
285
286    kumo_mod.set(
287        "set_max_lua_context_age",
288        lua.create_function(move |_, limit: usize| {
289            config::set_max_age(limit);
290            Ok(())
291        })?,
292    )?;
293
294    kumo_mod.set(
295        "set_lua_gc_on_put",
296        lua.create_function(move |_, enable: u8| {
297            config::set_gc_on_put(enable);
298            Ok(())
299        })?,
300    )?;
301
302    kumo_mod.set(
303        "set_lruttl_cache_capacity",
304        lua.create_function(move |_, (name, capacity): (String, usize)| {
305            if lruttl::set_cache_capacity(&name, capacity) {
306                Ok(())
307            } else {
308                Err(mlua::Error::external(format!(
309                    "could not set capacity for cache {name} \
310                    as that is not a pre-defined lruttl cache name"
311                )))
312            }
313        })?,
314    )?;
315
316    kumo_mod.set(
317        "set_config_monitor_globs",
318        lua.create_function(move |_, globs: Vec<String>| {
319            config::epoch::set_globs(globs).map_err(any_err)?;
320            Ok(())
321        })?,
322    )?;
323    kumo_mod.set(
324        "eval_config_monitor_globs",
325        lua.create_async_function(|_, _: ()| async move {
326            config::epoch::eval_globs().await.map_err(any_err)
327        })?,
328    )?;
329    kumo_mod.set(
330        "bump_config_epoch",
331        lua.create_function(move |_, _: ()| {
332            config::epoch::bump_current_epoch();
333            Ok(())
334        })?,
335    )?;
336
337    kumo_mod.set(
338        "available_parallelism",
339        lua.create_function(move |_, _: ()| available_parallelism().map_err(any_err))?,
340    )?;
341
342    kumo_mod.set(
343        "set_memory_hard_limit",
344        lua.create_function(move |_, limit: usize| {
345            kumo_server_memory::set_hard_limit(limit);
346            Ok(())
347        })?,
348    )?;
349
350    kumo_mod.set(
351        "set_memory_low_thresh",
352        lua.create_function(move |_, limit: usize| {
353            kumo_server_memory::set_low_memory_thresh(limit);
354            Ok(())
355        })?,
356    )?;
357
358    kumo_mod.set(
359        "set_memory_soft_limit",
360        lua.create_function(move |_, limit: usize| {
361            kumo_server_memory::set_soft_limit(limit);
362            Ok(())
363        })?,
364    )?;
365
366    kumo_mod.set(
367        "configure_redis_throttles",
368        lua.create_async_function(|lua, params: Value| async move {
369            let key: RedisConnKey = from_lua_value(&lua, params)?;
370            let conn = key.open().map_err(any_err)?;
371            conn.ping().await.map_err(any_err)?;
372            throttle::use_redis(conn).await.map_err(any_err)
373        })?,
374    )?;
375
376    kumo_mod.set(
377        "traceback",
378        lua.create_function(move |lua: &Lua, level: usize| {
379            #[derive(Debug, Serialize)]
380            struct Frame {
381                event: String,
382                name: Option<String>,
383                name_what: Option<String>,
384                source: Option<String>,
385                short_src: Option<String>,
386                line_defined: Option<usize>,
387                last_line_defined: Option<usize>,
388                what: &'static str,
389                curr_line: Option<usize>,
390                is_tail_call: bool,
391            }
392
393            let mut frames = vec![];
394            for n in level.. {
395                match lua.inspect_stack(n, |info| {
396                    let source = info.source();
397                    let names = info.names();
398                    Frame {
399                        curr_line: info.current_line(),
400                        is_tail_call: info.is_tail_call(),
401                        event: format!("{:?}", info.event()),
402                        last_line_defined: source.last_line_defined,
403                        line_defined: source.line_defined,
404                        name: names.name.as_ref().map(|b| b.to_string()),
405                        name_what: names.name_what.as_ref().map(|b| b.to_string()),
406                        source: source.source.as_ref().map(|b| b.to_string()),
407                        short_src: source.short_src.as_ref().map(|b| b.to_string()),
408                        what: source.what,
409                    }
410                }) {
411                    Some(frame) => {
412                        frames.push(frame);
413                    }
414                    None => break,
415                }
416            }
417
418            lua.to_value(&frames)
419        })?,
420    )?;
421
422    // TODO: options like restarting on error, delay between
423    // restarts and so on
424    #[derive(Deserialize, Debug)]
425    struct TaskParams {
426        event_name: String,
427        args: Vec<serde_json::Value>,
428    }
429
430    impl TaskParams {
431        async fn run(&self) -> anyhow::Result<()> {
432            let mut config = load_config().await?;
433
434            let sig = CallbackSignature::<Value, ()>::new(self.event_name.to_string());
435
436            config
437                .convert_args_and_call_callback(&sig, &self.args)
438                .await?;
439
440            config.put();
441
442            Ok(())
443        }
444    }
445
446    kumo_mod.set(
447        "spawn_task",
448        lua.create_function(|lua, params: Value| {
449            let params: TaskParams = lua.from_value(params)?;
450
451            if !config::is_validating() {
452                std::thread::Builder::new()
453                    .name(format!("spawned-task-{}", params.event_name))
454                    .spawn(move || {
455                        let runtime = tokio::runtime::Builder::new_current_thread()
456                            .enable_io()
457                            .enable_time()
458                            .on_thread_park(kumo_server_memory::purge_thread_cache)
459                            .build()
460                            .unwrap();
461                        let event_name = params.event_name.clone();
462
463                        let result = runtime.block_on(async move { params.run().await });
464                        if let Err(err) = result {
465                            tracing::error!("Error while dispatching {event_name}: {err:#}");
466                        }
467                    })?;
468            }
469
470            Ok(())
471        })?,
472    )?;
473
474    kumo_mod.set(
475        "spawn_thread_pool",
476        lua.create_function(|lua, params: Value| {
477            #[derive(Deserialize, Debug)]
478            struct ThreadPoolParams {
479                name: String,
480                num_threads: usize,
481            }
482
483            let params: ThreadPoolParams = lua.from_value(params)?;
484            let num_threads = AtomicUsize::new(params.num_threads);
485
486            if !config::is_validating() {
487                // Create the runtime. We don't need to hold on
488                // to it here, as it will be kept alive in the
489                // runtimes map in that crate
490                let _runtime = kumo_server_runtime::Runtime::new(
491                    &params.name,
492                    |_| params.num_threads,
493                    &num_threads,
494                )
495                .map_err(any_err)?;
496            }
497
498            Ok(())
499        })?,
500    )?;
501
502    kumo_mod.set(
503        "validation_failed",
504        lua.create_function(|_, ()| {
505            config::set_validation_failed();
506            Ok(())
507        })?,
508    )?;
509
510    kumo_mod.set(
511        "enable_memory_callstack_tracking",
512        lua.create_function(|_, enable: bool| {
513            kumo_server_memory::set_tracking_callstacks(enable);
514            Ok(())
515        })?,
516    )?;
517
518    // This function is intended for debugging and testing purposes only.
519    // It is potentially very expensive on a production system with many
520    // thousands of queues.
521    kumo_mod.set(
522        "prometheus_metrics",
523        lua.create_async_function(|lua, ()| async move {
524            use tokio_stream::StreamExt;
525            let mut json_text = String::new();
526            let mut stream = kumo_prometheus::registry::Registry::stream_json();
527            while let Some(text) = stream.next().await {
528                json_text.push_str(&text);
529            }
530            let value: serde_json::Value = serde_json::from_str(&json_text).map_err(any_err)?;
531            lua.to_value_with(&value, serialize_options())
532        })?,
533    )?;
534
535    Ok(())
536}