kumo_server_runtime/
lib.rs1use anyhow::Context;
2use kumo_prometheus::declare_metric;
3use parking_lot::Mutex;
4use std::collections::HashMap;
5use std::future::Future;
6use std::sync::atomic::{AtomicUsize, Ordering};
7use std::sync::{Arc, LazyLock};
8use tokio::runtime::Handle;
9use tokio::task::JoinHandle;
10
11pub static RUNTIME: LazyLock<Runtime> =
12 LazyLock::new(|| Runtime::new("localset", |cpus| cpus / 4, &LOCALSET_THREADS).unwrap());
13
14declare_metric! {
15static PARKED_THREADS: IntGaugeVec(
17 "thread_pool_parked",
18 &["pool"]);
19}
20
21declare_metric! {
22static NUM_THREADS: IntGaugeVec(
24 "thread_pool_size",
25 &["pool"]);
26}
27
28static RUNTIMES: LazyLock<Mutex<HashMap<String, Runtime>>> =
29 LazyLock::new(|| Mutex::new(HashMap::new()));
30
31pub fn get_named_runtime(name: &str) -> Option<Runtime> {
32 RUNTIMES.lock().get(name).cloned()
33}
34
35pub static MAIN_RUNTIME: Mutex<Option<tokio::runtime::Handle>> = Mutex::new(None);
36
37pub fn assign_main_runtime(handle: tokio::runtime::Handle) {
38 MAIN_RUNTIME.lock().replace(handle);
39}
40
41pub fn get_main_runtime() -> tokio::runtime::Handle {
42 MAIN_RUNTIME.lock().as_ref().map(|r| r.clone()).unwrap()
43}
44
45static LOCALSET_THREADS: AtomicUsize = AtomicUsize::new(0);
46
47pub fn set_localset_threads(n: usize) {
48 LOCALSET_THREADS.store(n, Ordering::SeqCst);
49}
50
51struct RuntimeInner {
52 tokio_runtime: tokio::runtime::Runtime,
53 n_threads: usize,
54 name_prefix: String,
55}
56
57#[derive(Clone)]
58pub struct Runtime {
59 inner: Arc<RuntimeInner>,
60}
61
62impl Drop for RuntimeInner {
63 fn drop(&mut self) {
64 PARKED_THREADS
65 .remove_label_values(&[&self.name_prefix])
66 .ok();
67 NUM_THREADS.remove_label_values(&[&self.name_prefix]).ok();
68 }
69}
70
71impl Runtime {
72 pub fn new<F>(
73 name_prefix: &str,
74 default_size: F,
75 configured_size: &AtomicUsize,
76 ) -> anyhow::Result<Self>
77 where
78 F: FnOnce(usize) -> usize,
79 {
80 let env_name = format!("KUMOD_{}_THREADS", name_prefix.to_uppercase());
81 let n_threads = match std::env::var(env_name) {
82 Ok(n) => n.parse()?,
83 Err(_) => {
84 let configured = configured_size.load(Ordering::SeqCst);
85 if configured == 0 {
86 let cpus = available_parallelism()?;
87 (default_size)(cpus).max(1)
88 } else {
89 configured
90 }
91 }
92 };
93
94 let num_parked = PARKED_THREADS.get_metric_with_label_values(&[name_prefix])?;
95 let num_threads = NUM_THREADS.get_metric_with_label_values(&[name_prefix])?;
96 num_threads.set(n_threads as i64);
97
98 let next_id = Arc::new(AtomicUsize::new(0));
99
100 let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
101 .enable_io()
102 .enable_time()
103 .worker_threads(n_threads)
104 .event_interval(
105 std::env::var("KUMOD_EVENT_INTERVAL")
106 .ok()
107 .and_then(|n| n.parse().ok())
108 .unwrap_or(61),
109 )
110 .max_io_events_per_tick(
111 std::env::var("KUMOD_IO_EVENTS_PER_TICK")
112 .ok()
113 .and_then(|n| n.parse().ok())
114 .unwrap_or(1024),
115 )
116 .on_thread_park({
117 let num_parked = num_parked.clone();
118 move || {
119 kumo_server_memory::purge_thread_cache();
120 num_parked.inc();
121 }
122 })
123 .thread_name_fn({
124 let name_prefix = name_prefix.to_string();
125 let next_id = next_id.clone();
126 move || {
127 let id = next_id.fetch_add(1, Ordering::SeqCst);
128 format!("{name_prefix}-{id}")
129 }
130 })
131 .max_blocking_threads(
132 std::env::var(format!(
133 "KUMOD_{}_MAX_BLOCKING_THREADS",
134 name_prefix.to_uppercase()
135 ))
136 .ok()
137 .and_then(|n| n.parse().ok())
138 .unwrap_or(512),
139 )
140 .on_thread_unpark({
141 let num_parked = num_parked.clone();
142 move || {
143 num_parked.dec();
144 }
145 })
146 .build()?;
147
148 let runtime = Self {
149 inner: Arc::new(RuntimeInner {
150 tokio_runtime,
151 n_threads,
152 name_prefix: name_prefix.to_string(),
153 }),
154 };
155
156 let mut runtimes = RUNTIMES.lock();
157 if runtimes.contains_key(name_prefix) {
158 anyhow::bail!("thread pool runtime with name `{name_prefix}` already exists!");
159 }
160
161 runtimes.insert(name_prefix.to_string(), runtime.clone());
162
163 Ok(runtime)
164 }
165
166 pub fn handle(&self) -> &tokio::runtime::Handle {
167 self.inner.tokio_runtime.handle()
168 }
169
170 pub fn get_num_threads(&self) -> usize {
171 self.inner.n_threads
172 }
173
174 pub fn spawn<FUT, N: AsRef<str>>(
176 &self,
177 name: N,
178 fut: FUT,
179 ) -> std::io::Result<JoinHandle<FUT::Output>>
180 where
181 FUT: Future + Send + 'static,
182 FUT::Output: Send,
183 {
184 tokio::task::Builder::new()
185 .name(name.as_ref())
186 .spawn_on(fut, self.handle())
187 }
188}
189
190pub fn rt_spawn<FUT, N: AsRef<str>>(name: N, fut: FUT) -> std::io::Result<JoinHandle<FUT::Output>>
193where
194 FUT: Future + Send + 'static,
195 FUT::Output: Send,
196{
197 tokio::task::Builder::new()
198 .name(name.as_ref())
199 .spawn_on(fut, RUNTIME.handle())
200}
201
202pub fn spawn<FUT, N: AsRef<str>>(name: N, fut: FUT) -> std::io::Result<JoinHandle<FUT::Output>>
205where
206 FUT: Future + Send + 'static,
207 FUT::Output: Send,
208{
209 tokio::task::Builder::new().name(name.as_ref()).spawn(fut)
210}
211
212pub fn spawn_blocking<F, N, R>(name: N, func: F) -> std::io::Result<JoinHandle<R>>
215where
216 F: FnOnce() -> R + Send + 'static,
217 R: Send + 'static,
218 N: AsRef<str>,
219{
220 tokio::task::Builder::new()
221 .name(name.as_ref())
222 .spawn_blocking(func)
223}
224
225pub fn spawn_blocking_on<F, N, R>(
228 name: N,
229 func: F,
230 runtime: &Handle,
231) -> std::io::Result<JoinHandle<R>>
232where
233 F: FnOnce() -> R + Send + 'static,
234 R: Send + 'static,
235 N: AsRef<str>,
236{
237 tokio::task::Builder::new()
238 .name(name.as_ref())
239 .spawn_blocking_on(func, runtime)
240}
241
242pub fn available_parallelism() -> anyhow::Result<usize> {
243 match std::env::var("KUMO_AVAILABLE_PARALLELISM") {
244 Ok(n) => n
245 .parse()
246 .context("failed to parse KUMO_AVAILABLE_PARALLELISM as a number"),
247 Err(_) => Ok(std::thread::available_parallelism()
248 .context("failed to get available_parallelism")?
249 .get()),
250 }
251}