kumo_server_common/
lib.rs

1use config::{
2    any_err, decorate_callback_name, from_lua_value, get_or_create_module, load_config,
3    CallbackSignature,
4};
5use kumo_server_runtime::available_parallelism;
6use mlua::{Function, Lua, LuaSerdeExt, Value, Variadic};
7use mod_redis::RedisConnKey;
8use serde::{Deserialize, Serialize};
9use std::sync::atomic::AtomicUsize;
10
11pub mod config_handle;
12pub mod diagnostic_logging;
13pub mod disk_space;
14pub mod http_server;
15pub mod nodeid;
16pub mod panic;
17pub mod start;
18pub mod tls_helpers;
19
20pub fn register(lua: &Lua) -> anyhow::Result<()> {
21    for func in [
22        mod_redis::register,
23        data_loader::register,
24        mod_digest::register,
25        mod_encode::register,
26        cidr_map::register,
27        domain_map::register,
28        mod_amqp::register,
29        mod_filesystem::register,
30        mod_file_type::register,
31        mod_http::register,
32        mod_regex::register,
33        mod_serde::register,
34        mod_sqlite::register,
35        mod_string::register,
36        mod_time::register,
37        mod_dns_resolver::register,
38        mod_kafka::register,
39        mod_memoize::register,
40        mod_mimepart::register,
41        mod_mpsc::register,
42        mod_uuid::register,
43        kumo_api_types::shaping::register,
44        regex_set_map::register,
45    ] {
46        func(lua)?;
47    }
48
49    let kumo_mod = get_or_create_module(lua, "kumo")?;
50
51    fn event_registrar_name(name: &str) -> String {
52        format!("kumomta-event-registrars-{name}")
53    }
54
55    // Record the call stack of the code calling kumo.on so that
56    // kumo.get_event_registrars can retrieve it later
57    fn register_event_caller(lua: &Lua, name: &str) -> mlua::Result<()> {
58        let decorated_name = event_registrar_name(name);
59        let mut call_stack = vec![];
60        for n in 1.. {
61            match lua.inspect_stack(n, |info| {
62                let source = info.source();
63                format!(
64                    "{}:{}",
65                    source
66                        .short_src
67                        .as_ref()
68                        .map(|b| b.to_string())
69                        .unwrap_or_else(String::new),
70                    info.current_line().unwrap_or(0)
71                )
72            }) {
73                Some(info) => {
74                    call_stack.push(info);
75                }
76                None => break,
77            }
78        }
79
80        let tbl: Value = lua.named_registry_value(&decorated_name)?;
81        match tbl {
82            Value::Nil => {
83                let tbl = lua.create_table()?;
84                tbl.set(1, call_stack)?;
85                lua.set_named_registry_value(&decorated_name, tbl)?;
86                Ok(())
87            }
88            Value::Table(tbl) => {
89                let len = tbl.raw_len();
90                tbl.set(len + 1, call_stack)?;
91                Ok(())
92            }
93            _ => Err(mlua::Error::external(format!(
94                "registry key for {decorated_name} has invalid type",
95            ))),
96        }
97    }
98
99    // Returns the list of call-stacks of the code that registered
100    // for a specific named event
101    kumo_mod.set(
102        "get_event_registrars",
103        lua.create_function(move |lua, name: String| {
104            let decorated_name = event_registrar_name(&name);
105            let value: Value = lua.named_registry_value(&decorated_name)?;
106            Ok(value)
107        })?,
108    )?;
109
110    kumo_mod.set(
111        "on",
112        lua.create_function(move |lua, (name, func): (String, Function)| {
113            let decorated_name = decorate_callback_name(&name);
114
115            if let Ok(current_event) = lua.globals().get::<String>("_KUMO_CURRENT_EVENT") {
116                if current_event != "main" {
117                    return Err(mlua::Error::external(format!(
118                        "Attempting to register an event handler via \
119                    `kumo.on('{name}', ...)` from within the event handler \
120                    '{current_event}'. You must move your event handler registration \
121                    so that it is setup directly when the policy is loaded \
122                    in order for it to consistently trigger and handle events."
123                    )));
124                }
125            }
126
127            register_event_caller(lua, &name)?;
128
129            if config::does_callback_allow_multiple(&name) {
130                let tbl: Value = lua.named_registry_value(&decorated_name)?;
131                return match tbl {
132                    Value::Nil => {
133                        let tbl = lua.create_table()?;
134                        tbl.set(1, func)?;
135                        lua.set_named_registry_value(&decorated_name, tbl)?;
136                        Ok(())
137                    }
138                    Value::Table(tbl) => {
139                        let len = tbl.raw_len();
140                        tbl.set(len + 1, func)?;
141                        Ok(())
142                    }
143                    _ => Err(mlua::Error::external(format!(
144                        "registry key for {decorated_name} has invalid type",
145                    ))),
146                };
147            }
148
149            let existing: Value = lua.named_registry_value(&decorated_name)?;
150            match existing {
151                Value::Nil => {}
152                Value::Function(func) => {
153                    let info = func.info();
154                    let src = info.source.unwrap_or_else(|| "?".into());
155                    let line = info.line_defined.unwrap_or(0);
156                    return Err(mlua::Error::external(format!(
157                        "{name} event already has a handler defined at {src}:{line}"
158                    )));
159                }
160                _ => {
161                    return Err(mlua::Error::external(format!(
162                        "{name} event already has a handler"
163                    )));
164                }
165            }
166
167            lua.set_named_registry_value(&decorated_name, func)?;
168            Ok(())
169        })?,
170    )?;
171
172    kumo_mod.set(
173        "set_diagnostic_log_filter",
174        lua.create_function(move |_, filter: String| {
175            diagnostic_logging::set_diagnostic_log_filter(&filter).map_err(any_err)
176        })?,
177    )?;
178
179    fn variadic_to_string(args: Variadic<Value>) -> String {
180        let mut output = String::new();
181        for (idx, item) in args.into_iter().enumerate() {
182            if idx > 0 {
183                output.push(' ');
184            }
185
186            match item {
187                Value::String(s) => match s.to_str() {
188                    Ok(s) => output.push_str(&s),
189                    Err(_) => {
190                        let item = s.to_string_lossy();
191                        output.push_str(&item);
192                    }
193                },
194                item => match item.to_string() {
195                    Ok(s) => output.push_str(&s),
196                    Err(_) => output.push_str(&format!("{item:?}")),
197                },
198            }
199        }
200        output
201    }
202
203    fn get_caller(lua: &Lua) -> String {
204        match lua.inspect_stack(1, |info| {
205            let source = info.source();
206            let file_name = source
207                .short_src
208                .as_ref()
209                .map(|b| b.to_string())
210                .unwrap_or_else(String::new);
211            // Lua returns the somewhat obnoxious `[string "source.lua"]`
212            // Let's fix that up to be a bit nicer
213            let file_name = match file_name.strip_prefix("[string \"") {
214                Some(name) => name.strip_suffix("\"]").unwrap_or(name),
215                None => &file_name,
216            };
217
218            format!("{file_name}:{}", info.current_line().unwrap_or(0))
219        }) {
220            Some(info) => info,
221            None => "?".to_string(),
222        }
223    }
224
225    kumo_mod.set(
226        "log_error",
227        lua.create_function(move |lua, args: Variadic<Value>| {
228            if tracing::event_enabled!(target: "lua", tracing::Level::ERROR) {
229                let src = get_caller(lua);
230                tracing::error!(target: "lua", "{src}: {}", variadic_to_string(args));
231            }
232            Ok(())
233        })?,
234    )?;
235    kumo_mod.set(
236        "log_info",
237        lua.create_function(move |lua, args: Variadic<Value>| {
238            if tracing::event_enabled!(target: "lua", tracing::Level::INFO) {
239                let src = get_caller(lua);
240                tracing::info!(target: "lua", "{src}: {}", variadic_to_string(args));
241            }
242            Ok(())
243        })?,
244    )?;
245    kumo_mod.set(
246        "log_warn",
247        lua.create_function(move |lua, args: Variadic<Value>| {
248            if tracing::event_enabled!(target: "lua", tracing::Level::WARN) {
249                let src = get_caller(lua);
250                tracing::warn!(target: "lua", "{src}: {}", variadic_to_string(args));
251            }
252            Ok(())
253        })?,
254    )?;
255    kumo_mod.set(
256        "log_debug",
257        lua.create_function(move |lua, args: Variadic<Value>| {
258            if tracing::event_enabled!(target: "lua", tracing::Level::DEBUG) {
259                let src = get_caller(lua);
260                tracing::debug!(target: "lua", "{src}: {}", variadic_to_string(args));
261            }
262            Ok(())
263        })?,
264    )?;
265
266    kumo_mod.set(
267        "set_max_spare_lua_contexts",
268        lua.create_function(move |_, limit: usize| {
269            config::set_max_spare(limit);
270            Ok(())
271        })?,
272    )?;
273
274    kumo_mod.set(
275        "set_max_lua_context_use_count",
276        lua.create_function(move |_, limit: usize| {
277            config::set_max_use(limit);
278            Ok(())
279        })?,
280    )?;
281
282    kumo_mod.set(
283        "set_max_lua_context_age",
284        lua.create_function(move |_, limit: usize| {
285            config::set_max_age(limit);
286            Ok(())
287        })?,
288    )?;
289
290    kumo_mod.set(
291        "set_lua_gc_on_put",
292        lua.create_function(move |_, enable: u8| {
293            config::set_gc_on_put(enable);
294            Ok(())
295        })?,
296    )?;
297
298    kumo_mod.set(
299        "set_lruttl_cache_capacity",
300        lua.create_function(move |_, (name, capacity): (String, usize)| {
301            if lruttl::set_cache_capacity(&name, capacity) {
302                Ok(())
303            } else {
304                Err(mlua::Error::external(format!(
305                    "could not set capacity for cache {name} \
306                    as that is not a pre-defined lruttl cache name"
307                )))
308            }
309        })?,
310    )?;
311
312    kumo_mod.set(
313        "set_config_monitor_globs",
314        lua.create_function(move |_, globs: Vec<String>| {
315            config::epoch::set_globs(globs).map_err(any_err)?;
316            Ok(())
317        })?,
318    )?;
319    kumo_mod.set(
320        "eval_config_monitor_globs",
321        lua.create_async_function(|_, _: ()| async move {
322            config::epoch::eval_globs().await.map_err(any_err)
323        })?,
324    )?;
325    kumo_mod.set(
326        "bump_config_epoch",
327        lua.create_function(move |_, _: ()| {
328            config::epoch::bump_current_epoch();
329            Ok(())
330        })?,
331    )?;
332
333    kumo_mod.set(
334        "available_parallelism",
335        lua.create_function(move |_, _: ()| available_parallelism().map_err(any_err))?,
336    )?;
337
338    kumo_mod.set(
339        "set_memory_hard_limit",
340        lua.create_function(move |_, limit: usize| {
341            kumo_server_memory::set_hard_limit(limit);
342            Ok(())
343        })?,
344    )?;
345
346    kumo_mod.set(
347        "set_memory_low_thresh",
348        lua.create_function(move |_, limit: usize| {
349            kumo_server_memory::set_low_memory_thresh(limit);
350            Ok(())
351        })?,
352    )?;
353
354    kumo_mod.set(
355        "set_memory_soft_limit",
356        lua.create_function(move |_, limit: usize| {
357            kumo_server_memory::set_soft_limit(limit);
358            Ok(())
359        })?,
360    )?;
361
362    kumo_mod.set(
363        "configure_redis_throttles",
364        lua.create_async_function(|lua, params: Value| async move {
365            let key: RedisConnKey = from_lua_value(&lua, params)?;
366            let conn = key.open().map_err(any_err)?;
367            conn.ping().await.map_err(any_err)?;
368            throttle::use_redis(conn).await.map_err(any_err)
369        })?,
370    )?;
371
372    kumo_mod.set(
373        "traceback",
374        lua.create_function(move |lua: &Lua, level: usize| {
375            #[derive(Debug, Serialize)]
376            struct Frame {
377                event: String,
378                name: Option<String>,
379                name_what: Option<String>,
380                source: Option<String>,
381                short_src: Option<String>,
382                line_defined: Option<usize>,
383                last_line_defined: Option<usize>,
384                what: &'static str,
385                curr_line: Option<usize>,
386                is_tail_call: bool,
387            }
388
389            let mut frames = vec![];
390            for n in level.. {
391                match lua.inspect_stack(n, |info| {
392                    let source = info.source();
393                    let names = info.names();
394                    Frame {
395                        curr_line: info.current_line(),
396                        is_tail_call: info.is_tail_call(),
397                        event: format!("{:?}", info.event()),
398                        last_line_defined: source.last_line_defined,
399                        line_defined: source.line_defined,
400                        name: names.name.as_ref().map(|b| b.to_string()),
401                        name_what: names.name_what.as_ref().map(|b| b.to_string()),
402                        source: source.source.as_ref().map(|b| b.to_string()),
403                        short_src: source.short_src.as_ref().map(|b| b.to_string()),
404                        what: source.what,
405                    }
406                }) {
407                    Some(frame) => {
408                        frames.push(frame);
409                    }
410                    None => break,
411                }
412            }
413
414            lua.to_value(&frames)
415        })?,
416    )?;
417
418    // TODO: options like restarting on error, delay between
419    // restarts and so on
420    #[derive(Deserialize, Debug)]
421    struct TaskParams {
422        event_name: String,
423        args: Vec<serde_json::Value>,
424    }
425
426    impl TaskParams {
427        async fn run(&self) -> anyhow::Result<()> {
428            let mut config = load_config().await?;
429
430            let sig = CallbackSignature::<Value, ()>::new(self.event_name.to_string());
431
432            config
433                .convert_args_and_call_callback(&sig, &self.args)
434                .await?;
435
436            config.put();
437
438            Ok(())
439        }
440    }
441
442    kumo_mod.set(
443        "spawn_task",
444        lua.create_function(|lua, params: Value| {
445            let params: TaskParams = lua.from_value(params)?;
446
447            if !config::is_validating() {
448                std::thread::Builder::new()
449                    .name(format!("spawned-task-{}", params.event_name))
450                    .spawn(move || {
451                        let runtime = tokio::runtime::Builder::new_current_thread()
452                            .enable_io()
453                            .enable_time()
454                            .on_thread_park(kumo_server_memory::purge_thread_cache)
455                            .build()
456                            .unwrap();
457                        let event_name = params.event_name.clone();
458
459                        let result = runtime.block_on(async move { params.run().await });
460                        if let Err(err) = result {
461                            tracing::error!("Error while dispatching {event_name}: {err:#}");
462                        }
463                    })?;
464            }
465
466            Ok(())
467        })?,
468    )?;
469
470    kumo_mod.set(
471        "spawn_thread_pool",
472        lua.create_function(|lua, params: Value| {
473            #[derive(Deserialize, Debug)]
474            struct ThreadPoolParams {
475                name: String,
476                num_threads: usize,
477            }
478
479            let params: ThreadPoolParams = lua.from_value(params)?;
480            let num_threads = AtomicUsize::new(params.num_threads);
481
482            if !config::is_validating() {
483                // Create the runtime. We don't need to hold on
484                // to it here, as it will be kept alive in the
485                // runtimes map in that crate
486                let _runtime = kumo_server_runtime::Runtime::new(
487                    &params.name,
488                    |_| params.num_threads,
489                    &num_threads,
490                )
491                .map_err(any_err)?;
492            }
493
494            Ok(())
495        })?,
496    )?;
497
498    kumo_mod.set(
499        "validation_failed",
500        lua.create_function(|_, ()| {
501            config::set_validation_failed();
502            Ok(())
503        })?,
504    )?;
505
506    kumo_mod.set(
507        "enable_memory_callstack_tracking",
508        lua.create_function(|_, enable: bool| {
509            kumo_server_memory::set_tracking_callstacks(enable);
510            Ok(())
511        })?,
512    )?;
513
514    Ok(())
515}