1use config::{
2 any_err, decorate_callback_name, from_lua_value, get_or_create_module, load_config,
3 serialize_options, CallbackSignature,
4};
5use kumo_server_runtime::available_parallelism;
6use mlua::{Function, Lua, LuaSerdeExt, Value, Variadic};
7use mod_redis::RedisConnKey;
8use serde::{Deserialize, Serialize};
9use std::sync::atomic::AtomicUsize;
10
11pub mod config_handle;
12pub mod diagnostic_logging;
13pub mod disk_space;
14pub mod http_server;
15pub mod nodeid;
16pub mod panic;
17pub mod start;
18pub mod tls_helpers;
19
20pub fn register(lua: &Lua) -> anyhow::Result<()> {
21 for func in [
22 mod_redis::register,
23 data_loader::register,
24 mod_digest::register,
25 mod_encode::register,
26 cidr_map::register,
27 domain_map::register,
28 mod_amqp::register,
29 mod_filesystem::register,
30 mod_file_type::register,
31 mod_http::register,
32 mod_regex::register,
33 mod_serde::register,
34 mod_sqlite::register,
35 mod_crypto::register,
36 mod_smtp_response_normalize::register,
37 mod_string::register,
38 mod_time::register,
39 mod_dns_resolver::register,
40 mod_kafka::register,
41 mod_memoize::register,
42 mod_mimepart::register,
43 mod_mpsc::register,
44 mod_uuid::register,
45 kumo_api_types::shaping::register,
46 regex_set_map::register,
47 ] {
48 func(lua)?;
49 }
50
51 let kumo_mod = get_or_create_module(lua, "kumo")?;
52 kumo_mod.set("version", version_info::kumo_version())?;
53
54 fn event_registrar_name(name: &str) -> String {
55 format!("kumomta-event-registrars-{name}")
56 }
57
58 fn register_event_caller(lua: &Lua, name: &str) -> mlua::Result<()> {
61 let decorated_name = event_registrar_name(name);
62 let mut call_stack = vec![];
63 for n in 1.. {
64 match lua.inspect_stack(n, |info| {
65 let source = info.source();
66 format!(
67 "{}:{}",
68 source
69 .short_src
70 .as_ref()
71 .map(|b| b.to_string())
72 .unwrap_or_else(String::new),
73 info.current_line().unwrap_or(0)
74 )
75 }) {
76 Some(info) => {
77 call_stack.push(info);
78 }
79 None => break,
80 }
81 }
82
83 let tbl: Value = lua.named_registry_value(&decorated_name)?;
84 match tbl {
85 Value::Nil => {
86 let tbl = lua.create_table()?;
87 tbl.set(1, call_stack)?;
88 lua.set_named_registry_value(&decorated_name, tbl)?;
89 Ok(())
90 }
91 Value::Table(tbl) => {
92 let len = tbl.raw_len();
93 tbl.set(len + 1, call_stack)?;
94 Ok(())
95 }
96 _ => Err(mlua::Error::external(format!(
97 "registry key for {decorated_name} has invalid type",
98 ))),
99 }
100 }
101
102 kumo_mod.set(
105 "get_event_registrars",
106 lua.create_function(move |lua, name: String| {
107 let decorated_name = event_registrar_name(&name);
108 let value: Value = lua.named_registry_value(&decorated_name)?;
109 Ok(value)
110 })?,
111 )?;
112
113 kumo_mod.set(
114 "on",
115 lua.create_function(move |lua, (name, func): (String, Function)| {
116 let decorated_name = decorate_callback_name(&name);
117
118 if let Ok(current_event) = lua.globals().get::<String>("_KUMO_CURRENT_EVENT") {
119 if current_event != "main" {
120 return Err(mlua::Error::external(format!(
121 "Attempting to register an event handler via \
122 `kumo.on('{name}', ...)` from within the event handler \
123 '{current_event}'. You must move your event handler registration \
124 so that it is setup directly when the policy is loaded \
125 in order for it to consistently trigger and handle events."
126 )));
127 }
128 }
129
130 register_event_caller(lua, &name)?;
131
132 if config::does_callback_allow_multiple(&name) {
133 let tbl: Value = lua.named_registry_value(&decorated_name)?;
134 return match tbl {
135 Value::Nil => {
136 let tbl = lua.create_table()?;
137 tbl.set(1, func)?;
138 lua.set_named_registry_value(&decorated_name, tbl)?;
139 Ok(())
140 }
141 Value::Table(tbl) => {
142 let len = tbl.raw_len();
143 tbl.set(len + 1, func)?;
144 Ok(())
145 }
146 _ => Err(mlua::Error::external(format!(
147 "registry key for {decorated_name} has invalid type",
148 ))),
149 };
150 }
151
152 let existing: Value = lua.named_registry_value(&decorated_name)?;
153 match existing {
154 Value::Nil => {}
155 Value::Function(func) => {
156 let info = func.info();
157 let src = info.source.unwrap_or_else(|| "?".into());
158 let line = info.line_defined.unwrap_or(0);
159 return Err(mlua::Error::external(format!(
160 "{name} event already has a handler defined at {src}:{line}"
161 )));
162 }
163 _ => {
164 return Err(mlua::Error::external(format!(
165 "{name} event already has a handler"
166 )));
167 }
168 }
169
170 lua.set_named_registry_value(&decorated_name, func)?;
171 Ok(())
172 })?,
173 )?;
174
175 kumo_mod.set(
176 "set_diagnostic_log_filter",
177 lua.create_function(move |_, filter: String| {
178 diagnostic_logging::set_diagnostic_log_filter(&filter).map_err(any_err)
179 })?,
180 )?;
181
182 fn variadic_to_string(args: Variadic<Value>) -> String {
183 let mut output = String::new();
184 for (idx, item) in args.into_iter().enumerate() {
185 if idx > 0 {
186 output.push(' ');
187 }
188
189 match item {
190 Value::String(s) => match s.to_str() {
191 Ok(s) => output.push_str(&s),
192 Err(_) => {
193 let item = s.to_string_lossy();
194 output.push_str(&item);
195 }
196 },
197 item => match item.to_string() {
198 Ok(s) => output.push_str(&s),
199 Err(_) => output.push_str(&format!("{item:?}")),
200 },
201 }
202 }
203 output
204 }
205
206 fn get_caller(lua: &Lua) -> String {
207 match lua.inspect_stack(1, |info| {
208 let source = info.source();
209 let file_name = source
210 .short_src
211 .as_ref()
212 .map(|b| b.to_string())
213 .unwrap_or_else(String::new);
214 let file_name = match file_name.strip_prefix("[string \"") {
217 Some(name) => name.strip_suffix("\"]").unwrap_or(name),
218 None => &file_name,
219 };
220
221 format!("{file_name}:{}", info.current_line().unwrap_or(0))
222 }) {
223 Some(info) => info,
224 None => "?".to_string(),
225 }
226 }
227
228 kumo_mod.set(
229 "log_error",
230 lua.create_function(move |lua, args: Variadic<Value>| {
231 if tracing::event_enabled!(target: "lua", tracing::Level::ERROR) {
232 let src = get_caller(lua);
233 tracing::error!(target: "lua", "{src}: {}", variadic_to_string(args));
234 }
235 Ok(())
236 })?,
237 )?;
238 kumo_mod.set(
239 "log_info",
240 lua.create_function(move |lua, args: Variadic<Value>| {
241 if tracing::event_enabled!(target: "lua", tracing::Level::INFO) {
242 let src = get_caller(lua);
243 tracing::info!(target: "lua", "{src}: {}", variadic_to_string(args));
244 }
245 Ok(())
246 })?,
247 )?;
248 kumo_mod.set(
249 "log_warn",
250 lua.create_function(move |lua, args: Variadic<Value>| {
251 if tracing::event_enabled!(target: "lua", tracing::Level::WARN) {
252 let src = get_caller(lua);
253 tracing::warn!(target: "lua", "{src}: {}", variadic_to_string(args));
254 }
255 Ok(())
256 })?,
257 )?;
258 kumo_mod.set(
259 "log_debug",
260 lua.create_function(move |lua, args: Variadic<Value>| {
261 if tracing::event_enabled!(target: "lua", tracing::Level::DEBUG) {
262 let src = get_caller(lua);
263 tracing::debug!(target: "lua", "{src}: {}", variadic_to_string(args));
264 }
265 Ok(())
266 })?,
267 )?;
268
269 kumo_mod.set(
270 "set_max_spare_lua_contexts",
271 lua.create_function(move |_, limit: usize| {
272 config::set_max_spare(limit);
273 Ok(())
274 })?,
275 )?;
276
277 kumo_mod.set(
278 "set_max_lua_context_use_count",
279 lua.create_function(move |_, limit: usize| {
280 config::set_max_use(limit);
281 Ok(())
282 })?,
283 )?;
284
285 kumo_mod.set(
286 "set_max_lua_context_age",
287 lua.create_function(move |_, limit: usize| {
288 config::set_max_age(limit);
289 Ok(())
290 })?,
291 )?;
292
293 kumo_mod.set(
294 "set_lua_gc_on_put",
295 lua.create_function(move |_, enable: u8| {
296 config::set_gc_on_put(enable);
297 Ok(())
298 })?,
299 )?;
300
301 kumo_mod.set(
302 "set_lruttl_cache_capacity",
303 lua.create_function(move |_, (name, capacity): (String, usize)| {
304 if lruttl::set_cache_capacity(&name, capacity) {
305 Ok(())
306 } else {
307 Err(mlua::Error::external(format!(
308 "could not set capacity for cache {name} \
309 as that is not a pre-defined lruttl cache name"
310 )))
311 }
312 })?,
313 )?;
314
315 kumo_mod.set(
316 "set_config_monitor_globs",
317 lua.create_function(move |_, globs: Vec<String>| {
318 config::epoch::set_globs(globs).map_err(any_err)?;
319 Ok(())
320 })?,
321 )?;
322 kumo_mod.set(
323 "eval_config_monitor_globs",
324 lua.create_async_function(|_, _: ()| async move {
325 config::epoch::eval_globs().await.map_err(any_err)
326 })?,
327 )?;
328 kumo_mod.set(
329 "bump_config_epoch",
330 lua.create_function(move |_, _: ()| {
331 config::epoch::bump_current_epoch();
332 Ok(())
333 })?,
334 )?;
335
336 kumo_mod.set(
337 "available_parallelism",
338 lua.create_function(move |_, _: ()| available_parallelism().map_err(any_err))?,
339 )?;
340
341 kumo_mod.set(
342 "set_memory_hard_limit",
343 lua.create_function(move |_, limit: usize| {
344 kumo_server_memory::set_hard_limit(limit);
345 Ok(())
346 })?,
347 )?;
348
349 kumo_mod.set(
350 "set_memory_low_thresh",
351 lua.create_function(move |_, limit: usize| {
352 kumo_server_memory::set_low_memory_thresh(limit);
353 Ok(())
354 })?,
355 )?;
356
357 kumo_mod.set(
358 "set_memory_soft_limit",
359 lua.create_function(move |_, limit: usize| {
360 kumo_server_memory::set_soft_limit(limit);
361 Ok(())
362 })?,
363 )?;
364
365 kumo_mod.set(
366 "configure_redis_throttles",
367 lua.create_async_function(|lua, params: Value| async move {
368 let key: RedisConnKey = from_lua_value(&lua, params)?;
369 let conn = key.open().map_err(any_err)?;
370 conn.ping().await.map_err(any_err)?;
371 throttle::use_redis(conn).await.map_err(any_err)
372 })?,
373 )?;
374
375 kumo_mod.set(
376 "traceback",
377 lua.create_function(move |lua: &Lua, level: usize| {
378 #[derive(Debug, Serialize)]
379 struct Frame {
380 event: String,
381 name: Option<String>,
382 name_what: Option<String>,
383 source: Option<String>,
384 short_src: Option<String>,
385 line_defined: Option<usize>,
386 last_line_defined: Option<usize>,
387 what: &'static str,
388 curr_line: Option<usize>,
389 is_tail_call: bool,
390 }
391
392 let mut frames = vec![];
393 for n in level.. {
394 match lua.inspect_stack(n, |info| {
395 let source = info.source();
396 let names = info.names();
397 Frame {
398 curr_line: info.current_line(),
399 is_tail_call: info.is_tail_call(),
400 event: format!("{:?}", info.event()),
401 last_line_defined: source.last_line_defined,
402 line_defined: source.line_defined,
403 name: names.name.as_ref().map(|b| b.to_string()),
404 name_what: names.name_what.as_ref().map(|b| b.to_string()),
405 source: source.source.as_ref().map(|b| b.to_string()),
406 short_src: source.short_src.as_ref().map(|b| b.to_string()),
407 what: source.what,
408 }
409 }) {
410 Some(frame) => {
411 frames.push(frame);
412 }
413 None => break,
414 }
415 }
416
417 lua.to_value(&frames)
418 })?,
419 )?;
420
421 #[derive(Deserialize, Debug)]
424 struct TaskParams {
425 event_name: String,
426 args: Vec<serde_json::Value>,
427 }
428
429 impl TaskParams {
430 async fn run(&self) -> anyhow::Result<()> {
431 let mut config = load_config().await?;
432
433 let sig = CallbackSignature::<Value, ()>::new(self.event_name.to_string());
434
435 config
436 .convert_args_and_call_callback(&sig, &self.args)
437 .await?;
438
439 config.put();
440
441 Ok(())
442 }
443 }
444
445 kumo_mod.set(
446 "spawn_task",
447 lua.create_function(|lua, params: Value| {
448 let params: TaskParams = lua.from_value(params)?;
449
450 if !config::is_validating() {
451 std::thread::Builder::new()
452 .name(format!("spawned-task-{}", params.event_name))
453 .spawn(move || {
454 let runtime = tokio::runtime::Builder::new_current_thread()
455 .enable_io()
456 .enable_time()
457 .on_thread_park(kumo_server_memory::purge_thread_cache)
458 .build()
459 .unwrap();
460 let event_name = params.event_name.clone();
461
462 let result = runtime.block_on(async move { params.run().await });
463 if let Err(err) = result {
464 tracing::error!("Error while dispatching {event_name}: {err:#}");
465 }
466 })?;
467 }
468
469 Ok(())
470 })?,
471 )?;
472
473 kumo_mod.set(
474 "spawn_thread_pool",
475 lua.create_function(|lua, params: Value| {
476 #[derive(Deserialize, Debug)]
477 struct ThreadPoolParams {
478 name: String,
479 num_threads: usize,
480 }
481
482 let params: ThreadPoolParams = lua.from_value(params)?;
483 let num_threads = AtomicUsize::new(params.num_threads);
484
485 if !config::is_validating() {
486 let _runtime = kumo_server_runtime::Runtime::new(
490 ¶ms.name,
491 |_| params.num_threads,
492 &num_threads,
493 )
494 .map_err(any_err)?;
495 }
496
497 Ok(())
498 })?,
499 )?;
500
501 kumo_mod.set(
502 "validation_failed",
503 lua.create_function(|_, ()| {
504 config::set_validation_failed();
505 Ok(())
506 })?,
507 )?;
508
509 kumo_mod.set(
510 "enable_memory_callstack_tracking",
511 lua.create_function(|_, enable: bool| {
512 kumo_server_memory::set_tracking_callstacks(enable);
513 Ok(())
514 })?,
515 )?;
516
517 kumo_mod.set(
521 "prometheus_metrics",
522 lua.create_async_function(|lua, ()| async move {
523 use tokio_stream::StreamExt;
524 let mut json_text = String::new();
525 let mut stream = kumo_prometheus::registry::Registry::stream_json();
526 while let Some(text) = stream.next().await {
527 json_text.push_str(&text);
528 }
529 let value: serde_json::Value = serde_json::from_str(&json_text).map_err(any_err)?;
530 lua.to_value_with(&value, serialize_options())
531 })?,
532 )?;
533
534 Ok(())
535}