message/
dkim.rs

1use anyhow::Context;
2use config::{any_err, from_lua_value, get_or_create_sub_module};
3use data_loader::KeySource;
4use kumo_dkim::DkimPrivateKey;
5use lruttl::declare_cache;
6use mlua::prelude::LuaUserData;
7use mlua::{Lua, Value};
8use prometheus::{Counter, Histogram};
9use serde::Deserialize;
10use std::sync::{Arc, LazyLock, OnceLock};
11use tokio::runtime::Runtime;
12use tokio::time::Duration;
13
14declare_cache! {
15/// Caches dkim signer specs to signer objects
16static SIGNER_CACHE: LruCacheWithTtl<SignerConfig, Arc<CFSigner>>::new("dkim_signer_cache", 1024);
17}
18declare_cache! {
19/// Caches dkim loaded signing keys based on their KeySource spec
20static KEY_CACHE: LruCacheWithTtl<KeySource, Arc<DkimPrivateKey>>::new("dkim_key_cache", 1024);
21}
22
23static SIGNER_KEY_FETCH: LazyLock<Histogram> = LazyLock::new(|| {
24    prometheus::register_histogram!(
25        "dkim_signer_key_fetch",
26        "how long it takes to obtain a dkim key"
27    )
28    .unwrap()
29});
30static SIGNER_CREATE: LazyLock<Histogram> = LazyLock::new(|| {
31    prometheus::register_histogram!(
32        "dkim_signer_creation",
33        "how long it takes to create a signer on a cache miss"
34    )
35    .unwrap()
36});
37static SIGNER_SIGN: LazyLock<Histogram> = LazyLock::new(|| {
38    prometheus::register_histogram!(
39        "dkim_signer_sign",
40        "how long it takes to dkim sign parsed messages"
41    )
42    .unwrap()
43});
44static SIGNER_PARSE: LazyLock<Histogram> = LazyLock::new(|| {
45    prometheus::register_histogram!(
46        "dkim_signer_message_parse",
47        "how long it takes to parse messages as prep for signing"
48    )
49    .unwrap()
50});
51static SIGNER_CACHE_HIT: LazyLock<Counter> = LazyLock::new(|| {
52    prometheus::register_counter!(
53        "dkim_signer_cache_hit",
54        "how many cache dkim signer requests hit cache"
55    )
56    .unwrap()
57});
58static SIGNER_CACHE_MISS: LazyLock<Counter> = LazyLock::new(|| {
59    prometheus::register_counter!(
60        "dkim_signer_cache_miss",
61        "how many cache dkim signer requests miss cache"
62    )
63    .unwrap()
64});
65static SIGNER_CACHE_LOOKUP: LazyLock<Counter> = LazyLock::new(|| {
66    prometheus::register_counter!(
67        "dkim_signer_cache_lookup_count",
68        "how many cache dkim signer requests occurred"
69    )
70    .unwrap()
71});
72
73static KEY_CACHE_HIT: LazyLock<Counter> = LazyLock::new(|| {
74    prometheus::register_counter!(
75        "dkim_signer_key_cache_hit",
76        "how many cache dkim signer requests hit key cache"
77    )
78    .unwrap()
79});
80static KEY_CACHE_MISS: LazyLock<Counter> = LazyLock::new(|| {
81    prometheus::register_counter!(
82        "dkim_signer_key_cache_miss",
83        "how many cache dkim signer requests miss key cache"
84    )
85    .unwrap()
86});
87static KEY_CACHE_LOOKUP: LazyLock<Counter> = LazyLock::new(|| {
88    prometheus::register_counter!(
89        "dkim_signer_key_cache_lookup_count",
90        "how many cache dkim key requests occurred"
91    )
92    .unwrap()
93});
94
95#[derive(Deserialize, Hash, Eq, PartialEq, Copy, Clone, Debug)]
96pub enum Canon {
97    Relaxed,
98    Simple,
99}
100
101impl Default for Canon {
102    fn default() -> Self {
103        Self::Relaxed
104    }
105}
106
107#[derive(Deserialize, Hash, Eq, PartialEq, Copy, Clone, Debug)]
108pub enum HashAlgo {
109    Sha1,
110    Sha256,
111}
112
113#[derive(Deserialize, Hash, PartialEq, Eq, Clone, Debug)]
114pub struct SignerConfig {
115    domain: String,
116    selector: String,
117    headers: Vec<String>,
118    #[serde(default)]
119    atps: Option<String>,
120    #[serde(default)]
121    atpsh: Option<HashAlgo>,
122    #[serde(default)]
123    agent_user_identifier: Option<String>,
124    #[serde(default)]
125    expiration: Option<u64>,
126    #[serde(default)]
127    body_length: bool,
128    #[serde(default)]
129    reporting: bool,
130    #[serde(default)]
131    header_canonicalization: Canon,
132    #[serde(default)]
133    body_canonicalization: Canon,
134
135    key: KeySource,
136    #[serde(default)]
137    over_sign: bool,
138
139    #[serde(default = "SignerConfig::default_ttl", with = "duration_serde")]
140    ttl: Duration,
141}
142
143impl SignerConfig {
144    fn default_ttl() -> Duration {
145        Duration::from_secs(300)
146    }
147
148    fn configure_kumo_dkim(&self, key: Arc<DkimPrivateKey>) -> anyhow::Result<kumo_dkim::Signer> {
149        if self.atps.is_some() {
150            anyhow::bail!("atps is not currently supported for RSA keys");
151        }
152        if self.atpsh.is_some() {
153            anyhow::bail!("atpsh is not currently supported for RSA keys");
154        }
155        if self.agent_user_identifier.is_some() {
156            anyhow::bail!("agent_user_identifier is not currently supported for RSA keys");
157        }
158        if self.body_length {
159            anyhow::bail!("body_length is not currently supported for RSA keys");
160        }
161        if self.reporting {
162            anyhow::bail!("reporting is not currently supported for RSA keys");
163        }
164
165        let mut signer = kumo_dkim::SignerBuilder::new()
166            .with_signed_headers(&self.headers)
167            .context("configure signed headers")?
168            .with_private_key(key)
169            .with_selector(&self.selector)
170            .with_signing_domain(&self.domain)
171            .with_over_signing(self.over_sign)
172            .with_header_canonicalization(match self.header_canonicalization {
173                Canon::Relaxed => kumo_dkim::canonicalization::Type::Relaxed,
174                Canon::Simple => kumo_dkim::canonicalization::Type::Simple,
175            })
176            .with_body_canonicalization(match self.body_canonicalization {
177                Canon::Relaxed => kumo_dkim::canonicalization::Type::Relaxed,
178                Canon::Simple => kumo_dkim::canonicalization::Type::Simple,
179            });
180        if let Some(exp) = self.expiration {
181            signer =
182                signer.with_expiry(chrono::Duration::try_seconds(exp as i64).ok_or_else(|| {
183                    anyhow::anyhow!("{exp} is out of range for chrono::Duration::try_seconds")
184                })?);
185        }
186
187        signer.build().context("build signer")
188    }
189}
190
191pub static SIGN_POOL: OnceLock<Runtime> = OnceLock::new();
192
193#[derive(Clone)]
194#[cfg_attr(feature = "impl", derive(mlua::FromLua))]
195pub struct Signer(Arc<CFSigner>);
196
197impl Signer {
198    pub fn sign(&self, message: &[u8]) -> anyhow::Result<String> {
199        self.0.sign(message)
200    }
201
202    pub fn signer(&self) -> &kumo_dkim::Signer {
203        self.0.signer()
204    }
205}
206
207impl LuaUserData for Signer {}
208
209async fn cached_key_load(key: &KeySource, ttl: Duration) -> anyhow::Result<Arc<DkimPrivateKey>> {
210    KEY_CACHE_LOOKUP.inc();
211    KEY_CACHE
212        .get_or_try_insert(key, |_| ttl, async {
213            let key_fetch_timer = SIGNER_KEY_FETCH.start_timer();
214            let data = key.get().await?;
215            let pkey = Arc::new(DkimPrivateKey::key(&data)?);
216            key_fetch_timer.stop_and_record();
217            Ok::<Arc<DkimPrivateKey>, anyhow::Error>(pkey)
218        })
219        .await
220        .map_err(|err| anyhow::anyhow!("{err:#}"))
221        .map(|lookup| {
222            if !lookup.is_fresh {
223                KEY_CACHE_HIT.inc();
224            } else {
225                KEY_CACHE_MISS.inc();
226            }
227            lookup.item
228        })
229}
230
231pub fn register(lua: &Lua) -> anyhow::Result<()> {
232    let dkim_mod = get_or_create_sub_module(lua, "dkim")?;
233    dkim_mod.set(
234        "set_signing_threads",
235        lua.create_function(move |_lua, n: usize| {
236            let runtime = tokio::runtime::Builder::new_multi_thread()
237                .thread_name("dkimsign")
238                .worker_threads(1)
239                .max_blocking_threads(n)
240                .build()
241                .map_err(any_err)?;
242            SIGN_POOL
243                .set(runtime)
244                .map_err(|_| mlua::Error::external("dkimsign pool is already configured"))?;
245            println!("started dkimsign pool with {n} threads");
246            Ok(())
247        })?,
248    )?;
249
250    async fn generic_signer_ctor(lua: Lua, params: Value) -> mlua::Result<Signer> {
251        let params: SignerConfig = from_lua_value(&lua, params)?;
252
253        SIGNER_CACHE_LOOKUP.inc();
254        SIGNER_CACHE
255            .get_or_try_insert(&params, |_| params.ttl, async {
256                let signer_creation_timer = SIGNER_CREATE.start_timer();
257
258                let key = cached_key_load(&params.key, params.ttl)
259                    .await
260                    .map_err(|err| anyhow::anyhow!("{:?}: {err:#}", params.key))?;
261
262                let signer = params
263                    .configure_kumo_dkim(key)
264                    .map_err(|err| anyhow::anyhow!("{err:#}"))?;
265
266                let inner = Arc::new(CFSigner { signer });
267
268                signer_creation_timer.stop_and_record();
269                Ok::<Arc<CFSigner>, anyhow::Error>(inner)
270            })
271            .await
272            .map_err(any_err)
273            .map(|lookup| {
274                if !lookup.is_fresh {
275                    SIGNER_CACHE_HIT.inc();
276                } else {
277                    SIGNER_CACHE_MISS.inc();
278                }
279                Signer(lookup.item)
280            })
281    }
282
283    dkim_mod.set(
284        "rsa_sha256_signer",
285        lua.create_async_function(generic_signer_ctor)?,
286    )?;
287
288    dkim_mod.set(
289        "ed25519_signer",
290        lua.create_async_function(generic_signer_ctor)?,
291    )?;
292    Ok(())
293}
294
295#[derive(Debug)]
296pub struct CFSigner {
297    signer: kumo_dkim::Signer,
298}
299
300impl CFSigner {
301    fn sign(&self, message: &[u8]) -> anyhow::Result<String> {
302        let parse_timer = SIGNER_PARSE.start_timer();
303        let message_str =
304            std::str::from_utf8(message).context("DKIM signer: message is not ASCII or UTF-8")?;
305        let mail = kumo_dkim::ParsedEmail::parse(message_str)
306            .context("failed to parse message to pass to dkim signer")?;
307        parse_timer.stop_and_record();
308
309        let sign_timer = SIGNER_SIGN.start_timer();
310        let dkim_header = self.signer.sign(&mail)?;
311        sign_timer.stop_and_record();
312
313        Ok(dkim_header)
314    }
315
316    fn signer(&self) -> &kumo_dkim::Signer {
317        &self.signer
318    }
319}