kumo_server_lifecycle/
lib.rs

1//! This module helps to manage the life cycle of the process
2//! and to shut things down gracefully.
3//!
4//! See <https://tokio.rs/tokio/topics/shutdown> for more information.
5use parking_lot::Mutex;
6use std::collections::HashMap;
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::sync::{LazyLock, OnceLock};
9use thiserror::Error;
10use tokio::signal::unix::SignalKind;
11use tokio::sync::mpsc::{Receiver as MPSCReceiver, Sender as MPSCSender};
12use tokio::sync::watch::{Receiver as WatchReceiver, Sender as WatchSender};
13use uuid::Uuid;
14
15static ACTIVE: OnceLock<Mutex<Option<Activity>>> = OnceLock::new();
16static SHUTTING_DOWN: AtomicBool = AtomicBool::new(false);
17static STOPPING: OnceLock<ShutdownState> = OnceLock::new();
18
19static ACTIVE_LABELS: LazyLock<Mutex<HashMap<Uuid, String>>> = LazyLock::new(Mutex::default);
20
21#[derive(Error, Debug, Clone)]
22#[error("{0} refused: shutting down")]
23pub struct ShuttingDownError(String);
24
25impl ShuttingDownError {
26    pub fn is_shutting_down(err: &anyhow::Error) -> bool {
27        if err
28            .root_cause()
29            .downcast_ref::<ShuttingDownError>()
30            .is_some()
31        {
32            return true;
33        }
34        format!("{err:#}").contains("shutting down")
35    }
36
37    pub fn new(label: impl Into<String>) -> Self {
38        Self(label.into())
39    }
40}
41
42/// Represents some activity which cannot be ruthlessly interrupted.
43/// Obtain an Activity instance via Activity::get(). While any
44/// Activity instances are alive in the program, LifeCycle::wait_for_shutdown
45/// cannot complete.
46pub struct Activity {
47    tx: MPSCSender<()>,
48    uuid: Uuid,
49}
50
51impl std::fmt::Debug for Activity {
52    fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
53        fmt.debug_struct("Activity").finish()
54    }
55}
56
57impl Clone for Activity {
58    fn clone(&self) -> Self {
59        let uuid = Uuid::new_v4();
60        let mut labels = ACTIVE_LABELS.lock();
61
62        let label = match labels.get(&self.uuid) {
63            Some(existing) => format!("clone of {existing}"),
64            None => format!("impossible missing label for {}", self.uuid),
65        };
66        labels.insert(uuid, label);
67
68        Activity {
69            tx: self.tx.clone(),
70            uuid,
71        }
72    }
73}
74
75impl Drop for Activity {
76    fn drop(&mut self) {
77        ACTIVE_LABELS.lock().remove(&self.uuid);
78    }
79}
80
81impl Activity {
82    /// Obtain an Activity instance.
83    /// If None is returned then the process is shutting down
84    /// and no new activity can be initiated.
85    pub fn get_opt(label: String) -> Option<Self> {
86        Self::get(label).ok()
87    }
88
89    /// Obtain an Activity instance.
90    /// Returns Err if the process is shutting down and no new
91    /// activity can be initiated
92    pub fn get(label: String) -> Result<Self, ShuttingDownError> {
93        let uuid = Uuid::new_v4();
94        let Some(active) = ACTIVE.get().map(|a| a.lock()) else {
95            return Err(ShuttingDownError(label));
96        };
97        let Some(activity) = active.as_ref() else {
98            return Err(ShuttingDownError(label));
99        };
100        ACTIVE_LABELS.lock().insert(uuid, label);
101        Ok(Activity {
102            tx: activity.tx.clone(),
103            uuid,
104        })
105    }
106
107    /// Returns true if the process is shutting down.
108    pub fn is_shutting_down(&self) -> bool {
109        SHUTTING_DOWN.load(Ordering::Relaxed)
110    }
111}
112
113pub fn is_shutting_down() -> bool {
114    SHUTTING_DOWN.load(Ordering::Relaxed)
115}
116
117struct ShutdownState {
118    tx: WatchSender<()>,
119    rx: WatchReceiver<()>,
120    request_shutdown_tx: MPSCSender<anyhow::Result<()>>,
121    stop_requested: AtomicBool,
122}
123
124/// ShutdownSubcription can be used by code that is idling.
125/// Select on your timeout and ShutdownSubcription::shutting_down
126/// to wake up when either the timeout expires or the process is
127/// about to shut down.
128pub struct ShutdownSubcription {
129    rx: WatchReceiver<()>,
130}
131
132impl ShutdownSubcription {
133    /// Obtain a shutdown subscription
134    pub fn get() -> Self {
135        Self {
136            rx: STOPPING.get().unwrap().rx.clone(),
137        }
138    }
139
140    pub fn try_get() -> Option<Self> {
141        Some(Self {
142            rx: STOPPING.get()?.rx.clone(),
143        })
144    }
145
146    /// Await the shutdown of the process
147    pub async fn shutting_down(&mut self) {
148        self.rx.changed().await.ok();
149    }
150}
151
152/// The LifeCycle struct represents the life_cycle of this server process.
153/// Creating an instance of it will prepare the global state of the
154/// process and allow other code to work with Activity and ShutdownSubcription.
155pub struct LifeCycle {
156    activity_rx: MPSCReceiver<()>,
157    request_shutdown_rx: MPSCReceiver<anyhow::Result<()>>,
158}
159
160impl Default for LifeCycle {
161    fn default() -> Self {
162        Self::new()
163    }
164}
165
166impl LifeCycle {
167    /// Initialize the process life_cycle.
168    /// May be called only once; will panic if called multiple times.
169    pub fn new() -> Self {
170        let (activity_tx, activity_rx) = tokio::sync::mpsc::channel(1);
171        let uuid = Uuid::new_v4();
172        ACTIVE_LABELS
173            .lock()
174            .insert(uuid, "Root LifeCycle".to_string());
175        ACTIVE
176            .set(Mutex::new(Some(Activity {
177                tx: activity_tx,
178                uuid,
179            })))
180            .map_err(|_| ())
181            .unwrap();
182
183        let (request_shutdown_tx, request_shutdown_rx) = tokio::sync::mpsc::channel(1);
184
185        let (tx, rx) = tokio::sync::watch::channel(());
186        STOPPING
187            .set(ShutdownState {
188                tx,
189                rx,
190                request_shutdown_tx,
191                stop_requested: AtomicBool::new(false),
192            })
193            .map_err(|_| ())
194            .unwrap();
195
196        Self {
197            activity_rx,
198            request_shutdown_rx,
199        }
200    }
201
202    /// Request that we shutdown the process.
203    /// This will cause the wait_for_shutdown method on the process
204    /// LifeCycle instance to wake up and initiate the shutdown
205    /// procedure.
206    pub async fn request_shutdown(final_result: anyhow::Result<()>) {
207        tracing::debug!("shutdown has been requested");
208        if let Some(state) = STOPPING.get() {
209            if state.stop_requested.compare_exchange(
210                false,
211                true,
212                Ordering::SeqCst,
213                Ordering::SeqCst,
214            ) == Ok(false)
215            {
216                state.request_shutdown_tx.send(final_result).await.ok();
217            }
218        } else {
219            tracing::error!("request_shutdown: STOPPING channel is unavailable");
220        }
221    }
222
223    /// Wait for a shutdown request, then propagate that state
224    /// to running tasks, and then wait for those tasks to complete
225    /// before returning to the caller.
226    pub async fn wait_for_shutdown(&mut self) -> Option<anyhow::Result<()>> {
227        // Wait for interrupt
228        tracing::debug!("Waiting for interrupt");
229        let mut sig_term =
230            tokio::signal::unix::signal(SignalKind::terminate()).expect("listen for SIGTERM");
231        let mut sig_hup =
232            tokio::signal::unix::signal(SignalKind::hangup()).expect("listen for SIGHUP");
233
234        let mut final_result: Option<anyhow::Result<()>> = None;
235        tokio::select! {
236            _ = sig_term.recv() => {}
237            _ = sig_hup.recv() => {}
238            _ = tokio::signal::ctrl_c() => {}
239            res = self.request_shutdown_rx.recv() => {
240                final_result = res;
241            }
242        };
243        tracing::debug!("wait_for_shutdown: shutdown requested!");
244        tracing::info!(
245            "Shutdown requested, please wait while in-flight messages are delivered \
246             (based on your configured smtp client timeout duration) and \
247             deferred spool messages are saved. \
248             Interrupting shutdown may cause loss of message accountability \
249             and/or duplicate delivery so please be patient!"
250        );
251        // Signal that we are stopping
252        tracing::debug!("Signal tasks that we are stopping");
253        SHUTTING_DOWN.store(true, Ordering::SeqCst);
254        ACTIVE.get().map(|a| a.lock().take());
255        STOPPING.get().map(|s| s.tx.send(()).ok());
256        // Wait for all pending activity to finish
257        tracing::debug!("Waiting for tasks to wrap up");
258        loop {
259            tokio::select! {
260                _ = tokio::time::sleep(std::time::Duration::from_secs(15)) => {
261                    let labels = ACTIVE_LABELS.lock().clone();
262                    let n = labels.len();
263                    let summary :Vec<&str> = labels.values().map(|s| s.as_str()).take(10).collect();
264                    let summary = summary.join(", ");
265                    let summary = if labels.len() > 10 {
266                        format!("{summary} (and {} others)", labels.len() - 10)
267                    } else {
268                        summary
269                    };
270                    tracing::info!("Still waiting for {n} pending activities... {summary}");
271                }
272                _ = self.activity_rx.recv() => {
273                    return final_result;
274                }
275            }
276        }
277    }
278}