kumo_server_common/
lib.rs

1use bstr::ByteSlice;
2use config::{
3    any_err, decorate_callback_name, from_lua_value, get_or_create_module, load_config,
4    serialize_options, CallbackSignature,
5};
6use kumo_server_runtime::available_parallelism;
7use mlua::{Function, Lua, LuaSerdeExt, Value, Variadic};
8use mod_redis::RedisConnKey;
9use serde::{Deserialize, Serialize};
10use std::sync::atomic::AtomicUsize;
11
12pub mod acct;
13pub mod authn_authz;
14pub mod config_handle;
15pub mod diagnostic_logging;
16pub mod disk_space;
17pub mod http_server;
18pub mod log;
19pub mod nodeid;
20pub mod panic;
21pub mod start;
22pub mod tls_helpers;
23
24pub fn register(lua: &Lua) -> anyhow::Result<()> {
25    for func in [
26        mod_redis::register,
27        data_loader::register,
28        mod_digest::register,
29        mod_encode::register,
30        mod_aws_sigv4::register,
31        cidr_map::register,
32        domain_map::register,
33        mod_amqp::register,
34        mod_filesystem::register,
35        mod_file_type::register,
36        mod_http::register,
37        mod_regex::register,
38        mod_serde::register,
39        mod_sqlite::register,
40        mod_crypto::register,
41        mod_smtp_response_normalize::register,
42        kumo_jsonl::lua::register,
43        mod_counter_series::register,
44        mod_string::register,
45        mod_time::register,
46        mod_dns_resolver::register,
47        mod_kafka::register,
48        mod_memoize::register,
49        mod_mimepart::register,
50        mod_mpsc::register,
51        mod_nats::register,
52        mod_uuid::register,
53        kumo_api_types::shaping::register,
54        regex_set_map::register,
55        crate::authn_authz::register,
56    ] {
57        func(lua)?;
58    }
59
60    let kumo_mod = get_or_create_module(lua, "kumo")?;
61    kumo_mod.set("version", version_info::kumo_version())?;
62
63    fn event_registrar_name(name: &str) -> String {
64        format!("kumomta-event-registrars-{name}")
65    }
66
67    // Record the call stack of the code calling kumo.on so that
68    // kumo.get_event_registrars can retrieve it later
69    fn register_event_caller(lua: &Lua, name: &str) -> mlua::Result<()> {
70        let decorated_name = event_registrar_name(name);
71        let mut call_stack = vec![];
72        for n in 1.. {
73            match lua.inspect_stack(n, |info| {
74                let source = info.source();
75                format!(
76                    "{}:{}",
77                    source
78                        .short_src
79                        .as_ref()
80                        .map(|b| b.to_string())
81                        .unwrap_or_else(String::new),
82                    info.current_line().unwrap_or(0)
83                )
84            }) {
85                Some(info) => {
86                    call_stack.push(info);
87                }
88                None => break,
89            }
90        }
91
92        let tbl: Value = lua.named_registry_value(&decorated_name)?;
93        match tbl {
94            Value::Nil => {
95                let tbl = lua.create_table()?;
96                tbl.set(1, call_stack)?;
97                lua.set_named_registry_value(&decorated_name, tbl)?;
98                Ok(())
99            }
100            Value::Table(tbl) => {
101                let len = tbl.raw_len();
102                tbl.set(len + 1, call_stack)?;
103                Ok(())
104            }
105            _ => Err(mlua::Error::external(format!(
106                "registry key for {decorated_name} has invalid type",
107            ))),
108        }
109    }
110
111    // Returns the list of call-stacks of the code that registered
112    // for a specific named event
113    kumo_mod.set(
114        "get_event_registrars",
115        lua.create_function(move |lua, name: String| {
116            let decorated_name = event_registrar_name(&name);
117            let value: Value = lua.named_registry_value(&decorated_name)?;
118            Ok(value)
119        })?,
120    )?;
121
122    kumo_mod.set(
123        "on",
124        lua.create_function(move |lua, (name, func): (String, Function)| {
125            let decorated_name = decorate_callback_name(&name);
126
127            if let Ok(current_event) = lua.globals().get::<String>("_KUMO_CURRENT_EVENT") {
128                if current_event != "main" {
129                    return Err(mlua::Error::external(format!(
130                        "Attempting to register an event handler via \
131                    `kumo.on('{name}', ...)` from within the event handler \
132                    '{current_event}'. You must move your event handler registration \
133                    so that it is setup directly when the policy is loaded \
134                    in order for it to consistently trigger and handle events."
135                    )));
136                }
137            }
138
139            register_event_caller(lua, &name)?;
140
141            if config::does_callback_allow_multiple(&name) {
142                let tbl: Value = lua.named_registry_value(&decorated_name)?;
143                return match tbl {
144                    Value::Nil => {
145                        let tbl = lua.create_table()?;
146                        tbl.set(1, func)?;
147                        lua.set_named_registry_value(&decorated_name, tbl)?;
148                        Ok(())
149                    }
150                    Value::Table(tbl) => {
151                        let len = tbl.raw_len();
152                        tbl.set(len + 1, func)?;
153                        Ok(())
154                    }
155                    _ => Err(mlua::Error::external(format!(
156                        "registry key for {decorated_name} has invalid type",
157                    ))),
158                };
159            }
160
161            let existing: Value = lua.named_registry_value(&decorated_name)?;
162            match existing {
163                Value::Nil => {}
164                Value::Function(func) => {
165                    let info = func.info();
166                    let src = info.source.unwrap_or_else(|| "?".into());
167                    let line = info.line_defined.unwrap_or(0);
168                    return Err(mlua::Error::external(format!(
169                        "{name} event already has a handler defined at {src}:{line}"
170                    )));
171                }
172                _ => {
173                    return Err(mlua::Error::external(format!(
174                        "{name} event already has a handler"
175                    )));
176                }
177            }
178
179            lua.set_named_registry_value(&decorated_name, func)?;
180            Ok(())
181        })?,
182    )?;
183
184    kumo_mod.set(
185        "set_diagnostic_log_filter",
186        lua.create_function(move |_, filter: String| {
187            diagnostic_logging::set_diagnostic_log_filter(&filter).map_err(any_err)
188        })?,
189    )?;
190
191    fn variadic_to_string(args: Variadic<Value>) -> String {
192        let mut output = String::new();
193        for (idx, item) in args.into_iter().enumerate() {
194            if idx > 0 {
195                output.push(' ');
196            }
197
198            match item {
199                Value::String(s) => {
200                    let bytes = s.as_bytes();
201                    for (start, end, c) in bytes.char_indices() {
202                        if c == std::char::REPLACEMENT_CHARACTER {
203                            let c_slice = &bytes[start..end];
204                            for &b in c_slice.iter() {
205                                output.push_str(&format!("\\x{b:02X}"));
206                            }
207                        } else {
208                            output.push(c);
209                        }
210                    }
211                }
212                item => match item.to_string() {
213                    Ok(s) => output.push_str(&s),
214                    Err(_) => output.push_str(&format!("{item:?}")),
215                },
216            }
217        }
218        output
219    }
220
221    fn get_caller(lua: &Lua) -> String {
222        match lua.inspect_stack(1, |info| {
223            let source = info.source();
224            let file_name = source
225                .short_src
226                .as_ref()
227                .map(|b| b.to_string())
228                .unwrap_or_else(String::new);
229            // Lua returns the somewhat obnoxious `[string "source.lua"]`
230            // Let's fix that up to be a bit nicer
231            let file_name = match file_name.strip_prefix("[string \"") {
232                Some(name) => name.strip_suffix("\"]").unwrap_or(name),
233                None => &file_name,
234            };
235
236            format!("{file_name}:{}", info.current_line().unwrap_or(0))
237        }) {
238            Some(info) => info,
239            None => "?".to_string(),
240        }
241    }
242
243    kumo_mod.set(
244        "log_error",
245        lua.create_function(move |lua, args: Variadic<Value>| {
246            if tracing::event_enabled!(target: "lua", tracing::Level::ERROR) {
247                let src = get_caller(lua);
248                tracing::error!(target: "lua", "{src}: {}", variadic_to_string(args));
249            }
250            Ok(())
251        })?,
252    )?;
253    kumo_mod.set(
254        "log_info",
255        lua.create_function(move |lua, args: Variadic<Value>| {
256            if tracing::event_enabled!(target: "lua", tracing::Level::INFO) {
257                let src = get_caller(lua);
258                tracing::info!(target: "lua", "{src}: {}", variadic_to_string(args));
259            }
260            Ok(())
261        })?,
262    )?;
263    kumo_mod.set(
264        "log_warn",
265        lua.create_function(move |lua, args: Variadic<Value>| {
266            if tracing::event_enabled!(target: "lua", tracing::Level::WARN) {
267                let src = get_caller(lua);
268                tracing::warn!(target: "lua", "{src}: {}", variadic_to_string(args));
269            }
270            Ok(())
271        })?,
272    )?;
273    kumo_mod.set(
274        "log_debug",
275        lua.create_function(move |lua, args: Variadic<Value>| {
276            if tracing::event_enabled!(target: "lua", tracing::Level::DEBUG) {
277                let src = get_caller(lua);
278                tracing::debug!(target: "lua", "{src}: {}", variadic_to_string(args));
279            }
280            Ok(())
281        })?,
282    )?;
283
284    kumo_mod.set(
285        "set_max_spare_lua_contexts",
286        lua.create_function(move |_, limit: usize| {
287            config::set_max_spare(limit);
288            Ok(())
289        })?,
290    )?;
291
292    kumo_mod.set(
293        "set_max_lua_context_use_count",
294        lua.create_function(move |_, limit: usize| {
295            config::set_max_use(limit);
296            Ok(())
297        })?,
298    )?;
299
300    kumo_mod.set(
301        "set_max_lua_context_age",
302        lua.create_function(move |_, limit: usize| {
303            config::set_max_age(limit);
304            Ok(())
305        })?,
306    )?;
307
308    kumo_mod.set(
309        "set_lua_gc_on_put",
310        lua.create_function(move |_, enable: u8| {
311            config::set_gc_on_put(enable);
312            Ok(())
313        })?,
314    )?;
315
316    kumo_mod.set(
317        "set_lruttl_cache_capacity",
318        lua.create_function(move |_, (name, capacity): (String, usize)| {
319            if lruttl::set_cache_capacity(&name, capacity) {
320                Ok(())
321            } else {
322                Err(mlua::Error::external(format!(
323                    "could not set capacity for cache {name} \
324                    as that is not a pre-defined lruttl cache name"
325                )))
326            }
327        })?,
328    )?;
329
330    kumo_mod.set(
331        "set_config_monitor_globs",
332        lua.create_function(move |_, globs: Vec<String>| {
333            config::epoch::set_globs(globs).map_err(any_err)?;
334            Ok(())
335        })?,
336    )?;
337    kumo_mod.set(
338        "eval_config_monitor_globs",
339        lua.create_async_function(|_, _: ()| async move {
340            config::epoch::eval_globs().await.map_err(any_err)
341        })?,
342    )?;
343    kumo_mod.set(
344        "bump_config_epoch",
345        lua.create_function(move |_, _: ()| {
346            config::epoch::bump_current_epoch();
347            Ok(())
348        })?,
349    )?;
350
351    kumo_mod.set(
352        "available_parallelism",
353        lua.create_function(move |_, _: ()| available_parallelism().map_err(any_err))?,
354    )?;
355
356    kumo_mod.set(
357        "set_memory_hard_limit",
358        lua.create_function(move |_, limit: usize| {
359            kumo_server_memory::set_hard_limit(limit);
360            Ok(())
361        })?,
362    )?;
363
364    kumo_mod.set(
365        "set_memory_low_thresh",
366        lua.create_function(move |_, limit: usize| {
367            kumo_server_memory::set_low_memory_thresh(limit);
368            Ok(())
369        })?,
370    )?;
371
372    kumo_mod.set(
373        "set_memory_soft_limit",
374        lua.create_function(move |_, limit: usize| {
375            kumo_server_memory::set_soft_limit(limit);
376            Ok(())
377        })?,
378    )?;
379
380    kumo_mod.set(
381        "get_memory_hard_limit",
382        lua.create_function(move |_, _: ()| Ok(kumo_server_memory::get_hard_limit()))?,
383    )?;
384
385    kumo_mod.set(
386        "get_memory_soft_limit",
387        lua.create_function(move |_, _: ()| Ok(kumo_server_memory::get_soft_limit()))?,
388    )?;
389
390    kumo_mod.set(
391        "get_memory_low_thresh",
392        lua.create_function(move |_, _: ()| Ok(kumo_server_memory::get_low_memory_thresh()))?,
393    )?;
394
395    kumo_mod.set(
396        "configure_redis_throttles",
397        lua.create_async_function(|lua, params: Value| async move {
398            let key: RedisConnKey = from_lua_value(&lua, params)?;
399            let conn = key.open().map_err(any_err)?;
400            conn.ping().await.map_err(any_err)?;
401            throttle::use_redis(conn).await.map_err(any_err)
402        })?,
403    )?;
404
405    kumo_mod.set(
406        "traceback",
407        lua.create_function(move |lua: &Lua, level: usize| {
408            #[derive(Debug, Serialize)]
409            struct Frame {
410                event: String,
411                name: Option<String>,
412                name_what: Option<String>,
413                source: Option<String>,
414                short_src: Option<String>,
415                line_defined: Option<usize>,
416                last_line_defined: Option<usize>,
417                what: &'static str,
418                curr_line: Option<usize>,
419                is_tail_call: bool,
420            }
421
422            let mut frames = vec![];
423            for n in level.. {
424                match lua.inspect_stack(n, |info| {
425                    let source = info.source();
426                    let names = info.names();
427                    Frame {
428                        curr_line: info.current_line(),
429                        is_tail_call: info.is_tail_call(),
430                        event: format!("{:?}", info.event()),
431                        last_line_defined: source.last_line_defined,
432                        line_defined: source.line_defined,
433                        name: names.name.as_ref().map(|b| b.to_string()),
434                        name_what: names.name_what.as_ref().map(|b| b.to_string()),
435                        source: source.source.as_ref().map(|b| b.to_string()),
436                        short_src: source.short_src.as_ref().map(|b| b.to_string()),
437                        what: source.what,
438                    }
439                }) {
440                    Some(frame) => {
441                        frames.push(frame);
442                    }
443                    None => break,
444                }
445            }
446
447            lua.to_value(&frames)
448        })?,
449    )?;
450
451    // TODO: options like restarting on error, delay between
452    // restarts and so on
453    #[derive(Deserialize, Debug)]
454    struct TaskParams {
455        event_name: String,
456        args: Vec<serde_json::Value>,
457    }
458
459    impl TaskParams {
460        async fn run(&self) -> anyhow::Result<()> {
461            let mut config = load_config().await?;
462
463            let sig = CallbackSignature::<Value, ()>::new(self.event_name.to_string());
464
465            config
466                .convert_args_and_call_callback(&sig, &self.args)
467                .await?;
468
469            config.put();
470
471            Ok(())
472        }
473    }
474
475    kumo_mod.set(
476        "spawn_task",
477        lua.create_function(|lua, params: Value| {
478            let params: TaskParams = lua.from_value(params)?;
479
480            if !config::is_validating() {
481                std::thread::Builder::new()
482                    .name(format!("spawned-task-{}", params.event_name))
483                    .spawn(move || {
484                        let runtime = tokio::runtime::Builder::new_current_thread()
485                            .enable_io()
486                            .enable_time()
487                            .on_thread_park(kumo_server_memory::purge_thread_cache)
488                            .build()
489                            .unwrap();
490                        let event_name = params.event_name.clone();
491
492                        let result = runtime.block_on(async move { params.run().await });
493                        if let Err(err) = result {
494                            tracing::error!("Error while dispatching {event_name}: {err:#}");
495                        }
496                    })?;
497            }
498
499            Ok(())
500        })?,
501    )?;
502
503    kumo_mod.set(
504        "spawn_thread_pool",
505        lua.create_function(|lua, params: Value| {
506            #[derive(Deserialize, Debug)]
507            struct ThreadPoolParams {
508                name: String,
509                num_threads: usize,
510            }
511
512            let params: ThreadPoolParams = lua.from_value(params)?;
513            let num_threads = AtomicUsize::new(params.num_threads);
514
515            if !config::is_validating() {
516                // Create the runtime. We don't need to hold on
517                // to it here, as it will be kept alive in the
518                // runtimes map in that crate
519                let _runtime = kumo_server_runtime::Runtime::new(
520                    &params.name,
521                    |_| params.num_threads,
522                    &num_threads,
523                )
524                .map_err(any_err)?;
525            }
526
527            Ok(())
528        })?,
529    )?;
530
531    kumo_mod.set(
532        "validation_failed",
533        lua.create_function(|_, ()| {
534            config::set_validation_failed();
535            Ok(())
536        })?,
537    )?;
538
539    kumo_mod.set(
540        "enable_memory_callstack_tracking",
541        lua.create_function(|_, enable: bool| {
542            kumo_server_memory::set_tracking_callstacks(enable);
543            Ok(())
544        })?,
545    )?;
546
547    // This function is intended for debugging and testing purposes only.
548    // It is potentially very expensive on a production system with many
549    // thousands of queues.
550    kumo_mod.set(
551        "prometheus_metrics",
552        lua.create_async_function(|lua, ()| async move {
553            use tokio_stream::StreamExt;
554            let mut json_text = String::new();
555            let mut stream = kumo_prometheus::registry::Registry::stream_json();
556            while let Some(text) = stream.next().await {
557                json_text.push_str(&text);
558            }
559            let value: serde_json::Value = serde_json::from_str(&json_text).map_err(any_err)?;
560            lua.to_value_with(&value, serialize_options())
561        })?,
562    )?;
563
564    Ok(())
565}