1use config::{
2 any_err, decorate_callback_name, from_lua_value, get_or_create_module, load_config,
3 CallbackSignature,
4};
5use kumo_server_runtime::available_parallelism;
6use mlua::{Function, Lua, LuaSerdeExt, Value, Variadic};
7use mod_redis::RedisConnKey;
8use serde::{Deserialize, Serialize};
9use std::sync::atomic::AtomicUsize;
10
11pub mod config_handle;
12pub mod diagnostic_logging;
13pub mod disk_space;
14pub mod http_server;
15pub mod nodeid;
16pub mod panic;
17pub mod start;
18pub mod tls_helpers;
19
20pub fn register(lua: &Lua) -> anyhow::Result<()> {
21 for func in [
22 mod_redis::register,
23 data_loader::register,
24 mod_digest::register,
25 mod_encode::register,
26 cidr_map::register,
27 domain_map::register,
28 mod_amqp::register,
29 mod_filesystem::register,
30 mod_http::register,
31 mod_regex::register,
32 mod_serde::register,
33 mod_sqlite::register,
34 mod_string::register,
35 mod_time::register,
36 mod_dns_resolver::register,
37 mod_kafka::register,
38 mod_memoize::register,
39 mod_mpsc::register,
40 mod_uuid::register,
41 kumo_api_types::shaping::register,
42 regex_set_map::register,
43 ] {
44 func(lua)?;
45 }
46
47 let kumo_mod = get_or_create_module(lua, "kumo")?;
48
49 fn event_registrar_name(name: &str) -> String {
50 format!("kumomta-event-registrars-{name}")
51 }
52
53 fn register_event_caller(lua: &Lua, name: &str) -> mlua::Result<()> {
56 let decorated_name = event_registrar_name(name);
57 let mut call_stack = vec![];
58 for n in 1.. {
59 match lua.inspect_stack(n) {
60 Some(info) => {
61 let source = info.source();
62 call_stack.push(format!(
63 "{}:{}",
64 source
65 .short_src
66 .as_ref()
67 .map(|b| b.to_string())
68 .unwrap_or_else(String::new),
69 info.curr_line()
70 ));
71 }
72 None => break,
73 }
74 }
75
76 let tbl: Value = lua.named_registry_value(&decorated_name)?;
77 match tbl {
78 Value::Nil => {
79 let tbl = lua.create_table()?;
80 tbl.set(1, call_stack)?;
81 lua.set_named_registry_value(&decorated_name, tbl)?;
82 Ok(())
83 }
84 Value::Table(tbl) => {
85 let len = tbl.raw_len();
86 tbl.set(len + 1, call_stack)?;
87 Ok(())
88 }
89 _ => Err(mlua::Error::external(format!(
90 "registry key for {decorated_name} has invalid type",
91 ))),
92 }
93 }
94
95 kumo_mod.set(
98 "get_event_registrars",
99 lua.create_function(move |lua, name: String| {
100 let decorated_name = event_registrar_name(&name);
101 let value: Value = lua.named_registry_value(&decorated_name)?;
102 Ok(value)
103 })?,
104 )?;
105
106 kumo_mod.set(
107 "on",
108 lua.create_function(move |lua, (name, func): (String, Function)| {
109 let decorated_name = decorate_callback_name(&name);
110
111 if let Ok(current_event) = lua.globals().get::<String>("_KUMO_CURRENT_EVENT") {
112 if current_event != "main" {
113 return Err(mlua::Error::external(format!(
114 "Attempting to register an event handler via \
115 `kumo.on('{name}', ...)` from within the event handler \
116 '{current_event}'. You must move your event handler registration \
117 so that it is setup directly when the policy is loaded \
118 in order for it to consistently trigger and handle events."
119 )));
120 }
121 }
122
123 register_event_caller(lua, &name)?;
124
125 if config::does_callback_allow_multiple(&name) {
126 let tbl: Value = lua.named_registry_value(&decorated_name)?;
127 return match tbl {
128 Value::Nil => {
129 let tbl = lua.create_table()?;
130 tbl.set(1, func)?;
131 lua.set_named_registry_value(&decorated_name, tbl)?;
132 Ok(())
133 }
134 Value::Table(tbl) => {
135 let len = tbl.raw_len();
136 tbl.set(len + 1, func)?;
137 Ok(())
138 }
139 _ => Err(mlua::Error::external(format!(
140 "registry key for {decorated_name} has invalid type",
141 ))),
142 };
143 }
144
145 let existing: Value = lua.named_registry_value(&decorated_name)?;
146 match existing {
147 Value::Nil => {}
148 Value::Function(func) => {
149 let info = func.info();
150 let src = info.source.unwrap_or_else(|| "?".into());
151 let line = info.line_defined.unwrap_or(0);
152 return Err(mlua::Error::external(format!(
153 "{name} event already has a handler defined at {src}:{line}"
154 )));
155 }
156 _ => {
157 return Err(mlua::Error::external(format!(
158 "{name} event already has a handler"
159 )));
160 }
161 }
162
163 lua.set_named_registry_value(&decorated_name, func)?;
164 Ok(())
165 })?,
166 )?;
167
168 kumo_mod.set(
169 "set_diagnostic_log_filter",
170 lua.create_function(move |_, filter: String| {
171 diagnostic_logging::set_diagnostic_log_filter(&filter).map_err(any_err)
172 })?,
173 )?;
174
175 fn variadic_to_string(args: Variadic<Value>) -> String {
176 let mut output = String::new();
177 for (idx, item) in args.into_iter().enumerate() {
178 if idx > 0 {
179 output.push(' ');
180 }
181
182 match item {
183 Value::String(s) => match s.to_str() {
184 Ok(s) => output.push_str(&s),
185 Err(_) => {
186 let item = s.to_string_lossy();
187 output.push_str(&item);
188 }
189 },
190 item => match item.to_string() {
191 Ok(s) => output.push_str(&s),
192 Err(_) => output.push_str(&format!("{item:?}")),
193 },
194 }
195 }
196 output
197 }
198
199 fn get_caller(lua: &Lua) -> String {
200 match lua.inspect_stack(1) {
201 Some(info) => {
202 let source = info.source();
203 let file_name = source
204 .short_src
205 .as_ref()
206 .map(|b| b.to_string())
207 .unwrap_or_else(String::new);
208 let file_name = match file_name.strip_prefix("[string \"") {
211 Some(name) => name.strip_suffix("\"]").unwrap_or(name),
212 None => &file_name,
213 };
214
215 format!("{file_name}:{}", info.curr_line())
216 }
217 None => "?".to_string(),
218 }
219 }
220
221 kumo_mod.set(
222 "log_error",
223 lua.create_function(move |lua, args: Variadic<Value>| {
224 if tracing::event_enabled!(target: "lua", tracing::Level::ERROR) {
225 let src = get_caller(lua);
226 tracing::error!(target: "lua", "{src}: {}", variadic_to_string(args));
227 }
228 Ok(())
229 })?,
230 )?;
231 kumo_mod.set(
232 "log_info",
233 lua.create_function(move |lua, args: Variadic<Value>| {
234 if tracing::event_enabled!(target: "lua", tracing::Level::INFO) {
235 let src = get_caller(lua);
236 tracing::info!(target: "lua", "{src}: {}", variadic_to_string(args));
237 }
238 Ok(())
239 })?,
240 )?;
241 kumo_mod.set(
242 "log_warn",
243 lua.create_function(move |lua, args: Variadic<Value>| {
244 if tracing::event_enabled!(target: "lua", tracing::Level::WARN) {
245 let src = get_caller(lua);
246 tracing::warn!(target: "lua", "{src}: {}", variadic_to_string(args));
247 }
248 Ok(())
249 })?,
250 )?;
251 kumo_mod.set(
252 "log_debug",
253 lua.create_function(move |lua, args: Variadic<Value>| {
254 if tracing::event_enabled!(target: "lua", tracing::Level::DEBUG) {
255 let src = get_caller(lua);
256 tracing::debug!(target: "lua", "{src}: {}", variadic_to_string(args));
257 }
258 Ok(())
259 })?,
260 )?;
261
262 kumo_mod.set(
263 "set_max_spare_lua_contexts",
264 lua.create_function(move |_, limit: usize| {
265 config::set_max_spare(limit);
266 Ok(())
267 })?,
268 )?;
269
270 kumo_mod.set(
271 "set_max_lua_context_use_count",
272 lua.create_function(move |_, limit: usize| {
273 config::set_max_use(limit);
274 Ok(())
275 })?,
276 )?;
277
278 kumo_mod.set(
279 "set_max_lua_context_age",
280 lua.create_function(move |_, limit: usize| {
281 config::set_max_age(limit);
282 Ok(())
283 })?,
284 )?;
285
286 kumo_mod.set(
287 "set_lua_gc_on_put",
288 lua.create_function(move |_, enable: u8| {
289 config::set_gc_on_put(enable);
290 Ok(())
291 })?,
292 )?;
293
294 kumo_mod.set(
295 "set_lruttl_cache_capacity",
296 lua.create_function(move |_, (name, capacity): (String, usize)| {
297 if lruttl::set_cache_capacity(&name, capacity) {
298 Ok(())
299 } else {
300 Err(mlua::Error::external(format!(
301 "could not set capacity for cache {name} \
302 as that is not a pre-defined lruttl cache name"
303 )))
304 }
305 })?,
306 )?;
307
308 kumo_mod.set(
309 "set_config_monitor_globs",
310 lua.create_function(move |_, globs: Vec<String>| {
311 config::epoch::set_globs(globs).map_err(any_err)?;
312 Ok(())
313 })?,
314 )?;
315 kumo_mod.set(
316 "eval_config_monitor_globs",
317 lua.create_async_function(|_, _: ()| async move {
318 config::epoch::eval_globs().await.map_err(any_err)
319 })?,
320 )?;
321 kumo_mod.set(
322 "bump_config_epoch",
323 lua.create_function(move |_, _: ()| {
324 config::epoch::bump_current_epoch();
325 Ok(())
326 })?,
327 )?;
328
329 kumo_mod.set(
330 "available_parallelism",
331 lua.create_function(move |_, _: ()| available_parallelism().map_err(any_err))?,
332 )?;
333
334 kumo_mod.set(
335 "set_memory_hard_limit",
336 lua.create_function(move |_, limit: usize| {
337 kumo_server_memory::set_hard_limit(limit);
338 Ok(())
339 })?,
340 )?;
341
342 kumo_mod.set(
343 "set_memory_low_thresh",
344 lua.create_function(move |_, limit: usize| {
345 kumo_server_memory::set_low_memory_thresh(limit);
346 Ok(())
347 })?,
348 )?;
349
350 kumo_mod.set(
351 "set_memory_soft_limit",
352 lua.create_function(move |_, limit: usize| {
353 kumo_server_memory::set_soft_limit(limit);
354 Ok(())
355 })?,
356 )?;
357
358 kumo_mod.set(
359 "configure_redis_throttles",
360 lua.create_async_function(|lua, params: Value| async move {
361 let key: RedisConnKey = from_lua_value(&lua, params)?;
362 let conn = key.open().map_err(any_err)?;
363 conn.ping().await.map_err(any_err)?;
364 throttle::use_redis(conn).await.map_err(any_err)
365 })?,
366 )?;
367
368 kumo_mod.set(
369 "traceback",
370 lua.create_function(move |lua: &Lua, level: usize| {
371 #[derive(Debug, Serialize)]
372 struct Frame {
373 event: String,
374 name: Option<String>,
375 name_what: Option<String>,
376 source: Option<String>,
377 short_src: Option<String>,
378 line_defined: Option<usize>,
379 last_line_defined: Option<usize>,
380 what: &'static str,
381 curr_line: i32,
382 is_tail_call: bool,
383 }
384
385 let mut frames = vec![];
386 for n in level.. {
387 match lua.inspect_stack(n) {
388 Some(info) => {
389 let source = info.source();
390 let names = info.names();
391 frames.push(Frame {
392 curr_line: info.curr_line(),
393 is_tail_call: info.is_tail_call(),
394 event: format!("{:?}", info.event()),
395 last_line_defined: source.last_line_defined,
396 line_defined: source.line_defined,
397 name: names.name.as_ref().map(|b| b.to_string()),
398 name_what: names.name_what.as_ref().map(|b| b.to_string()),
399 source: source.source.as_ref().map(|b| b.to_string()),
400 short_src: source.short_src.as_ref().map(|b| b.to_string()),
401 what: source.what,
402 });
403 }
404 None => break,
405 }
406 }
407
408 lua.to_value(&frames)
409 })?,
410 )?;
411
412 #[derive(Deserialize, Debug)]
415 struct TaskParams {
416 event_name: String,
417 args: Vec<serde_json::Value>,
418 }
419
420 impl TaskParams {
421 async fn run(&self) -> anyhow::Result<()> {
422 let mut config = load_config().await?;
423
424 let sig = CallbackSignature::<Value, ()>::new(self.event_name.to_string());
425
426 config
427 .convert_args_and_call_callback(&sig, &self.args)
428 .await?;
429
430 config.put();
431
432 Ok(())
433 }
434 }
435
436 kumo_mod.set(
437 "spawn_task",
438 lua.create_function(|lua, params: Value| {
439 let params: TaskParams = lua.from_value(params)?;
440
441 if !config::is_validating() {
442 std::thread::Builder::new()
443 .name(format!("spawned-task-{}", params.event_name))
444 .spawn(move || {
445 let runtime = tokio::runtime::Builder::new_current_thread()
446 .enable_io()
447 .enable_time()
448 .on_thread_park(kumo_server_memory::purge_thread_cache)
449 .build()
450 .unwrap();
451 let event_name = params.event_name.clone();
452
453 let result = runtime.block_on(async move { params.run().await });
454 if let Err(err) = result {
455 tracing::error!("Error while dispatching {event_name}: {err:#}");
456 }
457 })?;
458 }
459
460 Ok(())
461 })?,
462 )?;
463
464 kumo_mod.set(
465 "spawn_thread_pool",
466 lua.create_function(|lua, params: Value| {
467 #[derive(Deserialize, Debug)]
468 struct ThreadPoolParams {
469 name: String,
470 num_threads: usize,
471 }
472
473 let params: ThreadPoolParams = lua.from_value(params)?;
474 let num_threads = AtomicUsize::new(params.num_threads);
475
476 if !config::is_validating() {
477 let _runtime = kumo_server_runtime::Runtime::new(
481 ¶ms.name,
482 |_| params.num_threads,
483 &num_threads,
484 )
485 .map_err(any_err)?;
486 }
487
488 Ok(())
489 })?,
490 )?;
491
492 kumo_mod.set(
493 "validation_failed",
494 lua.create_function(|_, ()| {
495 config::set_validation_failed();
496 Ok(())
497 })?,
498 )?;
499
500 kumo_mod.set(
501 "enable_memory_callstack_tracking",
502 lua.create_function(|_, enable: bool| {
503 kumo_server_memory::set_tracking_callstacks(enable);
504 Ok(())
505 })?,
506 )?;
507
508 Ok(())
509}