kumo_server_lifecycle/
lib.rs

1//! This module helps to manage the life cycle of the process
2//! and to shut things down gracefully.
3//!
4//! See <https://tokio.rs/tokio/topics/shutdown> for more information.
5use std::collections::HashMap;
6use std::sync::atomic::{AtomicBool, Ordering};
7use std::sync::{LazyLock, Mutex, OnceLock};
8use tokio::signal::unix::SignalKind;
9use tokio::sync::mpsc::{Receiver as MPSCReceiver, Sender as MPSCSender};
10use tokio::sync::watch::{Receiver as WatchReceiver, Sender as WatchSender};
11use uuid::Uuid;
12
13static ACTIVE: OnceLock<Mutex<Option<Activity>>> = OnceLock::new();
14static SHUTTING_DOWN: AtomicBool = AtomicBool::new(false);
15static STOPPING: OnceLock<ShutdownState> = OnceLock::new();
16
17static ACTIVE_LABELS: LazyLock<Mutex<HashMap<Uuid, String>>> = LazyLock::new(Mutex::default);
18
/// Represents some activity which cannot be ruthlessly interrupted.
/// Obtain an Activity instance via Activity::get() (or Activity::get_opt()
/// when an Option is more convenient than a Result). While any
/// Activity instances are alive in the program, LifeCycle::wait_for_shutdown
/// cannot complete.
pub struct Activity {
    /// Clone of the sender half of the activity channel whose receiver
    /// is held by LifeCycle as `activity_rx`.
    tx: MPSCSender<()>,
    /// Key under which this instance's label is stored in ACTIVE_LABELS;
    /// the entry is removed again when the instance is dropped.
    uuid: Uuid,
}
27
28impl std::fmt::Debug for Activity {
29    fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
30        fmt.debug_struct("Activity").finish()
31    }
32}
33
34impl Clone for Activity {
35    fn clone(&self) -> Self {
36        let uuid = Uuid::new_v4();
37        let mut labels = ACTIVE_LABELS.lock().unwrap();
38
39        let label = match labels.get(&self.uuid) {
40            Some(existing) => format!("clone of {existing}"),
41            None => format!("impossible missing label for {}", self.uuid),
42        };
43        labels.insert(uuid, label);
44
45        Activity {
46            tx: self.tx.clone(),
47            uuid,
48        }
49    }
50}
51
52impl Drop for Activity {
53    fn drop(&mut self) {
54        ACTIVE_LABELS.lock().unwrap().remove(&self.uuid);
55    }
56}
57
58impl Activity {
59    /// Obtain an Activity instance.
60    /// If None is returned then the process is shutting down
61    /// and no new activity can be initiated.
62    pub fn get_opt(label: String) -> Option<Self> {
63        let uuid = Uuid::new_v4();
64        let active = ACTIVE.get()?.lock().unwrap();
65        let activity = active.as_ref()?;
66        ACTIVE_LABELS.lock().unwrap().insert(uuid, label);
67        Some(Activity {
68            tx: activity.tx.clone(),
69            uuid,
70        })
71    }
72
73    /// Obtain an Activity instance.
74    /// Returns Err if the process is shutting down and no new
75    /// activity can be initiated
76    pub fn get(label: String) -> anyhow::Result<Self> {
77        Self::get_opt(label).ok_or_else(|| anyhow::anyhow!("shutting down"))
78    }
79
80    /// Returns true if the process is shutting down.
81    pub fn is_shutting_down(&self) -> bool {
82        SHUTTING_DOWN.load(Ordering::Relaxed)
83    }
84}
85
86pub fn is_shutting_down() -> bool {
87    SHUTTING_DOWN.load(Ordering::Relaxed)
88}
89
/// Process-wide shutdown plumbing. A single instance is stored in the
/// STOPPING static by LifeCycle::new.
struct ShutdownState {
    /// Watch sender; LifeCycle::wait_for_shutdown sends on it to tell
    /// subscribers that the process is stopping.
    tx: WatchSender<()>,
    /// Watch receiver that ShutdownSubcription clones for each subscriber.
    rx: WatchReceiver<()>,
    /// Sender used by LifeCycle::request_shutdown to wake up
    /// LifeCycle::wait_for_shutdown.
    request_shutdown_tx: MPSCSender<()>,
    /// Flipped to true by the first request_shutdown call so that later
    /// calls do not send again.
    stop_requested: AtomicBool,
}
96
97/// ShutdownSubcription can be used by code that is idling.
98/// Select on your timeout and ShutdownSubcription::shutting_down
99/// to wake up when either the timeout expires or the process is
100/// about to shut down.
101pub struct ShutdownSubcription {
102    rx: WatchReceiver<()>,
103}
104
105impl ShutdownSubcription {
106    /// Obtain a shutdown subscription
107    pub fn get() -> Self {
108        Self {
109            rx: STOPPING.get().unwrap().rx.clone(),
110        }
111    }
112
113    pub fn try_get() -> Option<Self> {
114        Some(Self {
115            rx: STOPPING.get()?.rx.clone(),
116        })
117    }
118
119    /// Await the shutdown of the process
120    pub async fn shutting_down(&mut self) {
121        self.rx.changed().await.ok();
122    }
123}
124
125/// The LifeCycle struct represents the life_cycle of this server process.
126/// Creating an instance of it will prepare the global state of the
127/// process and allow other code to work with Activity and ShutdownSubcription.
128pub struct LifeCycle {
129    activity_rx: MPSCReceiver<()>,
130    request_shutdown_rx: MPSCReceiver<()>,
131}
132
133impl Default for LifeCycle {
134    fn default() -> Self {
135        Self::new()
136    }
137}
138
139impl LifeCycle {
140    /// Initialize the process life_cycle.
141    /// May be called only once; will panic if called multiple times.
142    pub fn new() -> Self {
143        let (activity_tx, activity_rx) = tokio::sync::mpsc::channel(1);
144        let uuid = Uuid::new_v4();
145        ACTIVE_LABELS
146            .lock()
147            .unwrap()
148            .insert(uuid, "Root LifeCycle".to_string());
149        ACTIVE
150            .set(Mutex::new(Some(Activity {
151                tx: activity_tx,
152                uuid,
153            })))
154            .map_err(|_| ())
155            .unwrap();
156
157        let (request_shutdown_tx, request_shutdown_rx) = tokio::sync::mpsc::channel(1);
158
159        let (tx, rx) = tokio::sync::watch::channel(());
160        STOPPING
161            .set(ShutdownState {
162                tx,
163                rx,
164                request_shutdown_tx,
165                stop_requested: AtomicBool::new(false),
166            })
167            .map_err(|_| ())
168            .unwrap();
169
170        Self {
171            activity_rx,
172            request_shutdown_rx,
173        }
174    }
175
176    /// Request that we shutdown the process.
177    /// This will cause the wait_for_shutdown method on the process
178    /// LifeCycle instance to wake up and initiate the shutdown
179    /// procedure.
180    pub async fn request_shutdown() {
181        tracing::debug!("shutdown has been requested");
182        if let Some(state) = STOPPING.get() {
183            if state.stop_requested.compare_exchange(
184                false,
185                true,
186                Ordering::SeqCst,
187                Ordering::SeqCst,
188            ) == Ok(false)
189            {
190                state.request_shutdown_tx.send(()).await.ok();
191            }
192        } else {
193            tracing::error!("request_shutdown: STOPPING channel is unavailable");
194        }
195    }
196
197    /// Wait for a shutdown request, then propagate that state
198    /// to running tasks, and then wait for those tasks to complete
199    /// before returning to the caller.
200    pub async fn wait_for_shutdown(&mut self) {
201        // Wait for interrupt
202        tracing::debug!("Waiting for interrupt");
203        let mut sig_term =
204            tokio::signal::unix::signal(SignalKind::terminate()).expect("listen for SIGTERM");
205        let mut sig_hup =
206            tokio::signal::unix::signal(SignalKind::hangup()).expect("listen for SIGUP");
207
208        tokio::select! {
209            _ = sig_term.recv() => {}
210            _ = sig_hup.recv() => {}
211            _ = tokio::signal::ctrl_c() => {}
212            _ = self.request_shutdown_rx.recv() => {}
213        };
214        tracing::debug!("wait_for_shutdown: shutdown requested!");
215        tracing::info!(
216            "Shutdown requested, please wait while in-flight messages are delivered \
217             (based on your configured smtp client timeout duration) and \
218             deferred spool messages are saved. \
219             Interrupting shutdown may cause loss of message accountability \
220             and/or duplicate delivery so please be patient!"
221        );
222        // Signal that we are stopping
223        tracing::debug!("Signal tasks that we are stopping");
224        SHUTTING_DOWN.store(true, Ordering::SeqCst);
225        ACTIVE.get().map(|a| a.lock().unwrap().take());
226        STOPPING.get().map(|s| s.tx.send(()).ok());
227        // Wait for all pending activity to finish
228        tracing::debug!("Waiting for tasks to wrap up");
229        loop {
230            tokio::select! {
231                _ = tokio::time::sleep(std::time::Duration::from_secs(15)) => {
232                    let labels = ACTIVE_LABELS.lock().unwrap().clone();
233                    let n = labels.len();
234                    let summary :Vec<&str> = labels.values().map(|s| s.as_str()).take(10).collect();
235                    let summary = summary.join(", ");
236                    let summary = if labels.len() > 10 {
237                        format!("{summary} (and {} others)", labels.len() - 10)
238                    } else {
239                        summary
240                    };
241                    tracing::info!("Still waiting for {n} pending activities... {summary}");
242                }
243                _ = self.activity_rx.recv() => {
244                    return
245                }
246            }
247        }
248    }
249}