kumo_server_common/
start.rs1use crate::diagnostic_logging::LoggingConfig;
2use crate::nodeid::NodeId;
3use anyhow::Context;
4use chrono::{DateTime, Utc};
5use config::RegisterFunc;
6use kumo_machine_info::MachineInfo;
7use kumo_prometheus::declare_metric;
8use kumo_server_lifecycle::LifeCycle;
9use kumo_server_runtime::rt_spawn;
10use parking_lot::Mutex;
11use std::future::Future;
12use std::path::Path;
13use std::sync::LazyLock;
14
/// Timestamp at which this process came online.
///
/// Lazily initialized on first access; `StartConfig::run` forces it as its
/// very first step, so the recorded time is taken before any other startup work.
pub static ONLINE_SINCE: LazyLock<DateTime<Utc>> = LazyLock::new(Utc::now);

/// Information about the machine this process is running on.
///
/// Holds `None` until the background task spawned by `StartConfig::run` has
/// built a `MachineInfo`, attached this node's id and queried the cloud provider.
pub static MACHINE_INFO: LazyLock<Mutex<Option<MachineInfo>>> = LazyLock::new(|| Mutex::new(None));
17
/// Parameters for booting a server process via [`StartConfig::run`].
pub struct StartConfig<'a> {
    /// Diagnostic logging configuration; initialized near the start of `run`.
    pub logging: LoggingConfig<'a>,
    /// Lua registration functions, each passed to `config::register` before
    /// the policy file is evaluated.
    pub lua_funcs: &'a [RegisterFunc],
    /// Path of the policy file, loaded via `config::set_policy_path` during `run`.
    pub policy: &'a Path,
}
23
24impl StartConfig<'_> {
25 pub async fn run<INIT, FINI>(
26 self,
27 init_future: INIT,
28 shutdown_future: FINI,
29 ) -> anyhow::Result<()>
30 where
31 INIT: Future<Output = anyhow::Result<()>> + Send + 'static,
32 FINI: Future<Output = ()> + Send + 'static,
33 {
34 LazyLock::force(&ONLINE_SINCE);
35 self.logging.init()?;
36
37 start_cpu_usage_monitor();
38
39 rustls::crypto::aws_lc_rs::default_provider()
40 .install_default()
41 .map_err(|_| anyhow::anyhow!("failed to install default crypto provider"))?;
42
43 kumo_server_memory::setup_memory_limit().context("setup_memory_limit")?;
44
45 prometheus::register(Box::new(
46 tokio_metrics_collector::default_runtime_collector(),
47 ))
48 .context("failed to configure tokio-metrics-collector")?;
49
50 for &func in self.lua_funcs {
51 config::register(func);
52 }
53
54 config::set_policy_path(self.policy.to_path_buf())
55 .await
56 .with_context(|| format!("Error evaluating policy file {:?}", self.policy))?;
57
58 tokio::spawn(async move {
59 let mut info = MachineInfo::new();
60 info.node_id.replace(NodeId::get().uuid.to_string());
61 info.query_cloud_provider().await;
62 *MACHINE_INFO.lock() = Some(info);
63 });
64
65 let mut life_cycle = LifeCycle::new();
66
67 let init_handle = rt_spawn("initialize", async move {
68 let mut error = None;
69 if let Err(err) = init_future.await {
70 let err = format!("{err:#}");
71 tracing::error!("problem initializing: {err}");
72 LifeCycle::request_shutdown().await;
73 error.replace(err);
74 }
75 tracing::info!("initialization complete");
79 error
80 })?;
81
82 life_cycle.wait_for_shutdown().await;
83
84 shutdown_future.await;
86
87 tracing::info!("Shutdown completed OK!");
88
89 if let Some(error) = init_handle.await? {
90 anyhow::bail!("Initialization raised an error: {error}");
91 }
92 Ok(())
93 }
94}
95
// Sum of sysinfo's per-CPU usage values across every CPU on the system,
// truncated to an integer; written periodically by `start_cpu_usage_monitor`.
// sysinfo reports each CPU's usage as a percentage, so this can exceed 100
// on multi-core machines.
declare_metric! {
static SYS_CPU_USAGE: IntGauge("system_cpu_usage_sum");
}
112
// `system_cpu_usage_sum` divided (integer division) by the number of CPUs,
// i.e. the average per-CPU usage; written by `start_cpu_usage_monitor`.
declare_metric! {
static SYS_CPU_USAGE_NORM: IntGauge("system_cpu_usage_normalized");
}
122
// CPU usage of this process as reported by sysinfo's `Process::cpu_usage`,
// truncated to an integer; written periodically by `start_cpu_usage_monitor`.
// sysinfo documents this as a percentage that may exceed 100 when the process
// uses more than one CPU.
declare_metric! {
static PROC_CPU_USAGE: IntGauge("process_cpu_usage_sum");
}
139
// `process_cpu_usage_sum` divided (integer division) by the number of CPUs;
// written by `start_cpu_usage_monitor`.
declare_metric! {
static PROC_CPU_USAGE_NORM: IntGauge("process_cpu_usage_normalized");
}
149
150fn start_cpu_usage_monitor() {
151 std::thread::spawn(|| {
152 use std::time::Duration;
153 use sysinfo::{
154 get_current_pid, CpuRefreshKind, Pid, ProcessRefreshKind, ProcessesToUpdate,
155 RefreshKind, System,
156 };
157
158 let mut sys = System::new_with_specifics(
159 RefreshKind::nothing().with_cpu(CpuRefreshKind::everything()),
160 );
161
162 let my_pid = get_current_pid().expect("failed to get own pid!?");
163
164 let update_interval = Duration::from_secs(3).max(sysinfo::MINIMUM_CPU_UPDATE_INTERVAL);
165 sys.refresh_cpu_usage();
169 std::thread::sleep(sysinfo::MINIMUM_CPU_UPDATE_INTERVAL);
170
171 loop {
172 sys.refresh_cpu_usage();
173 sys.refresh_processes_specifics(
174 ProcessesToUpdate::Some(&[my_pid]),
175 true,
176 ProcessRefreshKind::everything().without_tasks(),
177 );
178
179 let num_cpus = sys.cpus().len() as i64;
180
181 let sys_usage = sys.cpus().iter().map(|cpu| cpu.cpu_usage()).sum::<f32>() as i64;
182 SYS_CPU_USAGE.set(sys_usage);
183 SYS_CPU_USAGE_NORM.set(sys_usage / num_cpus);
184
185 if let Some(p) = sys.process(Pid::from(my_pid)) {
186 let proc_usage = p.cpu_usage() as i64;
187 PROC_CPU_USAGE.set(proc_usage);
188 PROC_CPU_USAGE_NORM.set(proc_usage / num_cpus);
189 }
190
191 std::thread::sleep(update_interval);
192 }
193 });
194}