kumo_api_types/
egress_path.rs

1use cidr_map::CidrSet;
2use data_loader::KeySource;
3#[cfg(feature = "lua")]
4use mlua::prelude::*;
5use openssl::ssl::SslOptions;
6use ordermap::OrderMap;
7use rfc5321::SmtpClientTimeouts;
8use rustls::crypto::aws_lc_rs::ALL_CIPHER_SUITES;
9use rustls::SupportedCipherSuite;
10use serde::{Deserialize, Deserializer, Serialize};
11use std::time::Duration;
12use throttle::{LimitSpec, ThrottleSpec};
13
14#[derive(Deserialize, Serialize, Debug, Clone, PartialEq, Eq, Copy)]
15pub enum Tls {
16    /// Use it if available. If the peer has invalid or self-signed certificates, then
17    /// delivery will fail. Will NOT fallback to not using TLS if the peer advertises
18    /// STARTTLS.
19    Opportunistic,
20    /// Use it if available, and allow self-signed or otherwise invalid server certs.
21    /// Not recommended for sending to the public internet; this is for local/lab
22    /// testing scenarios only.
23    OpportunisticInsecure,
24    /// TLS with valid certs is required.
25    Required,
26    /// Required, and allow self-signed or otherwise invalid server certs.
27    /// Not recommended for sending to the public internet; this is for local/lab
28    /// testing scenarios only.
29    RequiredInsecure,
30    /// Do not try to use TLS
31    Disabled,
32}
33
34impl Tls {
35    pub fn allow_insecure(&self) -> bool {
36        match self {
37            Self::OpportunisticInsecure | Self::RequiredInsecure => true,
38            _ => false,
39        }
40    }
41
42    pub fn is_opportunistic(&self) -> bool {
43        match self {
44            Self::OpportunisticInsecure | Self::Opportunistic => true,
45            _ => false,
46        }
47    }
48}
49
50impl Default for Tls {
51    fn default() -> Self {
52        Self::Opportunistic
53    }
54}
55
56pub fn parse_openssl_options(option_list: &str) -> anyhow::Result<SslOptions> {
57    let mut result = SslOptions::empty();
58
59    for option in option_list.split('|') {
60        match SslOptions::from_name(option) {
61            Some(opt) => {
62                result.insert(opt);
63            }
64            None => {
65                let mut allowed: Vec<_> = SslOptions::all()
66                    .iter_names()
67                    .map(|(name, _)| format!("`{name}`"))
68                    .collect();
69                allowed.sort();
70                let allowed = allowed.join(", ");
71                anyhow::bail!(
72                    "`{option}` is not a valid SslOption name. \
73                    Possible values are {allowed} joined together by the pipe `|` character."
74                );
75            }
76        }
77    }
78
79    Ok(result)
80}
81
82fn deserialize_ssl_options<'de, D>(deserializer: D) -> Result<Option<SslOptions>, D::Error>
83where
84    D: Deserializer<'de>,
85{
86    use serde::de::Error;
87    let maybe_options = Option::<String>::deserialize(deserializer)?;
88
89    match maybe_options {
90        None => Ok(None),
91        Some(option_list) => match parse_openssl_options(&option_list) {
92            Ok(options) => Ok(Some(options)),
93            Err(err) => Err(D::Error::custom(format!("{err:#}"))),
94        },
95    }
96}
97
98fn deserialize_supported_ciphersuite<'de, D>(
99    deserializer: D,
100) -> Result<Vec<SupportedCipherSuite>, D::Error>
101where
102    D: Deserializer<'de>,
103{
104    use serde::de::Error;
105    let suites = Vec::<String>::deserialize(deserializer)?;
106    let mut result = vec![];
107
108    for s in suites {
109        match find_rustls_cipher_suite(&s) {
110            Some(s) => {
111                result.push(s);
112            }
113            None => {
114                return Err(D::Error::custom(format!(
115                    "`{s}` is not a valid rustls cipher suite"
116                )));
117            }
118        }
119    }
120
121    Ok(result)
122}
123
124pub fn find_rustls_cipher_suite(name: &str) -> Option<SupportedCipherSuite> {
125    for suite in ALL_CIPHER_SUITES {
126        let sname = format!("{:?}", suite.suite());
127        if sname.eq_ignore_ascii_case(name) {
128            return Some(*suite);
129        }
130    }
131    None
132}
133
/// Wakeup strategy selector, used by the `dispatcher_wakeup_strategy` and
/// `maintainer_wakeup_strategy` fields of `EgressPathConfig`.
///
/// NOTE(review): what each strategy does is decided by the consumers of
/// this type, which are not visible in this file.
#[derive(Deserialize, Serialize, Debug, Clone, Default, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "lua", derive(FromLua))]
pub enum WakeupStrategy {
    /// The default strategy.
    #[default]
    Aggressive,
    Relaxed,
}
141
/// Memory reduction policy selector, used by the
/// `low_memory_reduction_policy` and `no_memory_reduction_policy` fields of
/// `EgressPathConfig`, which describe it as what to do to newly inserted
/// messages under memory pressure.
#[derive(Deserialize, Serialize, Debug, Clone, Default, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "lua", derive(FromLua))]
pub enum MemoryReductionPolicy {
    /// The default policy.
    /// Presumably releases both the message data and its metadata — confirm
    /// against the code that applies this policy.
    #[default]
    ShrinkDataAndMeta,
    /// Presumably releases only the message data — confirm.
    ShrinkData,
    /// Presumably leaves the message untouched — confirm.
    NoShrink,
}
150
/// Configuration refresh strategy selector, used by the `refresh_strategy`
/// field of `EgressPathConfig`.
///
/// NOTE(review): the exact refresh behavior of each variant is implemented
/// elsewhere; confirm there before relying on the names alone.
#[derive(Deserialize, Serialize, Debug, Clone, Default, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "lua", derive(FromLua))]
pub enum ConfigRefreshStrategy {
    /// The default strategy.
    #[default]
    Ttl,
    Epoch,
}
158
/// How to proceed after a session-level problem with the peer.
///
/// Used by the `reconnect_strategy` field of `EgressPathConfig`, which
/// applies it on a 421 response, an IO error, or a timeout.
/// The default is `ConnectNextHost`.
#[derive(Deserialize, Serialize, Debug, Clone, Default, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "lua", derive(FromLua))]
pub enum ReconnectStrategy {
    /// Close out the current connection session, allowing the maintainer
    /// to decide about opening a new session and starting with a fresh
    /// connection plan
    TerminateSession,
    /// Try to reconnect to the same host that we were using and where
    /// we experienced the error
    ReconnectSameHost,
    /// Advance to the next host in the connection plan, if any. If none
    /// remain, this is equivalent to `TerminateSession`
    #[default]
    ConnectNextHost,
}
174
/// Configuration for an egress path.
///
/// Deserialized via serde (and convertible from Lua when the `lua` feature
/// is enabled). Fields that are omitted from the input take the default
/// named in their `#[serde(default ...)]` attribute; the `Default` impl for
/// this type produces the same values.
///
/// NOTE(review): this struct combines `deny_unknown_fields` with a
/// `#[serde(flatten)]` field (`client_timeouts`). Serde documents that
/// combination as unsupported; confirm unknown keys are handled as intended.
#[derive(Deserialize, Serialize, Debug, Clone, PartialEq)]
#[cfg_attr(feature = "lua", derive(FromLua))]
#[serde(deny_unknown_fields)]
pub struct EgressPathConfig {
    /// Connection limit for this path. Defaults to `LimitSpec::new(32)`.
    #[serde(default = "EgressPathConfig::default_connection_limit")]
    pub connection_limit: LimitSpec,

