kumo_server_runtime/
lib.rs

1use anyhow::Context;
2use kumo_prometheus::declare_metric;
3use parking_lot::Mutex;
4use std::collections::HashMap;
5use std::future::Future;
6use std::sync::atomic::{AtomicUsize, Ordering};
7use std::sync::{Arc, LazyLock};
8use tokio::runtime::Handle;
9use tokio::task::JoinHandle;
10
11pub static RUNTIME: LazyLock<Runtime> =
12    LazyLock::new(|| Runtime::new("localset", |cpus| cpus / 4, &LOCALSET_THREADS).unwrap());
13
14declare_metric! {
15/// number of parked (idle) threads in a thread pool
16static PARKED_THREADS: IntGaugeVec(
17        "thread_pool_parked",
18        &["pool"]);
19}
20
21declare_metric! {
22/// number of threads in a thread pool
23static NUM_THREADS: IntGaugeVec(
24        "thread_pool_size",
25        &["pool"]);
26}
27
28static RUNTIMES: LazyLock<Mutex<HashMap<String, Runtime>>> =
29    LazyLock::new(|| Mutex::new(HashMap::new()));
30
31pub fn get_named_runtime(name: &str) -> Option<Runtime> {
32    RUNTIMES.lock().get(name).cloned()
33}
34
35pub static MAIN_RUNTIME: Mutex<Option<tokio::runtime::Handle>> = Mutex::new(None);
36
37pub fn assign_main_runtime(handle: tokio::runtime::Handle) {
38    MAIN_RUNTIME.lock().replace(handle);
39}
40
41pub fn get_main_runtime() -> tokio::runtime::Handle {
42    MAIN_RUNTIME.lock().as_ref().map(|r| r.clone()).unwrap()
43}
44
/// Configured worker thread count for the "localset" pool (`RUNTIME`).
///
/// A value of 0 means "not configured"; `Runtime::new` then falls back to
/// its default sizing. Updated through `set_localset_threads`.
static LOCALSET_THREADS: AtomicUsize = AtomicUsize::new(0);
46
47pub fn set_localset_threads(n: usize) {
48    LOCALSET_THREADS.store(n, Ordering::SeqCst);
49}
50
51struct RuntimeInner {
52    tokio_runtime: tokio::runtime::Runtime,
53    n_threads: usize,
54    name_prefix: String,
55}
56
57#[derive(Clone)]
58pub struct Runtime {
59    inner: Arc<RuntimeInner>,
60}
61
62impl Drop for RuntimeInner {
63    fn drop(&mut self) {
64        PARKED_THREADS
65            .remove_label_values(&[&self.name_prefix])
66            .ok();
67        NUM_THREADS.remove_label_values(&[&self.name_prefix]).ok();
68    }
69}
70
71impl Runtime {
72    pub fn new<F>(
73        name_prefix: &str,
74        default_size: F,
75        configured_size: &AtomicUsize,
76    ) -> anyhow::Result<Self>
77    where
78        F: FnOnce(usize) -> usize,
79    {
80        let env_name = format!("KUMOD_{}_THREADS", name_prefix.to_uppercase());
81        let n_threads = match std::env::var(env_name) {
82            Ok(n) => n.parse()?,
83            Err(_) => {
84                let configured = configured_size.load(Ordering::SeqCst);
85                if configured == 0 {
86                    let cpus = available_parallelism()?;
87                    (default_size)(cpus).max(1)
88                } else {
89                    configured
90                }
91            }
92        };
93
94        let num_parked = PARKED_THREADS.get_metric_with_label_values(&[name_prefix])?;
95        let num_threads = NUM_THREADS.get_metric_with_label_values(&[name_prefix])?;
96        num_threads.set(n_threads as i64);
97
98        let next_id = Arc::new(AtomicUsize::new(0));
99
100        let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
101            .enable_io()
102            .enable_time()
103            .worker_threads(n_threads)
104            .event_interval(
105                std::env::var("KUMOD_EVENT_INTERVAL")
106                    .ok()
107                    .and_then(|n| n.parse().ok())
108                    .unwrap_or(61),
109            )
110            .max_io_events_per_tick(
111                std::env::var("KUMOD_IO_EVENTS_PER_TICK")
112                    .ok()
113                    .and_then(|n| n.parse().ok())
114                    .unwrap_or(1024),
115            )
116            .on_thread_park({
117                let num_parked = num_parked.clone();
118                move || {
119                    kumo_server_memory::purge_thread_cache();
120                    num_parked.inc();
121                }
122            })
123            .thread_name_fn({
124                let name_prefix = name_prefix.to_string();
125                let next_id = next_id.clone();
126                move || {
127                    let id = next_id.fetch_add(1, Ordering::SeqCst);
128                    format!("{name_prefix}-{id}")
129                }
130            })
131            .max_blocking_threads(
132                std::env::var(format!(
133                    "KUMOD_{}_MAX_BLOCKING_THREADS",
134                    name_prefix.to_uppercase()
135                ))
136                .ok()
137                .and_then(|n| n.parse().ok())
138                .unwrap_or(512),
139            )
140            .on_thread_unpark({
141                let num_parked = num_parked.clone();
142                move || {
143                    num_parked.dec();
144                }
145            })
146            .build()?;
147
148        let runtime = Self {
149            inner: Arc::new(RuntimeInner {
150                tokio_runtime,
151                n_threads,
152                name_prefix: name_prefix.to_string(),
153            }),
154        };
155
156        let mut runtimes = RUNTIMES.lock();
157        if runtimes.contains_key(name_prefix) {
158            anyhow::bail!("thread pool runtime with name `{name_prefix}` already exists!");
159        }
160
161        runtimes.insert(name_prefix.to_string(), runtime.clone());
162
163        Ok(runtime)
164    }
165
166    pub fn handle(&self) -> &tokio::runtime::Handle {
167        self.inner.tokio_runtime.handle()
168    }
169
170    pub fn get_num_threads(&self) -> usize {
171        self.inner.n_threads
172    }
173
174    /// Spawn a future into this runtime
175    pub fn spawn<FUT, N: AsRef<str>>(
176        &self,
177        name: N,
178        fut: FUT,
179    ) -> std::io::Result<JoinHandle<FUT::Output>>
180    where
181        FUT: Future + Send + 'static,
182        FUT::Output: Send,
183    {
184        tokio::task::Builder::new()
185            .name(name.as_ref())
186            .spawn_on(fut, self.handle())
187    }
188}
189
190/// Schedule func to run in the main runtime pool,
191/// which is named "localset" for legacy reasons.
192pub fn rt_spawn<FUT, N: AsRef<str>>(name: N, fut: FUT) -> std::io::Result<JoinHandle<FUT::Output>>
193where
194    FUT: Future + Send + 'static,
195    FUT::Output: Send,
196{
197    tokio::task::Builder::new()
198        .name(name.as_ref())
199        .spawn_on(fut, RUNTIME.handle())
200}
201
202/// Spawn a future as a task with a name.
203/// The task is spawned into the current tokio runtime.
204pub fn spawn<FUT, N: AsRef<str>>(name: N, fut: FUT) -> std::io::Result<JoinHandle<FUT::Output>>
205where
206    FUT: Future + Send + 'static,
207    FUT::Output: Send,
208{
209    tokio::task::Builder::new().name(name.as_ref()).spawn(fut)
210}
211
212/// Run a blocking function in the worker thread pool associated
213/// with the current tokio runtime.
214pub fn spawn_blocking<F, N, R>(name: N, func: F) -> std::io::Result<JoinHandle<R>>
215where
216    F: FnOnce() -> R + Send + 'static,
217    R: Send + 'static,
218    N: AsRef<str>,
219{
220    tokio::task::Builder::new()
221        .name(name.as_ref())
222        .spawn_blocking(func)
223}
224
225/// Run a blocking function in the worker thread pool associated
226/// with the provided tokio runtime.
227pub fn spawn_blocking_on<F, N, R>(
228    name: N,
229    func: F,
230    runtime: &Handle,
231) -> std::io::Result<JoinHandle<R>>
232where
233    F: FnOnce() -> R + Send + 'static,
234    R: Send + 'static,
235    N: AsRef<str>,
236{
237    tokio::task::Builder::new()
238        .name(name.as_ref())
239        .spawn_blocking_on(func, runtime)
240}
241
242pub fn available_parallelism() -> anyhow::Result<usize> {
243    match std::env::var("KUMO_AVAILABLE_PARALLELISM") {
244        Ok(n) => n
245            .parse()
246            .context("failed to parse KUMO_AVAILABLE_PARALLELISM as a number"),
247        Err(_) => Ok(std::thread::available_parallelism()
248            .context("failed to get available_parallelism")?
249            .get()),
250    }
251}