kumo_server_memory/
lib.rs

1//! This module contains logic to reason about memory usage,
2//! implement memory limits, and some helpers to attempt to
3//! release cached memory back to the system when memory
4//! is low.
5
6use crate::tracking::TrackingAllocator;
7use anyhow::Context;
8#[cfg(target_os = "linux")]
9use cgroups_rs::cgroup::{get_cgroups_relative_paths, Cgroup, UNIFIED_MOUNTPOINT};
10#[cfg(target_os = "linux")]
11use cgroups_rs::hierarchies::{V1, V2};
12#[cfg(target_os = "linux")]
13use cgroups_rs::memory::MemController;
14#[cfg(target_os = "linux")]
15use cgroups_rs::{Hierarchy, MaxValue};
16use kumo_prometheus::declare_metric;
17use nix::sys::resource::{rlim_t, RLIM_INFINITY};
18use nix::unistd::{sysconf, SysconfVar};
19use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
20use std::sync::{LazyLock, Mutex};
21use std::time::Duration;
22use tikv_jemallocator::Jemalloc;
23use tokio::sync::watch::Receiver;
24
25pub mod tracking;
26
27pub use tracking::{set_tracking_callstacks, tracking_stats};
28
// Install the tracking allocator (wrapping jemalloc) as the process-wide
// global allocator so that Rust allocations can be counted by the
// tracking module (see MEM_COUNTED / tracking_stats).
#[global_allocator]
static GLOBAL: TrackingAllocator<Jemalloc> = TrackingAllocator::new(Jemalloc);
31
// Prometheus metrics maintained by the memory monitor thread
// (see memory_thread below). The doc comments inside the macro
// invocations are macro input and are left untouched.
declare_metric! {
/// How many times the low memory threshold was exceeded.
static LOW_COUNT: IntCounter("memory_low_count");
}

declare_metric! {
/// how many times the soft memory limit was exceeded
static OVER_LIMIT_COUNT: IntCounter("memory_over_limit_count");
}

declare_metric! {
/// number of bytes of used memory (Resident Set Size)
static MEM_USAGE: Gauge("memory_usage");
}

declare_metric! {
/// soft memory limit measured in bytes
static MEM_LIMIT: Gauge("memory_limit");
}

declare_metric! {
/// number of bytes of used memory (allocated by Rust)
static MEM_COUNTED: Gauge("memory_usage_rust");
}

declare_metric! {
/// low memory threshold measured in bytes
static LOW_MEM_THRESH: Gauge("memory_low_thresh");
}
61
// Holds the receiver side of the watch channel created by memory_thread;
// subscribe_to_memory_status_changes() hands out clones of it.
// None until memory_thread has started.
static SUBSCRIBER: LazyLock<Mutex<Option<Receiver<()>>>> = LazyLock::new(|| Mutex::new(None));

// Flags maintained by memory_thread: whether usage exceeds the soft
// limit, and whether it exceeds the low-memory threshold.
static OVER_LIMIT: AtomicBool = AtomicBool::new(false);
static LOW_MEM: AtomicBool = AtomicBool::new(false);

