1use config::{
2 any_err, decorate_callback_name, from_lua_value, get_or_create_module, load_config,
3 serialize_options, CallbackSignature,
4};
5use kumo_server_runtime::available_parallelism;
6use mlua::{Function, Lua, LuaSerdeExt, Value, Variadic};
7use mod_redis::RedisConnKey;
8use serde::{Deserialize, Serialize};
9use std::sync::atomic::AtomicUsize;
10
11pub mod acct;
12pub mod authn_authz;
13pub mod config_handle;
14pub mod diagnostic_logging;
15pub mod disk_space;
16pub mod http_server;
17pub mod log;
18pub mod nodeid;
19pub mod panic;
20pub mod start;
21pub mod tls_helpers;
22
23pub fn register(lua: &Lua) -> anyhow::Result<()> {
24 for func in [
25 mod_redis::register,
26 data_loader::register,
27 mod_digest::register,
28 mod_encode::register,
29 mod_aws_sigv4::register,
30 cidr_map::register,
31 domain_map::register,
32 mod_amqp::register,
33 mod_filesystem::register,
34 mod_file_type::register,
35 mod_http::register,
36 mod_regex::register,
37 mod_serde::register,
38 mod_sqlite::register,
39 mod_crypto::register,
40 mod_smtp_response_normalize::register,
41 mod_string::register,
42 mod_time::register,
43 mod_dns_resolver::register,
44 mod_kafka::register,
45 mod_memoize::register,
46 mod_mimepart::register,
47 mod_mpsc::register,
48 mod_nats::register,
49 mod_uuid::register,
50 kumo_api_types::shaping::register,
51 regex_set_map::register,
52 crate::authn_authz::register,
53 ] {
54 func(lua)?;
55 }
56
57 let kumo_mod = get_or_create_module(lua, "kumo")?;
58 kumo_mod.set("version", version_info::kumo_version())?;
59
/// Builds the named-registry key under which the call stacks of the
/// `kumo.on` callers for event `name` are stored.
fn event_registrar_name(name: &str) -> String {
    const PREFIX: &str = "kumomta-event-registrars-";
    [PREFIX, name].concat()
}
63
64 fn register_event_caller(lua: &Lua, name: &str) -> mlua::Result<()> {
67 let decorated_name = event_registrar_name(name);
68 let mut call_stack = vec![];
69 for n in 1.. {
70 match lua.inspect_stack(n, |info| {
71 let source = info.source();
72 format!(
73 "{}:{}",
74 source
75 .short_src
76 .as_ref()
77 .map(|b| b.to_string())
78 .unwrap_or_else(String::new),
79 info.current_line().unwrap_or(0)
80 )
81 }) {
82 Some(info) => {
83 call_stack.push(info);
84 }
85 None => break,
86 }
87 }
88
89 let tbl: Value = lua.named_registry_value(&decorated_name)?;
90 match tbl {
91 Value::Nil => {
92 let tbl = lua.create_table()?;
93 tbl.set(1, call_stack)?;
94 lua.set_named_registry_value(&decorated_name, tbl)?;
95 Ok(())
96 }
97 Value::Table(tbl) => {
98 let len = tbl.raw_len();
99 tbl.set(len + 1, call_stack)?;
100 Ok(())
101 }
102 _ => Err(mlua::Error::external(format!(
103 "registry key for {decorated_name} has invalid type",
104 ))),
105 }
106 }
107
108 kumo_mod.set(
111 "get_event_registrars",
112 lua.create_function(move |lua, name: String| {
113 let decorated_name = event_registrar_name(&name);
114 let value: Value = lua.named_registry_value(&decorated_name)?;
115 Ok(value)
116 })?,
117 )?;
118
119 kumo_mod.set(
120 "on",
121 lua.create_function(move |lua, (name, func): (String, Function)| {
122 let decorated_name = decorate_callback_name(&name);
123
124 if let Ok(current_event) = lua.globals().get::<String>("_KUMO_CURRENT_EVENT") {
125 if current_event != "main" {
126 return Err(mlua::Error::external(format!(
127 "Attempting to register an event handler via \
128 `kumo.on('{name}', ...)` from within the event handler \
129 '{current_event}'. You must move your event handler registration \
130 so that it is setup directly when the policy is loaded \
131 in order for it to consistently trigger and handle events."
132 )));
133 }
134 }
135
136 register_event_caller(lua, &name)?;
137
138 if config::does_callback_allow_multiple(&name) {
139 let tbl: Value = lua.named_registry_value(&decorated_name)?;
140 return match tbl {
141 Value::Nil => {
142 let tbl = lua.create_table()?;
143 tbl.set(1, func)?;
144 lua.set_named_registry_value(&decorated_name, tbl)?;
145 Ok(())
146 }
147 Value::Table(tbl) => {
148 let len = tbl.raw_len();
149 tbl.set(len + 1, func)?;
150 Ok(())
151 }
152 _ => Err(mlua::Error::external(format!(
153 "registry key for {decorated_name} has invalid type",
154 ))),
155 };
156 }
157
158 let existing: Value = lua.named_registry_value(&decorated_name)?;
159 match existing {
160 Value::Nil => {}
161 Value::Function(func) => {
162 let info = func.info();
163 let src = info.source.unwrap_or_else(|| "?".into());
164 let line = info.line_defined.unwrap_or(0);
165 return Err(mlua::Error::external(format!(
166 "{name} event already has a handler defined at {src}:{line}"
167 )));
168 }
169 _ => {
170 return Err(mlua::Error::external(format!(
171 "{name} event already has a handler"
172 )));
173 }
174 }
175
176 lua.set_named_registry_value(&decorated_name, func)?;
177 Ok(())
178 })?,
179 )?;
180
181 kumo_mod.set(
182 "set_diagnostic_log_filter",
183 lua.create_function(move |_, filter: String| {
184 diagnostic_logging::set_diagnostic_log_filter(&filter).map_err(any_err)
185 })?,
186 )?;
187
188 fn variadic_to_string(args: Variadic<Value>) -> String {
189 let mut output = String::new();
190 for (idx, item) in args.into_iter().enumerate() {
191 if idx > 0 {
192 output.push(' ');
193 }
194
195 match item {
196 Value::String(s) => match s.to_str() {
197 Ok(s) => output.push_str(&s),
198 Err(_) => {
199 let item = s.to_string_lossy();
200 output.push_str(&item);
201 }
202 },
203 item => match item.to_string() {
204 Ok(s) => output.push_str(&s),
205 Err(_) => output.push_str(&format!("{item:?}")),
206 },
207 }
208 }
209 output
210 }
211
212 fn get_caller(lua: &Lua) -> String {
213 match lua.inspect_stack(1, |info| {
214 let source = info.source();
215 let file_name = source
216 .short_src
217 .as_ref()
218 .map(|b| b.to_string())
219 .unwrap_or_else(String::new);
220 let file_name = match file_name.strip_prefix("[string \"") {
223 Some(name) => name.strip_suffix("\"]").unwrap_or(name),
224 None => &file_name,
225 };
226
227 format!("{file_name}:{}", info.current_line().unwrap_or(0))
228 }) {
229 Some(info) => info,
230 None => "?".to_string(),
231 }
232 }
233
234 kumo_mod.set(
235 "log_error",
236 lua.create_function(move |lua, args: Variadic<Value>| {
237 if tracing::event_enabled!(target: "lua", tracing::Level::ERROR) {
238 let src = get_caller(lua);
239 tracing::error!(target: "lua", "{src}: {}", variadic_to_string(args));
240 }
241 Ok(())
242 })?,
243 )?;
244 kumo_mod.set(
245 "log_info",
246 lua.create_function(move |lua, args: Variadic<Value>| {
247 if tracing::event_enabled!(target: "lua", tracing::Level::INFO) {
248 let src = get_caller(lua);
249 tracing::info!(target: "lua", "{src}: {}", variadic_to_string(args));
250 }
251 Ok(())
252 })?,
253 )?;
254 kumo_mod.set(
255 "log_warn",
256 lua.create_function(move |lua, args: Variadic<Value>| {
257 if tracing::event_enabled!(target: "lua", tracing::Level::WARN) {
258 let src = get_caller(lua);
259 tracing::warn!(target: "lua", "{src}: {}", variadic_to_string(args));
260 }
261 Ok(())
262 })?,
263 )?;
264 kumo_mod.set(
265 "log_debug",
266 lua.create_function(move |lua, args: Variadic<Value>| {
267 if tracing::event_enabled!(target: "lua", tracing::Level::DEBUG) {
268 let src = get_caller(lua);
269 tracing::debug!(target: "lua", "{src}: {}", variadic_to_string(args));
270 }
271 Ok(())
272 })?,
273 )?;
274
275 kumo_mod.set(
276 "set_max_spare_lua_contexts",
277 lua.create_function(move |_, limit: usize| {
278 config::set_max_spare(limit);
279 Ok(())
280 })?,
281 )?;
282
283 kumo_mod.set(
284 "set_max_lua_context_use_count",
285 lua.create_function(move |_, limit: usize| {
286 config::set_max_use(limit);
287 Ok(())
288 })?,
289 )?;
290
291 kumo_mod.set(
292 "set_max_lua_context_age",
293 lua.create_function(move |_, limit: usize| {
294 config::set_max_age(limit);
295 Ok(())
296 })?,
297 )?;
298
299 kumo_mod.set(
300 "set_lua_gc_on_put",
301 lua.create_function(move |_, enable: u8| {
302 config::set_gc_on_put(enable);
303 Ok(())
304 })?,
305 )?;
306
307 kumo_mod.set(
308 "set_lruttl_cache_capacity",
309 lua.create_function(move |_, (name, capacity): (String, usize)| {
310 if lruttl::set_cache_capacity(&name, capacity) {
311 Ok(())
312 } else {
313 Err(mlua::Error::external(format!(
314 "could not set capacity for cache {name} \
315 as that is not a pre-defined lruttl cache name"
316 )))
317 }
318 })?,
319 )?;
320
321 kumo_mod.set(
322 "set_config_monitor_globs",
323 lua.create_function(move |_, globs: Vec<String>| {
324 config::epoch::set_globs(globs).map_err(any_err)?;
325 Ok(())
326 })?,
327 )?;
328 kumo_mod.set(
329 "eval_config_monitor_globs",
330 lua.create_async_function(|_, _: ()| async move {
331 config::epoch::eval_globs().await.map_err(any_err)
332 })?,
333 )?;
334 kumo_mod.set(
335 "bump_config_epoch",
336 lua.create_function(move |_, _: ()| {
337 config::epoch::bump_current_epoch();
338 Ok(())
339 })?,
340 )?;
341
342 kumo_mod.set(
343 "available_parallelism",
344 lua.create_function(move |_, _: ()| available_parallelism().map_err(any_err))?,
345 )?;
346
347 kumo_mod.set(
348 "set_memory_hard_limit",
349 lua.create_function(move |_, limit: usize| {
350 kumo_server_memory::set_hard_limit(limit);
351 Ok(())
352 })?,
353 )?;
354
355 kumo_mod.set(
356 "set_memory_low_thresh",
357 lua.create_function(move |_, limit: usize| {
358 kumo_server_memory::set_low_memory_thresh(limit);
359 Ok(())
360 })?,
361 )?;
362
363 kumo_mod.set(
364 "set_memory_soft_limit",
365 lua.create_function(move |_, limit: usize| {
366 kumo_server_memory::set_soft_limit(limit);
367 Ok(())
368 })?,
369 )?;
370
371 kumo_mod.set(
372 "get_memory_hard_limit",
373 lua.create_function(move |_, _: ()| Ok(kumo_server_memory::get_hard_limit()))?,
374 )?;
375
376 kumo_mod.set(
377 "get_memory_soft_limit",
378 lua.create_function(move |_, _: ()| Ok(kumo_server_memory::get_soft_limit()))?,
379 )?;
380
381 kumo_mod.set(
382 "get_memory_low_thresh",
383 lua.create_function(move |_, _: ()| Ok(kumo_server_memory::get_low_memory_thresh()))?,
384 )?;
385
386 kumo_mod.set(
387 "configure_redis_throttles",
388 lua.create_async_function(|lua, params: Value| async move {
389 let key: RedisConnKey = from_lua_value(&lua, params)?;
390 let conn = key.open().map_err(any_err)?;
391 conn.ping().await.map_err(any_err)?;
392 throttle::use_redis(conn).await.map_err(any_err)
393 })?,
394 )?;
395
396 kumo_mod.set(
397 "traceback",
398 lua.create_function(move |lua: &Lua, level: usize| {
399 #[derive(Debug, Serialize)]
400 struct Frame {
401 event: String,
402 name: Option<String>,
403 name_what: Option<String>,
404 source: Option<String>,
405 short_src: Option<String>,
406 line_defined: Option<usize>,
407 last_line_defined: Option<usize>,
408 what: &'static str,
409 curr_line: Option<usize>,
410 is_tail_call: bool,
411 }
412
413 let mut frames = vec![];
414 for n in level.. {
415 match lua.inspect_stack(n, |info| {
416 let source = info.source();
417 let names = info.names();
418 Frame {
419 curr_line: info.current_line(),
420 is_tail_call: info.is_tail_call(),
421 event: format!("{:?}", info.event()),
422 last_line_defined: source.last_line_defined,
423 line_defined: source.line_defined,
424 name: names.name.as_ref().map(|b| b.to_string()),
425 name_what: names.name_what.as_ref().map(|b| b.to_string()),
426 source: source.source.as_ref().map(|b| b.to_string()),
427 short_src: source.short_src.as_ref().map(|b| b.to_string()),
428 what: source.what,
429 }
430 }) {
431 Some(frame) => {
432 frames.push(frame);
433 }
434 None => break,
435 }
436 }
437
438 lua.to_value(&frames)
439 })?,
440 )?;
441
442 #[derive(Deserialize, Debug)]
445 struct TaskParams {
446 event_name: String,
447 args: Vec<serde_json::Value>,
448 }
449
450 impl TaskParams {
451 async fn run(&self) -> anyhow::Result<()> {
452 let mut config = load_config().await?;
453
454 let sig = CallbackSignature::<Value, ()>::new(self.event_name.to_string());
455
456 config
457 .convert_args_and_call_callback(&sig, &self.args)
458 .await?;
459
460 config.put();
461
462 Ok(())
463 }
464 }
465
466 kumo_mod.set(
467 "spawn_task",
468 lua.create_function(|lua, params: Value| {
469 let params: TaskParams = lua.from_value(params)?;
470
471 if !config::is_validating() {
472 std::thread::Builder::new()
473 .name(format!("spawned-task-{}", params.event_name))
474 .spawn(move || {
475 let runtime = tokio::runtime::Builder::new_current_thread()
476 .enable_io()
477 .enable_time()
478 .on_thread_park(kumo_server_memory::purge_thread_cache)
479 .build()
480 .unwrap();
481 let event_name = params.event_name.clone();
482
483 let result = runtime.block_on(async move { params.run().await });
484 if let Err(err) = result {
485 tracing::error!("Error while dispatching {event_name}: {err:#}");
486 }
487 })?;
488 }
489
490 Ok(())
491 })?,
492 )?;
493
494 kumo_mod.set(
495 "spawn_thread_pool",
496 lua.create_function(|lua, params: Value| {
497 #[derive(Deserialize, Debug)]
498 struct ThreadPoolParams {
499 name: String,
500 num_threads: usize,
501 }
502
503 let params: ThreadPoolParams = lua.from_value(params)?;
504 let num_threads = AtomicUsize::new(params.num_threads);
505
506 if !config::is_validating() {
507 let _runtime = kumo_server_runtime::Runtime::new(
511 ¶ms.name,
512 |_| params.num_threads,
513 &num_threads,
514 )
515 .map_err(any_err)?;
516 }
517
518 Ok(())
519 })?,
520 )?;
521
522 kumo_mod.set(
523 "validation_failed",
524 lua.create_function(|_, ()| {
525 config::set_validation_failed();
526 Ok(())
527 })?,
528 )?;
529
530 kumo_mod.set(
531 "enable_memory_callstack_tracking",
532 lua.create_function(|_, enable: bool| {
533 kumo_server_memory::set_tracking_callstacks(enable);
534 Ok(())
535 })?,
536 )?;
537
538 kumo_mod.set(
542 "prometheus_metrics",
543 lua.create_async_function(|lua, ()| async move {
544 use tokio_stream::StreamExt;
545 let mut json_text = String::new();
546 let mut stream = kumo_prometheus::registry::Registry::stream_json();
547 while let Some(text) = stream.next().await {
548 json_text.push_str(&text);
549 }
550 let value: serde_json::Value = serde_json::from_str(&json_text).map_err(any_err)?;
551 lua.to_value_with(&value, serialize_options())
552 })?,
553 )?;
554
555 Ok(())
556}