1use bstr::ByteSlice;
2use config::{
3 any_err, decorate_callback_name, from_lua_value, get_or_create_module, load_config,
4 serialize_options, CallbackSignature,
5};
6use kumo_server_runtime::available_parallelism;
7use mlua::{Function, Lua, LuaSerdeExt, Value, Variadic};
8use mod_redis::RedisConnKey;
9use serde::{Deserialize, Serialize};
10use std::sync::atomic::AtomicUsize;
11
12pub mod acct;
13pub mod authn_authz;
14pub mod config_handle;
15pub mod diagnostic_logging;
16pub mod disk_space;
17pub mod hashable_weak;
18pub mod http_server;
19pub mod log;
20pub mod nodeid;
21pub mod panic;
22pub mod start;
23pub mod tls_helpers;
24
25pub fn register(lua: &Lua) -> anyhow::Result<()> {
26 for func in [
27 mod_redis::register,
28 data_loader::register,
29 mod_digest::register,
30 mod_encode::register,
31 mod_aws_sigv4::register,
32 cidr_map::register,
33 domain_map::register,
34 mod_amqp::register,
35 mod_filesystem::register,
36 mod_file_type::register,
37 mod_http::register,
38 mod_regex::register,
39 mod_serde::register,
40 mod_sqlite::register,
41 mod_crypto::register,
42 mod_smtp_response_normalize::register,
43 kumo_jsonl::lua::register,
44 mod_counter_series::register,
45 mod_string::register,
46 mod_time::register,
47 mod_dns_resolver::register,
48 mod_kafka::register,
49 mod_memoize::register,
50 mod_mimepart::register,
51 mod_mpsc::register,
52 mod_nats::register,
53 mod_uuid::register,
54 kumo_api_types::shaping::register,
55 regex_set_map::register,
56 crate::authn_authz::register,
57 ] {
58 func(lua)?;
59 }
60
61 let kumo_mod = get_or_create_module(lua, "kumo")?;
62 kumo_mod.set("version", version_info::kumo_version())?;
63
64 fn event_registrar_name(name: &str) -> String {
65 format!("kumomta-event-registrars-{name}")
66 }
67
68 fn register_event_caller(lua: &Lua, name: &str) -> mlua::Result<()> {
71 let decorated_name = event_registrar_name(name);
72 let mut call_stack = vec![];
73 for n in 1.. {
74 match lua.inspect_stack(n, |info| {
75 let source = info.source();
76 format!(
77 "{}:{}",
78 source
79 .short_src
80 .as_ref()
81 .map(|b| b.to_string())
82 .unwrap_or_else(String::new),
83 info.current_line().unwrap_or(0)
84 )
85 }) {
86 Some(info) => {
87 call_stack.push(info);
88 }
89 None => break,
90 }
91 }
92
93 let tbl: Value = lua.named_registry_value(&decorated_name)?;
94 match tbl {
95 Value::Nil => {
96 let tbl = lua.create_table()?;
97 tbl.set(1, call_stack)?;
98 lua.set_named_registry_value(&decorated_name, tbl)?;
99 Ok(())
100 }
101 Value::Table(tbl) => {
102 let len = tbl.raw_len();
103 tbl.set(len + 1, call_stack)?;
104 Ok(())
105 }
106 _ => Err(mlua::Error::external(format!(
107 "registry key for {decorated_name} has invalid type",
108 ))),
109 }
110 }
111
112 kumo_mod.set(
115 "get_event_registrars",
116 lua.create_function(move |lua, name: String| {
117 let decorated_name = event_registrar_name(&name);
118 let value: Value = lua.named_registry_value(&decorated_name)?;
119 Ok(value)
120 })?,
121 )?;
122
123 kumo_mod.set(
124 "on",
125 lua.create_function(move |lua, (name, func): (String, Function)| {
126 let decorated_name = decorate_callback_name(&name);
127
128 if let Ok(current_event) = lua.globals().get::<String>("_KUMO_CURRENT_EVENT") {
129 if current_event != "main" {
130 return Err(mlua::Error::external(format!(
131 "Attempting to register an event handler via \
132 `kumo.on('{name}', ...)` from within the event handler \
133 '{current_event}'. You must move your event handler registration \
134 so that it is setup directly when the policy is loaded \
135 in order for it to consistently trigger and handle events."
136 )));
137 }
138 }
139
140 register_event_caller(lua, &name)?;
141
142 if config::does_callback_allow_multiple(&name) {
143 let tbl: Value = lua.named_registry_value(&decorated_name)?;
144 return match tbl {
145 Value::Nil => {
146 let tbl = lua.create_table()?;
147 tbl.set(1, func)?;
148 lua.set_named_registry_value(&decorated_name, tbl)?;
149 Ok(())
150 }
151 Value::Table(tbl) => {
152 let len = tbl.raw_len();
153 tbl.set(len + 1, func)?;
154 Ok(())
155 }
156 _ => Err(mlua::Error::external(format!(
157 "registry key for {decorated_name} has invalid type",
158 ))),
159 };
160 }
161
162 let existing: Value = lua.named_registry_value(&decorated_name)?;
163 match existing {
164 Value::Nil => {}
165 Value::Function(func) => {
166 let info = func.info();
167 let src = info.source.unwrap_or_else(|| "?".into());
168 let line = info.line_defined.unwrap_or(0);
169 return Err(mlua::Error::external(format!(
170 "{name} event already has a handler defined at {src}:{line}"
171 )));
172 }
173 _ => {
174 return Err(mlua::Error::external(format!(
175 "{name} event already has a handler"
176 )));
177 }
178 }
179
180 lua.set_named_registry_value(&decorated_name, func)?;
181 Ok(())
182 })?,
183 )?;
184
185 kumo_mod.set(
186 "set_diagnostic_log_filter",
187 lua.create_function(move |_, filter: String| {
188 diagnostic_logging::set_diagnostic_log_filter(&filter).map_err(any_err)
189 })?,
190 )?;
191
192 fn variadic_to_string(args: Variadic<Value>) -> String {
193 let mut output = String::new();
194 for (idx, item) in args.into_iter().enumerate() {
195 if idx > 0 {
196 output.push(' ');
197 }
198
199 match item {
200 Value::String(s) => {
201 let bytes = s.as_bytes();
202 for (start, end, c) in bytes.char_indices() {
203 if c == std::char::REPLACEMENT_CHARACTER {
204 let c_slice = &bytes[start..end];
205 for &b in c_slice.iter() {
206 output.push_str(&format!("\\x{b:02X}"));
207 }
208 } else {
209 output.push(c);
210 }
211 }
212 }
213 item => match item.to_string() {
214 Ok(s) => output.push_str(&s),
215 Err(_) => output.push_str(&format!("{item:?}")),
216 },
217 }
218 }
219 output
220 }
221
222 fn get_caller(lua: &Lua) -> String {
223 match lua.inspect_stack(1, |info| {
224 let source = info.source();
225 let file_name = source
226 .short_src
227 .as_ref()
228 .map(|b| b.to_string())
229 .unwrap_or_else(String::new);
230 let file_name = match file_name.strip_prefix("[string \"") {
233 Some(name) => name.strip_suffix("\"]").unwrap_or(name),
234 None => &file_name,
235 };
236
237 format!("{file_name}:{}", info.current_line().unwrap_or(0))
238 }) {
239 Some(info) => info,
240 None => "?".to_string(),
241 }
242 }
243
244 kumo_mod.set(
245 "log_error",
246 lua.create_function(move |lua, args: Variadic<Value>| {
247 if tracing::event_enabled!(target: "lua", tracing::Level::ERROR) {
248 let src = get_caller(lua);
249 tracing::error!(target: "lua", "{src}: {}", variadic_to_string(args));
250 }
251 Ok(())
252 })?,
253 )?;
254 kumo_mod.set(
255 "log_info",
256 lua.create_function(move |lua, args: Variadic<Value>| {
257 if tracing::event_enabled!(target: "lua", tracing::Level::INFO) {
258 let src = get_caller(lua);
259 tracing::info!(target: "lua", "{src}: {}", variadic_to_string(args));
260 }
261 Ok(())
262 })?,
263 )?;
264 kumo_mod.set(
265 "log_warn",
266 lua.create_function(move |lua, args: Variadic<Value>| {
267 if tracing::event_enabled!(target: "lua", tracing::Level::WARN) {
268 let src = get_caller(lua);
269 tracing::warn!(target: "lua", "{src}: {}", variadic_to_string(args));
270 }
271 Ok(())
272 })?,
273 )?;
274 kumo_mod.set(
275 "log_debug",
276 lua.create_function(move |lua, args: Variadic<Value>| {
277 if tracing::event_enabled!(target: "lua", tracing::Level::DEBUG) {
278 let src = get_caller(lua);
279 tracing::debug!(target: "lua", "{src}: {}", variadic_to_string(args));
280 }
281 Ok(())
282 })?,
283 )?;
284
285 kumo_mod.set(
286 "set_max_spare_lua_contexts",
287 lua.create_function(move |_, limit: usize| {
288 config::set_max_spare(limit);
289 Ok(())
290 })?,
291 )?;
292
293 kumo_mod.set(
294 "set_max_lua_context_use_count",
295 lua.create_function(move |_, limit: usize| {
296 config::set_max_use(limit);
297 Ok(())
298 })?,
299 )?;
300
301 kumo_mod.set(
302 "set_max_lua_context_age",
303 lua.create_function(move |_, limit: usize| {
304 config::set_max_age(limit);
305 Ok(())
306 })?,
307 )?;
308
309 kumo_mod.set(
310 "set_lua_gc_on_put",
311 lua.create_function(move |_, enable: u8| {
312 config::set_gc_on_put(enable);
313 Ok(())
314 })?,
315 )?;
316
317 kumo_mod.set(
318 "set_lruttl_cache_capacity",
319 lua.create_function(move |_, (name, capacity): (String, usize)| {
320 if lruttl::set_cache_capacity(&name, capacity) {
321 Ok(())
322 } else {
323 Err(mlua::Error::external(format!(
324 "could not set capacity for cache {name} \
325 as that is not a pre-defined lruttl cache name"
326 )))
327 }
328 })?,
329 )?;
330
331 kumo_mod.set(
332 "set_config_monitor_globs",
333 lua.create_function(move |_, globs: Vec<String>| {
334 config::epoch::set_globs(globs).map_err(any_err)?;
335 Ok(())
336 })?,
337 )?;
338 kumo_mod.set(
339 "eval_config_monitor_globs",
340 lua.create_async_function(|_, _: ()| async move {
341 config::epoch::eval_globs().await.map_err(any_err)
342 })?,
343 )?;
344 kumo_mod.set(
345 "bump_config_epoch",
346 lua.create_function(move |_, _: ()| {
347 config::epoch::bump_current_epoch();
348 Ok(())
349 })?,
350 )?;
351
352 kumo_mod.set(
353 "available_parallelism",
354 lua.create_function(move |_, _: ()| available_parallelism().map_err(any_err))?,
355 )?;
356
357 kumo_mod.set(
358 "set_memory_hard_limit",
359 lua.create_function(move |_, limit: usize| {
360 kumo_server_memory::set_hard_limit(limit);
361 Ok(())
362 })?,
363 )?;
364
365 kumo_mod.set(
366 "set_memory_low_thresh",
367 lua.create_function(move |_, limit: usize| {
368 kumo_server_memory::set_low_memory_thresh(limit);
369 Ok(())
370 })?,
371 )?;
372
373 kumo_mod.set(
374 "set_memory_soft_limit",
375 lua.create_function(move |_, limit: usize| {
376 kumo_server_memory::set_soft_limit(limit);
377 Ok(())
378 })?,
379 )?;
380
381 kumo_mod.set(
382 "get_memory_hard_limit",
383 lua.create_function(move |_, _: ()| Ok(kumo_server_memory::get_hard_limit()))?,
384 )?;
385
386 kumo_mod.set(
387 "get_memory_soft_limit",
388 lua.create_function(move |_, _: ()| Ok(kumo_server_memory::get_soft_limit()))?,
389 )?;
390
391 kumo_mod.set(
392 "get_memory_low_thresh",
393 lua.create_function(move |_, _: ()| Ok(kumo_server_memory::get_low_memory_thresh()))?,
394 )?;
395
396 kumo_mod.set(
397 "configure_redis_throttles",
398 lua.create_async_function(|lua, params: Value| async move {
399 let key: RedisConnKey = from_lua_value(&lua, params)?;
400 let conn = key.open().map_err(any_err)?;
401 conn.ping().await.map_err(any_err)?;
402 throttle::use_redis(conn).await.map_err(any_err)
403 })?,
404 )?;
405
406 kumo_mod.set(
407 "traceback",
408 lua.create_function(move |lua: &Lua, level: usize| {
409 #[derive(Debug, Serialize)]
410 struct Frame {
411 event: String,
412 name: Option<String>,
413 name_what: Option<String>,
414 source: Option<String>,
415 short_src: Option<String>,
416 line_defined: Option<usize>,
417 last_line_defined: Option<usize>,
418 what: &'static str,
419 curr_line: Option<usize>,
420 is_tail_call: bool,
421 }
422
423 let mut frames = vec![];
424 for n in level.. {
425 match lua.inspect_stack(n, |info| {
426 let source = info.source();
427 let names = info.names();
428 Frame {
429 curr_line: info.current_line(),
430 is_tail_call: info.is_tail_call(),
431 event: format!("{:?}", info.event()),
432 last_line_defined: source.last_line_defined,
433 line_defined: source.line_defined,
434 name: names.name.as_ref().map(|b| b.to_string()),
435 name_what: names.name_what.as_ref().map(|b| b.to_string()),
436 source: source.source.as_ref().map(|b| b.to_string()),
437 short_src: source.short_src.as_ref().map(|b| b.to_string()),
438 what: source.what,
439 }
440 }) {
441 Some(frame) => {
442 frames.push(frame);
443 }
444 None => break,
445 }
446 }
447
448 lua.to_value(&frames)
449 })?,
450 )?;
451
452 #[derive(Deserialize, Debug)]
455 struct TaskParams {
456 event_name: String,
457 args: Vec<serde_json::Value>,
458 }
459
460 impl TaskParams {
461 async fn run(&self) -> anyhow::Result<()> {
462 let mut config = load_config().await?;
463
464 let sig = CallbackSignature::<Value, ()>::new(self.event_name.to_string());
465
466 config
467 .convert_args_and_call_callback(&sig, &self.args)
468 .await?;
469
470 config.put();
471
472 Ok(())
473 }
474 }
475
476 kumo_mod.set(
477 "spawn_task",
478 lua.create_function(|lua, params: Value| {
479 let params: TaskParams = lua.from_value(params)?;
480
481 if !config::is_validating() {
482 std::thread::Builder::new()
483 .name(format!("spawned-task-{}", params.event_name))
484 .spawn(move || {
485 let runtime = tokio::runtime::Builder::new_current_thread()
486 .enable_io()
487 .enable_time()
488 .on_thread_park(kumo_server_memory::purge_thread_cache)
489 .build()
490 .unwrap();
491 let event_name = params.event_name.clone();
492
493 let result = runtime.block_on(async move { params.run().await });
494 if let Err(err) = result {
495 tracing::error!("Error while dispatching {event_name}: {err:#}");
496 }
497 })?;
498 }
499
500 Ok(())
501 })?,
502 )?;
503
504 kumo_mod.set(
505 "spawn_thread_pool",
506 lua.create_function(|lua, params: Value| {
507 #[derive(Deserialize, Debug)]
508 struct ThreadPoolParams {
509 name: String,
510 num_threads: usize,
511 }
512
513 let params: ThreadPoolParams = lua.from_value(params)?;
514 let num_threads = AtomicUsize::new(params.num_threads);
515
516 if !config::is_validating() {
517 let _runtime = kumo_server_runtime::Runtime::new(
521 ¶ms.name,
522 |_| params.num_threads,
523 &num_threads,
524 )
525 .map_err(any_err)?;
526 }
527
528 Ok(())
529 })?,
530 )?;
531
532 kumo_mod.set(
533 "validation_failed",
534 lua.create_function(|_, ()| {
535 config::set_validation_failed();
536 Ok(())
537 })?,
538 )?;
539
540 kumo_mod.set(
541 "enable_memory_callstack_tracking",
542 lua.create_function(|_, enable: bool| {
543 kumo_server_memory::set_tracking_callstacks(enable);
544 Ok(())
545 })?,
546 )?;
547
548 kumo_mod.set(
552 "prometheus_metrics",
553 lua.create_async_function(|lua, ()| async move {
554 use tokio_stream::StreamExt;
555 let mut json_text = String::new();
556 let mut stream = kumo_prometheus::registry::Registry::stream_json();
557 while let Some(text) = stream.next().await {
558 json_text.push_str(&text);
559 }
560 let value: serde_json::Value = serde_json::from_str(&json_text).map_err(any_err)?;
561 lua.to_value_with(&value, serialize_options())
562 })?,
563 )?;
564
565 Ok(())
566}