1use crate::tracking::TrackingAllocator;
7use anyhow::Context;
8#[cfg(target_os = "linux")]
9use cgroups_rs::cgroup::{get_cgroups_relative_paths, Cgroup, UNIFIED_MOUNTPOINT};
10#[cfg(target_os = "linux")]
11use cgroups_rs::hierarchies::{V1, V2};
12#[cfg(target_os = "linux")]
13use cgroups_rs::memory::MemController;
14#[cfg(target_os = "linux")]
15use cgroups_rs::{Hierarchy, MaxValue};
16use nix::sys::resource::{rlim_t, RLIM_INFINITY};
17use nix::unistd::{sysconf, SysconfVar};
18use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
19use std::sync::{LazyLock, Mutex};
20use std::time::Duration;
21use tikv_jemallocator::Jemalloc;
22use tokio::sync::watch::Receiver;
23
24pub mod tracking;
25
26pub use tracking::{set_tracking_callstacks, tracking_stats};
27
/// Process-wide allocator: jemalloc wrapped in the crate's `TrackingAllocator`.
/// NOTE(review): presumably this wrapper is what feeds
/// `tracking::counted_usage()` (reported as `memory_usage_rust`); confirm in `tracking`.
#[global_allocator]
static GLOBAL: TrackingAllocator<Jemalloc> = TrackingAllocator::new(Jemalloc);
30
// Each metric below is registered, together with its description, lazily on
// first access.

/// Incremented each time a sample finds usage newly above the low-memory
/// threshold (a not-low -> low transition).
static LOW_COUNT: LazyLock<metrics::Counter> = LazyLock::new(|| {
    metrics::describe_counter!(
        "memory_low_count",
        "how many times the low memory threshold was exceeded"
    );
    metrics::counter!("memory_low_count")
});
/// Incremented each time a sample finds usage newly at or above the soft limit.
static OVER_LIMIT_COUNT: LazyLock<metrics::Counter> = LazyLock::new(|| {
    metrics::describe_counter!(
        "memory_over_limit_count",
        "how many times the soft memory limit was exceeded"
    );
    metrics::counter!("memory_over_limit_count")
});
/// Most recently sampled usage in bytes. On Linux this is either the statm RSS
/// or the cgroup controller's usage, depending on whether a cgroup limit applies.
static MEM_USAGE: LazyLock<metrics::Gauge> = LazyLock::new(|| {
    metrics::describe_gauge!(
        "memory_usage",
        "number of bytes of used memory (Resident Set Size)"
    );
    metrics::gauge!("memory_usage")
});
/// Soft limit in bytes that was in effect for the latest sample.
static MEM_LIMIT: LazyLock<metrics::Gauge> = LazyLock::new(|| {
    metrics::describe_gauge!("memory_limit", "soft memory limit measured in bytes");
    metrics::gauge!("memory_limit")
});
/// Value of `tracking::counted_usage()` at the latest sample.
static MEM_COUNTED: LazyLock<metrics::Gauge> = LazyLock::new(|| {
    metrics::describe_gauge!(
        "memory_usage_rust",
        "number of bytes of used memory (allocated by Rust)"
    );
    metrics::gauge!("memory_usage_rust")
});
/// Low-memory threshold in bytes that was in effect for the latest sample.
static LOW_MEM_THRESH: LazyLock<metrics::Gauge> = LazyLock::new(|| {
    metrics::describe_gauge!(
        "memory_low_thresh",
        "low memory threshold measured in bytes"
    );
    metrics::gauge!("memory_low_thresh")
});
/// Receiver half of the status-change watch channel. `memory_thread`
/// publishes it when it starts; it is `None` until then.
static SUBSCRIBER: LazyLock<Mutex<Option<Receiver<()>>>> = LazyLock::new(|| Mutex::new(None));

