1use config::{
2 any_err, decorate_callback_name, from_lua_value, get_or_create_module, load_config,
3 CallbackSignature,
4};
5use kumo_server_runtime::available_parallelism;
6use mlua::{Function, Lua, LuaSerdeExt, Value, Variadic};
7use mod_redis::RedisConnKey;
8use serde::{Deserialize, Serialize};
9use std::sync::atomic::AtomicUsize;
10
11pub mod config_handle;
12pub mod diagnostic_logging;
13pub mod disk_space;
14pub mod http_server;
15pub mod nodeid;
16pub mod panic;
17pub mod start;
18pub mod tls_helpers;
19
20pub fn register(lua: &Lua) -> anyhow::Result<()> {
21 for func in [
22 mod_redis::register,
23 data_loader::register,
24 mod_digest::register,
25 mod_encode::register,
26 cidr_map::register,
27 domain_map::register,
28 mod_amqp::register,
29 mod_filesystem::register,
30 mod_file_type::register,
31 mod_http::register,
32 mod_regex::register,
33 mod_serde::register,
34 mod_sqlite::register,
35 mod_string::register,
36 mod_time::register,
37 mod_dns_resolver::register,
38 mod_kafka::register,
39 mod_memoize::register,
40 mod_mimepart::register,
41 mod_mpsc::register,
42 mod_uuid::register,
43 kumo_api_types::shaping::register,
44 regex_set_map::register,
45 ] {
46 func(lua)?;
47 }
48
49 let kumo_mod = get_or_create_module(lua, "kumo")?;
50
51 fn event_registrar_name(name: &str) -> String {
52 format!("kumomta-event-registrars-{name}")
53 }
54
55 fn register_event_caller(lua: &Lua, name: &str) -> mlua::Result<()> {
58 let decorated_name = event_registrar_name(name);
59 let mut call_stack = vec![];
60 for n in 1.. {
61 match lua.inspect_stack(n) {
62 Some(info) => {
63 let source = info.source();
64 call_stack.push(format!(
65 "{}:{}",
66 source
67 .short_src
68 .as_ref()
69 .map(|b| b.to_string())
70 .unwrap_or_else(String::new),
71 info.curr_line()
72 ));
73 }
74 None => break,
75 }
76 }
77
78 let tbl: Value = lua.named_registry_value(&decorated_name)?;
79 match tbl {
80 Value::Nil => {
81 let tbl = lua.create_table()?;
82 tbl.set(1, call_stack)?;
83 lua.set_named_registry_value(&decorated_name, tbl)?;
84 Ok(())
85 }
86 Value::Table(tbl) => {
87 let len = tbl.raw_len();
88 tbl.set(len + 1, call_stack)?;
89 Ok(())
90 }
91 _ => Err(mlua::Error::external(format!(
92 "registry key for {decorated_name} has invalid type",
93 ))),
94 }
95 }
96
97 kumo_mod.set(
100 "get_event_registrars",
101 lua.create_function(move |lua, name: String| {
102 let decorated_name = event_registrar_name(&name);
103 let value: Value = lua.named_registry_value(&decorated_name)?;
104 Ok(value)
105 })?,
106 )?;
107
108 kumo_mod.set(
109 "on",
110 lua.create_function(move |lua, (name, func): (String, Function)| {
111 let decorated_name = decorate_callback_name(&name);
112
113 if let Ok(current_event) = lua.globals().get::<String>("_KUMO_CURRENT_EVENT") {
114 if current_event != "main" {
115 return Err(mlua::Error::external(format!(
116 "Attempting to register an event handler via \
117 `kumo.on('{name}', ...)` from within the event handler \
118 '{current_event}'. You must move your event handler registration \
119 so that it is setup directly when the policy is loaded \
120 in order for it to consistently trigger and handle events."
121 )));
122 }
123 }
124
125 register_event_caller(lua, &name)?;
126
127 if config::does_callback_allow_multiple(&name) {
128 let tbl: Value = lua.named_registry_value(&decorated_name)?;
129 return match tbl {
130 Value::Nil => {
131 let tbl = lua.create_table()?;
132 tbl.set(1, func)?;
133 lua.set_named_registry_value(&decorated_name, tbl)?;
134 Ok(())
135 }
136 Value::Table(tbl) => {
137 let len = tbl.raw_len();
138 tbl.set(len + 1, func)?;
139 Ok(())
140 }
141 _ => Err(mlua::Error::external(format!(
142 "registry key for {decorated_name} has invalid type",
143 ))),
144 };
145 }
146
147 let existing: Value = lua.named_registry_value(&decorated_name)?;
148 match existing {
149 Value::Nil => {}
150 Value::Function(func) => {
151 let info = func.info();
152 let src = info.source.unwrap_or_else(|| "?".into());
153 let line = info.line_defined.unwrap_or(0);
154 return Err(mlua::Error::external(format!(
155 "{name} event already has a handler defined at {src}:{line}"
156 )));
157 }
158 _ => {
159 return Err(mlua::Error::external(format!(
160 "{name} event already has a handler"
161 )));
162 }
163 }
164
165 lua.set_named_registry_value(&decorated_name, func)?;
166 Ok(())
167 })?,
168 )?;
169
170 kumo_mod.set(
171 "set_diagnostic_log_filter",
172 lua.create_function(move |_, filter: String| {
173 diagnostic_logging::set_diagnostic_log_filter(&filter).map_err(any_err)
174 })?,
175 )?;
176
177 fn variadic_to_string(args: Variadic<Value>) -> String {
178 let mut output = String::new();
179 for (idx, item) in args.into_iter().enumerate() {
180 if idx > 0 {
181 output.push(' ');
182 }
183
184 match item {
185 Value::String(s) => match s.to_str() {
186 Ok(s) => output.push_str(&s),
187 Err(_) => {
188 let item = s.to_string_lossy();
189 output.push_str(&item);
190 }
191 },
192 item => match item.to_string() {
193 Ok(s) => output.push_str(&s),
194 Err(_) => output.push_str(&format!("{item:?}")),
195 },
196 }
197 }
198 output
199 }
200
201 fn get_caller(lua: &Lua) -> String {
202 match lua.inspect_stack(1) {
203 Some(info) => {
204 let source = info.source();
205 let file_name = source
206 .short_src
207 .as_ref()
208 .map(|b| b.to_string())
209 .unwrap_or_else(String::new);
210 let file_name = match file_name.strip_prefix("[string \"") {
213 Some(name) => name.strip_suffix("\"]").unwrap_or(name),
214 None => &file_name,
215 };
216
217 format!("{file_name}:{}", info.curr_line())
218 }
219 None => "?".to_string(),
220 }
221 }
222
223 kumo_mod.set(
224 "log_error",
225 lua.create_function(move |lua, args: Variadic<Value>| {
226 if tracing::event_enabled!(target: "lua", tracing::Level::ERROR) {
227 let src = get_caller(lua);
228 tracing::error!(target: "lua", "{src}: {}", variadic_to_string(args));
229 }
230 Ok(())
231 })?,
232 )?;
233 kumo_mod.set(
234 "log_info",
235 lua.create_function(move |lua, args: Variadic<Value>| {
236 if tracing::event_enabled!(target: "lua", tracing::Level::INFO) {
237 let src = get_caller(lua);
238 tracing::info!(target: "lua", "{src}: {}", variadic_to_string(args));
239 }
240 Ok(())
241 })?,
242 )?;
243 kumo_mod.set(
244 "log_warn",
245 lua.create_function(move |lua, args: Variadic<Value>| {
246 if tracing::event_enabled!(target: "lua", tracing::Level::WARN) {
247 let src = get_caller(lua);
248 tracing::warn!(target: "lua", "{src}: {}", variadic_to_string(args));
249 }
250 Ok(())
251 })?,
252 )?;
253 kumo_mod.set(
254 "log_debug",
255 lua.create_function(move |lua, args: Variadic<Value>| {
256 if tracing::event_enabled!(target: "lua", tracing::Level::DEBUG) {
257 let src = get_caller(lua);
258 tracing::debug!(target: "lua", "{src}: {}", variadic_to_string(args));
259 }
260 Ok(())
261 })?,
262 )?;
263
264 kumo_mod.set(
265 "set_max_spare_lua_contexts",
266 lua.create_function(move |_, limit: usize| {
267 config::set_max_spare(limit);
268 Ok(())
269 })?,
270 )?;
271
272 kumo_mod.set(
273 "set_max_lua_context_use_count",
274 lua.create_function(move |_, limit: usize| {
275 config::set_max_use(limit);
276 Ok(())
277 })?,
278 )?;
279
280 kumo_mod.set(
281 "set_max_lua_context_age",
282 lua.create_function(move |_, limit: usize| {
283 config::set_max_age(limit);
284 Ok(())
285 })?,
286 )?;
287
288 kumo_mod.set(
289 "set_lua_gc_on_put",
290 lua.create_function(move |_, enable: u8| {
291 config::set_gc_on_put(enable);
292 Ok(())
293 })?,
294 )?;
295
296 kumo_mod.set(
297 "set_lruttl_cache_capacity",
298 lua.create_function(move |_, (name, capacity): (String, usize)| {
299 if lruttl::set_cache_capacity(&name, capacity) {
300 Ok(())
301 } else {
302 Err(mlua::Error::external(format!(
303 "could not set capacity for cache {name} \
304 as that is not a pre-defined lruttl cache name"
305 )))
306 }
307 })?,
308 )?;
309
310 kumo_mod.set(
311 "set_config_monitor_globs",
312 lua.create_function(move |_, globs: Vec<String>| {
313 config::epoch::set_globs(globs).map_err(any_err)?;
314 Ok(())
315 })?,
316 )?;
317 kumo_mod.set(
318 "eval_config_monitor_globs",
319 lua.create_async_function(|_, _: ()| async move {
320 config::epoch::eval_globs().await.map_err(any_err)
321 })?,
322 )?;
323 kumo_mod.set(
324 "bump_config_epoch",
325 lua.create_function(move |_, _: ()| {
326 config::epoch::bump_current_epoch();
327 Ok(())
328 })?,
329 )?;
330
331 kumo_mod.set(
332 "available_parallelism",
333 lua.create_function(move |_, _: ()| available_parallelism().map_err(any_err))?,
334 )?;
335
336 kumo_mod.set(
337 "set_memory_hard_limit",
338 lua.create_function(move |_, limit: usize| {
339 kumo_server_memory::set_hard_limit(limit);
340 Ok(())
341 })?,
342 )?;
343
344 kumo_mod.set(
345 "set_memory_low_thresh",
346 lua.create_function(move |_, limit: usize| {
347 kumo_server_memory::set_low_memory_thresh(limit);
348 Ok(())
349 })?,
350 )?;
351
352 kumo_mod.set(
353 "set_memory_soft_limit",
354 lua.create_function(move |_, limit: usize| {
355 kumo_server_memory::set_soft_limit(limit);
356 Ok(())
357 })?,
358 )?;
359
360 kumo_mod.set(
361 "configure_redis_throttles",
362 lua.create_async_function(|lua, params: Value| async move {
363 let key: RedisConnKey = from_lua_value(&lua, params)?;
364 let conn = key.open().map_err(any_err)?;
365 conn.ping().await.map_err(any_err)?;
366 throttle::use_redis(conn).await.map_err(any_err)
367 })?,
368 )?;
369
370 kumo_mod.set(
371 "traceback",
372 lua.create_function(move |lua: &Lua, level: usize| {
373 #[derive(Debug, Serialize)]
374 struct Frame {
375 event: String,
376 name: Option<String>,
377 name_what: Option<String>,
378 source: Option<String>,
379 short_src: Option<String>,
380 line_defined: Option<usize>,
381 last_line_defined: Option<usize>,
382 what: &'static str,
383 curr_line: i32,
384 is_tail_call: bool,
385 }
386
387 let mut frames = vec![];
388 for n in level.. {
389 match lua.inspect_stack(n) {
390 Some(info) => {
391 let source = info.source();
392 let names = info.names();
393 frames.push(Frame {
394 curr_line: info.curr_line(),
395 is_tail_call: info.is_tail_call(),
396 event: format!("{:?}", info.event()),
397 last_line_defined: source.last_line_defined,
398 line_defined: source.line_defined,
399 name: names.name.as_ref().map(|b| b.to_string()),
400 name_what: names.name_what.as_ref().map(|b| b.to_string()),
401 source: source.source.as_ref().map(|b| b.to_string()),
402 short_src: source.short_src.as_ref().map(|b| b.to_string()),
403 what: source.what,
404 });
405 }
406 None => break,
407 }
408 }
409
410 lua.to_value(&frames)
411 })?,
412 )?;
413
414 #[derive(Deserialize, Debug)]
417 struct TaskParams {
418 event_name: String,
419 args: Vec<serde_json::Value>,
420 }
421
422 impl TaskParams {
423 async fn run(&self) -> anyhow::Result<()> {
424 let mut config = load_config().await?;
425
426 let sig = CallbackSignature::<Value, ()>::new(self.event_name.to_string());
427
428 config
429 .convert_args_and_call_callback(&sig, &self.args)
430 .await?;
431
432 config.put();
433
434 Ok(())
435 }
436 }
437
438 kumo_mod.set(
439 "spawn_task",
440 lua.create_function(|lua, params: Value| {
441 let params: TaskParams = lua.from_value(params)?;
442
443 if !config::is_validating() {
444 std::thread::Builder::new()
445 .name(format!("spawned-task-{}", params.event_name))
446 .spawn(move || {
447 let runtime = tokio::runtime::Builder::new_current_thread()
448 .enable_io()
449 .enable_time()
450 .on_thread_park(kumo_server_memory::purge_thread_cache)
451 .build()
452 .unwrap();
453 let event_name = params.event_name.clone();
454
455 let result = runtime.block_on(async move { params.run().await });
456 if let Err(err) = result {
457 tracing::error!("Error while dispatching {event_name}: {err:#}");
458 }
459 })?;
460 }
461
462 Ok(())
463 })?,
464 )?;
465
466 kumo_mod.set(
467 "spawn_thread_pool",
468 lua.create_function(|lua, params: Value| {
469 #[derive(Deserialize, Debug)]
470 struct ThreadPoolParams {
471 name: String,
472 num_threads: usize,
473 }
474
475 let params: ThreadPoolParams = lua.from_value(params)?;
476 let num_threads = AtomicUsize::new(params.num_threads);
477
478 if !config::is_validating() {
479 let _runtime = kumo_server_runtime::Runtime::new(
483 ¶ms.name,
484 |_| params.num_threads,
485 &num_threads,
486 )
487 .map_err(any_err)?;
488 }
489
490 Ok(())
491 })?,
492 )?;
493
494 kumo_mod.set(
495 "validation_failed",
496 lua.create_function(|_, ()| {
497 config::set_validation_failed();
498 Ok(())
499 })?,
500 )?;
501
502 kumo_mod.set(
503 "enable_memory_callstack_tracking",
504 lua.create_function(|_, enable: bool| {
505 kumo_server_memory::set_tracking_callstacks(enable);
506 Ok(())
507 })?,
508 )?;
509
510 Ok(())
511}