// kumo_server_runtime/lib.rs
use anyhow::Context;
use kumo_prometheus::declare_metric;
use parking_lot::Mutex;
#[cfg(target_os = "linux")]
use std::collections::BTreeMap;
use std::collections::HashMap;
#[cfg(target_os = "linux")]
use std::fmt::Write;
use std::future::Future;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, LazyLock};
use tokio::runtime::Handle;
use tokio::task::JoinHandle;
use tokio::time::Duration;
15
/// The shared general-purpose thread pool, created lazily on first access.
///
/// It is registered under the name "localset". Its size follows the rules
/// of [`Runtime::new`]: the `KUMOD_LOCALSET_THREADS` environment variable
/// wins, then a non-zero value stored via [`set_localset_threads`], and
/// otherwise a quarter of the available parallelism (at least one thread).
///
/// # Panics
///
/// The first access panics if [`Runtime::new`] returns an error.
pub static RUNTIME: LazyLock<Runtime> =
    LazyLock::new(|| Runtime::new("localset", |cpus| cpus / 4, &LOCALSET_THREADS).unwrap());
18
// Gauge family "thread_pool_parked", keyed by the "pool" label.
// `Runtime::new` increments a pool's entry each time one of its threads
// parks and decrements it when the thread unparks; the entry is removed
// when the pool's `RuntimeInner` is dropped.
// NOTE(review): `declare_metric!` may expect a `///` help-text line ahead of
// the static; none is visible here -- confirm against the macro definition.
declare_metric! {
static PARKED_THREADS: IntGaugeVec(
    "thread_pool_parked",
    &["pool"]);
}
25
// Gauge family "thread_pool_size", keyed by the "pool" label.
// `Runtime::new` sets a pool's entry to its worker thread count; the entry
// is removed when the pool's `RuntimeInner` is dropped.
// NOTE(review): `declare_metric!` may expect a `///` help-text line ahead of
// the static; none is visible here -- confirm against the macro definition.
declare_metric! {
static NUM_THREADS: IntGaugeVec(
    "thread_pool_size",
    &["pool"]);
}
32
/// Registry of the runtimes created through `Runtime::new`, keyed by the
/// name they were created with.
///
/// The registry stores a clone of each `Runtime`; nothing visible in this
/// file removes entries again.
static RUNTIMES: LazyLock<Mutex<HashMap<String, Runtime>>> =
    LazyLock::new(|| Mutex::new(HashMap::new()));
