1use config::{
2 any_err, decorate_callback_name, from_lua_value, get_or_create_module, load_config,
3 serialize_options, CallbackSignature,
4};
5use kumo_server_runtime::available_parallelism;
6use mlua::{Function, Lua, LuaSerdeExt, Value, Variadic};
7use mod_redis::RedisConnKey;
8use serde::{Deserialize, Serialize};
9use std::sync::atomic::AtomicUsize;
10
11pub mod config_handle;
12pub mod diagnostic_logging;
13pub mod disk_space;
14pub mod http_server;
15pub mod nodeid;
16pub mod panic;
17pub mod start;
18pub mod tls_helpers;
19
20pub fn register(lua: &Lua) -> anyhow::Result<()> {
21 for func in [
22 mod_redis::register,
23 data_loader::register,
24 mod_digest::register,
25 mod_encode::register,
26 cidr_map::register,
27 domain_map::register,
28 mod_amqp::register,
29 mod_filesystem::register,
30 mod_file_type::register,
31 mod_http::register,
32 mod_regex::register,
33 mod_serde::register,
34 mod_sqlite::register,
35 mod_crypto::register,
36 mod_string::register,
37 mod_time::register,
38 mod_dns_resolver::register,
39 mod_kafka::register,
40 mod_memoize::register,
41 mod_mimepart::register,
42 mod_mpsc::register,
43 mod_uuid::register,
44 kumo_api_types::shaping::register,
45 regex_set_map::register,
46 ] {
47 func(lua)?;
48 }
49
50 let kumo_mod = get_or_create_module(lua, "kumo")?;
51
52 fn event_registrar_name(name: &str) -> String {
53 format!("kumomta-event-registrars-{name}")
54 }
55
56 fn register_event_caller(lua: &Lua, name: &str) -> mlua::Result<()> {
59 let decorated_name = event_registrar_name(name);
60 let mut call_stack = vec![];
61 for n in 1.. {
62 match lua.inspect_stack(n, |info| {
63 let source = info.source();
64 format!(
65 "{}:{}",
66 source
67 .short_src
68 .as_ref()
69 .map(|b| b.to_string())
70 .unwrap_or_else(String::new),
71 info.current_line().unwrap_or(0)
72 )
73 }) {
74 Some(info) => {
75 call_stack.push(info);
76 }
77 None => break,
78 }
79 }
80
81 let tbl: Value = lua.named_registry_value(&decorated_name)?;
82 match tbl {
83 Value::Nil => {
84 let tbl = lua.create_table()?;
85 tbl.set(1, call_stack)?;
86 lua.set_named_registry_value(&decorated_name, tbl)?;
87 Ok(())
88 }
89 Value::Table(tbl) => {
90 let len = tbl.raw_len();
91 tbl.set(len + 1, call_stack)?;
92 Ok(())
93 }
94 _ => Err(mlua::Error::external(format!(
95 "registry key for {decorated_name} has invalid type",
96 ))),
97 }
98 }
99
100 kumo_mod.set(
103 "get_event_registrars",
104 lua.create_function(move |lua, name: String| {
105 let decorated_name = event_registrar_name(&name);
106 let value: Value = lua.named_registry_value(&decorated_name)?;
107 Ok(value)
108 })?,
109 )?;
110
111 kumo_mod.set(
112 "on",
113 lua.create_function(move |lua, (name, func): (String, Function)| {
114 let decorated_name = decorate_callback_name(&name);
115
116 if let Ok(current_event) = lua.globals().get::<String>("_KUMO_CURRENT_EVENT") {
117 if current_event != "main" {
118 return Err(mlua::Error::external(format!(
119 "Attempting to register an event handler via \
120 `kumo.on('{name}', ...)` from within the event handler \
121 '{current_event}'. You must move your event handler registration \
122 so that it is setup directly when the policy is loaded \
123 in order for it to consistently trigger and handle events."
124 )));
125 }
126 }
127
128 register_event_caller(lua, &name)?;
129
130 if config::does_callback_allow_multiple(&name) {
131 let tbl: Value = lua.named_registry_value(&decorated_name)?;
132 return match tbl {
133 Value::Nil => {
134 let tbl = lua.create_table()?;
135 tbl.set(1, func)?;
136 lua.set_named_registry_value(&decorated_name, tbl)?;
137 Ok(())
138 }
139 Value::Table(tbl) => {
140 let len = tbl.raw_len();
141 tbl.set(len + 1, func)?;
142 Ok(())
143 }
144 _ => Err(mlua::Error::external(format!(
145 "registry key for {decorated_name} has invalid type",
146 ))),
147 };
148 }
149
150 let existing: Value = lua.named_registry_value(&decorated_name)?;
151 match existing {
152 Value::Nil => {}
153 Value::Function(func) => {
154 let info = func.info();
155 let src = info.source.unwrap_or_else(|| "?".into());
156 let line = info.line_defined.unwrap_or(0);
157 return Err(mlua::Error::external(format!(
158 "{name} event already has a handler defined at {src}:{line}"
159 )));
160 }
161 _ => {
162 return Err(mlua::Error::external(format!(
163 "{name} event already has a handler"
164 )));
165 }
166 }
167
168 lua.set_named_registry_value(&decorated_name, func)?;
169 Ok(())
170 })?,
171 )?;
172
173 kumo_mod.set(
174 "set_diagnostic_log_filter",
175 lua.create_function(move |_, filter: String| {
176 diagnostic_logging::set_diagnostic_log_filter(&filter).map_err(any_err)
177 })?,
178 )?;
179
180 fn variadic_to_string(args: Variadic<Value>) -> String {
181 let mut output = String::new();
182 for (idx, item) in args.into_iter().enumerate() {
183 if idx > 0 {
184 output.push(' ');
185 }
186
187 match item {
188 Value::String(s) => match s.to_str() {
189 Ok(s) => output.push_str(&s),
190 Err(_) => {
191 let item = s.to_string_lossy();
192 output.push_str(&item);
193 }
194 },
195 item => match item.to_string() {
196 Ok(s) => output.push_str(&s),
197 Err(_) => output.push_str(&format!("{item:?}")),
198 },
199 }
200 }
201 output
202 }
203
204 fn get_caller(lua: &Lua) -> String {
205 match lua.inspect_stack(1, |info| {
206 let source = info.source();
207 let file_name = source
208 .short_src
209 .as_ref()
210 .map(|b| b.to_string())
211 .unwrap_or_else(String::new);
212 let file_name = match file_name.strip_prefix("[string \"") {
215 Some(name) => name.strip_suffix("\"]").unwrap_or(name),
216 None => &file_name,
217 };
218
219 format!("{file_name}:{}", info.current_line().unwrap_or(0))
220 }) {
221 Some(info) => info,
222 None => "?".to_string(),
223 }
224 }
225
226 kumo_mod.set(
227 "log_error",
228 lua.create_function(move |lua, args: Variadic<Value>| {
229 if tracing::event_enabled!(target: "lua", tracing::Level::ERROR) {
230 let src = get_caller(lua);
231 tracing::error!(target: "lua", "{src}: {}", variadic_to_string(args));
232 }
233 Ok(())
234 })?,
235 )?;
236 kumo_mod.set(
237 "log_info",
238 lua.create_function(move |lua, args: Variadic<Value>| {
239 if tracing::event_enabled!(target: "lua", tracing::Level::INFO) {
240 let src = get_caller(lua);
241 tracing::info!(target: "lua", "{src}: {}", variadic_to_string(args));
242 }
243 Ok(())
244 })?,
245 )?;
246 kumo_mod.set(
247 "log_warn",
248 lua.create_function(move |lua, args: Variadic<Value>| {
249 if tracing::event_enabled!(target: "lua", tracing::Level::WARN) {
250 let src = get_caller(lua);
251 tracing::warn!(target: "lua", "{src}: {}", variadic_to_string(args));
252 }
253 Ok(())
254 })?,
255 )?;
256 kumo_mod.set(
257 "log_debug",
258 lua.create_function(move |lua, args: Variadic<Value>| {
259 if tracing::event_enabled!(target: "lua", tracing::Level::DEBUG) {
260 let src = get_caller(lua);
261 tracing::debug!(target: "lua", "{src}: {}", variadic_to_string(args));
262 }
263 Ok(())
264 })?,
265 )?;
266
267 kumo_mod.set(
268 "set_max_spare_lua_contexts",
269 lua.create_function(move |_, limit: usize| {
270 config::set_max_spare(limit);
271 Ok(())
272 })?,
273 )?;
274
275 kumo_mod.set(
276 "set_max_lua_context_use_count",
277 lua.create_function(move |_, limit: usize| {
278 config::set_max_use(limit);
279 Ok(())
280 })?,
281 )?;
282
283 kumo_mod.set(
284 "set_max_lua_context_age",
285 lua.create_function(move |_, limit: usize| {
286 config::set_max_age(limit);
287 Ok(())
288 })?,
289 )?;
290
291 kumo_mod.set(
292 "set_lua_gc_on_put",
293 lua.create_function(move |_, enable: u8| {
294 config::set_gc_on_put(enable);
295 Ok(())
296 })?,
297 )?;
298
299 kumo_mod.set(
300 "set_lruttl_cache_capacity",
301 lua.create_function(move |_, (name, capacity): (String, usize)| {
302 if lruttl::set_cache_capacity(&name, capacity) {
303 Ok(())
304 } else {
305 Err(mlua::Error::external(format!(
306 "could not set capacity for cache {name} \
307 as that is not a pre-defined lruttl cache name"
308 )))
309 }
310 })?,
311 )?;
312
313 kumo_mod.set(
314 "set_config_monitor_globs",
315 lua.create_function(move |_, globs: Vec<String>| {
316 config::epoch::set_globs(globs).map_err(any_err)?;
317 Ok(())
318 })?,
319 )?;
320 kumo_mod.set(
321 "eval_config_monitor_globs",
322 lua.create_async_function(|_, _: ()| async move {
323 config::epoch::eval_globs().await.map_err(any_err)
324 })?,
325 )?;
326 kumo_mod.set(
327 "bump_config_epoch",
328 lua.create_function(move |_, _: ()| {
329 config::epoch::bump_current_epoch();
330 Ok(())
331 })?,
332 )?;
333
334 kumo_mod.set(
335 "available_parallelism",
336 lua.create_function(move |_, _: ()| available_parallelism().map_err(any_err))?,
337 )?;
338
339 kumo_mod.set(
340 "set_memory_hard_limit",
341 lua.create_function(move |_, limit: usize| {
342 kumo_server_memory::set_hard_limit(limit);
343 Ok(())
344 })?,
345 )?;
346
347 kumo_mod.set(
348 "set_memory_low_thresh",
349 lua.create_function(move |_, limit: usize| {
350 kumo_server_memory::set_low_memory_thresh(limit);
351 Ok(())
352 })?,
353 )?;
354
355 kumo_mod.set(
356 "set_memory_soft_limit",
357 lua.create_function(move |_, limit: usize| {
358 kumo_server_memory::set_soft_limit(limit);
359 Ok(())
360 })?,
361 )?;
362
363 kumo_mod.set(
364 "configure_redis_throttles",
365 lua.create_async_function(|lua, params: Value| async move {
366 let key: RedisConnKey = from_lua_value(&lua, params)?;
367 let conn = key.open().map_err(any_err)?;
368 conn.ping().await.map_err(any_err)?;
369 throttle::use_redis(conn).await.map_err(any_err)
370 })?,
371 )?;
372
373 kumo_mod.set(
374 "traceback",
375 lua.create_function(move |lua: &Lua, level: usize| {
376 #[derive(Debug, Serialize)]
377 struct Frame {
378 event: String,
379 name: Option<String>,
380 name_what: Option<String>,
381 source: Option<String>,
382 short_src: Option<String>,
383 line_defined: Option<usize>,
384 last_line_defined: Option<usize>,
385 what: &'static str,
386 curr_line: Option<usize>,
387 is_tail_call: bool,
388 }
389
390 let mut frames = vec![];
391 for n in level.. {
392 match lua.inspect_stack(n, |info| {
393 let source = info.source();
394 let names = info.names();
395 Frame {
396 curr_line: info.current_line(),
397 is_tail_call: info.is_tail_call(),
398 event: format!("{:?}", info.event()),
399 last_line_defined: source.last_line_defined,
400 line_defined: source.line_defined,
401 name: names.name.as_ref().map(|b| b.to_string()),
402 name_what: names.name_what.as_ref().map(|b| b.to_string()),
403 source: source.source.as_ref().map(|b| b.to_string()),
404 short_src: source.short_src.as_ref().map(|b| b.to_string()),
405 what: source.what,
406 }
407 }) {
408 Some(frame) => {
409 frames.push(frame);
410 }
411 None => break,
412 }
413 }
414
415 lua.to_value(&frames)
416 })?,
417 )?;
418
419 #[derive(Deserialize, Debug)]
422 struct TaskParams {
423 event_name: String,
424 args: Vec<serde_json::Value>,
425 }
426
427 impl TaskParams {
428 async fn run(&self) -> anyhow::Result<()> {
429 let mut config = load_config().await?;
430
431 let sig = CallbackSignature::<Value, ()>::new(self.event_name.to_string());
432
433 config
434 .convert_args_and_call_callback(&sig, &self.args)
435 .await?;
436
437 config.put();
438
439 Ok(())
440 }
441 }
442
443 kumo_mod.set(
444 "spawn_task",
445 lua.create_function(|lua, params: Value| {
446 let params: TaskParams = lua.from_value(params)?;
447
448 if !config::is_validating() {
449 std::thread::Builder::new()
450 .name(format!("spawned-task-{}", params.event_name))
451 .spawn(move || {
452 let runtime = tokio::runtime::Builder::new_current_thread()
453 .enable_io()
454 .enable_time()
455 .on_thread_park(kumo_server_memory::purge_thread_cache)
456 .build()
457 .unwrap();
458 let event_name = params.event_name.clone();
459
460 let result = runtime.block_on(async move { params.run().await });
461 if let Err(err) = result {
462 tracing::error!("Error while dispatching {event_name}: {err:#}");
463 }
464 })?;
465 }
466
467 Ok(())
468 })?,
469 )?;
470
471 kumo_mod.set(
472 "spawn_thread_pool",
473 lua.create_function(|lua, params: Value| {
474 #[derive(Deserialize, Debug)]
475 struct ThreadPoolParams {
476 name: String,
477 num_threads: usize,
478 }
479
480 let params: ThreadPoolParams = lua.from_value(params)?;
481 let num_threads = AtomicUsize::new(params.num_threads);
482
483 if !config::is_validating() {
484 let _runtime = kumo_server_runtime::Runtime::new(
488 ¶ms.name,
489 |_| params.num_threads,
490 &num_threads,
491 )
492 .map_err(any_err)?;
493 }
494
495 Ok(())
496 })?,
497 )?;
498
499 kumo_mod.set(
500 "validation_failed",
501 lua.create_function(|_, ()| {
502 config::set_validation_failed();
503 Ok(())
504 })?,
505 )?;
506
507 kumo_mod.set(
508 "enable_memory_callstack_tracking",
509 lua.create_function(|_, enable: bool| {
510 kumo_server_memory::set_tracking_callstacks(enable);
511 Ok(())
512 })?,
513 )?;
514
515 kumo_mod.set(
519 "prometheus_metrics",
520 lua.create_async_function(|lua, ()| async move {
521 use tokio_stream::StreamExt;
522 let mut json_text = String::new();
523 let mut stream = kumo_prometheus::registry::Registry::stream_json();
524 while let Some(text) = stream.next().await {
525 json_text.push_str(&text);
526 }
527 let value: serde_json::Value = serde_json::from_str(&json_text).map_err(any_err)?;
528 lua.to_value_with(&value, serialize_options())
529 })?,
530 )?;
531
532 Ok(())
533}