// kumo_server_common/start.rs

1use crate::diagnostic_logging::LoggingConfig;
2use crate::nodeid::NodeId;
3use anyhow::Context;
4use chrono::{DateTime, Utc};
5use config::RegisterFunc;
6use kumo_machine_info::MachineInfo;
7use kumo_prometheus::declare_metric;
8use kumo_server_lifecycle::LifeCycle;
9use kumo_server_runtime::rt_spawn;
10use parking_lot::Mutex;
11use std::future::Future;
12use std::path::Path;
13use std::sync::LazyLock;
14
/// Timestamp recorded at process startup; forced early in
/// `StartConfig::run` so it reflects when the server came online.
pub static ONLINE_SINCE: LazyLock<DateTime<Utc>> = LazyLock::new(Utc::now);
/// Machine/node metadata. Starts as `None`; a background task spawned
/// from `StartConfig::run` fills it in once the cloud-provider probe
/// completes, so readers must tolerate it being unset for a while.
pub static MACHINE_INFO: LazyLock<Mutex<Option<MachineInfo>>> = LazyLock::new(|| Mutex::new(None));
17
/// Parameters for the common server startup sequence driven by
/// [`StartConfig::run`].
pub struct StartConfig<'a> {
    /// Diagnostic logging configuration; initialized first in `run`.
    pub logging: LoggingConfig<'a>,
    /// Lua functions to register with the config subsystem before the
    /// policy file is evaluated, so the policy can call them.
    pub lua_funcs: &'a [RegisterFunc],
    /// Path of the policy file to evaluate during startup.
    pub policy: &'a Path,
}
23
impl StartConfig<'_> {
    /// Drive the common server startup and shutdown sequence.
    ///
    /// Order matters here: logging comes up first, the rustls crypto
    /// provider is installed before any TLS use, lua functions are
    /// registered before the policy file is evaluated, and only then is
    /// the caller's `init_future` allowed to run.
    ///
    /// * `init_future` - service-specific initialization; if it fails,
    ///   a clean shutdown is requested and the error is surfaced from
    ///   this function after shutdown completes.
    /// * `shutdown_future` - service-specific teardown, awaited after
    ///   the lifecycle reports that shutdown has finished.
    ///
    /// Returns an error if any common startup step fails, if
    /// `init_future` reported an error, or if shutdown itself was
    /// triggered by an error.
    pub async fn run<INIT, FINI>(
        self,
        init_future: INIT,
        shutdown_future: FINI,
    ) -> anyhow::Result<()>
    where
        INIT: Future<Output = anyhow::Result<()>> + Send + 'static,
        FINI: Future<Output = ()> + Send + 'static,
    {
        // Capture the online-since timestamp before anything else runs.
        LazyLock::force(&ONLINE_SINCE);
        self.logging.init()?;

        start_cpu_usage_monitor();

        // Install aws-lc-rs as the process-wide rustls crypto provider.
        // The failure payload isn't an Error type, so map it to a message.
        rustls::crypto::aws_lc_rs::default_provider()
            .install_default()
            .map_err(|_| anyhow::anyhow!("failed to install default crypto provider"))?;

        kumo_server_memory::setup_memory_limit().context("setup_memory_limit")?;

        // Expose tokio runtime metrics through the prometheus registry.
        prometheus::register(Box::new(
            tokio_metrics_collector::default_runtime_collector(),
        ))
        .context("failed to configure tokio-metrics-collector")?;

        // Register service-supplied lua functions before evaluating the
        // policy, so the policy script can reference them.
        for &func in self.lua_funcs {
            config::register(func);
        }

        config::set_policy_path(self.policy.to_path_buf())
            .await
            .with_context(|| format!("Error evaluating policy file {:?}", self.policy))?;

        // Populate MACHINE_INFO in the background; query_cloud_provider
        // presumably reaches out to cloud metadata endpoints, so we
        // deliberately don't block startup on it.
        tokio::spawn(async move {
            let mut info = MachineInfo::new();
            info.node_id.replace(NodeId::get().uuid.to_string());
            info.query_cloud_provider().await;
            *MACHINE_INFO.lock() = Some(info);
        });

        let mut life_cycle = LifeCycle::new();

        // Run the caller's init concurrently with the lifecycle wait.
        // On failure we request a clean shutdown and stash the error
        // text so it can be reported after shutdown finishes.
        let init_handle = rt_spawn("initialize", async move {
            let mut error = None;
            if let Err(err) = init_future.await {
                let err = format!("{err:#}");
                tracing::error!("problem initializing: {err}");
                LifeCycle::request_shutdown(Ok(())).await;
                error.replace(err);
            }
            // This log line is depended upon by the integration
            // test harness. Do not change or remove it without
            // making appropriate adjustments over there!
            tracing::info!("initialization complete");
            error
        })?;

        // Block until shutdown is requested and lifecycle-managed
        // activities have wound down.
        let final_result = life_cycle.wait_for_shutdown().await;

        // after waiting for those to idle out, shut down logging
        shutdown_future.await;

        tracing::info!("Shutdown completed OK!");

        // Report the init error first: if both init and shutdown
        // failed, the init failure is the root cause.
        if let Some(error) = init_handle.await? {
            anyhow::bail!("Initialization raised an error: {error}");
        }
        if let Some(Err(error)) = final_result {
            anyhow::bail!("Shutdown due to error: {error}");
        }
        Ok(())
    }
}
98
// CPU usage gauges published periodically by start_cpu_usage_monitor()
// below. The "*_sum" variants range from 0 to 100 * num_cpus; the
// "*_normalized" variants divide by the CPU count to range 0-100.
declare_metric! {
/// The sum of the system-wide CPU usage for each CPU in the system, can add up to more than 100%.
///
/// Each CPU has a value from 0-100% busy; a value of 100% in this metric
/// indicates that the load is equivalent to one fully utilized CPU.
///
/// A multi-CPU system can report more than 100% in this metric; a dual-CPU
/// system reporting 200% indicates that both CPUs are fully utilized.
///
/// See system_cpu_usage_normalized for a version of this metric that scales from
/// 0% (totally idle) to 100% (totally saturated).
///
/// This metric is scoped to the system, reflecting the total load on the
/// system, not just from the kumo related process(es).
static SYS_CPU_USAGE: IntGauge("system_cpu_usage_sum");
}

