use crate::tracking::TrackingAllocator;
#[cfg(target_os = "linux")]
use cgroups_rs::cgroup::{get_cgroups_relative_paths, Cgroup, UNIFIED_MOUNTPOINT};
#[cfg(target_os = "linux")]
use cgroups_rs::hierarchies::{V1, V2};
#[cfg(target_os = "linux")]
use cgroups_rs::memory::MemController;
#[cfg(target_os = "linux")]
use cgroups_rs::{Hierarchy, MaxValue};
use nix::sys::resource::{rlim_t, RLIM_INFINITY};
use nix::unistd::{sysconf, SysconfVar};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{LazyLock, Mutex};
use std::time::Duration;
use tikv_jemallocator::Jemalloc;
use tokio::sync::watch::Receiver;
pub mod tracking;
pub use tracking::{set_tracking_callstacks, tracking_stats};
/// Process-wide allocator: jemalloc wrapped in [`TrackingAllocator`].
#[global_allocator]
static GLOBAL: TrackingAllocator<Jemalloc> = TrackingAllocator::new(Jemalloc);
/// Counter incremented by the monitor thread each time usage goes from
/// being within the soft limit to exceeding it.
static OVER_LIMIT_COUNT: LazyLock<metrics::Counter> = LazyLock::new(|| {
metrics::describe_counter!(
"memory_over_limit_count",
"how many times the soft memory limit was exceeded"
);
metrics::counter!("memory_over_limit_count")
});
/// Gauge set by the monitor thread to the usage figure returned by
/// `get_usage_and_limit`.
static MEM_USAGE: LazyLock<metrics::Gauge> = LazyLock::new(|| {
metrics::describe_gauge!(
"memory_usage",
"number of bytes of used memory (Resident Set Size)"
);
metrics::gauge!("memory_usage")
});
/// Gauge set by the monitor thread to the soft limit currently in effect.
static MEM_LIMIT: LazyLock<metrics::Gauge> = LazyLock::new(|| {
metrics::describe_gauge!("memory_limit", "soft memory limit measured in bytes");
metrics::gauge!("memory_limit")
});
/// Gauge set by the monitor thread to `crate::tracking::counted_usage()`.
static MEM_COUNTED: LazyLock<metrics::Gauge> = LazyLock::new(|| {
metrics::describe_gauge!(
"memory_usage_rust",
"number of bytes of used memory (allocated by Rust)"
);
metrics::gauge!("memory_usage_rust")
});
/// Receiver half of the watch channel created by the monitor thread.
/// Holds `None` until that thread has started and stored its receiver.
static SUBSCRIBER: LazyLock<Mutex<Option<Receiver<()>>>> = LazyLock::new(|| Mutex::new(None));
/// Flag written by the monitor thread on every poll, derived from the
/// usage-versus-soft-limit comparison.
/// NOTE(review): no reader of this flag is visible in this file.
static OVER_LIMIT: AtomicBool = AtomicBool::new(false);
/// `true` when the most recent poll found usage above 80% of the soft limit.
static LOW_MEM: AtomicBool = AtomicBool::new(false);
/// Bytes remaining below the soft limit as of the most recent poll
/// (saturating at zero). Starts at `u32::MAX` until the first poll completes.
static HEAD_ROOM: AtomicUsize = AtomicUsize::new(u32::MAX as usize);
/// A point-in-time measurement of memory in use.
#[derive(Debug, Clone, Copy)]
pub struct MemoryUsage {
    /// Amount of memory in use, in bytes.
    pub bytes: u64,
}

