1use crate::tracking::TrackingAllocator;
7use anyhow::Context;
8#[cfg(target_os = "linux")]
9use cgroups_rs::cgroup::{get_cgroups_relative_paths, Cgroup, UNIFIED_MOUNTPOINT};
10#[cfg(target_os = "linux")]
11use cgroups_rs::hierarchies::{V1, V2};
12#[cfg(target_os = "linux")]
13use cgroups_rs::memory::MemController;
14#[cfg(target_os = "linux")]
15use cgroups_rs::{Hierarchy, MaxValue};
16use kumo_prometheus::declare_metric;
17use nix::sys::resource::{rlim_t, RLIM_INFINITY};
18use nix::unistd::{sysconf, SysconfVar};
19use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
20use std::sync::{LazyLock, Mutex};
21use std::time::Duration;
22use tikv_jemallocator::Jemalloc;
23use tokio::sync::watch::Receiver;
24
25pub mod tracking;
26
27pub use tracking::{set_tracking_callstacks, tracking_stats};
28
// Install jemalloc, wrapped in `TrackingAllocator` from the `tracking`
// submodule, as Rust's global allocator for this process.
// NOTE(review): the wrapper presumably feeds `crate::tracking::counted_usage()`,
// which `memory_thread` publishes as the `memory_usage_rust` gauge — confirm in
// `tracking`.
#[global_allocator]
static GLOBAL: TrackingAllocator<Jemalloc> = TrackingAllocator::new(Jemalloc);
31
// Incremented by `memory_thread` each time usage rises above the low-memory
// threshold (edge-triggered, not on every sample).
declare_metric! {
static LOW_COUNT: IntCounter("memory_low_count");
}

// Incremented by `memory_thread` each time usage rises to or above the soft
// limit (edge-triggered).
declare_metric! {
static OVER_LIMIT_COUNT: IntCounter("memory_over_limit_count");
}

// Most recently sampled memory usage, in bytes.
declare_metric! {
static MEM_USAGE: Gauge("memory_usage");
}

// The limit that `memory_thread` compares usage against (the effective soft
// limit), in bytes.
declare_metric! {
static MEM_LIMIT: Gauge("memory_limit");
}

// Value of `crate::tracking::counted_usage()` at the last sample.
declare_metric! {
static MEM_COUNTED: Gauge("memory_usage_rust");
}

// The low-memory threshold in effect at the last sample, in bytes.
declare_metric! {
static LOW_MEM_THRESH: Gauge("memory_low_thresh");
}
61
// Receiver side of the watch channel created by `memory_thread`, which sends
// on it whenever usage crosses the soft limit in either direction. It stays
// `None` until the memory thread has started; subscribers clone it.
static SUBSCRIBER: LazyLock<Mutex<Option<Receiver<()>>>> = LazyLock::new(|| Mutex::new(None));

// Over-limit flag written by `memory_thread` on every sample that has a soft
// limit. No reader is visible in this file.
static OVER_LIMIT: AtomicBool = AtomicBool::new(false);
// True while the last sampled usage exceeded the low-memory threshold; read by
// `low_memory()`.
static LOW_MEM: AtomicBool = AtomicBool::new(false);

