config/
epoch.rs

1use filenamegen::Glob;
2use parking_lot::FairMutex as Mutex;
3use sha2::{Digest, Sha256};
4use std::collections::BTreeSet;
5use std::io::Read;
6use std::path::{Path, PathBuf};
7use std::sync::atomic::{AtomicUsize, Ordering};
8use std::sync::{Arc, LazyLock};
9use tokio::sync::watch::{channel, Receiver, Sender};
10use tokio::task::spawn_blocking;
11
12static CONFIG: LazyLock<Mutex<ConfigurationParams>> =
13    LazyLock::new(|| Mutex::new(ConfigurationParams::new()));
14
/// Global configuration epoch counter; starts at zero and is advanced by
/// `bump_current_epoch`.
static EPOCH: std::sync::atomic::AtomicUsize = AtomicUsize::new(0);
16
/// Opaque snapshot of the configuration epoch counter.
///
/// Two values compare equal exactly when they wrap the same counter value,
/// which makes the type usable as a cheap "has the config changed?" token.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub struct ConfigEpoch(usize);
19
20pub struct ConfigurationParams {
21    pub globs: Arc<Vec<Glob>>,
22    sender: Sender<ConfigEpoch>,
23    receiver: Receiver<ConfigEpoch>,
24}
25
26impl Default for ConfigurationParams {
27    fn default() -> Self {
28        Self::new()
29    }
30}
31
32impl ConfigurationParams {
33    pub fn new() -> Self {
34        let glob = Glob::new("/opt/kumomta/etc/**/*.{lua,json,toml,yaml}")
35            .expect("failed to parse default glob");
36
37        let (sender, receiver) = channel(ConfigEpoch(0));
38        Self {
39            globs: Arc::new(vec![glob]),
40            sender,
41            receiver,
42        }
43    }
44
45    pub fn set_globs(&mut self, patterns: Vec<String>) -> anyhow::Result<()> {
46        let mut globs = vec![];
47        for p in patterns {
48            globs.push(Glob::new(&p)?);
49        }
50        self.globs = Arc::new(globs);
51        Ok(())
52    }
53
54    async fn eval_globs() -> anyhow::Result<BTreeSet<PathBuf>> {
55        let globs = CONFIG.lock().globs.clone();
56        spawn_blocking(move || {
57            let mut paths: BTreeSet<PathBuf> = BTreeSet::new();
58            for glob in globs.iter() {
59                for path in glob.walk("/") {
60                    let path = if !path.is_absolute() {
61                        Path::new("/").join(path)
62                    } else {
63                        path
64                    };
65                    if path.is_file() {
66                        paths.insert(path);
67                    }
68                }
69            }
70            Ok(paths)
71        })
72        .await?
73    }
74
75    pub fn subscribe(&self) -> Receiver<ConfigEpoch> {
76        self.receiver.clone()
77    }
78
79    async fn config_epoch_task() -> anyhow::Result<()> {
80        tracing::info!("config_epoch_task: starting");
81
82        pub fn compute_hash(paths: BTreeSet<PathBuf>) -> String {
83            if paths.is_empty() {
84                tracing::warn!("config_epoch_task: glob evaluated to no paths");
85                return "-no-files-".into();
86            }
87
88            let mut ctx = Sha256::new();
89            for path in &paths {
90                tracing::trace!("hashing {}", path.display());
91                ctx.update(path.display().to_string());
92                match std::fs::File::open(path) {
93                    Ok(mut f) => {
94                        let mut buf = [0u8; 8192];
95                        while let Ok(n) = f.read(&mut buf) {
96                            if n == 0 {
97                                break;
98                            }
99                            ctx.update(&buf[0..n]);
100                        }
101                    }
102                    Err(err) => {
103                        tracing::error!("Error opening {}: {err:#}", path.display());
104                    }
105                }
106            }
107            let hash = ctx.finalize();
108            let hex = data_encoding::HEXLOWER.encode(&hash);
109
110            tracing::debug!("hashed {} files as {hex}", paths.len());
111
112            hex
113        }
114
115        let mut current_hash = String::new();
116
117        loop {
118            match Self::eval_globs().await {
119                Ok(paths) => match spawn_blocking(move || compute_hash(paths)).await {
120                    Ok(hash) => {
121                        if hash != current_hash {
122                            tracing::info!("config_epoch_task: config change detected {hash:?}");
123                            current_hash = hash.clone();
124
125                            bump_current_epoch();
126                        }
127                    }
128                    Err(err) => {
129                        tracing::error!("config_epoch_task: error computing epoch: {err:#}");
130                    }
131                },
132                Err(err) => {
133                    tracing::error!("config_epoch_task: error computing hashes: {err:#}");
134                }
135            }
136
137            tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
138        }
139    }
140}
141
142pub fn bump_current_epoch() {
143    let epoch = 1 + EPOCH.fetch_add(1, Ordering::SeqCst);
144    CONFIG.lock().sender.send(ConfigEpoch(epoch)).ok();
145}
146
147pub fn get_current_epoch() -> ConfigEpoch {
148    ConfigEpoch(EPOCH.load(Ordering::SeqCst))
149}
150
151pub fn subscribe() -> Receiver<ConfigEpoch> {
152    CONFIG.lock().subscribe()
153}
154
155pub fn set_globs(globs: Vec<String>) -> anyhow::Result<()> {
156    CONFIG.lock().set_globs(globs)
157}
158
159pub async fn eval_globs() -> anyhow::Result<Vec<String>> {
160    let mut result = vec![];
161    let paths = ConfigurationParams::eval_globs().await?;
162    for p in paths {
163        result.push(
164            p.to_str()
165                .ok_or_else(|| anyhow::anyhow!("path {} cannot be converted to UTF8", p.display()))?
166                .to_string(),
167        );
168    }
169    Ok(result)
170}
171
172pub fn start_monitor() {
173    tokio::spawn(async move {
174        if let Err(err) = ConfigurationParams::config_epoch_task().await {
175            tracing::error!("config_epoch_task: {err:#}");
176        }
177    });
178}