kumo_server_runtime/
lib.rsuse prometheus::IntGaugeVec;
use std::future::Future;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, LazyLock};
use tokio::runtime::Handle;
use tokio::task::JoinHandle;
pub static RUNTIME: LazyLock<Runtime> =
LazyLock::new(|| Runtime::new("localset", |cpus| cpus / 4, &LOCALSET_THREADS).unwrap());
static PARKED_THREADS: LazyLock<IntGaugeVec> = LazyLock::new(|| {
prometheus::register_int_gauge_vec!(
"thread_pool_parked",
"number of parked(idle) threads in a thread pool",
&["pool"]
)
.unwrap()
});
static NUM_THREADS: LazyLock<IntGaugeVec> = LazyLock::new(|| {
prometheus::register_int_gauge_vec!(
"thread_pool_size",
"number of threads in a thread pool",
&["pool"]
)
.unwrap()
});
pub static MAIN_RUNTIME: std::sync::Mutex<Option<tokio::runtime::Handle>> =
std::sync::Mutex::new(None);
pub fn assign_main_runtime(handle: tokio::runtime::Handle) {
MAIN_RUNTIME.lock().unwrap().replace(handle);
}
pub fn get_main_runtime() -> tokio::runtime::Handle {
MAIN_RUNTIME
.lock()
.unwrap()
.as_ref()
.map(|r| r.clone())
.unwrap()
}
static LOCALSET_THREADS: AtomicUsize = AtomicUsize::new(0);
pub fn set_localset_threads(n: usize) {
LOCALSET_THREADS.store(n, Ordering::SeqCst);
}
pub struct Runtime {
tokio_runtime: tokio::runtime::Runtime,
n_threads: usize,
name_prefix: &'static str,
}
impl Drop for Runtime {
fn drop(&mut self) {
PARKED_THREADS.remove_label_values(&[self.name_prefix]).ok();
NUM_THREADS.remove_label_values(&[self.name_prefix]).ok();
}
}
impl Runtime {
pub fn new<F>(
name_prefix: &'static str,
default_size: F,
configured_size: &AtomicUsize,
) -> anyhow::Result<Self>
where
F: FnOnce(usize) -> usize,
{
let env_name = format!("KUMOD_{}_THREADS", name_prefix.to_uppercase());
let n_threads = match std::env::var(env_name) {
Ok(n) => n.parse()?,
Err(_) => {
let configured = configured_size.load(Ordering::SeqCst);
if configured == 0 {
let cpus = std::thread::available_parallelism()?.get();
(default_size)(cpus).max(1)
} else {
configured
}
}
};
let num_parked = PARKED_THREADS.get_metric_with_label_values(&[name_prefix])?;
let num_threads = NUM_THREADS.get_metric_with_label_values(&[name_prefix])?;
num_threads.set(n_threads as i64);
let next_id = Arc::new(AtomicUsize::new(0));
let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
.enable_io()
.enable_time()
.worker_threads(n_threads)
.event_interval(
std::env::var("KUMOD_EVENT_INTERVAL")
.ok()
.and_then(|n| n.parse().ok())
.unwrap_or(61),
)
.max_io_events_per_tick(
std::env::var("KUMOD_IO_EVENTS_PER_TICK")
.ok()
.and_then(|n| n.parse().ok())
.unwrap_or(1024),
)
.on_thread_park({
let num_parked = num_parked.clone();
move || {
kumo_server_memory::purge_thread_cache();
num_parked.inc();
}
})
.thread_name_fn({
let name_prefix = name_prefix.to_string();
let next_id = next_id.clone();
move || {
let id = next_id.fetch_add(1, Ordering::SeqCst);
format!("{name_prefix}-{id}")
}
})
.max_blocking_threads(
std::env::var(format!(
"KUMOD_{}_MAX_BLOCKING_THREADS",
name_prefix.to_uppercase()
))
.ok()
.and_then(|n| n.parse().ok())
.unwrap_or(512),
)
.on_thread_unpark({
let num_parked = num_parked.clone();
move || {
num_parked.dec();
}
})
.build()?;
Ok(Self {
tokio_runtime,
n_threads,
name_prefix,
})
}
pub fn handle(&self) -> &tokio::runtime::Handle {
self.tokio_runtime.handle()
}
pub fn get_num_threads(&self) -> usize {
self.n_threads
}
pub fn spawn<FUT, N: AsRef<str>>(
&self,
name: N,
fut: FUT,
) -> std::io::Result<JoinHandle<FUT::Output>>
where
FUT: Future + Send + 'static,
FUT::Output: Send,
{
tokio::task::Builder::new()
.name(name.as_ref())
.spawn_on(fut, self.handle())
}
}
pub fn rt_spawn<FUT, N: AsRef<str>>(name: N, fut: FUT) -> std::io::Result<JoinHandle<FUT::Output>>
where
FUT: Future + Send + 'static,
FUT::Output: Send,
{
tokio::task::Builder::new()
.name(name.as_ref())
.spawn_on(fut, RUNTIME.handle())
}
pub fn spawn<FUT, N: AsRef<str>>(name: N, fut: FUT) -> std::io::Result<JoinHandle<FUT::Output>>
where
FUT: Future + Send + 'static,
FUT::Output: Send,
{
tokio::task::Builder::new().name(name.as_ref()).spawn(fut)
}
pub fn spawn_blocking<F, N, R>(name: N, func: F) -> std::io::Result<JoinHandle<R>>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
N: AsRef<str>,
{
tokio::task::Builder::new()
.name(name.as_ref())
.spawn_blocking(func)
}
pub fn spawn_blocking_on<F, N, R>(
name: N,
func: F,
runtime: &Handle,
) -> std::io::Result<JoinHandle<R>>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
N: AsRef<str>,
{
tokio::task::Builder::new()
.name(name.as_ref())
.spawn_blocking_on(func, runtime)
}