kumo_server_runtime/
lib.rs1use anyhow::Context;
2use parking_lot::Mutex;
3use prometheus::IntGaugeVec;
4use std::collections::HashMap;
5use std::future::Future;
6use std::sync::atomic::{AtomicUsize, Ordering};
7use std::sync::{Arc, LazyLock};
8use tokio::runtime::Handle;
9use tokio::task::JoinHandle;
10
11pub static RUNTIME: LazyLock<Runtime> =
12 LazyLock::new(|| Runtime::new("localset", |cpus| cpus / 4, &LOCALSET_THREADS).unwrap());
13static PARKED_THREADS: LazyLock<IntGaugeVec> = LazyLock::new(|| {
14 prometheus::register_int_gauge_vec!(
15 "thread_pool_parked",
16 "number of parked(idle) threads in a thread pool",
17 &["pool"]
18 )
19 .unwrap()
20});
21static NUM_THREADS: LazyLock<IntGaugeVec> = LazyLock::new(|| {
22 prometheus::register_int_gauge_vec!(
23 "thread_pool_size",
24 "number of threads in a thread pool",
25 &["pool"]
26 )
27 .unwrap()
28});
29
30static RUNTIMES: LazyLock<Mutex<HashMap<String, Runtime>>> =
31 LazyLock::new(|| Mutex::new(HashMap::new()));
32
33pub fn get_named_runtime(name: &str) -> Option<Runtime> {
34 RUNTIMES.lock().get(name).cloned()
35}
36
37pub static MAIN_RUNTIME: Mutex<Option<tokio::runtime::Handle>> = Mutex::new(None);
38
39pub fn assign_main_runtime(handle: tokio::runtime::Handle) {
40 MAIN_RUNTIME.lock().replace(handle);
41}
42
43pub fn get_main_runtime() -> tokio::runtime::Handle {
44 MAIN_RUNTIME.lock().as_ref().map(|r| r.clone()).unwrap()
45}
46
/// Configured thread count for the "localset" pool.
///
/// A value of `0` means "not configured": [`Runtime::new`] then falls back
/// to its default sizing function.
static LOCALSET_THREADS: AtomicUsize = AtomicUsize::new(0);
48
49pub fn set_localset_threads(n: usize) {
50 LOCALSET_THREADS.store(n, Ordering::SeqCst);
51}
52
53struct RuntimeInner {
54 tokio_runtime: tokio::runtime::Runtime,
55 n_threads: usize,
56 name_prefix: String,
57}
58
59#[derive(Clone)]
60pub struct Runtime {
61 inner: Arc<RuntimeInner>,
62}
63
64impl Drop for RuntimeInner {
65 fn drop(&mut self) {
66 PARKED_THREADS
67 .remove_label_values(&[&self.name_prefix])
68 .ok();
69 NUM_THREADS.remove_label_values(&[&self.name_prefix]).ok();
70 }
71}
72
73impl Runtime {
74 pub fn new<F>(
75 name_prefix: &str,
76 default_size: F,
77 configured_size: &AtomicUsize,
78 ) -> anyhow::Result<Self>
79 where
80 F: FnOnce(usize) -> usize,
81 {
82 let env_name = format!("KUMOD_{}_THREADS", name_prefix.to_uppercase());
83 let n_threads = match std::env::var(env_name) {
84 Ok(n) => n.parse()?,
85 Err(_) => {
86 let configured = configured_size.load(Ordering::SeqCst);
87 if configured == 0 {
88 let cpus = available_parallelism()?;
89 (default_size)(cpus).max(1)
90 } else {
91 configured
92 }
93 }
94 };
95
96 let num_parked = PARKED_THREADS.get_metric_with_label_values(&[name_prefix])?;
97 let num_threads = NUM_THREADS.get_metric_with_label_values(&[name_prefix])?;
98 num_threads.set(n_threads as i64);
99
100 let next_id = Arc::new(AtomicUsize::new(0));
101
102 let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
103 .enable_io()
104 .enable_time()
105 .worker_threads(n_threads)
106 .event_interval(
107 std::env::var("KUMOD_EVENT_INTERVAL")
108 .ok()
109 .and_then(|n| n.parse().ok())
110 .unwrap_or(61),
111 )
112 .max_io_events_per_tick(
113 std::env::var("KUMOD_IO_EVENTS_PER_TICK")
114 .ok()
115 .and_then(|n| n.parse().ok())
116 .unwrap_or(1024),
117 )
118 .on_thread_park({
119 let num_parked = num_parked.clone();
120 move || {
121 kumo_server_memory::purge_thread_cache();
122 num_parked.inc();
123 }
124 })
125 .thread_name_fn({
126 let name_prefix = name_prefix.to_string();
127 let next_id = next_id.clone();
128 move || {
129 let id = next_id.fetch_add(1, Ordering::SeqCst);
130 format!("{name_prefix}-{id}")
131 }
132 })
133 .max_blocking_threads(
134 std::env::var(format!(
135 "KUMOD_{}_MAX_BLOCKING_THREADS",
136 name_prefix.to_uppercase()
137 ))
138 .ok()
139 .and_then(|n| n.parse().ok())
140 .unwrap_or(512),
141 )
142 .on_thread_unpark({
143 let num_parked = num_parked.clone();
144 move || {
145 num_parked.dec();
146 }
147 })
148 .build()?;
149
150 let runtime = Self {
151 inner: Arc::new(RuntimeInner {
152 tokio_runtime,
153 n_threads,
154 name_prefix: name_prefix.to_string(),
155 }),
156 };
157
158 let mut runtimes = RUNTIMES.lock();
159 if runtimes.contains_key(name_prefix) {
160 anyhow::bail!("thread pool runtime with name `{name_prefix}` already exists!");
161 }
162
163 runtimes.insert(name_prefix.to_string(), runtime.clone());
164
165 Ok(runtime)
166 }
167
168 pub fn handle(&self) -> &tokio::runtime::Handle {
169 self.inner.tokio_runtime.handle()
170 }
171
172 pub fn get_num_threads(&self) -> usize {
173 self.inner.n_threads
174 }
175
176 pub fn spawn<FUT, N: AsRef<str>>(
178 &self,
179 name: N,
180 fut: FUT,
181 ) -> std::io::Result<JoinHandle<FUT::Output>>
182 where
183 FUT: Future + Send + 'static,
184 FUT::Output: Send,
185 {
186 tokio::task::Builder::new()
187 .name(name.as_ref())
188 .spawn_on(fut, self.handle())
189 }
190}
191
192pub fn rt_spawn<FUT, N: AsRef<str>>(name: N, fut: FUT) -> std::io::Result<JoinHandle<FUT::Output>>
195where
196 FUT: Future + Send + 'static,
197 FUT::Output: Send,
198{
199 tokio::task::Builder::new()
200 .name(name.as_ref())
201 .spawn_on(fut, RUNTIME.handle())
202}
203
204pub fn spawn<FUT, N: AsRef<str>>(name: N, fut: FUT) -> std::io::Result<JoinHandle<FUT::Output>>
207where
208 FUT: Future + Send + 'static,
209 FUT::Output: Send,
210{
211 tokio::task::Builder::new().name(name.as_ref()).spawn(fut)
212}
213
214pub fn spawn_blocking<F, N, R>(name: N, func: F) -> std::io::Result<JoinHandle<R>>
217where
218 F: FnOnce() -> R + Send + 'static,
219 R: Send + 'static,
220 N: AsRef<str>,
221{
222 tokio::task::Builder::new()
223 .name(name.as_ref())
224 .spawn_blocking(func)
225}
226
227pub fn spawn_blocking_on<F, N, R>(
230 name: N,
231 func: F,
232 runtime: &Handle,
233) -> std::io::Result<JoinHandle<R>>
234where
235 F: FnOnce() -> R + Send + 'static,
236 R: Send + 'static,
237 N: AsRef<str>,
238{
239 tokio::task::Builder::new()
240 .name(name.as_ref())
241 .spawn_blocking_on(func, runtime)
242}
243
244pub fn available_parallelism() -> anyhow::Result<usize> {
245 match std::env::var("KUMO_AVAILABLE_PARALLELISM") {
246 Ok(n) => n
247 .parse()
248 .context("failed to parse KUMO_AVAILABLE_PARALLELISM as a number"),
249 Err(_) => Ok(std::thread::available_parallelism()
250 .context("failed to get available_parallelism")?
251 .get()),
252 }
253}