message/
dkim.rs

1use anyhow::Context;
2use config::{any_err, from_lua_value, get_or_create_sub_module};
3use data_loader::KeySource;
4use kumo_dkim::DkimPrivateKey;
5use lruttl::declare_cache;
6use mlua::prelude::LuaUserData;
7use mlua::{Lua, Value};
8use prometheus::{Counter, Histogram};
9use serde::Deserialize;
10use std::sync::{Arc, LazyLock, OnceLock};
11use tokio::runtime::Runtime;
12use tokio::time::{Duration, Instant};
13
14declare_cache! {
15static SIGNER_CACHE: LruCacheWithTtl<SignerConfig, Arc<CFSigner>>::new("dkim_signer_cache", 1024);
16}
17declare_cache! {
18static KEY_CACHE: LruCacheWithTtl<KeySource, Arc<DkimPrivateKey>>::new("dkim_key_cache", 1024);
19}
20
21static SIGNER_KEY_FETCH: LazyLock<Histogram> = LazyLock::new(|| {
22    prometheus::register_histogram!(
23        "dkim_signer_key_fetch",
24        "how long it takes to obtain a dkim key"
25    )
26    .unwrap()
27});
28static SIGNER_CREATE: LazyLock<Histogram> = LazyLock::new(|| {
29    prometheus::register_histogram!(
30        "dkim_signer_creation",
31        "how long it takes to create a signer on a cache miss"
32    )
33    .unwrap()
34});
35static SIGNER_SIGN: LazyLock<Histogram> = LazyLock::new(|| {
36    prometheus::register_histogram!(
37        "dkim_signer_sign",
38        "how long it takes to dkim sign parsed messages"
39    )
40    .unwrap()
41});
42static SIGNER_PARSE: LazyLock<Histogram> = LazyLock::new(|| {
43    prometheus::register_histogram!(
44        "dkim_signer_message_parse",
45        "how long it takes to parse messages as prep for signing"
46    )
47    .unwrap()
48});
49static SIGNER_CACHE_HIT: LazyLock<Counter> = LazyLock::new(|| {
50    prometheus::register_counter!(
51        "dkim_signer_cache_hit",
52        "how many cache dkim signer requests hit cache"
53    )
54    .unwrap()
55});
56static SIGNER_CACHE_MISS: LazyLock<Counter> = LazyLock::new(|| {
57    prometheus::register_counter!(
58        "dkim_signer_cache_miss",
59        "how many cache dkim signer requests miss cache"
60    )
61    .unwrap()
62});
63static SIGNER_CACHE_LOOKUP: LazyLock<Counter> = LazyLock::new(|| {
64    prometheus::register_counter!(
65        "dkim_signer_cache_lookup_count",
66        "how many cache dkim signer requests occurred"
67    )
68    .unwrap()
69});
70
71static KEY_CACHE_HIT: LazyLock<Counter> = LazyLock::new(|| {
72    prometheus::register_counter!(
73        "dkim_signer_key_cache_hit",
74        "how many cache dkim signer requests hit key cache"
75    )
76    .unwrap()
77});
78static KEY_CACHE_MISS: LazyLock<Counter> = LazyLock::new(|| {
79    prometheus::register_counter!(
80        "dkim_signer_key_cache_miss",
81        "how many cache dkim signer requests miss key cache"
82    )
83    .unwrap()
84});
85static KEY_CACHE_LOOKUP: LazyLock<Counter> = LazyLock::new(|| {
86    prometheus::register_counter!(
87        "dkim_signer_key_cache_lookup_count",
88        "how many cache dkim key requests occurred"
89    )
90    .unwrap()
91});
92
93#[derive(Deserialize, Hash, Eq, PartialEq, Copy, Clone, Debug)]
94pub enum Canon {
95    Relaxed,
96    Simple,
97}
98
99impl Default for Canon {
100    fn default() -> Self {
101        Self::Relaxed
102    }
103}
104
105#[derive(Deserialize, Hash, Eq, PartialEq, Copy, Clone, Debug)]
106pub enum HashAlgo {
107    Sha1,
108    Sha256,
109}
110
111#[derive(Deserialize, Hash, PartialEq, Eq, Clone, Debug)]
112pub struct SignerConfig {
113    domain: String,
114    selector: String,
115    headers: Vec<String>,
116    #[serde(default)]
117    atps: Option<String>,
118    #[serde(default)]
119    atpsh: Option<HashAlgo>,
120    #[serde(default)]
121    agent_user_identifier: Option<String>,
122    #[serde(default)]
123    expiration: Option<u64>,
124    #[serde(default)]
125    body_length: bool,
126    #[serde(default)]
127    reporting: bool,
128    #[serde(default)]
129    header_canonicalization: Canon,
130    #[serde(default)]
131    body_canonicalization: Canon,
132
133    key: KeySource,
134    #[serde(default)]
135    over_sign: bool,
136
137    #[serde(default = "SignerConfig::default_ttl", with = "duration_serde")]
138    ttl: Duration,
139}
140
141impl SignerConfig {
142    fn default_ttl() -> Duration {
143        Duration::from_secs(300)
144    }
145
146    fn configure_kumo_dkim(&self, key: Arc<DkimPrivateKey>) -> anyhow::Result<kumo_dkim::Signer> {
147        if self.atps.is_some() {
148            anyhow::bail!("atps is not currently supported for RSA keys");
149        }
150        if self.atpsh.is_some() {
151            anyhow::bail!("atpsh is not currently supported for RSA keys");
152        }
153        if self.agent_user_identifier.is_some() {
154            anyhow::bail!("agent_user_identifier is not currently supported for RSA keys");
155        }
156        if self.body_length {
157            anyhow::bail!("body_length is not currently supported for RSA keys");
158        }
159        if self.reporting {
160            anyhow::bail!("reporting is not currently supported for RSA keys");
161        }
162
163        let mut signer = kumo_dkim::SignerBuilder::new()
164            .with_signed_headers(&self.headers)
165            .context("configure signed headers")?
166            .with_private_key(key)
167            .with_selector(&self.selector)
168            .with_signing_domain(&self.domain)
169            .with_over_signing(self.over_sign)
170            .with_header_canonicalization(match self.header_canonicalization {
171                Canon::Relaxed => kumo_dkim::canonicalization::Type::Relaxed,
172                Canon::Simple => kumo_dkim::canonicalization::Type::Simple,
173            })
174            .with_body_canonicalization(match self.body_canonicalization {
175                Canon::Relaxed => kumo_dkim::canonicalization::Type::Relaxed,
176                Canon::Simple => kumo_dkim::canonicalization::Type::Simple,
177            });
178        if let Some(exp) = self.expiration {
179            signer =
180                signer.with_expiry(chrono::Duration::try_seconds(exp as i64).ok_or_else(|| {
181                    anyhow::anyhow!("{exp} is out of range for chrono::Duration::try_seconds")
182                })?);
183        }
184
185        signer.build().context("build signer")
186    }
187}
188
189pub static SIGN_POOL: OnceLock<Runtime> = OnceLock::new();
190
191#[derive(Clone)]
192#[cfg_attr(feature = "impl", derive(mlua::FromLua))]
193pub struct Signer(Arc<CFSigner>);
194
195impl Signer {
196    pub fn sign(&self, message: &[u8]) -> anyhow::Result<String> {
197        self.0.sign(message)
198    }
199}
200
201impl LuaUserData for Signer {}
202
203async fn cached_key_load(key: &KeySource, ttl: Duration) -> anyhow::Result<Arc<DkimPrivateKey>> {
204    KEY_CACHE_LOOKUP.inc();
205    if let Some(pkey) = KEY_CACHE.get(key) {
206        KEY_CACHE_HIT.inc();
207        return Ok(pkey);
208    }
209
210    KEY_CACHE_MISS.inc();
211    let key_fetch_timer = SIGNER_KEY_FETCH.start_timer();
212    let data = key.get().await?;
213
214    let pkey = Arc::new(DkimPrivateKey::key(&data)?);
215    key_fetch_timer.stop_and_record();
216
217    KEY_CACHE
218        .insert(key.clone(), pkey.clone(), Instant::now() + ttl)
219        .await;
220    Ok(pkey)
221}
222
223pub fn register(lua: &Lua) -> anyhow::Result<()> {
224    let dkim_mod = get_or_create_sub_module(lua, "dkim")?;
225    dkim_mod.set(
226        "set_signing_threads",
227        lua.create_function(move |_lua, n: usize| {
228            let runtime = tokio::runtime::Builder::new_multi_thread()
229                .thread_name("dkimsign")
230                .worker_threads(1)
231                .max_blocking_threads(n)
232                .build()
233                .map_err(any_err)?;
234            SIGN_POOL
235                .set(runtime)
236                .map_err(|_| mlua::Error::external("dkimsign pool is already configured"))?;
237            println!("started dkimsign pool with {n} threads");
238            Ok(())
239        })?,
240    )?;
241    dkim_mod.set(
242        "rsa_sha256_signer",
243        lua.create_async_function(|lua, params: Value| async move {
244            let params: SignerConfig = from_lua_value(&lua, params)?;
245
246            SIGNER_CACHE_LOOKUP.inc();
247            if let Some(inner) = SIGNER_CACHE.get(&params) {
248                SIGNER_CACHE_HIT.inc();
249                return Ok(Signer(inner));
250            }
251            SIGNER_CACHE_MISS.inc();
252
253            let signer_creation_timer = SIGNER_CREATE.start_timer();
254
255            let key = cached_key_load(&params.key, params.ttl)
256                .await
257                .map_err(|err| mlua::Error::external(format!("{:?}: {err:#}", params.key)))?;
258
259            let signer = params
260                .configure_kumo_dkim(key)
261                .map_err(|err| mlua::Error::external(format!("{err:#}")))?;
262
263            let inner = Arc::new(CFSigner { signer });
264
265            let expiration = Instant::now() + params.ttl;
266            SIGNER_CACHE
267                .insert(params, Arc::clone(&inner), expiration)
268                .await;
269
270            signer_creation_timer.stop_and_record();
271            Ok(Signer(inner))
272        })?,
273    )?;
274
275    dkim_mod.set(
276        "ed25519_signer",
277        lua.create_async_function(|lua, params: Value| async move {
278            let params: SignerConfig = from_lua_value(&lua, params)?;
279
280            if let Some(inner) = SIGNER_CACHE.get(&params) {
281                return Ok(Signer(inner));
282            }
283
284            let signer_creation_timer = SIGNER_CREATE.start_timer();
285
286            let key = cached_key_load(&params.key, params.ttl)
287                .await
288                .map_err(|err| mlua::Error::external(format!("{:?}: {err:#}", params.key)))?;
289
290            let signer = params
291                .configure_kumo_dkim(key)
292                .map_err(|err| mlua::Error::external(format!("{err:#}")))?;
293
294            let inner = Arc::new(CFSigner { signer });
295
296            let expiration = Instant::now() + params.ttl;
297            SIGNER_CACHE
298                .insert(params, Arc::clone(&inner), expiration)
299                .await;
300
301            signer_creation_timer.stop_and_record();
302            Ok(Signer(inner))
303        })?,
304    )?;
305    Ok(())
306}
307
308#[derive(Debug)]
309pub struct CFSigner {
310    signer: kumo_dkim::Signer,
311}
312
313impl CFSigner {
314    fn sign(&self, message: &[u8]) -> anyhow::Result<String> {
315        let parse_timer = SIGNER_PARSE.start_timer();
316        let message_str =
317            std::str::from_utf8(message).context("DKIM signer: message is not ASCII or UTF-8")?;
318        let mail = kumo_dkim::ParsedEmail::parse(message_str)
319            .context("failed to parse message to pass to dkim signer")?;
320        parse_timer.stop_and_record();
321
322        let sign_timer = SIGNER_SIGN.start_timer();
323        let dkim_header = self.signer.sign(&mail)?;
324        sign_timer.stop_and_record();
325
326        Ok(dkim_header)
327    }
328}