/// Displays the byte count in human-readable form (see `human`).
impl std::fmt::Display for MemoryUsage {
    fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
        let rendered = human(self.bytes);
        fmt.write_str(&rendered)
    }
}
impl MemoryUsage {
    /// Returns the current memory usage.
    ///
    /// On Linux the cgroup v2 accounting is tried first, then cgroup v1;
    /// if neither is available (or on other platforms) this falls back to
    /// [`MemoryUsage::get_linux_statm`].
    ///
    /// # Errors
    ///
    /// Returns an error if every source fails.
    pub fn get() -> anyhow::Result<Self> {
        #[cfg(target_os = "linux")]
        {
            if let Ok(v2) = Self::get_cgroup(true) {
                return Ok(v2);
            }
            if let Ok(v1) = Self::get_cgroup(false) {
                return Ok(v1);
            }
        }
        Self::get_linux_statm()
    }

    /// Reads the usage reported by the memory controller of this process's
    /// cgroup, using the v2 hierarchy when `v2` is true and v1 otherwise.
    #[cfg(target_os = "linux")]
    fn get_cgroup(v2: bool) -> anyhow::Result<Self> {
        let cgroup = get_my_cgroup(v2)?;
        let mem: &MemController = cgroup
            .controller_of()
            .ok_or_else(|| anyhow::anyhow!("no memory controller?"))?;
        let stat = mem.memory_stat();
        Ok(Self {
            bytes: stat.usage_in_bytes,
        })
    }

    /// Computes usage from the second field of `/proc/self/statm`
    /// (a page count) multiplied by the system page size.
    ///
    /// The page size falls back to 4 KiB when `sysconf` reports no value.
    ///
    /// # Errors
    ///
    /// Returns an error if the file cannot be read, if it has fewer than two
    /// fields, if the second field is not a number, or if `sysconf` fails.
    pub fn get_linux_statm() -> anyhow::Result<Self> {
        let data = std::fs::read_to_string("/proc/self/statm")?;
        // Look the field up with a checked accessor: indexing would panic on
        // a short or empty file instead of reporting an error to the caller.
        let resident_pages: u64 = data
            .split_ascii_whitespace()
            .nth(1)
            .ok_or_else(|| anyhow::anyhow!("malformed /proc/self/statm: missing resident field"))?
            .parse()?;
        let page_size = sysconf(SysconfVar::PAGE_SIZE)?.unwrap_or(4 * 1024) as u64;
        Ok(Self {
            bytes: resident_pages * page_size,
        })
    }
}
fn human(n: u64) -> String {
humansize::format_size(n, humansize::DECIMAL)
}
/// Soft and hard memory limits, in bytes. `None` means "no limit".
#[derive(Debug, Clone, Copy)]
pub struct MemoryLimits {
    /// Soft limit in bytes, if any.
    pub soft_limit: Option<u64>,
    /// Hard limit in bytes, if any.
    pub hard_limit: Option<u64>,
}

/// Displays both limits as `soft=…, hard=…`, each rendered as the `Debug`
/// form of an optional human-readable size.
impl std::fmt::Display for MemoryLimits {
    fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(
            fmt,
            "soft={:?}, hard={:?}",
            self.soft_limit.map(human),
            self.hard_limit.map(human)
        )
    }
}
impl MemoryLimits {
    /// Combines two sets of limits field by field, keeping the smaller value
    /// where both are set and the only value where just one is set.
    pub fn min(self, other: Self) -> Self {
        let soft_limit = min_opt_limit(self.soft_limit, other.soft_limit);
        let hard_limit = min_opt_limit(self.hard_limit, other.hard_limit);
        Self {
            soft_limit,
            hard_limit,
        }
    }

