1use crate::tracking::TrackingAllocator;
7use anyhow::Context;
8#[cfg(target_os = "linux")]
9use cgroups_rs::cgroup::{get_cgroups_relative_paths, Cgroup, UNIFIED_MOUNTPOINT};
10#[cfg(target_os = "linux")]
11use cgroups_rs::hierarchies::{V1, V2};
12#[cfg(target_os = "linux")]
13use cgroups_rs::memory::MemController;
14#[cfg(target_os = "linux")]
15use cgroups_rs::{Hierarchy, MaxValue};
16use nix::sys::resource::{rlim_t, RLIM_INFINITY};
17use nix::unistd::{sysconf, SysconfVar};
18use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
19use std::sync::{LazyLock, Mutex};
20use std::time::Duration;
21use tikv_jemallocator::Jemalloc;
22use tokio::sync::watch::Receiver;
23
24pub mod tracking;
25
26pub use tracking::{set_tracking_callstacks, tracking_stats};
27
/// Process-wide allocator: jemalloc wrapped in the crate's `TrackingAllocator`
/// (see the `tracking` module). `crate::tracking::counted_usage()` is what
/// `memory_thread` publishes as the `memory_usage_rust` gauge.
#[global_allocator]
static GLOBAL: TrackingAllocator<Jemalloc> = TrackingAllocator::new(Jemalloc);

// The metrics below are described and registered lazily, the first time each
// `LazyLock` is dereferenced.

/// Incremented by `memory_thread` each time usage rises above the low-memory
/// threshold (counted on the transition, not on every sample).
static LOW_COUNT: LazyLock<metrics::Counter> = LazyLock::new(|| {
    metrics::describe_counter!(
        "memory_low_count",
        "how many times the low memory threshold was exceeded"
    );
    metrics::counter!("memory_low_count")
});
/// Incremented by `memory_thread` each time usage goes from below to at/above
/// the soft limit.
static OVER_LIMIT_COUNT: LazyLock<metrics::Counter> = LazyLock::new(|| {
    metrics::describe_counter!(
        "memory_over_limit_count",
        "how many times the soft memory limit was exceeded"
    );
    metrics::counter!("memory_over_limit_count")
});
/// Last usage sampled by `memory_thread`.
/// NOTE(review): despite the description, the value may come from the cgroup
/// usage stat rather than RSS when a limited cgroup is in effect.
static MEM_USAGE: LazyLock<metrics::Gauge> = LazyLock::new(|| {
    metrics::describe_gauge!(
        "memory_usage",
        "number of bytes of used memory (Resident Set Size)"
    );
    metrics::gauge!("memory_usage")
});
/// Soft limit in effect at the last `memory_thread` sample.
static MEM_LIMIT: LazyLock<metrics::Gauge> = LazyLock::new(|| {
    metrics::describe_gauge!("memory_limit", "soft memory limit measured in bytes");
    metrics::gauge!("memory_limit")
});
/// Bytes reported by `crate::tracking::counted_usage()` at the last sample.
static MEM_COUNTED: LazyLock<metrics::Gauge> = LazyLock::new(|| {
    metrics::describe_gauge!(
        "memory_usage_rust",
        "number of bytes of used memory (allocated by Rust)"
    );
    metrics::gauge!("memory_usage_rust")
});
/// Low-memory threshold used at the last sample (user override or 80% of the
/// soft limit).
static LOW_MEM_THRESH: LazyLock<metrics::Gauge> = LazyLock::new(|| {
    metrics::describe_gauge!(
        "memory_low_thresh",
        "low memory threshold measured in bytes"
    );
    metrics::gauge!("memory_low_thresh")
});
/// Receiving end of the status-change watch channel created by
/// `memory_thread`; `None` until that thread has started.
static SUBSCRIBER: LazyLock<Mutex<Option<Receiver<()>>>> = LazyLock::new(|| Mutex::new(None));

/// Intended (per its name) to flag that usage is at or above the soft limit;
/// written by `memory_thread` on each sample.
static OVER_LIMIT: AtomicBool = AtomicBool::new(false);
/// True when the last sample exceeded the low-memory threshold; read by
/// `low_memory()`.
static LOW_MEM: AtomicBool = AtomicBool::new(false);

