1use config::{
2 any_err, decorate_callback_name, from_lua_value, get_or_create_module, load_config,
3 CallbackSignature,
4};
5use kumo_server_runtime::available_parallelism;
6use mlua::{Function, Lua, LuaSerdeExt, Value, Variadic};
7use mod_redis::RedisConnKey;
8use serde::{Deserialize, Serialize};
9use std::sync::atomic::AtomicUsize;
10
11pub mod config_handle;
12pub mod diagnostic_logging;
13pub mod disk_space;
14pub mod http_server;
15pub mod nodeid;
16pub mod panic;
17pub mod start;
18pub mod tls_helpers;
19
20pub fn register(lua: &Lua) -> anyhow::Result<()> {
21 for func in [
22 mod_redis::register,
23 data_loader::register,
24 mod_digest::register,
25 mod_encode::register,
26 cidr_map::register,
27 domain_map::register,
28 mod_amqp::register,
29 mod_filesystem::register,
30 mod_http::register,
31 mod_regex::register,
32 mod_serde::register,
33 mod_sqlite::register,
34 mod_string::register,
35 mod_time::register,
36 mod_dns_resolver::register,
37 mod_kafka::register,
38 mod_memoize::register,
39 mod_mimepart::register,
40 mod_mpsc::register,
41 mod_uuid::register,
42 kumo_api_types::shaping::register,
43 regex_set_map::register,
44 ] {
45 func(lua)?;
46 }
47
48 let kumo_mod = get_or_create_module(lua, "kumo")?;
49
50 fn event_registrar_name(name: &str) -> String {
51 format!("kumomta-event-registrars-{name}")
52 }
53
54 fn register_event_caller(lua: &Lua, name: &str) -> mlua::Result<()> {
57 let decorated_name = event_registrar_name(name);
58 let mut call_stack = vec![];
59 for n in 1.. {
60 match lua.inspect_stack(n) {
61 Some(info) => {
62 let source = info.source();
63 call_stack.push(format!(
64 "{}:{}",
65 source
66 .short_src
67 .as_ref()
68 .map(|b| b.to_string())
69 .unwrap_or_else(String::new),
70 info.curr_line()
71 ));
72 }
73 None => break,
74 }
75 }
76
77 let tbl: Value = lua.named_registry_value(&decorated_name)?;
78 match tbl {
79 Value::Nil => {
80 let tbl = lua.create_table()?;
81 tbl.set(1, call_stack)?;
82 lua.set_named_registry_value(&decorated_name, tbl)?;
83 Ok(())
84 }
85 Value::Table(tbl) => {
86 let len = tbl.raw_len();
87 tbl.set(len + 1, call_stack)?;
88 Ok(())
89 }
90 _ => Err(mlua::Error::external(format!(
91 "registry key for {decorated_name} has invalid type",
92 ))),
93 }
94 }
95
96 kumo_mod.set(
99 "get_event_registrars",
100 lua.create_function(move |lua, name: String| {
101 let decorated_name = event_registrar_name(&name);
102 let value: Value = lua.named_registry_value(&decorated_name)?;
103 Ok(value)
104 })?,
105 )?;
106
107 kumo_mod.set(
108 "on",
109 lua.create_function(move |lua, (name, func): (String, Function)| {
110 let decorated_name = decorate_callback_name(&name);
111
112 if let Ok(current_event) = lua.globals().get::<String>("_KUMO_CURRENT_EVENT") {
113 if current_event != "main" {
114 return Err(mlua::Error::external(format!(
115 "Attempting to register an event handler via \
116 `kumo.on('{name}', ...)` from within the event handler \
117 '{current_event}'. You must move your event handler registration \
118 so that it is setup directly when the policy is loaded \
119 in order for it to consistently trigger and handle events."
120 )));
121 }
122 }
123
124 register_event_caller(lua, &name)?;
125
126 if config::does_callback_allow_multiple(&name) {
127 let tbl: Value = lua.named_registry_value(&decorated_name)?;
128 return match tbl {
129 Value::Nil => {
130 let tbl = lua.create_table()?;
131 tbl.set(1, func)?;
132 lua.set_named_registry_value(&decorated_name, tbl)?;
133 Ok(())
134 }
135 Value::Table(tbl) => {
136 let len = tbl.raw_len();
137 tbl.set(len + 1, func)?;
138 Ok(())
139 }
140 _ => Err(mlua::Error::external(format!(
141 "registry key for {decorated_name} has invalid type",
142 ))),
143 };
144 }
145
146 let existing: Value = lua.named_registry_value(&decorated_name)?;
147 match existing {
148 Value::Nil => {}
149 Value::Function(func) => {
150 let info = func.info();
151 let src = info.source.unwrap_or_else(|| "?".into());
152 let line = info.line_defined.unwrap_or(0);
153 return Err(mlua::Error::external(format!(
154 "{name} event already has a handler defined at {src}:{line}"
155 )));
156 }
157 _ => {
158 return Err(mlua::Error::external(format!(
159 "{name} event already has a handler"
160 )));
161 }
162 }
163
164 lua.set_named_registry_value(&decorated_name, func)?;
165 Ok(())
166 })?,
167 )?;
168
169 kumo_mod.set(
170 "set_diagnostic_log_filter",
171 lua.create_function(move |_, filter: String| {
172 diagnostic_logging::set_diagnostic_log_filter(&filter).map_err(any_err)
173 })?,
174 )?;
175
176 fn variadic_to_string(args: Variadic<Value>) -> String {
177 let mut output = String::new();
178 for (idx, item) in args.into_iter().enumerate() {
179 if idx > 0 {
180 output.push(' ');
181 }
182
183 match item {
184 Value::String(s) => match s.to_str() {
185 Ok(s) => output.push_str(&s),
186 Err(_) => {
187 let item = s.to_string_lossy();
188 output.push_str(&item);
189 }
190 },
191 item => match item.to_string() {
192 Ok(s) => output.push_str(&s),
193 Err(_) => output.push_str(&format!("{item:?}")),
194 },
195 }
196 }
197 output
198 }
199
200 fn get_caller(lua: &Lua) -> String {
201 match lua.inspect_stack(1) {
202 Some(info) => {
203 let source = info.source();
204 let file_name = source
205 .short_src
206 .as_ref()
207 .map(|b| b.to_string())
208 .unwrap_or_else(String::new);
209 let file_name = match file_name.strip_prefix("[string \"") {
212 Some(name) => name.strip_suffix("\"]").unwrap_or(name),
213 None => &file_name,
214 };
215
216 format!("{file_name}:{}", info.curr_line())
217 }
218 None => "?".to_string(),
219 }
220 }
221
222 kumo_mod.set(
223 "log_error",
224 lua.create_function(move |lua, args: Variadic<Value>| {
225 if tracing::event_enabled!(target: "lua", tracing::Level::ERROR) {
226 let src = get_caller(lua);
227 tracing::error!(target: "lua", "{src}: {}", variadic_to_string(args));
228 }
229 Ok(())
230 })?,
231 )?;
232 kumo_mod.set(
233 "log_info",
234 lua.create_function(move |lua, args: Variadic<Value>| {
235 if tracing::event_enabled!(target: "lua", tracing::Level::INFO) {
236 let src = get_caller(lua);
237 tracing::info!(target: "lua", "{src}: {}", variadic_to_string(args));
238 }
239 Ok(())
240 })?,
241 )?;
242 kumo_mod.set(
243 "log_warn",
244 lua.create_function(move |lua, args: Variadic<Value>| {
245 if tracing::event_enabled!(target: "lua", tracing::Level::WARN) {
246 let src = get_caller(lua);
247 tracing::warn!(target: "lua", "{src}: {}", variadic_to_string(args));
248 }
249 Ok(())
250 })?,
251 )?;
252 kumo_mod.set(
253 "log_debug",
254 lua.create_function(move |lua, args: Variadic<Value>| {
255 if tracing::event_enabled!(target: "lua", tracing::Level::DEBUG) {
256 let src = get_caller(lua);
257 tracing::debug!(target: "lua", "{src}: {}", variadic_to_string(args));
258 }
259 Ok(())
260 })?,
261 )?;
262
263 kumo_mod.set(
264 "set_max_spare_lua_contexts",
265 lua.create_function(move |_, limit: usize| {
266 config::set_max_spare(limit);
267 Ok(())
268 })?,
269 )?;
270
271 kumo_mod.set(
272 "set_max_lua_context_use_count",
273 lua.create_function(move |_, limit: usize| {
274 config::set_max_use(limit);
275 Ok(())
276 })?,
277 )?;
278
279 kumo_mod.set(
280 "set_max_lua_context_age",
281 lua.create_function(move |_, limit: usize| {
282 config::set_max_age(limit);
283 Ok(())
284 })?,
285 )?;
286
287 kumo_mod.set(
288 "set_lua_gc_on_put",
289 lua.create_function(move |_, enable: u8| {
290 config::set_gc_on_put(enable);
291 Ok(())
292 })?,
293 )?;
294
295 kumo_mod.set(
296 "set_lruttl_cache_capacity",
297 lua.create_function(move |_, (name, capacity): (String, usize)| {
298 if lruttl::set_cache_capacity(&name, capacity) {
299 Ok(())
300 } else {
301 Err(mlua::Error::external(format!(
302 "could not set capacity for cache {name} \
303 as that is not a pre-defined lruttl cache name"
304 )))
305 }
306 })?,
307 )?;
308
309 kumo_mod.set(
310 "set_config_monitor_globs",
311 lua.create_function(move |_, globs: Vec<String>| {
312 config::epoch::set_globs(globs).map_err(any_err)?;
313 Ok(())
314 })?,
315 )?;
316 kumo_mod.set(
317 "eval_config_monitor_globs",
318 lua.create_async_function(|_, _: ()| async move {
319 config::epoch::eval_globs().await.map_err(any_err)
320 })?,
321 )?;
322 kumo_mod.set(
323 "bump_config_epoch",
324 lua.create_function(move |_, _: ()| {
325 config::epoch::bump_current_epoch();
326 Ok(())
327 })?,
328 )?;
329
330 kumo_mod.set(
331 "available_parallelism",
332 lua.create_function(move |_, _: ()| available_parallelism().map_err(any_err))?,
333 )?;
334
335 kumo_mod.set(
336 "set_memory_hard_limit",
337 lua.create_function(move |_, limit: usize| {
338 kumo_server_memory::set_hard_limit(limit);
339 Ok(())
340 })?,
341 )?;
342
343 kumo_mod.set(
344 "set_memory_low_thresh",
345 lua.create_function(move |_, limit: usize| {
346 kumo_server_memory::set_low_memory_thresh(limit);
347 Ok(())
348 })?,
349 )?;
350
351 kumo_mod.set(
352 "set_memory_soft_limit",
353 lua.create_function(move |_, limit: usize| {
354 kumo_server_memory::set_soft_limit(limit);
355 Ok(())
356 })?,
357 )?;
358
359 kumo_mod.set(
360 "configure_redis_throttles",
361 lua.create_async_function(|lua, params: Value| async move {
362 let key: RedisConnKey = from_lua_value(&lua, params)?;
363 let conn = key.open().map_err(any_err)?;
364 conn.ping().await.map_err(any_err)?;
365 throttle::use_redis(conn).await.map_err(any_err)
366 })?,
367 )?;
368
369 kumo_mod.set(
370 "traceback",
371 lua.create_function(move |lua: &Lua, level: usize| {
372 #[derive(Debug, Serialize)]
373 struct Frame {
374 event: String,
375 name: Option<String>,
376 name_what: Option<String>,
377 source: Option<String>,
378 short_src: Option<String>,
379 line_defined: Option<usize>,
380 last_line_defined: Option<usize>,
381 what: &'static str,
382 curr_line: i32,
383 is_tail_call: bool,
384 }
385
386 let mut frames = vec![];
387 for n in level.. {
388 match lua.inspect_stack(n) {
389 Some(info) => {
390 let source = info.source();
391 let names = info.names();
392 frames.push(Frame {
393 curr_line: info.curr_line(),
394 is_tail_call: info.is_tail_call(),
395 event: format!("{:?}", info.event()),
396 last_line_defined: source.last_line_defined,
397 line_defined: source.line_defined,
398 name: names.name.as_ref().map(|b| b.to_string()),
399 name_what: names.name_what.as_ref().map(|b| b.to_string()),
400 source: source.source.as_ref().map(|b| b.to_string()),
401 short_src: source.short_src.as_ref().map(|b| b.to_string()),
402 what: source.what,
403 });
404 }
405 None => break,
406 }
407 }
408
409 lua.to_value(&frames)
410 })?,
411 )?;
412
413 #[derive(Deserialize, Debug)]
416 struct TaskParams {
417 event_name: String,
418 args: Vec<serde_json::Value>,
419 }
420
421 impl TaskParams {
422 async fn run(&self) -> anyhow::Result<()> {
423 let mut config = load_config().await?;
424
425 let sig = CallbackSignature::<Value, ()>::new(self.event_name.to_string());
426
427 config
428 .convert_args_and_call_callback(&sig, &self.args)
429 .await?;
430
431 config.put();
432
433 Ok(())
434 }
435 }
436
437 kumo_mod.set(
438 "spawn_task",
439 lua.create_function(|lua, params: Value| {
440 let params: TaskParams = lua.from_value(params)?;
441
442 if !config::is_validating() {
443 std::thread::Builder::new()
444 .name(format!("spawned-task-{}", params.event_name))
445 .spawn(move || {
446 let runtime = tokio::runtime::Builder::new_current_thread()
447 .enable_io()
448 .enable_time()
449 .on_thread_park(kumo_server_memory::purge_thread_cache)
450 .build()
451 .unwrap();
452 let event_name = params.event_name.clone();
453
454 let result = runtime.block_on(async move { params.run().await });
455 if let Err(err) = result {
456 tracing::error!("Error while dispatching {event_name}: {err:#}");
457 }
458 })?;
459 }
460
461 Ok(())
462 })?,
463 )?;
464
465 kumo_mod.set(
466 "spawn_thread_pool",
467 lua.create_function(|lua, params: Value| {
468 #[derive(Deserialize, Debug)]
469 struct ThreadPoolParams {
470 name: String,
471 num_threads: usize,
472 }
473
474 let params: ThreadPoolParams = lua.from_value(params)?;
475 let num_threads = AtomicUsize::new(params.num_threads);
476
477 if !config::is_validating() {
478 let _runtime = kumo_server_runtime::Runtime::new(
482 ¶ms.name,
483 |_| params.num_threads,
484 &num_threads,
485 )
486 .map_err(any_err)?;
487 }
488
489 Ok(())
490 })?,
491 )?;
492
493 kumo_mod.set(
494 "validation_failed",
495 lua.create_function(|_, ()| {
496 config::set_validation_failed();
497 Ok(())
498 })?,
499 )?;
500
501 kumo_mod.set(
502 "enable_memory_callstack_tracking",
503 lua.create_function(|_, enable: bool| {
504 kumo_server_memory::set_tracking_callstacks(enable);
505 Ok(())
506 })?,
507 )?;
508
509 Ok(())
510}