kumo_server_lifecycle/
lib.rs1use parking_lot::Mutex;
6use std::collections::HashMap;
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::sync::{LazyLock, OnceLock};
9use thiserror::Error;
10use tokio::signal::unix::SignalKind;
11use tokio::sync::mpsc::{Receiver as MPSCReceiver, Sender as MPSCSender};
12use tokio::sync::watch::{Receiver as WatchReceiver, Sender as WatchSender};
13use uuid::Uuid;
14
15static ACTIVE: OnceLock<Mutex<Option<Activity>>> = OnceLock::new();
16static SHUTTING_DOWN: AtomicBool = AtomicBool::new(false);
17static STOPPING: OnceLock<ShutdownState> = OnceLock::new();
18
19static ACTIVE_LABELS: LazyLock<Mutex<HashMap<Uuid, String>>> = LazyLock::new(Mutex::default);
20
21#[derive(Error, Debug, Clone)]
22#[error("{0} refused: shutting down")]
23pub struct ShuttingDownError(String);
24
25impl ShuttingDownError {
26 pub fn is_shutting_down(err: &anyhow::Error) -> bool {
27 if err
28 .root_cause()
29 .downcast_ref::<ShuttingDownError>()
30 .is_some()
31 {
32 return true;
33 }
34 format!("{err:#}").contains("shutting down")
35 }
36
37 pub fn new(label: impl Into<String>) -> Self {
38 Self(label.into())
39 }
40}
41
42pub struct Activity {
47 tx: MPSCSender<()>,
48 uuid: Uuid,
49}
50
51impl std::fmt::Debug for Activity {
52 fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
53 fmt.debug_struct("Activity").finish()
54 }
55}
56
57impl Clone for Activity {
58 fn clone(&self) -> Self {
59 let uuid = Uuid::new_v4();
60 let mut labels = ACTIVE_LABELS.lock();
61
62 let label = match labels.get(&self.uuid) {
63 Some(existing) => format!("clone of {existing}"),
64 None => format!("impossible missing label for {}", self.uuid),
65 };
66 labels.insert(uuid, label);
67
68 Activity {
69 tx: self.tx.clone(),
70 uuid,
71 }
72 }
73}
74
75impl Drop for Activity {
76 fn drop(&mut self) {
77 ACTIVE_LABELS.lock().remove(&self.uuid);
78 }
79}
80
81impl Activity {
82 pub fn get_opt(label: String) -> Option<Self> {
86 Self::get(label).ok()
87 }
88
89 pub fn get(label: String) -> Result<Self, ShuttingDownError> {
93 let uuid = Uuid::new_v4();
94 let Some(active) = ACTIVE.get().map(|a| a.lock()) else {
95 return Err(ShuttingDownError(label));
96 };
97 let Some(activity) = active.as_ref() else {
98 return Err(ShuttingDownError(label));
99 };
100 ACTIVE_LABELS.lock().insert(uuid, label);
101 Ok(Activity {
102 tx: activity.tx.clone(),
103 uuid,
104 })
105 }
106
107 pub fn is_shutting_down(&self) -> bool {
109 SHUTTING_DOWN.load(Ordering::Relaxed)
110 }
111}
112
113pub fn is_shutting_down() -> bool {
114 SHUTTING_DOWN.load(Ordering::Relaxed)
115}
116
117struct ShutdownState {
118 tx: WatchSender<()>,
119 rx: WatchReceiver<()>,
120 request_shutdown_tx: MPSCSender<()>,
121 stop_requested: AtomicBool,
122}
123
124pub struct ShutdownSubcription {
129 rx: WatchReceiver<()>,
130}
131
132impl ShutdownSubcription {
133 pub fn get() -> Self {
135 Self {
136 rx: STOPPING.get().unwrap().rx.clone(),
137 }
138 }
139
140 pub fn try_get() -> Option<Self> {
141 Some(Self {
142 rx: STOPPING.get()?.rx.clone(),
143 })
144 }
145
146 pub async fn shutting_down(&mut self) {
148 self.rx.changed().await.ok();
149 }
150}
151
152pub struct LifeCycle {
156 activity_rx: MPSCReceiver<()>,
157 request_shutdown_rx: MPSCReceiver<()>,
158}
159
160impl Default for LifeCycle {
161 fn default() -> Self {
162 Self::new()
163 }
164}
165
166impl LifeCycle {
167 pub fn new() -> Self {
170 let (activity_tx, activity_rx) = tokio::sync::mpsc::channel(1);
171 let uuid = Uuid::new_v4();
172 ACTIVE_LABELS
173 .lock()
174 .insert(uuid, "Root LifeCycle".to_string());
175 ACTIVE
176 .set(Mutex::new(Some(Activity {
177 tx: activity_tx,
178 uuid,
179 })))
180 .map_err(|_| ())
181 .unwrap();
182
183 let (request_shutdown_tx, request_shutdown_rx) = tokio::sync::mpsc::channel(1);
184
185 let (tx, rx) = tokio::sync::watch::channel(());
186 STOPPING
187 .set(ShutdownState {
188 tx,
189 rx,
190 request_shutdown_tx,
191 stop_requested: AtomicBool::new(false),
192 })
193 .map_err(|_| ())
194 .unwrap();
195
196 Self {
197 activity_rx,
198 request_shutdown_rx,
199 }
200 }
201
202 pub async fn request_shutdown() {
207 tracing::debug!("shutdown has been requested");
208 if let Some(state) = STOPPING.get() {
209 if state.stop_requested.compare_exchange(
210 false,
211 true,
212 Ordering::SeqCst,
213 Ordering::SeqCst,
214 ) == Ok(false)
215 {
216 state.request_shutdown_tx.send(()).await.ok();
217 }
218 } else {
219 tracing::error!("request_shutdown: STOPPING channel is unavailable");
220 }
221 }
222
223 pub async fn wait_for_shutdown(&mut self) {
227 tracing::debug!("Waiting for interrupt");
229 let mut sig_term =
230 tokio::signal::unix::signal(SignalKind::terminate()).expect("listen for SIGTERM");
231 let mut sig_hup =
232 tokio::signal::unix::signal(SignalKind::hangup()).expect("listen for SIGUP");
233
234 tokio::select! {
235 _ = sig_term.recv() => {}
236 _ = sig_hup.recv() => {}
237 _ = tokio::signal::ctrl_c() => {}
238 _ = self.request_shutdown_rx.recv() => {}
239 };
240 tracing::debug!("wait_for_shutdown: shutdown requested!");
241 tracing::info!(
242 "Shutdown requested, please wait while in-flight messages are delivered \
243 (based on your configured smtp client timeout duration) and \
244 deferred spool messages are saved. \
245 Interrupting shutdown may cause loss of message accountability \
246 and/or duplicate delivery so please be patient!"
247 );
248 tracing::debug!("Signal tasks that we are stopping");
250 SHUTTING_DOWN.store(true, Ordering::SeqCst);
251 ACTIVE.get().map(|a| a.lock().take());
252 STOPPING.get().map(|s| s.tx.send(()).ok());
253 tracing::debug!("Waiting for tasks to wrap up");
255 loop {
256 tokio::select! {
257 _ = tokio::time::sleep(std::time::Duration::from_secs(15)) => {
258 let labels = ACTIVE_LABELS.lock().clone();
259 let n = labels.len();
260 let summary :Vec<&str> = labels.values().map(|s| s.as_str()).take(10).collect();
261 let summary = summary.join(", ");
262 let summary = if labels.len() > 10 {
263 format!("{summary} (and {} others)", labels.len() - 10)
264 } else {
265 summary
266 };
267 tracing::info!("Still waiting for {n} pending activities... {summary}");
268 }
269 _ = self.activity_rx.recv() => {
270 return
271 }
272 }
273 }
274 }
275}