/// Over-soft-limit flag. `memory_thread` updates it on each sample that has a
/// soft limit; nothing else in this file reads it.
static OVER_LIMIT: AtomicBool = AtomicBool::new(false);
/// True when the latest sample found usage above the low-memory threshold.
static LOW_MEM: AtomicBool = AtomicBool::new(false);

/// Bytes remaining below the soft limit (saturating at 0) as of the latest
/// sample; read by `get_headroom`. It starts at `u32::MAX` until the first
/// sample, presumably so callers see ample headroom during startup.
static HEAD_ROOM: AtomicUsize = AtomicUsize::new(u32::MAX as usize);
82
/// A snapshot of memory usage.
///
/// Depending on how it was obtained, this is either the cgroup memory
/// controller's usage or the process RSS from `/proc/self/statm`. It is 0 on
/// platforms where usage is not measured.
#[derive(Debug, Clone, Copy)]
pub struct MemoryUsage {
    /// Memory in use, in bytes.
    pub bytes: u64,
}
88
89impl std::fmt::Display for MemoryUsage {
90 fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
91 write!(fmt, "{}", human(self.bytes))
92 }
93}
94
95impl MemoryUsage {
96 pub fn get() -> anyhow::Result<Self> {
97 #[cfg(target_os = "linux")]
98 {
99 if let Ok(v2) = Self::get_cgroup(true) {
100 return Ok(v2);
101 }
102 if let Ok(v1) = Self::get_cgroup(false) {
103 return Ok(v1);
104 }
105 }
106 Self::get_linux_statm()
107 }
108
109 #[cfg(target_os = "linux")]
110 fn get_cgroup(v2: bool) -> anyhow::Result<Self> {
111 let cgroup = get_my_cgroup(v2)?;
112 let mem: &MemController = cgroup
113 .controller_of()
114 .ok_or_else(|| anyhow::anyhow!("no memory controller?"))?;
115 let stat = mem.memory_stat();
116 Ok(Self {
117 bytes: stat.usage_in_bytes,
118 })
119 }
120
121 pub fn get_linux_statm() -> anyhow::Result<Self> {
122 let data = std::fs::read_to_string("/proc/self/statm")?;
123 let fields: Vec<&str> = data.split(' ').collect();
124 let rss: u64 = fields[1].parse()?;
125 Ok(Self {
126 bytes: rss * sysconf(SysconfVar::PAGE_SIZE)?.unwrap_or(4 * 1024) as u64,
127 })
128 }
129}
130
131fn human(n: u64) -> String {
132 humansize::format_size(n, humansize::DECIMAL)
133}
134
/// Soft and hard memory limits in bytes; `None` means no limit is known.
#[derive(Debug, Clone, Copy)]
pub struct MemoryLimits {
    /// The monitor thread treats usage at or above this value as over the limit.
    pub soft_limit: Option<u64>,
    /// Upper bound. On Linux a default soft limit is derived from it when no
    /// soft limit is otherwise known.
    pub hard_limit: Option<u64>,
}
141
142impl std::fmt::Display for MemoryLimits {
143 fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
144 let soft = self.soft_limit.map(human);
145 let hard = self.hard_limit.map(human);
146 write!(fmt, "soft={soft:?}, hard={hard:?}")
147 }
148}
149
150impl MemoryLimits {
151 pub fn min(self, other: Self) -> Self {
152 Self {
153 soft_limit: min_opt_limit(self.soft_limit, other.soft_limit),
154 hard_limit: min_opt_limit(self.hard_limit, other.hard_limit),
155 }
156 }
157
158 pub fn is_unlimited(&self) -> bool {
159 self.soft_limit.is_none() && self.hard_limit.is_none()
160 }
161}
162
163fn rlim_to_opt(rlim: rlim_t) -> Option<u64> {
164 if rlim == RLIM_INFINITY {
165 None
166 } else {
167 Some(rlim)
168 }
169}
170
171#[cfg(target_os = "linux")]
172fn max_value_to_opt(value: Option<MaxValue>) -> anyhow::Result<Option<u64>> {
173 Ok(match value {
174 None | Some(MaxValue::Max) => None,
175 Some(MaxValue::Value(n)) if n >= 0 => Some(n as u64),
176 Some(MaxValue::Value(n)) => anyhow::bail!("unexpected negative limit {n}"),
177 })
178}
179
/// Returns the tighter of two optional limits: the smaller value when both
/// are set, whichever one is set when only one is, and `None` when neither is.
fn min_opt_limit(a: Option<u64>, b: Option<u64>) -> Option<u64> {
    // An `Option` iterates over zero or one item, so chaining both sides and
    // taking the minimum simply ignores whichever side is unset.
    a.into_iter().chain(b).min()
}
187
188impl MemoryLimits {
189 pub fn get_rlimits() -> anyhow::Result<Self> {
190 #[cfg(not(target_os = "macos"))]
191 let (rss_soft, rss_hard) =
192 nix::sys::resource::getrlimit(nix::sys::resource::Resource::RLIMIT_RSS)?;
193 #[cfg(target_os = "macos")]
194 let (rss_soft, rss_hard) = (RLIM_INFINITY, RLIM_INFINITY);
195
196 let soft_limit = rlim_to_opt(rss_soft);
197 let hard_limit = rlim_to_opt(rss_hard);
198
199 Ok(Self {
200 soft_limit,
201 hard_limit,
202 })
203 }
204
205 #[cfg(target_os = "linux")]
206 fn get_any_cgroup() -> anyhow::Result<Self> {
207 if let Ok(cg) = Self::get_cgroup(true) {
208 return Ok(cg);
209 }
210 Self::get_cgroup(false)
211 }
212
213 #[cfg(target_os = "linux")]
214 pub fn get_cgroup(v2: bool) -> anyhow::Result<Self> {
215 let cgroup = get_my_cgroup(v2)?;
216 let mem: &MemController = cgroup
217 .controller_of()
218 .ok_or_else(|| anyhow::anyhow!("no memory controller?"))?;
219
220 let limits = mem.get_mem()?;
221 Ok(Self {
222 soft_limit: max_value_to_opt(limits.high)?,
223 hard_limit: max_value_to_opt(limits.max)?,
224 })
225 }
226}
227
228#[cfg(target_os = "linux")]
231fn get_physical_memory() -> anyhow::Result<u64> {
232 let data = std::fs::read_to_string("/proc/meminfo")?;
233 for line in data.lines() {
234 if line.starts_with("MemTotal:") {
235 let mut iter = line.rsplit(' ');
236 let unit = iter
237 .next()
238 .ok_or_else(|| anyhow::anyhow!("expected unit"))?;
239 if unit != "kB" {
240 anyhow::bail!("unsupported /proc/meminfo unit {unit}");
241 }
242 let value = iter
243 .next()
244 .ok_or_else(|| anyhow::anyhow!("expected value"))?;
245 let value: u64 = value.parse()?;
246
247 return Ok(value * 1024);
248 }
249 }
250 anyhow::bail!("MemTotal not found in /proc/meminfo");
251}
252
253#[cfg(target_os = "linux")]
265fn get_usage_and_limit_impl() -> anyhow::Result<(MemoryUsage, MemoryLimits)> {
266 let mut limit = MemoryLimits::get_rlimits()?;
267 let mut usage = MemoryUsage::get_linux_statm()?;
268
269 if let Ok(cg_lim) = MemoryLimits::get_any_cgroup() {
270 if !cg_lim.is_unlimited() {
271 limit = limit.min(cg_lim);
272 usage = MemoryUsage::get()?;
273 }
274 }
275
276 if limit.hard_limit.is_none() {
277 let phys = get_physical_memory()?;
278 limit.hard_limit.replace(phys);
279 }
280 if limit.soft_limit.is_none() {
281 limit.soft_limit = limit.hard_limit.map(|lim| lim * 3 / 4);
282 }
283
284 Ok((usage, limit))
285}
286
/// Fallback for non-Linux platforms. Memory is not measured here: usage is
/// reported as 0 and no limits are known.
#[cfg(not(target_os = "linux"))]
fn get_usage_and_limit_impl() -> anyhow::Result<(MemoryUsage, MemoryLimits)> {
    let unmeasured = MemoryUsage { bytes: 0 };
    let unlimited = MemoryLimits {
        soft_limit: None,
        hard_limit: None,
    };
    Ok((unmeasured, unlimited))
}
297
// Programmatic overrides set via the `set_*` functions below. A value of 0
// means "not set".
static USER_HARD_LIMIT: AtomicUsize = AtomicUsize::new(0);
static USER_SOFT_LIMIT: AtomicUsize = AtomicUsize::new(0);
static USER_LOW_THRESH: AtomicUsize = AtomicUsize::new(0);