// Default this to a reasonable non-zero value, as it is possible
// in the test harness on the CI system to inject concurrent with
// early startup.
// Having this return 0 and propagate back as a 421 makes it harder
// to write tests that care precisely about a response if they
// have to deal with this small window on startup.
static HEAD_ROOM: AtomicUsize = AtomicUsize::new(u32::MAX as usize);
/// Represents the current memory usage of this process
#[derive(Debug, Clone, Copy)]
pub struct MemoryUsage {
    // Used/resident memory, in bytes
    pub bytes: u64,
}
80
81impl std::fmt::Display for MemoryUsage {
82    fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
83        write!(fmt, "{}", human(self.bytes))
84    }
85}
86
87impl MemoryUsage {
88    pub fn get() -> anyhow::Result<Self> {
89        #[cfg(target_os = "linux")]
90        {
91            if let Ok(v2) = Self::get_cgroup(true) {
92                return Ok(v2);
93            }
94            if let Ok(v1) = Self::get_cgroup(false) {
95                return Ok(v1);
96            }
97        }
98        Self::get_linux_statm()
99    }
100
101    #[cfg(target_os = "linux")]
102    fn get_cgroup(v2: bool) -> anyhow::Result<Self> {
103        let cgroup = get_my_cgroup(v2)?;
104        let mem: &MemController = cgroup
105            .controller_of()
106            .ok_or_else(|| anyhow::anyhow!("no memory controller?"))?;
107        let stat = mem.memory_stat();
108        Ok(Self {
109            bytes: stat.usage_in_bytes,
110        })
111    }
112
113    pub fn get_linux_statm() -> anyhow::Result<Self> {
114        let data = std::fs::read_to_string("/proc/self/statm")?;
115        let fields: Vec<&str> = data.split(' ').collect();
116        let rss: u64 = fields[1].parse()?;
117        Ok(Self {
118            bytes: rss * sysconf(SysconfVar::PAGE_SIZE)?.unwrap_or(4 * 1024) as u64,
119        })
120    }
121}
122
123fn human(n: u64) -> String {
124    humansize::format_size(n, humansize::DECIMAL)
125}
126
/// Represents a constraint on memory usage
#[derive(Debug, Clone, Copy)]
pub struct MemoryLimits {
    // Soft limit in bytes; None means unconstrained
    pub soft_limit: Option<u64>,
    // Hard limit in bytes; None means unconstrained
    pub hard_limit: Option<u64>,
}
133
134impl std::fmt::Display for MemoryLimits {
135    fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
136        let soft = self.soft_limit.map(human);
137        let hard = self.hard_limit.map(human);
138        write!(fmt, "soft={soft:?}, hard={hard:?}")
139    }
140}
141
142impl MemoryLimits {
143    pub fn min(self, other: Self) -> Self {
144        Self {
145            soft_limit: min_opt_limit(self.soft_limit, other.soft_limit),
146            hard_limit: min_opt_limit(self.hard_limit, other.hard_limit),
147        }
148    }
149
150    pub fn is_unlimited(&self) -> bool {
151        self.soft_limit.is_none() && self.hard_limit.is_none()
152    }
153}
154
155fn rlim_to_opt(rlim: rlim_t) -> Option<u64> {
156    if rlim == RLIM_INFINITY {
157        None
158    } else {
159        Some(rlim)
160    }
161}
162
163#[cfg(target_os = "linux")]
164fn max_value_to_opt(value: Option<MaxValue>) -> anyhow::Result<Option<u64>> {
165    Ok(match value {
166        None | Some(MaxValue::Max) => None,
167        Some(MaxValue::Value(n)) if n >= 0 => Some(n as u64),
168        Some(MaxValue::Value(n)) => anyhow::bail!("unexpected negative limit {n}"),
169    })
170}
171
/// Returns the smaller of two optional limits; a missing limit
/// is treated as "unconstrained", so the other side wins.
fn min_opt_limit(a: Option<u64>, b: Option<u64>) -> Option<u64> {
    match (a, b) {
        (Some(x), Some(y)) => Some(x.min(y)),
        (x, y) => x.or(y),
    }
}
179
impl MemoryLimits {
    /// Returns the RSS limits configured via setrlimit/ulimit.
    /// On macOS we report unlimited, since this build path does not
    /// query RLIMIT_RSS there.
    pub fn get_rlimits() -> anyhow::Result<Self> {
        #[cfg(not(target_os = "macos"))]
        let (rss_soft, rss_hard) =
            nix::sys::resource::getrlimit(nix::sys::resource::Resource::RLIMIT_RSS)?;
        #[cfg(target_os = "macos")]
        let (rss_soft, rss_hard) = (RLIM_INFINITY, RLIM_INFINITY);

        // RLIM_INFINITY maps to None (unconstrained)
        let soft_limit = rlim_to_opt(rss_soft);
        let hard_limit = rlim_to_opt(rss_hard);

        Ok(Self {
            soft_limit,
            hard_limit,
        })
    }

