// kumo_server_runtime/lib.rs

1use anyhow::Context;
2use kumo_prometheus::declare_metric;
3use parking_lot::Mutex;
4#[cfg(target_os = "linux")]
5use std::collections::BTreeMap;
6use std::collections::HashMap;
7#[cfg(target_os = "linux")]
8use std::fmt::Write;
9use std::future::Future;
10use std::sync::atomic::{AtomicUsize, Ordering};
11use std::sync::{Arc, LazyLock};
12use tokio::runtime::Handle;
13use tokio::task::JoinHandle;
14use tokio::time::Duration;
15
/// The default thread pool runtime, named "localset" for legacy reasons.
/// Sized to cpus/4 unless overridden by `set_localset_threads` or the
/// `KUMOD_LOCALSET_THREADS` environment variable (see `Runtime::new`).
pub static RUNTIME: LazyLock<Runtime> =
    LazyLock::new(|| Runtime::new("localset", |cpus| cpus / 4, &LOCALSET_THREADS).unwrap());
18
// Gauge labelled by pool name; adjusted in the runtime's
// on_thread_park/on_thread_unpark hooks (see Runtime::new).
declare_metric! {
/// number of parked (idle) threads in a thread pool
static PARKED_THREADS: IntGaugeVec(
        "thread_pool_parked",
        &["pool"]);
}
25
// Gauge labelled by pool name; set once at pool construction time
// to the resolved worker thread count (see Runtime::new).
declare_metric! {
/// number of threads in a thread pool
static NUM_THREADS: IntGaugeVec(
        "thread_pool_size",
        &["pool"]);
}
32
/// Global registry of named `Runtime` pools, keyed by their name prefix.
/// Populated by `Runtime::new`, queried by `get_named_runtime`.
static RUNTIMES: LazyLock<Mutex<HashMap<String, Runtime>>> =
    LazyLock::new(|| Mutex::new(HashMap::new()));
