1use config::{
2 any_err, decorate_callback_name, from_lua_value, get_or_create_module, load_config,
3 CallbackSignature,
4};
5use kumo_server_runtime::available_parallelism;
6use mlua::{Function, Lua, LuaSerdeExt, Value, Variadic};
7use mod_redis::RedisConnKey;
8use serde::{Deserialize, Serialize};
9use std::sync::atomic::AtomicUsize;
10
11pub mod config_handle;
12pub mod diagnostic_logging;
13pub mod disk_space;
14pub mod http_server;
15pub mod nodeid;
16pub mod panic;
17pub mod start;
18pub mod tls_helpers;
19
/// Installs the Lua-facing API on the supplied Lua state.
///
/// First runs the `register` function of every helper module listed below,
/// then obtains the `kumo` module table and populates it with the functions
/// defined in the remainder of this body.
///
/// # Errors
///
/// Returns the first error reported by a helper module's registrar, or by
/// any of the Lua table/function creation calls that follow.
pub fn register(lua: &Lua) -> anyhow::Result<()> {
    // Registrars run in array order; the first failure aborts registration.
    [
        mod_redis::register,
        data_loader::register,
        mod_digest::register,
        mod_encode::register,
        cidr_map::register,
        domain_map::register,
        mod_amqp::register,
        mod_filesystem::register,
        mod_file_type::register,
        mod_http::register,
        mod_regex::register,
        mod_serde::register,
        mod_sqlite::register,
        mod_string::register,
        mod_time::register,
        mod_dns_resolver::register,
        mod_kafka::register,
        mod_memoize::register,
        mod_mimepart::register,
        mod_mpsc::register,
        mod_uuid::register,
        kumo_api_types::shaping::register,
        regex_set_map::register,
    ]
    .into_iter()
    .try_for_each(|register_module| register_module(lua))?;

    let kumo_mod = get_or_create_module(lua, "kumo")?;
50
51 fn event_registrar_name(name: &str) -> String {
52 format!("kumomta-event-registrars-{name}")
53 }
54
55 fn register_event_caller(lua: &Lua, name: &str) -> mlua::Result<()> {
58 let decorated_name = event_registrar_name(name);
59 let mut call_stack = vec![];
60 for n in 1.. {
61 match lua.inspect_stack(n, |info| {
62 let source = info.source();
63 format!(
64 "{}:{}",
65 source
66 .short_src
67 .as_ref()
68 .map(|b| b.to_string())
69 .unwrap_or_else(String::new),
70 info.current_line().unwrap_or(0)
71 )
72 }) {
73 Some(info) => {
74 call_stack.push(info);
75 }
76 None => break,
77 }
78 }
79
80 let tbl: Value = lua.named_registry_value(&decorated_name)?;
81 match tbl {
82 Value::Nil => {
83 let tbl = lua.create_table()?;
84 tbl.set(1, call_stack)?;
85 lua.set_named_registry_value(&decorated_name, tbl)?;
86 Ok(())
87 }
88 Value::Table(tbl) => {
89 let len = tbl.raw_len();
90 tbl.set(len + 1, call_stack)?;
91 Ok(())
92 }
93 _ => Err(mlua::Error::external(format!(
94 "registry key for {decorated_name} has invalid type",
95 ))),
96 }
97 }
98
99 kumo_mod.set(
102 "get_event_registrars",
103 lua.create_function(move |lua, name: String| {
104 let decorated_name = event_registrar_name(&name);
105 let value: Value = lua.named_registry_value(&decorated_name)?;
106 Ok(value)
107 })?,
108 )?;
109
110 kumo_mod.set(
111 "on",
112 lua.create_function(move |lua, (name, func): (String, Function)| {
113 let decorated_name = decorate_callback_name(&name);
114
115 if let Ok(current_event) = lua.globals().get::<String>("_KUMO_CURRENT_EVENT") {
116 if current_event != "main" {
117 return Err(mlua::Error::external(format!(
118 "Attempting to register an event handler via \
119 `kumo.on('{name}', ...)` from within the event handler \
120 '{current_event}'. You must move your event handler registration \
121 so that it is setup directly when the policy is loaded \
122 in order for it to consistently trigger and handle events."
123 )));
124 }
125 }
126
127 register_event_caller(lua, &name)?;
128
129 if config::does_callback_allow_multiple(&name) {
130 let tbl: Value = lua.named_registry_value(&decorated_name)?;
131 return match tbl {
132 Value::Nil => {
133 let tbl = lua.create_table()?;
134 tbl.set(1, func)?;
135 lua.set_named_registry_value(&decorated_name, tbl)?;
136 Ok(())
137 }
138 Value::Table(tbl) => {
139 let len = tbl.raw_len();
140 tbl.set(len + 1, func)?;
141 Ok(())
142 }
143 _ => Err(mlua::Error::external(format!(
144 "registry key for {decorated_name} has invalid type",
145 ))),
146 };
147 }
148
149 let existing: Value = lua.named_registry_value(&decorated_name)?;
150 match existing {
151 Value::Nil => {}
152 Value::Function(func) => {
153 let info = func.info();
154 let src = info.source.unwrap_or_else(|| "?".into());
155 let line = info.line_defined.unwrap_or(0);
156 return Err(mlua::Error::external(format!(
157 "{name} event already has a handler defined at {src}:{line}"
158 )));
159 }
160 _ => {
161 return Err(mlua::Error::external(format!(
162 "{name} event already has a handler"
163 )));
164 }
165 }
166
167 lua.set_named_registry_value(&decorated_name, func)?;
168 Ok(())
169 })?,
170 )?;
171
172 kumo_mod.set(
173 "set_diagnostic_log_filter",
174 lua.create_function(move |_, filter: String| {
175 diagnostic_logging::set_diagnostic_log_filter(&filter).map_err(any_err)
176 })?,
177 )?;
178
179 fn variadic_to_string(args: Variadic<Value>) -> String {
180 let mut output = String::new();
181 for (idx, item) in args.into_iter().enumerate() {
182 if idx > 0 {
183 output.push(' ');
184 }
185
186 match item {
187 Value::String(s) => match s.to_str() {
188 Ok(s) => output.push_str(&s),
189 Err(_) => {
190 let item = s.to_string_lossy();
191 output.push_str(&item);
192 }
193 },
194 item => match item.to_string() {
195 Ok(s) => output.push_str(&s),
196 Err(_) => output.push_str(&format!("{item:?}")),
197 },
198 }
199 }
200 output
201 }
202
203 fn get_caller(lua: &Lua) -> String {
204 match lua.inspect_stack(1, |info| {
205 let source = info.source();
206 let file_name = source
207 .short_src
208 .as_ref()
209 .map(|b| b.to_string())
210 .unwrap_or_else(String::new);
211 let file_name = match file_name.strip_prefix("[string \"") {
214 Some(name) => name.strip_suffix("\"]").unwrap_or(name),
215 None => &file_name,
216 };
217
218 format!("{file_name}:{}", info.current_line().unwrap_or(0))
219 }) {
220 Some(info) => info,
221 None => "?".to_string(),
222 }
223 }
224
225 kumo_mod.set(
226 "log_error",
227 lua.create_function(move |lua, args: Variadic<Value>| {
228 if tracing::event_enabled!(target: "lua", tracing::Level::ERROR) {
229 let src = get_caller(lua);
230 tracing::error!(target: "lua", "{src}: {}", variadic_to_string(args));
231 }
232 Ok(())
233 })?,
234 )?;
235 kumo_mod.set(
236 "log_info",
237 lua.create_function(move |lua, args: Variadic<Value>| {
238 if tracing::event_enabled!(target: "lua", tracing::Level::INFO) {
239 let src = get_caller(lua);
240 tracing::info!(target: "lua", "{src}: {}", variadic_to_string(args));
241 }
242 Ok(())
243 })?,
244 )?;
245 kumo_mod.set(
246 "log_warn",
247 lua.create_function(move |lua, args: Variadic<Value>| {
248 if tracing::event_enabled!(target: "lua", tracing::Level::WARN) {
249 let src = get_caller(lua);
250 tracing::warn!(target: "lua", "{src}: {}", variadic_to_string(args));
251 }
252 Ok(())
253 })?,
254 )?;
255 kumo_mod.set(
256 "log_debug",
257 lua.create_function(move |lua, args: Variadic<Value>| {
258 if tracing::event_enabled!(target: "lua", tracing::Level::DEBUG) {
259 let src = get_caller(lua);
260 tracing::debug!(target: "lua", "{src}: {}", variadic_to_string(args));
261 }
262 Ok(())
263 })?,
264 )?;
265
266 kumo_mod.set(
267 "set_max_spare_lua_contexts",
268 lua.create_function(move |_, limit: usize| {
269 config::set_max_spare(limit);
270 Ok(())
271 })?,
272 )?;
273
274 kumo_mod.set(
275 "set_max_lua_context_use_count",
276 lua.create_function(move |_, limit: usize| {
277 config::set_max_use(limit);
278 Ok(())
279 })?,
280 )?;
281
282 kumo_mod.set(
283 "set_max_lua_context_age",
284 lua.create_function(move |_, limit: usize| {
285 config::set_max_age(limit);
286 Ok(())
287 })?,
288 )?;
289
290 kumo_mod.set(
291 "set_lua_gc_on_put",
292 lua.create_function(move |_, enable: u8| {
293 config::set_gc_on_put(enable);
294 Ok(())
295 })?,
296 )?;
297
298 kumo_mod.set(
299 "set_lruttl_cache_capacity",
300 lua.create_function(move |_, (name, capacity): (String, usize)| {
301 if lruttl::set_cache_capacity(&name, capacity) {
302 Ok(())
303 } else {
304 Err(mlua::Error::external(format!(
305 "could not set capacity for cache {name} \
306 as that is not a pre-defined lruttl cache name"
307 )))
308 }
309 })?,
310 )?;
311
312 kumo_mod.set(
313 "set_config_monitor_globs",
314 lua.create_function(move |_, globs: Vec<String>| {
315 config::epoch::set_globs(globs).map_err(any_err)?;
316 Ok(())
317 })?,
318 )?;
319 kumo_mod.set(
320 "eval_config_monitor_globs",
321 lua.create_async_function(|_, _: ()| async move {
322 config::epoch::eval_globs().await.map_err(any_err)
323 })?,
324 )?;
325 kumo_mod.set(
326 "bump_config_epoch",
327 lua.create_function(move |_, _: ()| {
328 config::epoch::bump_current_epoch();
329 Ok(())
330 })?,
331 )?;
332
333 kumo_mod.set(
334 "available_parallelism",
335 lua.create_function(move |_, _: ()| available_parallelism().map_err(any_err))?,
336 )?;
337
338 kumo_mod.set(
339 "set_memory_hard_limit",
340 lua.create_function(move |_, limit: usize| {
341 kumo_server_memory::set_hard_limit(limit);
342 Ok(())
343 })?,
344 )?;
345
346 kumo_mod.set(
347 "set_memory_low_thresh",
348 lua.create_function(move |_, limit: usize| {
349 kumo_server_memory::set_low_memory_thresh(limit);
350 Ok(())
351 })?,
352 )?;
353
354 kumo_mod.set(
355 "set_memory_soft_limit",
356 lua.create_function(move |_, limit: usize| {
357 kumo_server_memory::set_soft_limit(limit);
358 Ok(())
359 })?,
360 )?;
361
362 kumo_mod.set(
363 "configure_redis_throttles",
364 lua.create_async_function(|lua, params: Value| async move {
365 let key: RedisConnKey = from_lua_value(&lua, params)?;
366 let conn = key.open().map_err(any_err)?;
367 conn.ping().await.map_err(any_err)?;
368 throttle::use_redis(conn).await.map_err(any_err)
369 })?,
370 )?;
371
372 kumo_mod.set(
373 "traceback",
374 lua.create_function(move |lua: &Lua, level: usize| {
375 #[derive(Debug, Serialize)]
376 struct Frame {
377 event: String,
378 name: Option<String>,
379 name_what: Option<String>,
380 source: Option<String>,
381 short_src: Option<String>,
382 line_defined: Option<usize>,
383 last_line_defined: Option<usize>,
384 what: &'static str,
385 curr_line: Option<usize>,
386 is_tail_call: bool,
387 }
388
389 let mut frames = vec![];
390 for n in level.. {
391 match lua.inspect_stack(n, |info| {
392 let source = info.source();
393 let names = info.names();
394 Frame {
395 curr_line: info.current_line(),
396 is_tail_call: info.is_tail_call(),
397 event: format!("{:?}", info.event()),
398 last_line_defined: source.last_line_defined,
399 line_defined: source.line_defined,
400 name: names.name.as_ref().map(|b| b.to_string()),
401 name_what: names.name_what.as_ref().map(|b| b.to_string()),
402 source: source.source.as_ref().map(|b| b.to_string()),
403 short_src: source.short_src.as_ref().map(|b| b.to_string()),
404 what: source.what,
405 }
406 }) {
407 Some(frame) => {
408 frames.push(frame);
409 }
410 None => break,
411 }
412 }
413
414 lua.to_value(&frames)
415 })?,
416 )?;
417
418 #[derive(Deserialize, Debug)]
421 struct TaskParams {
422 event_name: String,
423 args: Vec<serde_json::Value>,
424 }
425
426 impl TaskParams {
427 async fn run(&self) -> anyhow::Result<()> {
428 let mut config = load_config().await?;
429
430 let sig = CallbackSignature::<Value, ()>::new(self.event_name.to_string());
431
432 config
433 .convert_args_and_call_callback(&sig, &self.args)
434 .await?;
435
436 config.put();
437
438 Ok(())
439 }
440 }
441
442 kumo_mod.set(
443 "spawn_task",
444 lua.create_function(|lua, params: Value| {
445 let params: TaskParams = lua.from_value(params)?;
446
447 if !config::is_validating() {
448 std::thread::Builder::new()
449 .name(format!("spawned-task-{}", params.event_name))
450 .spawn(move || {
451 let runtime = tokio::runtime::Builder::new_current_thread()
452 .enable_io()
453 .enable_time()
454 .on_thread_park(kumo_server_memory::purge_thread_cache)
455 .build()
456 .unwrap();
457 let event_name = params.event_name.clone();
458
459 let result = runtime.block_on(async move { params.run().await });
460 if let Err(err) = result {
461 tracing::error!("Error while dispatching {event_name}: {err:#}");
462 }
463 })?;
464 }
465
466 Ok(())
467 })?,
468 )?;
469
470 kumo_mod.set(
471 "spawn_thread_pool",
472 lua.create_function(|lua, params: Value| {
473 #[derive(Deserialize, Debug)]
474 struct ThreadPoolParams {
475 name: String,
476 num_threads: usize,
477 }
478
479 let params: ThreadPoolParams = lua.from_value(params)?;
480 let num_threads = AtomicUsize::new(params.num_threads);
481
482 if !config::is_validating() {
483 let _runtime = kumo_server_runtime::Runtime::new(
487 ¶ms.name,
488 |_| params.num_threads,
489 &num_threads,
490 )
491 .map_err(any_err)?;
492 }
493
494 Ok(())
495 })?,
496 )?;
497
498 kumo_mod.set(
499 "validation_failed",
500 lua.create_function(|_, ()| {
501 config::set_validation_failed();
502 Ok(())
503 })?,
504 )?;
505
506 kumo_mod.set(
507 "enable_memory_callstack_tracking",
508 lua.create_function(|_, enable: bool| {
509 kumo_server_memory::set_tracking_callstacks(enable);
510 Ok(())
511 })?,
512 )?;
513
514 Ok(())
515}