/// Overrides the hard limit, in bytes, returned by `get_usage_and_limit`.
/// It takes precedence over the environment variable. Passing 0 clears the
/// override.
pub fn set_hard_limit(limit: usize) {
    USER_HARD_LIMIT.store(limit, Ordering::Relaxed);
}

/// Overrides the soft limit, in bytes, returned by `get_usage_and_limit`.
/// It takes precedence over the environment variable. Passing 0 clears the
/// override.
pub fn set_soft_limit(limit: usize) {
    USER_SOFT_LIMIT.store(limit, Ordering::Relaxed);
}

/// Sets the low-memory threshold, in bytes, used by the monitor thread.
/// Passing 0 restores the default of 80% of the soft limit.
pub fn set_low_memory_thresh(limit: usize) {
    USER_LOW_THRESH.store(limit, Ordering::Relaxed);
}
313
314pub fn get_usage_and_limit() -> anyhow::Result<(MemoryUsage, MemoryLimits)> {
315 let (usage, mut limit) = get_usage_and_limit_impl()?;
316
317 if let Ok(value) = std::env::var("KUMOD_MEMORY_HARD_LIMIT") {
318 limit.hard_limit.replace(
319 value
320 .parse()
321 .context("failed to parse KUMOD_MEMORY_HARD_LIMIT env var")?,
322 );
323 }
324 if let Ok(value) = std::env::var("KUMOD_MEMORY_SOFT_LIMIT") {
325 limit.soft_limit.replace(
326 value
327 .parse()
328 .context("failed to parse KUMOD_MEMORY_SOFT_LIMIT env var")?,
329 );
330 }
331
332 let hard = USER_HARD_LIMIT.load(Ordering::Relaxed);
333 if hard > 0 {
334 limit.hard_limit.replace(hard as u64);
335 }
336 let soft = USER_SOFT_LIMIT.load(Ordering::Relaxed);
337 if soft > 0 {
338 limit.soft_limit.replace(soft as u64);
339 }
340
341 Ok((usage, limit))
342}
343
344pub fn purge_thread_cache() {
348 unsafe {
349 tikv_jemalloc_sys::mallctl(
350 b"thread.tcache.flush\0".as_ptr() as *const _,
351 std::ptr::null_mut(),
352 std::ptr::null_mut(),
353 std::ptr::null_mut(),
354 0,
355 );
356 }
357}
358
359fn purge_all_arenas() {
362 unsafe {
363 tikv_jemalloc_sys::mallctl(
366 b"arena.4096.purge\0".as_ptr() as *const _,
367 std::ptr::null_mut(),
368 std::ptr::null_mut(),
369 std::ptr::null_mut(),
370 0,
371 );
372 }
373}
374
375fn dump_heap_profile() {
379 unsafe {
380 tikv_jemalloc_sys::mallctl(
381 b"prof.dump\0".as_ptr() as *const _,
382 std::ptr::null_mut(),
383 std::ptr::null_mut(),
384 std::ptr::null_mut(),
385 0,
386 );
387 }
388}
389
390fn memory_thread() {
393 let mut is_ok = true;
394 let mut is_low = false;
395
396 let (tx, rx) = tokio::sync::watch::channel(());
397 SUBSCRIBER.lock().unwrap().replace(rx);
398
399 loop {
400 MEM_COUNTED.set(crate::tracking::counted_usage() as f64);
401
402 match get_usage_and_limit() {
403 Ok((
404 MemoryUsage { bytes: usage },
405 MemoryLimits {
406 soft_limit: Some(limit),
407 hard_limit: _,
408 },
409 )) => {
410 let was_ok = is_ok;
411 is_ok = usage < limit;
412 OVER_LIMIT.store(is_ok, Ordering::SeqCst);
413 HEAD_ROOM.store(limit.saturating_sub(usage) as usize, Ordering::SeqCst);
414 MEM_USAGE.set(usage as f64);
415 MEM_LIMIT.set(limit as f64);
416
417 let mut low_thresh = USER_LOW_THRESH.load(Ordering::Relaxed) as u64;
418 if low_thresh == 0 {
419 low_thresh = limit * 8 / 10;
420 }
421 LOW_MEM_THRESH.set(low_thresh as f64);
422
423 let was_low = is_low;
424 is_low = usage > low_thresh;
425 LOW_MEM.store(is_low, Ordering::SeqCst);
426
427 if !was_low && is_low {
428 LOW_COUNT.increment(1);
430 }
431
432 if !is_ok && was_ok {
433 dump_heap_profile();
435 OVER_LIMIT_COUNT.increment(1);
436 tracing::error!(
437 "memory usage {} exceeds limit {}",
438 human(usage),
439 human(limit)
440 );
441 tx.send(()).ok();
442 purge_all_arenas();
443 } else if !was_ok && is_ok {
444 dump_heap_profile();
446 tracing::error!(
447 "memory usage {} is back within limit {}",
448 human(usage),
449 human(limit)
450 );
451 tx.send(()).ok();
452 } else {
453 if !is_ok {
454 purge_all_arenas();
455 }
456 tracing::debug!("memory usage {}, limit {}", human(usage), human(limit));
457 }
458 }
459 Ok((
460 MemoryUsage { bytes: 0 },
461 MemoryLimits {
462 soft_limit: None,
463 hard_limit: None,
464 },
465 )) => {
466 HEAD_ROOM.store(1024, Ordering::SeqCst);
469 }
470 Ok(_) => {}
471 Err(err) => tracing::error!("unable to query memory info: {err:#}"),
472 }
473
474 std::thread::sleep(Duration::from_secs(3));
475 }
476}
477
/// Returns the headroom in bytes computed by the monitor thread: how far the
/// latest sampled usage was below the soft limit, saturating at 0.
///
/// Before the first sample this is `HEAD_ROOM`'s initial value. When the
/// platform supplies neither usage nor limits, the monitor sets it to 1024.
pub fn get_headroom() -> usize {
    HEAD_ROOM.load(Ordering::SeqCst)
}

