kumo_api_types/
egress_path.rs

1use cidr_map::CidrSet;
2use data_loader::KeySource;
3use dns_resolver::IpLookupStrategy;
4#[cfg(feature = "lua")]
5use mlua::prelude::*;
6use openssl::ssl::SslOptions;
7use ordermap::OrderMap;
8use rfc5321::SmtpClientTimeouts;
9use rustls::crypto::aws_lc_rs::ALL_CIPHER_SUITES;
10use rustls::SupportedCipherSuite;
11use serde::{Deserialize, Deserializer, Serialize};
12use std::time::Duration;
13use throttle::{LimitSpec, ThrottleSpec};
14
15#[derive(Deserialize, Serialize, Debug, Clone, PartialEq, Eq, Copy)]
16pub enum Tls {
17    /// Use it if available. If the peer has invalid or self-signed certificates, then
18    /// delivery will fail. Will NOT fallback to not using TLS if the peer advertises
19    /// STARTTLS.
20    Opportunistic,
21    /// Use it if available, and allow self-signed or otherwise invalid server certs.
22    /// Not recommended for sending to the public internet; this is for local/lab
23    /// testing scenarios only.
24    OpportunisticInsecure,
25    /// TLS with valid certs is required.
26    Required,
27    /// Required, and allow self-signed or otherwise invalid server certs.
28    /// Not recommended for sending to the public internet; this is for local/lab
29    /// testing scenarios only.
30    RequiredInsecure,
31    /// Do not try to use TLS
32    Disabled,
33}
34
35impl Tls {
36    pub fn allow_insecure(&self) -> bool {
37        match self {
38            Self::OpportunisticInsecure | Self::RequiredInsecure => true,
39            _ => false,
40        }
41    }
42
43    pub fn is_opportunistic(&self) -> bool {
44        match self {
45            Self::OpportunisticInsecure | Self::Opportunistic => true,
46            _ => false,
47        }
48    }
49}
50
51impl Default for Tls {
52    fn default() -> Self {
53        Self::Opportunistic
54    }
55}
56
57pub fn parse_openssl_options(option_list: &str) -> anyhow::Result<SslOptions> {
58    let mut result = SslOptions::empty();
59
60    for option in option_list.split('|') {
61        match SslOptions::from_name(option) {
62            Some(opt) => {
63                result.insert(opt);
64            }
65            None => {
66                let mut allowed: Vec<_> = SslOptions::all()
67                    .iter_names()
68                    .map(|(name, _)| format!("`{name}`"))
69                    .collect();
70                allowed.sort();
71                let allowed = allowed.join(", ");
72                anyhow::bail!(
73                    "`{option}` is not a valid SslOption name. \
74                    Possible values are {allowed} joined together by the pipe `|` character."
75                );
76            }
77        }
78    }
79
80    Ok(result)
81}
82
83fn deserialize_ssl_options<'de, D>(deserializer: D) -> Result<Option<SslOptions>, D::Error>
84where
85    D: Deserializer<'de>,
86{
87    use serde::de::Error;
88    let maybe_options = Option::<String>::deserialize(deserializer)?;
89
90    match maybe_options {
91        None => Ok(None),
92        Some(option_list) => match parse_openssl_options(&option_list) {
93            Ok(options) => Ok(Some(options)),
94            Err(err) => Err(D::Error::custom(format!("{err:#}"))),
95        },
96    }
97}
98
99fn deserialize_supported_ciphersuite<'de, D>(
100    deserializer: D,
101) -> Result<Vec<SupportedCipherSuite>, D::Error>
102where
103    D: Deserializer<'de>,
104{
105    use serde::de::Error;
106    let suites = Vec::<String>::deserialize(deserializer)?;
107    let mut result = vec![];
108
109    for s in suites {
110        match find_rustls_cipher_suite(&s) {
111            Some(s) => {
112                result.push(s);
113            }
114            None => {
115                return Err(D::Error::custom(format!(
116                    "`{s}` is not a valid rustls cipher suite"
117                )));
118            }
119        }
120    }
121
122    Ok(result)
123}
124
125pub fn find_rustls_cipher_suite(name: &str) -> Option<SupportedCipherSuite> {
126    for suite in ALL_CIPHER_SUITES {
127        let sname = format!("{:?}", suite.suite());
128        if sname.eq_ignore_ascii_case(name) {
129            return Some(*suite);
130        }
131    }
132    None
133}
134
135#[derive(Deserialize, Serialize, Debug, Clone, Default, Copy, PartialEq, Eq)]
136#[cfg_attr(feature = "lua", derive(FromLua))]
137pub enum WakeupStrategy {
138    #[default]
139    Aggressive,
140    Relaxed,
141}
142
143#[derive(Deserialize, Serialize, Debug, Clone, Default, Copy, PartialEq, Eq)]
144#[cfg_attr(feature = "lua", derive(FromLua))]
145pub enum MemoryReductionPolicy {
146    #[default]
147    ShrinkDataAndMeta,
148    ShrinkData,
149    NoShrink,
150}
151
152#[derive(Deserialize, Serialize, Debug, Clone, Default, Copy, PartialEq, Eq)]
153#[cfg_attr(feature = "lua", derive(FromLua))]
154pub enum ConfigRefreshStrategy {
155    #[default]
156    Ttl,
157    Epoch,
158}
159
160#[derive(Deserialize, Serialize, Debug, Clone, Default, Copy, PartialEq, Eq)]
161#[cfg_attr(feature = "lua", derive(FromLua))]
162pub enum ReconnectStrategy {
163    /// Close out the current connection session, allowing the maintainer
164    /// to decide about opening a new session and starting with a fresh
165    /// connection plan
166    TerminateSession,
167    /// Try to reconnect to the same host that we were using and where
168    /// we experienced the error
169    ReconnectSameHost,
170    /// Advance to the next host in the connection, if any. If none remain,
171    /// this is equivalent to TerminateSession
172    #[default]
173    ConnectNextHost,
174}
175
176#[derive(Deserialize, Serialize, Debug, Clone, PartialEq)]
177#[cfg_attr(feature = "lua", derive(FromLua))]
178#[serde(deny_unknown_fields)]
179pub struct EgressPathConfig {
180    #[serde(default = "EgressPathConfig::default_connection_limit")]
181    pub connection_limit: LimitSpec,
182
183    #[serde(default)]
184    pub additional_connection_limits: OrderMap<String, LimitSpec>,
185
186    #[serde(default)]
187    pub enable_tls: Tls,
188
189    #[serde(default = "EgressPathConfig::default_enable_mta_sts")]
190    pub enable_mta_sts: bool,
191
192    #[serde(default = "EgressPathConfig::default_enable_dane")]
193    pub enable_dane: bool,
194
195    #[serde(default = "EgressPathConfig::default_enable_pipelining")]
196    pub enable_pipelining: bool,
197
198    #[serde(default = "EgressPathConfig::default_enable_rset")]
199    pub enable_rset: bool,
200
201    #[serde(default)]
202    pub tls_prefer_openssl: bool,
203
204    #[serde(default)]
205    pub tls_certificate: Option<KeySource>,
206
207    #[serde(default)]
208    pub tls_private_key: Option<KeySource>,
209
210    #[serde(default)]
211    pub openssl_cipher_list: Option<String>,
212    #[serde(default)]
213    pub openssl_cipher_suites: Option<String>,
214    #[serde(
215        default,
216        deserialize_with = "deserialize_ssl_options",
217        skip_serializing // FIXME
218    )]
219    pub openssl_options: Option<SslOptions>,
220
221    #[serde(
222        default,
223        deserialize_with = "deserialize_supported_ciphersuite",
224        skip_serializing // FIXME
225    )]
226    pub rustls_cipher_suites: Vec<SupportedCipherSuite>,
227
228    #[serde(flatten)]
229    pub client_timeouts: SmtpClientTimeouts,
230
231    /// How long to wait for an established session to gracefully
232    /// close when the system is shutting down. After this period
233    /// has elapsed, sessions will be aborted.
234    #[serde(default, with = "duration_serde")]
235    pub system_shutdown_timeout: Option<Duration>,
236
237    #[serde(default = "EgressPathConfig::default_max_ready")]
238    pub max_ready: usize,
239
240    #[serde(default = "EgressPathConfig::default_consecutive_connection_failures_before_delay")]
241    pub consecutive_connection_failures_before_delay: usize,
242
243    #[serde(default = "EgressPathConfig::default_smtp_port")]
244    pub smtp_port: u16,
245
246    #[serde(default)]
247    pub smtp_auth_plain_username: Option<String>,
248
249    #[serde(default)]
250    pub smtp_auth_plain_password: Option<KeySource>,
251
252    #[serde(default)]
253    pub allow_smtp_auth_plain_without_tls: bool,
254
255    #[serde(default)]
256    pub max_message_rate: Option<ThrottleSpec>,
257
258    #[serde(default)]
259    pub additional_message_rate_throttles: OrderMap<String, ThrottleSpec>,
260
261    #[serde(default)]
262    pub source_selection_rate: Option<ThrottleSpec>,
263
264    #[serde(default)]
265    pub additional_source_selection_rates: OrderMap<String, ThrottleSpec>,
266
267    #[serde(default)]
268    pub max_connection_rate: Option<ThrottleSpec>,
269
270    #[serde(default = "EgressPathConfig::default_max_deliveries_per_connection")]
271    pub max_deliveries_per_connection: usize,
272
273    #[serde(default = "EgressPathConfig::default_max_recipients_per_batch")]
274    pub max_recipients_per_batch: usize,
275
276    #[serde(default = "CidrSet::default_prohibited_hosts")]
277    pub prohibited_hosts: CidrSet,
278
279    #[serde(default)]
280    pub skip_hosts: CidrSet,
281
282    #[serde(default)]
283    pub ip_lookup_strategy: IpLookupStrategy,
284
285    #[serde(default)]
286    pub ehlo_domain: Option<String>,
287
288    // TODO: decide if we want to keep this and then document
289    #[serde(default)]
290    pub aggressive_connection_opening: bool,
291
292    /// How long to wait between calls to get_egress_path_config for
293    /// any given ready queue. Making this longer uses fewer
294    /// resources (in aggregate) but means that it will take longer
295    /// to detect and adjust to changes in the queue configuration.
296    #[serde(
297        default = "EgressPathConfig::default_refresh_interval",
298        with = "duration_serde"
299    )]
300    pub refresh_interval: Duration,
301    #[serde(default)]
302    pub refresh_strategy: ConfigRefreshStrategy,
303
304    #[serde(default)]
305    pub dispatcher_wakeup_strategy: WakeupStrategy,
306    #[serde(default)]
307    pub maintainer_wakeup_strategy: WakeupStrategy,
308
309    /// Specify an explicit provider name that should apply to this
310    /// path. The provider name will be used when computing metrics
311    /// rollups by provider. If omitted, then
312    #[serde(default)]
313    pub provider_name: Option<String>,
314
315    /// If set, a process-local cache will be used to remember if
316    /// a site has broken TLS for the duration specified.  Once
317    /// encountered, we will pretend that EHLO didn't advertise STARTTLS
318    /// on subsequent connection attempts.
319    #[serde(default, with = "duration_serde")]
320    pub remember_broken_tls: Option<Duration>,
321
322    /// If true, when a TLS handshake fails and TLS is set to
323    /// opportunistic, we will re-connect to that host with
324    /// TLS disabled.
325    #[serde(default)]
326    pub opportunistic_tls_reconnect_on_failed_handshake: bool,
327
328    /// If true, rather than ESMTP, use the LMTP protocol
329    #[serde(default)]
330    pub use_lmtp: bool,
331
332    /// How to behave if we experience either a 421 response, an IO Error,
333    /// or a timeout while talking to the peer.
334    #[serde(default)]
335    pub reconnect_strategy: ReconnectStrategy,
336
337    /// Which thread pool to use for processing the ready queue
338    #[serde(default)]
339    pub readyq_pool_name: Option<String>,
340
341    /// What to do to newly inserted messages when memory is low
342    #[serde(default)]
343    pub low_memory_reduction_policy: MemoryReductionPolicy,
344
345    /// What to do to newly inserted messages when memory is over the soft limit
346    #[serde(default)]
347    pub no_memory_reduction_policy: MemoryReductionPolicy,
348
349    /// If we experience a transport error during SMTP, should we retry the
350    /// current message on the next host in the connection plan, or
351    /// immediately consider it a transient failure for that message?
352    #[serde(default)]
353    pub try_next_host_on_transport_error: bool,
354
355    /// If true, don't check for 8bit compatibility issues during
356    /// sending, instead, leave it to the remote host to raise
357    /// an error.
358    #[serde(default)]
359    pub ignore_8bit_checks: bool,
360}
361
362#[cfg(feature = "lua")]
363impl LuaUserData for EgressPathConfig {
364    fn add_methods<M: mlua::UserDataMethods<Self>>(methods: &mut M) {
365        config::impl_pairs_and_index(methods);
366    }
367}
368
369impl Default for EgressPathConfig {
370    fn default() -> Self {
371        Self {
372            connection_limit: Self::default_connection_limit(),
373            tls_prefer_openssl: false,
374            enable_tls: Tls::default(),
375            enable_mta_sts: Self::default_enable_mta_sts(),
376            enable_dane: Self::default_enable_dane(),
377            enable_rset: Self::default_enable_rset(),
378            enable_pipelining: Self::default_enable_pipelining(),
379            max_ready: Self::default_max_ready(),
380            consecutive_connection_failures_before_delay:
381                Self::default_consecutive_connection_failures_before_delay(),
382            smtp_port: Self::default_smtp_port(),
383            max_message_rate: None,
384            max_connection_rate: None,
385            max_deliveries_per_connection: Self::default_max_deliveries_per_connection(),
386            max_recipients_per_batch: Self::default_max_recipients_per_batch(),
387            client_timeouts: SmtpClientTimeouts::default(),
388            system_shutdown_timeout: None,
389            prohibited_hosts: CidrSet::default_prohibited_hosts(),
390            skip_hosts: CidrSet::default(),
391            ehlo_domain: None,
392            allow_smtp_auth_plain_without_tls: false,
393            smtp_auth_plain_username: None,
394            smtp_auth_plain_password: None,
395            aggressive_connection_opening: false,
396            rustls_cipher_suites: vec![],
397            tls_certificate: None,
398            tls_private_key: None,
399            openssl_cipher_list: None,
400            openssl_cipher_suites: None,
401            openssl_options: None,
402            refresh_interval: Self::default_refresh_interval(),
403            refresh_strategy: ConfigRefreshStrategy::default(),
404            additional_message_rate_throttles: OrderMap::default(),
405            additional_connection_limits: OrderMap::default(),
406            source_selection_rate: None,
407            additional_source_selection_rates: OrderMap::default(),
408            provider_name: None,
409            remember_broken_tls: None,
410            opportunistic_tls_reconnect_on_failed_handshake: false,
411            use_lmtp: false,
412            reconnect_strategy: ReconnectStrategy::default(),
413            readyq_pool_name: None,
414            low_memory_reduction_policy: MemoryReductionPolicy::default(),
415            no_memory_reduction_policy: MemoryReductionPolicy::default(),
416            maintainer_wakeup_strategy: WakeupStrategy::default(),
417            dispatcher_wakeup_strategy: WakeupStrategy::default(),
418            try_next_host_on_transport_error: false,
419            ignore_8bit_checks: false,
420            ip_lookup_strategy: IpLookupStrategy::default(),
421        }
422    }
423}
424
425impl EgressPathConfig {
426    fn default_connection_limit() -> LimitSpec {
427        LimitSpec::new(32)
428    }
429
430    fn default_enable_mta_sts() -> bool {
431        true
432    }
433
434    fn default_enable_pipelining() -> bool {
435        true
436    }
437
438    fn default_enable_rset() -> bool {
439        true
440    }
441
442    fn default_enable_dane() -> bool {
443        false
444    }
445
446    fn default_max_ready() -> usize {
447        1024
448    }
449
450    fn default_consecutive_connection_failures_before_delay() -> usize {
451        100
452    }
453
454    fn default_smtp_port() -> u16 {
455        25
456    }
457
458    fn default_max_deliveries_per_connection() -> usize {
459        1024
460    }
461
462    fn default_max_recipients_per_batch() -> usize {
463        100
464    }
465
466    fn default_refresh_interval() -> Duration {
467        Duration::from_secs(60)
468    }
469}