kumo_server_runtime/
lib.rs

1use anyhow::Context;
2use parking_lot::Mutex;
3use prometheus::IntGaugeVec;
4use std::collections::HashMap;
5use std::future::Future;
6use std::sync::atomic::{AtomicUsize, Ordering};
7use std::sync::{Arc, LazyLock};
8use tokio::runtime::Handle;
9use tokio::task::JoinHandle;
10
11pub static RUNTIME: LazyLock<Runtime> =
12    LazyLock::new(|| Runtime::new("localset", |cpus| cpus / 4, &LOCALSET_THREADS).unwrap());
13static PARKED_THREADS: LazyLock<IntGaugeVec> = LazyLock::new(|| {
14    prometheus::register_int_gauge_vec!(
15        "thread_pool_parked",
16        "number of parked(idle) threads in a thread pool",
17        &["pool"]
18    )
19    .unwrap()
20});
21static NUM_THREADS: LazyLock<IntGaugeVec> = LazyLock::new(|| {
22    prometheus::register_int_gauge_vec!(
23        "thread_pool_size",
24        "number of threads in a thread pool",
25        &["pool"]
26    )
27    .unwrap()
28});
29
30static RUNTIMES: LazyLock<Mutex<HashMap<String, Runtime>>> =
31    LazyLock::new(|| Mutex::new(HashMap::new()));
32
33pub fn get_named_runtime(name: &str) -> Option<Runtime> {
34    RUNTIMES.lock().get(name).cloned()
35}
36
37pub static MAIN_RUNTIME: Mutex<Option<tokio::runtime::Handle>> = Mutex::new(None);
38
39pub fn assign_main_runtime(handle: tokio::runtime::Handle) {
40    MAIN_RUNTIME.lock().replace(handle);
41}
42
43pub fn get_main_runtime() -> tokio::runtime::Handle {
44    MAIN_RUNTIME.lock().as_ref().map(|r| r.clone()).unwrap()
45}
46
/// Configured thread count for the "localset" pool.
///
/// Zero means "not configured"; [`Runtime::new`] then falls back to its
/// default sizing.
static LOCALSET_THREADS: AtomicUsize = AtomicUsize::new(0);
48
49pub fn set_localset_threads(n: usize) {
50    LOCALSET_THREADS.store(n, Ordering::SeqCst);
51}
52
53struct RuntimeInner {
54    tokio_runtime: tokio::runtime::Runtime,
55    n_threads: usize,
56    name_prefix: String,
57}
58
59#[derive(Clone)]
60pub struct Runtime {
61    inner: Arc<RuntimeInner>,
62}
63
64impl Drop for RuntimeInner {
65    fn drop(&mut self) {
66        PARKED_THREADS
67            .remove_label_values(&[&self.name_prefix])
68            .ok();
69        NUM_THREADS.remove_label_values(&[&self.name_prefix]).ok();
70    }
71}
72
73impl Runtime {
74    pub fn new<F>(
75        name_prefix: &str,
76        default_size: F,
77        configured_size: &AtomicUsize,
78    ) -> anyhow::Result<Self>
79    where
80        F: FnOnce(usize) -> usize,
81    {
82        let env_name = format!("KUMOD_{}_THREADS", name_prefix.to_uppercase());
83        let n_threads = match std::env::var(env_name) {
84            Ok(n) => n.parse()?,
85            Err(_) => {
86                let configured = configured_size.load(Ordering::SeqCst);
87                if configured == 0 {
88                    let cpus = available_parallelism()?;
89                    (default_size)(cpus).max(1)
90                } else {
91                    configured
92                }
93            }
94        };
95
96        let num_parked = PARKED_THREADS.get_metric_with_label_values(&[name_prefix])?;
97        let num_threads = NUM_THREADS.get_metric_with_label_values(&[name_prefix])?;
98        num_threads.set(n_threads as i64);
99
100        let next_id = Arc::new(AtomicUsize::new(0));
101
102        let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
103            .enable_io()
104            .enable_time()
105            .worker_threads(n_threads)
106            .event_interval(
107                std::env::var("KUMOD_EVENT_INTERVAL")
108                    .ok()
109                    .and_then(|n| n.parse().ok())
110                    .unwrap_or(61),
111            )
112            .max_io_events_per_tick(
113                std::env::var("KUMOD_IO_EVENTS_PER_TICK")
114                    .ok()
115                    .and_then(|n| n.parse().ok())
116                    .unwrap_or(1024),
117            )
118            .on_thread_park({
119                let num_parked = num_parked.clone();
120                move || {
121                    kumo_server_memory::purge_thread_cache();
122                    num_parked.inc();
123                }
124            })
125            .thread_name_fn({
126                let name_prefix = name_prefix.to_string();
127                let next_id = next_id.clone();
128                move || {
129                    let id = next_id.fetch_add(1, Ordering::SeqCst);
130                    format!("{name_prefix}-{id}")
131                }
132            })
133            .max_blocking_threads(
134                std::env::var(format!(
135                    "KUMOD_{}_MAX_BLOCKING_THREADS",
136                    name_prefix.to_uppercase()
137                ))
138                .ok()
139                .and_then(|n| n.parse().ok())
140                .unwrap_or(512),
141            )
142            .on_thread_unpark({
143                let num_parked = num_parked.clone();
144                move || {
145                    num_parked.dec();
146                }
147            })
148            .build()?;
149
150        let runtime = Self {
151            inner: Arc::new(RuntimeInner {
152                tokio_runtime,
153                n_threads,
154                name_prefix: name_prefix.to_string(),
155            }),
156        };
157
158        let mut runtimes = RUNTIMES.lock();
159        if runtimes.contains_key(name_prefix) {
160            anyhow::bail!("thread pool runtime with name `{name_prefix}` already exists!");
161        }
162
163        runtimes.insert(name_prefix.to_string(), runtime.clone());
164
165        Ok(runtime)
166    }
167
168    pub fn handle(&self) -> &tokio::runtime::Handle {
169        self.inner.tokio_runtime.handle()
170    }
171
172    pub fn get_num_threads(&self) -> usize {
173        self.inner.n_threads
174    }
175
176    /// Spawn a future into this runtime
177    pub fn spawn<FUT, N: AsRef<str>>(
178        &self,
179        name: N,
180        fut: FUT,
181    ) -> std::io::Result<JoinHandle<FUT::Output>>
182    where
183        FUT: Future + Send + 'static,
184        FUT::Output: Send,
185    {
186        tokio::task::Builder::new()
187            .name(name.as_ref())
188            .spawn_on(fut, self.handle())
189    }
190}
191
192/// Schedule func to run in the main runtime pool,
193/// which is named "localset" for legacy reasons.
194pub fn rt_spawn<FUT, N: AsRef<str>>(name: N, fut: FUT) -> std::io::Result<JoinHandle<FUT::Output>>
195where
196    FUT: Future + Send + 'static,
197    FUT::Output: Send,
198{
199    tokio::task::Builder::new()
200        .name(name.as_ref())
201        .spawn_on(fut, RUNTIME.handle())
202}
203
204/// Spawn a future as a task with a name.
205/// The task is spawned into the current tokio runtime.
206pub fn spawn<FUT, N: AsRef<str>>(name: N, fut: FUT) -> std::io::Result<JoinHandle<FUT::Output>>
207where
208    FUT: Future + Send + 'static,
209    FUT::Output: Send,
210{
211    tokio::task::Builder::new().name(name.as_ref()).spawn(fut)
212}
213
214/// Run a blocking function in the worker thread pool associated
215/// with the current tokio runtime.
216pub fn spawn_blocking<F, N, R>(name: N, func: F) -> std::io::Result<JoinHandle<R>>
217where
218    F: FnOnce() -> R + Send + 'static,
219    R: Send + 'static,
220    N: AsRef<str>,
221{
222    tokio::task::Builder::new()
223        .name(name.as_ref())
224        .spawn_blocking(func)
225}
226
227/// Run a blocking function in the worker thread pool associated
228/// with the provided tokio runtime.
229pub fn spawn_blocking_on<F, N, R>(
230    name: N,
231    func: F,
232    runtime: &Handle,
233) -> std::io::Result<JoinHandle<R>>
234where
235    F: FnOnce() -> R + Send + 'static,
236    R: Send + 'static,
237    N: AsRef<str>,
238{
239    tokio::task::Builder::new()
240        .name(name.as_ref())
241        .spawn_blocking_on(func, runtime)
242}
243
244pub fn available_parallelism() -> anyhow::Result<usize> {
245    match std::env::var("KUMO_AVAILABLE_PARALLELISM") {
246        Ok(n) => n
247            .parse()
248            .context("failed to parse KUMO_AVAILABLE_PARALLELISM as a number"),
249        Err(_) => Ok(std::thread::available_parallelism()
250            .context("failed to get available_parallelism")?
251            .get()),
252    }
253}