/// Returns `true` if the latest sample found usage above the low-memory threshold.
pub fn low_memory() -> bool {
    LOW_MEM.load(Ordering::SeqCst)
}
488
489pub fn memory_status() -> MemoryStatus {
491 if get_headroom() == 0 {
492 MemoryStatus::NoMemory
493 } else if low_memory() {
494 MemoryStatus::LowMemory
495 } else {
496 MemoryStatus::Ok
497 }
498}
499
/// Coarse memory state, as reported by [`memory_status`].
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum MemoryStatus {
    /// Headroom remains and usage is not above the low-memory threshold.
    Ok,
    /// Usage is above the low-memory threshold, but headroom remains.
    LowMemory,
    /// Headroom is 0, i.e. usage was at or above the soft limit.
    NoMemory,
}
506
507pub fn subscribe_to_memory_status_changes() -> Option<Receiver<()>> {
510 SUBSCRIBER.lock().unwrap().clone()
511}
512
513pub async fn subscribe_to_memory_status_changes_async() -> Receiver<()> {
514 loop {
515 if let Some(rx) = subscribe_to_memory_status_changes() {
516 return rx;
517 }
518 tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
519 }
520}
521
522pub fn setup_memory_limit() -> anyhow::Result<()> {
524 let (usage, limit) = get_usage_and_limit()?;
525 tracing::debug!("usage: {usage:?}");
526 tracing::info!("using limits: {limit}");
527
528 std::thread::Builder::new()
529 .name("memory-monitor".to_string())
530 .spawn(memory_thread)?;
531
532 Ok(())
533}
534
535#[cfg(target_os = "linux")]
538fn get_my_cgroup(v2: bool) -> anyhow::Result<Cgroup> {
539 let paths = get_cgroups_relative_paths()?;
540 let h: Box<dyn Hierarchy> = if v2 {
541 Box::new(V2::new())
542 } else {
543 Box::new(V1::new())
544 };
545
546 let path = paths
547 .get("")
548 .ok_or_else(|| anyhow::anyhow!("couldn't resolve path"))?;
549
550 let cgroup = Cgroup::load(h, format!("{}/{}", UNIFIED_MOUNTPOINT, path));
551 Ok(cgroup)
552}
553
554#[derive(Copy, Clone)]
555pub struct NumBytes(pub usize);
556
557impl std::fmt::Debug for NumBytes {
558 fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
559 write!(
560 fmt,
561 "{} ({})",
562 self.0,
563 humansize::format_size(self.0, humansize::DECIMAL)
564 )
565 }
566}
567
568impl From<usize> for NumBytes {
569 fn from(n: usize) -> Self {
570 Self(n)
571 }
572}
573
574impl From<u64> for NumBytes {
575 fn from(n: u64) -> Self {
576 Self(n as usize)
577 }
578}
579
580#[derive(Copy, Clone)]
581pub struct Number(pub usize);
582
583impl std::fmt::Debug for Number {
584 fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
585 use num_format::{Locale, ToFormattedString};
586 write!(
587 fmt,
588 "{} ({})",
589 self.0,
590 self.0.to_formatted_string(&Locale::en)
591 )
592 }
593}
594
595impl From<usize> for Number {
596 fn from(n: usize) -> Self {
597 Self(n)
598 }
599}
600
601impl From<u64> for Number {
602 fn from(n: u64) -> Self {
603 Self(n as usize)
604 }
605}
606
/// A snapshot of jemalloc's global statistics, filled in by
/// [`JemallocStats::collect`]. Field descriptions paraphrase the jemalloc manual.
#[derive(Debug)]
pub struct JemallocStats {
    /// `stats.allocated`: total bytes allocated by the application.
    pub allocated: NumBytes,