// Bytes remaining below the soft limit at the last sample (0 once usage reaches
// the limit); read by `get_headroom()`. It starts at `u32::MAX` so callers see
// ample headroom before the first sample.
static HEAD_ROOM: AtomicUsize = AtomicUsize::new(u32::MAX as usize);
74
75#[derive(Debug, Clone, Copy)]
77pub struct MemoryUsage {
78 pub bytes: u64,
79}
80
81impl std::fmt::Display for MemoryUsage {
82 fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
83 write!(fmt, "{}", human(self.bytes))
84 }
85}
86
87impl MemoryUsage {
88 pub fn get() -> anyhow::Result<Self> {
89 #[cfg(target_os = "linux")]
90 {
91 if let Ok(v2) = Self::get_cgroup(true) {
92 return Ok(v2);
93 }
94 if let Ok(v1) = Self::get_cgroup(false) {
95 return Ok(v1);
96 }
97 }
98 Self::get_linux_statm()
99 }
100
101 #[cfg(target_os = "linux")]
102 fn get_cgroup(v2: bool) -> anyhow::Result<Self> {
103 let cgroup = get_my_cgroup(v2)?;
104 let mem: &MemController = cgroup
105 .controller_of()
106 .ok_or_else(|| anyhow::anyhow!("no memory controller?"))?;
107 let stat = mem.memory_stat();
108 Ok(Self {
109 bytes: stat.usage_in_bytes,
110 })
111 }
112
113 pub fn get_linux_statm() -> anyhow::Result<Self> {
114 let data = std::fs::read_to_string("/proc/self/statm")?;
115 let fields: Vec<&str> = data.split(' ').collect();
116 let rss: u64 = fields[1].parse()?;
117 Ok(Self {
118 bytes: rss * sysconf(SysconfVar::PAGE_SIZE)?.unwrap_or(4 * 1024) as u64,
119 })
120 }
121}
122
123fn human(n: u64) -> String {
124 humansize::format_size(n, humansize::DECIMAL)
125}
126
127#[derive(Debug, Clone, Copy)]
129pub struct MemoryLimits {
130 pub soft_limit: Option<u64>,
131 pub hard_limit: Option<u64>,
132}
133
134impl std::fmt::Display for MemoryLimits {
135 fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
136 let soft = self.soft_limit.map(human);
137 let hard = self.hard_limit.map(human);
138 write!(fmt, "soft={soft:?}, hard={hard:?}")
139 }
140}
141
142impl MemoryLimits {
143 pub fn min(self, other: Self) -> Self {
144 Self {
145 soft_limit: min_opt_limit(self.soft_limit, other.soft_limit),
146 hard_limit: min_opt_limit(self.hard_limit, other.hard_limit),
147 }
148 }
149
150 pub fn is_unlimited(&self) -> bool {
151 self.soft_limit.is_none() && self.hard_limit.is_none()
152 }
153}
154
155fn rlim_to_opt(rlim: rlim_t) -> Option<u64> {
156 if rlim == RLIM_INFINITY {
157 None
158 } else {
159 Some(rlim)
160 }
161}
162
163#[cfg(target_os = "linux")]
164fn max_value_to_opt(value: Option<MaxValue>) -> anyhow::Result<Option<u64>> {
165 Ok(match value {
166 None | Some(MaxValue::Max) => None,
167 Some(MaxValue::Value(n)) if n >= 0 => Some(n as u64),
168 Some(MaxValue::Value(n)) => anyhow::bail!("unexpected negative limit {n}"),
169 })
170}
171
/// Returns the smaller of two optional limits. A present value beats `None`,
/// so the result is `None` only when both inputs are `None`.
fn min_opt_limit(a: Option<u64>, b: Option<u64>) -> Option<u64> {
    a.into_iter().chain(b).min()
}
179
180impl MemoryLimits {
181 pub fn get_rlimits() -> anyhow::Result<Self> {
182 #[cfg(not(target_os = "macos"))]
183 let (rss_soft, rss_hard) =
184 nix::sys::resource::getrlimit(nix::sys::resource::Resource::RLIMIT_RSS)?;
185 #[cfg(target_os = "macos")]
186 let (rss_soft, rss_hard) = (RLIM_INFINITY, RLIM_INFINITY);
187
188 let soft_limit = rlim_to_opt(rss_soft);
189 let hard_limit = rlim_to_opt(rss_hard);
190
191 Ok(Self {
192 soft_limit,
193 hard_limit,
194 })
195 }
196
197 #[cfg(target_os = "linux")]
198 fn get_any_cgroup() -> anyhow::Result<Self> {
199 if let Ok(cg) = Self::get_cgroup(true) {
200 return Ok(cg);
201 }
202 Self::get_cgroup(false)
203 }
204
205 #[cfg(target_os = "linux")]
206 pub fn get_cgroup(v2: bool) -> anyhow::Result<Self> {
207 let cgroup = get_my_cgroup(v2)?;
208 let mem: &MemController = cgroup
209 .controller_of()
210 .ok_or_else(|| anyhow::anyhow!("no memory controller?"))?;
211
212 let limits = mem.get_mem()?;
213 Ok(Self {
214 soft_limit: max_value_to_opt(limits.high)?,
215 hard_limit: max_value_to_opt(limits.max)?,
216 })
217 }
218}
219
220#[cfg(target_os = "linux")]
223fn get_physical_memory() -> anyhow::Result<u64> {
224 let data = std::fs::read_to_string("/proc/meminfo")?;
225 for line in data.lines() {
226 if line.starts_with("MemTotal:") {
227 let mut iter = line.rsplit(' ');
228 let unit = iter
229 .next()
230 .ok_or_else(|| anyhow::anyhow!("expected unit"))?;
231 if unit != "kB" {
232 anyhow::bail!("unsupported /proc/meminfo unit {unit}");
233 }
234 let value = iter
235 .next()
236 .ok_or_else(|| anyhow::anyhow!("expected value"))?;
237 let value: u64 = value.parse()?;
238
239 return Ok(value * 1024);
240 }
241 }
242 anyhow::bail!("MemTotal not found in /proc/meminfo");
243}
244
245#[cfg(target_os = "linux")]
257fn get_usage_and_limit_impl() -> anyhow::Result<(MemoryUsage, MemoryLimits)> {
258 let mut limit = MemoryLimits::get_rlimits()?;
259 let mut usage = MemoryUsage::get_linux_statm()?;
260
261 if let Ok(cg_lim) = MemoryLimits::get_any_cgroup() {
262 if !cg_lim.is_unlimited() {
263 limit = limit.min(cg_lim);
264 usage = MemoryUsage::get()?;
265 }
266 }
267
268 if limit.hard_limit.is_none() {
269 let phys = get_physical_memory()?;
270 limit.hard_limit.replace(phys);
271 }
272 if limit.soft_limit.is_none() {
273 limit.soft_limit = limit.hard_limit.map(|lim| lim * 3 / 4);
274 }
275
276 Ok((usage, limit))
277}
278
/// Non-Linux fallback: reports zero usage and no known limits.
#[cfg(not(target_os = "linux"))]
fn get_usage_and_limit_impl() -> anyhow::Result<(MemoryUsage, MemoryLimits)> {
    let usage = MemoryUsage { bytes: 0 };
    let unlimited = MemoryLimits {
        soft_limit: None,
        hard_limit: None,
    };
    Ok((usage, unlimited))
}
289
// User-configured overrides, in bytes. A value of 0 means "not set", in which
// case the getters fall back to the detected limits.
static USER_HARD_LIMIT: AtomicUsize = AtomicUsize::new(0);
static USER_SOFT_LIMIT: AtomicUsize = AtomicUsize::new(0);
static USER_LOW_THRESH: AtomicUsize = AtomicUsize::new(0);

