kumo_server_runtime/
lib.rs1use anyhow::Context;
2use kumo_prometheus::declare_metric;
3use parking_lot::Mutex;
4use std::collections::{BTreeMap, HashMap};
5use std::fmt::Write;
6use std::future::Future;
7use std::sync::atomic::{AtomicUsize, Ordering};
8use std::sync::{Arc, LazyLock};
9use tokio::runtime::Handle;
10use tokio::task::JoinHandle;
11use tokio::time::Duration;
12
13pub static RUNTIME: LazyLock<Runtime> =
14 LazyLock::new(|| Runtime::new("localset", |cpus| cpus / 4, &LOCALSET_THREADS).unwrap());
15
// Gauge of currently parked worker threads, labelled by pool name (`pool`).
// `Runtime::new` increments it from the `on_thread_park` hook and decrements
// it from the `on_thread_unpark` hook.
declare_metric! {
static PARKED_THREADS: IntGaugeVec(
        "thread_pool_parked",
        &["pool"]);
}
22
// Gauge of the worker thread count of each pool, labelled by pool name
// (`pool`). Set by `Runtime::new`; the series is removed when the pool's
// `RuntimeInner` is dropped.
declare_metric! {
static NUM_THREADS: IntGaugeVec(
        "thread_pool_size",
        &["pool"]);
}
29
30static RUNTIMES: LazyLock<Mutex<HashMap<String, Runtime>>> =
31 LazyLock::new(|| Mutex::new(HashMap::new()));
32
33pub fn get_named_runtime(name: &str) -> Option<Runtime> {
34 RUNTIMES.lock().get(name).cloned()
35}
36
37pub static MAIN_RUNTIME: Mutex<Option<tokio::runtime::Handle>> = Mutex::new(None);
38
39pub fn assign_main_runtime(handle: tokio::runtime::Handle) {
40 MAIN_RUNTIME.lock().replace(handle);
41}
42
43pub fn get_main_runtime() -> tokio::runtime::Handle {
44 MAIN_RUNTIME.lock().as_ref().map(|r| r.clone()).unwrap()
45}
46
/// Configured worker count for the "localset" pool. Zero means "not
/// configured", in which case `Runtime::new` uses its default sizing.
static LOCALSET_THREADS: AtomicUsize = AtomicUsize::new(0);
48
49pub fn set_localset_threads(n: usize) {
50 LOCALSET_THREADS.store(n, Ordering::SeqCst);
51}
52
/// Shared state behind a `Runtime`; dropped when the last `Runtime` clone
/// referencing it is dropped.
struct RuntimeInner {
    // The tokio runtime backing this pool. Struct fields drop in declaration
    // order, so this is torn down before the remaining fields.
    tokio_runtime: tokio::runtime::Runtime,
    // Number of worker threads the runtime was built with.
    n_threads: usize,
    // Pool name: prefix for worker thread names and the `pool` metric label.
    name_prefix: String,
}
58
59fn runtimes_by_name() -> BTreeMap<String, Runtime> {
60 RUNTIMES
61 .lock()
62 .iter()
63 .map(|(name, rt)| (name.clone(), rt.clone()))
64 .collect()
65}
66
/// Runtime task dumps are only implemented for Linux builds; on other
/// platforms this returns a fixed message and the timeout is ignored.
#[cfg(not(target_os = "linux"))]
pub async fn dump_all_runtimes(_timeout_duration: Duration) -> String {
    "Runtime state dumping is not supported on this system".into()
}
71
72#[cfg(target_os = "linux")]
79pub async fn dump_all_runtimes(timeout_duration: Duration) -> String {
80 let runtimes = runtimes_by_name();
81 let mut dumps = vec![];
82
83 async fn collect_dump(
84 label: &str,
85 handle: &tokio::runtime::Handle,
86 timeout_duration: Duration,
87 ) -> String {
88 match tokio::time::timeout(timeout_duration, handle.dump()).await {
89 Err(_) => format!("Runtime {label}: Timeout while collecting runtime dump"),
90 Ok(dump) => {
91 let label = label.to_string();
92 match tokio::task::spawn_blocking(move || {
93 let mut output = format!("Runtime: {label}\n");
94 for (i, task) in dump.tasks().iter().enumerate() {
95 let trace = task.trace();
96 writeln!(&mut output, "{label} TASK {i}:\n{trace}").ok();
97 }
98 output
99 })
100 .await
101 .map_err(|err| format!("spawn_blocking: join failed: {err:#}"))
102 {
103 Ok(s) | Err(s) => s,
104 }
105 }
106 }
107 }
108
109 dumps.push(collect_dump("main", &get_main_runtime(), timeout_duration).await);
110 for (label, rt) in runtimes {
111 dumps.push(collect_dump(&label, rt.handle(), timeout_duration).await);
112 }
113
114 dumps.join("\n\n")
115}
116
/// A named tokio thread pool created by `Runtime::new`.
///
/// Cloning is cheap: every clone shares the same `RuntimeInner` through an
/// `Arc`, and the pool is torn down when the last clone is dropped.
#[derive(Clone)]
pub struct Runtime {
    inner: Arc<RuntimeInner>,
}
121
122impl Drop for RuntimeInner {
123 fn drop(&mut self) {
124 PARKED_THREADS
125 .remove_label_values(&[&self.name_prefix])
126 .ok();
127 NUM_THREADS.remove_label_values(&[&self.name_prefix]).ok();
128 }
129}
130
131impl Runtime {
132 pub fn new<F>(
133 name_prefix: &str,
134 default_size: F,
135 configured_size: &AtomicUsize,
136 ) -> anyhow::Result<Self>
137 where
138 F: FnOnce(usize) -> usize,
139 {
140 let env_name = format!("KUMOD_{}_THREADS", name_prefix.to_uppercase());
141 let n_threads = match std::env::var(env_name) {
142 Ok(n) => n.parse()?,
143 Err(_) => {
144 let configured = configured_size.load(Ordering::SeqCst);
145 if configured == 0 {
146 let cpus = available_parallelism()?;
147 (default_size)(cpus).max(1)
148 } else {
149 configured
150 }
151 }
152 };
153
154 let num_parked = PARKED_THREADS.get_metric_with_label_values(&[name_prefix])?;
155 let num_threads = NUM_THREADS.get_metric_with_label_values(&[name_prefix])?;
156 num_threads.set(n_threads as i64);
157
158 let next_id = Arc::new(AtomicUsize::new(0));
159
160 let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
161 .enable_io()
162 .enable_time()
163 .worker_threads(n_threads)
164 .event_interval(
165 std::env::var("KUMOD_EVENT_INTERVAL")
166 .ok()
167 .and_then(|n| n.parse().ok())
168 .unwrap_or(61),
169 )
170 .max_io_events_per_tick(
171 std::env::var("KUMOD_IO_EVENTS_PER_TICK")
172 .ok()
173 .and_then(|n| n.parse().ok())
174 .unwrap_or(1024),
175 )
176 .on_thread_park({
177 let num_parked = num_parked.clone();
178 move || {
179 kumo_server_memory::purge_thread_cache();
180 num_parked.inc();
181 }
182 })
183 .thread_name_fn({
184 let name_prefix = name_prefix.to_string();
185 let next_id = next_id.clone();
186 move || {
187 let id = next_id.fetch_add(1, Ordering::SeqCst);
188 format!("{name_prefix}-{id}")
189 }
190 })
191 .max_blocking_threads(
192 std::env::var(format!(
193 "KUMOD_{}_MAX_BLOCKING_THREADS",
194 name_prefix.to_uppercase()
195 ))
196 .ok()
197 .and_then(|n| n.parse().ok())
198 .unwrap_or(512),
199 )
200 .on_thread_unpark({
201 let num_parked = num_parked.clone();
202 move || {
203 num_parked.dec();
204 }
205 })
206 .build()?;
207
208 let runtime = Self {
209 inner: Arc::new(RuntimeInner {
210 tokio_runtime,
211 n_threads,
212 name_prefix: name_prefix.to_string(),
213 }),
214 };
215
216 let mut runtimes = RUNTIMES.lock();
217 if runtimes.contains_key(name_prefix) {
218 anyhow::bail!("thread pool runtime with name `{name_prefix}` already exists!");
219 }
220
221 runtimes.insert(name_prefix.to_string(), runtime.clone());
222
223 Ok(runtime)
224 }
225
226 pub fn handle(&self) -> &tokio::runtime::Handle {
227 self.inner.tokio_runtime.handle()
228 }
229
230 pub fn get_num_threads(&self) -> usize {
231 self.inner.n_threads
232 }
233
234 pub fn spawn<FUT, N: AsRef<str>>(
236 &self,
237 name: N,
238 fut: FUT,
239 ) -> std::io::Result<JoinHandle<FUT::Output>>
240 where
241 FUT: Future + Send + 'static,
242 FUT::Output: Send,
243 {
244 tokio::task::Builder::new()
245 .name(name.as_ref())
246 .spawn_on(fut, self.handle())
247 }
248}
249
250pub fn rt_spawn<FUT, N: AsRef<str>>(name: N, fut: FUT) -> std::io::Result<JoinHandle<FUT::Output>>
253where
254 FUT: Future + Send + 'static,
255 FUT::Output: Send,
256{
257 tokio::task::Builder::new()
258 .name(name.as_ref())
259 .spawn_on(fut, RUNTIME.handle())
260}
261
262pub fn spawn<FUT, N: AsRef<str>>(name: N, fut: FUT) -> std::io::Result<JoinHandle<FUT::Output>>
265where
266 FUT: Future + Send + 'static,
267 FUT::Output: Send,
268{
269 tokio::task::Builder::new().name(name.as_ref()).spawn(fut)
270}
271
272pub fn spawn_blocking<F, N, R>(name: N, func: F) -> std::io::Result<JoinHandle<R>>
275where
276 F: FnOnce() -> R + Send + 'static,
277 R: Send + 'static,
278 N: AsRef<str>,
279{
280 tokio::task::Builder::new()
281 .name(name.as_ref())
282 .spawn_blocking(func)
283}
284
285pub fn spawn_blocking_on<F, N, R>(
288 name: N,
289 func: F,
290 runtime: &Handle,
291) -> std::io::Result<JoinHandle<R>>
292where
293 F: FnOnce() -> R + Send + 'static,
294 R: Send + 'static,
295 N: AsRef<str>,
296{
297 tokio::task::Builder::new()
298 .name(name.as_ref())
299 .spawn_blocking_on(func, runtime)
300}
301
302pub fn available_parallelism() -> anyhow::Result<usize> {
303 match std::env::var("KUMO_AVAILABLE_PARALLELISM") {
304 Ok(n) => n
305 .parse()
306 .context("failed to parse KUMO_AVAILABLE_PARALLELISM as a number"),
307 Err(_) => Ok(std::thread::available_parallelism()
308 .context("failed to get available_parallelism")?
309 .get()),
310 }
311}