kumo_server_lifecycle/
lib.rs

1//! This module helps to manage the life cycle of the process
2//! and to shut things down gracefully.
3//!
4//! See <https://tokio.rs/tokio/topics/shutdown> for more information.
5use parking_lot::Mutex;
6use std::collections::HashMap;
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::sync::{LazyLock, OnceLock};
9use thiserror::Error;
10use tokio::signal::unix::SignalKind;
11use tokio::sync::mpsc::{Receiver as MPSCReceiver, Sender as MPSCSender};
12use tokio::sync::watch::{Receiver as WatchReceiver, Sender as WatchSender};
13use uuid::Uuid;
14
15static ACTIVE: OnceLock<Mutex<Option<Activity>>> = OnceLock::new();
16static SHUTTING_DOWN: AtomicBool = AtomicBool::new(false);
17static STOPPING: OnceLock<ShutdownState> = OnceLock::new();
18
19static ACTIVE_LABELS: LazyLock<Mutex<HashMap<Uuid, String>>> = LazyLock::new(Mutex::default);
20
21#[derive(Error, Debug, Clone)]
22#[error("{0} refused: shutting down")]
23pub struct ShuttingDownError(String);
24
25impl ShuttingDownError {
26    pub fn is_shutting_down(err: &anyhow::Error) -> bool {
27        if err
28            .root_cause()
29            .downcast_ref::<ShuttingDownError>()
30            .is_some()
31        {
32            return true;
33        }
34        format!("{err:#}").contains("shutting down")
35    }
36
37    pub fn new(label: impl Into<String>) -> Self {
38        Self(label.into())
39    }
40}
41
42/// Represents some activity which cannot be ruthlessly interrupted.
43/// Obtain an Activity instance via Activity::get(). While any
44/// Activity instances are alive in the program, LifeCycle::wait_for_shutdown
45/// cannot complete.
46pub struct Activity {
47    tx: MPSCSender<()>,
48    uuid: Uuid,
49}
50
51impl std::fmt::Debug for Activity {
52    fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
53        fmt.debug_struct("Activity").finish()
54    }
55}
56
57impl Clone for Activity {
58    fn clone(&self) -> Self {
59        let uuid = Uuid::new_v4();
60        let mut labels = ACTIVE_LABELS.lock();
61
62        let label = match labels.get(&self.uuid) {
63            Some(existing) => format!("clone of {existing}"),
64            None => format!("impossible missing label for {}", self.uuid),
65        };
66        labels.insert(uuid, label);
67
68        Activity {
69            tx: self.tx.clone(),
70            uuid,
71        }
72    }
73}
74
75impl Drop for Activity {
76    fn drop(&mut self) {
77        ACTIVE_LABELS.lock().remove(&self.uuid);
78    }
79}
80
81impl Activity {
82    /// Obtain an Activity instance.
83    /// If None is returned then the process is shutting down
84    /// and no new activity can be initiated.
85    pub fn get_opt(label: String) -> Option<Self> {
86        Self::get(label).ok()
87    }
88
89    /// Obtain an Activity instance.
90    /// Returns Err if the process is shutting down and no new
91    /// activity can be initiated
92    pub fn get(label: String) -> Result<Self, ShuttingDownError> {
93        let uuid = Uuid::new_v4();
94        let Some(active) = ACTIVE.get().map(|a| a.lock()) else {
95            return Err(ShuttingDownError(label));
96        };
97        let Some(activity) = active.as_ref() else {
98            return Err(ShuttingDownError(label));
99        };
100        ACTIVE_LABELS.lock().insert(uuid, label);
101        Ok(Activity {
102            tx: activity.tx.clone(),
103            uuid,
104        })
105    }
106
107    /// Returns true if the process is shutting down.
108    pub fn is_shutting_down(&self) -> bool {
109        SHUTTING_DOWN.load(Ordering::Relaxed)
110    }
111}
112
113pub fn is_shutting_down() -> bool {
114    SHUTTING_DOWN.load(Ordering::Relaxed)
115}
116
117struct ShutdownState {
118    tx: WatchSender<()>,
119    rx: WatchReceiver<()>,
120    request_shutdown_tx: MPSCSender<()>,
121    stop_requested: AtomicBool,
122}
123
124/// ShutdownSubcription can be used by code that is idling.
125/// Select on your timeout and ShutdownSubcription::shutting_down
126/// to wake up when either the timeout expires or the process is
127/// about to shut down.
128pub struct ShutdownSubcription {
129    rx: WatchReceiver<()>,
130}
131
132impl ShutdownSubcription {
133    /// Obtain a shutdown subscription
134    pub fn get() -> Self {
135        Self {
136            rx: STOPPING.get().unwrap().rx.clone(),
137        }
138    }
139
140    pub fn try_get() -> Option<Self> {
141        Some(Self {
142            rx: STOPPING.get()?.rx.clone(),
143        })
144    }
145
146    /// Await the shutdown of the process
147    pub async fn shutting_down(&mut self) {
148        self.rx.changed().await.ok();
149    }
150}
151
152/// The LifeCycle struct represents the life_cycle of this server process.
153/// Creating an instance of it will prepare the global state of the
154/// process and allow other code to work with Activity and ShutdownSubcription.
155pub struct LifeCycle {
156    activity_rx: MPSCReceiver<()>,
157    request_shutdown_rx: MPSCReceiver<()>,
158}
159
160impl Default for LifeCycle {
161    fn default() -> Self {
162        Self::new()
163    }
164}
165
166impl LifeCycle {
167    /// Initialize the process life_cycle.
168    /// May be called only once; will panic if called multiple times.
169    pub fn new() -> Self {
170        let (activity_tx, activity_rx) = tokio::sync::mpsc::channel(1);
171        let uuid = Uuid::new_v4();
172        ACTIVE_LABELS
173            .lock()
174            .insert(uuid, "Root LifeCycle".to_string());
175        ACTIVE
176            .set(Mutex::new(Some(Activity {
177                tx: activity_tx,
178                uuid,
179            })))
180            .map_err(|_| ())
181            .unwrap();
182
183        let (request_shutdown_tx, request_shutdown_rx) = tokio::sync::mpsc::channel(1);
184
185        let (tx, rx) = tokio::sync::watch::channel(());
186        STOPPING
187            .set(ShutdownState {
188                tx,
189                rx,
190                request_shutdown_tx,
191                stop_requested: AtomicBool::new(false),
192            })
193            .map_err(|_| ())
194            .unwrap();
195
196        Self {
197            activity_rx,
198            request_shutdown_rx,
199        }
200    }
201
202    /// Request that we shutdown the process.
203    /// This will cause the wait_for_shutdown method on the process
204    /// LifeCycle instance to wake up and initiate the shutdown
205    /// procedure.
206    pub async fn request_shutdown() {
207        tracing::debug!("shutdown has been requested");
208        if let Some(state) = STOPPING.get() {
209            if state.stop_requested.compare_exchange(
210                false,
211                true,
212                Ordering::SeqCst,
213                Ordering::SeqCst,
214            ) == Ok(false)
215            {
216                state.request_shutdown_tx.send(()).await.ok();
217            }
218        } else {
219            tracing::error!("request_shutdown: STOPPING channel is unavailable");
220        }
221    }
222
223    /// Wait for a shutdown request, then propagate that state
224    /// to running tasks, and then wait for those tasks to complete
225    /// before returning to the caller.
226    pub async fn wait_for_shutdown(&mut self) {
227        // Wait for interrupt
228        tracing::debug!("Waiting for interrupt");
229        let mut sig_term =
230            tokio::signal::unix::signal(SignalKind::terminate()).expect("listen for SIGTERM");
231        let mut sig_hup =
232            tokio::signal::unix::signal(SignalKind::hangup()).expect("listen for SIGUP");
233
234        tokio::select! {
235            _ = sig_term.recv() => {}
236            _ = sig_hup.recv() => {}
237            _ = tokio::signal::ctrl_c() => {}
238            _ = self.request_shutdown_rx.recv() => {}
239        };
240        tracing::debug!("wait_for_shutdown: shutdown requested!");
241        tracing::info!(
242            "Shutdown requested, please wait while in-flight messages are delivered \
243             (based on your configured smtp client timeout duration) and \
244             deferred spool messages are saved. \
245             Interrupting shutdown may cause loss of message accountability \
246             and/or duplicate delivery so please be patient!"
247        );
248        // Signal that we are stopping
249        tracing::debug!("Signal tasks that we are stopping");
250        SHUTTING_DOWN.store(true, Ordering::SeqCst);
251        ACTIVE.get().map(|a| a.lock().take());
252        STOPPING.get().map(|s| s.tx.send(()).ok());
253        // Wait for all pending activity to finish
254        tracing::debug!("Waiting for tasks to wrap up");
255        loop {
256            tokio::select! {
257                _ = tokio::time::sleep(std::time::Duration::from_secs(15)) => {
258                    let labels = ACTIVE_LABELS.lock().clone();
259                    let n = labels.len();
260                    let summary :Vec<&str> = labels.values().map(|s| s.as_str()).take(10).collect();
261                    let summary = summary.join(", ");
262                    let summary = if labels.len() > 10 {
263                        format!("{summary} (and {} others)", labels.len() - 10)
264                    } else {
265                        summary
266                    };
267                    tracing::info!("Still waiting for {n} pending activities... {summary}");
268                }
269                _ = self.activity_rx.recv() => {
270                    return
271                }
272            }
273        }
274    }
275}