1use anyhow::Context;
2use config::{any_err, from_lua_value, get_or_create_sub_module};
3use data_loader::KeySource;
4use kumo_dkim::DkimPrivateKey;
5use lruttl::declare_cache;
6use mlua::prelude::LuaUserData;
7use mlua::{Lua, Value};
8use prometheus::{Counter, Histogram};
9use serde::Deserialize;
10use std::sync::{Arc, LazyLock, OnceLock};
11use tokio::runtime::Runtime;
12use tokio::time::Duration;
13
14declare_cache! {
15static SIGNER_CACHE: LruCacheWithTtl<SignerConfig, Arc<CFSigner>>::new("dkim_signer_cache", 1024);
17}
18declare_cache! {
19static KEY_CACHE: LruCacheWithTtl<KeySource, Arc<DkimPrivateKey>>::new("dkim_key_cache", 1024);
21}
22
23static SIGNER_KEY_FETCH: LazyLock<Histogram> = LazyLock::new(|| {
24 prometheus::register_histogram!(
25 "dkim_signer_key_fetch",
26 "how long it takes to obtain a dkim key"
27 )
28 .unwrap()
29});
30static SIGNER_CREATE: LazyLock<Histogram> = LazyLock::new(|| {
31 prometheus::register_histogram!(
32 "dkim_signer_creation",
33 "how long it takes to create a signer on a cache miss"
34 )
35 .unwrap()
36});
37static SIGNER_SIGN: LazyLock<Histogram> = LazyLock::new(|| {
38 prometheus::register_histogram!(
39 "dkim_signer_sign",
40 "how long it takes to dkim sign parsed messages"
41 )
42 .unwrap()
43});
44static SIGNER_PARSE: LazyLock<Histogram> = LazyLock::new(|| {
45 prometheus::register_histogram!(
46 "dkim_signer_message_parse",
47 "how long it takes to parse messages as prep for signing"
48 )
49 .unwrap()
50});
51static SIGNER_CACHE_HIT: LazyLock<Counter> = LazyLock::new(|| {
52 prometheus::register_counter!(
53 "dkim_signer_cache_hit",
54 "how many cache dkim signer requests hit cache"
55 )
56 .unwrap()
57});
58static SIGNER_CACHE_MISS: LazyLock<Counter> = LazyLock::new(|| {
59 prometheus::register_counter!(
60 "dkim_signer_cache_miss",
61 "how many cache dkim signer requests miss cache"
62 )
63 .unwrap()
64});
65static SIGNER_CACHE_LOOKUP: LazyLock<Counter> = LazyLock::new(|| {
66 prometheus::register_counter!(
67 "dkim_signer_cache_lookup_count",
68 "how many cache dkim signer requests occurred"
69 )
70 .unwrap()
71});
72
73static KEY_CACHE_HIT: LazyLock<Counter> = LazyLock::new(|| {
74 prometheus::register_counter!(
75 "dkim_signer_key_cache_hit",
76 "how many cache dkim signer requests hit key cache"
77 )
78 .unwrap()
79});
80static KEY_CACHE_MISS: LazyLock<Counter> = LazyLock::new(|| {
81 prometheus::register_counter!(
82 "dkim_signer_key_cache_miss",
83 "how many cache dkim signer requests miss key cache"
84 )
85 .unwrap()
86});
87static KEY_CACHE_LOOKUP: LazyLock<Counter> = LazyLock::new(|| {
88 prometheus::register_counter!(
89 "dkim_signer_key_cache_lookup_count",
90 "how many cache dkim key requests occurred"
91 )
92 .unwrap()
93});
94
95#[derive(Deserialize, Hash, Eq, PartialEq, Copy, Clone, Debug)]
96pub enum Canon {
97 Relaxed,
98 Simple,
99}
100
101impl Default for Canon {
102 fn default() -> Self {
103 Self::Relaxed
104 }
105}
106
107#[derive(Deserialize, Hash, Eq, PartialEq, Copy, Clone, Debug)]
108pub enum HashAlgo {
109 Sha1,
110 Sha256,
111}
112
113#[derive(Deserialize, Hash, PartialEq, Eq, Clone, Debug)]
114pub struct SignerConfig {
115 domain: String,
116 selector: String,
117 headers: Vec<String>,
118 #[serde(default)]
119 atps: Option<String>,
120 #[serde(default)]
121 atpsh: Option<HashAlgo>,
122 #[serde(default)]
123 agent_user_identifier: Option<String>,
124 #[serde(default)]
125 expiration: Option<u64>,
126 #[serde(default)]
127 body_length: bool,
128 #[serde(default)]
129 reporting: bool,
130 #[serde(default)]
131 header_canonicalization: Canon,
132 #[serde(default)]
133 body_canonicalization: Canon,
134
135 key: KeySource,
136 #[serde(default)]
137 over_sign: bool,
138
139 #[serde(default = "SignerConfig::default_ttl", with = "duration_serde")]
140 ttl: Duration,
141}
142
143impl SignerConfig {
144 fn default_ttl() -> Duration {
145 Duration::from_secs(300)
146 }
147
148 fn configure_kumo_dkim(&self, key: Arc<DkimPrivateKey>) -> anyhow::Result<kumo_dkim::Signer> {
149 if self.atps.is_some() {
150 anyhow::bail!("atps is not currently supported for RSA keys");
151 }
152 if self.atpsh.is_some() {
153 anyhow::bail!("atpsh is not currently supported for RSA keys");
154 }
155 if self.agent_user_identifier.is_some() {
156 anyhow::bail!("agent_user_identifier is not currently supported for RSA keys");
157 }
158 if self.body_length {
159 anyhow::bail!("body_length is not currently supported for RSA keys");
160 }
161 if self.reporting {
162 anyhow::bail!("reporting is not currently supported for RSA keys");
163 }
164
165 let mut signer = kumo_dkim::SignerBuilder::new()
166 .with_signed_headers(&self.headers)
167 .context("configure signed headers")?
168 .with_private_key(key)
169 .with_selector(&self.selector)
170 .with_signing_domain(&self.domain)
171 .with_over_signing(self.over_sign)
172 .with_header_canonicalization(match self.header_canonicalization {
173 Canon::Relaxed => kumo_dkim::canonicalization::Type::Relaxed,
174 Canon::Simple => kumo_dkim::canonicalization::Type::Simple,
175 })
176 .with_body_canonicalization(match self.body_canonicalization {
177 Canon::Relaxed => kumo_dkim::canonicalization::Type::Relaxed,
178 Canon::Simple => kumo_dkim::canonicalization::Type::Simple,
179 });
180 if let Some(exp) = self.expiration {
181 signer =
182 signer.with_expiry(chrono::Duration::try_seconds(exp as i64).ok_or_else(|| {
183 anyhow::anyhow!("{exp} is out of range for chrono::Duration::try_seconds")
184 })?);
185 }
186
187 signer.build().context("build signer")
188 }
189}
190
191pub static SIGN_POOL: OnceLock<Runtime> = OnceLock::new();
192
193#[derive(Clone)]
194#[cfg_attr(feature = "impl", derive(mlua::FromLua))]
195pub struct Signer(Arc<CFSigner>);
196
197impl Signer {
198 pub fn sign(&self, message: &[u8]) -> anyhow::Result<String> {
199 self.0.sign(message)
200 }
201
202 pub fn signer(&self) -> &kumo_dkim::Signer {
203 self.0.signer()
204 }
205}
206
207impl LuaUserData for Signer {}
208
209async fn cached_key_load(key: &KeySource, ttl: Duration) -> anyhow::Result<Arc<DkimPrivateKey>> {
210 KEY_CACHE_LOOKUP.inc();
211 KEY_CACHE
212 .get_or_try_insert(key, |_| ttl, async {
213 let key_fetch_timer = SIGNER_KEY_FETCH.start_timer();
214 let data = key.get().await?;
215 let pkey = Arc::new(DkimPrivateKey::key(&data)?);
216 key_fetch_timer.stop_and_record();
217 Ok::<Arc<DkimPrivateKey>, anyhow::Error>(pkey)
218 })
219 .await
220 .map_err(|err| anyhow::anyhow!("{err:#}"))
221 .map(|lookup| {
222 if !lookup.is_fresh {
223 KEY_CACHE_HIT.inc();
224 } else {
225 KEY_CACHE_MISS.inc();
226 }
227 lookup.item
228 })
229}
230
231pub fn register(lua: &Lua) -> anyhow::Result<()> {
232 let dkim_mod = get_or_create_sub_module(lua, "dkim")?;
233 dkim_mod.set(
234 "set_signing_threads",
235 lua.create_function(move |_lua, n: usize| {
236 let runtime = tokio::runtime::Builder::new_multi_thread()
237 .thread_name("dkimsign")
238 .worker_threads(1)
239 .max_blocking_threads(n)
240 .build()
241 .map_err(any_err)?;
242 SIGN_POOL
243 .set(runtime)
244 .map_err(|_| mlua::Error::external("dkimsign pool is already configured"))?;
245 println!("started dkimsign pool with {n} threads");
246 Ok(())
247 })?,
248 )?;
249
250 async fn generic_signer_ctor(lua: Lua, params: Value) -> mlua::Result<Signer> {
251 let params: SignerConfig = from_lua_value(&lua, params)?;
252
253 SIGNER_CACHE_LOOKUP.inc();
254 SIGNER_CACHE
255 .get_or_try_insert(¶ms, |_| params.ttl, async {
256 let signer_creation_timer = SIGNER_CREATE.start_timer();
257
258 let key = cached_key_load(¶ms.key, params.ttl)
259 .await
260 .map_err(|err| anyhow::anyhow!("{:?}: {err:#}", params.key))?;
261
262 let signer = params
263 .configure_kumo_dkim(key)
264 .map_err(|err| anyhow::anyhow!("{err:#}"))?;
265
266 let inner = Arc::new(CFSigner { signer });
267
268 signer_creation_timer.stop_and_record();
269 Ok::<Arc<CFSigner>, anyhow::Error>(inner)
270 })
271 .await
272 .map_err(any_err)
273 .map(|lookup| {
274 if !lookup.is_fresh {
275 SIGNER_CACHE_HIT.inc();
276 } else {
277 SIGNER_CACHE_MISS.inc();
278 }
279 Signer(lookup.item)
280 })
281 }
282
283 dkim_mod.set(
284 "rsa_sha256_signer",
285 lua.create_async_function(generic_signer_ctor)?,
286 )?;
287
288 dkim_mod.set(
289 "ed25519_signer",
290 lua.create_async_function(generic_signer_ctor)?,
291 )?;
292 Ok(())
293}
294
295#[derive(Debug)]
296pub struct CFSigner {
297 signer: kumo_dkim::Signer,
298}
299
300impl CFSigner {
301 fn sign(&self, message: &[u8]) -> anyhow::Result<String> {
302 let parse_timer = SIGNER_PARSE.start_timer();
303 let message_str =
304 std::str::from_utf8(message).context("DKIM signer: message is not ASCII or UTF-8")?;
305 let mail = kumo_dkim::ParsedEmail::parse(message_str)
306 .context("failed to parse message to pass to dkim signer")?;
307 parse_timer.stop_and_record();
308
309 let sign_timer = SIGNER_SIGN.start_timer();
310 let dkim_header = self.signer.sign(&mail)?;
311 sign_timer.stop_and_record();
312
313 Ok(dkim_header)
314 }
315
316 fn signer(&self) -> &kumo_dkim::Signer {
317 &self.signer
318 }
319}