1use anyhow::Context;
2use config::{any_err, from_lua_value, get_or_create_sub_module};
3use data_loader::KeySource;
4use kumo_dkim::DkimPrivateKey;
5use lruttl::declare_cache;
6use mlua::prelude::LuaUserData;
7use mlua::{Lua, Value};
8use prometheus::{Counter, Histogram};
9use serde::Deserialize;
10use std::sync::{Arc, LazyLock, OnceLock};
11use tokio::runtime::Runtime;
12use tokio::time::{Duration, Instant};
13
// Cache of fully configured signers, keyed by the complete `SignerConfig`
// (so any change to the config produces a distinct entry). It is registered
// under the name "dkim_signer_cache"; the trailing `1024` is presumably the
// maximum number of entries (confirm against `lruttl::declare_cache!`).
declare_cache! {
static SIGNER_CACHE: LruCacheWithTtl<SignerConfig, Arc<CFSigner>>::new("dkim_signer_cache", 1024);
}
// Cache of parsed private keys, keyed by the `KeySource` they were loaded
// from. It is registered under the name "dkim_key_cache"; the trailing `1024`
// is presumably the maximum number of entries (confirm against
// `lruttl::declare_cache!`).
declare_cache! {
static KEY_CACHE: LruCacheWithTtl<KeySource, Arc<DkimPrivateKey>>::new("dkim_key_cache", 1024);
}
20
/// Histogram `dkim_signer_key_fetch`: time spent in `cached_key_load`
/// fetching the key data and parsing it into a `DkimPrivateKey` on a key
/// cache miss.
///
/// Registered lazily on first use; the `unwrap` panics if registration with
/// the default prometheus registry fails.
static SIGNER_KEY_FETCH: LazyLock<Histogram> = LazyLock::new(|| {
    prometheus::register_histogram!(
        "dkim_signer_key_fetch",
        "how long it takes to obtain a dkim key"
    )
    .unwrap()
});
/// Histogram `dkim_signer_creation`: time spent building a new signer
/// (key load plus signer configuration plus cache insertion) when the signer
/// cache does not already hold one.
///
/// Registered lazily on first use; the `unwrap` panics if registration fails.
static SIGNER_CREATE: LazyLock<Histogram> = LazyLock::new(|| {
    prometheus::register_histogram!(
        "dkim_signer_creation",
        "how long it takes to create a signer on a cache miss"
    )
    .unwrap()
});
/// Histogram `dkim_signer_sign`: time spent in `CFSigner::sign` producing the
/// DKIM header for an already-parsed message.
///
/// Registered lazily on first use; the `unwrap` panics if registration fails.
static SIGNER_SIGN: LazyLock<Histogram> = LazyLock::new(|| {
    prometheus::register_histogram!(
        "dkim_signer_sign",
        "how long it takes to dkim sign parsed messages"
    )
    .unwrap()
});
/// Histogram `dkim_signer_message_parse`: time spent in `CFSigner::sign`
/// validating the message as UTF-8 and parsing it before signing.
///
/// Registered lazily on first use; the `unwrap` panics if registration fails.
static SIGNER_PARSE: LazyLock<Histogram> = LazyLock::new(|| {
    prometheus::register_histogram!(
        "dkim_signer_message_parse",
        "how long it takes to parse messages as prep for signing"
    )
    .unwrap()
});
/// Counter `dkim_signer_cache_hit`: incremented in `register`'s signer lookup
/// path when `SIGNER_CACHE` already holds a signer for the requested config.
///
/// Registered lazily on first use; the `unwrap` panics if registration fails.
static SIGNER_CACHE_HIT: LazyLock<Counter> = LazyLock::new(|| {
    prometheus::register_counter!(
        "dkim_signer_cache_hit",
        "how many cache dkim signer requests hit cache"
    )
    .unwrap()
});
/// Counter `dkim_signer_cache_miss`: incremented in `register`'s signer
/// lookup path when `SIGNER_CACHE` has no entry and a signer must be built.
///
/// Registered lazily on first use; the `unwrap` panics if registration fails.
static SIGNER_CACHE_MISS: LazyLock<Counter> = LazyLock::new(|| {
    prometheus::register_counter!(
        "dkim_signer_cache_miss",
        "how many cache dkim signer requests miss cache"
    )
    .unwrap()
});
/// Counter `dkim_signer_cache_lookup_count`: incremented in `register`'s
/// signer lookup path before `SIGNER_CACHE` is consulted, regardless of the
/// outcome.
///
/// Registered lazily on first use; the `unwrap` panics if registration fails.
static SIGNER_CACHE_LOOKUP: LazyLock<Counter> = LazyLock::new(|| {
    prometheus::register_counter!(
        "dkim_signer_cache_lookup_count",
        "how many cache dkim signer requests occurred"
    )
    .unwrap()
});
70
/// Counter `dkim_signer_key_cache_hit`: incremented by `cached_key_load` when
/// `KEY_CACHE` already holds the requested key.
///
/// Registered lazily on first use; the `unwrap` panics if registration fails.
static KEY_CACHE_HIT: LazyLock<Counter> = LazyLock::new(|| {
    prometheus::register_counter!(
        "dkim_signer_key_cache_hit",
        "how many cache dkim signer requests hit key cache"
    )
    .unwrap()
});
/// Counter `dkim_signer_key_cache_miss`: incremented by `cached_key_load`
/// when the key is not cached and must be fetched and parsed.
///
/// Registered lazily on first use; the `unwrap` panics if registration fails.
static KEY_CACHE_MISS: LazyLock<Counter> = LazyLock::new(|| {
    prometheus::register_counter!(
        "dkim_signer_key_cache_miss",
        "how many cache dkim signer requests miss key cache"
    )
    .unwrap()
});
/// Counter `dkim_signer_key_cache_lookup_count`: incremented on every call to
/// `cached_key_load`, before `KEY_CACHE` is consulted.
///
/// Registered lazily on first use; the `unwrap` panics if registration fails.
static KEY_CACHE_LOOKUP: LazyLock<Counter> = LazyLock::new(|| {
    prometheus::register_counter!(
        "dkim_signer_key_cache_lookup_count",
        "how many cache dkim key requests occurred"
    )
    .unwrap()
});
92
93#[derive(Deserialize, Hash, Eq, PartialEq, Copy, Clone, Debug)]
94pub enum Canon {
95 Relaxed,
96 Simple,
97}
98
99impl Default for Canon {
100 fn default() -> Self {
101 Self::Relaxed
102 }
103}
104
/// Hash algorithm names that can be deserialized for the `atpsh` field of
/// `SignerConfig`.
#[derive(Deserialize, Hash, Eq, PartialEq, Copy, Clone, Debug)]
pub enum HashAlgo {
    Sha1,
    Sha256,
}
110
/// Parameters describing a DKIM signer, deserialized from the Lua table
/// passed to the signer constructors.
///
/// The whole struct is hashed and compared to act as the key of
/// `SIGNER_CACHE`, so two requests share a cached signer only when every
/// field (including `ttl`) is equal.
#[derive(Deserialize, Hash, PartialEq, Eq, Clone, Debug)]
pub struct SignerConfig {
    /// Signing domain handed to the signer builder.
    domain: String,
    /// DKIM selector handed to the signer builder.
    selector: String,
    /// Names of the headers to include in the signature.
    headers: Vec<String>,
    /// Accepted by the deserializer, but `configure_kumo_dkim` rejects the
    /// config with an error when this is set.
    #[serde(default)]
    atps: Option<String>,
    /// Accepted by the deserializer, but `configure_kumo_dkim` rejects the
    /// config with an error when this is set.
    #[serde(default)]
    atpsh: Option<HashAlgo>,
    /// Accepted by the deserializer, but `configure_kumo_dkim` rejects the
    /// config with an error when this is set.
    #[serde(default)]
    agent_user_identifier: Option<String>,
    /// Optional signature lifetime in seconds, converted to a
    /// `chrono::Duration` and passed to the builder's `with_expiry`.
    #[serde(default)]
    expiration: Option<u64>,
    /// Accepted by the deserializer, but `configure_kumo_dkim` rejects the
    /// config with an error when this is `true`.
    #[serde(default)]
    body_length: bool,
    /// Accepted by the deserializer, but `configure_kumo_dkim` rejects the
    /// config with an error when this is `true`.
    #[serde(default)]
    reporting: bool,
    /// Canonicalization applied to headers; defaults to `Canon::default()`.
    #[serde(default)]
    header_canonicalization: Canon,
    /// Canonicalization applied to the body; defaults to `Canon::default()`.
    #[serde(default)]
    body_canonicalization: Canon,

