kumo_api_types/
egress_path.rs

1use cidr_map::CidrSet;
2use data_loader::KeySource;
3#[cfg(feature = "lua")]
4use mlua::prelude::*;
5use openssl::ssl::SslOptions;
6use ordermap::OrderMap;
7use rfc5321::SmtpClientTimeouts;
8use rustls::crypto::aws_lc_rs::ALL_CIPHER_SUITES;
9use rustls::SupportedCipherSuite;
10use serde::{Deserialize, Deserializer, Serialize};
11use std::time::Duration;
12use throttle::{LimitSpec, ThrottleSpec};
13
14#[derive(Deserialize, Serialize, Debug, Clone, PartialEq, Eq, Copy)]
15pub enum Tls {
16    /// Use it if available. If the peer has invalid or self-signed certificates, then
17    /// delivery will fail. Will NOT fallback to not using TLS if the peer advertises
18    /// STARTTLS.
19    Opportunistic,
20    /// Use it if available, and allow self-signed or otherwise invalid server certs.
21    /// Not recommended for sending to the public internet; this is for local/lab
22    /// testing scenarios only.
23    OpportunisticInsecure,
24    /// TLS with valid certs is required.
25    Required,
26    /// Required, and allow self-signed or otherwise invalid server certs.
27    /// Not recommended for sending to the public internet; this is for local/lab
28    /// testing scenarios only.
29    RequiredInsecure,
30    /// Do not try to use TLS
31    Disabled,
32}
33
34impl Tls {
35    pub fn allow_insecure(&self) -> bool {
36        match self {
37            Self::OpportunisticInsecure | Self::RequiredInsecure => true,
38            _ => false,
39        }
40    }
41
42    pub fn is_opportunistic(&self) -> bool {
43        match self {
44            Self::OpportunisticInsecure | Self::Opportunistic => true,
45            _ => false,
46        }
47    }
48}
49
50impl Default for Tls {
51    fn default() -> Self {
52        Self::Opportunistic
53    }
54}
55
56pub fn parse_openssl_options(option_list: &str) -> anyhow::Result<SslOptions> {
57    let mut result = SslOptions::empty();
58
59    for option in option_list.split('|') {
60        match SslOptions::from_name(option) {
61            Some(opt) => {
62                result.insert(opt);
63            }
64            None => {
65                let mut allowed: Vec<_> = SslOptions::all()
66                    .iter_names()
67                    .map(|(name, _)| format!("`{name}`"))
68                    .collect();
69                allowed.sort();
70                let allowed = allowed.join(", ");
71                anyhow::bail!(
72                    "`{option}` is not a valid SslOption name. \
73                    Possible values are {allowed} joined together by the pipe `|` character."
74                );
75            }
76        }
77    }
78
79    Ok(result)
80}
81
82fn deserialize_ssl_options<'de, D>(deserializer: D) -> Result<Option<SslOptions>, D::Error>
83where
84    D: Deserializer<'de>,
85{
86    use serde::de::Error;
87    let maybe_options = Option::<String>::deserialize(deserializer)?;
88
89    match maybe_options {
90        None => Ok(None),
91        Some(option_list) => match parse_openssl_options(&option_list) {
92            Ok(options) => Ok(Some(options)),
93            Err(err) => Err(D::Error::custom(format!("{err:#}"))),
94        },
95    }
96}
97
98fn deserialize_supported_ciphersuite<'de, D>(
99    deserializer: D,
100) -> Result<Vec<SupportedCipherSuite>, D::Error>
101where
102    D: Deserializer<'de>,
103{
104    use serde::de::Error;
105    let suites = Vec::<String>::deserialize(deserializer)?;
106    let mut result = vec![];
107
108    for s in suites {
109        match find_rustls_cipher_suite(&s) {
110            Some(s) => {
111                result.push(s);
112            }
113            None => {
114                return Err(D::Error::custom(format!(
115                    "`{s}` is not a valid rustls cipher suite"
116                )));
117            }
118        }
119    }
120
121    Ok(result)
122}
123
124pub fn find_rustls_cipher_suite(name: &str) -> Option<SupportedCipherSuite> {
125    for suite in ALL_CIPHER_SUITES {
126        let sname = format!("{:?}", suite.suite());
127        if sname.eq_ignore_ascii_case(name) {
128            return Some(*suite);
129        }
130    }
131    None
132}
133
134#[derive(Deserialize, Serialize, Debug, Clone, Default, Copy, PartialEq, Eq)]
135#[cfg_attr(feature = "lua", derive(FromLua))]
136pub enum WakeupStrategy {
137    #[default]
138    Aggressive,
139    Relaxed,
140}
141
142#[derive(Deserialize, Serialize, Debug, Clone, Default, Copy, PartialEq, Eq)]
143#[cfg_attr(feature = "lua", derive(FromLua))]
144pub enum MemoryReductionPolicy {
145    #[default]
146    ShrinkDataAndMeta,
147    ShrinkData,
148    NoShrink,
149}
150
151#[derive(Deserialize, Serialize, Debug, Clone, Default, Copy, PartialEq, Eq)]
152#[cfg_attr(feature = "lua", derive(FromLua))]
153pub enum ConfigRefreshStrategy {
154    #[default]
155    Ttl,
156    Epoch,
157}
158
159#[derive(Deserialize, Serialize, Debug, Clone, Default, Copy, PartialEq, Eq)]
160#[cfg_attr(feature = "lua", derive(FromLua))]
161pub enum ReconnectStrategy {
162    /// Close out the current connection session, allowing the maintainer
163    /// to decide about opening a new session and starting with a fresh
164    /// connection plan
165    TerminateSession,
166    /// Try to reconnect to the same host that we were using and where
167    /// we experienced the error
168    ReconnectSameHost,
169    /// Advance to the next host in the connection, if any. If none remain,
170    /// this is equivalent to TerminateSession
171    #[default]
172    ConnectNextHost,
173}
174
175#[derive(Deserialize, Serialize, Debug, Clone, PartialEq)]
176#[cfg_attr(feature = "lua", derive(FromLua))]
177#[serde(deny_unknown_fields)]
178pub struct EgressPathConfig {
179    #[serde(default = "EgressPathConfig::default_connection_limit")]
180    pub connection_limit: LimitSpec,
181
182    #[serde(default)]
183    pub additional_connection_limits: OrderMap<String, LimitSpec>,
184
185    #[serde(default)]
186    pub enable_tls: Tls,
187
188    #[serde(default = "EgressPathConfig::default_enable_mta_sts")]
189    pub enable_mta_sts: bool,
190
191    #[serde(default = "EgressPathConfig::default_enable_dane")]
192    pub enable_dane: bool,
193
194    #[serde(default = "EgressPathConfig::default_enable_pipelining")]
195    pub enable_pipelining: bool,
196
197    #[serde(default = "EgressPathConfig::default_enable_rset")]
198    pub enable_rset: bool,
199
200    #[serde(default)]
201    pub tls_prefer_openssl: bool,
202
203    #[serde(default)]
204    pub openssl_cipher_list: Option<String>,
205    #[serde(default)]
206    pub openssl_cipher_suites: Option<String>,
207    #[serde(
208        default,
209        deserialize_with = "deserialize_ssl_options",
210        skip_serializing // FIXME
211    )]
212    pub openssl_options: Option<SslOptions>,
213
214    #[serde(
215        default,
216        deserialize_with = "deserialize_supported_ciphersuite",
217        skip_serializing // FIXME
218    )]
219    pub rustls_cipher_suites: Vec<SupportedCipherSuite>,
220
221    #[serde(flatten)]
222    pub client_timeouts: SmtpClientTimeouts,
223
224    /// How long to wait for an established session to gracefully
225    /// close when the system is shutting down. After this period
226    /// has elapsed, sessions will be aborted.
227    #[serde(default, with = "duration_serde")]
228    pub system_shutdown_timeout: Option<Duration>,
229
230    #[serde(default = "EgressPathConfig::default_max_ready")]
231    pub max_ready: usize,
232
233    #[serde(default = "EgressPathConfig::default_consecutive_connection_failures_before_delay")]
234    pub consecutive_connection_failures_before_delay: usize,
235
236    #[serde(default = "EgressPathConfig::default_smtp_port")]
237    pub smtp_port: u16,
238
239    #[serde(default)]
240    pub smtp_auth_plain_username: Option<String>,
241
242    #[serde(default)]
243    pub smtp_auth_plain_password: Option<KeySource>,
244
245    #[serde(default)]
246    pub allow_smtp_auth_plain_without_tls: bool,
247
248    #[serde(default)]
249    pub max_message_rate: Option<ThrottleSpec>,
250
251    #[serde(default)]
252    pub additional_message_rate_throttles: OrderMap<String, ThrottleSpec>,
253
254    #[serde(default)]
255    pub source_selection_rate: Option<ThrottleSpec>,
256
257    #[serde(default)]
258    pub additional_source_selection_rates: OrderMap<String, ThrottleSpec>,
259
260    #[serde(default)]
261    pub max_connection_rate: Option<ThrottleSpec>,
262
263    #[serde(default = "EgressPathConfig::default_max_deliveries_per_connection")]
264    pub max_deliveries_per_connection: usize,
265
266    #[serde(default = "CidrSet::default_prohibited_hosts")]
267    pub prohibited_hosts: CidrSet,
268
269    #[serde(default)]
270    pub skip_hosts: CidrSet,
271
272    #[serde(default)]
273    pub ehlo_domain: Option<String>,
274
275    // TODO: decide if we want to keep this and then document
276    #[serde(default)]
277    pub aggressive_connection_opening: bool,
278
279    /// How long to wait between calls to get_egress_path_config for
280    /// any given ready queue. Making this longer uses fewer
281    /// resources (in aggregate) but means that it will take longer
282    /// to detect and adjust to changes in the queue configuration.
283    #[serde(
284        default = "EgressPathConfig::default_refresh_interval",
285        with = "duration_serde"
286    )]
287    pub refresh_interval: Duration,
288    #[serde(default)]
289    pub refresh_strategy: ConfigRefreshStrategy,
290
291    #[serde(default)]
292    pub dispatcher_wakeup_strategy: WakeupStrategy,
293    #[serde(default)]
294    pub maintainer_wakeup_strategy: WakeupStrategy,
295
296    /// Specify an explicit provider name that should apply to this
297    /// path. The provider name will be used when computing metrics
298    /// rollups by provider. If omitted, then
299    #[serde(default)]
300    pub provider_name: Option<String>,
301
302    /// If set, a process-local cache will be used to remember if
303    /// a site has broken TLS for the duration specified.  Once
304    /// encountered, we will pretend that EHLO didn't advertise STARTTLS
305    /// on subsequent connection attempts.
306    #[serde(default, with = "duration_serde")]
307    pub remember_broken_tls: Option<Duration>,
308
309    /// If true, when a TLS handshake fails and TLS is set to
310    /// opportunistic, we will re-connect to that host with
311    /// TLS disabled.
312    #[serde(default)]
313    pub opportunistic_tls_reconnect_on_failed_handshake: bool,
314
315    /// If true, rather than ESMTP, use the LMTP protocol
316    #[serde(default)]
317    pub use_lmtp: bool,
318
319    /// How to behave if we experience either a 421 response, an IO Error,
320    /// or a timeout while talking to the peer.
321    #[serde(default)]
322    pub reconnect_strategy: ReconnectStrategy,
323
324    /// Which thread pool to use for processing the ready queue
325    #[serde(default)]
326    pub readyq_pool_name: Option<String>,
327
328    /// What to do to newly inserted messages when memory is low
329    #[serde(default)]
330    pub low_memory_reduction_policy: MemoryReductionPolicy,
331
332    /// What to do to newly inserted messages when memory is over the soft limit
333    #[serde(default)]
334    pub no_memory_reduction_policy: MemoryReductionPolicy,
335
336    /// If we experience a transport error during SMTP, should we retry the
337    /// current message on the next host in the connection plan, or
338    /// immediately consider it a transient failure for that message?
339    #[serde(default)]
340    pub try_next_host_on_transport_error: bool,
341}
342
343#[cfg(feature = "lua")]
344impl LuaUserData for EgressPathConfig {
345    fn add_methods<M: mlua::UserDataMethods<Self>>(methods: &mut M) {
346        config::impl_pairs_and_index(methods);
347    }
348}
349
350impl Default for EgressPathConfig {
351    fn default() -> Self {
352        Self {
353            connection_limit: Self::default_connection_limit(),
354            tls_prefer_openssl: false,
355            enable_tls: Tls::default(),
356            enable_mta_sts: Self::default_enable_mta_sts(),
357            enable_dane: Self::default_enable_dane(),
358            enable_rset: Self::default_enable_rset(),
359            enable_pipelining: Self::default_enable_pipelining(),
360            max_ready: Self::default_max_ready(),
361            consecutive_connection_failures_before_delay:
362                Self::default_consecutive_connection_failures_before_delay(),
363            smtp_port: Self::default_smtp_port(),
364            max_message_rate: None,
365            max_connection_rate: None,
366            max_deliveries_per_connection: Self::default_max_deliveries_per_connection(),
367            client_timeouts: SmtpClientTimeouts::default(),
368            system_shutdown_timeout: None,
369            prohibited_hosts: CidrSet::default_prohibited_hosts(),
370            skip_hosts: CidrSet::default(),
371            ehlo_domain: None,
372            allow_smtp_auth_plain_without_tls: false,
373            smtp_auth_plain_username: None,
374            smtp_auth_plain_password: None,
375            aggressive_connection_opening: false,
376            rustls_cipher_suites: vec![],
377            openssl_cipher_list: None,
378            openssl_cipher_suites: None,
379            openssl_options: None,
380            refresh_interval: Self::default_refresh_interval(),
381            refresh_strategy: ConfigRefreshStrategy::default(),
382            additional_message_rate_throttles: OrderMap::default(),
383            additional_connection_limits: OrderMap::default(),
384            source_selection_rate: None,
385            additional_source_selection_rates: OrderMap::default(),
386            provider_name: None,
387            remember_broken_tls: None,
388            opportunistic_tls_reconnect_on_failed_handshake: false,
389            use_lmtp: false,
390            reconnect_strategy: ReconnectStrategy::default(),
391            readyq_pool_name: None,
392            low_memory_reduction_policy: MemoryReductionPolicy::default(),
393            no_memory_reduction_policy: MemoryReductionPolicy::default(),
394            maintainer_wakeup_strategy: WakeupStrategy::default(),
395            dispatcher_wakeup_strategy: WakeupStrategy::default(),
396            try_next_host_on_transport_error: false,
397        }
398    }
399}
400
401impl EgressPathConfig {
402    fn default_connection_limit() -> LimitSpec {
403        LimitSpec::new(32)
404    }
405
406    fn default_enable_mta_sts() -> bool {
407        true
408    }
409
410    fn default_enable_pipelining() -> bool {
411        true
412    }
413
414    fn default_enable_rset() -> bool {
415        true
416    }
417
418    fn default_enable_dane() -> bool {
419        false
420    }
421
422    fn default_max_ready() -> usize {
423        1024
424    }
425
426    fn default_consecutive_connection_failures_before_delay() -> usize {
427        100
428    }
429
430    fn default_smtp_port() -> u16 {
431        25
432    }
433
434    fn default_max_deliveries_per_connection() -> usize {
435        1024
436    }
437
438    fn default_refresh_interval() -> Duration {
439        Duration::from_secs(60)
440    }
441}