    /// Additional connection limits, keyed by name. Empty by default.
    #[serde(default)]
    pub additional_connection_limits: OrderMap<String, LimitSpec>,

    /// TLS policy; see `Tls`. Defaults to `Tls::Opportunistic`.
    #[serde(default)]
    pub enable_tls: Tls,

    /// Defaults to `true`.
    #[serde(default = "EgressPathConfig::default_enable_mta_sts")]
    pub enable_mta_sts: bool,

    /// Defaults to `false`.
    #[serde(default = "EgressPathConfig::default_enable_dane")]
    pub enable_dane: bool,

    /// Defaults to `true`.
    #[serde(default = "EgressPathConfig::default_enable_pipelining")]
    pub enable_pipelining: bool,

    /// Defaults to `true`.
    #[serde(default = "EgressPathConfig::default_enable_rset")]
    pub enable_rset: bool,

    /// Defaults to `false`.
    /// Presumably selects OpenSSL over rustls as the TLS implementation —
    /// confirm against the connection code.
    #[serde(default)]
    pub tls_prefer_openssl: bool,

    /// Optional TLS certificate source. Unset by default.
    #[serde(default)]
    pub tls_certificate: Option<KeySource>,

    /// Optional TLS private key source. Unset by default.
    #[serde(default)]
    pub tls_private_key: Option<KeySource>,

