kumo_server_memory/tracking.rs

use crate::{NumBytes, Number};
use backtrace::Backtrace;
use parking_lot::Mutex;
use std::alloc::{GlobalAlloc, Layout};
use std::cell::Cell;
use std::collections::HashMap;
use std::sync::atomic::Ordering::Relaxed;
use std::sync::atomic::{AtomicBool, AtomicUsize};
use std::sync::LazyLock;

// Portions of this file are derived from the re_memory crate
// which is Copyright (c) 2022 Rerun Technologies AB <opensource@rerun.io>
// and used under the terms of its MIT License
// <https://github.com/rerun-io/rerun/tree/main/crates/utils/re_memory>

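// Per-thread re-entrancy guard: set to true while we are inside the tracking
// code itself, so that allocations made by the tracker (backtrace capture,
// hash map updates) are not recursively sampled.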
thread_local! {
    static IN_TRACKER: Cell<bool> = const { Cell::new(false) };
}

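// Usage sketch (an assumption about how this type is wired up, not taken from
// this crate's docs): wrap an underlying allocator and install the wrapper as
// the global allocator, for example:
//
//   #[global_allocator]
//   static GLOBAL: TrackingAllocator<std::alloc::System> =
//       TrackingAllocator::new(std::alloc::System);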
#[derive(Default)]
pub struct TrackingAllocator<A: GlobalAlloc> {
    allocator: A,
}

impl<A: GlobalAlloc> TrackingAllocator<A> {
    pub const fn new(allocator: A) -> Self {
        Self { allocator }
    }
}

static STATS: Stats = Stats::new();

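// Size-class thresholds and sampling intervals: allocations smaller than
// SMALL_SIZE are only counted in aggregate; allocations in
// [SMALL_SIZE, MEDIUM_SIZE) are sampled at roughly 1 in MEDIUM_RATE; larger
// allocations are sampled at 1 in BIG_RATE (i.e. always, since BIG_RATE is 1).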
const SMALL_SIZE: usize = 128;
const MEDIUM_SIZE: usize = 4 * 1024;

const MEDIUM_RATE: u64 = 64;
const BIG_RATE: u64 = 1;

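// Sampled live allocations are tracked separately for the medium and big
// size classes.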
static BIG_TRACKER: LazyLock<Mutex<AllocationTracker>> =
    LazyLock::new(|| Mutex::new(AllocationTracker::default()));
static MEDIUM_TRACKER: LazyLock<Mutex<AllocationTracker>> =
    LazyLock::new(|| Mutex::new(AllocationTracker::default()));

// SAFETY: we're passing through the unsafe portions to an underlying
// allocator, which we're relying on to uphold safety.
// The additional logic we add here is safe and merely performs tracking.
unsafe impl<A: GlobalAlloc> GlobalAlloc for TrackingAllocator<A> {
    unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
        let ptr = unsafe { self.allocator.alloc(layout) };
        track_allocation(ptr, layout.size());
        ptr
    }

    unsafe fn alloc_zeroed(&self, layout: Layout) -> *mut u8 {
        let ptr = unsafe { self.allocator.alloc_zeroed(layout) };
        track_allocation(ptr, layout.size());
        ptr
    }

    unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
        unsafe { self.allocator.dealloc(ptr, layout) };
        track_dealloc(ptr, layout.size());
    }

    unsafe fn realloc(&self, old_ptr: *mut u8, layout: Layout, new_size: usize) -> *mut u8 {
        track_dealloc(old_ptr, layout.size());

        let new_ptr = unsafe { self.allocator.realloc(old_ptr, layout, new_size) };

        track_allocation(new_ptr, new_size);
        new_ptr
    }
}

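/// Record an allocation in the aggregate counters and, when callstack
/// tracking is enabled, stochastically sample its backtrace into the
/// medium or big tracker depending on its size.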
fn track_allocation(ptr: *mut u8, size: usize) {
    STATS.live.add(size);

    if !STATS.track_callstacks.load(Relaxed) {
        return;
    }

    if size < SMALL_SIZE {
        STATS.small.add(size);
        return;
    }

    IN_TRACKER.with(|in_track| {
        if !in_track.get() {
            in_track.set(true);

            let hash = PtrHash::new(ptr);
            let track = hash.should_sample_size(size);

            if track {
                let bt = Backtrace::new_unresolved();
                if size < MEDIUM_SIZE {
                    STATS.medium.add(size);
                    MEDIUM_TRACKER.lock().track_allocation(hash, size, bt);
                } else {
                    STATS.large.add(size);
                    BIG_TRACKER.lock().track_allocation(hash, size, bt);
                }
            }

            in_track.set(false);
        }
    });
}

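/// The mirror image of `track_allocation`: removes the allocation from the
/// aggregate counters and, when it was sampled, from its callstack tracker.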
fn track_dealloc(ptr: *mut u8, size: usize) {
    STATS.live.sub(size);

    if !STATS.track_callstacks.load(Relaxed) {
        return;
    }

    if size < SMALL_SIZE {
        STATS.small.sub(size);
        return;
    }

    IN_TRACKER.with(|in_track| {
        if !in_track.get() {
            in_track.set(true);

            let hash = PtrHash::new(ptr);
            let track = hash.should_sample_size(size);

            if track {
                if size < MEDIUM_SIZE {
                    STATS.medium.sub(size);
                    MEDIUM_TRACKER.lock().track_dealloc(hash, size);
                } else {
                    STATS.large.sub(size);
                    BIG_TRACKER.lock().track_dealloc(hash, size);
                }
            }

            in_track.set(false);
        }
    });
}

/// Returns the stochastic sampling rate (really, an interval)
/// that should be used for a given allocation size.
fn stochastic_rate_by_size(size: usize) -> u64 {
    if size < MEDIUM_SIZE {
        MEDIUM_RATE
    } else {
        BIG_RATE
    }
}

/// A 64-bit hash of a pointer address. The hash redistributes the
/// address bits, which is important for the stochastic sampling
/// approach used in this module.
#[derive(Copy, Clone, Debug, Hash, PartialEq, Eq)]
struct PtrHash(u64);

impl PtrHash {
    #[inline]
    pub fn new(ptr: *mut u8) -> Self {
        Self(ahash::RandomState::with_seeds(1, 2, 3, 4).hash_one(ptr))
    }

    /// Given an allocation size, returns true if we should sample
    /// the associated allocation call stack based on the stochastic
    /// rate configured for that allocation size.
    pub fn should_sample_size(&self, size: usize) -> bool {
        let rate = stochastic_rate_by_size(size);
        self.should_sample_at_rate(rate)
    }

    /// Apply "stochastic sampling" at the specified rate.
    /// The rate is nominally a sampling interval and must be a power of two:
    /// because the hash "randomizes" the address bits, masking them with
    /// `rate - 1` selects roughly one in every `rate` allocations.
    pub fn should_sample_at_rate(&self, rate: u64) -> bool {
        self.0 & (rate - 1) == 0
    }
}

struct CallstackEntry {
    size: usize,
    bt: Backtrace,
}

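/// Aggregated information about one distinct callstack among the sampled
/// live allocations. `stochastic_rate` is the sampling interval that was in
/// effect for these allocations, so `count` and `total_size` can be scaled
/// by it to estimate the true totals.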
pub struct CallstackStats {
    pub count: usize,
    pub total_size: usize,
    pub bt: Backtrace,
    pub stochastic_rate: usize,
}

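/// The set of sampled live allocations, keyed by the hash of their
/// pointer address.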
#[derive(Default)]
struct AllocationTracker {
    live_allocations: ahash::HashMap<PtrHash, CallstackEntry>,
}

impl AllocationTracker {
    pub fn track_allocation(&mut self, ptr: PtrHash, size: usize, bt: Backtrace) {
        self.live_allocations
            .insert(ptr, CallstackEntry { size, bt });
    }

    pub fn track_dealloc(&mut self, ptr: PtrHash, _size: usize) {
        self.live_allocations.remove(&ptr);
    }

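    /// Group the sampled live allocations by identical backtrace (compared
    /// frame-by-frame via instruction pointers) and return up to `max_stacks`
    /// of them, ordered by total allocated size, largest first.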
    pub fn top_callstacks(&self, max_stacks: usize) -> Vec<CallstackStats> {
        let mut by_stack = HashMap::new();

        for alloc in self.live_allocations.values() {
            let key = alloc.bt.frames().iter().map(|f| f.ip()).collect::<Vec<_>>();
            let entry = by_stack.entry(key).or_insert_with(|| CallstackStats {
                count: 0,
                total_size: 0,
                bt: alloc.bt.clone(),
                stochastic_rate: stochastic_rate_by_size(alloc.size) as usize,
            });

            entry.count += 1;
            entry.total_size += alloc.size;
        }

        let mut stats = by_stack.into_values().collect::<Vec<_>>();
        stats.sort_by(|a, b| b.total_size.cmp(&a.total_size));
        stats.truncate(max_stacks);
        stats.shrink_to_fit();
        stats
    }
}

struct AtomicCountAndSize {
    /// Number of allocations.
    pub count: AtomicUsize,

    /// Number of bytes.
    pub size: AtomicUsize,
}

impl AtomicCountAndSize {
    pub const fn zero() -> Self {
        Self {
            count: AtomicUsize::new(0),
            size: AtomicUsize::new(0),
        }
    }

    fn load(&self) -> CountAndSize {
        CountAndSize {
            count: self.count.load(Relaxed).into(),
            size: self.size.load(Relaxed).into(),
        }
    }

    /// Add an allocation.
    fn add(&self, size: usize) {
        self.count.fetch_add(1, Relaxed);
        self.size.fetch_add(size, Relaxed);
    }

    /// Remove an allocation.
    fn sub(&self, size: usize) {
        self.count.fetch_sub(1, Relaxed);
        self.size.fetch_sub(size, Relaxed);
    }
}

#[derive(Debug, Clone, Copy)]
pub struct CountAndSize {
    pub count: Number,
    pub size: NumBytes,
}

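/// Global aggregate counters. `live` is updated for every allocation and
/// deallocation; the `small`, `medium` and `large` buckets are only updated
/// while callstack tracking is enabled, and `medium`/`large` only for the
/// stochastically sampled subset of allocations.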
struct Stats {
    live: AtomicCountAndSize,
    track_callstacks: AtomicBool,
    small: AtomicCountAndSize,
    medium: AtomicCountAndSize,
    large: AtomicCountAndSize,
}

impl Stats {
    const fn new() -> Self {
        Self {
            live: AtomicCountAndSize::zero(),
            small: AtomicCountAndSize::zero(),
            medium: AtomicCountAndSize::zero(),
            large: AtomicCountAndSize::zero(),
            track_callstacks: AtomicBool::new(false),
        }
    }
}

/// Number of bytes allocated via the global allocator.
/// Not all of these may be resident; the RSS value will
/// typically be different from this value.
pub fn counted_usage() -> usize {
    STATS.live.size.load(Relaxed)
}

pub fn set_tracking_callstacks(enable: bool) {
    STATS.track_callstacks.store(enable, Relaxed);
}

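// Usage sketch (an assumption about the intended call sequence, not taken
// from this crate's docs): enable sampling once, early in startup, and then
// poll for a snapshot whenever it is needed:
//
//   set_tracking_callstacks(true);
//   // ... later, e.g. from a diagnostics endpoint:
//   let stats = tracking_stats();
//   println!("live: {:?}, stacks: {}", stats.live, stats.top_callstacks.len());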
pub struct TrackingStats {
    pub small_threshold: NumBytes,
    pub live: CountAndSize,
    pub top_callstacks: Vec<CallstackStats>,
}

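/// Take a snapshot of the current tracking state: the live counters plus the
/// heaviest callstacks from both the big and medium trackers, with their
/// backtrace symbols resolved.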
pub fn tracking_stats() -> TrackingStats {
    const MAX_STACKS: usize = 128;

    let mut top_callstacks = vec![];

    IN_TRACKER.with(|in_track| {
        if !in_track.get() {
            in_track.set(true);
            top_callstacks = BIG_TRACKER.lock().top_callstacks(MAX_STACKS);
            top_callstacks.append(&mut MEDIUM_TRACKER.lock().top_callstacks(MAX_STACKS));

            // Resolve symbols while we are in_track so that the allocations
            // made by resolution don't "pollute" the overall set of callstacks
            for stack in &mut top_callstacks {
                stack.bt.resolve();
            }

            in_track.set(false);
        }
    });

    TrackingStats {
        small_threshold: SMALL_SIZE.into(),
        live: STATS.live.load(),
        top_callstacks,
    }
}