// kumo_server_common/start.rs

1use crate::diagnostic_logging::LoggingConfig;
2use crate::nodeid::NodeId;
3use anyhow::Context;
4use chrono::{DateTime, Utc};
5use config::RegisterFunc;
6use kumo_machine_info::MachineInfo;
7use kumo_prometheus::declare_metric;
8use kumo_server_lifecycle::LifeCycle;
9use kumo_server_runtime::rt_spawn;
10use parking_lot::Mutex;
11use std::future::Future;
12use std::path::Path;
13use std::sync::LazyLock;
14
/// Timestamp of when this server process came online. Forced early in
/// `StartConfig::run` so it reflects process start time rather than the
/// time of first access.
pub static ONLINE_SINCE: LazyLock<DateTime<Utc>> = LazyLock::new(Utc::now);
/// Information about the host machine. Starts out `None` and is populated
/// asynchronously by a task spawned from `StartConfig::run` (cloud provider
/// discovery can take a while), so readers must tolerate absence.
pub static MACHINE_INFO: LazyLock<Mutex<Option<MachineInfo>>> = LazyLock::new(|| Mutex::new(None));
17
/// Bundles the inputs needed to boot a kumo server process:
/// diagnostic logging configuration, the set of lua functions to
/// register with the config system, and the policy file to evaluate.
pub struct StartConfig<'a> {
    /// Diagnostic logging configuration; initialized first thing in `run`.
    pub logging: LoggingConfig<'a>,
    /// Functions passed to `config::register` before the policy is
    /// evaluated, so the policy can call them.
    pub lua_funcs: &'a [RegisterFunc],
    /// Path to the policy file handed to `config::set_policy_path`.
    pub policy: &'a Path,
}
23
impl StartConfig<'_> {
    /// Run the server: perform common startup (logging, CPU monitor, crypto
    /// provider, memory limit, tokio metrics, lua registration, policy
    /// evaluation), spawn `init_future`, wait for shutdown to be requested
    /// and completed, then run `shutdown_future`.
    ///
    /// Returns an error if any startup step fails, or if `init_future`
    /// reported an error during initialization.
    ///
    /// NOTE: the steps below are order-sensitive; see inline comments.
    pub async fn run<INIT, FINI>(
        self,
        init_future: INIT,
        shutdown_future: FINI,
    ) -> anyhow::Result<()>
    where
        INIT: Future<Output = anyhow::Result<()>> + Send + 'static,
        FINI: Future<Output = ()> + Send + 'static,
    {
        // Capture the process start timestamp before doing anything else.
        LazyLock::force(&ONLINE_SINCE);
        self.logging.init()?;

        start_cpu_usage_monitor();

        // Install aws-lc-rs as the process-wide rustls crypto provider.
        // install_default's error payload isn't displayable, hence map_err.
        rustls::crypto::aws_lc_rs::default_provider()
            .install_default()
            .map_err(|_| anyhow::anyhow!("failed to install default crypto provider"))?;

        kumo_server_memory::setup_memory_limit().context("setup_memory_limit")?;

        // Expose tokio runtime metrics through prometheus.
        prometheus::register(Box::new(
            tokio_metrics_collector::default_runtime_collector(),
        ))
        .context("failed to configure tokio-metrics-collector")?;

        // Register caller-supplied lua functions before evaluating the
        // policy, so the policy can reference them.
        for &func in self.lua_funcs {
            config::register(func);
        }

        config::set_policy_path(self.policy.to_path_buf())
            .await
            .with_context(|| format!("Error evaluating policy file {:?}", self.policy))?;

        // Populate MACHINE_INFO in the background; query_cloud_provider
        // may be slow, so we deliberately don't block startup on it.
        tokio::spawn(async move {
            let mut info = MachineInfo::new();
            info.node_id.replace(NodeId::get().uuid.to_string());
            info.query_cloud_provider().await;
            *MACHINE_INFO.lock() = Some(info);
        });

        let mut life_cycle = LifeCycle::new();

        // Run initialization concurrently. On failure we request shutdown
        // and carry the formatted error through the join handle so it can
        // be surfaced after shutdown has completed.
        let init_handle = rt_spawn("initialize", async move {
            let mut error = None;
            if let Err(err) = init_future.await {
                let err = format!("{err:#}");
                tracing::error!("problem initializing: {err}");
                LifeCycle::request_shutdown().await;
                error.replace(err);
            }
            // This log line is depended upon by the integration
            // test harness. Do not change or remove it without
            // making appropriate adjustments over there!
            tracing::info!("initialization complete");
            error
        })?;

        life_cycle.wait_for_shutdown().await;

        // after waiting for those to idle out, shut down logging
        shutdown_future.await;

        tracing::info!("Shutdown completed OK!");

        // Now that shutdown is complete, surface any initialization error
        // captured by the task above.
        if let Some(error) = init_handle.await? {
            anyhow::bail!("Initialization raised an error: {error}");
        }
        Ok(())
    }
}
95
// Updated periodically by the sampling thread spawned in
// start_cpu_usage_monitor() below.
declare_metric! {
/// The sum of the system-wide CPU usage for each CPU in the system, can add up to more than 100%.
///
/// Each CPU has a value from 0-100% busy; a value of 100% in this metric
/// indicates that the load is equivalent to one fully utilized CPU.
///
/// A multi-CPU system can report more than 100% in this metric; a dual-CPU
/// system reporting 200% indicates that both CPUs are fully utilized.
///
/// See system_cpu_usage_normalized for a version of this metric that scales from
/// 0% (totally idle) to 100% (totally saturated).
///
/// This metric is scoped to the system, reflecting the total load on the
/// system, not just from the kumo related process(es).
static SYS_CPU_USAGE: IntGauge("system_cpu_usage_sum");
}
112
// Updated periodically by the sampling thread spawned in
// start_cpu_usage_monitor() below.
declare_metric! {
/// The sum of the system-wide CPU usage for each CPU in the system, divided by the number of CPUs.
///
/// 100% in this metric indicates that all CPU cores are 100% busy.
///
/// This metric is scoped to the system, reflecting the total load on the
/// system, not just from the kumo related process(es).
static SYS_CPU_USAGE_NORM: IntGauge("system_cpu_usage_normalized");
}
122
// Updated periodically by the sampling thread spawned in
// start_cpu_usage_monitor() below.
declare_metric! {
/// The sum of the process CPU usage for each CPU in the system, can add up to more than 100%.
///
/// Each CPU has a value from 0-100% busy; a value of 100% in this metric
/// indicates that the load is equivalent to one fully utilized CPU.
///
/// A multi-CPU system can report more than 100% in this metric; a dual-CPU
/// system reporting 200% indicates that both CPUs are fully utilized.
///
/// See process_cpu_usage_normalized for a version of this metric that scales from
/// 0% (totally idle) to 100% (totally saturated).
///
/// This metric is scoped to the service process, reflecting the CPU used only
/// by the process and not the system as a whole.
static PROC_CPU_USAGE: IntGauge("process_cpu_usage_sum");
}
139
// Updated periodically by the sampling thread spawned in
// start_cpu_usage_monitor() below.
declare_metric! {
/// The sum of the process CPU usage for each CPU in the system, divided by the number of CPUs.
///
/// 100% in this metric indicates that all CPU cores are 100% busy.
///
/// This metric is scoped to the service process, reflecting the CPU used only
/// by the process and not the system as a whole.
static PROC_CPU_USAGE_NORM: IntGauge("process_cpu_usage_normalized");
}
149
150fn start_cpu_usage_monitor() {
151    std::thread::spawn(|| {
152        use std::time::Duration;
153        use sysinfo::{
154            get_current_pid, CpuRefreshKind, Pid, ProcessRefreshKind, ProcessesToUpdate,
155            RefreshKind, System,
156        };
157
158        let mut sys = System::new_with_specifics(
159            RefreshKind::nothing().with_cpu(CpuRefreshKind::everything()),
160        );
161
162        let my_pid = get_current_pid().expect("failed to get own pid!?");
163
164        let update_interval = Duration::from_secs(3).max(sysinfo::MINIMUM_CPU_UPDATE_INTERVAL);
165        // Do two initial updates so that we have sufficient data for
166        // computing a delta; one here outside the loop, and another
167        // at the top of the loop.
168        sys.refresh_cpu_usage();
169        std::thread::sleep(sysinfo::MINIMUM_CPU_UPDATE_INTERVAL);
170
171        loop {
172            sys.refresh_cpu_usage();
173            sys.refresh_processes_specifics(
174                ProcessesToUpdate::Some(&[my_pid]),
175                true,
176                ProcessRefreshKind::everything().without_tasks(),
177            );
178
179            let num_cpus = sys.cpus().len() as i64;
180
181            let sys_usage = sys.cpus().iter().map(|cpu| cpu.cpu_usage()).sum::<f32>() as i64;
182            SYS_CPU_USAGE.set(sys_usage);
183            SYS_CPU_USAGE_NORM.set(sys_usage / num_cpus);
184
185            if let Some(p) = sys.process(Pid::from(my_pid)) {
186                let proc_usage = p.cpu_usage() as i64;
187                PROC_CPU_USAGE.set(proc_usage);
188                PROC_CPU_USAGE_NORM.set(proc_usage / num_cpus);
189            }
190
191            std::thread::sleep(update_interval);
192        }
193    });
194}