1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
use crate::diagnostic_logging::LoggingConfig;
use anyhow::Context;
use config::RegisterFunc;
use kumo_server_lifecycle::LifeCycle;
use kumo_server_runtime::rt_spawn;
use std::future::Future;
use std::path::Path;
use std::pin::Pin;

/// Borrowed inputs consumed by [`StartConfig::run`] when starting the process.
pub struct StartConfig<'a> {
    /// Logging configuration; `run` calls its `init` method before any other
    /// startup step.
    pub logging: LoggingConfig<'a>,
    /// Registration functions that `run` passes, one at a time and in slice
    /// order, to `config::register`.
    pub lua_funcs: &'a [RegisterFunc],
    /// Path handed to `config::set_policy_path`; a failure there is reported
    /// as a failure to parse this policy file.
    pub policy: &'a Path,
}

impl<'a> StartConfig<'a> {
    pub async fn run<INIT, FINI>(
        self,
        perform_init: INIT,
        broadcast_shutdown: FINI,
    ) -> anyhow::Result<()>
    where
        INIT: FnOnce() -> Pin<Box<dyn Future<Output = anyhow::Result<()>>>> + Send + 'static,
        FINI: FnOnce() -> Pin<Box<dyn Future<Output = ()>>> + Send + 'static,
    {
        self.logging.init()?;

        rustls::crypto::aws_lc_rs::default_provider()
            .install_default()
            .map_err(|_| anyhow::anyhow!("failed to install default crypto provider"))?;

        kumo_server_memory::setup_memory_limit().context("setup_memory_limit")?;

        prometheus::register(Box::new(
            tokio_metrics_collector::default_runtime_collector(),
        ))
        .context("failed to configure tokio-metrics-collector")?;

        for &func in self.lua_funcs {
            config::register(func);
        }

        config::set_policy_path(self.policy.to_path_buf())
            .await
            .with_context(|| format!("Failed to parse policy file {:?}", self.policy))?;

        let mut life_cycle = LifeCycle::new();

        let init_handle = rt_spawn("initialize".to_string(), move || {
            Ok(async move {
                let mut error = None;
                let init_future = (perform_init)();
                if let Err(err) = init_future.await {
                    let err = format!("{err:#}");
                    tracing::error!("problem initializing: {err}");
                    LifeCycle::request_shutdown().await;
                    error.replace(err);
                }
                // This log line is depended upon by the integration
                // test harness. Do not change or remove it without
                // making appropriate adjustments over there!
                tracing::info!("initialization complete");
                error
            })
        })
        .await?;

        life_cycle.wait_for_shutdown().await;

        // after waiting for those to idle out, shut down logging
        let shutdown_future = (broadcast_shutdown)();
        shutdown_future.await;

        tracing::info!("Shutdown completed OK!");

        if let Some(error) = init_handle.await? {
            anyhow::bail!("Initialization raised an error: {error}");
        }
        Ok(())
    }
}