    /// Returns `true` when neither a soft nor a hard limit is set.
    pub fn is_unlimited(&self) -> bool {
        matches!((self.soft_limit, self.hard_limit), (None, None))
    }
}
/// Maps an rlimit value to `None` when it equals `RLIM_INFINITY`, and to
/// `Some(value)` otherwise.
fn rlim_to_opt(rlim: rlim_t) -> Option<u64> {
    (rlim != RLIM_INFINITY).then_some(rlim)
}
/// Converts a cgroup limit into an optional byte count.
///
/// An absent value or `MaxValue::Max` means "no limit" and yields `None`.
///
/// # Errors
///
/// Returns an error if the value is negative.
#[cfg(target_os = "linux")]
fn max_value_to_opt(value: Option<MaxValue>) -> anyhow::Result<Option<u64>> {
    let raw = match value {
        None | Some(MaxValue::Max) => return Ok(None),
        Some(MaxValue::Value(n)) => n,
    };
    if raw < 0 {
        anyhow::bail!("unexpected negative limit {raw}");
    }
    Ok(Some(raw as u64))
}
/// Returns the smaller of two optional limits, treating `None` as
/// "no limit": the result is `None` only when both inputs are `None`.
fn min_opt_limit(a: Option<u64>, b: Option<u64>) -> Option<u64> {
    // Chaining the two options yields zero, one, or two values; the minimum
    // of that sequence is exactly the most restrictive limit present.
    a.into_iter().chain(b).min()
}
impl MemoryLimits {
pub fn get_rlimits() -> anyhow::Result<Self> {
#[cfg(not(target_os = "macos"))]
let (rss_soft, rss_hard) =
nix::sys::resource::getrlimit(nix::sys::resource::Resource::RLIMIT_RSS)?;
#[cfg(target_os = "macos")]
let (rss_soft, rss_hard) = (RLIM_INFINITY, RLIM_INFINITY);
let soft_limit = rlim_to_opt(rss_soft);
let hard_limit = rlim_to_opt(rss_hard);
Ok(Self {
soft_limit,
hard_limit,
})
}
#[cfg(target_os = "linux")]
fn get_any_cgroup() -> anyhow::Result<Self> {
if let Ok(cg) = Self::get_cgroup(true) {
return Ok(cg);
}
Self::get_cgroup(false)
}
#[cfg(target_os = "linux")]
pub fn get_cgroup(v2: bool) -> anyhow::Result<Self> {
let cgroup = get_my_cgroup(v2)?;
let mem: &MemController = cgroup
.controller_of()
.ok_or_else(|| anyhow::anyhow!("no memory controller?"))?;
let limits = mem.get_mem()?;
Ok(Self {
soft_limit: max_value_to_opt(limits.high)?,
hard_limit: max_value_to_opt(limits.max)?,
})
}
}
#[cfg(target_os = "linux")]
fn get_physical_memory() -> anyhow::Result<u64> {
let data = std::fs::read_to_string("/proc/meminfo")?;
for line in data.lines() {
if line.starts_with("MemTotal:") {
let mut iter = line.rsplit(' ');
let unit = iter
.next()
.ok_or_else(|| anyhow::anyhow!("expected unit"))?;
if unit != "kB" {
anyhow::bail!("unsupported /proc/meminfo unit {unit}");
}
let value = iter
.next()
.ok_or_else(|| anyhow::anyhow!("expected value"))?;
let value: u64 = value.parse()?;
return Ok(value * 1024);
}
}
anyhow::bail!("MemTotal not found in /proc/meminfo");
}
/// Returns the current memory usage together with the effective limits.
///
/// The limits start from the process rlimits. If a cgroup imposes any
/// limit, the two are combined (most restrictive wins) and usage is taken
/// from [`MemoryUsage::get`] instead of `/proc/self/statm`. A missing hard
/// limit defaults to the physical memory size, and a missing soft limit
/// defaults to 75% of the hard limit.
///
/// # Errors
///
/// Returns an error if the rlimits, the usage, or the physical memory size
/// cannot be determined.
#[cfg(target_os = "linux")]
pub fn get_usage_and_limit() -> anyhow::Result<(MemoryUsage, MemoryLimits)> {
    let mut limit = MemoryLimits::get_rlimits()?;
    let mut usage = MemoryUsage::get_linux_statm()?;
    if let Ok(cg_lim) = MemoryLimits::get_any_cgroup() {
        if !cg_lim.is_unlimited() {
            limit = limit.min(cg_lim);
            usage = MemoryUsage::get()?;
        }
    }
    let phys = get_physical_memory()?;
    if limit.hard_limit.is_none() {
        limit.hard_limit.replace(phys);
    }
    if limit.soft_limit.is_none() {
        // Widen to u128 for the multiplication: a very large hard limit
        // (above u64::MAX / 3) would otherwise overflow, panicking in debug
        // builds and wrapping to a bogus small soft limit in release builds.
        // The result never exceeds the hard limit, so it fits back in u64.
        limit.soft_limit = limit
            .hard_limit
            .map(|lim| (u128::from(lim) * 3 / 4) as u64);
    }
    Ok((usage, limit))
}
/// Non-Linux stand-in: no measurement is made, so this always reports zero
/// usage and no limits.
#[cfg(not(target_os = "linux"))]
pub fn get_usage_and_limit() -> anyhow::Result<(MemoryUsage, MemoryLimits)> {
    let usage = MemoryUsage { bytes: 0 };
    let limits = MemoryLimits {
        soft_limit: None,
        hard_limit: None,
    };
    Ok((usage, limits))
}
/// Issues the jemalloc `thread.tcache.flush` control for the calling thread.
///
/// The return code of `mallctl` is ignored, so this is best-effort.
pub fn purge_thread_cache() {
    const CTL_NAME: &[u8] = b"thread.tcache.flush\0";
    // SAFETY: CTL_NAME is a NUL-terminated static byte string, and the
    // old/new value pointers are null with a zero length, so no memory is
    // read or written through them.
    unsafe {
        tikv_jemalloc_sys::mallctl(
            CTL_NAME.as_ptr().cast(),
            std::ptr::null_mut(),
            std::ptr::null_mut(),
            std::ptr::null_mut(),
            0,
        );
    }
}
/// Issues the jemalloc `arena.4096.purge` control.
///
/// NOTE(review): index 4096 presumably matches jemalloc's
/// `MALLCTL_ARENAS_ALL`, i.e. "purge every arena" — confirm against the
/// jemalloc version in use. The return code of `mallctl` is ignored.
fn purge_all_arenas() {
    const CTL_NAME: &[u8] = b"arena.4096.purge\0";
    // SAFETY: CTL_NAME is a NUL-terminated static byte string, and the
    // old/new value pointers are null with a zero length, so no memory is
    // read or written through them.
    unsafe {
        tikv_jemalloc_sys::mallctl(
            CTL_NAME.as_ptr().cast(),
            std::ptr::null_mut(),
            std::ptr::null_mut(),
            std::ptr::null_mut(),
            0,
        );
    }
}
/// Issues the jemalloc `prof.dump` control without supplying a file name.
///
/// The return code of `mallctl` is ignored, so a failure is silent.
fn dump_heap_profile() {
    const CTL_NAME: &[u8] = b"prof.dump\0";
    // SAFETY: CTL_NAME is a NUL-terminated static byte string, and the
    // old/new value pointers are null with a zero length, so no memory is
    // read or written through them.
    unsafe {
        tikv_jemalloc_sys::mallctl(
            CTL_NAME.as_ptr().cast(),
            std::ptr::null_mut(),
            std::ptr::null_mut(),
            std::ptr::null_mut(),
            0,
        );
    }
}
/// Body of the memory monitor thread; never returns.
///
/// Publishes the receiver half of a watch channel into `SUBSCRIBER`, then
/// polls every three seconds. Each poll updates the metrics gauges and the
/// `OVER_LIMIT`, `LOW_MEM` and `HEAD_ROOM` atomics. When usage crosses the
/// soft limit in either direction, a heap profile is dumped and subscribers
/// are notified; while usage stays over the limit, all arenas are purged on
/// every poll.
fn memory_thread() {
    let mut is_ok = true;
    let (tx, rx) = tokio::sync::watch::channel(());
    SUBSCRIBER.lock().unwrap().replace(rx);
    loop {
        MEM_COUNTED.set(crate::tracking::counted_usage() as f64);
        match get_usage_and_limit() {
            Ok((
                MemoryUsage { bytes: usage },
                MemoryLimits {
                    soft_limit: Some(limit),
                    hard_limit: _,
                },
            )) => {
                let was_ok = is_ok;
                is_ok = usage < limit;
                // The flag means "over the limit", which is the negation of
                // `is_ok`; storing `is_ok` directly would invert it.
                OVER_LIMIT.store(!is_ok, Ordering::SeqCst);
                HEAD_ROOM.store(limit.saturating_sub(usage) as usize, Ordering::SeqCst);
                MEM_USAGE.set(usage as f64);
                MEM_LIMIT.set(limit as f64);
                // 80% of the limit, computed in u128 so that a very large
                // limit cannot overflow the multiplication. The result never
                // exceeds `limit`, so it fits back in u64.
                let low_thresh = (u128::from(limit) * 8 / 10) as u64;
                LOW_MEM.store(usage > low_thresh, Ordering::SeqCst);
                if !is_ok && was_ok {
                    // Transition: within limit -> over limit.
                    dump_heap_profile();
                    OVER_LIMIT_COUNT.increment(1);
                    tracing::error!(
                        "memory usage {} exceeds limit {}",
                        human(usage),
                        human(limit)
                    );
                    tx.send(()).ok();
                    purge_all_arenas();
                } else if !was_ok && is_ok {
                    // Transition: over limit -> back within limit.
                    dump_heap_profile();
                    tracing::error!(
                        "memory usage {} is back within limit {}",
                        human(usage),
                        human(limit)
                    );
                    tx.send(()).ok();
                } else {
                    // No transition; keep purging for as long as we remain
                    // over the limit.
                    if !is_ok {
                        purge_all_arenas();
                    }
                    tracing::debug!("memory usage {}, limit {}", human(usage), human(limit));
                }
            }
            // Zero usage with no limits at all is what the non-Linux
            // `get_usage_and_limit` reports; publish a small fixed headroom.
            Ok((
                MemoryUsage { bytes: 0 },
                MemoryLimits {
                    soft_limit: None,
                    hard_limit: None,
                },
            )) => {
                HEAD_ROOM.store(1024, Ordering::SeqCst);
            }
            Ok(_) => {}
            Err(err) => tracing::error!("unable to query memory info: {err:#}"),
        }
        std::thread::sleep(Duration::from_secs(3));
    }
}
pub fn get_headroom() -> usize {
HEAD_ROOM.load(Ordering::SeqCst)
}
pub fn low_memory() -> bool {
LOW_MEM.load(Ordering::SeqCst)
}
/// Returns a receiver that is notified when memory usage crosses the soft
/// limit, or `None` if the monitor thread has not yet published one.
///
/// # Panics
///
/// Panics if the `SUBSCRIBER` mutex is poisoned.
pub fn subscribe_to_memory_status_changes() -> Option<Receiver<()>> {
    let slot = SUBSCRIBER.lock().unwrap();
    slot.as_ref().cloned()
}
/// Waits until the monitor thread has published its receiver, then returns
/// it. Checks every two seconds.
pub async fn subscribe_to_memory_status_changes_async() -> Receiver<()> {
    let retry_delay = tokio::time::Duration::from_secs(2);
    loop {
        match subscribe_to_memory_status_changes() {
            Some(receiver) => break receiver,
            None => tokio::time::sleep(retry_delay).await,
        }
    }
}
/// Logs the current usage and limits, then starts the `memory-monitor`
/// thread. The thread's join handle is dropped, so it runs detached.
///
/// # Errors
///
/// Returns an error if the usage or limits cannot be read, or if the thread
/// cannot be spawned.
pub fn setup_memory_limit() -> anyhow::Result<()> {
    let (usage, limit) = get_usage_and_limit()?;
    tracing::debug!("usage: {:?}", usage);
    tracing::info!("using limits: {}", limit);

    let monitor = std::thread::Builder::new().name(String::from("memory-monitor"));
    monitor.spawn(memory_thread)?;
    Ok(())
}
/// Loads the cgroup this process belongs to, using the v2 hierarchy when
/// `v2` is true and the v1 hierarchy otherwise.
///
/// The relative path is taken from the entry keyed by the empty string and
/// appended to `UNIFIED_MOUNTPOINT`.
/// NOTE(review): the empty key presumably identifies the unified (v2)
/// hierarchy, which would make the v1 lookup fail on v1-only hosts — confirm.
///
/// # Errors
///
/// Returns an error if the relative paths cannot be read or contain no
/// entry for the empty key.
#[cfg(target_os = "linux")]
fn get_my_cgroup(v2: bool) -> anyhow::Result<Cgroup> {
    let relative_paths = get_cgroups_relative_paths()?;
    let hierarchy: Box<dyn Hierarchy> = match v2 {
        true => Box::new(V2::new()),
        false => Box::new(V1::new()),
    };
    let Some(relative) = relative_paths.get("") else {
        anyhow::bail!("couldn't resolve path");
    };
    Ok(Cgroup::load(
        hierarchy,
        format!("{}/{}", UNIFIED_MOUNTPOINT, relative),
    ))
}
/// A byte count whose `Debug` output shows both the raw number and a
/// human-readable size.
#[derive(Copy, Clone)]
pub struct NumBytes(pub usize);