    /// `stats.active`: total bytes in active pages allocated by the
    /// application (a page-size multiple, >= `allocated`).
    pub active: NumBytes,

    /// `stats.metadata`: total bytes dedicated to jemalloc's own metadata.
    pub metadata: NumBytes,

    /// `stats.resident`: bytes in physically resident data pages mapped by
    /// the allocator (a maximum; may overestimate).
    pub resident: NumBytes,

    /// `stats.mapped`: total bytes in active extents mapped by the allocator.
    pub mapped: NumBytes,

    /// `stats.retained`: bytes in virtual memory mappings retained rather
    /// than returned to the operating system.
    pub retained: NumBytes,
}
635
636impl JemallocStats {
637 pub fn collect() -> Self {
638 use tikv_jemalloc_ctl::{epoch, stats};
639
640 epoch::advance().ok();
644
645 Self {
646 allocated: stats::allocated::read().unwrap_or(0).into(),
647 active: stats::active::read().unwrap_or(0).into(),
648 metadata: stats::metadata::read().unwrap_or(0).into(),
649 resident: stats::resident::read().unwrap_or(0).into(),
650 mapped: stats::mapped::read().unwrap_or(0).into(),
651 retained: stats::retained::read().unwrap_or(0).into(),
652 }
653 }
654}