1use config::{
2 any_err, decorate_callback_name, from_lua_value, get_or_create_module, load_config,
3 serialize_options, CallbackSignature,
4};
5use kumo_server_runtime::available_parallelism;
6use mlua::{Function, Lua, LuaSerdeExt, Value, Variadic};
7use mod_redis::RedisConnKey;
8use serde::{Deserialize, Serialize};
9use std::sync::atomic::AtomicUsize;
10
11pub mod config_handle;
12pub mod diagnostic_logging;
13pub mod disk_space;
14pub mod http_server;
15pub mod nodeid;
16pub mod panic;
17pub mod start;
18pub mod tls_helpers;
19
20pub fn register(lua: &Lua) -> anyhow::Result<()> {
21 for func in [
22 mod_redis::register,
23 data_loader::register,
24 mod_digest::register,
25 mod_encode::register,
26 mod_aws_sigv4::register,
27 cidr_map::register,
28 domain_map::register,
29 mod_amqp::register,
30 mod_filesystem::register,
31 mod_file_type::register,
32 mod_http::register,
33 mod_regex::register,
34 mod_serde::register,
35 mod_sqlite::register,
36 mod_crypto::register,
37 mod_smtp_response_normalize::register,
38 mod_string::register,
39 mod_time::register,
40 mod_dns_resolver::register,
41 mod_kafka::register,
42 mod_memoize::register,
43 mod_mimepart::register,
44 mod_mpsc::register,
45 mod_uuid::register,
46 kumo_api_types::shaping::register,
47 regex_set_map::register,
48 ] {
49 func(lua)?;
50 }
51
52 let kumo_mod = get_or_create_module(lua, "kumo")?;
53 kumo_mod.set("version", version_info::kumo_version())?;
54
55 fn event_registrar_name(name: &str) -> String {
56 format!("kumomta-event-registrars-{name}")
57 }
58
59 fn register_event_caller(lua: &Lua, name: &str) -> mlua::Result<()> {
62 let decorated_name = event_registrar_name(name);
63 let mut call_stack = vec![];
64 for n in 1.. {
65 match lua.inspect_stack(n, |info| {
66 let source = info.source();
67 format!(
68 "{}:{}",
69 source
70 .short_src
71 .as_ref()
72 .map(|b| b.to_string())
73 .unwrap_or_else(String::new),
74 info.current_line().unwrap_or(0)
75 )
76 }) {
77 Some(info) => {
78 call_stack.push(info);
79 }
80 None => break,
81 }
82 }
83
84 let tbl: Value = lua.named_registry_value(&decorated_name)?;
85 match tbl {
86 Value::Nil => {
87 let tbl = lua.create_table()?;
88 tbl.set(1, call_stack)?;
89 lua.set_named_registry_value(&decorated_name, tbl)?;
90 Ok(())
91 }
92 Value::Table(tbl) => {
93 let len = tbl.raw_len();
94 tbl.set(len + 1, call_stack)?;
95 Ok(())
96 }
97 _ => Err(mlua::Error::external(format!(
98 "registry key for {decorated_name} has invalid type",
99 ))),
100 }
101 }
102
103 kumo_mod.set(
106 "get_event_registrars",
107 lua.create_function(move |lua, name: String| {
108 let decorated_name = event_registrar_name(&name);
109 let value: Value = lua.named_registry_value(&decorated_name)?;
110 Ok(value)
111 })?,
112 )?;
113
114 kumo_mod.set(
115 "on",
116 lua.create_function(move |lua, (name, func): (String, Function)| {
117 let decorated_name = decorate_callback_name(&name);
118
119 if let Ok(current_event) = lua.globals().get::<String>("_KUMO_CURRENT_EVENT") {
120 if current_event != "main" {
121 return Err(mlua::Error::external(format!(
122 "Attempting to register an event handler via \
123 `kumo.on('{name}', ...)` from within the event handler \
124 '{current_event}'. You must move your event handler registration \
125 so that it is setup directly when the policy is loaded \
126 in order for it to consistently trigger and handle events."
127 )));
128 }
129 }
130
131 register_event_caller(lua, &name)?;
132
133 if config::does_callback_allow_multiple(&name) {
134 let tbl: Value = lua.named_registry_value(&decorated_name)?;
135 return match tbl {
136 Value::Nil => {
137 let tbl = lua.create_table()?;
138 tbl.set(1, func)?;
139 lua.set_named_registry_value(&decorated_name, tbl)?;
140 Ok(())
141 }
142 Value::Table(tbl) => {
143 let len = tbl.raw_len();
144 tbl.set(len + 1, func)?;
145 Ok(())
146 }
147 _ => Err(mlua::Error::external(format!(
148 "registry key for {decorated_name} has invalid type",
149 ))),
150 };
151 }
152
153 let existing: Value = lua.named_registry_value(&decorated_name)?;
154 match existing {
155 Value::Nil => {}
156 Value::Function(func) => {
157 let info = func.info();
158 let src = info.source.unwrap_or_else(|| "?".into());
159 let line = info.line_defined.unwrap_or(0);
160 return Err(mlua::Error::external(format!(
161 "{name} event already has a handler defined at {src}:{line}"
162 )));
163 }
164 _ => {
165 return Err(mlua::Error::external(format!(
166 "{name} event already has a handler"
167 )));
168 }
169 }
170
171 lua.set_named_registry_value(&decorated_name, func)?;
172 Ok(())
173 })?,
174 )?;
175
176 kumo_mod.set(
177 "set_diagnostic_log_filter",
178 lua.create_function(move |_, filter: String| {
179 diagnostic_logging::set_diagnostic_log_filter(&filter).map_err(any_err)
180 })?,
181 )?;
182
183 fn variadic_to_string(args: Variadic<Value>) -> String {
184 let mut output = String::new();
185 for (idx, item) in args.into_iter().enumerate() {
186 if idx > 0 {
187 output.push(' ');
188 }
189
190 match item {
191 Value::String(s) => match s.to_str() {
192 Ok(s) => output.push_str(&s),
193 Err(_) => {
194 let item = s.to_string_lossy();
195 output.push_str(&item);
196 }
197 },
198 item => match item.to_string() {
199 Ok(s) => output.push_str(&s),
200 Err(_) => output.push_str(&format!("{item:?}")),
201 },
202 }
203 }
204 output
205 }
206
207 fn get_caller(lua: &Lua) -> String {
208 match lua.inspect_stack(1, |info| {
209 let source = info.source();
210 let file_name = source
211 .short_src
212 .as_ref()
213 .map(|b| b.to_string())
214 .unwrap_or_else(String::new);
215 let file_name = match file_name.strip_prefix("[string \"") {
218 Some(name) => name.strip_suffix("\"]").unwrap_or(name),
219 None => &file_name,
220 };
221
222 format!("{file_name}:{}", info.current_line().unwrap_or(0))
223 }) {
224 Some(info) => info,
225 None => "?".to_string(),
226 }
227 }
228
229 kumo_mod.set(
230 "log_error",
231 lua.create_function(move |lua, args: Variadic<Value>| {
232 if tracing::event_enabled!(target: "lua", tracing::Level::ERROR) {
233 let src = get_caller(lua);
234 tracing::error!(target: "lua", "{src}: {}", variadic_to_string(args));
235 }
236 Ok(())
237 })?,
238 )?;
239 kumo_mod.set(
240 "log_info",
241 lua.create_function(move |lua, args: Variadic<Value>| {
242 if tracing::event_enabled!(target: "lua", tracing::Level::INFO) {
243 let src = get_caller(lua);
244 tracing::info!(target: "lua", "{src}: {}", variadic_to_string(args));
245 }
246 Ok(())
247 })?,
248 )?;
249 kumo_mod.set(
250 "log_warn",
251 lua.create_function(move |lua, args: Variadic<Value>| {
252 if tracing::event_enabled!(target: "lua", tracing::Level::WARN) {
253 let src = get_caller(lua);
254 tracing::warn!(target: "lua", "{src}: {}", variadic_to_string(args));
255 }
256 Ok(())
257 })?,
258 )?;
259 kumo_mod.set(
260 "log_debug",
261 lua.create_function(move |lua, args: Variadic<Value>| {
262 if tracing::event_enabled!(target: "lua", tracing::Level::DEBUG) {
263 let src = get_caller(lua);
264 tracing::debug!(target: "lua", "{src}: {}", variadic_to_string(args));
265 }
266 Ok(())
267 })?,
268 )?;
269
270 kumo_mod.set(
271 "set_max_spare_lua_contexts",
272 lua.create_function(move |_, limit: usize| {
273 config::set_max_spare(limit);
274 Ok(())
275 })?,
276 )?;
277
278 kumo_mod.set(
279 "set_max_lua_context_use_count",
280 lua.create_function(move |_, limit: usize| {
281 config::set_max_use(limit);
282 Ok(())
283 })?,
284 )?;
285
286 kumo_mod.set(
287 "set_max_lua_context_age",
288 lua.create_function(move |_, limit: usize| {
289 config::set_max_age(limit);
290 Ok(())
291 })?,
292 )?;
293
294 kumo_mod.set(
295 "set_lua_gc_on_put",
296 lua.create_function(move |_, enable: u8| {
297 config::set_gc_on_put(enable);
298 Ok(())
299 })?,
300 )?;
301
302 kumo_mod.set(
303 "set_lruttl_cache_capacity",
304 lua.create_function(move |_, (name, capacity): (String, usize)| {
305 if lruttl::set_cache_capacity(&name, capacity) {
306 Ok(())
307 } else {
308 Err(mlua::Error::external(format!(
309 "could not set capacity for cache {name} \
310 as that is not a pre-defined lruttl cache name"
311 )))
312 }
313 })?,
314 )?;
315
316 kumo_mod.set(
317 "set_config_monitor_globs",
318 lua.create_function(move |_, globs: Vec<String>| {
319 config::epoch::set_globs(globs).map_err(any_err)?;
320 Ok(())
321 })?,
322 )?;
323 kumo_mod.set(
324 "eval_config_monitor_globs",
325 lua.create_async_function(|_, _: ()| async move {
326 config::epoch::eval_globs().await.map_err(any_err)
327 })?,
328 )?;
329 kumo_mod.set(
330 "bump_config_epoch",
331 lua.create_function(move |_, _: ()| {
332 config::epoch::bump_current_epoch();
333 Ok(())
334 })?,
335 )?;
336
337 kumo_mod.set(
338 "available_parallelism",
339 lua.create_function(move |_, _: ()| available_parallelism().map_err(any_err))?,
340 )?;
341
342 kumo_mod.set(
343 "set_memory_hard_limit",
344 lua.create_function(move |_, limit: usize| {
345 kumo_server_memory::set_hard_limit(limit);
346 Ok(())
347 })?,
348 )?;
349
350 kumo_mod.set(
351 "set_memory_low_thresh",
352 lua.create_function(move |_, limit: usize| {
353 kumo_server_memory::set_low_memory_thresh(limit);
354 Ok(())
355 })?,
356 )?;
357
358 kumo_mod.set(
359 "set_memory_soft_limit",
360 lua.create_function(move |_, limit: usize| {
361 kumo_server_memory::set_soft_limit(limit);
362 Ok(())
363 })?,
364 )?;
365
366 kumo_mod.set(
367 "configure_redis_throttles",
368 lua.create_async_function(|lua, params: Value| async move {
369 let key: RedisConnKey = from_lua_value(&lua, params)?;
370 let conn = key.open().map_err(any_err)?;
371 conn.ping().await.map_err(any_err)?;
372 throttle::use_redis(conn).await.map_err(any_err)
373 })?,
374 )?;
375
376 kumo_mod.set(
377 "traceback",
378 lua.create_function(move |lua: &Lua, level: usize| {
379 #[derive(Debug, Serialize)]
380 struct Frame {
381 event: String,
382 name: Option<String>,
383 name_what: Option<String>,
384 source: Option<String>,
385 short_src: Option<String>,
386 line_defined: Option<usize>,
387 last_line_defined: Option<usize>,
388 what: &'static str,
389 curr_line: Option<usize>,
390 is_tail_call: bool,
391 }
392
393 let mut frames = vec![];
394 for n in level.. {
395 match lua.inspect_stack(n, |info| {
396 let source = info.source();
397 let names = info.names();
398 Frame {
399 curr_line: info.current_line(),
400 is_tail_call: info.is_tail_call(),
401 event: format!("{:?}", info.event()),
402 last_line_defined: source.last_line_defined,
403 line_defined: source.line_defined,
404 name: names.name.as_ref().map(|b| b.to_string()),
405 name_what: names.name_what.as_ref().map(|b| b.to_string()),
406 source: source.source.as_ref().map(|b| b.to_string()),
407 short_src: source.short_src.as_ref().map(|b| b.to_string()),
408 what: source.what,
409 }
410 }) {
411 Some(frame) => {
412 frames.push(frame);
413 }
414 None => break,
415 }
416 }
417
418 lua.to_value(&frames)
419 })?,
420 )?;
421
422 #[derive(Deserialize, Debug)]
425 struct TaskParams {
426 event_name: String,
427 args: Vec<serde_json::Value>,
428 }
429
430 impl TaskParams {
431 async fn run(&self) -> anyhow::Result<()> {
432 let mut config = load_config().await?;
433
434 let sig = CallbackSignature::<Value, ()>::new(self.event_name.to_string());
435
436 config
437 .convert_args_and_call_callback(&sig, &self.args)
438 .await?;
439
440 config.put();
441
442 Ok(())
443 }
444 }
445
446 kumo_mod.set(
447 "spawn_task",
448 lua.create_function(|lua, params: Value| {
449 let params: TaskParams = lua.from_value(params)?;
450
451 if !config::is_validating() {
452 std::thread::Builder::new()
453 .name(format!("spawned-task-{}", params.event_name))
454 .spawn(move || {
455 let runtime = tokio::runtime::Builder::new_current_thread()
456 .enable_io()
457 .enable_time()
458 .on_thread_park(kumo_server_memory::purge_thread_cache)
459 .build()
460 .unwrap();
461 let event_name = params.event_name.clone();
462
463 let result = runtime.block_on(async move { params.run().await });
464 if let Err(err) = result {
465 tracing::error!("Error while dispatching {event_name}: {err:#}");
466 }
467 })?;
468 }
469
470 Ok(())
471 })?,
472 )?;
473
474 kumo_mod.set(
475 "spawn_thread_pool",
476 lua.create_function(|lua, params: Value| {
477 #[derive(Deserialize, Debug)]
478 struct ThreadPoolParams {
479 name: String,
480 num_threads: usize,
481 }
482
483 let params: ThreadPoolParams = lua.from_value(params)?;
484 let num_threads = AtomicUsize::new(params.num_threads);
485
486 if !config::is_validating() {
487 let _runtime = kumo_server_runtime::Runtime::new(
491 ¶ms.name,
492 |_| params.num_threads,
493 &num_threads,
494 )
495 .map_err(any_err)?;
496 }
497
498 Ok(())
499 })?,
500 )?;
501
502 kumo_mod.set(
503 "validation_failed",
504 lua.create_function(|_, ()| {
505 config::set_validation_failed();
506 Ok(())
507 })?,
508 )?;
509
510 kumo_mod.set(
511 "enable_memory_callstack_tracking",
512 lua.create_function(|_, enable: bool| {
513 kumo_server_memory::set_tracking_callstacks(enable);
514 Ok(())
515 })?,
516 )?;
517
518 kumo_mod.set(
522 "prometheus_metrics",
523 lua.create_async_function(|lua, ()| async move {
524 use tokio_stream::StreamExt;
525 let mut json_text = String::new();
526 let mut stream = kumo_prometheus::registry::Registry::stream_json();
527 while let Some(text) = stream.next().await {
528 json_text.push_str(&text);
529 }
530 let value: serde_json::Value = serde_json::from_str(&json_text).map_err(any_err)?;
531 lua.to_value_with(&value, serialize_options())
532 })?,
533 )?;
534
535 Ok(())
536}