declare_metric! {
/// The sum of the system-wide CPU usage for each CPU in the system, divided by the number of CPUs.
///
/// 100% in this metric indicates that all CPU cores are 100% busy.
///
/// This metric is scoped to the system, reflecting the total load on the
/// system, not just from the kumo related process(es).
static SYS_CPU_USAGE_NORM: IntGauge("system_cpu_usage_normalized");
}

declare_metric! {
/// The sum of the process CPU usage for each CPU in the system, can add up to more than 100%.
///
/// Each CPU has a value from 0-100% busy; a value of 100% in this metric
/// indicates that the load is equivalent to one fully utilized CPU.
///
/// A multi-CPU system can report more than 100% in this metric; a dual-CPU
/// system reporting 200% indicates that both CPUs are fully utilized.
///
/// See process_cpu_usage_normalized for a version of this metric that scales from
/// 0% (totally idle) to 100% (totally saturated).
///
/// This metric is scoped to the service process, reflecting the CPU used only
/// by the process and not the system as a whole.
static PROC_CPU_USAGE: IntGauge("process_cpu_usage_sum");
}

declare_metric! {
/// The sum of the process CPU usage for each CPU in the system, divided by the number of CPUs.
///
/// 100% in this metric indicates that all CPU cores are 100% busy.
///
/// This metric is scoped to the service process, reflecting the CPU used only
/// by the process and not the system as a whole.
static PROC_CPU_USAGE_NORM: IntGauge("process_cpu_usage_normalized");
}
152
153fn start_cpu_usage_monitor() {
154    std::thread::spawn(|| {
155        use std::time::Duration;
156        use sysinfo::{
157            get_current_pid, CpuRefreshKind, Pid, ProcessRefreshKind, ProcessesToUpdate,
158            RefreshKind, System,
159        };
160
161        let mut sys = System::new_with_specifics(
162            RefreshKind::nothing().with_cpu(CpuRefreshKind::everything()),
163        );
164
165        let my_pid = get_current_pid().expect("failed to get own pid!?");
166
167        let update_interval = Duration::from_secs(3).max(sysinfo::MINIMUM_CPU_UPDATE_INTERVAL);
168        // Do two initial updates so that we have sufficient data for
169        // computing a delta; one here outside the loop, and another
170        // at the top of the loop.
171        sys.refresh_cpu_usage();
172        std::thread::sleep(sysinfo::MINIMUM_CPU_UPDATE_INTERVAL);
173
174        loop {
175            sys.refresh_cpu_usage();
176            sys.refresh_processes_specifics(
177                ProcessesToUpdate::Some(&[my_pid]),
178                true,
179                ProcessRefreshKind::everything().without_tasks(),
180            );
181
182            let num_cpus = sys.cpus().len() as i64;
183
184            let sys_usage = sys.cpus().iter().map(|cpu| cpu.cpu_usage()).sum::<f32>() as i64;
185            SYS_CPU_USAGE.set(sys_usage);
186            SYS_CPU_USAGE_NORM.set(sys_usage / num_cpus);
187
188            if let Some(p) = sys.process(Pid::from(my_pid)) {
189                let proc_usage = p.cpu_usage() as i64;
190                PROC_CPU_USAGE.set(proc_usage);
191                PROC_CPU_USAGE_NORM.set(proc_usage / num_cpus);
192            }
193
194            std::thread::sleep(update_interval);
195        }
196    });
197}