35
36pub fn get_named_runtime(name: &str) -> Option<Runtime> {
37 RUNTIMES.lock().get(name).cloned()
38}
39
/// Handle to the main tokio runtime.
///
/// Starts out as `None`; [`assign_main_runtime`] stores a handle here and
/// [`get_main_runtime`] reads it back.
pub static MAIN_RUNTIME: Mutex<Option<tokio::runtime::Handle>> = Mutex::new(None);
41
42pub fn assign_main_runtime(handle: tokio::runtime::Handle) {
43 MAIN_RUNTIME.lock().replace(handle);
44}
45
46pub fn get_main_runtime() -> tokio::runtime::Handle {
47 MAIN_RUNTIME.lock().as_ref().map(|r| r.clone()).unwrap()
48}
49
/// Configured worker thread count for [`RUNTIME`].
///
/// Zero means "not configured": `Runtime::new` then falls back to the
/// default sizing function it was given.
static LOCALSET_THREADS: AtomicUsize = AtomicUsize::new(0);
51
/// Sets the worker thread count that [`RUNTIME`] will use when it is
/// created.
///
/// The value is read once, when `RUNTIME` is first accessed, so calls made
/// after that point have no effect on the already-built pool. Passing 0
/// restores the default sizing.
pub fn set_localset_threads(n: usize) {
    LOCALSET_THREADS.store(n, Ordering::SeqCst);
}
55
/// State shared by every clone of a [`Runtime`].
///
/// Its `Drop` implementation removes the pool's entries from the
/// `PARKED_THREADS` and `NUM_THREADS` metrics.
struct RuntimeInner {
    /// The tokio runtime that owns the pool's threads.
    tokio_runtime: tokio::runtime::Runtime,
    /// Number of worker threads the runtime was built with.
    n_threads: usize,
    /// Name of the pool: used as the registry key, as the metrics label
    /// value and as the prefix of the worker thread names.
    name_prefix: String,
}
61
62#[cfg(target_os = "linux")]
63fn runtimes_by_name() -> BTreeMap<String, Runtime> {
64 RUNTIMES
65 .lock()
66 .iter()
67 .map(|(name, rt)| (name.clone(), rt.clone()))
68 .collect()
69}
70
/// Fallback used on every target other than Linux: no dump is collected
/// and a fixed explanatory message is returned instead.
#[cfg(not(target_os = "linux"))]
pub async fn dump_all_runtimes(_timeout_duration: Duration) -> String {
    String::from("Runtime state dumping is not supported on this system")
}
75
76#[cfg(target_os = "linux")]
83pub async fn dump_all_runtimes(timeout_duration: Duration) -> String {
84 let runtimes = runtimes_by_name();
85 let mut dumps = vec![];
86
87 async fn collect_dump(
88 label: &str,
89 handle: &tokio::runtime::Handle,
90 timeout_duration: Duration,
91 ) -> String {
92 match tokio::time::timeout(timeout_duration, handle.dump()).await {
93 Err(_) => format!("Runtime {label}: Timeout while collecting runtime dump"),
94 Ok(dump) => {
95 let label = label.to_string();
96 match tokio::task::spawn_blocking(move || {
97 let mut output = format!("Runtime: {label}\n");
98 for (i, task) in dump.tasks().iter().enumerate() {
99 let trace = task.trace();
100 writeln!(&mut output, "{label} TASK {i}:\n{trace}").ok();
101 }
102 output
103 })
104 .await
105 .map_err(|err| format!("spawn_blocking: join failed: {err:#}"))
106 {
107 Ok(s) | Err(s) => s,
108 }
109 }
110 }
111 }
112
113 dumps.push(collect_dump("main", &get_main_runtime(), timeout_duration).await);
114 for (label, rt) in runtimes {
115 dumps.push(collect_dump(&label, rt.handle(), timeout_duration).await);
116 }
117
118 dumps.join("\n\n")
119}
120
/// A named tokio thread pool.
///
/// Cloning is cheap: all clones share the same underlying tokio runtime
/// through a reference-counted `RuntimeInner`.
#[derive(Clone)]
pub struct Runtime {
    /// Shared state; see `RuntimeInner`.
    inner: Arc<RuntimeInner>,
}
125
126impl Drop for RuntimeInner {
127 fn drop(&mut self) {
128 PARKED_THREADS
129 .remove_label_values(&[&self.name_prefix])
130 .ok();
131 NUM_THREADS.remove_label_values(&[&self.name_prefix]).ok();
132 }
133}
134
135impl Runtime {
136 pub fn new<F>(
137 name_prefix: &str,
138 default_size: F,
139 configured_size: &AtomicUsize,
140 ) -> anyhow::Result<Self>
141 where
142 F: FnOnce(usize) -> usize,
143 {
144 let env_name = format!("KUMOD_{}_THREADS", name_prefix.to_uppercase());
145 let n_threads = match std::env::var(env_name) {
146 Ok(n) => n.parse()?,
147 Err(_) => {
148 let configured = configured_size.load(Ordering::SeqCst);
149 if configured == 0 {
150 let cpus = available_parallelism()?;
151 (default_size)(cpus).max(1)
152 } else {
153 configured
154 }
155 }
156 };
157
158 let num_parked = PARKED_THREADS.get_metric_with_label_values(&[name_prefix])?;
159 let num_threads = NUM_THREADS.get_metric_with_label_values(&[name_prefix])?;
160 num_threads.set(n_threads as i64);
161
162 let next_id = Arc::new(AtomicUsize::new(0));
163
164 let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
165 .enable_io()
166 .enable_time()
167 .worker_threads(n_threads)
168 .event_interval(
169 std::env::var("KUMOD_EVENT_INTERVAL")
170 .ok()
171 .and_then(|n| n.parse().ok())
172 .unwrap_or(61),
173 )
174 .max_io_events_per_tick(
175 std::env::var("KUMOD_IO_EVENTS_PER_TICK")
176 .ok()
177 .and_then(|n| n.parse().ok())
178 .unwrap_or(1024),
179 )
180 .on_thread_park({
181 let num_parked = num_parked.clone();
182 move || {
183 kumo_server_memory::purge_thread_cache();
184 num_parked.inc();
185 }
186 })
187 .thread_name_fn({
188 let name_prefix = name_prefix.to_string();
189 let next_id = next_id.clone();
190 move || {
191 let id = next_id.fetch_add(1, Ordering::SeqCst);
192 format!("{name_prefix}-{id}")
193 }
194 })
195 .max_blocking_threads(
196 std::env::var(format!(
197 "KUMOD_{}_MAX_BLOCKING_THREADS",
198 name_prefix.to_uppercase()
199 ))
200 .ok()
201 .and_then(|n| n.parse().ok())
202 .unwrap_or(512),
203 )
204 .on_thread_unpark({
205 let num_parked = num_parked.clone();
206 move || {
207 num_parked.dec();
208 }
209 })
210 .build()?;
211
212 let runtime = Self {
213 inner: Arc::new(RuntimeInner {
214 tokio_runtime,
215 n_threads,
216 name_prefix: name_prefix.to_string(),
217 }),
218 };
219
220 let mut runtimes = RUNTIMES.lock();
221 if runtimes.contains_key(name_prefix) {
222 anyhow::bail!("thread pool runtime with name `{name_prefix}` already exists!");
223 }
224
225 runtimes.insert(name_prefix.to_string(), runtime.clone());
226
227 Ok(runtime)
228 }
229
230 pub fn handle(&self) -> &tokio::runtime::Handle {
231 self.inner.tokio_runtime.handle()
232 }
233
234 pub fn get_num_threads(&self) -> usize {
235 self.inner.n_threads
236 }
237
238 pub fn spawn<FUT, N: AsRef<str>>(
240 &self,
241 name: N,
242 fut: FUT,
243 ) -> std::io::Result<JoinHandle<FUT::Output>>
244 where
245 FUT: Future + Send + 'static,
246 FUT::Output: Send,
247 {
248 tokio::task::Builder::new()
249 .name(name.as_ref())
250 .spawn_on(fut, self.handle())
251 }
252}
253
254pub fn rt_spawn<FUT, N: AsRef<str>>(name: N, fut: FUT) -> std::io::Result<JoinHandle<FUT::Output>>
257where
258 FUT: Future + Send + 'static,
259 FUT::Output: Send,
260{
261 tokio::task::Builder::new()
262 .name(name.as_ref())
263 .spawn_on(fut, RUNTIME.handle())
264}
265
266pub fn spawn<FUT, N: AsRef<str>>(name: N, fut: FUT) -> std::io::Result<JoinHandle<FUT::Output>>
269where
270 FUT: Future + Send + 'static,
271 FUT::Output: Send,
272{
273 tokio::task::Builder::new().name(name.as_ref()).spawn(fut)
274}
275
276pub fn spawn_blocking<F, N, R>(name: N, func: F) -> std::io::Result<JoinHandle<R>>
279where
280 F: FnOnce() -> R + Send + 'static,
281 R: Send + 'static,
282 N: AsRef<str>,
283{
284 tokio::task::Builder::new()
285 .name(name.as_ref())
286 .spawn_blocking(func)
287}
288
289pub fn spawn_blocking_on<F, N, R>(
292 name: N,
293 func: F,
294 runtime: &Handle,
295) -> std::io::Result<JoinHandle<R>>
296where
297 F: FnOnce() -> R + Send + 'static,
298 R: Send + 'static,
299 N: AsRef<str>,
300{
301 tokio::task::Builder::new()
302 .name(name.as_ref())
303 .spawn_blocking_on(func, runtime)
304}
305
306pub fn available_parallelism() -> anyhow::Result<usize> {
307 match std::env::var("KUMO_AVAILABLE_PARALLELISM") {
308 Ok(n) => n
309 .parse()
310 .context("failed to parse KUMO_AVAILABLE_PARALLELISM as a number"),
311 Err(_) => Ok(std::thread::available_parallelism()
312 .context("failed to get available_parallelism")?
313 .get()),
314 }
315}