/// Bytes remaining below the soft limit as of the last sample. Starts at a
/// large placeholder (`u32::MAX`) until `memory_thread` has measured anything;
/// `memory_status()` treats 0 as "no memory".
static HEAD_ROOM: AtomicUsize = AtomicUsize::new(u32::MAX as usize);
82
/// A point-in-time measurement of this process's memory usage.
#[derive(Debug, Clone, Copy)]
pub struct MemoryUsage {
    /// Usage in bytes: either the cgroup memory stat `usage_in_bytes` or the
    /// resident page count from `/proc/self/statm` multiplied by the page size.
    pub bytes: u64,
}
88
89impl std::fmt::Display for MemoryUsage {
90 fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
91 write!(fmt, "{}", human(self.bytes))
92 }
93}
94
95impl MemoryUsage {
96 pub fn get() -> anyhow::Result<Self> {
97 #[cfg(target_os = "linux")]
98 {
99 if let Ok(v2) = Self::get_cgroup(true) {
100 return Ok(v2);
101 }
102 if let Ok(v1) = Self::get_cgroup(false) {
103 return Ok(v1);
104 }
105 }
106 Self::get_linux_statm()
107 }
108
109 #[cfg(target_os = "linux")]
110 fn get_cgroup(v2: bool) -> anyhow::Result<Self> {
111 let cgroup = get_my_cgroup(v2)?;
112 let mem: &MemController = cgroup
113 .controller_of()
114 .ok_or_else(|| anyhow::anyhow!("no memory controller?"))?;
115 let stat = mem.memory_stat();
116 Ok(Self {
117 bytes: stat.usage_in_bytes,
118 })
119 }
120
121 pub fn get_linux_statm() -> anyhow::Result<Self> {
122 let data = std::fs::read_to_string("/proc/self/statm")?;
123 let fields: Vec<&str> = data.split(' ').collect();
124 let rss: u64 = fields[1].parse()?;
125 Ok(Self {
126 bytes: rss * sysconf(SysconfVar::PAGE_SIZE)?.unwrap_or(4 * 1024) as u64,
127 })
128 }
129}
130
131fn human(n: u64) -> String {
132 humansize::format_size(n, humansize::DECIMAL)
133}
134
/// Soft and hard memory limits in bytes; `None` means no limit is known for
/// that level.
///
/// Depending on the source these come from `RLIMIT_RSS` (soft/hard rlimit) or
/// from the cgroup memory controller (`high` as soft, `max` as hard).
#[derive(Debug, Clone, Copy)]
pub struct MemoryLimits {
    /// Threshold above which the monitor considers memory "over limit".
    pub soft_limit: Option<u64>,
    /// Upper bound on memory; falls back to physical memory on Linux.
    pub hard_limit: Option<u64>,
}
141
142impl std::fmt::Display for MemoryLimits {
143 fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
144 let soft = self.soft_limit.map(human);
145 let hard = self.hard_limit.map(human);
146 write!(fmt, "soft={soft:?}, hard={hard:?}")
147 }
148}
149
150impl MemoryLimits {
151 pub fn min(self, other: Self) -> Self {
152 Self {
153 soft_limit: min_opt_limit(self.soft_limit, other.soft_limit),
154 hard_limit: min_opt_limit(self.hard_limit, other.hard_limit),
155 }
156 }
157
158 pub fn is_unlimited(&self) -> bool {
159 self.soft_limit.is_none() && self.hard_limit.is_none()
160 }
161}
162
163fn rlim_to_opt(rlim: rlim_t) -> Option<u64> {
164 if rlim == RLIM_INFINITY {
165 None
166 } else {
167 Some(rlim)
168 }
169}
170
171#[cfg(target_os = "linux")]
172fn max_value_to_opt(value: Option<MaxValue>) -> anyhow::Result<Option<u64>> {
173 Ok(match value {
174 None | Some(MaxValue::Max) => None,
175 Some(MaxValue::Value(n)) if n >= 0 => Some(n as u64),
176 Some(MaxValue::Value(n)) => anyhow::bail!("unexpected negative limit {n}"),
177 })
178}
179
/// Smallest of the limits that are present, or `None` if neither is.
fn min_opt_limit(a: Option<u64>, b: Option<u64>) -> Option<u64> {
    a.into_iter().chain(b).min()
}
187
188impl MemoryLimits {
189 pub fn get_rlimits() -> anyhow::Result<Self> {
190 #[cfg(not(target_os = "macos"))]
191 let (rss_soft, rss_hard) =
192 nix::sys::resource::getrlimit(nix::sys::resource::Resource::RLIMIT_RSS)?;
193 #[cfg(target_os = "macos")]
194 let (rss_soft, rss_hard) = (RLIM_INFINITY, RLIM_INFINITY);
195
196 let soft_limit = rlim_to_opt(rss_soft);
197 let hard_limit = rlim_to_opt(rss_hard);
198
199 Ok(Self {
200 soft_limit,
201 hard_limit,
202 })
203 }
204
205 #[cfg(target_os = "linux")]
206 fn get_any_cgroup() -> anyhow::Result<Self> {
207 if let Ok(cg) = Self::get_cgroup(true) {
208 return Ok(cg);
209 }
210 Self::get_cgroup(false)
211 }
212
213 #[cfg(target_os = "linux")]
214 pub fn get_cgroup(v2: bool) -> anyhow::Result<Self> {
215 let cgroup = get_my_cgroup(v2)?;
216 let mem: &MemController = cgroup
217 .controller_of()
218 .ok_or_else(|| anyhow::anyhow!("no memory controller?"))?;
219
220 let limits = mem.get_mem()?;
221 Ok(Self {
222 soft_limit: max_value_to_opt(limits.high)?,
223 hard_limit: max_value_to_opt(limits.max)?,
224 })
225 }
226}
227
228#[cfg(target_os = "linux")]
231fn get_physical_memory() -> anyhow::Result<u64> {
232 let data = std::fs::read_to_string("/proc/meminfo")?;
233 for line in data.lines() {
234 if line.starts_with("MemTotal:") {
235 let mut iter = line.rsplit(' ');
236 let unit = iter
237 .next()
238 .ok_or_else(|| anyhow::anyhow!("expected unit"))?;
239 if unit != "kB" {
240 anyhow::bail!("unsupported /proc/meminfo unit {unit}");
241 }
242 let value = iter
243 .next()
244 .ok_or_else(|| anyhow::anyhow!("expected value"))?;
245 let value: u64 = value.parse()?;
246
247 return Ok(value * 1024);
248 }
249 }
250 anyhow::bail!("MemTotal not found in /proc/meminfo");
251}
252
253#[cfg(target_os = "linux")]
265fn get_usage_and_limit_impl() -> anyhow::Result<(MemoryUsage, MemoryLimits)> {
266 let mut limit = MemoryLimits::get_rlimits()?;
267 let mut usage = MemoryUsage::get_linux_statm()?;
268
269 if let Ok(cg_lim) = MemoryLimits::get_any_cgroup() {
270 if !cg_lim.is_unlimited() {
271 limit = limit.min(cg_lim);
272 usage = MemoryUsage::get()?;
273 }
274 }
275
276 if limit.hard_limit.is_none() {
277 let phys = get_physical_memory()?;
278 limit.hard_limit.replace(phys);
279 }
280 if limit.soft_limit.is_none() {
281 limit.soft_limit = limit.hard_limit.map(|lim| lim * 3 / 4);
282 }
283
284 Ok((usage, limit))
285}
286
/// Non-Linux fallback: no usage measurement and no detected limits.
#[cfg(not(target_os = "linux"))]
fn get_usage_and_limit_impl() -> anyhow::Result<(MemoryUsage, MemoryLimits)> {
    let usage = MemoryUsage { bytes: 0 };
    let limits = MemoryLimits {
        soft_limit: None,
        hard_limit: None,
    };
    Ok((usage, limits))
}
297
// User-supplied overrides, in bytes. A value of 0 means "not set".
static USER_HARD_LIMIT: AtomicUsize = AtomicUsize::new(0);
static USER_SOFT_LIMIT: AtomicUsize = AtomicUsize::new(0);
static USER_LOW_THRESH: AtomicUsize = AtomicUsize::new(0);

