1use config::{
2 any_err, decorate_callback_name, from_lua_value, get_or_create_module, load_config,
3 serialize_options, CallbackSignature,
4};
5use kumo_server_runtime::available_parallelism;
6use mlua::{Function, Lua, LuaSerdeExt, Value, Variadic};
7use mod_redis::RedisConnKey;
8use serde::{Deserialize, Serialize};
9use std::sync::atomic::AtomicUsize;
10
11pub mod acct;
12pub mod authn_authz;
13pub mod config_handle;
14pub mod diagnostic_logging;
15pub mod disk_space;
16pub mod http_server;
17pub mod log;
18pub mod nodeid;
19pub mod panic;
20pub mod start;
21pub mod tls_helpers;
22
23pub fn register(lua: &Lua) -> anyhow::Result<()> {
24 for func in [
25 mod_redis::register,
26 data_loader::register,
27 mod_digest::register,
28 mod_encode::register,
29 mod_aws_sigv4::register,
30 cidr_map::register,
31 domain_map::register,
32 mod_amqp::register,
33 mod_filesystem::register,
34 mod_file_type::register,
35 mod_http::register,
36 mod_regex::register,
37 mod_serde::register,
38 mod_sqlite::register,
39 mod_crypto::register,
40 mod_smtp_response_normalize::register,
41 mod_string::register,
42 mod_time::register,
43 mod_dns_resolver::register,
44 mod_kafka::register,
45 mod_memoize::register,
46 mod_mimepart::register,
47 mod_mpsc::register,
48 mod_uuid::register,
49 kumo_api_types::shaping::register,
50 regex_set_map::register,
51 crate::authn_authz::register,
52 ] {
53 func(lua)?;
54 }
55
56 let kumo_mod = get_or_create_module(lua, "kumo")?;
57 kumo_mod.set("version", version_info::kumo_version())?;
58
59 fn event_registrar_name(name: &str) -> String {
60 format!("kumomta-event-registrars-{name}")
61 }
62
63 fn register_event_caller(lua: &Lua, name: &str) -> mlua::Result<()> {
66 let decorated_name = event_registrar_name(name);
67 let mut call_stack = vec![];
68 for n in 1.. {
69 match lua.inspect_stack(n, |info| {
70 let source = info.source();
71 format!(
72 "{}:{}",
73 source
74 .short_src
75 .as_ref()
76 .map(|b| b.to_string())
77 .unwrap_or_else(String::new),
78 info.current_line().unwrap_or(0)
79 )
80 }) {
81 Some(info) => {
82 call_stack.push(info);
83 }
84 None => break,
85 }
86 }
87
88 let tbl: Value = lua.named_registry_value(&decorated_name)?;
89 match tbl {
90 Value::Nil => {
91 let tbl = lua.create_table()?;
92 tbl.set(1, call_stack)?;
93 lua.set_named_registry_value(&decorated_name, tbl)?;
94 Ok(())
95 }
96 Value::Table(tbl) => {
97 let len = tbl.raw_len();
98 tbl.set(len + 1, call_stack)?;
99 Ok(())
100 }
101 _ => Err(mlua::Error::external(format!(
102 "registry key for {decorated_name} has invalid type",
103 ))),
104 }
105 }
106
107 kumo_mod.set(
110 "get_event_registrars",
111 lua.create_function(move |lua, name: String| {
112 let decorated_name = event_registrar_name(&name);
113 let value: Value = lua.named_registry_value(&decorated_name)?;
114 Ok(value)
115 })?,
116 )?;
117
118 kumo_mod.set(
119 "on",
120 lua.create_function(move |lua, (name, func): (String, Function)| {
121 let decorated_name = decorate_callback_name(&name);
122
123 if let Ok(current_event) = lua.globals().get::<String>("_KUMO_CURRENT_EVENT") {
124 if current_event != "main" {
125 return Err(mlua::Error::external(format!(
126 "Attempting to register an event handler via \
127 `kumo.on('{name}', ...)` from within the event handler \
128 '{current_event}'. You must move your event handler registration \
129 so that it is setup directly when the policy is loaded \
130 in order for it to consistently trigger and handle events."
131 )));
132 }
133 }
134
135 register_event_caller(lua, &name)?;
136
137 if config::does_callback_allow_multiple(&name) {
138 let tbl: Value = lua.named_registry_value(&decorated_name)?;
139 return match tbl {
140 Value::Nil => {
141 let tbl = lua.create_table()?;
142 tbl.set(1, func)?;
143 lua.set_named_registry_value(&decorated_name, tbl)?;
144 Ok(())
145 }
146 Value::Table(tbl) => {
147 let len = tbl.raw_len();
148 tbl.set(len + 1, func)?;
149 Ok(())
150 }
151 _ => Err(mlua::Error::external(format!(
152 "registry key for {decorated_name} has invalid type",
153 ))),
154 };
155 }
156
157 let existing: Value = lua.named_registry_value(&decorated_name)?;
158 match existing {
159 Value::Nil => {}
160 Value::Function(func) => {
161 let info = func.info();
162 let src = info.source.unwrap_or_else(|| "?".into());
163 let line = info.line_defined.unwrap_or(0);
164 return Err(mlua::Error::external(format!(
165 "{name} event already has a handler defined at {src}:{line}"
166 )));
167 }
168 _ => {
169 return Err(mlua::Error::external(format!(
170 "{name} event already has a handler"
171 )));
172 }
173 }
174
175 lua.set_named_registry_value(&decorated_name, func)?;
176 Ok(())
177 })?,
178 )?;
179
180 kumo_mod.set(
181 "set_diagnostic_log_filter",
182 lua.create_function(move |_, filter: String| {
183 diagnostic_logging::set_diagnostic_log_filter(&filter).map_err(any_err)
184 })?,
185 )?;
186
187 fn variadic_to_string(args: Variadic<Value>) -> String {
188 let mut output = String::new();
189 for (idx, item) in args.into_iter().enumerate() {
190 if idx > 0 {
191 output.push(' ');
192 }
193
194 match item {
195 Value::String(s) => match s.to_str() {
196 Ok(s) => output.push_str(&s),
197 Err(_) => {
198 let item = s.to_string_lossy();
199 output.push_str(&item);
200 }
201 },
202 item => match item.to_string() {
203 Ok(s) => output.push_str(&s),
204 Err(_) => output.push_str(&format!("{item:?}")),
205 },
206 }
207 }
208 output
209 }
210
211 fn get_caller(lua: &Lua) -> String {
212 match lua.inspect_stack(1, |info| {
213 let source = info.source();
214 let file_name = source
215 .short_src
216 .as_ref()
217 .map(|b| b.to_string())
218 .unwrap_or_else(String::new);
219 let file_name = match file_name.strip_prefix("[string \"") {
222 Some(name) => name.strip_suffix("\"]").unwrap_or(name),
223 None => &file_name,
224 };
225
226 format!("{file_name}:{}", info.current_line().unwrap_or(0))
227 }) {
228 Some(info) => info,
229 None => "?".to_string(),
230 }
231 }
232
233 kumo_mod.set(
234 "log_error",
235 lua.create_function(move |lua, args: Variadic<Value>| {
236 if tracing::event_enabled!(target: "lua", tracing::Level::ERROR) {
237 let src = get_caller(lua);
238 tracing::error!(target: "lua", "{src}: {}", variadic_to_string(args));
239 }
240 Ok(())
241 })?,
242 )?;
243 kumo_mod.set(
244 "log_info",
245 lua.create_function(move |lua, args: Variadic<Value>| {
246 if tracing::event_enabled!(target: "lua", tracing::Level::INFO) {
247 let src = get_caller(lua);
248 tracing::info!(target: "lua", "{src}: {}", variadic_to_string(args));
249 }
250 Ok(())
251 })?,
252 )?;
253 kumo_mod.set(
254 "log_warn",
255 lua.create_function(move |lua, args: Variadic<Value>| {
256 if tracing::event_enabled!(target: "lua", tracing::Level::WARN) {
257 let src = get_caller(lua);
258 tracing::warn!(target: "lua", "{src}: {}", variadic_to_string(args));
259 }
260 Ok(())
261 })?,
262 )?;
263 kumo_mod.set(
264 "log_debug",
265 lua.create_function(move |lua, args: Variadic<Value>| {
266 if tracing::event_enabled!(target: "lua", tracing::Level::DEBUG) {
267 let src = get_caller(lua);
268 tracing::debug!(target: "lua", "{src}: {}", variadic_to_string(args));
269 }
270 Ok(())
271 })?,
272 )?;
273
274 kumo_mod.set(
275 "set_max_spare_lua_contexts",
276 lua.create_function(move |_, limit: usize| {
277 config::set_max_spare(limit);
278 Ok(())
279 })?,
280 )?;
281
282 kumo_mod.set(
283 "set_max_lua_context_use_count",
284 lua.create_function(move |_, limit: usize| {
285 config::set_max_use(limit);
286 Ok(())
287 })?,
288 )?;
289
290 kumo_mod.set(
291 "set_max_lua_context_age",
292 lua.create_function(move |_, limit: usize| {
293 config::set_max_age(limit);
294 Ok(())
295 })?,
296 )?;
297
298 kumo_mod.set(
299 "set_lua_gc_on_put",
300 lua.create_function(move |_, enable: u8| {
301 config::set_gc_on_put(enable);
302 Ok(())
303 })?,
304 )?;
305
306 kumo_mod.set(
307 "set_lruttl_cache_capacity",
308 lua.create_function(move |_, (name, capacity): (String, usize)| {
309 if lruttl::set_cache_capacity(&name, capacity) {
310 Ok(())
311 } else {
312 Err(mlua::Error::external(format!(
313 "could not set capacity for cache {name} \
314 as that is not a pre-defined lruttl cache name"
315 )))
316 }
317 })?,
318 )?;
319
320 kumo_mod.set(
321 "set_config_monitor_globs",
322 lua.create_function(move |_, globs: Vec<String>| {
323 config::epoch::set_globs(globs).map_err(any_err)?;
324 Ok(())
325 })?,
326 )?;
327 kumo_mod.set(
328 "eval_config_monitor_globs",
329 lua.create_async_function(|_, _: ()| async move {
330 config::epoch::eval_globs().await.map_err(any_err)
331 })?,
332 )?;
333 kumo_mod.set(
334 "bump_config_epoch",
335 lua.create_function(move |_, _: ()| {
336 config::epoch::bump_current_epoch();
337 Ok(())
338 })?,
339 )?;
340
341 kumo_mod.set(
342 "available_parallelism",
343 lua.create_function(move |_, _: ()| available_parallelism().map_err(any_err))?,
344 )?;
345
346 kumo_mod.set(
347 "set_memory_hard_limit",
348 lua.create_function(move |_, limit: usize| {
349 kumo_server_memory::set_hard_limit(limit);
350 Ok(())
351 })?,
352 )?;
353
354 kumo_mod.set(
355 "set_memory_low_thresh",
356 lua.create_function(move |_, limit: usize| {
357 kumo_server_memory::set_low_memory_thresh(limit);
358 Ok(())
359 })?,
360 )?;
361
362 kumo_mod.set(
363 "set_memory_soft_limit",
364 lua.create_function(move |_, limit: usize| {
365 kumo_server_memory::set_soft_limit(limit);
366 Ok(())
367 })?,
368 )?;
369
370 kumo_mod.set(
371 "configure_redis_throttles",
372 lua.create_async_function(|lua, params: Value| async move {
373 let key: RedisConnKey = from_lua_value(&lua, params)?;
374 let conn = key.open().map_err(any_err)?;
375 conn.ping().await.map_err(any_err)?;
376 throttle::use_redis(conn).await.map_err(any_err)
377 })?,
378 )?;
379
380 kumo_mod.set(
381 "traceback",
382 lua.create_function(move |lua: &Lua, level: usize| {
383 #[derive(Debug, Serialize)]
384 struct Frame {
385 event: String,
386 name: Option<String>,
387 name_what: Option<String>,
388 source: Option<String>,
389 short_src: Option<String>,
390 line_defined: Option<usize>,
391 last_line_defined: Option<usize>,
392 what: &'static str,
393 curr_line: Option<usize>,
394 is_tail_call: bool,
395 }
396
397 let mut frames = vec![];
398 for n in level.. {
399 match lua.inspect_stack(n, |info| {
400 let source = info.source();
401 let names = info.names();
402 Frame {
403 curr_line: info.current_line(),
404 is_tail_call: info.is_tail_call(),
405 event: format!("{:?}", info.event()),
406 last_line_defined: source.last_line_defined,
407 line_defined: source.line_defined,
408 name: names.name.as_ref().map(|b| b.to_string()),
409 name_what: names.name_what.as_ref().map(|b| b.to_string()),
410 source: source.source.as_ref().map(|b| b.to_string()),
411 short_src: source.short_src.as_ref().map(|b| b.to_string()),
412 what: source.what,
413 }
414 }) {
415 Some(frame) => {
416 frames.push(frame);
417 }
418 None => break,
419 }
420 }
421
422 lua.to_value(&frames)
423 })?,
424 )?;
425
426 #[derive(Deserialize, Debug)]
429 struct TaskParams {
430 event_name: String,
431 args: Vec<serde_json::Value>,
432 }
433
434 impl TaskParams {
435 async fn run(&self) -> anyhow::Result<()> {
436 let mut config = load_config().await?;
437
438 let sig = CallbackSignature::<Value, ()>::new(self.event_name.to_string());
439
440 config
441 .convert_args_and_call_callback(&sig, &self.args)
442 .await?;
443
444 config.put();
445
446 Ok(())
447 }
448 }
449
450 kumo_mod.set(
451 "spawn_task",
452 lua.create_function(|lua, params: Value| {
453 let params: TaskParams = lua.from_value(params)?;
454
455 if !config::is_validating() {
456 std::thread::Builder::new()
457 .name(format!("spawned-task-{}", params.event_name))
458 .spawn(move || {
459 let runtime = tokio::runtime::Builder::new_current_thread()
460 .enable_io()
461 .enable_time()
462 .on_thread_park(kumo_server_memory::purge_thread_cache)
463 .build()
464 .unwrap();
465 let event_name = params.event_name.clone();
466
467 let result = runtime.block_on(async move { params.run().await });
468 if let Err(err) = result {
469 tracing::error!("Error while dispatching {event_name}: {err:#}");
470 }
471 })?;
472 }
473
474 Ok(())
475 })?,
476 )?;
477
478 kumo_mod.set(
479 "spawn_thread_pool",
480 lua.create_function(|lua, params: Value| {
481 #[derive(Deserialize, Debug)]
482 struct ThreadPoolParams {
483 name: String,
484 num_threads: usize,
485 }
486
487 let params: ThreadPoolParams = lua.from_value(params)?;
488 let num_threads = AtomicUsize::new(params.num_threads);
489
490 if !config::is_validating() {
491 let _runtime = kumo_server_runtime::Runtime::new(
495 ¶ms.name,
496 |_| params.num_threads,
497 &num_threads,
498 )
499 .map_err(any_err)?;
500 }
501
502 Ok(())
503 })?,
504 )?;
505
506 kumo_mod.set(
507 "validation_failed",
508 lua.create_function(|_, ()| {
509 config::set_validation_failed();
510 Ok(())
511 })?,
512 )?;
513
514 kumo_mod.set(
515 "enable_memory_callstack_tracking",
516 lua.create_function(|_, enable: bool| {
517 kumo_server_memory::set_tracking_callstacks(enable);
518 Ok(())
519 })?,
520 )?;
521
522 kumo_mod.set(
526 "prometheus_metrics",
527 lua.create_async_function(|lua, ()| async move {
528 use tokio_stream::StreamExt;
529 let mut json_text = String::new();
530 let mut stream = kumo_prometheus::registry::Registry::stream_json();
531 while let Some(text) = stream.next().await {
532 json_text.push_str(&text);
533 }
534 let value: serde_json::Value = serde_json::from_str(&json_text).map_err(any_err)?;
535 lua.to_value_with(&value, serialize_options())
536 })?,
537 )?;
538
539 Ok(())
540}