mod_redis/
test.rs

1use crate::{NodeSpec, RedisConnKey, RedisConnection};
2use anyhow::Context;
3use std::process::Stdio;
4use std::time::Duration;
5use tempfile::TempDir;
6use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader};
7use tokio::process::{Child, Command};
8use tokio::time::timeout;
9
10/// A local redis server for executing tests against
11pub struct RedisServer {
12    _daemon: Child,
13    port: u16,
14    _dir: TempDir,
15}
16
17/// Ask the kernel to assign a free port.
18/// We pass this to sshd and tell it to listen on that port.
19/// This is racy, as releasing the socket technically makes
20/// that port available to others using the same technique.
21fn allocate_port() -> u16 {
22    let listener = std::net::TcpListener::bind("127.0.0.1:0").expect("bind 127.0.0.1:0 failed");
23    listener.local_addr().unwrap().port()
24}
25
26impl RedisServer {
27    pub fn is_available() -> bool {
28        which::which("redis-server").is_ok()
29    }
30
31    pub async fn spawn(extra_config: &str) -> anyhow::Result<Self> {
32        let mut errors = vec![];
33
34        for _ in 0..2 {
35            let port = allocate_port();
36            match timeout(
37                Duration::from_secs(5),
38                Self::spawn_with_port(port, extra_config),
39            )
40            .await?
41            {
42                Ok(me) => return Ok(me),
43                Err(err) => {
44                    errors.push(format!("{err:#}"));
45                }
46            }
47        }
48        anyhow::bail!("failed to spawn redis-server: {}", errors.join(". "));
49    }
50
51    async fn spawn_with_port(port: u16, extra_config: &str) -> anyhow::Result<Self> {
52        let dir = tempfile::tempdir().context("make temp dir")?;
53        let mut daemon = Command::new("redis-server")
54            .args(["-"])
55            .stdin(Stdio::piped())
56            .stderr(Stdio::piped())
57            .stdout(Stdio::piped())
58            .kill_on_drop(true)
59            .spawn()
60            .context("spawning redis-server")?;
61
62        let mut stdout = BufReader::new(daemon.stdout.take().unwrap());
63        let mut stderr = daemon.stderr.take().unwrap();
64
65        tokio::spawn(async move {
66            copy_stream_with_line_prefix("redis stderr", &mut stderr, &mut tokio::io::stderr())
67                .await
68        });
69
70        // Generate configuration
71        if let Some(mut stdin) = daemon.stdin.take() {
72            stdin
73                .write_all(b"bind 127.0.0.1\nlogfile /dev/stdout\nloglevel debug\n")
74                .await?;
75            stdin.write_all(format!("port {port}\n").as_bytes()).await?;
76            stdin
77                .write_all(format!("dir {}\n", dir.path().display()).as_bytes())
78                .await?;
79            stdin
80                .write_all(format!("{extra_config}\n").as_bytes())
81                .await?;
82            drop(stdin);
83        }
84
85        // Wait until the server initializes successfully
86        loop {
87            let mut line = String::new();
88            stdout.read_line(&mut line).await?;
89            if line.is_empty() {
90                anyhow::bail!("Unexpected EOF while reading output from redis-server");
91            }
92            eprintln!("{}", line.trim());
93
94            if line.contains("Server initialized")
95                || line.contains("The server is now ready to accept connections on port")
96            {
97                break;
98            }
99        }
100
101        // Now just pipe the output through to the test harness
102        tokio::spawn(async move {
103            copy_stream_with_line_prefix("redis stdout", &mut stdout, &mut tokio::io::stderr())
104                .await
105        });
106
107        Ok(Self {
108            _daemon: daemon,
109            port,
110            _dir: dir,
111        })
112    }
113
114    pub async fn connection(&self) -> anyhow::Result<RedisConnection> {
115        let key = RedisConnKey {
116            node: NodeSpec::Single(format!("redis://127.0.0.1:{}", self.port)),
117            read_from_replicas: false,
118            username: None,
119            password: None,
120            cluster: None,
121            pool_size: None,
122            connect_timeout: None,
123            recycle_timeout: None,
124            wait_timeout: None,
125            response_timeout: None,
126        };
127        key.open()
128    }
129}
130
131pub struct RedisCluster {
132    primary: RedisServer,
133    secondary: RedisServer,
134    tertiary: RedisServer,
135}
136
137impl RedisCluster {
138    /// Check whether redis is available to run as a cluster.
139    /// We look for redis 7.x and later, because we rely on
140    /// the --cluster-yes option actually working as part of
141    /// our cluster initialization. It doesn't work on redis 5.x
142    /// which is present on rocky8 for example.
143    pub async fn is_available() -> bool {
144        if !RedisServer::is_available() {
145            return false;
146        }
147
148        match Command::new("redis-cli").arg("-v").output().await {
149            Ok(output) => {
150                let stdout = String::from_utf8_lossy(&output.stdout);
151                match stdout.lines().next() {
152                    Some(line) => {
153                        let Some((redis, version)) = line.split_once(" ") else {
154                            return false;
155                        };
156                        if redis == "redis-cli" {
157                            let Some((major, _rest)) = version.split_once(".") else {
158                                return false;
159                            };
160                            let Ok(major) = major.parse::<u32>() else {
161                                return false;
162                            };
163                            major >= 7
164                        } else {
165                            false
166                        }
167                    }
168                    None => false,
169                }
170            }
171            Err(_) => false,
172        }
173    }
174
175    pub async fn spawn() -> anyhow::Result<Self> {
176        let extra_config = "cluster-enabled yes\n";
177        let primary = RedisServer::spawn(extra_config).await?;
178        let secondary = RedisServer::spawn(extra_config).await?;
179        let tertiary = RedisServer::spawn(extra_config).await?;
180
181        let cluster_setup = Command::new("redis-cli")
182            .args([
183                "--cluster",
184                "create",
185                &format!("127.0.0.1:{}", primary.port),
186                &format!("127.0.0.1:{}", secondary.port),
187                &format!("127.0.0.1:{}", tertiary.port),
188                "--cluster-yes",
189            ])
190            .kill_on_drop(true)
191            .output()
192            .await
193            .context("create redis cluster")?;
194
195        if !cluster_setup.stdout.is_empty() {
196            eprintln!(
197                "cluster_setup stdout: {}",
198                String::from_utf8_lossy(&cluster_setup.stdout)
199            );
200        }
201        if !cluster_setup.stderr.is_empty() {
202            eprintln!(
203                "cluster_setup stderr: {}",
204                String::from_utf8_lossy(&cluster_setup.stderr)
205            );
206        }
207
208        Ok(Self {
209            primary,
210            secondary,
211            tertiary,
212        })
213    }
214
215    pub async fn connection(&self) -> anyhow::Result<RedisConnection> {
216        let key = RedisConnKey {
217            node: NodeSpec::Cluster(vec![
218                format!("redis://127.0.0.1:{}", self.primary.port),
219                format!("redis://127.0.0.1:{}", self.secondary.port),
220                format!("redis://127.0.0.1:{}", self.tertiary.port),
221            ]),
222            read_from_replicas: false,
223            username: None,
224            password: None,
225            cluster: None,
226            pool_size: None,
227            connect_timeout: None,
228            recycle_timeout: None,
229            wait_timeout: None,
230            response_timeout: None,
231        };
232        key.open()
233    }
234}
235
236async fn copy_stream_with_line_prefix<SRC, DEST>(
237    prefix: &str,
238    src: SRC,
239    mut dest: DEST,
240) -> std::io::Result<()>
241where
242    SRC: AsyncRead + Unpin,
243    DEST: AsyncWrite + Unpin,
244{
245    let mut src = tokio::io::BufReader::new(src);
246    loop {
247        let mut line = String::new();
248        src.read_line(&mut line).await?;
249        if !line.is_empty() {
250            dest.write_all(format!("{prefix}: {line}").as_bytes())
251                .await?;
252        }
253    }
254}
255
256#[cfg(test)]
257mod test {
258    use super::*;
259
260    #[tokio::test]
261    async fn test_basic_operation() -> anyhow::Result<()> {
262        if !RedisServer::is_available() {
263            return Ok(());
264        }
265        let daemon = RedisServer::spawn("").await?;
266        let connection = daemon.connection().await?;
267
268        let mut cmd = redis::cmd("SET");
269        cmd.arg("my_key").arg(42);
270        connection.query(cmd).await?;
271
272        let mut cmd = redis::cmd("GET");
273        cmd.arg("my_key");
274        let value = connection.query(cmd).await?;
275
276        assert_eq!(value, redis::Value::BulkString(b"42".to_vec()));
277
278        Ok(())
279    }
280
281    #[tokio::test]
282    async fn test_basic_operation_cluster() -> anyhow::Result<()> {
283        if !RedisCluster::is_available().await {
284            return Ok(());
285        }
286        let daemon = RedisCluster::spawn().await?;
287        let connection = daemon.connection().await?;
288
289        let mut cmd = redis::cmd("SET");
290        cmd.arg("my_key").arg(42);
291        connection.query(cmd).await?;
292
293        let mut cmd = redis::cmd("GET");
294        cmd.arg("my_key");
295        let value = connection.query(cmd).await?;
296
297        assert_eq!(value, redis::Value::BulkString(b"42".to_vec()));
298
299        Ok(())
300    }
301}