    /// Returns limits from the cgroup of this process, preferring
    /// cgroup v2 and falling back to v1.
    #[cfg(target_os = "linux")]
    fn get_any_cgroup() -> anyhow::Result<Self> {
        if let Ok(cg) = Self::get_cgroup(true) {
            return Ok(cg);
        }
        Self::get_cgroup(false)
    }

    /// Returns limits from the cgroup (v2 when `v2` is true, else v1)
    /// of this process: the soft limit comes from the cgroup `high`
    /// value and the hard limit from `max`.
    #[cfg(target_os = "linux")]
    pub fn get_cgroup(v2: bool) -> anyhow::Result<Self> {
        let cgroup = get_my_cgroup(v2)?;
        let mem: &MemController = cgroup
            .controller_of()
            .ok_or_else(|| anyhow::anyhow!("no memory controller?"))?;

        let limits = mem.get_mem()?;
        Ok(Self {
            soft_limit: max_value_to_opt(limits.high)?,
            hard_limit: max_value_to_opt(limits.max)?,
        })
    }
}
219
220/// Returns the amount of physical memory available to the system.
221/// This is linux specific.
222#[cfg(target_os = "linux")]
223fn get_physical_memory() -> anyhow::Result<u64> {
224    let data = std::fs::read_to_string("/proc/meminfo")?;
225    for line in data.lines() {
226        if line.starts_with("MemTotal:") {
227            let mut iter = line.rsplit(' ');
228            let unit = iter
229                .next()
230                .ok_or_else(|| anyhow::anyhow!("expected unit"))?;
231            if unit != "kB" {
232                anyhow::bail!("unsupported /proc/meminfo unit {unit}");
233            }
234            let value = iter
235                .next()
236                .ok_or_else(|| anyhow::anyhow!("expected value"))?;
237            let value: u64 = value.parse()?;
238
239            return Ok(value * 1024);
240        }
241    }
242    anyhow::bail!("MemTotal not found in /proc/meminfo");
243}
244
/// Retrieves the current usage and limits.
/// This is a bit of a murky area as, on Linux, the cgroup reported usage
/// appears to be nonsensical when no limits are configured.
/// So we first obtain the limits from cgroups, and if they are set,
/// we return the usage from cgroups along with it,
/// otherwise we get the ulimit limits and look at the more general
/// usage numbers to go with it.
///
/// If no limits are explicitly configured, we'll assume a hard limit
/// equal to the physical ram on the system, and a soft limit of 75%
/// of whatever we've determined the hard limit to be.
#[cfg(target_os = "linux")]
fn get_usage_and_limit_impl() -> anyhow::Result<(MemoryUsage, MemoryLimits)> {
    // Start with the rlimit limits and statm-derived usage
    let mut limit = MemoryLimits::get_rlimits()?;
    let mut usage = MemoryUsage::get_linux_statm()?;

    // Only trust the cgroup numbers when an actual cgroup limit is
    // configured; see the doc comment above for the rationale.
    if let Ok(cg_lim) = MemoryLimits::get_any_cgroup() {
        if !cg_lim.is_unlimited() {
            limit = limit.min(cg_lim);
            usage = MemoryUsage::get()?;
        }
    }

    // Fall back: hard limit defaults to physical RAM, soft limit
    // defaults to 75% of the hard limit.
    if limit.hard_limit.is_none() {
        let phys = get_physical_memory()?;
        limit.hard_limit.replace(phys);
    }
    if limit.soft_limit.is_none() {
        limit.soft_limit = limit.hard_limit.map(|lim| lim * 3 / 4);
    }

    Ok((usage, limit))
}
278
/// Fallback for platforms where we have no way to discover memory
/// usage or limits: report zero usage and unconstrained limits.
#[cfg(not(target_os = "linux"))]
fn get_usage_and_limit_impl() -> anyhow::Result<(MemoryUsage, MemoryLimits)> {
    let usage = MemoryUsage { bytes: 0 };
    let limits = MemoryLimits {
        soft_limit: None,
        hard_limit: None,
    };
    Ok((usage, limits))
}
289
// Explicitly configured limits/threshold; 0 means "not set, derive
// the value from the system instead" (see the set_* functions below).
static USER_HARD_LIMIT: AtomicUsize = AtomicUsize::new(0);
static USER_SOFT_LIMIT: AtomicUsize = AtomicUsize::new(0);
static USER_LOW_THRESH: AtomicUsize = AtomicUsize::new(0);
293
/// Explicitly configure the hard memory limit, in bytes.
/// A value of 0 reverts to the system-derived limit.
pub fn set_hard_limit(limit: usize) {
    USER_HARD_LIMIT.store(limit, Ordering::Relaxed);
}
297
/// Explicitly configure the soft memory limit, in bytes.
/// A value of 0 reverts to the system-derived limit.
pub fn set_soft_limit(limit: usize) {
    USER_SOFT_LIMIT.store(limit, Ordering::Relaxed);
}
301
/// Explicitly configure the low-memory threshold, in bytes.
/// A value of 0 reverts to the default of 80% of the soft limit.
pub fn set_low_memory_thresh(limit: usize) {
    USER_LOW_THRESH.store(limit, Ordering::Relaxed);
}
305
306pub fn get_hard_limit() -> Option<u64> {
307    let user_limit = USER_HARD_LIMIT.load(Ordering::Relaxed);
308    if user_limit > 0 {
309        return Some(user_limit as u64);
310    }
311
312    let result = get_usage_and_limit()
313        .ok()
314        .and_then(|(_, limits)| limits.hard_limit);
315    result
316}
317
318pub fn get_soft_limit() -> Option<u64> {
319    let user_limit = USER_SOFT_LIMIT.load(Ordering::Relaxed);
320    if user_limit > 0 {
321        return Some(user_limit as u64);
322    }
323
324    get_usage_and_limit()
325        .ok()
326        .and_then(|(_, limits)| limits.soft_limit)
327}
328
329pub fn get_low_memory_thresh() -> Option<u64> {
330    let user_thresh = USER_LOW_THRESH.load(Ordering::Relaxed);
331    if user_thresh > 0 {
332        return Some(user_thresh as u64);
333    }
334
335    get_usage_and_limit()
336        .ok()
337        .and_then(|(_, limits)| limits.soft_limit)
338        .map(|limit| limit * 8 / 10)
339}
340
341pub fn get_usage_and_limit() -> anyhow::Result<(MemoryUsage, MemoryLimits)> {
342    let (usage, mut limit) = get_usage_and_limit_impl()?;
343
344    if let Ok(value) = std::env::var("KUMOD_MEMORY_HARD_LIMIT") {
345        limit.hard_limit.replace(
346            value
347                .parse()
348                .context("failed to parse KUMOD_MEMORY_HARD_LIMIT env var")?,
349        );
350    }
351    if let Ok(value) = std::env::var("KUMOD_MEMORY_SOFT_LIMIT") {
352        limit.soft_limit.replace(
353            value
354                .parse()
355                .context("failed to parse KUMOD_MEMORY_SOFT_LIMIT env var")?,
356        );
357    }
358
359    let hard = USER_HARD_LIMIT.load(Ordering::Relaxed);
360    if hard > 0 {
361        limit.hard_limit.replace(hard as u64);
362    }
363    let soft = USER_SOFT_LIMIT.load(Ordering::Relaxed);
364    if soft > 0 {
365        limit.soft_limit.replace(soft as u64);
366    }
367
368    Ok((usage, limit))
369}
370
/// To be called when a thread goes idle; it will flush cached
/// memory out of the thread local cache to be returned/reused
/// elsewhere in the system
pub fn purge_thread_cache() {
    // SAFETY: "thread.tcache.flush" is a valid NUL-terminated mallctl
    // name that takes no input and produces no output, so passing null
    // pointers with a zero length is permitted by the mallctl contract.
    // The return code is deliberately ignored: this is best-effort.
    unsafe {
        tikv_jemalloc_sys::mallctl(
            b"thread.tcache.flush\0".as_ptr() as *const _,
            std::ptr::null_mut(),
            std::ptr::null_mut(),
            std::ptr::null_mut(),
            0,
        );
    }
}
385
/// To be called when used memory is high: will aggressively
/// flush and release cached memory
fn purge_all_arenas() {
    // SAFETY: "arena.4096.purge" is a valid NUL-terminated mallctl
    // name that takes no input and produces no output, so null
    // pointers and a zero length satisfy the mallctl contract.
    // The return code is deliberately ignored: this is best-effort.
    unsafe {
        // 4096 is MALLCTL_ARENAS_ALL, which is a magic value
        // that instructs jemalloc to purge all arenas
        tikv_jemalloc_sys::mallctl(
            b"arena.4096.purge\0".as_ptr() as *const _,
            std::ptr::null_mut(),
            std::ptr::null_mut(),
            std::ptr::null_mut(),
            0,
        );
    }
}
401
/// If `MALLOC_CONF='prof:true,prof_prefix:jeprof.out'` is set in the
/// environment, calling this will generate a heap profile in the
/// current directory
fn dump_heap_profile() {
    // SAFETY: "prof.dump" is a valid NUL-terminated mallctl name; with
    // null in/out parameters jemalloc uses the configured prof_prefix.
    // If profiling is not enabled this is a no-op; the return code is
    // deliberately ignored.
    unsafe {
        tikv_jemalloc_sys::mallctl(
            b"prof.dump\0".as_ptr() as *const _,
            std::ptr::null_mut(),
            std::ptr::null_mut(),
            std::ptr::null_mut(),
            0,
        );
    }
}
416
417/// The memory thread continuously examines memory usage and limits
418/// and maintains global counters to track the memory state
419fn memory_thread() {
420    let mut is_ok = true;
421    let mut is_low = false;
422
423    let (tx, rx) = tokio::sync::watch::channel(());
424    SUBSCRIBER.lock().unwrap().replace(rx);
425
426    loop {
427        MEM_COUNTED.set(crate::tracking::counted_usage() as f64);
428
429        match get_usage_and_limit() {
430            Ok((
431                MemoryUsage { bytes: usage },
432                MemoryLimits {
433                    soft_limit: Some(limit),
434                    hard_limit: _,
435                },
436            )) => {
437                let was_ok = is_ok;
438                is_ok = usage < limit;
439                OVER_LIMIT.store(is_ok, Ordering::SeqCst);
440                HEAD_ROOM.store(limit.saturating_sub(usage) as usize, Ordering::SeqCst);
441                MEM_USAGE.set(usage as f64);
442                MEM_LIMIT.set(limit as f64);
443
444                let mut low_thresh = USER_LOW_THRESH.load(Ordering::Relaxed) as u64;
445                if low_thresh == 0 {
446                    low_thresh = limit * 8 / 10;
447                }
448                LOW_MEM_THRESH.set(low_thresh as f64);
449
450                let was_low = is_low;
451                is_low = usage > low_thresh;
452                LOW_MEM.store(is_low, Ordering::SeqCst);
453
454                if !was_low && is_low {
455                    // Transition to low memory
456                    LOW_COUNT.inc();
457                }
458
459                if !is_ok && was_ok {
460                    // Transition from OK -> !OK
461                    dump_heap_profile();
462                    OVER_LIMIT_COUNT.inc();
463                    tracing::error!(
464                        "memory usage {} exceeds limit {}",
465                        human(usage),
466                        human(limit)
467                    );
468                    tx.send(()).ok();
469                    purge_all_arenas();
470                } else if !was_ok && is_ok {
471                    // Transition from !OK -> OK
472                    dump_heap_profile();
473                    tracing::error!(
474                        "memory usage {} is back within limit {}",
475                        human(usage),
476                        human(limit)
477                    );
478                    tx.send(()).ok();
479                } else {
480                    if !is_ok {
481                        purge_all_arenas();
482                    }
483                    tracing::debug!("memory usage {}, limit {}", human(usage), human(limit));
484                }
485            }
486            Ok((
487                MemoryUsage { bytes: 0 },
488                MemoryLimits {
489                    soft_limit: None,
490                    hard_limit: None,
491                },
492            )) => {
493                // We don't know anything about the memory usage on this
494                // system, just pretend everything is fine
495                HEAD_ROOM.store(1024, Ordering::SeqCst);
496            }
497            Ok(_) => {}
498            Err(err) => tracing::error!("unable to query memory info: {err:#}"),
499        }
500
501        std::thread::sleep(Duration::from_secs(3));
502    }
503}
504
/// Returns the amount of headroom; the number of bytes that can
/// be allocated before we hit the soft limit
pub fn get_headroom() -> usize {
    // Maintained by memory_thread; initialized to a large non-zero
    // value so early callers don't spuriously observe "no memory"
    HEAD_ROOM.load(Ordering::SeqCst)
}
510
/// Returns true when memory usage exceeds the low-memory threshold,
/// which defaults to 80% of the soft limit (see memory_thread)
pub fn low_memory() -> bool {
    LOW_MEM.load(Ordering::SeqCst)
}
515
516/// Indicates the overall memory status
517pub fn memory_status() -> MemoryStatus {
518    if get_headroom() == 0 {
519        MemoryStatus::NoMemory
520    } else if low_memory() {
521        MemoryStatus::LowMemory
522    } else {
523        MemoryStatus::Ok
524    }
525}
526
/// The overall memory state as reported by memory_status()
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum MemoryStatus {
    /// Usage is below the low-memory threshold
    Ok,
    /// Usage exceeds the low-memory threshold, but headroom remains
    LowMemory,
    /// No headroom remains before the soft limit
    NoMemory,
}
533
/// Returns a receiver that will notify when memory status
/// changes from OK -> !OK or vice versa.
/// Returns None until memory_thread has started and installed
/// the channel (see setup_memory_limit).
pub fn subscribe_to_memory_status_changes() -> Option<Receiver<()>> {
    SUBSCRIBER.lock().unwrap().clone()
}
539
540pub async fn subscribe_to_memory_status_changes_async() -> Receiver<()> {
541    loop {
542        if let Some(rx) = subscribe_to_memory_status_changes() {
543            return rx;
544        }
545        tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
546    }
547}
548
549/// Initialize the memory thread to monitor memory usage/limits
550pub fn setup_memory_limit() -> anyhow::Result<()> {
551    let (usage, limit) = get_usage_and_limit()?;
552    tracing::debug!("usage: {usage:?}");
553    tracing::info!("using limits: {limit}");
554
555    std::thread::Builder::new()
556        .name("memory-monitor".to_string())
557        .spawn(memory_thread)?;
558
559    Ok(())
560}
561
/// Returns a Cgroup for the current process.
/// Can return either a v2 or a v1 cgroup.
#[cfg(target_os = "linux")]
fn get_my_cgroup(v2: bool) -> anyhow::Result<Cgroup> {
    let paths = get_cgroups_relative_paths()?;
    // Select the requested hierarchy flavor
    let h: Box<dyn Hierarchy> = if v2 {
        Box::new(V2::new())
    } else {
        Box::new(V1::new())
    };

    // NOTE(review): the "" key is the unified (v2) entry in the
    // relative-path map; this code relies on it being present for the
    // v1 case as well -- confirm against cgroups_rs behavior.
    let path = paths
        .get("")
        .ok_or_else(|| anyhow::anyhow!("couldn't resolve path"))?;

    let cgroup = Cgroup::load(h, format!("{}/{}", UNIFIED_MOUNTPOINT, path));
    Ok(cgroup)
}
580
/// A byte count that Debug-formats with a humanized size alongside
/// the raw number
#[derive(Copy, Clone)]
pub struct NumBytes(pub usize);
583
584impl std::fmt::Debug for NumBytes {
585    fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
586        write!(
587            fmt,
588            "{} ({})",
589            self.0,
590            humansize::format_size(self.0, humansize::DECIMAL)
591        )
592    }
593}
594
595impl From<usize> for NumBytes {
596    fn from(n: usize) -> Self {
597        Self(n)
598    }
599}
600
601impl From<u64> for NumBytes {
602    fn from(n: u64) -> Self {
603        Self(n as usize)
604    }
605}
606
/// A plain count that Debug-formats with locale-style thousands
/// separators alongside the raw number
#[derive(Copy, Clone)]
pub struct Number(pub usize);
609
610impl std::fmt::Debug for Number {
611    fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
612        use num_format::{Locale, ToFormattedString};
613        write!(
614            fmt,
615            "{} ({})",
616            self.0,
617            self.0.to_formatted_string(&Locale::en)
618        )
619    }
620}
621
622impl From<usize> for Number {
623    fn from(n: usize) -> Self {
624        Self(n)
625    }
626}
627
628impl From<u64> for Number {
629    fn from(n: u64) -> Self {
630        Self(n as usize)
631    }
632}
633
/// A snapshot of the jemalloc allocator statistics, collected via
/// JemallocStats::collect()
#[derive(Debug)]
pub struct JemallocStats {
    /// `stats.allocated` - Total number of bytes allocated by the application.
    pub allocated: NumBytes,