/// Formats as `<raw> (<human-readable>)` using decimal units.
impl std::fmt::Debug for NumBytes {
    fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
        let Self(count) = *self;
        let pretty = humansize::format_size(count, humansize::DECIMAL);
        write!(fmt, "{} ({})", count, pretty)
    }
}

impl From<usize> for NumBytes {
    fn from(n: usize) -> Self {
        NumBytes(n)
    }
}

/// Converts with an `as` cast, so values wider than `usize` are truncated.
impl From<u64> for NumBytes {
    fn from(n: u64) -> Self {
        NumBytes(n as usize)
    }
}
/// A count whose `Debug` output shows both the raw number and a
/// locale-formatted rendering.
#[derive(Copy, Clone)]
pub struct Number(pub usize);

/// Formats as `<raw> (<formatted>)`, where the second form uses the `en`
/// locale from `num_format`.
impl std::fmt::Debug for Number {
    fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
        use num_format::{Locale, ToFormattedString};
        let Self(count) = *self;
        let grouped = count.to_formatted_string(&Locale::en);
        write!(fmt, "{} ({})", count, grouped)
    }
}

impl From<usize> for Number {
    fn from(n: usize) -> Self {
        Number(n)
    }
}

/// Converts with an `as` cast, so values wider than `usize` are truncated.
impl From<u64> for Number {
    fn from(n: u64) -> Self {
        Number(n as usize)
    }
}
#[derive(Debug)]
pub struct JemallocStats {
pub allocated: NumBytes,
pub active: NumBytes,
pub metadata: NumBytes,
pub resident: NumBytes,
pub mapped: NumBytes,
pub retained: NumBytes,
}
impl JemallocStats {
pub fn collect() -> Self {
use tikv_jemalloc_ctl::{epoch, stats};
epoch::advance().ok();
Self {
allocated: stats::allocated::read().unwrap_or(0).into(),
active: stats::active::read().unwrap_or(0).into(),
metadata: stats::metadata::read().unwrap_or(0).into(),
resident: stats::resident::read().unwrap_or(0).into(),
mapped: stats::mapped::read().unwrap_or(0).into(),
retained: stats::retained::read().unwrap_or(0).into(),
}
}
}