kumo_api_types/
egress_path.rs

1use cidr_map::CidrSet;
2use data_loader::KeySource;
3#[cfg(feature = "lua")]
4use mlua::prelude::*;
5use openssl::ssl::SslOptions;
6use ordermap::OrderMap;
7use rfc5321::SmtpClientTimeouts;
8use rustls::crypto::aws_lc_rs::ALL_CIPHER_SUITES;
9use rustls::SupportedCipherSuite;
10use serde::{Deserialize, Deserializer, Serialize};
11use std::time::Duration;
12use throttle::{LimitSpec, ThrottleSpec};
13
/// Controls whether, and how strictly, TLS is used when talking to a peer.
///
/// The `*Insecure` variants accept self-signed or otherwise invalid server
/// certificates. See [`Tls::allow_insecure`] and [`Tls::is_opportunistic`]
/// for the predicates derived from these variants.
#[derive(Deserialize, Serialize, Debug, Clone, PartialEq, Eq, Copy)]
pub enum Tls {
    /// Use it if available. If the peer has invalid or self-signed certificates, then
    /// delivery will fail. Will NOT fallback to not using TLS if the peer advertises
    /// STARTTLS.
    Opportunistic,
    /// Use it if available, and allow self-signed or otherwise invalid server certs.
    /// Not recommended for sending to the public internet; this is for local/lab
    /// testing scenarios only.
    OpportunisticInsecure,
    /// TLS with valid certs is required.
    Required,
    /// Required, and allow self-signed or otherwise invalid server certs.
    /// Not recommended for sending to the public internet; this is for local/lab
    /// testing scenarios only.
    RequiredInsecure,
    /// Do not try to use TLS
    Disabled,
}
33
34impl Tls {
35    pub fn allow_insecure(&self) -> bool {
36        match self {
37            Self::OpportunisticInsecure | Self::RequiredInsecure => true,
38            _ => false,
39        }
40    }
41
42    pub fn is_opportunistic(&self) -> bool {
43        match self {
44            Self::OpportunisticInsecure | Self::Opportunistic => true,
45            _ => false,
46        }
47    }
48}
49
50impl Default for Tls {
51    fn default() -> Self {
52        Self::Opportunistic
53    }
54}
55
56pub fn parse_openssl_options(option_list: &str) -> anyhow::Result<SslOptions> {
57    let mut result = SslOptions::empty();
58
59    for option in option_list.split('|') {
60        match SslOptions::from_name(option) {
61            Some(opt) => {
62                result.insert(opt);
63            }
64            None => {
65                let mut allowed: Vec<_> = SslOptions::all()
66                    .iter_names()
67                    .map(|(name, _)| format!("`{name}`"))
68                    .collect();
69                allowed.sort();
70                let allowed = allowed.join(", ");
71                anyhow::bail!(
72                    "`{option}` is not a valid SslOption name. \
73                    Possible values are {allowed} joined together by the pipe `|` character."
74                );
75            }
76        }
77    }
78
79    Ok(result)
80}
81
82fn deserialize_ssl_options<'de, D>(deserializer: D) -> Result<Option<SslOptions>, D::Error>
83where
84    D: Deserializer<'de>,
85{
86    use serde::de::Error;
87    let maybe_options = Option::<String>::deserialize(deserializer)?;
88
89    match maybe_options {
90        None => Ok(None),
91        Some(option_list) => match parse_openssl_options(&option_list) {
92            Ok(options) => Ok(Some(options)),
93            Err(err) => Err(D::Error::custom(format!("{err:#}"))),
94        },
95    }
96}
97
98fn deserialize_supported_ciphersuite<'de, D>(
99    deserializer: D,
100) -> Result<Vec<SupportedCipherSuite>, D::Error>
101where
102    D: Deserializer<'de>,
103{
104    use serde::de::Error;
105    let suites = Vec::<String>::deserialize(deserializer)?;
106    let mut result = vec![];
107
108    for s in suites {
109        match find_rustls_cipher_suite(&s) {
110            Some(s) => {
111                result.push(s);
112            }
113            None => {
114                return Err(D::Error::custom(format!(
115                    "`{s}` is not a valid rustls cipher suite"
116                )));
117            }
118        }
119    }
120
121    Ok(result)
122}
123
124pub fn find_rustls_cipher_suite(name: &str) -> Option<SupportedCipherSuite> {
125    for suite in ALL_CIPHER_SUITES {
126        let sname = format!("{:?}", suite.suite());
127        if sname.eq_ignore_ascii_case(name) {
128            return Some(*suite);
129        }
130    }
131    None
132}
133
/// Selects the wakeup strategy used by the `dispatcher_wakeup_strategy`
/// and `maintainer_wakeup_strategy` fields of `EgressPathConfig`.
///
/// NOTE(review): what "aggressive" and "relaxed" mean in scheduling terms
/// is decided by the code that consumes this value, which is not visible
/// in this file.
#[derive(Deserialize, Serialize, Debug, Clone, Default, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "lua", derive(FromLua))]
pub enum WakeupStrategy {
    /// The default strategy.
    #[default]
    Aggressive,
    Relaxed,
}
141
/// Policy applied to newly inserted messages under memory pressure; used
/// by the `low_memory_reduction_policy` and `no_memory_reduction_policy`
/// fields of `EgressPathConfig`.
///
/// NOTE(review): the variant names suggest which parts of a message
/// (data and/or metadata) are shrunk — confirm against the consumer, which
/// is not visible in this file.
#[derive(Deserialize, Serialize, Debug, Clone, Default, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "lua", derive(FromLua))]
pub enum MemoryReductionPolicy {
    /// The default policy.
    #[default]
    ShrinkDataAndMeta,
    ShrinkData,
    NoShrink,
}
150
/// Selects how the egress path configuration is refreshed; used by the
/// `refresh_strategy` field of `EgressPathConfig`.
///
/// NOTE(review): presumably `Ttl` refreshes on a timer (see
/// `refresh_interval`) and `Epoch` refreshes on a configuration epoch
/// change — confirm against the consumer, which is not visible in this file.
#[derive(Deserialize, Serialize, Debug, Clone, Default, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "lua", derive(FromLua))]
pub enum ConfigRefreshStrategy {
    /// The default strategy.
    #[default]
    Ttl,
    Epoch,
}
158
/// How to proceed after a session-level failure while talking to a peer.
///
/// Used by the `reconnect_strategy` field of `EgressPathConfig`, whose
/// documentation lists the triggering conditions as a 421 response, an IO
/// error, or a timeout. The default is `ConnectNextHost`.
#[derive(Deserialize, Serialize, Debug, Clone, Default, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "lua", derive(FromLua))]
pub enum ReconnectStrategy {
    /// Close out the current connection session, allowing the maintainer
    /// to decide about opening a new session and starting with a fresh
    /// connection plan
    TerminateSession,
    /// Try to reconnect to the same host that we were using and where
    /// we experienced the error
    ReconnectSameHost,
    /// Advance to the next host in the connection, if any. If none remain,
    /// this is equivalent to TerminateSession
    #[default]
    ConnectNextHost,
}
174
/// Configuration for an egress path.
///
/// Every field except the flattened `client_timeouts` carries a serde
/// default, either the type's `Default` or one of the
/// `EgressPathConfig::default_*` helpers, and the `Default` impl for this
/// struct uses the same helpers.
///
/// NOTE(review): the serde documentation states that `deny_unknown_fields`
/// is not supported in combination with `flatten` (used on
/// `client_timeouts` below) — confirm that unknown fields are rejected as
/// intended.
#[derive(Deserialize, Serialize, Debug, Clone, PartialEq)]
#[cfg_attr(feature = "lua", derive(FromLua))]
#[serde(deny_unknown_fields)]
pub struct EgressPathConfig {
    /// Defaults to `LimitSpec::new(32)`.
    #[serde(default = "EgressPathConfig::default_connection_limit")]
    pub connection_limit: LimitSpec,

