kumo_server_lifecycle/
lib.rs1use std::collections::HashMap;
6use std::sync::atomic::{AtomicBool, Ordering};
7use std::sync::{LazyLock, Mutex, OnceLock};
8use tokio::signal::unix::SignalKind;
9use tokio::sync::mpsc::{Receiver as MPSCReceiver, Sender as MPSCSender};
10use tokio::sync::watch::{Receiver as WatchReceiver, Sender as WatchSender};
11use uuid::Uuid;
12
13static ACTIVE: OnceLock<Mutex<Option<Activity>>> = OnceLock::new();
14static SHUTTING_DOWN: AtomicBool = AtomicBool::new(false);
15static STOPPING: OnceLock<ShutdownState> = OnceLock::new();
16
17static ACTIVE_LABELS: LazyLock<Mutex<HashMap<Uuid, String>>> = LazyLock::new(Mutex::default);
18
19pub struct Activity {
24 tx: MPSCSender<()>,
25 uuid: Uuid,
26}
27
28impl std::fmt::Debug for Activity {
29 fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
30 fmt.debug_struct("Activity").finish()
31 }
32}
33
34impl Clone for Activity {
35 fn clone(&self) -> Self {
36 let uuid = Uuid::new_v4();
37 let mut labels = ACTIVE_LABELS.lock().unwrap();
38
39 let label = match labels.get(&self.uuid) {
40 Some(existing) => format!("clone of {existing}"),
41 None => format!("impossible missing label for {}", self.uuid),
42 };
43 labels.insert(uuid, label);
44
45 Activity {
46 tx: self.tx.clone(),
47 uuid,
48 }
49 }
50}
51
52impl Drop for Activity {
53 fn drop(&mut self) {
54 ACTIVE_LABELS.lock().unwrap().remove(&self.uuid);
55 }
56}
57
58impl Activity {
59 pub fn get_opt(label: String) -> Option<Self> {
63 let uuid = Uuid::new_v4();
64 let active = ACTIVE.get()?.lock().unwrap();
65 let activity = active.as_ref()?;
66 ACTIVE_LABELS.lock().unwrap().insert(uuid, label);
67 Some(Activity {
68 tx: activity.tx.clone(),
69 uuid,
70 })
71 }
72
73 pub fn get(label: String) -> anyhow::Result<Self> {
77 Self::get_opt(label).ok_or_else(|| anyhow::anyhow!("shutting down"))
78 }
79
80 pub fn is_shutting_down(&self) -> bool {
82 SHUTTING_DOWN.load(Ordering::Relaxed)
83 }
84}
85
86pub fn is_shutting_down() -> bool {
87 SHUTTING_DOWN.load(Ordering::Relaxed)
88}
89
90struct ShutdownState {
91 tx: WatchSender<()>,
92 rx: WatchReceiver<()>,
93 request_shutdown_tx: MPSCSender<()>,
94 stop_requested: AtomicBool,
95}
96
97pub struct ShutdownSubcription {
102 rx: WatchReceiver<()>,
103}
104
105impl ShutdownSubcription {
106 pub fn get() -> Self {
108 Self {
109 rx: STOPPING.get().unwrap().rx.clone(),
110 }
111 }
112
113 pub fn try_get() -> Option<Self> {
114 Some(Self {
115 rx: STOPPING.get()?.rx.clone(),
116 })
117 }
118
119 pub async fn shutting_down(&mut self) {
121 self.rx.changed().await.ok();
122 }
123}
124
125pub struct LifeCycle {
129 activity_rx: MPSCReceiver<()>,
130 request_shutdown_rx: MPSCReceiver<()>,
131}
132
133impl Default for LifeCycle {
134 fn default() -> Self {
135 Self::new()
136 }
137}
138
139impl LifeCycle {
140 pub fn new() -> Self {
143 let (activity_tx, activity_rx) = tokio::sync::mpsc::channel(1);
144 let uuid = Uuid::new_v4();
145 ACTIVE_LABELS
146 .lock()
147 .unwrap()
148 .insert(uuid, "Root LifeCycle".to_string());
149 ACTIVE
150 .set(Mutex::new(Some(Activity {
151 tx: activity_tx,
152 uuid,
153 })))
154 .map_err(|_| ())
155 .unwrap();
156
157 let (request_shutdown_tx, request_shutdown_rx) = tokio::sync::mpsc::channel(1);
158
159 let (tx, rx) = tokio::sync::watch::channel(());
160 STOPPING
161 .set(ShutdownState {
162 tx,
163 rx,
164 request_shutdown_tx,
165 stop_requested: AtomicBool::new(false),
166 })
167 .map_err(|_| ())
168 .unwrap();
169
170 Self {
171 activity_rx,
172 request_shutdown_rx,
173 }
174 }
175
176 pub async fn request_shutdown() {
181 tracing::debug!("shutdown has been requested");
182 if let Some(state) = STOPPING.get() {
183 if state.stop_requested.compare_exchange(
184 false,
185 true,
186 Ordering::SeqCst,
187 Ordering::SeqCst,
188 ) == Ok(false)
189 {
190 state.request_shutdown_tx.send(()).await.ok();
191 }
192 } else {
193 tracing::error!("request_shutdown: STOPPING channel is unavailable");
194 }
195 }
196
197 pub async fn wait_for_shutdown(&mut self) {
201 tracing::debug!("Waiting for interrupt");
203 let mut sig_term =
204 tokio::signal::unix::signal(SignalKind::terminate()).expect("listen for SIGTERM");
205 let mut sig_hup =
206 tokio::signal::unix::signal(SignalKind::hangup()).expect("listen for SIGUP");
207
208 tokio::select! {
209 _ = sig_term.recv() => {}
210 _ = sig_hup.recv() => {}
211 _ = tokio::signal::ctrl_c() => {}
212 _ = self.request_shutdown_rx.recv() => {}
213 };
214 tracing::debug!("wait_for_shutdown: shutdown requested!");
215 tracing::info!(
216 "Shutdown requested, please wait while in-flight messages are delivered \
217 (based on your configured smtp client timeout duration) and \
218 deferred spool messages are saved. \
219 Interrupting shutdown may cause loss of message accountability \
220 and/or duplicate delivery so please be patient!"
221 );
222 tracing::debug!("Signal tasks that we are stopping");
224 SHUTTING_DOWN.store(true, Ordering::SeqCst);
225 ACTIVE.get().map(|a| a.lock().unwrap().take());
226 STOPPING.get().map(|s| s.tx.send(()).ok());
227 tracing::debug!("Waiting for tasks to wrap up");
229 loop {
230 tokio::select! {
231 _ = tokio::time::sleep(std::time::Duration::from_secs(15)) => {
232 let labels = ACTIVE_LABELS.lock().unwrap().clone();
233 let n = labels.len();
234 let summary :Vec<&str> = labels.values().map(|s| s.as_str()).take(10).collect();
235 let summary = summary.join(", ");
236 let summary = if labels.len() > 10 {
237 format!("{summary} (and {} others)", labels.len() - 10)
238 } else {
239 summary
240 };
241 tracing::info!("Still waiting for {n} pending activities... {summary}");
242 }
243 _ = self.activity_rx.recv() => {
244 return
245 }
246 }
247 }
248 }
249}