1use filenamegen::Glob;
2use parking_lot::FairMutex as Mutex;
3use sha2::{Digest, Sha256};
4use std::collections::BTreeSet;
5use std::io::Read;
6use std::path::{Path, PathBuf};
7use std::sync::atomic::{AtomicUsize, Ordering};
8use std::sync::{Arc, LazyLock};
9use tokio::sync::watch::{channel, Receiver, Sender};
10use tokio::task::spawn_blocking;
11
12static CONFIG: LazyLock<Mutex<ConfigurationParams>> =
13 LazyLock::new(|| Mutex::new(ConfigurationParams::new()));
14
15static EPOCH: AtomicUsize = AtomicUsize::new(0);
16
17#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
18pub struct ConfigEpoch(usize);
19
20pub struct ConfigurationParams {
21 pub globs: Arc<Vec<Glob>>,
22 sender: Sender<ConfigEpoch>,
23 receiver: Receiver<ConfigEpoch>,
24}
25
26impl Default for ConfigurationParams {
27 fn default() -> Self {
28 Self::new()
29 }
30}
31
32impl ConfigurationParams {
33 pub fn new() -> Self {
34 let glob = Glob::new("/opt/kumomta/etc/**/*.{lua,json,toml,yaml}")
35 .expect("failed to parse default glob");
36
37 let (sender, receiver) = channel(ConfigEpoch(0));
38 Self {
39 globs: Arc::new(vec![glob]),
40 sender,
41 receiver,
42 }
43 }
44
45 pub fn set_globs(&mut self, patterns: Vec<String>) -> anyhow::Result<()> {
46 let mut globs = vec![];
47 for p in patterns {
48 globs.push(Glob::new(&p)?);
49 }
50 self.globs = Arc::new(globs);
51 Ok(())
52 }
53
54 async fn eval_globs() -> anyhow::Result<BTreeSet<PathBuf>> {
55 let globs = CONFIG.lock().globs.clone();
56 spawn_blocking(move || {
57 let mut paths: BTreeSet<PathBuf> = BTreeSet::new();
58 for glob in globs.iter() {
59 for path in glob.walk("/") {
60 let path = if !path.is_absolute() {
61 Path::new("/").join(path)
62 } else {
63 path
64 };
65 if path.is_file() {
66 paths.insert(path);
67 }
68 }
69 }
70 Ok(paths)
71 })
72 .await?
73 }
74
75 pub fn subscribe(&self) -> Receiver<ConfigEpoch> {
76 self.receiver.clone()
77 }
78
79 async fn config_epoch_task() -> anyhow::Result<()> {
80 tracing::info!("config_epoch_task: starting");
81
82 pub fn compute_hash(paths: BTreeSet<PathBuf>) -> String {
83 if paths.is_empty() {
84 tracing::warn!("config_epoch_task: glob evaluated to no paths");
85 return "-no-files-".into();
86 }
87
88 let mut ctx = Sha256::new();
89 for path in &paths {
90 tracing::trace!("hashing {}", path.display());
91 ctx.update(path.display().to_string());
92 match std::fs::File::open(path) {
93 Ok(mut f) => {
94 let mut buf = [0u8; 8192];
95 while let Ok(n) = f.read(&mut buf) {
96 if n == 0 {
97 break;
98 }
99 ctx.update(&buf[0..n]);
100 }
101 }
102 Err(err) => {
103 tracing::error!("Error opening {}: {err:#}", path.display());
104 }
105 }
106 }
107 let hash = ctx.finalize();
108 let hex = data_encoding::HEXLOWER.encode(&hash);
109
110 tracing::debug!("hashed {} files as {hex}", paths.len());
111
112 hex
113 }
114
115 let mut current_hash = String::new();
116
117 loop {
118 match Self::eval_globs().await {
119 Ok(paths) => match spawn_blocking(move || compute_hash(paths)).await {
120 Ok(hash) => {
121 if hash != current_hash {
122 tracing::info!("config_epoch_task: config change detected {hash:?}");
123 current_hash = hash.clone();
124
125 bump_current_epoch();
126 }
127 }
128 Err(err) => {
129 tracing::error!("config_epoch_task: error computing epoch: {err:#}");
130 }
131 },
132 Err(err) => {
133 tracing::error!("config_epoch_task: error computing hashes: {err:#}");
134 }
135 }
136
137 tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
138 }
139 }
140}
141
142pub fn bump_current_epoch() {
143 let epoch = 1 + EPOCH.fetch_add(1, Ordering::SeqCst);
144 CONFIG.lock().sender.send(ConfigEpoch(epoch)).ok();
145}
146
147pub fn get_current_epoch() -> ConfigEpoch {
148 ConfigEpoch(EPOCH.load(Ordering::SeqCst))
149}
150
151pub fn subscribe() -> Receiver<ConfigEpoch> {
152 CONFIG.lock().subscribe()
153}
154
155pub fn set_globs(globs: Vec<String>) -> anyhow::Result<()> {
156 CONFIG.lock().set_globs(globs)
157}
158
159pub async fn eval_globs() -> anyhow::Result<Vec<String>> {
160 let mut result = vec![];
161 let paths = ConfigurationParams::eval_globs().await?;
162 for p in paths {
163 result.push(
164 p.to_str()
165 .ok_or_else(|| anyhow::anyhow!("path {} cannot be converted to UTF8", p.display()))?
166 .to_string(),
167 );
168 }
169 Ok(result)
170}
171
172pub fn start_monitor() {
173 tokio::spawn(async move {
174 if let Err(err) = ConfigurationParams::config_epoch_task().await {
175 tracing::error!("config_epoch_task: {err:#}");
176 }
177 });
178}