kumo_api_types/
egress_path.rs

1use cidr_map::CidrSet;
2use data_loader::KeySource;
3#[cfg(feature = "lua")]
4use mlua::prelude::*;
5use openssl::ssl::SslOptions;
6use ordermap::OrderMap;
7use rfc5321::SmtpClientTimeouts;
8use rustls::crypto::aws_lc_rs::ALL_CIPHER_SUITES;
9use rustls::SupportedCipherSuite;
10use serde::{Deserialize, Deserializer, Serialize};
11use std::time::Duration;
12use throttle::{LimitSpec, ThrottleSpec};
13
14#[derive(Deserialize, Serialize, Debug, Clone, PartialEq, Eq, Copy)]
15pub enum Tls {
16    /// Use it if available. If the peer has invalid or self-signed certificates, then
17    /// delivery will fail. Will NOT fallback to not using TLS if the peer advertises
18    /// STARTTLS.
19    Opportunistic,
20    /// Use it if available, and allow self-signed or otherwise invalid server certs.
21    /// Not recommended for sending to the public internet; this is for local/lab
22    /// testing scenarios only.
23    OpportunisticInsecure,
24    /// TLS with valid certs is required.
25    Required,
26    /// Required, and allow self-signed or otherwise invalid server certs.
27    /// Not recommended for sending to the public internet; this is for local/lab
28    /// testing scenarios only.
29    RequiredInsecure,
30    /// Do not try to use TLS
31    Disabled,
32}
33
34impl Tls {
35    pub fn allow_insecure(&self) -> bool {
36        match self {
37            Self::OpportunisticInsecure | Self::RequiredInsecure => true,
38            _ => false,
39        }
40    }
41
42    pub fn is_opportunistic(&self) -> bool {
43        match self {
44            Self::OpportunisticInsecure | Self::Opportunistic => true,
45            _ => false,
46        }
47    }
48}
49
50impl Default for Tls {
51    fn default() -> Self {
52        Self::Opportunistic
53    }
54}
55
56pub fn parse_openssl_options(option_list: &str) -> anyhow::Result<SslOptions> {
57    let mut result = SslOptions::empty();
58
59    for option in option_list.split('|') {
60        match SslOptions::from_name(option) {
61            Some(opt) => {
62                result.insert(opt);
63            }
64            None => {
65                let mut allowed: Vec<_> = SslOptions::all()
66                    .iter_names()
67                    .map(|(name, _)| format!("`{name}`"))
68                    .collect();
69                allowed.sort();
70                let allowed = allowed.join(", ");
71                anyhow::bail!(
72                    "`{option}` is not a valid SslOption name. \
73                    Possible values are {allowed} joined together by the pipe `|` character."
74                );
75            }
76        }
77    }
78
79    Ok(result)
80}
81
82fn deserialize_ssl_options<'de, D>(deserializer: D) -> Result<Option<SslOptions>, D::Error>
83where
84    D: Deserializer<'de>,
85{
86    use serde::de::Error;
87    let maybe_options = Option::<String>::deserialize(deserializer)?;
88
89    match maybe_options {
90        None => Ok(None),
91        Some(option_list) => match parse_openssl_options(&option_list) {
92            Ok(options) => Ok(Some(options)),
93            Err(err) => Err(D::Error::custom(format!("{err:#}"))),
94        },
95    }
96}
97
98fn deserialize_supported_ciphersuite<'de, D>(
99    deserializer: D,
100) -> Result<Vec<SupportedCipherSuite>, D::Error>
101where
102    D: Deserializer<'de>,
103{
104    use serde::de::Error;
105    let suites = Vec::<String>::deserialize(deserializer)?;
106    let mut result = vec![];
107
108    for s in suites {
109        match find_rustls_cipher_suite(&s) {
110            Some(s) => {
111                result.push(s);
112            }
113            None => {
114                return Err(D::Error::custom(format!(
115                    "`{s}` is not a valid rustls cipher suite"
116                )));
117            }
118        }
119    }
120
121    Ok(result)
122}
123
124pub fn find_rustls_cipher_suite(name: &str) -> Option<SupportedCipherSuite> {
125    for suite in ALL_CIPHER_SUITES {
126        let sname = format!("{:?}", suite.suite());
127        if sname.eq_ignore_ascii_case(name) {
128            return Some(*suite);
129        }
130    }
131    None
132}
133
134#[derive(Deserialize, Serialize, Debug, Clone, Default, Copy, PartialEq, Eq)]
135#[cfg_attr(feature = "lua", derive(FromLua))]
136pub enum WakeupStrategy {
137    #[default]
138    Aggressive,
139    Relaxed,
140}
141
142#[derive(Deserialize, Serialize, Debug, Clone, Default, Copy, PartialEq, Eq)]
143#[cfg_attr(feature = "lua", derive(FromLua))]
144pub enum MemoryReductionPolicy {
145    #[default]
146    ShrinkDataAndMeta,
147    ShrinkData,
148    NoShrink,
149}
150
151#[derive(Deserialize, Serialize, Debug, Clone, Default, Copy, PartialEq, Eq)]
152#[cfg_attr(feature = "lua", derive(FromLua))]
153pub enum ConfigRefreshStrategy {
154    #[default]
155    Ttl,
156    Epoch,
157}
158
159#[derive(Deserialize, Serialize, Debug, Clone, Default, Copy, PartialEq, Eq)]
160#[cfg_attr(feature = "lua", derive(FromLua))]
161pub enum ReconnectStrategy {
162    /// Close out the current connection session, allowing the maintainer
163    /// to decide about opening a new session and starting with a fresh
164    /// connection plan
165    TerminateSession,
166    /// Try to reconnect to the same host that we were using and where
167    /// we experienced the error
168    ReconnectSameHost,
169    /// Advance to the next host in the connection, if any. If none remain,
170    /// this is equivalent to TerminateSession
171    #[default]
172    ConnectNextHost,
173}
174
175#[derive(Deserialize, Serialize, Debug, Clone, PartialEq)]
176#[cfg_attr(feature = "lua", derive(FromLua))]
177#[serde(deny_unknown_fields)]
178pub struct EgressPathConfig {
179    #[serde(default = "EgressPathConfig::default_connection_limit")]
180    pub connection_limit: LimitSpec,
181
182    #[serde(default)]
183    pub additional_connection_limits: OrderMap<String, LimitSpec>,
184
185    #[serde(default)]
186    pub enable_tls: Tls,
187
188    #[serde(default = "EgressPathConfig::default_enable_mta_sts")]
189    pub enable_mta_sts: bool,
190
191    #[serde(default = "EgressPathConfig::default_enable_dane")]
192    pub enable_dane: bool,
193
194    #[serde(default = "EgressPathConfig::default_enable_pipelining")]
195    pub enable_pipelining: bool,
196
197    #[serde(default = "EgressPathConfig::default_enable_rset")]
198    pub enable_rset: bool,
199
200    #[serde(default)]
201    pub tls_prefer_openssl: bool,
202
203    #[serde(default)]
204    pub tls_certificate: Option<KeySource>,
205
206    #[serde(default)]
207    pub tls_private_key: Option<KeySource>,
208
209    #[serde(default)]
210    pub openssl_cipher_list: Option<String>,
211    #[serde(default)]
212    pub openssl_cipher_suites: Option<String>,
213    #[serde(
214        default,
215        deserialize_with = "deserialize_ssl_options",
216        skip_serializing // FIXME
217    )]
218    pub openssl_options: Option<SslOptions>,
219
220    #[serde(
221        default,
222        deserialize_with = "deserialize_supported_ciphersuite",
223        skip_serializing // FIXME
224    )]
225    pub rustls_cipher_suites: Vec<SupportedCipherSuite>,
226
227    #[serde(flatten)]
228    pub client_timeouts: SmtpClientTimeouts,
229
230    /// How long to wait for an established session to gracefully
231    /// close when the system is shutting down. After this period
232    /// has elapsed, sessions will be aborted.
233    #[serde(default, with = "duration_serde")]
234    pub system_shutdown_timeout: Option<Duration>,
235
236    #[serde(default = "EgressPathConfig::default_max_ready")]
237    pub max_ready: usize,
238
239    #[serde(default = "EgressPathConfig::default_consecutive_connection_failures_before_delay")]
240    pub consecutive_connection_failures_before_delay: usize,
241
242    #[serde(default = "EgressPathConfig::default_smtp_port")]
243    pub smtp_port: u16,
244
245    #[serde(default)]
246    pub smtp_auth_plain_username: Option<String>,
247
248    #[serde(default)]
249    pub smtp_auth_plain_password: Option<KeySource>,
250
251    #[serde(default)]
252    pub allow_smtp_auth_plain_without_tls: bool,
253
254    #[serde(default)]
255    pub max_message_rate: Option<ThrottleSpec>,
256
257    #[serde(default)]
258    pub additional_message_rate_throttles: OrderMap<String, ThrottleSpec>,
259
260    #[serde(default)]
261    pub source_selection_rate: Option<ThrottleSpec>,
262
263    #[serde(default)]
264    pub additional_source_selection_rates: OrderMap<String, ThrottleSpec>,
265
266    #[serde(default)]
267    pub max_connection_rate: Option<ThrottleSpec>,
268
269    #[serde(default = "EgressPathConfig::default_max_deliveries_per_connection")]
270    pub max_deliveries_per_connection: usize,
271
272    #[serde(default = "CidrSet::default_prohibited_hosts")]
273    pub prohibited_hosts: CidrSet,
274
275    #[serde(default)]
276    pub skip_hosts: CidrSet,
277
278    #[serde(default)]
279    pub ehlo_domain: Option<String>,
280
281    // TODO: decide if we want to keep this and then document
282    #[serde(default)]
283    pub aggressive_connection_opening: bool,
284
285    /// How long to wait between calls to get_egress_path_config for
286    /// any given ready queue. Making this longer uses fewer
287    /// resources (in aggregate) but means that it will take longer
288    /// to detect and adjust to changes in the queue configuration.
289    #[serde(
290        default = "EgressPathConfig::default_refresh_interval",
291        with = "duration_serde"
292    )]
293    pub refresh_interval: Duration,
294    #[serde(default)]
295    pub refresh_strategy: ConfigRefreshStrategy,
296
297    #[serde(default)]
298    pub dispatcher_wakeup_strategy: WakeupStrategy,
299    #[serde(default)]
300    pub maintainer_wakeup_strategy: WakeupStrategy,
301
302    /// Specify an explicit provider name that should apply to this
303    /// path. The provider name will be used when computing metrics
304    /// rollups by provider. If omitted, then
305    #[serde(default)]
306    pub provider_name: Option<String>,
307
308    /// If set, a process-local cache will be used to remember if
309    /// a site has broken TLS for the duration specified.  Once
310    /// encountered, we will pretend that EHLO didn't advertise STARTTLS
311    /// on subsequent connection attempts.
312    #[serde(default, with = "duration_serde")]
313    pub remember_broken_tls: Option<Duration>,
314
315    /// If true, when a TLS handshake fails and TLS is set to
316    /// opportunistic, we will re-connect to that host with
317    /// TLS disabled.
318    #[serde(default)]
319    pub opportunistic_tls_reconnect_on_failed_handshake: bool,
320
321    /// If true, rather than ESMTP, use the LMTP protocol
322    #[serde(default)]
323    pub use_lmtp: bool,
324
325    /// How to behave if we experience either a 421 response, an IO Error,
326    /// or a timeout while talking to the peer.
327    #[serde(default)]
328    pub reconnect_strategy: ReconnectStrategy,
329
330    /// Which thread pool to use for processing the ready queue
331    #[serde(default)]
332    pub readyq_pool_name: Option<String>,
333
334    /// What to do to newly inserted messages when memory is low
335    #[serde(default)]
336    pub low_memory_reduction_policy: MemoryReductionPolicy,
337
338    /// What to do to newly inserted messages when memory is over the soft limit
339    #[serde(default)]
340    pub no_memory_reduction_policy: MemoryReductionPolicy,
341
342    /// If we experience a transport error during SMTP, should we retry the
343    /// current message on the next host in the connection plan, or
344    /// immediately consider it a transient failure for that message?
345    #[serde(default)]
346    pub try_next_host_on_transport_error: bool,
347}
348
#[cfg(feature = "lua")]
impl LuaUserData for EgressPathConfig {
    /// Registers the Lua-facing methods for this type by delegating to
    /// `config::impl_pairs_and_index`.
    fn add_methods<Methods>(methods: &mut Methods)
    where
        Methods: mlua::UserDataMethods<Self>,
    {
        config::impl_pairs_and_index(methods);
    }
}
355
356impl Default for EgressPathConfig {
357    fn default() -> Self {
358        Self {
359            connection_limit: Self::default_connection_limit(),
360            tls_prefer_openssl: false,
361            enable_tls: Tls::default(),
362            enable_mta_sts: Self::default_enable_mta_sts(),
363            enable_dane: Self::default_enable_dane(),
364            enable_rset: Self::default_enable_rset(),
365            enable_pipelining: Self::default_enable_pipelining(),
366            max_ready: Self::default_max_ready(),
367            consecutive_connection_failures_before_delay:
368                Self::default_consecutive_connection_failures_before_delay(),
369            smtp_port: Self::default_smtp_port(),
370            max_message_rate: None,
371            max_connection_rate: None,
372            max_deliveries_per_connection: Self::default_max_deliveries_per_connection(),
373            client_timeouts: SmtpClientTimeouts::default(),
374            system_shutdown_timeout: None,
375            prohibited_hosts: CidrSet::default_prohibited_hosts(),
376            skip_hosts: CidrSet::default(),
377            ehlo_domain: None,
378            allow_smtp_auth_plain_without_tls: false,
379            smtp_auth_plain_username: None,
380            smtp_auth_plain_password: None,
381            aggressive_connection_opening: false,
382            rustls_cipher_suites: vec![],
383            tls_certificate: None,
384            tls_private_key: None,
385            openssl_cipher_list: None,
386            openssl_cipher_suites: None,
387            openssl_options: None,
388            refresh_interval: Self::default_refresh_interval(),
389            refresh_strategy: ConfigRefreshStrategy::default(),
390            additional_message_rate_throttles: OrderMap::default(),
391            additional_connection_limits: OrderMap::default(),
392            source_selection_rate: None,
393            additional_source_selection_rates: OrderMap::default(),
394            provider_name: None,
395            remember_broken_tls: None,
396            opportunistic_tls_reconnect_on_failed_handshake: false,
397            use_lmtp: false,
398            reconnect_strategy: ReconnectStrategy::default(),
399            readyq_pool_name: None,
400            low_memory_reduction_policy: MemoryReductionPolicy::default(),
401            no_memory_reduction_policy: MemoryReductionPolicy::default(),
402            maintainer_wakeup_strategy: WakeupStrategy::default(),
403            dispatcher_wakeup_strategy: WakeupStrategy::default(),
404            try_next_host_on_transport_error: false,
405        }
406    }
407}
408
409impl EgressPathConfig {
410    fn default_connection_limit() -> LimitSpec {
411        LimitSpec::new(32)
412    }
413
414    fn default_enable_mta_sts() -> bool {
415        true
416    }
417
418    fn default_enable_pipelining() -> bool {
419        true
420    }
421
422    fn default_enable_rset() -> bool {
423        true
424    }
425
426    fn default_enable_dane() -> bool {
427        false
428    }
429
430    fn default_max_ready() -> usize {
431        1024
432    }
433
434    fn default_consecutive_connection_failures_before_delay() -> usize {
435        100
436    }
437
438    fn default_smtp_port() -> u16 {
439        25
440    }
441
442    fn default_max_deliveries_per_connection() -> usize {
443        1024
444    }
445
446    fn default_refresh_interval() -> Duration {
447        Duration::from_secs(60)
448    }
449}