1use crate::{NodeSpec, RedisConnKey, RedisConnection};
2use anyhow::Context;
3use std::process::Stdio;
4use std::time::Duration;
5use tempfile::TempDir;
6use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader};
7use tokio::process::{Child, Command};
8use tokio::time::timeout;
9
10pub struct RedisServer {
12 _daemon: Child,
13 port: u16,
14 _dir: TempDir,
15}
16
17fn allocate_port() -> u16 {
22 let listener = std::net::TcpListener::bind("127.0.0.1:0").expect("bind 127.0.0.1:0 failed");
23 listener.local_addr().unwrap().port()
24}
25
26impl RedisServer {
27 pub fn is_available() -> bool {
28 which::which("redis-server").is_ok()
29 }
30
31 pub async fn spawn(extra_config: &str) -> anyhow::Result<Self> {
32 let mut errors = vec![];
33
34 for _ in 0..2 {
35 let port = allocate_port();
36 match timeout(
37 Duration::from_secs(5),
38 Self::spawn_with_port(port, extra_config),
39 )
40 .await?
41 {
42 Ok(me) => return Ok(me),
43 Err(err) => {
44 errors.push(format!("{err:#}"));
45 }
46 }
47 }
48 anyhow::bail!("failed to spawn redis-server: {}", errors.join(". "));
49 }
50
51 async fn spawn_with_port(port: u16, extra_config: &str) -> anyhow::Result<Self> {
52 let dir = tempfile::tempdir().context("make temp dir")?;
53 let mut daemon = Command::new("redis-server")
54 .args(["-"])
55 .stdin(Stdio::piped())
56 .stderr(Stdio::piped())
57 .stdout(Stdio::piped())
58 .kill_on_drop(true)
59 .spawn()
60 .context("spawning redis-server")?;
61
62 let mut stdout = BufReader::new(daemon.stdout.take().unwrap());
63 let mut stderr = daemon.stderr.take().unwrap();
64
65 tokio::spawn(async move {
66 copy_stream_with_line_prefix("redis stderr", &mut stderr, &mut tokio::io::stderr())
67 .await
68 });
69
70 if let Some(mut stdin) = daemon.stdin.take() {
72 stdin
73 .write_all(b"bind 127.0.0.1\nlogfile /dev/stdout\nloglevel debug\n")
74 .await?;
75 stdin.write_all(format!("port {port}\n").as_bytes()).await?;
76 stdin
77 .write_all(format!("dir {}\n", dir.path().display()).as_bytes())
78 .await?;
79 stdin
80 .write_all(format!("{extra_config}\n").as_bytes())
81 .await?;
82 drop(stdin);
83 }
84
85 loop {
87 let mut line = String::new();
88 stdout.read_line(&mut line).await?;
89 if line.is_empty() {
90 anyhow::bail!("Unexpected EOF while reading output from redis-server");
91 }
92 eprintln!("{}", line.trim());
93
94 if line.contains("Server initialized")
95 || line.contains("The server is now ready to accept connections on port")
96 {
97 break;
98 }
99 }
100
101 tokio::spawn(async move {
103 copy_stream_with_line_prefix("redis stdout", &mut stdout, &mut tokio::io::stderr())
104 .await
105 });
106
107 Ok(Self {
108 _daemon: daemon,
109 port,
110 _dir: dir,
111 })
112 }
113
114 pub async fn connection(&self) -> anyhow::Result<RedisConnection> {
115 let key = RedisConnKey {
116 node: NodeSpec::Single(format!("redis://127.0.0.1:{}", self.port)),
117 read_from_replicas: false,
118 username: None,
119 password: None,
120 cluster: None,
121 pool_size: None,
122 connect_timeout: None,
123 recycle_timeout: None,
124 wait_timeout: None,
125 response_timeout: None,
126 };
127 key.open()
128 }
129}
130
131pub struct RedisCluster {
132 primary: RedisServer,
133 secondary: RedisServer,
134 tertiary: RedisServer,
135}
136
137impl RedisCluster {
138 pub async fn is_available() -> bool {
144 if !RedisServer::is_available() {
145 return false;
146 }
147
148 match Command::new("redis-cli").arg("-v").output().await {
149 Ok(output) => {
150 let stdout = String::from_utf8_lossy(&output.stdout);
151 match stdout.lines().next() {
152 Some(line) => {
153 let Some((redis, version)) = line.split_once(" ") else {
154 return false;
155 };
156 if redis == "redis-cli" {
157 let Some((major, _rest)) = version.split_once(".") else {
158 return false;
159 };
160 let Ok(major) = major.parse::<u32>() else {
161 return false;
162 };
163 major >= 7
164 } else {
165 false
166 }
167 }
168 None => false,
169 }
170 }
171 Err(_) => false,
172 }
173 }
174
175 pub async fn spawn() -> anyhow::Result<Self> {
176 let extra_config = "cluster-enabled yes\n";
177 let primary = RedisServer::spawn(extra_config).await?;
178 let secondary = RedisServer::spawn(extra_config).await?;
179 let tertiary = RedisServer::spawn(extra_config).await?;
180
181 let cluster_setup = Command::new("redis-cli")
182 .args([
183 "--cluster",
184 "create",
185 &format!("127.0.0.1:{}", primary.port),
186 &format!("127.0.0.1:{}", secondary.port),
187 &format!("127.0.0.1:{}", tertiary.port),
188 "--cluster-yes",
189 ])
190 .kill_on_drop(true)
191 .output()
192 .await
193 .context("create redis cluster")?;
194
195 if !cluster_setup.stdout.is_empty() {
196 eprintln!(
197 "cluster_setup stdout: {}",
198 String::from_utf8_lossy(&cluster_setup.stdout)
199 );
200 }
201 if !cluster_setup.stderr.is_empty() {
202 eprintln!(
203 "cluster_setup stderr: {}",
204 String::from_utf8_lossy(&cluster_setup.stderr)
205 );
206 }
207
208 Ok(Self {
209 primary,
210 secondary,
211 tertiary,
212 })
213 }
214
215 pub async fn connection(&self) -> anyhow::Result<RedisConnection> {
216 let key = RedisConnKey {
217 node: NodeSpec::Cluster(vec![
218 format!("redis://127.0.0.1:{}", self.primary.port),
219 format!("redis://127.0.0.1:{}", self.secondary.port),
220 format!("redis://127.0.0.1:{}", self.tertiary.port),
221 ]),
222 read_from_replicas: false,
223 username: None,
224 password: None,
225 cluster: None,
226 pool_size: None,
227 connect_timeout: None,
228 recycle_timeout: None,
229 wait_timeout: None,
230 response_timeout: None,
231 };
232 key.open()
233 }
234}
235
236async fn copy_stream_with_line_prefix<SRC, DEST>(
237 prefix: &str,
238 src: SRC,
239 mut dest: DEST,
240) -> std::io::Result<()>
241where
242 SRC: AsyncRead + Unpin,
243 DEST: AsyncWrite + Unpin,
244{
245 let mut src = tokio::io::BufReader::new(src);
246 loop {
247 let mut line = String::new();
248 src.read_line(&mut line).await?;
249 if !line.is_empty() {
250 dest.write_all(format!("{prefix}: {line}").as_bytes())
251 .await?;
252 }
253 }
254}
255
256#[cfg(test)]
257mod test {
258 use super::*;
259
260 #[tokio::test]
261 async fn test_basic_operation() -> anyhow::Result<()> {
262 if !RedisServer::is_available() {
263 return Ok(());
264 }
265 let daemon = RedisServer::spawn("").await?;
266 let connection = daemon.connection().await?;
267
268 let mut cmd = redis::cmd("SET");
269 cmd.arg("my_key").arg(42);
270 connection.query(cmd).await?;
271
272 let mut cmd = redis::cmd("GET");
273 cmd.arg("my_key");
274 let value = connection.query(cmd).await?;
275
276 assert_eq!(value, redis::Value::BulkString(b"42".to_vec()));
277
278 Ok(())
279 }
280
281 #[tokio::test]
282 async fn test_basic_operation_cluster() -> anyhow::Result<()> {
283 if !RedisCluster::is_available().await {
284 return Ok(());
285 }
286 let daemon = RedisCluster::spawn().await?;
287 let connection = daemon.connection().await?;
288
289 let mut cmd = redis::cmd("SET");
290 cmd.arg("my_key").arg(42);
291 connection.query(cmd).await?;
292
293 let mut cmd = redis::cmd("GET");
294 cmd.arg("my_key");
295 let value = connection.query(cmd).await?;
296
297 assert_eq!(value, redis::Value::BulkString(b"42".to_vec()));
298
299 Ok(())
300 }
301}