    #[serde(default)]
    pub openssl_cipher_list: Option<String>,
    #[serde(default)]
    pub openssl_cipher_suites: Option<String>,
    /// OpenSSL option flags, given in the input as a `|`-separated list of
    /// `SslOptions` names (parsed by `parse_openssl_options`).
    #[serde(
        default,
        deserialize_with = "deserialize_ssl_options",
        skip_serializing // FIXME: never serialized, so the value is lost on a round trip
    )]
    pub openssl_options: Option<SslOptions>,

    /// rustls cipher suites, given in the input as a list of suite names
    /// (resolved by `find_rustls_cipher_suite`, ignoring ASCII case).
    #[serde(
        default,
        deserialize_with = "deserialize_supported_ciphersuite",
        skip_serializing // FIXME: never serialized, so the value is lost on a round trip
    )]
    pub rustls_cipher_suites: Vec<SupportedCipherSuite>,

    /// SMTP client timeouts. Flattened: its fields appear directly among
    /// this struct's fields in the serialized form.
    #[serde(flatten)]
    pub client_timeouts: SmtpClientTimeouts,

    /// How long to wait for an established session to gracefully
    /// close when the system is shutting down. After this period
    /// has elapsed, sessions will be aborted.
    #[serde(default, with = "duration_serde")]
    pub system_shutdown_timeout: Option<Duration>,

    /// Defaults to `1024`.
    #[serde(default = "EgressPathConfig::default_max_ready")]
    pub max_ready: usize,

    /// Defaults to `100`.
    #[serde(default = "EgressPathConfig::default_consecutive_connection_failures_before_delay")]
    pub consecutive_connection_failures_before_delay: usize,

    /// Defaults to `25`.
    #[serde(default = "EgressPathConfig::default_smtp_port")]
    pub smtp_port: u16,

    #[serde(default)]
    pub smtp_auth_plain_username: Option<String>,

    #[serde(default)]
    pub smtp_auth_plain_password: Option<KeySource>,

    /// Defaults to `false`.
    #[serde(default)]
    pub allow_smtp_auth_plain_without_tls: bool,

    #[serde(default)]
    pub max_message_rate: Option<ThrottleSpec>,

    /// Additional message rate throttles, keyed by name. Empty by default.
    #[serde(default)]
    pub additional_message_rate_throttles: OrderMap<String, ThrottleSpec>,

    #[serde(default)]
    pub source_selection_rate: Option<ThrottleSpec>,

    /// Additional source selection rates, keyed by name. Empty by default.
    #[serde(default)]
    pub additional_source_selection_rates: OrderMap<String, ThrottleSpec>,

    #[serde(default)]
    pub max_connection_rate: Option<ThrottleSpec>,

    /// Defaults to `1024`.
    #[serde(default = "EgressPathConfig::default_max_deliveries_per_connection")]
    pub max_deliveries_per_connection: usize,

    /// Defaults to `100`.
    #[serde(default = "EgressPathConfig::default_max_recipients_per_batch")]
    pub max_recipients_per_batch: usize,

    /// Defaults to `CidrSet::default_prohibited_hosts()`.
    #[serde(default = "CidrSet::default_prohibited_hosts")]
    pub prohibited_hosts: CidrSet,

    /// Defaults to an empty `CidrSet`.
    #[serde(default)]
    pub skip_hosts: CidrSet,

    #[serde(default)]
    pub ehlo_domain: Option<String>,

    // TODO: decide if we want to keep this and then document
    #[serde(default)]
    pub aggressive_connection_opening: bool,

    /// How long to wait between calls to get_egress_path_config for
    /// any given ready queue. Making this longer uses fewer
    /// resources (in aggregate) but means that it will take longer
    /// to detect and adjust to changes in the queue configuration.
    /// Defaults to 60 seconds.
    #[serde(
        default = "EgressPathConfig::default_refresh_interval",
        with = "duration_serde"
    )]
    pub refresh_interval: Duration,
    /// Defaults to `ConfigRefreshStrategy::Ttl`.
    #[serde(default)]
    pub refresh_strategy: ConfigRefreshStrategy,

    /// Defaults to `WakeupStrategy::Aggressive`.
    #[serde(default)]
    pub dispatcher_wakeup_strategy: WakeupStrategy,
    /// Defaults to `WakeupStrategy::Aggressive`.
    #[serde(default)]
    pub maintainer_wakeup_strategy: WakeupStrategy,

    /// Specify an explicit provider name that should apply to this
    /// path. The provider name will be used when computing metrics
    /// rollups by provider.
    ///
    /// NOTE(review): the original comment ended mid-sentence ("If omitted,
    /// then"); the fallback used when this is unset is not visible in this
    /// file and should be documented here once confirmed.
    #[serde(default)]
    pub provider_name: Option<String>,

    /// If set, a process-local cache will be used to remember if
    /// a site has broken TLS for the duration specified.  Once
    /// encountered, we will pretend that EHLO didn't advertise STARTTLS
    /// on subsequent connection attempts.
    #[serde(default, with = "duration_serde")]
    pub remember_broken_tls: Option<Duration>,

    /// If true, when a TLS handshake fails and TLS is set to
    /// opportunistic, we will re-connect to that host with
    /// TLS disabled.
    #[serde(default)]
    pub opportunistic_tls_reconnect_on_failed_handshake: bool,

    /// If true, rather than ESMTP, use the LMTP protocol
    #[serde(default)]
    pub use_lmtp: bool,

    /// How to behave if we experience either a 421 response, an IO Error,
    /// or a timeout while talking to the peer.
    /// Defaults to `ReconnectStrategy::ConnectNextHost`.
    #[serde(default)]
    pub reconnect_strategy: ReconnectStrategy,

    /// Which thread pool to use for processing the ready queue
    #[serde(default)]
    pub readyq_pool_name: Option<String>,

    /// What to do to newly inserted messages when memory is low
    #[serde(default)]
    pub low_memory_reduction_policy: MemoryReductionPolicy,

    /// What to do to newly inserted messages when memory is over the soft limit
    #[serde(default)]
    pub no_memory_reduction_policy: MemoryReductionPolicy,

    /// If we experience a transport error during SMTP, should we retry the
    /// current message on the next host in the connection plan, or
    /// immediately consider it a transient failure for that message?
    #[serde(default)]
    pub try_next_host_on_transport_error: bool,

    /// If true, don't check for 8bit compatibility issues during
    /// sending, instead, leave it to the remote host to raise
    /// an error.
    #[serde(default)]
    pub ignore_8bit_checks: bool,
}
357
// Exposes `EgressPathConfig` to Lua as userdata; compiled only when the
// `lua` feature is enabled.
#[cfg(feature = "lua")]
impl LuaUserData for EgressPathConfig {
    // Method registration is delegated entirely to
    // `config::impl_pairs_and_index`; presumably that installs pairs/index
    // style access to the fields — confirm in the `config` crate.
    fn add_methods<M: mlua::UserDataMethods<Self>>(methods: &mut M) {
        config::impl_pairs_and_index(methods);
    }
}
364
365impl Default for EgressPathConfig {
366    fn default() -> Self {
367        Self {
368            connection_limit: Self::default_connection_limit(),
369            tls_prefer_openssl: false,
370            enable_tls: Tls::default(),
371            enable_mta_sts: Self::default_enable_mta_sts(),
372            enable_dane: Self::default_enable_dane(),
373            enable_rset: Self::default_enable_rset(),
374            enable_pipelining: Self::default_enable_pipelining(),
375            max_ready: Self::default_max_ready(),
376            consecutive_connection_failures_before_delay:
377                Self::default_consecutive_connection_failures_before_delay(),
378            smtp_port: Self::default_smtp_port(),
379            max_message_rate: None,
380            max_connection_rate: None,
381            max_deliveries_per_connection: Self::default_max_deliveries_per_connection(),
382            max_recipients_per_batch: Self::default_max_recipients_per_batch(),
383            client_timeouts: SmtpClientTimeouts::default(),
384            system_shutdown_timeout: None,
385            prohibited_hosts: CidrSet::default_prohibited_hosts(),
386            skip_hosts: CidrSet::default(),
387            ehlo_domain: None,
388            allow_smtp_auth_plain_without_tls: false,
389            smtp_auth_plain_username: None,
390            smtp_auth_plain_password: None,
391            aggressive_connection_opening: false,
392            rustls_cipher_suites: vec![],
393            tls_certificate: None,
394            tls_private_key: None,
395            openssl_cipher_list: None,
396            openssl_cipher_suites: None,
397            openssl_options: None,
398            refresh_interval: Self::default_refresh_interval(),
399            refresh_strategy: ConfigRefreshStrategy::default(),
400            additional_message_rate_throttles: OrderMap::default(),
401            additional_connection_limits: OrderMap::default(),
402            source_selection_rate: None,
403            additional_source_selection_rates: OrderMap::default(),
404            provider_name: None,
405            remember_broken_tls: None,
406            opportunistic_tls_reconnect_on_failed_handshake: false,
407            use_lmtp: false,
408            reconnect_strategy: ReconnectStrategy::default(),
409            readyq_pool_name: None,
410            low_memory_reduction_policy: MemoryReductionPolicy::default(),
411            no_memory_reduction_policy: MemoryReductionPolicy::default(),
412            maintainer_wakeup_strategy: WakeupStrategy::default(),
413            dispatcher_wakeup_strategy: WakeupStrategy::default(),
414            try_next_host_on_transport_error: false,
415            ignore_8bit_checks: false,
416        }
417    }
418}
419
420impl EgressPathConfig {
421    fn default_connection_limit() -> LimitSpec {
422        LimitSpec::new(32)
423    }
424
425    fn default_enable_mta_sts() -> bool {
426        true
427    }
428
429    fn default_enable_pipelining() -> bool {
430        true
431    }
432
433    fn default_enable_rset() -> bool {
434        true
435    }
436
437    fn default_enable_dane() -> bool {
438        false
439    }
440
441    fn default_max_ready() -> usize {
442        1024
443    }
444
445    fn default_consecutive_connection_failures_before_delay() -> usize {
446        100
447    }
448
449    fn default_smtp_port() -> u16 {
450        25
451    }
452
453    fn default_max_deliveries_per_connection() -> usize {
454        1024
455    }
456
457    fn default_max_recipients_per_batch() -> usize {
458        100
459    }
460
461    fn default_refresh_interval() -> Duration {
462        Duration::from_secs(60)
463    }
464}