    /// Where the private key material is loaded from.
    key: KeySource,
    /// Passed to the builder's `with_over_signing`; defaults to `false`.
    #[serde(default)]
    over_sign: bool,

    /// How long the loaded key and the built signer are kept in their
    /// caches. Defaults to `SignerConfig::default_ttl()` and is deserialized
    /// through the `duration_serde` helper module.
    #[serde(default = "SignerConfig::default_ttl", with = "duration_serde")]
    ttl: Duration,
}
140
141impl SignerConfig {
142 fn default_ttl() -> Duration {
143 Duration::from_secs(300)
144 }
145
146 fn configure_kumo_dkim(&self, key: Arc<DkimPrivateKey>) -> anyhow::Result<kumo_dkim::Signer> {
147 if self.atps.is_some() {
148 anyhow::bail!("atps is not currently supported for RSA keys");
149 }
150 if self.atpsh.is_some() {
151 anyhow::bail!("atpsh is not currently supported for RSA keys");
152 }
153 if self.agent_user_identifier.is_some() {
154 anyhow::bail!("agent_user_identifier is not currently supported for RSA keys");
155 }
156 if self.body_length {
157 anyhow::bail!("body_length is not currently supported for RSA keys");
158 }
159 if self.reporting {
160 anyhow::bail!("reporting is not currently supported for RSA keys");
161 }
162
163 let mut signer = kumo_dkim::SignerBuilder::new()
164 .with_signed_headers(&self.headers)
165 .context("configure signed headers")?
166 .with_private_key(key)
167 .with_selector(&self.selector)
168 .with_signing_domain(&self.domain)
169 .with_over_signing(self.over_sign)
170 .with_header_canonicalization(match self.header_canonicalization {
171 Canon::Relaxed => kumo_dkim::canonicalization::Type::Relaxed,
172 Canon::Simple => kumo_dkim::canonicalization::Type::Simple,
173 })
174 .with_body_canonicalization(match self.body_canonicalization {
175 Canon::Relaxed => kumo_dkim::canonicalization::Type::Relaxed,
176 Canon::Simple => kumo_dkim::canonicalization::Type::Simple,
177 });
178 if let Some(exp) = self.expiration {
179 signer =
180 signer.with_expiry(chrono::Duration::try_seconds(exp as i64).ok_or_else(|| {
181 anyhow::anyhow!("{exp} is out of range for chrono::Duration::try_seconds")
182 })?);
183 }
184
185 signer.build().context("build signer")
186 }
187}
188
/// Dedicated tokio runtime for DKIM signing work.
///
/// Empty until the Lua `set_signing_threads` function installed by
/// `register` builds the runtime and stores it here; it can be set only
/// once. Nothing in this file schedules work on it — presumably callers
/// elsewhere use it to run signing off the main runtime (confirm against
/// the users of this static).
pub static SIGN_POOL: OnceLock<Runtime> = OnceLock::new();
190
191#[derive(Clone)]
192#[cfg_attr(feature = "impl", derive(mlua::FromLua))]
193pub struct Signer(Arc<CFSigner>);
194
195impl Signer {
196 pub fn sign(&self, message: &[u8]) -> anyhow::Result<String> {
197 self.0.sign(message)
198 }
199}
200
201impl LuaUserData for Signer {}
202
203async fn cached_key_load(key: &KeySource, ttl: Duration) -> anyhow::Result<Arc<DkimPrivateKey>> {
204 KEY_CACHE_LOOKUP.inc();
205 if let Some(pkey) = KEY_CACHE.get(key) {
206 KEY_CACHE_HIT.inc();
207 return Ok(pkey);
208 }
209
210 KEY_CACHE_MISS.inc();
211 let key_fetch_timer = SIGNER_KEY_FETCH.start_timer();
212 let data = key.get().await?;
213
214 let pkey = Arc::new(DkimPrivateKey::key(&data)?);
215 key_fetch_timer.stop_and_record();
216
217 KEY_CACHE
218 .insert(key.clone(), pkey.clone(), Instant::now() + ttl)
219 .await;
220 Ok(pkey)
221}
222
223pub fn register(lua: &Lua) -> anyhow::Result<()> {
224 let dkim_mod = get_or_create_sub_module(lua, "dkim")?;
225 dkim_mod.set(
226 "set_signing_threads",
227 lua.create_function(move |_lua, n: usize| {
228 let runtime = tokio::runtime::Builder::new_multi_thread()
229 .thread_name("dkimsign")
230 .worker_threads(1)
231 .max_blocking_threads(n)
232 .build()
233 .map_err(any_err)?;
234 SIGN_POOL
235 .set(runtime)
236 .map_err(|_| mlua::Error::external("dkimsign pool is already configured"))?;
237 println!("started dkimsign pool with {n} threads");
238 Ok(())
239 })?,
240 )?;
241 dkim_mod.set(
242 "rsa_sha256_signer",
243 lua.create_async_function(|lua, params: Value| async move {
244 let params: SignerConfig = from_lua_value(&lua, params)?;
245
246 SIGNER_CACHE_LOOKUP.inc();
247 if let Some(inner) = SIGNER_CACHE.get(¶ms) {
248 SIGNER_CACHE_HIT.inc();
249 return Ok(Signer(inner));
250 }
251 SIGNER_CACHE_MISS.inc();
252
253 let signer_creation_timer = SIGNER_CREATE.start_timer();
254
255 let key = cached_key_load(¶ms.key, params.ttl)
256 .await
257 .map_err(|err| mlua::Error::external(format!("{:?}: {err:#}", params.key)))?;
258
259 let signer = params
260 .configure_kumo_dkim(key)
261 .map_err(|err| mlua::Error::external(format!("{err:#}")))?;
262
263 let inner = Arc::new(CFSigner { signer });
264
265 let expiration = Instant::now() + params.ttl;
266 SIGNER_CACHE
267 .insert(params, Arc::clone(&inner), expiration)
268 .await;
269
270 signer_creation_timer.stop_and_record();
271 Ok(Signer(inner))
272 })?,
273 )?;
274
275 dkim_mod.set(
276 "ed25519_signer",
277 lua.create_async_function(|lua, params: Value| async move {
278 let params: SignerConfig = from_lua_value(&lua, params)?;
279
280 if let Some(inner) = SIGNER_CACHE.get(¶ms) {
281 return Ok(Signer(inner));
282 }
283
284 let signer_creation_timer = SIGNER_CREATE.start_timer();
285
286 let key = cached_key_load(¶ms.key, params.ttl)
287 .await
288 .map_err(|err| mlua::Error::external(format!("{:?}: {err:#}", params.key)))?;
289
290 let signer = params
291 .configure_kumo_dkim(key)
292 .map_err(|err| mlua::Error::external(format!("{err:#}")))?;
293
294 let inner = Arc::new(CFSigner { signer });
295
296 let expiration = Instant::now() + params.ttl;
297 SIGNER_CACHE
298 .insert(params, Arc::clone(&inner), expiration)
299 .await;
300
301 signer_creation_timer.stop_and_record();
302 Ok(Signer(inner))
303 })?,
304 )?;
305 Ok(())
306}
307
/// A fully configured DKIM signer; this is the value type stored in
/// `SIGNER_CACHE` and shared behind an `Arc` by `Signer` handles.
#[derive(Debug)]
pub struct CFSigner {
    /// The underlying `kumo_dkim` signer that produces the DKIM header.
    signer: kumo_dkim::Signer,
}
312
313impl CFSigner {
314 fn sign(&self, message: &[u8]) -> anyhow::Result<String> {
315 let parse_timer = SIGNER_PARSE.start_timer();
316 let message_str =
317 std::str::from_utf8(message).context("DKIM signer: message is not ASCII or UTF-8")?;
318 let mail = kumo_dkim::ParsedEmail::parse(message_str)
319 .context("failed to parse message to pass to dkim signer")?;
320 parse_timer.stop_and_record();
321
322 let sign_timer = SIGNER_SIGN.start_timer();
323 let dkim_header = self.signer.sign(&mail)?;
324 sign_timer.stop_and_record();
325
326 Ok(dkim_header)
327 }
328}