1use anyhow::Context;
2use config::{any_err, from_lua_value, get_or_create_sub_module};
3use data_loader::KeySource;
4use kumo_dkim::DkimPrivateKey;
5use kumo_prometheus::declare_metric;
6use lruttl::declare_cache;
7use mlua::prelude::LuaUserData;
8use mlua::{Lua, Value};
9use serde::Deserialize;
10use std::sync::{Arc, OnceLock};
11use tokio::runtime::Runtime;
12use tokio::time::Duration;
13
14declare_cache! {
15static SIGNER_CACHE: LruCacheWithTtl<SignerConfig, Arc<CFSigner>>::new("dkim_signer_cache", 1024);
17}
18declare_cache! {
19static KEY_CACHE: LruCacheWithTtl<KeySource, Arc<DkimPrivateKey>>::new("dkim_key_cache", 1024);
21}
22
23declare_metric! {
24static SIGNER_KEY_FETCH: Histogram("dkim_signer_key_fetch");
29}
30
31declare_metric! {
32static SIGNER_CREATE: Histogram("dkim_signer_creation");
34}
35
36declare_metric! {
37static SIGNER_SIGN: Histogram("dkim_signer_sign");
44}
45
46declare_metric! {
47static SIGNER_PARSE: Histogram("dkim_signer_message_parse");
52}
53
54declare_metric! {
55static SIGNER_CACHE_HIT: IntCounter("dkim_signer_cache_hit");
61}
62
63declare_metric! {
64static SIGNER_CACHE_MISS: IntCounter("dkim_signer_cache_miss");
70}
71
72declare_metric! {
73static SIGNER_CACHE_LOOKUP: IntCounter("dkim_signer_cache_lookup_count");
79}
80
81declare_metric! {
82static KEY_CACHE_HIT: IntCounter("dkim_signer_key_cache_hit");
88}
89
90declare_metric! {
91static KEY_CACHE_MISS: IntCounter("dkim_signer_key_cache_miss");
97}
98
99declare_metric! {
100static KEY_CACHE_LOOKUP: IntCounter("dkim_signer_key_cache_lookup_count");
106}
107
108#[derive(Deserialize, Hash, Eq, PartialEq, Copy, Clone, Debug)]
109pub enum Canon {
110 Relaxed,
111 Simple,
112}
113
114impl Default for Canon {
115 fn default() -> Self {
116 Self::Relaxed
117 }
118}
119
120#[derive(Deserialize, Hash, Eq, PartialEq, Copy, Clone, Debug)]
121pub enum HashAlgo {
122 Sha1,
123 Sha256,
124}
125
126#[derive(Deserialize, Hash, PartialEq, Eq, Clone, Debug)]
127pub struct SignerConfig {
128 domain: String,
129 selector: String,
130 headers: Vec<String>,
131 #[serde(default)]
132 atps: Option<String>,
133 #[serde(default)]
134 atpsh: Option<HashAlgo>,
135 #[serde(default)]
136 agent_user_identifier: Option<String>,
137 #[serde(default)]
138 expiration: Option<u64>,
139 #[serde(default)]
140 body_length: bool,
141 #[serde(default)]
142 reporting: bool,
143 #[serde(default)]
144 header_canonicalization: Canon,
145 #[serde(default)]
146 body_canonicalization: Canon,
147
148 key: KeySource,
149 #[serde(default)]
150 over_sign: bool,
151
152 #[serde(default = "SignerConfig::default_ttl", with = "duration_serde")]
153 ttl: Duration,
154}
155
156impl SignerConfig {
157 fn default_ttl() -> Duration {
158 Duration::from_secs(300)
159 }
160
161 fn configure_kumo_dkim(&self, key: Arc<DkimPrivateKey>) -> anyhow::Result<kumo_dkim::Signer> {
162 if self.atps.is_some() {
163 anyhow::bail!("atps is not currently supported for RSA keys");
164 }
165 if self.atpsh.is_some() {
166 anyhow::bail!("atpsh is not currently supported for RSA keys");
167 }
168 if self.agent_user_identifier.is_some() {
169 anyhow::bail!("agent_user_identifier is not currently supported for RSA keys");
170 }
171 if self.body_length {
172 anyhow::bail!("body_length is not currently supported for RSA keys");
173 }
174 if self.reporting {
175 anyhow::bail!("reporting is not currently supported for RSA keys");
176 }
177
178 let mut signer = kumo_dkim::SignerBuilder::new()
179 .with_signed_headers(&self.headers)
180 .context("configure signed headers")?
181 .with_private_key(key)
182 .with_selector(&self.selector)
183 .with_signing_domain(&self.domain)
184 .with_over_signing(self.over_sign)
185 .with_header_canonicalization(match self.header_canonicalization {
186 Canon::Relaxed => kumo_dkim::canonicalization::Type::Relaxed,
187 Canon::Simple => kumo_dkim::canonicalization::Type::Simple,
188 })
189 .with_body_canonicalization(match self.body_canonicalization {
190 Canon::Relaxed => kumo_dkim::canonicalization::Type::Relaxed,
191 Canon::Simple => kumo_dkim::canonicalization::Type::Simple,
192 });
193 if let Some(exp) = self.expiration {
194 signer =
195 signer.with_expiry(chrono::Duration::try_seconds(exp as i64).ok_or_else(|| {
196 anyhow::anyhow!("{exp} is out of range for chrono::Duration::try_seconds")
197 })?);
198 }
199
200 signer.build().context("build signer")
201 }
202}
203
204pub static SIGN_POOL: OnceLock<Runtime> = OnceLock::new();
205
206#[derive(Clone)]
207#[cfg_attr(feature = "impl", derive(mlua::FromLua))]
208pub struct Signer(Arc<CFSigner>);
209
210impl Signer {
211 pub fn sign(&self, message: &[u8]) -> anyhow::Result<String> {
212 self.0.sign(message)
213 }
214
215 pub fn signer(&self) -> &kumo_dkim::Signer {
216 self.0.signer()
217 }
218}
219
220impl LuaUserData for Signer {}
221
222async fn cached_key_load(key: &KeySource, ttl: Duration) -> anyhow::Result<Arc<DkimPrivateKey>> {
223 KEY_CACHE_LOOKUP.inc();
224 KEY_CACHE
225 .get_or_try_insert(key, |_| ttl, async {
226 let key_fetch_timer = SIGNER_KEY_FETCH.start_timer();
227 let data = key.get().await?;
228 let pkey = Arc::new(DkimPrivateKey::key(&data)?);
229 key_fetch_timer.stop_and_record();
230 Ok::<Arc<DkimPrivateKey>, anyhow::Error>(pkey)
231 })
232 .await
233 .map_err(|err| anyhow::anyhow!("{err:#}"))
234 .map(|lookup| {
235 if !lookup.is_fresh {
236 KEY_CACHE_HIT.inc();
237 } else {
238 KEY_CACHE_MISS.inc();
239 }
240 lookup.item
241 })
242}
243
244pub fn register(lua: &Lua) -> anyhow::Result<()> {
245 let dkim_mod = get_or_create_sub_module(lua, "dkim")?;
246 dkim_mod.set(
247 "set_signing_threads",
248 lua.create_function(move |_lua, n: usize| {
249 let runtime = tokio::runtime::Builder::new_multi_thread()
250 .thread_name("dkimsign")
251 .worker_threads(1)
252 .max_blocking_threads(n)
253 .build()
254 .map_err(any_err)?;
255 SIGN_POOL
256 .set(runtime)
257 .map_err(|_| mlua::Error::external("dkimsign pool is already configured"))?;
258 println!("started dkimsign pool with {n} threads");
259 Ok(())
260 })?,
261 )?;
262
263 async fn generic_signer_ctor(lua: Lua, params: Value) -> mlua::Result<Signer> {
264 let params: SignerConfig = from_lua_value(&lua, params)?;
265
266 SIGNER_CACHE_LOOKUP.inc();
267 SIGNER_CACHE
268 .get_or_try_insert(¶ms, |_| params.ttl, async {
269 let signer_creation_timer = SIGNER_CREATE.start_timer();
270
271 let key = cached_key_load(¶ms.key, params.ttl)
272 .await
273 .map_err(|err| anyhow::anyhow!("{:?}: {err:#}", params.key))?;
274
275 let signer = params
276 .configure_kumo_dkim(key)
277 .map_err(|err| anyhow::anyhow!("{err:#}"))?;
278
279 let inner = Arc::new(CFSigner { signer });
280
281 signer_creation_timer.stop_and_record();
282 Ok::<Arc<CFSigner>, anyhow::Error>(inner)
283 })
284 .await
285 .map_err(any_err)
286 .map(|lookup| {
287 if !lookup.is_fresh {
288 SIGNER_CACHE_HIT.inc();
289 } else {
290 SIGNER_CACHE_MISS.inc();
291 }
292 Signer(lookup.item)
293 })
294 }
295
296 dkim_mod.set(
297 "rsa_sha256_signer",
298 lua.create_async_function(generic_signer_ctor)?,
299 )?;
300
301 dkim_mod.set(
302 "ed25519_signer",
303 lua.create_async_function(generic_signer_ctor)?,
304 )?;
305 Ok(())
306}
307
308#[derive(Debug)]
309pub struct CFSigner {
310 signer: kumo_dkim::Signer,
311}
312
313impl CFSigner {
314 fn sign(&self, message: &[u8]) -> anyhow::Result<String> {
315 let parse_timer = SIGNER_PARSE.start_timer();
316 let message_str =
317 std::str::from_utf8(message).context("DKIM signer: message is not ASCII or UTF-8")?;
318 let mail = kumo_dkim::ParsedEmail::parse(message_str)
319 .context("failed to parse message to pass to dkim signer")?;
320 parse_timer.stop_and_record();
321
322 let sign_timer = SIGNER_SIGN.start_timer();
323 let dkim_header = self.signer.sign(&mail)?;
324 sign_timer.stop_and_record();
325
326 Ok(dkim_header)
327 }
328
329 fn signer(&self) -> &kumo_dkim::Signer {
330 &self.signer
331 }
332}