1use bstr::ByteSlice;
2use config::{
3 any_err, decorate_callback_name, from_lua_value, get_or_create_module, load_config,
4 serialize_options, CallbackSignature,
5};
6use kumo_server_runtime::available_parallelism;
7use mlua::{Function, Lua, LuaSerdeExt, Value, Variadic};
8use mod_redis::RedisConnKey;
9use serde::{Deserialize, Serialize};
10use std::sync::atomic::AtomicUsize;
11
12pub mod acct;
13pub mod authn_authz;
14pub mod config_handle;
15pub mod diagnostic_logging;
16pub mod disk_space;
17pub mod http_server;
18pub mod log;
19pub mod nodeid;
20pub mod panic;
21pub mod start;
22pub mod tls_helpers;
23
24pub fn register(lua: &Lua) -> anyhow::Result<()> {
25 for func in [
26 mod_redis::register,
27 data_loader::register,
28 mod_digest::register,
29 mod_encode::register,
30 mod_aws_sigv4::register,
31 cidr_map::register,
32 domain_map::register,
33 mod_amqp::register,
34 mod_filesystem::register,
35 mod_file_type::register,
36 mod_http::register,
37 mod_regex::register,
38 mod_serde::register,
39 mod_sqlite::register,
40 mod_crypto::register,
41 mod_smtp_response_normalize::register,
42 kumo_jsonl::lua::register,
43 mod_counter_series::register,
44 mod_string::register,
45 mod_time::register,
46 mod_dns_resolver::register,
47 mod_kafka::register,
48 mod_memoize::register,
49 mod_mimepart::register,
50 mod_mpsc::register,
51 mod_nats::register,
52 mod_uuid::register,
53 kumo_api_types::shaping::register,
54 regex_set_map::register,
55 crate::authn_authz::register,
56 ] {
57 func(lua)?;
58 }
59
60 let kumo_mod = get_or_create_module(lua, "kumo")?;
61 kumo_mod.set("version", version_info::kumo_version())?;
62
63 fn event_registrar_name(name: &str) -> String {
64 format!("kumomta-event-registrars-{name}")
65 }
66
67 fn register_event_caller(lua: &Lua, name: &str) -> mlua::Result<()> {
70 let decorated_name = event_registrar_name(name);
71 let mut call_stack = vec![];
72 for n in 1.. {
73 match lua.inspect_stack(n, |info| {
74 let source = info.source();
75 format!(
76 "{}:{}",
77 source
78 .short_src
79 .as_ref()
80 .map(|b| b.to_string())
81 .unwrap_or_else(String::new),
82 info.current_line().unwrap_or(0)
83 )
84 }) {
85 Some(info) => {
86 call_stack.push(info);
87 }
88 None => break,
89 }
90 }
91
92 let tbl: Value = lua.named_registry_value(&decorated_name)?;
93 match tbl {
94 Value::Nil => {
95 let tbl = lua.create_table()?;
96 tbl.set(1, call_stack)?;
97 lua.set_named_registry_value(&decorated_name, tbl)?;
98 Ok(())
99 }
100 Value::Table(tbl) => {
101 let len = tbl.raw_len();
102 tbl.set(len + 1, call_stack)?;
103 Ok(())
104 }
105 _ => Err(mlua::Error::external(format!(
106 "registry key for {decorated_name} has invalid type",
107 ))),
108 }
109 }
110
111 kumo_mod.set(
114 "get_event_registrars",
115 lua.create_function(move |lua, name: String| {
116 let decorated_name = event_registrar_name(&name);
117 let value: Value = lua.named_registry_value(&decorated_name)?;
118 Ok(value)
119 })?,
120 )?;
121
122 kumo_mod.set(
123 "on",
124 lua.create_function(move |lua, (name, func): (String, Function)| {
125 let decorated_name = decorate_callback_name(&name);
126
127 if let Ok(current_event) = lua.globals().get::<String>("_KUMO_CURRENT_EVENT") {
128 if current_event != "main" {
129 return Err(mlua::Error::external(format!(
130 "Attempting to register an event handler via \
131 `kumo.on('{name}', ...)` from within the event handler \
132 '{current_event}'. You must move your event handler registration \
133 so that it is setup directly when the policy is loaded \
134 in order for it to consistently trigger and handle events."
135 )));
136 }
137 }
138
139 register_event_caller(lua, &name)?;
140
141 if config::does_callback_allow_multiple(&name) {
142 let tbl: Value = lua.named_registry_value(&decorated_name)?;
143 return match tbl {
144 Value::Nil => {
145 let tbl = lua.create_table()?;
146 tbl.set(1, func)?;
147 lua.set_named_registry_value(&decorated_name, tbl)?;
148 Ok(())
149 }
150 Value::Table(tbl) => {
151 let len = tbl.raw_len();
152 tbl.set(len + 1, func)?;
153 Ok(())
154 }
155 _ => Err(mlua::Error::external(format!(
156 "registry key for {decorated_name} has invalid type",
157 ))),
158 };
159 }
160
161 let existing: Value = lua.named_registry_value(&decorated_name)?;
162 match existing {
163 Value::Nil => {}
164 Value::Function(func) => {
165 let info = func.info();
166 let src = info.source.unwrap_or_else(|| "?".into());
167 let line = info.line_defined.unwrap_or(0);
168 return Err(mlua::Error::external(format!(
169 "{name} event already has a handler defined at {src}:{line}"
170 )));
171 }
172 _ => {
173 return Err(mlua::Error::external(format!(
174 "{name} event already has a handler"
175 )));
176 }
177 }
178
179 lua.set_named_registry_value(&decorated_name, func)?;
180 Ok(())
181 })?,
182 )?;
183
184 kumo_mod.set(
185 "set_diagnostic_log_filter",
186 lua.create_function(move |_, filter: String| {
187 diagnostic_logging::set_diagnostic_log_filter(&filter).map_err(any_err)
188 })?,
189 )?;
190
191 fn variadic_to_string(args: Variadic<Value>) -> String {
192 let mut output = String::new();
193 for (idx, item) in args.into_iter().enumerate() {
194 if idx > 0 {
195 output.push(' ');
196 }
197
198 match item {
199 Value::String(s) => {
200 let bytes = s.as_bytes();
201 for (start, end, c) in bytes.char_indices() {
202 if c == std::char::REPLACEMENT_CHARACTER {
203 let c_slice = &bytes[start..end];
204 for &b in c_slice.iter() {
205 output.push_str(&format!("\\x{b:02X}"));
206 }
207 } else {
208 output.push(c);
209 }
210 }
211 }
212 item => match item.to_string() {
213 Ok(s) => output.push_str(&s),
214 Err(_) => output.push_str(&format!("{item:?}")),
215 },
216 }
217 }
218 output
219 }
220
221 fn get_caller(lua: &Lua) -> String {
222 match lua.inspect_stack(1, |info| {
223 let source = info.source();
224 let file_name = source
225 .short_src
226 .as_ref()
227 .map(|b| b.to_string())
228 .unwrap_or_else(String::new);
229 let file_name = match file_name.strip_prefix("[string \"") {
232 Some(name) => name.strip_suffix("\"]").unwrap_or(name),
233 None => &file_name,
234 };
235
236 format!("{file_name}:{}", info.current_line().unwrap_or(0))
237 }) {
238 Some(info) => info,
239 None => "?".to_string(),
240 }
241 }
242
243 kumo_mod.set(
244 "log_error",
245 lua.create_function(move |lua, args: Variadic<Value>| {
246 if tracing::event_enabled!(target: "lua", tracing::Level::ERROR) {
247 let src = get_caller(lua);
248 tracing::error!(target: "lua", "{src}: {}", variadic_to_string(args));
249 }
250 Ok(())
251 })?,
252 )?;
253 kumo_mod.set(
254 "log_info",
255 lua.create_function(move |lua, args: Variadic<Value>| {
256 if tracing::event_enabled!(target: "lua", tracing::Level::INFO) {
257 let src = get_caller(lua);
258 tracing::info!(target: "lua", "{src}: {}", variadic_to_string(args));
259 }
260 Ok(())
261 })?,
262 )?;
263 kumo_mod.set(
264 "log_warn",
265 lua.create_function(move |lua, args: Variadic<Value>| {
266 if tracing::event_enabled!(target: "lua", tracing::Level::WARN) {
267 let src = get_caller(lua);
268 tracing::warn!(target: "lua", "{src}: {}", variadic_to_string(args));
269 }
270 Ok(())
271 })?,
272 )?;
273 kumo_mod.set(
274 "log_debug",
275 lua.create_function(move |lua, args: Variadic<Value>| {
276 if tracing::event_enabled!(target: "lua", tracing::Level::DEBUG) {
277 let src = get_caller(lua);
278 tracing::debug!(target: "lua", "{src}: {}", variadic_to_string(args));
279 }
280 Ok(())
281 })?,
282 )?;
283
284 kumo_mod.set(
285 "set_max_spare_lua_contexts",
286 lua.create_function(move |_, limit: usize| {
287 config::set_max_spare(limit);
288 Ok(())
289 })?,
290 )?;
291
292 kumo_mod.set(
293 "set_max_lua_context_use_count",
294 lua.create_function(move |_, limit: usize| {
295 config::set_max_use(limit);
296 Ok(())
297 })?,
298 )?;
299
300 kumo_mod.set(
301 "set_max_lua_context_age",
302 lua.create_function(move |_, limit: usize| {
303 config::set_max_age(limit);
304 Ok(())
305 })?,
306 )?;
307
308 kumo_mod.set(
309 "set_lua_gc_on_put",
310 lua.create_function(move |_, enable: u8| {
311 config::set_gc_on_put(enable);
312 Ok(())
313 })?,
314 )?;
315
316 kumo_mod.set(
317 "set_lruttl_cache_capacity",
318 lua.create_function(move |_, (name, capacity): (String, usize)| {
319 if lruttl::set_cache_capacity(&name, capacity) {
320 Ok(())
321 } else {
322 Err(mlua::Error::external(format!(
323 "could not set capacity for cache {name} \
324 as that is not a pre-defined lruttl cache name"
325 )))
326 }
327 })?,
328 )?;
329
330 kumo_mod.set(
331 "set_config_monitor_globs",
332 lua.create_function(move |_, globs: Vec<String>| {
333 config::epoch::set_globs(globs).map_err(any_err)?;
334 Ok(())
335 })?,
336 )?;
337 kumo_mod.set(
338 "eval_config_monitor_globs",
339 lua.create_async_function(|_, _: ()| async move {
340 config::epoch::eval_globs().await.map_err(any_err)
341 })?,
342 )?;
343 kumo_mod.set(
344 "bump_config_epoch",
345 lua.create_function(move |_, _: ()| {
346 config::epoch::bump_current_epoch();
347 Ok(())
348 })?,
349 )?;
350
351 kumo_mod.set(
352 "available_parallelism",
353 lua.create_function(move |_, _: ()| available_parallelism().map_err(any_err))?,
354 )?;
355
356 kumo_mod.set(
357 "set_memory_hard_limit",
358 lua.create_function(move |_, limit: usize| {
359 kumo_server_memory::set_hard_limit(limit);
360 Ok(())
361 })?,
362 )?;
363
364 kumo_mod.set(
365 "set_memory_low_thresh",
366 lua.create_function(move |_, limit: usize| {
367 kumo_server_memory::set_low_memory_thresh(limit);
368 Ok(())
369 })?,
370 )?;
371
372 kumo_mod.set(
373 "set_memory_soft_limit",
374 lua.create_function(move |_, limit: usize| {
375 kumo_server_memory::set_soft_limit(limit);
376 Ok(())
377 })?,
378 )?;
379
380 kumo_mod.set(
381 "get_memory_hard_limit",
382 lua.create_function(move |_, _: ()| Ok(kumo_server_memory::get_hard_limit()))?,
383 )?;
384
385 kumo_mod.set(
386 "get_memory_soft_limit",
387 lua.create_function(move |_, _: ()| Ok(kumo_server_memory::get_soft_limit()))?,
388 )?;
389
390 kumo_mod.set(
391 "get_memory_low_thresh",
392 lua.create_function(move |_, _: ()| Ok(kumo_server_memory::get_low_memory_thresh()))?,
393 )?;
394
395 kumo_mod.set(
396 "configure_redis_throttles",
397 lua.create_async_function(|lua, params: Value| async move {
398 let key: RedisConnKey = from_lua_value(&lua, params)?;
399 let conn = key.open().map_err(any_err)?;
400 conn.ping().await.map_err(any_err)?;
401 throttle::use_redis(conn).await.map_err(any_err)
402 })?,
403 )?;
404
405 kumo_mod.set(
406 "traceback",
407 lua.create_function(move |lua: &Lua, level: usize| {
408 #[derive(Debug, Serialize)]
409 struct Frame {
410 event: String,
411 name: Option<String>,
412 name_what: Option<String>,
413 source: Option<String>,
414 short_src: Option<String>,
415 line_defined: Option<usize>,
416 last_line_defined: Option<usize>,
417 what: &'static str,
418 curr_line: Option<usize>,
419 is_tail_call: bool,
420 }
421
422 let mut frames = vec![];
423 for n in level.. {
424 match lua.inspect_stack(n, |info| {
425 let source = info.source();
426 let names = info.names();
427 Frame {
428 curr_line: info.current_line(),
429 is_tail_call: info.is_tail_call(),
430 event: format!("{:?}", info.event()),
431 last_line_defined: source.last_line_defined,
432 line_defined: source.line_defined,
433 name: names.name.as_ref().map(|b| b.to_string()),
434 name_what: names.name_what.as_ref().map(|b| b.to_string()),
435 source: source.source.as_ref().map(|b| b.to_string()),
436 short_src: source.short_src.as_ref().map(|b| b.to_string()),
437 what: source.what,
438 }
439 }) {
440 Some(frame) => {
441 frames.push(frame);
442 }
443 None => break,
444 }
445 }
446
447 lua.to_value(&frames)
448 })?,
449 )?;
450
451 #[derive(Deserialize, Debug)]
454 struct TaskParams {
455 event_name: String,
456 args: Vec<serde_json::Value>,
457 }
458
459 impl TaskParams {
460 async fn run(&self) -> anyhow::Result<()> {
461 let mut config = load_config().await?;
462
463 let sig = CallbackSignature::<Value, ()>::new(self.event_name.to_string());
464
465 config
466 .convert_args_and_call_callback(&sig, &self.args)
467 .await?;
468
469 config.put();
470
471 Ok(())
472 }
473 }
474
475 kumo_mod.set(
476 "spawn_task",
477 lua.create_function(|lua, params: Value| {
478 let params: TaskParams = lua.from_value(params)?;
479
480 if !config::is_validating() {
481 std::thread::Builder::new()
482 .name(format!("spawned-task-{}", params.event_name))
483 .spawn(move || {
484 let runtime = tokio::runtime::Builder::new_current_thread()
485 .enable_io()
486 .enable_time()
487 .on_thread_park(kumo_server_memory::purge_thread_cache)
488 .build()
489 .unwrap();
490 let event_name = params.event_name.clone();
491
492 let result = runtime.block_on(async move { params.run().await });
493 if let Err(err) = result {
494 tracing::error!("Error while dispatching {event_name}: {err:#}");
495 }
496 })?;
497 }
498
499 Ok(())
500 })?,
501 )?;
502
503 kumo_mod.set(
504 "spawn_thread_pool",
505 lua.create_function(|lua, params: Value| {
506 #[derive(Deserialize, Debug)]
507 struct ThreadPoolParams {
508 name: String,
509 num_threads: usize,
510 }
511
512 let params: ThreadPoolParams = lua.from_value(params)?;
513 let num_threads = AtomicUsize::new(params.num_threads);
514
515 if !config::is_validating() {
516 let _runtime = kumo_server_runtime::Runtime::new(
520 ¶ms.name,
521 |_| params.num_threads,
522 &num_threads,
523 )
524 .map_err(any_err)?;
525 }
526
527 Ok(())
528 })?,
529 )?;
530
531 kumo_mod.set(
532 "validation_failed",
533 lua.create_function(|_, ()| {
534 config::set_validation_failed();
535 Ok(())
536 })?,
537 )?;
538
539 kumo_mod.set(
540 "enable_memory_callstack_tracking",
541 lua.create_function(|_, enable: bool| {
542 kumo_server_memory::set_tracking_callstacks(enable);
543 Ok(())
544 })?,
545 )?;
546
547 kumo_mod.set(
551 "prometheus_metrics",
552 lua.create_async_function(|lua, ()| async move {
553 use tokio_stream::StreamExt;
554 let mut json_text = String::new();
555 let mut stream = kumo_prometheus::registry::Registry::stream_json();
556 while let Some(text) = stream.next().await {
557 json_text.push_str(&text);
558 }
559 let value: serde_json::Value = serde_json::from_str(&json_text).map_err(any_err)?;
560 lua.to_value_with(&value, serialize_options())
561 })?,
562 )?;
563
564 Ok(())
565}