/// Records `value` in one of the user-override slots.
fn store_override(slot: &AtomicUsize, value: usize) {
    slot.store(value, Ordering::Relaxed);
}

/// Overrides the detected hard limit with `limit` bytes; 0 clears the override.
pub fn set_hard_limit(limit: usize) {
    store_override(&USER_HARD_LIMIT, limit);
}

/// Overrides the detected soft limit with `limit` bytes; 0 clears the override.
pub fn set_soft_limit(limit: usize) {
    store_override(&USER_SOFT_LIMIT, limit);
}

/// Overrides the derived low-memory threshold with `limit` bytes; 0 clears
/// the override.
pub fn set_low_memory_thresh(limit: usize) {
    store_override(&USER_LOW_THRESH, limit);
}
305
306pub fn get_hard_limit() -> Option<u64> {
307 let user_limit = USER_HARD_LIMIT.load(Ordering::Relaxed);
308 if user_limit > 0 {
309 return Some(user_limit as u64);
310 }
311
312 let result = get_usage_and_limit()
313 .ok()
314 .and_then(|(_, limits)| limits.hard_limit);
315 result
316}
317
318pub fn get_soft_limit() -> Option<u64> {
319 let user_limit = USER_SOFT_LIMIT.load(Ordering::Relaxed);
320 if user_limit > 0 {
321 return Some(user_limit as u64);
322 }
323
324 get_usage_and_limit()
325 .ok()
326 .and_then(|(_, limits)| limits.soft_limit)
327}
328
329pub fn get_low_memory_thresh() -> Option<u64> {
330 let user_thresh = USER_LOW_THRESH.load(Ordering::Relaxed);
331 if user_thresh > 0 {
332 return Some(user_thresh as u64);
333 }
334
335 get_usage_and_limit()
336 .ok()
337 .and_then(|(_, limits)| limits.soft_limit)
338 .map(|limit| limit * 8 / 10)
339}
340
341pub fn get_usage_and_limit() -> anyhow::Result<(MemoryUsage, MemoryLimits)> {
342 let (usage, mut limit) = get_usage_and_limit_impl()?;
343
344 if let Ok(value) = std::env::var("KUMOD_MEMORY_HARD_LIMIT") {
345 limit.hard_limit.replace(
346 value
347 .parse()
348 .context("failed to parse KUMOD_MEMORY_HARD_LIMIT env var")?,
349 );
350 }
351 if let Ok(value) = std::env::var("KUMOD_MEMORY_SOFT_LIMIT") {
352 limit.soft_limit.replace(
353 value
354 .parse()
355 .context("failed to parse KUMOD_MEMORY_SOFT_LIMIT env var")?,
356 );
357 }
358
359 let hard = USER_HARD_LIMIT.load(Ordering::Relaxed);
360 if hard > 0 {
361 limit.hard_limit.replace(hard as u64);
362 }
363 let soft = USER_SOFT_LIMIT.load(Ordering::Relaxed);
364 if soft > 0 {
365 limit.soft_limit.replace(soft as u64);
366 }
367
368 Ok((usage, limit))
369}
370
371pub fn purge_thread_cache() {
375 unsafe {
376 tikv_jemalloc_sys::mallctl(
377 b"thread.tcache.flush\0".as_ptr() as *const _,
378 std::ptr::null_mut(),
379 std::ptr::null_mut(),
380 std::ptr::null_mut(),
381 0,
382 );
383 }
384}
385
386fn purge_all_arenas() {
389 unsafe {
390 tikv_jemalloc_sys::mallctl(
393 b"arena.4096.purge\0".as_ptr() as *const _,
394 std::ptr::null_mut(),
395 std::ptr::null_mut(),
396 std::ptr::null_mut(),
397 0,
398 );
399 }
400}
401
402fn dump_heap_profile() {
406 unsafe {
407 tikv_jemalloc_sys::mallctl(
408 b"prof.dump\0".as_ptr() as *const _,
409 std::ptr::null_mut(),
410 std::ptr::null_mut(),
411 std::ptr::null_mut(),
412 0,
413 );
414 }
415}
416
417fn memory_thread() {
420 let mut is_ok = true;
421 let mut is_low = false;
422
423 let (tx, rx) = tokio::sync::watch::channel(());
424 SUBSCRIBER.lock().unwrap().replace(rx);
425
426 loop {
427 MEM_COUNTED.set(crate::tracking::counted_usage() as f64);
428
429 match get_usage_and_limit() {
430 Ok((
431 MemoryUsage { bytes: usage },
432 MemoryLimits {
433 soft_limit: Some(limit),
434 hard_limit: _,
435 },
436 )) => {
437 let was_ok = is_ok;
438 is_ok = usage < limit;
439 OVER_LIMIT.store(is_ok, Ordering::SeqCst);
440 HEAD_ROOM.store(limit.saturating_sub(usage) as usize, Ordering::SeqCst);
441 MEM_USAGE.set(usage as f64);
442 MEM_LIMIT.set(limit as f64);
443
444 let mut low_thresh = USER_LOW_THRESH.load(Ordering::Relaxed) as u64;
445 if low_thresh == 0 {
446 low_thresh = limit * 8 / 10;
447 }
448 LOW_MEM_THRESH.set(low_thresh as f64);
449
450 let was_low = is_low;
451 is_low = usage > low_thresh;
452 LOW_MEM.store(is_low, Ordering::SeqCst);
453
454 if !was_low && is_low {
455 LOW_COUNT.inc();
457 }
458
459 if !is_ok && was_ok {
460 dump_heap_profile();
462 OVER_LIMIT_COUNT.inc();
463 tracing::error!(
464 "memory usage {} exceeds limit {}",
465 human(usage),
466 human(limit)
467 );
468 tx.send(()).ok();
469 purge_all_arenas();
470 } else if !was_ok && is_ok {
471 dump_heap_profile();
473 tracing::error!(
474 "memory usage {} is back within limit {}",
475 human(usage),
476 human(limit)
477 );
478 tx.send(()).ok();
479 } else {
480 if !is_ok {
481 purge_all_arenas();
482 }
483 tracing::debug!("memory usage {}, limit {}", human(usage), human(limit));
484 }
485 }
486 Ok((
487 MemoryUsage { bytes: 0 },
488 MemoryLimits {
489 soft_limit: None,
490 hard_limit: None,
491 },
492 )) => {
493 HEAD_ROOM.store(1024, Ordering::SeqCst);
496 }
497 Ok(_) => {}
498 Err(err) => tracing::error!("unable to query memory info: {err:#}"),
499 }
500
501 std::thread::sleep(Duration::from_secs(3));
502 }
503}
504
505pub fn get_headroom() -> usize {
508 HEAD_ROOM.load(Ordering::SeqCst)
509}
510
511pub fn low_memory() -> bool {
513 LOW_MEM.load(Ordering::SeqCst)
514}
515
516pub fn memory_status() -> MemoryStatus {
518 if get_headroom() == 0 {
519 MemoryStatus::NoMemory
520 } else if low_memory() {
521 MemoryStatus::LowMemory
522 } else {
523 MemoryStatus::Ok
524 }
525}
526
527#[derive(Copy, Clone, Debug, Eq, PartialEq)]
528pub enum MemoryStatus {
529 Ok,
530 LowMemory,
531 NoMemory,
532}
533
534pub fn subscribe_to_memory_status_changes() -> Option<Receiver<()>> {
537 SUBSCRIBER.lock().unwrap().clone()
538}
539
540pub async fn subscribe_to_memory_status_changes_async() -> Receiver<()> {
541 loop {
542 if let Some(rx) = subscribe_to_memory_status_changes() {
543 return rx;
544 }
545 tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
546 }
547}
548
549pub fn setup_memory_limit() -> anyhow::Result<()> {
551 let (usage, limit) = get_usage_and_limit()?;
552 tracing::debug!("usage: {usage:?}");
553 tracing::info!("using limits: {limit}");
554
555 std::thread::Builder::new()
556 .name("memory-monitor".to_string())
557 .spawn(memory_thread)?;
558
559 Ok(())
560}
561
562#[cfg(target_os = "linux")]
565fn get_my_cgroup(v2: bool) -> anyhow::Result<Cgroup> {
566 let paths = get_cgroups_relative_paths()?;
567 let h: Box<dyn Hierarchy> = if v2 {
568 Box::new(V2::new())
569 } else {
570 Box::new(V1::new())
571 };
572
573 let path = paths
574 .get("")
575 .ok_or_else(|| anyhow::anyhow!("couldn't resolve path"))?;
576
577 let cgroup = Cgroup::load(h, format!("{}/{}", UNIFIED_MOUNTPOINT, path));
578 Ok(cgroup)
579}
580
581#[derive(Copy, Clone)]
582pub struct NumBytes(pub usize);
583
584impl std::fmt::Debug for NumBytes {
585 fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
586 write!(
587 fmt,
588 "{} ({})",
589 self.0,
590 humansize::format_size(self.0, humansize::DECIMAL)
591 )
592 }
593}
594
595impl From<usize> for NumBytes {
596 fn from(n: usize) -> Self {
597 Self(n)
598 }
599}
600
601impl From<u64> for NumBytes {
602 fn from(n: u64) -> Self {
603 Self(n as usize)
604 }
605}
606
607#[derive(Copy, Clone)]
608pub struct Number(pub usize);
609
610impl std::fmt::Debug for Number {
611 fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
612 use num_format::{Locale, ToFormattedString};
613 write!(
614 fmt,
615 "{} ({})",
616 self.0,
617 self.0.to_formatted_string(&Locale::en)
618 )
619 }
620}
621
622impl From<usize> for Number {
623 fn from(n: usize) -> Self {
624 Self(n)
625 }
626}
627
628impl From<u64> for Number {
629 fn from(n: u64) -> Self {
630 Self(n as usize)
631 }
632}
633
634#[derive(Debug)]
635pub struct JemallocStats {
636 pub allocated: NumBytes,
638
639 pub active: NumBytes,
645
646 pub metadata: NumBytes,
649
650 pub resident: NumBytes,
653
654 pub mapped: NumBytes,
657
658 pub retained: NumBytes,
661}
662
663impl JemallocStats {
664 pub fn collect() -> Self {
665 use tikv_jemalloc_ctl::{epoch, stats};
666
667 epoch::advance().ok();
671
672 Self {
673 allocated: stats::allocated::read().unwrap_or(0).into(),
674 active: stats::active::read().unwrap_or(0).into(),
675 metadata: stats::metadata::read().unwrap_or(0).into(),
676 resident: stats::resident::read().unwrap_or(0).into(),
677 mapped: stats::mapped::read().unwrap_or(0).into(),
678 retained: stats::retained::read().unwrap_or(0).into(),
679 }
680 }
681}