// kumo_server_runtime/lib.rs

use anyhow::Context;
use kumo_prometheus::declare_metric;
use parking_lot::Mutex;
use std::collections::{BTreeMap, HashMap};
use std::fmt::Write;
use std::future::Future;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, LazyLock};
use tokio::runtime::Handle;
use tokio::task::JoinHandle;
use tokio::time::Duration;
12
13pub static RUNTIME: LazyLock<Runtime> =
14    LazyLock::new(|| Runtime::new("localset", |cpus| cpus / 4, &LOCALSET_THREADS).unwrap());
15
declare_metric! {
/// number of parked (idle) threads in a thread pool,
/// labeled by the pool's name
static PARKED_THREADS: IntGaugeVec(
        "thread_pool_parked",
        &["pool"]);
}
22
declare_metric! {
/// number of threads in a thread pool,
/// labeled by the pool's name
static NUM_THREADS: IntGaugeVec(
        "thread_pool_size",
        &["pool"]);
}
29
/// Global registry of every named Runtime constructed via `Runtime::new`,
/// keyed by its `name_prefix`.
static RUNTIMES: LazyLock<Mutex<HashMap<String, Runtime>>> =
    LazyLock::new(|| Mutex::new(HashMap::new()));
32
33pub fn get_named_runtime(name: &str) -> Option<Runtime> {
34    RUNTIMES.lock().get(name).cloned()
35}
36
37pub static MAIN_RUNTIME: Mutex<Option<tokio::runtime::Handle>> = Mutex::new(None);
38
39pub fn assign_main_runtime(handle: tokio::runtime::Handle) {
40    MAIN_RUNTIME.lock().replace(handle);
41}
42
43pub fn get_main_runtime() -> tokio::runtime::Handle {
44    MAIN_RUNTIME.lock().as_ref().map(|r| r.clone()).unwrap()
45}
46
47static LOCALSET_THREADS: AtomicUsize = AtomicUsize::new(0);
48
/// Set the desired worker thread count for the "localset" runtime.
/// Pass 0 to restore the default sizing behavior.
/// Only effective if called before the `RUNTIME` static is first
/// dereferenced, since the pool size is fixed at construction.
pub fn set_localset_threads(n: usize) {
    LOCALSET_THREADS.store(n, Ordering::SeqCst);
}
52
/// Shared state behind a `Runtime` handle; reference-counted via `Arc`.
struct RuntimeInner {
    // the tokio thread pool itself
    tokio_runtime: tokio::runtime::Runtime,
    // number of worker threads the pool was built with
    n_threads: usize,
    // pool name; used as the metric label and worker thread name prefix
    name_prefix: String,
}
58
59fn runtimes_by_name() -> BTreeMap<String, Runtime> {
60    RUNTIMES
61        .lock()
62        .iter()
63        .map(|(name, rt)| (name.clone(), rt.clone()))
64        .collect()
65}
66
/// Fallback for systems where tokio's task-dump facility is not used;
/// returns an explanatory message instead of a dump.
/// The parameter is underscore-prefixed because it is intentionally
/// unused here (it is only meaningful in the linux implementation).
#[cfg(not(target_os = "linux"))]
pub async fn dump_all_runtimes(_timeout_duration: Duration) -> String {
    "Runtime state dumping is not supported on this system".into()
}
71
72// NOTE: at the time of writing, calling this once will prevent a
73// subsequent graceful shutdown from completing.
74//
75// You will need to call this multiple times to allow the graceful
76// shutdown to "clock through" and finish successfully.
77// I do not know what exactly causes that stutter/stickiness.
78#[cfg(target_os = "linux")]
79pub async fn dump_all_runtimes(timeout_duration: Duration) -> String {
80    let runtimes = runtimes_by_name();
81    let mut dumps = vec![];
82
83    async fn collect_dump(
84        label: &str,
85        handle: &tokio::runtime::Handle,
86        timeout_duration: Duration,
87    ) -> String {
88        match tokio::time::timeout(timeout_duration, handle.dump()).await {
89            Err(_) => format!("Runtime {label}: Timeout while collecting runtime dump"),
90            Ok(dump) => {
91                let label = label.to_string();
92                match tokio::task::spawn_blocking(move || {
93                    let mut output = format!("Runtime: {label}\n");
94                    for (i, task) in dump.tasks().iter().enumerate() {
95                        let trace = task.trace();
96                        writeln!(&mut output, "{label} TASK {i}:\n{trace}").ok();
97                    }
98                    output
99                })
100                .await
101                .map_err(|err| format!("spawn_blocking: join failed: {err:#}"))
102                {
103                    Ok(s) | Err(s) => s,
104                }
105            }
106        }
107    }
108
109    dumps.push(collect_dump("main", &get_main_runtime(), timeout_duration).await);
110    for (label, rt) in runtimes {
111        dumps.push(collect_dump(&label, rt.handle(), timeout_duration).await);
112    }
113
114    dumps.join("\n\n")
115}
116
/// A named tokio thread-pool runtime.
///
/// Cloning is cheap: all clones share the same underlying pool via `Arc`.
/// The pool (and its metrics) are torn down when the last clone drops.
#[derive(Clone)]
pub struct Runtime {
    inner: Arc<RuntimeInner>,
}
121
122impl Drop for RuntimeInner {
123    fn drop(&mut self) {
124        PARKED_THREADS
125            .remove_label_values(&[&self.name_prefix])
126            .ok();
127        NUM_THREADS.remove_label_values(&[&self.name_prefix]).ok();
128    }
129}
130
131impl Runtime {
132    pub fn new<F>(
133        name_prefix: &str,
134        default_size: F,
135        configured_size: &AtomicUsize,
136    ) -> anyhow::Result<Self>
137    where
138        F: FnOnce(usize) -> usize,
139    {
140        let env_name = format!("KUMOD_{}_THREADS", name_prefix.to_uppercase());
141        let n_threads = match std::env::var(env_name) {
142            Ok(n) => n.parse()?,
143            Err(_) => {
144                let configured = configured_size.load(Ordering::SeqCst);
145                if configured == 0 {
146                    let cpus = available_parallelism()?;
147                    (default_size)(cpus).max(1)
148                } else {
149                    configured
150                }
151            }
152        };
153
154        let num_parked = PARKED_THREADS.get_metric_with_label_values(&[name_prefix])?;
155        let num_threads = NUM_THREADS.get_metric_with_label_values(&[name_prefix])?;
156        num_threads.set(n_threads as i64);
157
158        let next_id = Arc::new(AtomicUsize::new(0));
159
160        let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
161            .enable_io()
162            .enable_time()
163            .worker_threads(n_threads)
164            .event_interval(
165                std::env::var("KUMOD_EVENT_INTERVAL")
166                    .ok()
167                    .and_then(|n| n.parse().ok())
168                    .unwrap_or(61),
169            )
170            .max_io_events_per_tick(
171                std::env::var("KUMOD_IO_EVENTS_PER_TICK")
172                    .ok()
173                    .and_then(|n| n.parse().ok())
174                    .unwrap_or(1024),
175            )
176            .on_thread_park({
177                let num_parked = num_parked.clone();
178                move || {
179                    kumo_server_memory::purge_thread_cache();
180                    num_parked.inc();
181                }
182            })
183            .thread_name_fn({
184                let name_prefix = name_prefix.to_string();
185                let next_id = next_id.clone();
186                move || {
187                    let id = next_id.fetch_add(1, Ordering::SeqCst);
188                    format!("{name_prefix}-{id}")
189                }
190            })
191            .max_blocking_threads(
192                std::env::var(format!(
193                    "KUMOD_{}_MAX_BLOCKING_THREADS",
194                    name_prefix.to_uppercase()
195                ))
196                .ok()
197                .and_then(|n| n.parse().ok())
198                .unwrap_or(512),
199            )
200            .on_thread_unpark({
201                let num_parked = num_parked.clone();
202                move || {
203                    num_parked.dec();
204                }
205            })
206            .build()?;
207
208        let runtime = Self {
209            inner: Arc::new(RuntimeInner {
210                tokio_runtime,
211                n_threads,
212                name_prefix: name_prefix.to_string(),
213            }),
214        };
215
216        let mut runtimes = RUNTIMES.lock();
217        if runtimes.contains_key(name_prefix) {
218            anyhow::bail!("thread pool runtime with name `{name_prefix}` already exists!");
219        }
220
221        runtimes.insert(name_prefix.to_string(), runtime.clone());
222
223        Ok(runtime)
224    }
225
226    pub fn handle(&self) -> &tokio::runtime::Handle {
227        self.inner.tokio_runtime.handle()
228    }
229
230    pub fn get_num_threads(&self) -> usize {
231        self.inner.n_threads
232    }
233
234    /// Spawn a future into this runtime
235    pub fn spawn<FUT, N: AsRef<str>>(
236        &self,
237        name: N,
238        fut: FUT,
239    ) -> std::io::Result<JoinHandle<FUT::Output>>
240    where
241        FUT: Future + Send + 'static,
242        FUT::Output: Send,
243    {
244        tokio::task::Builder::new()
245            .name(name.as_ref())
246            .spawn_on(fut, self.handle())
247    }
248}
249
250/// Schedule func to run in the main runtime pool,
251/// which is named "localset" for legacy reasons.
252pub fn rt_spawn<FUT, N: AsRef<str>>(name: N, fut: FUT) -> std::io::Result<JoinHandle<FUT::Output>>
253where
254    FUT: Future + Send + 'static,
255    FUT::Output: Send,
256{
257    tokio::task::Builder::new()
258        .name(name.as_ref())
259        .spawn_on(fut, RUNTIME.handle())
260}
261
262/// Spawn a future as a task with a name.
263/// The task is spawned into the current tokio runtime.
264pub fn spawn<FUT, N: AsRef<str>>(name: N, fut: FUT) -> std::io::Result<JoinHandle<FUT::Output>>
265where
266    FUT: Future + Send + 'static,
267    FUT::Output: Send,
268{
269    tokio::task::Builder::new().name(name.as_ref()).spawn(fut)
270}
271
272/// Run a blocking function in the worker thread pool associated
273/// with the current tokio runtime.
274pub fn spawn_blocking<F, N, R>(name: N, func: F) -> std::io::Result<JoinHandle<R>>
275where
276    F: FnOnce() -> R + Send + 'static,
277    R: Send + 'static,
278    N: AsRef<str>,
279{
280    tokio::task::Builder::new()
281        .name(name.as_ref())
282        .spawn_blocking(func)
283}
284
285/// Run a blocking function in the worker thread pool associated
286/// with the provided tokio runtime.
287pub fn spawn_blocking_on<F, N, R>(
288    name: N,
289    func: F,
290    runtime: &Handle,
291) -> std::io::Result<JoinHandle<R>>
292where
293    F: FnOnce() -> R + Send + 'static,
294    R: Send + 'static,
295    N: AsRef<str>,
296{
297    tokio::task::Builder::new()
298        .name(name.as_ref())
299        .spawn_blocking_on(func, runtime)
300}
301
302pub fn available_parallelism() -> anyhow::Result<usize> {
303    match std::env::var("KUMO_AVAILABLE_PARALLELISM") {
304        Ok(n) => n
305            .parse()
306            .context("failed to parse KUMO_AVAILABLE_PARALLELISM as a number"),
307        Err(_) => Ok(std::thread::available_parallelism()
308            .context("failed to get available_parallelism")?
309            .get()),
310    }
311}