config/
pool.rs

1use crate::{get_current_epoch, LuaConfig, LuaConfigInner};
2use kumo_prometheus::declare_metric;
3use parking_lot::FairMutex as Mutex;
4use std::collections::VecDeque;
5use std::sync::atomic::{AtomicUsize, Ordering};
6use std::sync::LazyLock;
7use std::time::Duration;
8
9static POOL: LazyLock<Mutex<Pool>> = LazyLock::new(|| Mutex::new(Pool::new()));
10
// Gauge of how many contexts are currently held in the pool: `Pool::put`
// increments it, while `Pool::get` and `Pool::expire` decrement it.
declare_metric! {
/// the number of lua contexts available for reuse in the pool
static LUA_SPARE_COUNT: IntGauge("lua_spare_count");
}
15
/// Maximum age of a lua context before we release it, in seconds.
/// Defaults to five minutes; adjusted via `set_max_age`.
static MAX_AGE: AtomicUsize = AtomicUsize::new(5 * 60);
/// Maximum number of uses of a given lua context before we release it.
/// Adjusted via `set_max_use`.
static MAX_USE: AtomicUsize = AtomicUsize::new(1024);
/// Maximum number of spare lua contexts to maintain in the pool.
/// Adjusted via `set_max_spare`.
static MAX_SPARE: AtomicUsize = AtomicUsize::new(8 * 1024);
/// Percentage chance, in the range 0-100, of garbage collecting a lua
/// context when it is returned to the pool. The default of 0 disables
/// the collection. Adjusted via `set_gc_on_put`.
static GC_ON_PUT: AtomicUsize = AtomicUsize::new(0);
23
24pub fn set_max_use(max_use: usize) {
25    MAX_USE.store(max_use, Ordering::Relaxed);
26}
27
28pub fn set_max_spare(max_spare: usize) {
29    MAX_SPARE.store(max_spare, Ordering::Relaxed);
30}
31
32pub fn set_max_age(max_age: usize) {
33    MAX_AGE.store(max_age, Ordering::Relaxed);
34}
35
36/// Set the gc on put percentage chance, in the range 0-100
37pub fn set_gc_on_put(v: u8) {
38    GC_ON_PUT.store(v as usize, Ordering::Relaxed);
39}
40
41#[derive(Default)]
42pub(crate) struct Pool {
43    pool: VecDeque<LuaConfigInner>,
44}
45
46impl Pool {
47    pub fn new() -> Self {
48        std::thread::Builder::new()
49            .name("config idler".to_string())
50            .spawn(|| loop {
51                std::thread::sleep(Duration::from_secs(30));
52                POOL.lock().expire();
53            })
54            .expect("create config idler thread");
55        Self::default()
56    }
57
58    pub fn expire(&mut self) {
59        let len_before = self.pool.len();
60        let epoch = get_current_epoch();
61        let max_age = Duration::from_secs(MAX_AGE.load(Ordering::Relaxed) as u64);
62        self.pool
63            .retain(|inner| inner.created.elapsed() < max_age && inner.epoch == epoch);
64        let len_after = self.pool.len();
65        let diff = len_before - len_after;
66        if diff > 0 {
67            LUA_SPARE_COUNT.sub(diff as i64);
68        }
69    }
70
71    pub fn get(&mut self) -> Option<LuaConfigInner> {
72        let max_age = Duration::from_secs(MAX_AGE.load(Ordering::Relaxed) as u64);
73        loop {
74            let mut item = self.pool.pop_front()?;
75            LUA_SPARE_COUNT.dec();
76            if item.created.elapsed() > max_age || item.epoch != get_current_epoch() {
77                continue;
78            }
79            item.use_count += 1;
80            return Some(item);
81        }
82    }
83
84    pub fn put(&mut self, config: LuaConfigInner) {
85        let epoch = get_current_epoch();
86        if config.epoch != epoch {
87            // Stale; let it drop
88            return;
89        }
90        if self.pool.len() + 1 > MAX_SPARE.load(Ordering::Relaxed) {
91            return;
92        }
93        if config.created.elapsed() > Duration::from_secs(MAX_AGE.load(Ordering::Relaxed) as u64)
94            || config.use_count + 1 > MAX_USE.load(Ordering::Relaxed)
95        {
96            return;
97        }
98        let prob = GC_ON_PUT.load(Ordering::Relaxed);
99        if prob != 0 {
100            let chance = (rand::random::<f32>() * 100.) as usize;
101            if chance <= prob {
102                if let Err(err) = config.lua.gc_collect() {
103                    tracing::error!("Error during gc: {err:#}");
104                    return;
105                }
106            }
107        }
108
109        self.pool.push_back(config);
110        LUA_SPARE_COUNT.inc();
111    }
112}
113
114pub(crate) fn pool_get() -> Option<LuaConfig> {
115    POOL.lock()
116        .get()
117        .map(|inner| LuaConfig { inner: Some(inner) })
118}
119
120pub(crate) fn pool_put(config: LuaConfigInner) {
121    POOL.lock().put(config);
122}