    /// Additional named limits, kept in insertion order. Empty by default.
    #[serde(default)]
    pub additional_connection_limits: OrderMap<String, LimitSpec>,

    /// TLS policy; defaults to `Tls::Opportunistic`.
    #[serde(default)]
    pub enable_tls: Tls,

    /// Defaults to `true`.
    #[serde(default = "EgressPathConfig::default_enable_mta_sts")]
    pub enable_mta_sts: bool,

    /// Defaults to `false`.
    #[serde(default = "EgressPathConfig::default_enable_dane")]
    pub enable_dane: bool,

    /// Defaults to `true`.
    #[serde(default = "EgressPathConfig::default_enable_pipelining")]
    pub enable_pipelining: bool,

    /// Defaults to `true`.
    #[serde(default = "EgressPathConfig::default_enable_rset")]
    pub enable_rset: bool,

    /// Defaults to `false`.
    #[serde(default)]
    pub tls_prefer_openssl: bool,

    #[serde(default)]
    pub openssl_cipher_list: Option<String>,
    #[serde(default)]
    pub openssl_cipher_suites: Option<String>,
    /// Deserialized from a `|`-separated string of option names via
    /// `deserialize_ssl_options`. Never serialized.
    #[serde(
        default,
        deserialize_with = "deserialize_ssl_options",
        skip_serializing // FIXME
    )]
    pub openssl_options: Option<SslOptions>,

    /// Deserialized from a list of cipher suite names via
    /// `deserialize_supported_ciphersuite`. Never serialized.
    #[serde(
        default,
        deserialize_with = "deserialize_supported_ciphersuite",
        skip_serializing // FIXME
    )]
    pub rustls_cipher_suites: Vec<SupportedCipherSuite>,

    /// Flattened: the timeout fields appear at the same level as the
    /// fields of this struct in the serialized form.
    #[serde(flatten)]
    pub client_timeouts: SmtpClientTimeouts,

    /// How long to wait for an established session to gracefully
    /// close when the system is shutting down. After this period
    /// has elapsed, sessions will be aborted.
    #[serde(default, with = "duration_serde")]
    pub system_shutdown_timeout: Option<Duration>,

    /// Defaults to `1024`.
    #[serde(default = "EgressPathConfig::default_max_ready")]
    pub max_ready: usize,

    /// Defaults to `100`.
    #[serde(default = "EgressPathConfig::default_consecutive_connection_failures_before_delay")]
    pub consecutive_connection_failures_before_delay: usize,

    /// Defaults to `25`.
    #[serde(default = "EgressPathConfig::default_smtp_port")]
    pub smtp_port: u16,

    #[serde(default)]
    pub smtp_auth_plain_username: Option<String>,

    #[serde(default)]
    pub smtp_auth_plain_password: Option<KeySource>,

    /// Defaults to `false`.
    #[serde(default)]
    pub allow_smtp_auth_plain_without_tls: bool,

    #[serde(default)]
    pub max_message_rate: Option<ThrottleSpec>,

    /// Additional named throttles, kept in insertion order. Empty by default.
    #[serde(default)]
    pub additional_message_rate_throttles: OrderMap<String, ThrottleSpec>,

    #[serde(default)]
    pub source_selection_rate: Option<ThrottleSpec>,

    /// Additional named rates, kept in insertion order. Empty by default.
    #[serde(default)]
    pub additional_source_selection_rates: OrderMap<String, ThrottleSpec>,

    #[serde(default)]
    pub max_connection_rate: Option<ThrottleSpec>,

    /// Defaults to `1024`.
    #[serde(default = "EgressPathConfig::default_max_deliveries_per_connection")]
    pub max_deliveries_per_connection: usize,

    /// Defaults to `CidrSet::default_prohibited_hosts()`.
    #[serde(default = "CidrSet::default_prohibited_hosts")]
    pub prohibited_hosts: CidrSet,

    /// Empty by default.
    #[serde(default)]
    pub skip_hosts: CidrSet,

    #[serde(default)]
    pub ehlo_domain: Option<String>,

    // TODO: decide if we want to keep this and then document
    #[serde(default)]
    pub aggressive_connection_opening: bool,

    /// How long to wait between calls to get_egress_path_config for
    /// any given ready queue. Making this longer uses fewer
    /// resources (in aggregate) but means that it will take longer
    /// to detect and adjust to changes in the queue configuration.
    /// Defaults to 60 seconds.
    #[serde(
        default = "EgressPathConfig::default_refresh_interval",
        with = "duration_serde"
    )]
    pub refresh_interval: Duration,
    /// Defaults to `ConfigRefreshStrategy::Ttl`.
    #[serde(default)]
    pub refresh_strategy: ConfigRefreshStrategy,

    /// Defaults to `WakeupStrategy::Aggressive`.
    #[serde(default)]
    pub dispatcher_wakeup_strategy: WakeupStrategy,
    /// Defaults to `WakeupStrategy::Aggressive`.
    #[serde(default)]
    pub maintainer_wakeup_strategy: WakeupStrategy,

    /// Specify an explicit provider name that should apply to this
    /// path. The provider name will be used when computing metrics
    /// rollups by provider.
    ///
    /// NOTE(review): the original comment ended mid-sentence ("If omitted,
    /// then"). The fallback behavior when this is unset is not visible in
    /// this file — confirm and document it.
    #[serde(default)]
    pub provider_name: Option<String>,

    /// If set, a process-local cache will be used to remember if
    /// a site has broken TLS for the duration specified.  Once
    /// encountered, we will pretend that EHLO didn't advertise STARTTLS
    /// on subsequent connection attempts.
    #[serde(default, with = "duration_serde")]
    pub remember_broken_tls: Option<Duration>,

    /// If true, when a TLS handshake fails and TLS is set to
    /// opportunistic, we will re-connect to that host with
    /// TLS disabled.
    #[serde(default)]
    pub opportunistic_tls_reconnect_on_failed_handshake: bool,

    /// If true, rather than ESMTP, use the LMTP protocol
    #[serde(default)]
    pub use_lmtp: bool,

    /// How to behave if we experience either a 421 response, an IO Error,
    /// or a timeout while talking to the peer.
    #[serde(default)]
    pub reconnect_strategy: ReconnectStrategy,

    /// Which thread pool to use for processing the ready queue
    #[serde(default)]
    pub readyq_pool_name: Option<String>,

    /// What to do to newly inserted messages when memory is low
    #[serde(default)]
    pub low_memory_reduction_policy: MemoryReductionPolicy,

    /// What to do to newly inserted messages when memory is over the soft limit
    #[serde(default)]
    pub no_memory_reduction_policy: MemoryReductionPolicy,
}
336
// Exposes `EgressPathConfig` to Lua as userdata when the `lua` feature is enabled.
#[cfg(feature = "lua")]
impl LuaUserData for EgressPathConfig {
    /// Registers the userdata methods by delegating to
    /// `config::impl_pairs_and_index`.
    ///
    /// NOTE(review): judging by its name, that helper presumably installs
    /// the pairs/index metamethods — confirm against the `config` crate.
    fn add_methods<M: mlua::UserDataMethods<Self>>(methods: &mut M) {
        config::impl_pairs_and_index(methods);
    }
}
343
344impl Default for EgressPathConfig {
345    fn default() -> Self {
346        Self {
347            connection_limit: Self::default_connection_limit(),
348            tls_prefer_openssl: false,
349            enable_tls: Tls::default(),
350            enable_mta_sts: Self::default_enable_mta_sts(),
351            enable_dane: Self::default_enable_dane(),
352            enable_rset: Self::default_enable_rset(),
353            enable_pipelining: Self::default_enable_pipelining(),
354            max_ready: Self::default_max_ready(),
355            consecutive_connection_failures_before_delay:
356                Self::default_consecutive_connection_failures_before_delay(),
357            smtp_port: Self::default_smtp_port(),
358            max_message_rate: None,
359            max_connection_rate: None,
360            max_deliveries_per_connection: Self::default_max_deliveries_per_connection(),
361            client_timeouts: SmtpClientTimeouts::default(),
362            system_shutdown_timeout: None,
363            prohibited_hosts: CidrSet::default_prohibited_hosts(),
364            skip_hosts: CidrSet::default(),
365            ehlo_domain: None,
366            allow_smtp_auth_plain_without_tls: false,
367            smtp_auth_plain_username: None,
368            smtp_auth_plain_password: None,
369            aggressive_connection_opening: false,
370            rustls_cipher_suites: vec![],
371            openssl_cipher_list: None,
372            openssl_cipher_suites: None,
373            openssl_options: None,
374            refresh_interval: Self::default_refresh_interval(),
375            refresh_strategy: ConfigRefreshStrategy::default(),
376            additional_message_rate_throttles: OrderMap::default(),
377            additional_connection_limits: OrderMap::default(),
378            source_selection_rate: None,
379            additional_source_selection_rates: OrderMap::default(),
380            provider_name: None,
381            remember_broken_tls: None,
382            opportunistic_tls_reconnect_on_failed_handshake: false,
383            use_lmtp: false,
384            reconnect_strategy: ReconnectStrategy::default(),
385            readyq_pool_name: None,
386            low_memory_reduction_policy: MemoryReductionPolicy::default(),
387            no_memory_reduction_policy: MemoryReductionPolicy::default(),
388            maintainer_wakeup_strategy: WakeupStrategy::default(),
389            dispatcher_wakeup_strategy: WakeupStrategy::default(),
390        }
391    }
392}
393
394impl EgressPathConfig {
395    fn default_connection_limit() -> LimitSpec {
396        LimitSpec::new(32)
397    }
398
399    fn default_enable_mta_sts() -> bool {
400        true
401    }
402
403    fn default_enable_pipelining() -> bool {
404        true
405    }
406
407    fn default_enable_rset() -> bool {
408        true
409    }
410
411    fn default_enable_dane() -> bool {
412        false
413    }
414
415    fn default_max_ready() -> usize {
416        1024
417    }
418
419    fn default_consecutive_connection_failures_before_delay() -> usize {
420        100
421    }
422
423    fn default_smtp_port() -> u16 {
424        25
425    }
426
427    fn default_max_deliveries_per_connection() -> usize {
428        1024
429    }
430
431    fn default_refresh_interval() -> Duration {
432        Duration::from_secs(60)
433    }
434}