    /// `stats.active`
    /// Total number of bytes in active pages allocated by the application. This is a multiple of
    /// the page size, and greater than or equal to stats.allocated. This does not include
    /// `stats.arenas.<i>.pdirty`, `stats.arenas.<i>.pmuzzy`, nor pages entirely devoted to allocator
    /// metadata.
    pub active: NumBytes,

    /// stats.metadata (size_t) r- [--enable-stats]
    /// Total number of bytes dedicated to metadata, which comprise base allocations used for bootstrap-sensitive allocator metadata structures (see `stats.arenas.<i>.base`) and internal allocations (see `stats.arenas.<i>.internal`). Transparent huge page (enabled with opt.metadata_thp) usage is not considered.
    pub metadata: NumBytes,

    /// stats.resident (size_t) r- [--enable-stats]
    /// Maximum number of bytes in physically resident data pages mapped by the allocator, comprising all pages dedicated to allocator metadata, pages backing active allocations, and unused dirty pages. This is a maximum rather than precise because pages may not actually be physically resident if they correspond to demand-zeroed virtual memory that has not yet been touched. This is a multiple of the page size, and is larger than stats.active.
    pub resident: NumBytes,

    /// stats.mapped (size_t) r- [--enable-stats]
    /// Total number of bytes in active extents mapped by the allocator. This is larger than stats.active. This does not include inactive extents, even those that contain unused dirty pages, which means that there is no strict ordering between this and stats.resident.
    pub mapped: NumBytes,

    /// stats.retained (size_t) r- [--enable-stats]
    /// Total number of bytes in virtual memory mappings that were retained rather than being returned to the operating system via e.g. munmap(2) or similar. Retained virtual memory is typically untouched, decommitted, or purged, so it has no strongly associated physical memory (see extent hooks for details). Retained memory is excluded from mapped memory statistics, e.g. stats.mapped.
    pub retained: NumBytes,
}
662
663impl JemallocStats {
664    pub fn collect() -> Self {
665        use tikv_jemalloc_ctl::{epoch, stats};
666
667        // jemalloc stats are cached and don't fully report current
668        // values until an epoch is advanced, so let's explicitly
669        // do that here.
670        epoch::advance().ok();
671
672        Self {
673            allocated: stats::allocated::read().unwrap_or(0).into(),
674            active: stats::active::read().unwrap_or(0).into(),
675            metadata: stats::metadata::read().unwrap_or(0).into(),
676            resident: stats::resident::read().unwrap_or(0).into(),
677            mapped: stats::mapped::read().unwrap_or(0).into(),
678            retained: stats::retained::read().unwrap_or(0).into(),
679        }
680    }
681}