35
36pub fn get_named_runtime(name: &str) -> Option<Runtime> {
37    RUNTIMES.lock().get(name).cloned()
38}
39
40pub static MAIN_RUNTIME: Mutex<Option<tokio::runtime::Handle>> = Mutex::new(None);
41
42pub fn assign_main_runtime(handle: tokio::runtime::Handle) {
43    MAIN_RUNTIME.lock().replace(handle);
44}
45
46pub fn get_main_runtime() -> tokio::runtime::Handle {
47    MAIN_RUNTIME.lock().as_ref().map(|r| r.clone()).unwrap()
48}
49
50static LOCALSET_THREADS: AtomicUsize = AtomicUsize::new(0);
51
52pub fn set_localset_threads(n: usize) {
53    LOCALSET_THREADS.store(n, Ordering::SeqCst);
54}
55
/// Shared state behind a `Runtime` handle. Dropped when the last clone
/// of the owning `Runtime` goes away, which also unregisters its gauges
/// (see the `Drop` impl below).
struct RuntimeInner {
    // The underlying multi-threaded tokio runtime
    tokio_runtime: tokio::runtime::Runtime,
    // Number of worker threads the runtime was built with
    n_threads: usize,
    // Pool name; used as the metric label and thread-name prefix
    name_prefix: String,
}
61
62#[cfg(target_os = "linux")]
63fn runtimes_by_name() -> BTreeMap<String, Runtime> {
64    RUNTIMES
65        .lock()
66        .iter()
67        .map(|(name, rt)| (name.clone(), rt.clone()))
68        .collect()
69}
70
/// Fallback for platforms where tokio's task-dump facility is not
/// available; always returns an explanatory message.
#[cfg(not(target_os = "linux"))]
pub async fn dump_all_runtimes(_timeout_duration: Duration) -> String {
    String::from("Runtime state dumping is not supported on this system")
}
75
// NOTE: at the time of writing, calling this once will prevent a
// subsequent graceful shutdown from completing.
//
// You will need to call this multiple times to allow the graceful
// shutdown to "clock through" and finish successfully.
// I do not know what exactly causes that stutter/stickiness.
/// Capture tokio task dumps for the main runtime and every registered
/// named runtime, sequentially, and return them joined into a single
/// human-readable string.
///
/// `timeout_duration` bounds the wait for *each* runtime's dump
/// individually; a runtime that times out contributes an explanatory
/// line instead of its traces.
#[cfg(target_os = "linux")]
pub async fn dump_all_runtimes(timeout_duration: Duration) -> String {
    let runtimes = runtimes_by_name();
    let mut dumps = vec![];

    // Collect one runtime's dump, bounded by `timeout_duration`.
    // Trace formatting is offloaded to spawn_blocking — presumably
    // because rendering the traces is CPU-heavy; keeps it off the
    // async workers (NOTE(review): confirm).
    async fn collect_dump(
        label: &str,
        handle: &tokio::runtime::Handle,
        timeout_duration: Duration,
    ) -> String {
        match tokio::time::timeout(timeout_duration, handle.dump()).await {
            Err(_) => format!("Runtime {label}: Timeout while collecting runtime dump"),
            Ok(dump) => {
                // Owned copy so the label can move into the blocking closure
                let label = label.to_string();
                match tokio::task::spawn_blocking(move || {
                    let mut output = format!("Runtime: {label}\n");
                    for (i, task) in dump.tasks().iter().enumerate() {
                        let trace = task.trace();
                        writeln!(&mut output, "{label} TASK {i}:\n{trace}").ok();
                    }
                    output
                })
                .await
                .map_err(|err| format!("spawn_blocking: join failed: {err:#}"))
                {
                    // Success and join-failure both yield a String to report
                    Ok(s) | Err(s) => s,
                }
            }
        }
    }

    dumps.push(collect_dump("main", &get_main_runtime(), timeout_duration).await);
    for (label, rt) in runtimes {
        dumps.push(collect_dump(&label, rt.handle(), timeout_duration).await);
    }

    dumps.join("\n\n")
}
120
/// A cheaply-cloneable handle to a named tokio thread pool; all clones
/// share the same underlying runtime via `Arc`.
#[derive(Clone)]
pub struct Runtime {
    inner: Arc<RuntimeInner>,
}
125
impl Drop for RuntimeInner {
    fn drop(&mut self) {
        // Unregister this pool's gauges so a defunct pool name does not
        // linger in the metrics output; removal failures are ignored.
        PARKED_THREADS
            .remove_label_values(&[&self.name_prefix])
            .ok();
        NUM_THREADS.remove_label_values(&[&self.name_prefix]).ok();
    }
}
134
135impl Runtime {
136    pub fn new<F>(
137        name_prefix: &str,
138        default_size: F,
139        configured_size: &AtomicUsize,
140    ) -> anyhow::Result<Self>
141    where
142        F: FnOnce(usize) -> usize,
143    {
144        let env_name = format!("KUMOD_{}_THREADS", name_prefix.to_uppercase());
145        let n_threads = match std::env::var(env_name) {
146            Ok(n) => n.parse()?,
147            Err(_) => {
148                let configured = configured_size.load(Ordering::SeqCst);
149                if configured == 0 {
150                    let cpus = available_parallelism()?;
151                    (default_size)(cpus).max(1)
152                } else {
153                    configured
154                }
155            }
156        };
157
158        let num_parked = PARKED_THREADS.get_metric_with_label_values(&[name_prefix])?;
159        let num_threads = NUM_THREADS.get_metric_with_label_values(&[name_prefix])?;
160        num_threads.set(n_threads as i64);
161
162        let next_id = Arc::new(AtomicUsize::new(0));
163
164        let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
165            .enable_io()
166            .enable_time()
167            .worker_threads(n_threads)
168            .event_interval(
169                std::env::var("KUMOD_EVENT_INTERVAL")
170                    .ok()
171                    .and_then(|n| n.parse().ok())
172                    .unwrap_or(61),
173            )
174            .max_io_events_per_tick(
175                std::env::var("KUMOD_IO_EVENTS_PER_TICK")
176                    .ok()
177                    .and_then(|n| n.parse().ok())
178                    .unwrap_or(1024),
179            )
180            .on_thread_park({
181                let num_parked = num_parked.clone();
182                move || {
183                    kumo_server_memory::purge_thread_cache();
184                    num_parked.inc();
185                }
186            })
187            .thread_name_fn({
188                let name_prefix = name_prefix.to_string();
189                let next_id = next_id.clone();
190                move || {
191                    let id = next_id.fetch_add(1, Ordering::SeqCst);
192                    format!("{name_prefix}-{id}")
193                }
194            })
195            .max_blocking_threads(
196                std::env::var(format!(
197                    "KUMOD_{}_MAX_BLOCKING_THREADS",
198                    name_prefix.to_uppercase()
199                ))
200                .ok()
201                .and_then(|n| n.parse().ok())
202                .unwrap_or(512),
203            )
204            .on_thread_unpark({
205                let num_parked = num_parked.clone();
206                move || {
207                    num_parked.dec();
208                }
209            })
210            .build()?;
211
212        let runtime = Self {
213            inner: Arc::new(RuntimeInner {
214                tokio_runtime,
215                n_threads,
216                name_prefix: name_prefix.to_string(),
217            }),
218        };
219
220        let mut runtimes = RUNTIMES.lock();
221        if runtimes.contains_key(name_prefix) {
222            anyhow::bail!("thread pool runtime with name `{name_prefix}` already exists!");
223        }
224
225        runtimes.insert(name_prefix.to_string(), runtime.clone());
226
227        Ok(runtime)
228    }
229
230    pub fn handle(&self) -> &tokio::runtime::Handle {
231        self.inner.tokio_runtime.handle()
232    }
233
234    pub fn get_num_threads(&self) -> usize {
235        self.inner.n_threads
236    }
237
238    /// Spawn a future into this runtime
239    pub fn spawn<FUT, N: AsRef<str>>(
240        &self,
241        name: N,
242        fut: FUT,
243    ) -> std::io::Result<JoinHandle<FUT::Output>>
244    where
245        FUT: Future + Send + 'static,
246        FUT::Output: Send,
247    {
248        tokio::task::Builder::new()
249            .name(name.as_ref())
250            .spawn_on(fut, self.handle())
251    }
252}
253
254/// Schedule func to run in the main runtime pool,
255/// which is named "localset" for legacy reasons.
256pub fn rt_spawn<FUT, N: AsRef<str>>(name: N, fut: FUT) -> std::io::Result<JoinHandle<FUT::Output>>
257where
258    FUT: Future + Send + 'static,
259    FUT::Output: Send,
260{
261    tokio::task::Builder::new()
262        .name(name.as_ref())
263        .spawn_on(fut, RUNTIME.handle())
264}
265
266/// Spawn a future as a task with a name.
267/// The task is spawned into the current tokio runtime.
268pub fn spawn<FUT, N: AsRef<str>>(name: N, fut: FUT) -> std::io::Result<JoinHandle<FUT::Output>>
269where
270    FUT: Future + Send + 'static,
271    FUT::Output: Send,
272{
273    tokio::task::Builder::new().name(name.as_ref()).spawn(fut)
274}
275
276/// Run a blocking function in the worker thread pool associated
277/// with the current tokio runtime.
278pub fn spawn_blocking<F, N, R>(name: N, func: F) -> std::io::Result<JoinHandle<R>>
279where
280    F: FnOnce() -> R + Send + 'static,
281    R: Send + 'static,
282    N: AsRef<str>,
283{
284    tokio::task::Builder::new()
285        .name(name.as_ref())
286        .spawn_blocking(func)
287}
288
289/// Run a blocking function in the worker thread pool associated
290/// with the provided tokio runtime.
291pub fn spawn_blocking_on<F, N, R>(
292    name: N,
293    func: F,
294    runtime: &Handle,
295) -> std::io::Result<JoinHandle<R>>
296where
297    F: FnOnce() -> R + Send + 'static,
298    R: Send + 'static,
299    N: AsRef<str>,
300{
301    tokio::task::Builder::new()
302        .name(name.as_ref())
303        .spawn_blocking_on(func, runtime)
304}
305
306pub fn available_parallelism() -> anyhow::Result<usize> {
307    match std::env::var("KUMO_AVAILABLE_PARALLELISM") {
308        Ok(n) => n
309            .parse()
310            .context("failed to parse KUMO_AVAILABLE_PARALLELISM as a number"),
311        Err(_) => Ok(std::thread::available_parallelism()
312            .context("failed to get available_parallelism")?
313            .get()),
314    }
315}