/// Override the hard memory limit (bytes); 0 clears the override.
pub fn set_hard_limit(limit: usize) {
    store_user_override(&USER_HARD_LIMIT, limit);
}

/// Override the soft memory limit (bytes); 0 clears the override.
pub fn set_soft_limit(limit: usize) {
    store_user_override(&USER_SOFT_LIMIT, limit);
}

/// Override the low-memory threshold (bytes); 0 clears the override.
pub fn set_low_memory_thresh(limit: usize) {
    store_user_override(&USER_LOW_THRESH, limit);
}

/// Record a user override in one of the atomics above.
fn store_user_override(slot: &AtomicUsize, value: usize) {
    slot.store(value, Ordering::Relaxed);
}
313
314pub fn get_hard_limit() -> Option<u64> {
315 let user_limit = USER_HARD_LIMIT.load(Ordering::Relaxed);
316 if user_limit > 0 {
317 return Some(user_limit as u64);
318 }
319
320 let result = get_usage_and_limit()
321 .ok()
322 .and_then(|(_, limits)| limits.hard_limit);
323 result
324}
325
326pub fn get_soft_limit() -> Option<u64> {
327 let user_limit = USER_SOFT_LIMIT.load(Ordering::Relaxed);
328 if user_limit > 0 {
329 return Some(user_limit as u64);
330 }
331
332 get_usage_and_limit()
333 .ok()
334 .and_then(|(_, limits)| limits.soft_limit)
335}
336
337pub fn get_low_memory_thresh() -> Option<u64> {
338 let user_thresh = USER_LOW_THRESH.load(Ordering::Relaxed);
339 if user_thresh > 0 {
340 return Some(user_thresh as u64);
341 }
342
343 get_usage_and_limit()
344 .ok()
345 .and_then(|(_, limits)| limits.soft_limit)
346 .map(|limit| limit * 8 / 10)
347}
348
349pub fn get_usage_and_limit() -> anyhow::Result<(MemoryUsage, MemoryLimits)> {
350 let (usage, mut limit) = get_usage_and_limit_impl()?;
351
352 if let Ok(value) = std::env::var("KUMOD_MEMORY_HARD_LIMIT") {
353 limit.hard_limit.replace(
354 value
355 .parse()
356 .context("failed to parse KUMOD_MEMORY_HARD_LIMIT env var")?,
357 );
358 }
359 if let Ok(value) = std::env::var("KUMOD_MEMORY_SOFT_LIMIT") {
360 limit.soft_limit.replace(
361 value
362 .parse()
363 .context("failed to parse KUMOD_MEMORY_SOFT_LIMIT env var")?,
364 );
365 }
366
367 let hard = USER_HARD_LIMIT.load(Ordering::Relaxed);
368 if hard > 0 {
369 limit.hard_limit.replace(hard as u64);
370 }
371 let soft = USER_SOFT_LIMIT.load(Ordering::Relaxed);
372 if soft > 0 {
373 limit.soft_limit.replace(soft as u64);
374 }
375
376 Ok((usage, limit))
377}
378
/// Flush the calling thread's jemalloc thread cache via the
/// `thread.tcache.flush` mallctl.
///
/// The `mallctl` return code is ignored, so failures are silent.
pub fn purge_thread_cache() {
    // SAFETY: the name is a NUL-terminated static byte string, and the
    // old-value, old-length and new-value pointers are all null with a zero
    // new-length, so jemalloc has no caller buffers to read or write.
    unsafe {
        tikv_jemalloc_sys::mallctl(
            b"thread.tcache.flush\0".as_ptr() as *const _,
            std::ptr::null_mut(),
            std::ptr::null_mut(),
            std::ptr::null_mut(),
            0,
        );
    }
}
393
/// Ask jemalloc to purge unused dirty pages from every arena.
///
/// Called by `memory_thread` while usage is over the soft limit. The
/// `mallctl` return code is ignored.
fn purge_all_arenas() {
    // SAFETY: the name is a NUL-terminated static byte string, and the
    // old-value, old-length and new-value pointers are all null with a zero
    // new-length, so jemalloc has no caller buffers to read or write.
    unsafe {
        tikv_jemalloc_sys::mallctl(
            // 4096 is jemalloc's MALLCTL_ARENAS_ALL pseudo-index, so
            // `arena.4096.purge` applies to all arenas (per the jemalloc
            // mallctl documentation).
            b"arena.4096.purge\0".as_ptr() as *const _,
            std::ptr::null_mut(),
            std::ptr::null_mut(),
            std::ptr::null_mut(),
            0,
        );
    }
}
409
/// Ask jemalloc to write a heap profile via the `prof.dump` mallctl.
///
/// Called by `memory_thread` when usage crosses the soft limit in either
/// direction. NOTE(review): this presumably only succeeds when jemalloc heap
/// profiling is enabled at startup; the return code is ignored either way.
fn dump_heap_profile() {
    // SAFETY: the name is a NUL-terminated static byte string, and the
    // old-value, old-length and new-value pointers are all null with a zero
    // new-length, so jemalloc has no caller buffers to read or write.
    unsafe {
        tikv_jemalloc_sys::mallctl(
            b"prof.dump\0".as_ptr() as *const _,
            std::ptr::null_mut(),
            std::ptr::null_mut(),
            std::ptr::null_mut(),
            0,
        );
    }
}
424
425fn memory_thread() {
428 let mut is_ok = true;
429 let mut is_low = false;
430
431 let (tx, rx) = tokio::sync::watch::channel(());
432 SUBSCRIBER.lock().unwrap().replace(rx);
433
434 loop {
435 MEM_COUNTED.set(crate::tracking::counted_usage() as f64);
436
437 match get_usage_and_limit() {
438 Ok((
439 MemoryUsage { bytes: usage },
440 MemoryLimits {
441 soft_limit: Some(limit),
442 hard_limit: _,
443 },
444 )) => {
445 let was_ok = is_ok;
446 is_ok = usage < limit;
447 OVER_LIMIT.store(is_ok, Ordering::SeqCst);
448 HEAD_ROOM.store(limit.saturating_sub(usage) as usize, Ordering::SeqCst);
449 MEM_USAGE.set(usage as f64);
450 MEM_LIMIT.set(limit as f64);
451
452 let mut low_thresh = USER_LOW_THRESH.load(Ordering::Relaxed) as u64;
453 if low_thresh == 0 {
454 low_thresh = limit * 8 / 10;
455 }
456 LOW_MEM_THRESH.set(low_thresh as f64);
457
458 let was_low = is_low;
459 is_low = usage > low_thresh;
460 LOW_MEM.store(is_low, Ordering::SeqCst);
461
462 if !was_low && is_low {
463 LOW_COUNT.increment(1);
465 }
466
467 if !is_ok && was_ok {
468 dump_heap_profile();
470 OVER_LIMIT_COUNT.increment(1);
471 tracing::error!(
472 "memory usage {} exceeds limit {}",
473 human(usage),
474 human(limit)
475 );
476 tx.send(()).ok();
477 purge_all_arenas();
478 } else if !was_ok && is_ok {
479 dump_heap_profile();
481 tracing::error!(
482 "memory usage {} is back within limit {}",
483 human(usage),
484 human(limit)
485 );
486 tx.send(()).ok();
487 } else {
488 if !is_ok {
489 purge_all_arenas();
490 }
491 tracing::debug!("memory usage {}, limit {}", human(usage), human(limit));
492 }
493 }
494 Ok((
495 MemoryUsage { bytes: 0 },
496 MemoryLimits {
497 soft_limit: None,
498 hard_limit: None,
499 },
500 )) => {
501 HEAD_ROOM.store(1024, Ordering::SeqCst);
504 }
505 Ok(_) => {}
506 Err(err) => tracing::error!("unable to query memory info: {err:#}"),
507 }
508
509 std::thread::sleep(Duration::from_secs(3));
510 }
511}
512
513pub fn get_headroom() -> usize {
516 HEAD_ROOM.load(Ordering::SeqCst)
517}
518
519pub fn low_memory() -> bool {
521 LOW_MEM.load(Ordering::SeqCst)
522}
523
524pub fn memory_status() -> MemoryStatus {
526 if get_headroom() == 0 {
527 MemoryStatus::NoMemory
528 } else if low_memory() {
529 MemoryStatus::LowMemory
530 } else {
531 MemoryStatus::Ok
532 }
533}
534
535#[derive(Copy, Clone, Debug, Eq, PartialEq)]
536pub enum MemoryStatus {
537 Ok,
538 LowMemory,
539 NoMemory,
540}
541
542pub fn subscribe_to_memory_status_changes() -> Option<Receiver<()>> {
545 SUBSCRIBER.lock().unwrap().clone()
546}
547
548pub async fn subscribe_to_memory_status_changes_async() -> Receiver<()> {
549 loop {
550 if let Some(rx) = subscribe_to_memory_status_changes() {
551 return rx;
552 }
553 tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
554 }
555}
556
557pub fn setup_memory_limit() -> anyhow::Result<()> {
559 let (usage, limit) = get_usage_and_limit()?;
560 tracing::debug!("usage: {usage:?}");
561 tracing::info!("using limits: {limit}");
562
563 std::thread::Builder::new()
564 .name("memory-monitor".to_string())
565 .spawn(memory_thread)?;
566
567 Ok(())
568}
569
570#[cfg(target_os = "linux")]
573fn get_my_cgroup(v2: bool) -> anyhow::Result<Cgroup> {
574 let paths = get_cgroups_relative_paths()?;
575 let h: Box<dyn Hierarchy> = if v2 {
576 Box::new(V2::new())
577 } else {
578 Box::new(V1::new())
579 };
580
581 let path = paths
582 .get("")
583 .ok_or_else(|| anyhow::anyhow!("couldn't resolve path"))?;
584
585 let cgroup = Cgroup::load(h, format!("{}/{}", UNIFIED_MOUNTPOINT, path));
586 Ok(cgroup)
587}
588
589#[derive(Copy, Clone)]
590pub struct NumBytes(pub usize);
591
592impl std::fmt::Debug for NumBytes {
593 fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
594 write!(
595 fmt,
596 "{} ({})",
597 self.0,
598 humansize::format_size(self.0, humansize::DECIMAL)
599 )
600 }
601}
602
603impl From<usize> for NumBytes {
604 fn from(n: usize) -> Self {
605 Self(n)
606 }
607}
608
609impl From<u64> for NumBytes {
610 fn from(n: u64) -> Self {
611 Self(n as usize)
612 }
613}
614
615#[derive(Copy, Clone)]
616pub struct Number(pub usize);
617
618impl std::fmt::Debug for Number {
619 fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
620 use num_format::{Locale, ToFormattedString};
621 write!(
622 fmt,
623 "{} ({})",
624 self.0,
625 self.0.to_formatted_string(&Locale::en)
626 )
627 }
628}
629
630impl From<usize> for Number {
631 fn from(n: usize) -> Self {
632 Self(n)
633 }
634}
635
636impl From<u64> for Number {
637 fn from(n: u64) -> Self {
638 Self(n as usize)
639 }
640}
641
/// Snapshot of selected jemalloc statistics, filled in by
/// `JemallocStats::collect`.
///
/// Each field holds the value read from the same-named
/// `tikv_jemalloc_ctl::stats` counter, or 0 if that read failed. The
/// descriptions below paraphrase the jemalloc mallctl documentation.
#[derive(Debug)]
pub struct JemallocStats {
    /// `stats.allocated`: bytes allocated by the application.
    pub allocated: NumBytes,

    /// `stats.active`: bytes in active pages allocated by the application
    /// (page-granular, so at least `allocated`).
    pub active: NumBytes,

    /// `stats.metadata`: bytes dedicated to allocator metadata.
    pub metadata: NumBytes,

    /// `stats.resident`: bytes in physically resident data pages mapped by
    /// the allocator.
    pub resident: NumBytes,

    /// `stats.mapped`: bytes in active extents mapped by the allocator.
    pub mapped: NumBytes,

    /// `stats.retained`: bytes in virtual memory mappings retained rather
    /// than returned to the operating system.
    pub retained: NumBytes,
}
670
671impl JemallocStats {
672 pub fn collect() -> Self {
673 use tikv_jemalloc_ctl::{epoch, stats};
674
675 epoch::advance().ok();
679
680 Self {
681 allocated: stats::allocated::read().unwrap_or(0).into(),
682 active: stats::active::read().unwrap_or(0).into(),
683 metadata: stats::metadata::read().unwrap_or(0).into(),
684 resident: stats::resident::read().unwrap_or(0).into(),
685 mapped: stats::mapped::read().unwrap_or(0).into(),
686 retained: stats::retained::read().unwrap_or(0).into(),
687 }
688 }
689}