kumo_server_memory/
lib.rs

//! This module contains logic to reason about memory usage,
//! implement memory limits, and some helpers to attempt to
//! release cached memory back to the system when memory
//! is low.
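//!
//! A minimal usage sketch (hypothetical caller; the names used here are the
//! public entry points defined below in this crate):
//!
//! ```ignore
//! // start the background memory monitor thread
//! kumo_server_memory::setup_memory_limit()?;
//!
//! // elsewhere, react to memory pressure
//! if kumo_server_memory::low_memory() {
//!     kumo_server_memory::purge_thread_cache();
//! }
//! ```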

use crate::tracking::TrackingAllocator;
use anyhow::Context;
#[cfg(target_os = "linux")]
use cgroups_rs::cgroup::{get_cgroups_relative_paths, Cgroup, UNIFIED_MOUNTPOINT};
#[cfg(target_os = "linux")]
use cgroups_rs::hierarchies::{V1, V2};
#[cfg(target_os = "linux")]
use cgroups_rs::memory::MemController;
#[cfg(target_os = "linux")]
use cgroups_rs::{Hierarchy, MaxValue};
use nix::sys::resource::{rlim_t, RLIM_INFINITY};
use nix::unistd::{sysconf, SysconfVar};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{LazyLock, Mutex};
use std::time::Duration;
use tikv_jemallocator::Jemalloc;
use tokio::sync::watch::Receiver;

pub mod tracking;

pub use tracking::{set_tracking_callstacks, tracking_stats};

#[global_allocator]
static GLOBAL: TrackingAllocator<Jemalloc> = TrackingAllocator::new(Jemalloc);

static LOW_COUNT: LazyLock<metrics::Counter> = LazyLock::new(|| {
    metrics::describe_counter!(
        "memory_low_count",
        "how many times the low memory threshold was exceeded"
    );
    metrics::counter!("memory_low_count")
});
static OVER_LIMIT_COUNT: LazyLock<metrics::Counter> = LazyLock::new(|| {
    metrics::describe_counter!(
        "memory_over_limit_count",
        "how many times the soft memory limit was exceeded"
    );
    metrics::counter!("memory_over_limit_count")
});
static MEM_USAGE: LazyLock<metrics::Gauge> = LazyLock::new(|| {
    metrics::describe_gauge!(
        "memory_usage",
        "number of bytes of used memory (Resident Set Size)"
    );
    metrics::gauge!("memory_usage")
});
static MEM_LIMIT: LazyLock<metrics::Gauge> = LazyLock::new(|| {
    metrics::describe_gauge!("memory_limit", "soft memory limit measured in bytes");
    metrics::gauge!("memory_limit")
});
static MEM_COUNTED: LazyLock<metrics::Gauge> = LazyLock::new(|| {
    metrics::describe_gauge!(
        "memory_usage_rust",
        "number of bytes of used memory (allocated by Rust)"
    );
    metrics::gauge!("memory_usage_rust")
});
static LOW_MEM_THRESH: LazyLock<metrics::Gauge> = LazyLock::new(|| {
    metrics::describe_gauge!(
        "memory_low_thresh",
        "low memory threshold measured in bytes"
    );
    metrics::gauge!("memory_low_thresh")
});
static SUBSCRIBER: LazyLock<Mutex<Option<Receiver<()>>>> = LazyLock::new(|| Mutex::new(None));

static OVER_LIMIT: AtomicBool = AtomicBool::new(false);
static LOW_MEM: AtomicBool = AtomicBool::new(false);

// Default this to a reasonable non-zero value, as it is possible
// in the test harness on the CI system to inject concurrently with
// early startup.
// Having this return 0 and propagate back as a 421 makes it harder
// to write tests that care precisely about a response if they
// have to deal with this small window on startup.
static HEAD_ROOM: AtomicUsize = AtomicUsize::new(u32::MAX as usize);

/// Represents the current memory usage of this process
#[derive(Debug, Clone, Copy)]
pub struct MemoryUsage {
    pub bytes: u64,
}

impl std::fmt::Display for MemoryUsage {
    fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(fmt, "{}", human(self.bytes))
    }
}

impl MemoryUsage {
    pub fn get() -> anyhow::Result<Self> {
        #[cfg(target_os = "linux")]
        {
            if let Ok(v2) = Self::get_cgroup(true) {
                return Ok(v2);
            }
            if let Ok(v1) = Self::get_cgroup(false) {
                return Ok(v1);
            }
        }
        Self::get_linux_statm()
    }

    #[cfg(target_os = "linux")]
    fn get_cgroup(v2: bool) -> anyhow::Result<Self> {
        let cgroup = get_my_cgroup(v2)?;
        let mem: &MemController = cgroup
            .controller_of()
            .ok_or_else(|| anyhow::anyhow!("no memory controller?"))?;
        let stat = mem.memory_stat();
        Ok(Self {
            bytes: stat.usage_in_bytes,
        })
    }

    pub fn get_linux_statm() -> anyhow::Result<Self> {
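        // /proc/self/statm is a single line of space-separated page counts
        // ("size resident shared text lib data dt"); the second field is the
        // resident set size, measured in pages.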
        let data = std::fs::read_to_string("/proc/self/statm")?;
        let fields: Vec<&str> = data.split(' ').collect();
        let rss: u64 = fields[1].parse()?;
        Ok(Self {
            bytes: rss * sysconf(SysconfVar::PAGE_SIZE)?.unwrap_or(4 * 1024) as u64,
        })
    }
}

fn human(n: u64) -> String {
    humansize::format_size(n, humansize::DECIMAL)
}

/// Represents a constraint on memory usage
#[derive(Debug, Clone, Copy)]
pub struct MemoryLimits {
    pub soft_limit: Option<u64>,
    pub hard_limit: Option<u64>,
}

impl std::fmt::Display for MemoryLimits {
    fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
        let soft = self.soft_limit.map(human);
        let hard = self.hard_limit.map(human);
        write!(fmt, "soft={soft:?}, hard={hard:?}")
    }
}

impl MemoryLimits {
    pub fn min(self, other: Self) -> Self {
        Self {
            soft_limit: min_opt_limit(self.soft_limit, other.soft_limit),
            hard_limit: min_opt_limit(self.hard_limit, other.hard_limit),
        }
    }

    pub fn is_unlimited(&self) -> bool {
        self.soft_limit.is_none() && self.hard_limit.is_none()
    }
}

fn rlim_to_opt(rlim: rlim_t) -> Option<u64> {
    if rlim == RLIM_INFINITY {
        None
    } else {
        Some(rlim)
    }
}

#[cfg(target_os = "linux")]
fn max_value_to_opt(value: Option<MaxValue>) -> anyhow::Result<Option<u64>> {
    Ok(match value {
        None | Some(MaxValue::Max) => None,
        Some(MaxValue::Value(n)) if n >= 0 => Some(n as u64),
        Some(MaxValue::Value(n)) => anyhow::bail!("unexpected negative limit {n}"),
    })
}

fn min_opt_limit(a: Option<u64>, b: Option<u64>) -> Option<u64> {
    match (a, b) {
        (Some(a), Some(b)) => Some(a.min(b)),
        (Some(a), None) | (None, Some(a)) => Some(a),
        (None, None) => None,
    }
}

impl MemoryLimits {
    pub fn get_rlimits() -> anyhow::Result<Self> {
        #[cfg(not(target_os = "macos"))]
        let (rss_soft, rss_hard) =
            nix::sys::resource::getrlimit(nix::sys::resource::Resource::RLIMIT_RSS)?;
        #[cfg(target_os = "macos")]
        let (rss_soft, rss_hard) = (RLIM_INFINITY, RLIM_INFINITY);

        let soft_limit = rlim_to_opt(rss_soft);
        let hard_limit = rlim_to_opt(rss_hard);

        Ok(Self {
            soft_limit,
            hard_limit,
        })
    }

    #[cfg(target_os = "linux")]
    fn get_any_cgroup() -> anyhow::Result<Self> {
        if let Ok(cg) = Self::get_cgroup(true) {
            return Ok(cg);
        }
        Self::get_cgroup(false)
    }

    #[cfg(target_os = "linux")]
    pub fn get_cgroup(v2: bool) -> anyhow::Result<Self> {
        let cgroup = get_my_cgroup(v2)?;
        let mem: &MemController = cgroup
            .controller_of()
            .ok_or_else(|| anyhow::anyhow!("no memory controller?"))?;

        let limits = mem.get_mem()?;
        Ok(Self {
            soft_limit: max_value_to_opt(limits.high)?,
            hard_limit: max_value_to_opt(limits.max)?,
        })
    }
}

/// Returns the amount of physical memory available to the system.
/// This is Linux-specific.
#[cfg(target_os = "linux")]
fn get_physical_memory() -> anyhow::Result<u64> {
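    // /proc/meminfo lines look like "MemTotal:       16284496 kB" (value shown
    // here is illustrative); we read the unit and value from the end of the
    // MemTotal line.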
    let data = std::fs::read_to_string("/proc/meminfo")?;
    for line in data.lines() {
        if line.starts_with("MemTotal:") {
            let mut iter = line.rsplit(' ');
            let unit = iter
                .next()
                .ok_or_else(|| anyhow::anyhow!("expected unit"))?;
            if unit != "kB" {
                anyhow::bail!("unsupported /proc/meminfo unit {unit}");
            }
            let value = iter
                .next()
                .ok_or_else(|| anyhow::anyhow!("expected value"))?;
            let value: u64 = value.parse()?;

            return Ok(value * 1024);
        }
    }
    anyhow::bail!("MemTotal not found in /proc/meminfo");
}

/// Retrieves the current usage and limits.
/// This is a bit of a murky area as, on Linux, the cgroup reported usage
/// appears to be nonsensical when no limits are configured.
/// So we first obtain the limits from cgroups; if they are set,
/// we return the cgroup usage along with them, otherwise we fall back
/// to the ulimit limits and the more general usage numbers.
///
/// If no limits are explicitly configured, we'll assume a hard limit
/// equal to the physical RAM on the system, and a soft limit of 75%
/// of whatever we've determined the hard limit to be.
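///
/// For example (hypothetical numbers): on a host with 16 GB of RAM and no
/// cgroup or rlimit memory limits configured, the hard limit becomes 16 GB
/// and the soft limit 12 GB.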
#[cfg(target_os = "linux")]
fn get_usage_and_limit_impl() -> anyhow::Result<(MemoryUsage, MemoryLimits)> {
    let mut limit = MemoryLimits::get_rlimits()?;
    let mut usage = MemoryUsage::get_linux_statm()?;

    if let Ok(cg_lim) = MemoryLimits::get_any_cgroup() {
        if !cg_lim.is_unlimited() {
            limit = limit.min(cg_lim);
            usage = MemoryUsage::get()?;
        }
    }

    if limit.hard_limit.is_none() {
        let phys = get_physical_memory()?;
        limit.hard_limit.replace(phys);
    }
    if limit.soft_limit.is_none() {
        limit.soft_limit = limit.hard_limit.map(|lim| lim * 3 / 4);
    }

    Ok((usage, limit))
}

#[cfg(not(target_os = "linux"))]
fn get_usage_and_limit_impl() -> anyhow::Result<(MemoryUsage, MemoryLimits)> {
    Ok((
        MemoryUsage { bytes: 0 },
        MemoryLimits {
            soft_limit: None,
            hard_limit: None,
        },
    ))
}

static USER_HARD_LIMIT: AtomicUsize = AtomicUsize::new(0);
static USER_SOFT_LIMIT: AtomicUsize = AtomicUsize::new(0);
static USER_LOW_THRESH: AtomicUsize = AtomicUsize::new(0);

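/// Override the hard memory limit, measured in bytes.
/// A value of 0 means "no override".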
pub fn set_hard_limit(limit: usize) {
    USER_HARD_LIMIT.store(limit, Ordering::Relaxed);
}

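/// Override the soft memory limit, measured in bytes.
/// A value of 0 means "no override".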
pub fn set_soft_limit(limit: usize) {
    USER_SOFT_LIMIT.store(limit, Ordering::Relaxed);
}

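/// Override the low-memory threshold, measured in bytes.
/// A value of 0 means "use the default", which is 80% of the soft limit.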
pub fn set_low_memory_thresh(limit: usize) {
    USER_LOW_THRESH.store(limit, Ordering::Relaxed);
}

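/// Returns the current memory usage together with the effective limits,
/// after applying any overrides from the `KUMOD_MEMORY_HARD_LIMIT` and
/// `KUMOD_MEMORY_SOFT_LIMIT` environment variables and from
/// [set_hard_limit] / [set_soft_limit].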
pub fn get_usage_and_limit() -> anyhow::Result<(MemoryUsage, MemoryLimits)> {
    let (usage, mut limit) = get_usage_and_limit_impl()?;

    if let Ok(value) = std::env::var("KUMOD_MEMORY_HARD_LIMIT") {
        limit.hard_limit.replace(
            value
                .parse()
                .context("failed to parse KUMOD_MEMORY_HARD_LIMIT env var")?,
        );
    }
    if let Ok(value) = std::env::var("KUMOD_MEMORY_SOFT_LIMIT") {
        limit.soft_limit.replace(
            value
                .parse()
                .context("failed to parse KUMOD_MEMORY_SOFT_LIMIT env var")?,
        );
    }

    let hard = USER_HARD_LIMIT.load(Ordering::Relaxed);
    if hard > 0 {
        limit.hard_limit.replace(hard as u64);
    }
    let soft = USER_SOFT_LIMIT.load(Ordering::Relaxed);
    if soft > 0 {
        limit.soft_limit.replace(soft as u64);
    }

    Ok((usage, limit))
}

/// To be called when a thread goes idle; it will flush cached
/// memory out of the thread local cache to be returned/reused
/// elsewhere in the system
pub fn purge_thread_cache() {
    unsafe {
        tikv_jemalloc_sys::mallctl(
            b"thread.tcache.flush\0".as_ptr() as *const _,
            std::ptr::null_mut(),
            std::ptr::null_mut(),
            std::ptr::null_mut(),
            0,
        );
    }
}

/// To be called when used memory is high: will aggressively
/// flush and release cached memory
fn purge_all_arenas() {
    unsafe {
        // 4096 is MALLCTL_ARENAS_ALL, which is a magic value
        // that instructs jemalloc to purge all arenas
        tikv_jemalloc_sys::mallctl(
            b"arena.4096.purge\0".as_ptr() as *const _,
            std::ptr::null_mut(),
            std::ptr::null_mut(),
            std::ptr::null_mut(),
            0,
        );
    }
}

/// If `MALLOC_CONF='prof:true,prof_prefix:jeprof.out'` is set in the
/// environment, calling this will generate a heap profile in the
/// current directory
fn dump_heap_profile() {
    unsafe {
        tikv_jemalloc_sys::mallctl(
            b"prof.dump\0".as_ptr() as *const _,
            std::ptr::null_mut(),
            std::ptr::null_mut(),
            std::ptr::null_mut(),
            0,
        );
    }
}

/// The memory thread continuously examines memory usage and limits
/// and maintains global counters to track the memory state
fn memory_thread() {
    let mut is_ok = true;
    let mut is_low = false;

    let (tx, rx) = tokio::sync::watch::channel(());
    SUBSCRIBER.lock().unwrap().replace(rx);

    loop {
        MEM_COUNTED.set(crate::tracking::counted_usage() as f64);

        match get_usage_and_limit() {
            Ok((
                MemoryUsage { bytes: usage },
                MemoryLimits {
                    soft_limit: Some(limit),
                    hard_limit: _,
                },
            )) => {
                let was_ok = is_ok;
                is_ok = usage < limit;
                OVER_LIMIT.store(!is_ok, Ordering::SeqCst);
                HEAD_ROOM.store(limit.saturating_sub(usage) as usize, Ordering::SeqCst);
                MEM_USAGE.set(usage as f64);
                MEM_LIMIT.set(limit as f64);

                let mut low_thresh = USER_LOW_THRESH.load(Ordering::Relaxed) as u64;
                if low_thresh == 0 {
                    low_thresh = limit * 8 / 10;
                }
                LOW_MEM_THRESH.set(low_thresh as f64);

                let was_low = is_low;
                is_low = usage > low_thresh;
                LOW_MEM.store(is_low, Ordering::SeqCst);

                if !was_low && is_low {
                    // Transition to low memory
                    LOW_COUNT.increment(1);
                }

                if !is_ok && was_ok {
                    // Transition from OK -> !OK
                    dump_heap_profile();
                    OVER_LIMIT_COUNT.increment(1);
                    tracing::error!(
                        "memory usage {} exceeds limit {}",
                        human(usage),
                        human(limit)
                    );
                    tx.send(()).ok();
                    purge_all_arenas();
                } else if !was_ok && is_ok {
                    // Transition from !OK -> OK
                    dump_heap_profile();
                    tracing::error!(
                        "memory usage {} is back within limit {}",
                        human(usage),
                        human(limit)
                    );
                    tx.send(()).ok();
                } else {
                    if !is_ok {
                        purge_all_arenas();
                    }
                    tracing::debug!("memory usage {}, limit {}", human(usage), human(limit));
                }
            }
            Ok((
                MemoryUsage { bytes: 0 },
                MemoryLimits {
                    soft_limit: None,
                    hard_limit: None,
                },
            )) => {
                // We don't know anything about the memory usage on this
                // system, just pretend everything is fine
                HEAD_ROOM.store(1024, Ordering::SeqCst);
            }
            Ok(_) => {}
            Err(err) => tracing::error!("unable to query memory info: {err:#}"),
        }

        std::thread::sleep(Duration::from_secs(3));
    }
}

/// Returns the amount of headroom; the number of bytes that can
/// be allocated before we hit the soft limit
pub fn get_headroom() -> usize {
    HEAD_ROOM.load(Ordering::SeqCst)
}

/// Returns true when memory usage has crossed the low memory threshold,
/// which defaults to 80% of the soft limit
pub fn low_memory() -> bool {
    LOW_MEM.load(Ordering::SeqCst)
}

/// Indicates the overall memory status
pub fn memory_status() -> MemoryStatus {
    if get_headroom() == 0 {
        MemoryStatus::NoMemory
    } else if low_memory() {
        MemoryStatus::LowMemory
    } else {
        MemoryStatus::Ok
    }
}

#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum MemoryStatus {
    Ok,
    LowMemory,
    NoMemory,
}

/// Returns a receiver that will notify when memory status
/// changes from OK -> !OK or vice versa.
pub fn subscribe_to_memory_status_changes() -> Option<Receiver<()>> {
    SUBSCRIBER.lock().unwrap().clone()
}

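/// Like [subscribe_to_memory_status_changes], but waits until the
/// memory monitor thread has published its receiver before returning.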
pub async fn subscribe_to_memory_status_changes_async() -> Receiver<()> {
    loop {
        if let Some(rx) = subscribe_to_memory_status_changes() {
            return rx;
        }
        tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
    }
}

/// Initialize the memory thread to monitor memory usage/limits
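///
/// A minimal sketch of intended use at daemon startup (hypothetical caller):
///
/// ```ignore
/// kumo_server_memory::setup_memory_limit()?;
/// let mut rx = kumo_server_memory::subscribe_to_memory_status_changes_async().await;
/// ```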
pub fn setup_memory_limit() -> anyhow::Result<()> {
    let (usage, limit) = get_usage_and_limit()?;
    tracing::debug!("usage: {usage:?}");
    tracing::info!("using limits: {limit}");

    std::thread::Builder::new()
        .name("memory-monitor".to_string())
        .spawn(memory_thread)?;

    Ok(())
}

/// Returns a Cgroup for the current process.
/// Can return either a v2 or a v1 cgroup.
#[cfg(target_os = "linux")]
fn get_my_cgroup(v2: bool) -> anyhow::Result<Cgroup> {
    let paths = get_cgroups_relative_paths()?;
    let h: Box<dyn Hierarchy> = if v2 {
        Box::new(V2::new())
    } else {
        Box::new(V1::new())
    };

    let path = paths
        .get("")
        .ok_or_else(|| anyhow::anyhow!("couldn't resolve path"))?;

    let cgroup = Cgroup::load(h, format!("{}/{}", UNIFIED_MOUNTPOINT, path));
    Ok(cgroup)
}

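/// A byte quantity whose Debug representation shows both the raw value
/// and a human-readable size.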
#[derive(Copy, Clone)]
pub struct NumBytes(pub usize);

impl std::fmt::Debug for NumBytes {
    fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(
            fmt,
            "{} ({})",
            self.0,
            humansize::format_size(self.0, humansize::DECIMAL)
        )
    }
}

impl From<usize> for NumBytes {
    fn from(n: usize) -> Self {
        Self(n)
    }
}

impl From<u64> for NumBytes {
    fn from(n: u64) -> Self {
        Self(n as usize)
    }
}

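/// A plain count whose Debug representation shows both the raw value
/// and a thousands-separated rendering.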
#[derive(Copy, Clone)]
pub struct Number(pub usize);

impl std::fmt::Debug for Number {
    fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
        use num_format::{Locale, ToFormattedString};
        write!(
            fmt,
            "{} ({})",
            self.0,
            self.0.to_formatted_string(&Locale::en)
        )
    }
}

impl From<usize> for Number {
    fn from(n: usize) -> Self {
        Self(n)
    }
}

impl From<u64> for Number {
    fn from(n: u64) -> Self {
        Self(n as usize)
    }
}

#[derive(Debug)]
pub struct JemallocStats {
    /// `stats.allocated` - Total number of bytes allocated by the application.
    pub allocated: NumBytes,

    /// `stats.active`
    /// Total number of bytes in active pages allocated by the application. This is a multiple of
    /// the page size, and greater than or equal to stats.allocated. This does not include
    /// `stats.arenas.<i>.pdirty`, `stats.arenas.<i>.pmuzzy`, nor pages entirely devoted to allocator
    /// metadata.
    pub active: NumBytes,

    /// `stats.metadata` (size_t) r- [--enable-stats]
    /// Total number of bytes dedicated to metadata, which comprise base allocations used
    /// for bootstrap-sensitive allocator metadata structures (see `stats.arenas.<i>.base`)
    /// and internal allocations (see `stats.arenas.<i>.internal`). Transparent huge page
    /// (enabled with opt.metadata_thp) usage is not considered.
    pub metadata: NumBytes,

    /// `stats.resident` (size_t) r- [--enable-stats]
    /// Maximum number of bytes in physically resident data pages mapped by the allocator,
    /// comprising all pages dedicated to allocator metadata, pages backing active allocations,
    /// and unused dirty pages. This is a maximum rather than precise because pages may not
    /// actually be physically resident if they correspond to demand-zeroed virtual memory that
    /// has not yet been touched. This is a multiple of the page size, and is larger than
    /// stats.active.
    pub resident: NumBytes,

    /// `stats.mapped` (size_t) r- [--enable-stats]
    /// Total number of bytes in active extents mapped by the allocator. This is larger than
    /// stats.active. This does not include inactive extents, even those that contain unused
    /// dirty pages, which means that there is no strict ordering between this and
    /// stats.resident.
    pub mapped: NumBytes,

    /// `stats.retained` (size_t) r- [--enable-stats]
    /// Total number of bytes in virtual memory mappings that were retained rather than being
    /// returned to the operating system via e.g. munmap(2) or similar. Retained virtual memory
    /// is typically untouched, decommitted, or purged, so it has no strongly associated
    /// physical memory (see extent hooks for details). Retained memory is excluded from mapped
    /// memory statistics, e.g. stats.mapped.
    pub retained: NumBytes,
}

impl JemallocStats {
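    /// Take a point-in-time snapshot of the jemalloc statistics counters.
    ///
    /// A hypothetical usage sketch:
    /// ```ignore
    /// let stats = kumo_server_memory::JemallocStats::collect();
    /// println!("{stats:?}");
    /// ```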
    pub fn collect() -> Self {
        use tikv_jemalloc_ctl::{epoch, stats};

        // jemalloc stats are cached and don't fully report current
        // values until an epoch is advanced, so let's explicitly
        // do that here.
        epoch::advance().ok();

        Self {
            allocated: stats::allocated::read().unwrap_or(0).into(),
            active: stats::active::read().unwrap_or(0).into(),
            metadata: stats::metadata::read().unwrap_or(0).into(),
            resident: stats::resident::read().unwrap_or(0).into(),
            mapped: stats::mapped::read().unwrap_or(0).into(),
            retained: stats::retained::read().unwrap_or(0).into(),
        }
    }
}