message/
dkim.rs

1use anyhow::Context;
2use config::{any_err, from_lua_value, get_or_create_sub_module};
3use data_loader::KeySource;
4use kumo_dkim::DkimPrivateKey;
5use kumo_prometheus::declare_metric;
6use lruttl::declare_cache;
7use mlua::prelude::LuaUserData;
8use mlua::{Lua, Value};
9use serde::Deserialize;
10use std::sync::{Arc, OnceLock};
11use tokio::runtime::Runtime;
12use tokio::time::Duration;
13
// Signer cache: keyed by the complete `SignerConfig`, so two configurations
// that differ in any field map to distinct cached signers. Entries are
// inserted by the signer constructor installed in `register`, using the
// per-config `ttl` as the entry lifetime.
// NOTE(review): 1024 is presumably the maximum entry count — confirm against `lruttl`.
14declare_cache! {
15/// Caches dkim signer specs to signer objects
16static SIGNER_CACHE: LruCacheWithTtl<SignerConfig, Arc<CFSigner>>::new("dkim_signer_cache", 1024);
17}
// Key cache: keyed by the `KeySource` alone, so several signer configurations
// that reference the same key source share one loaded private key. Populated
// by `cached_key_load`, with the TTL supplied by its caller.
18declare_cache! {
19/// Caches dkim loaded signing keys based on their KeySource spec
20static KEY_CACHE: LruCacheWithTtl<KeySource, Arc<DkimPrivateKey>>::new("dkim_key_cache", 1024);
21}
22
// Timed inside `cached_key_load`: covers fetching the key bytes from the
// `KeySource` and parsing them into a `DkimPrivateKey`.
23declare_metric! {
24/// How long it takes to obtain a dkim key.
25///
26/// This measures the time that it takes to load dkim
27/// private keys from whatever storage medium is configured.
28static SIGNER_KEY_FETCH: Histogram("dkim_signer_key_fetch");
29}
30
// Timed inside the signer constructor's cache-fill future: covers the
// (possibly cached) key load plus building the `kumo_dkim::Signer`.
31declare_metric! {
32/// How many seconds it takes to create a signer on a cache miss.
33static SIGNER_CREATE: Histogram("dkim_signer_creation");
34}
35
// Timed inside `CFSigner::sign`, around the call into `kumo_dkim::Signer::sign`
// only; message parsing is accounted for separately by SIGNER_PARSE.
36declare_metric! {
37/// How many seconds it takes to dkim sign parsed messages.
38///
39/// Long durations may indicate that the system is over-provisioned
40/// and has insufficient CPU.  You should check whether and how you
41/// might have configured
42/// [kumo.dkim.set_signing_threads](../../kumo.dkim/set_signing_threads.md).
43static SIGNER_SIGN: Histogram("dkim_signer_sign");
44}
45
// Timed inside `CFSigner::sign`: covers UTF-8 validation of the raw message
// and `kumo_dkim::ParsedEmail::parse`.
46declare_metric! {
47/// How many seconds it takes to parse messages as prep for signing.
48///
49/// Long durations may simply indicate that you have very large
50/// messages passing through the system.
51static SIGNER_PARSE: Histogram("dkim_signer_message_parse");
52}
53
// Incremented by the signer constructor when a successful SIGNER_CACHE lookup
// returned an existing entry (`is_fresh` is false).
54declare_metric! {
55/// How many dkim signer requests hit cache.
56///
57/// This is redundant with the newer
58/// [lruttl_hit_count{cache_name="dkim_signer_cache"}](lruttl_hit_count.md)
59/// counter.
60static SIGNER_CACHE_HIT: IntCounter("dkim_signer_cache_hit");
61}
62
// Incremented by the signer constructor when a successful SIGNER_CACHE lookup
// had to build a new signer (`is_fresh` is true). Failed builds count as
// neither hit nor miss.
63declare_metric! {
64/// How many dkim signer requests miss cache.
65///
66/// This is redundant with the newer
67/// [lruttl_miss_count{cache_name="dkim_signer_cache"}](lruttl_miss_count.md)
68/// counter.
69static SIGNER_CACHE_MISS: IntCounter("dkim_signer_cache_miss");
70}
71
// Incremented once per signer constructor call, before the cache is consulted,
// so it also counts lookups that end in an error.
72declare_metric! {
73/// How many cache dkim signer requests occurred.
74///
75/// This is redundant with the newer
76/// [lruttl_lookup_count{cache_name="dkim_signer_cache"}](lruttl_lookup_count.md)
77/// counter.
78static SIGNER_CACHE_LOOKUP: IntCounter("dkim_signer_cache_lookup_count");
79}
80
// Incremented by `cached_key_load` when a successful KEY_CACHE lookup returned
// an already-loaded key (`is_fresh` is false).
81declare_metric! {
82/// How many cache dkim signer requests hit key cache.
83///
84/// This is redundant with the newer
85/// [lruttl_hit_count{cache_name="dkim_key_cache"}](lruttl_hit_count.md)
86/// counter.
87static KEY_CACHE_HIT: IntCounter("dkim_signer_key_cache_hit");
88}
89
// Incremented by `cached_key_load` when a successful KEY_CACHE lookup had to
// load the key (`is_fresh` is true). Failed loads count as neither hit nor miss.
90declare_metric! {
91/// How many cache dkim signer requests miss key cache.
92///
93/// This is redundant with the newer
94/// [lruttl_miss_count{cache_name="dkim_key_cache"}](lruttl_miss_count.md)
95/// counter.
96static KEY_CACHE_MISS: IntCounter("dkim_signer_key_cache_miss");
97}
98
// Incremented on every `cached_key_load` call, before the cache is consulted,
// so it also counts lookups that end in an error.
99declare_metric! {
100/// How many cache dkim key requests occurred.
101///
102/// This is redundant with the newer
103/// [lruttl_lookup_count{cache_name="dkim_key_cache"}](lruttl_lookup_count.md)
104/// counter.
105static KEY_CACHE_LOOKUP: IntCounter("dkim_signer_key_cache_lookup_count");
106}
107
108#[derive(Deserialize, Hash, Eq, PartialEq, Copy, Clone, Debug)]
109pub enum Canon {
110    Relaxed,
111    Simple,
112}
113
114impl Default for Canon {
115    fn default() -> Self {
116        Self::Relaxed
117    }
118}
119
120#[derive(Deserialize, Hash, Eq, PartialEq, Copy, Clone, Debug)]
121pub enum HashAlgo {
122    Sha1,
123    Sha256,
124}
125
126#[derive(Deserialize, Hash, PartialEq, Eq, Clone, Debug)]
127pub struct SignerConfig {
128    domain: String,
129    selector: String,
130    headers: Vec<String>,
131    #[serde(default)]
132    atps: Option<String>,
133    #[serde(default)]
134    atpsh: Option<HashAlgo>,
135    #[serde(default)]
136    agent_user_identifier: Option<String>,
137    #[serde(default)]
138    expiration: Option<u64>,
139    #[serde(default)]
140    body_length: bool,
141    #[serde(default)]
142    reporting: bool,
143    #[serde(default)]
144    header_canonicalization: Canon,
145    #[serde(default)]
146    body_canonicalization: Canon,
147
148    key: KeySource,
149    #[serde(default)]
150    over_sign: bool,
151
152    #[serde(default = "SignerConfig::default_ttl", with = "duration_serde")]
153    ttl: Duration,
154}
155
156impl SignerConfig {
157    fn default_ttl() -> Duration {
158        Duration::from_secs(300)
159    }
160
161    fn configure_kumo_dkim(&self, key: Arc<DkimPrivateKey>) -> anyhow::Result<kumo_dkim::Signer> {
162        if self.atps.is_some() {
163            anyhow::bail!("atps is not currently supported for RSA keys");
164        }
165        if self.atpsh.is_some() {
166            anyhow::bail!("atpsh is not currently supported for RSA keys");
167        }
168        if self.agent_user_identifier.is_some() {
169            anyhow::bail!("agent_user_identifier is not currently supported for RSA keys");
170        }
171        if self.body_length {
172            anyhow::bail!("body_length is not currently supported for RSA keys");
173        }
174        if self.reporting {
175            anyhow::bail!("reporting is not currently supported for RSA keys");
176        }
177
178        let mut signer = kumo_dkim::SignerBuilder::new()
179            .with_signed_headers(&self.headers)
180            .context("configure signed headers")?
181            .with_private_key(key)
182            .with_selector(&self.selector)
183            .with_signing_domain(&self.domain)
184            .with_over_signing(self.over_sign)
185            .with_header_canonicalization(match self.header_canonicalization {
186                Canon::Relaxed => kumo_dkim::canonicalization::Type::Relaxed,
187                Canon::Simple => kumo_dkim::canonicalization::Type::Simple,
188            })
189            .with_body_canonicalization(match self.body_canonicalization {
190                Canon::Relaxed => kumo_dkim::canonicalization::Type::Relaxed,
191                Canon::Simple => kumo_dkim::canonicalization::Type::Simple,
192            });
193        if let Some(exp) = self.expiration {
194            signer =
195                signer.with_expiry(chrono::Duration::try_seconds(exp as i64).ok_or_else(|| {
196                    anyhow::anyhow!("{exp} is out of range for chrono::Duration::try_seconds")
197                })?);
198        }
199
200        signer.build().context("build signer")
201    }
202}
203
/// Tokio runtime created by the `set_signing_threads` function that
/// `register` installs into the Lua `dkim` module.
///
/// It can be set at most once and stays empty if that function is never
/// called. Nothing in this file submits work to it.
/// NOTE(review): presumably callers elsewhere run signing on this pool — confirm.
204pub static SIGN_POOL: OnceLock<Runtime> = OnceLock::new();
205
206#[derive(Clone)]
207#[cfg_attr(feature = "impl", derive(mlua::FromLua))]
208pub struct Signer(Arc<CFSigner>);
209
210impl Signer {
211    pub fn sign(&self, message: &[u8]) -> anyhow::Result<String> {
212        self.0.sign(message)
213    }
214
215    pub fn signer(&self) -> &kumo_dkim::Signer {
216        self.0.signer()
217    }
218}
219
220impl LuaUserData for Signer {}
221
222async fn cached_key_load(key: &KeySource, ttl: Duration) -> anyhow::Result<Arc<DkimPrivateKey>> {
223    KEY_CACHE_LOOKUP.inc();
224    KEY_CACHE
225        .get_or_try_insert(key, |_| ttl, async {
226            let key_fetch_timer = SIGNER_KEY_FETCH.start_timer();
227            let data = key.get().await?;
228            let pkey = Arc::new(DkimPrivateKey::key(&data)?);
229            key_fetch_timer.stop_and_record();
230            Ok::<Arc<DkimPrivateKey>, anyhow::Error>(pkey)
231        })
232        .await
233        .map_err(|err| anyhow::anyhow!("{err:#}"))
234        .map(|lookup| {
235            if !lookup.is_fresh {
236                KEY_CACHE_HIT.inc();
237            } else {
238                KEY_CACHE_MISS.inc();
239            }
240            lookup.item
241        })
242}
243
244pub fn register(lua: &Lua) -> anyhow::Result<()> {
245    let dkim_mod = get_or_create_sub_module(lua, "dkim")?;
246    dkim_mod.set(
247        "set_signing_threads",
248        lua.create_function(move |_lua, n: usize| {
249            let runtime = tokio::runtime::Builder::new_multi_thread()
250                .thread_name("dkimsign")
251                .worker_threads(1)
252                .max_blocking_threads(n)
253                .build()
254                .map_err(any_err)?;
255            SIGN_POOL
256                .set(runtime)
257                .map_err(|_| mlua::Error::external("dkimsign pool is already configured"))?;
258            println!("started dkimsign pool with {n} threads");
259            Ok(())
260        })?,
261    )?;
262
263    async fn generic_signer_ctor(lua: Lua, params: Value) -> mlua::Result<Signer> {
264        let params: SignerConfig = from_lua_value(&lua, params)?;
265
266        SIGNER_CACHE_LOOKUP.inc();
267        SIGNER_CACHE
268            .get_or_try_insert(&params, |_| params.ttl, async {
269                let signer_creation_timer = SIGNER_CREATE.start_timer();
270
271                let key = cached_key_load(&params.key, params.ttl)
272                    .await
273                    .map_err(|err| anyhow::anyhow!("{:?}: {err:#}", params.key))?;
274
275                let signer = params
276                    .configure_kumo_dkim(key)
277                    .map_err(|err| anyhow::anyhow!("{err:#}"))?;
278
279                let inner = Arc::new(CFSigner { signer });
280
281                signer_creation_timer.stop_and_record();
282                Ok::<Arc<CFSigner>, anyhow::Error>(inner)
283            })
284            .await
285            .map_err(any_err)
286            .map(|lookup| {
287                if !lookup.is_fresh {
288                    SIGNER_CACHE_HIT.inc();
289                } else {
290                    SIGNER_CACHE_MISS.inc();
291                }
292                Signer(lookup.item)
293            })
294    }
295
296    dkim_mod.set(
297        "rsa_sha256_signer",
298        lua.create_async_function(generic_signer_ctor)?,
299    )?;
300
301    dkim_mod.set(
302        "ed25519_signer",
303        lua.create_async_function(generic_signer_ctor)?,
304    )?;
305    Ok(())
306}
307
308#[derive(Debug)]
309pub struct CFSigner {
310    signer: kumo_dkim::Signer,
311}
312
313impl CFSigner {
314    fn sign(&self, message: &[u8]) -> anyhow::Result<String> {
315        let parse_timer = SIGNER_PARSE.start_timer();
316        let message_str =
317            std::str::from_utf8(message).context("DKIM signer: message is not ASCII or UTF-8")?;
318        let mail = kumo_dkim::ParsedEmail::parse(message_str)
319            .context("failed to parse message to pass to dkim signer")?;
320        parse_timer.stop_and_record();
321
322        let sign_timer = SIGNER_SIGN.start_timer();
323        let dkim_header = self.signer.sign(&mail)?;
324        sign_timer.stop_and_record();
325
326        Ok(dkim_header)
327    }
328
329    fn signer(&self) -> &kumo_dkim::Signer {
330        &self.signer
331    }
332}