3
3
4
4
use std:: convert:: identity;
5
5
use std:: env;
6
- use std:: io:: Read ;
7
6
use std:: process;
8
7
use std:: thread:: sleep;
9
8
use std:: time:: Duration ;
@@ -125,20 +124,17 @@ impl RedisCluster {
125
124
num_replicas : replicas,
126
125
modules,
127
126
mtls_enabled,
128
- mut ports,
127
+ ports,
129
128
} = configuration;
130
129
131
- if ports. is_empty ( ) {
132
- // We use a hashset in order to be sure that we have the right number
133
- // of unique ports.
134
- let mut hash = std:: collections:: HashSet :: new ( ) ;
135
- while hash. len ( ) < nodes as usize {
136
- hash. insert ( get_random_available_port ( ) ) ;
137
- }
138
- ports = hash. into_iter ( ) . collect ( ) ;
139
- }
130
+ let optional_ports = if ports. is_empty ( ) {
131
+ vec ! [ None ; nodes as usize ]
132
+ } else {
133
+ assert ! ( ports. len( ) == nodes as usize ) ;
134
+ ports. into_iter ( ) . map ( Some ) . collect ( )
135
+ } ;
136
+ let mut chosen_ports = std:: collections:: HashSet :: new ( ) ;
140
137
141
- let mut servers = vec ! [ ] ;
142
138
let mut folders = vec ! [ ] ;
143
139
let mut addrs = vec ! [ ] ;
144
140
let mut tls_paths = None ;
@@ -159,8 +155,8 @@ impl RedisCluster {
159
155
160
156
let max_attempts = 5 ;
161
157
162
- for port in ports {
163
- servers . push ( RedisServer :: new_with_addr_tls_modules_and_spawner (
158
+ let mut make_server = |port| {
159
+ RedisServer :: new_with_addr_tls_modules_and_spawner (
164
160
ClusterType :: build_addr ( port) ,
165
161
None ,
166
162
tls_paths. clone ( ) ,
@@ -194,65 +190,74 @@ impl RedisCluster {
194
190
cmd. arg ( "--tls-replication" ) . arg ( "yes" ) ;
195
191
}
196
192
}
197
- let addr = format ! ( "127.0.0.1:{port}" ) ;
198
193
cmd. current_dir ( tempdir. path ( ) ) ;
199
194
folders. push ( tempdir) ;
200
- addrs. push ( addr. clone ( ) ) ;
195
+ cmd. spawn ( ) . unwrap ( )
196
+ } ,
197
+ )
198
+ } ;
201
199
200
+ let verify_server = |server : & mut RedisServer | {
201
+ let process = & mut server. process ;
202
+ match process. try_wait ( ) {
203
+ Ok ( Some ( status) ) => {
204
+ let log_file_contents = server. log_file_contents ( ) ;
205
+ let err =
206
+ format ! ( "redis server creation failed with status {status:?}.\n log file: {log_file_contents}" ) ;
207
+ Err ( err)
208
+ }
209
+ Ok ( None ) => {
210
+ // wait for 10 seconds for the server to be available.
211
+ let max_attempts = 200 ;
202
212
let mut cur_attempts = 0 ;
203
213
loop {
204
- let mut process = cmd. spawn ( ) . unwrap ( ) ;
214
+ if cur_attempts == max_attempts {
215
+ let log_file_contents = server. log_file_contents ( ) ;
216
+ break Err ( format ! ( "redis server creation failed: Address {} closed. {log_file_contents}" , server. addr) ) ;
217
+ } else if port_in_use ( & server. addr . to_string ( ) ) {
218
+ break Ok ( ( ) ) ;
219
+ }
220
+ eprintln ! ( "Waiting for redis process to initialize" ) ;
205
221
sleep ( Duration :: from_millis ( 50 ) ) ;
222
+ cur_attempts += 1 ;
223
+ }
224
+ }
225
+ Err ( e) => {
226
+ panic ! ( "Unexpected error in redis server creation {e}" ) ;
227
+ }
228
+ }
229
+ } ;
206
230
207
- let log_file_index = cmd. get_args ( ) . position ( |arg|arg == "--logfile" ) . unwrap ( ) + 1 ;
208
- let log_file_path = cmd. get_args ( ) . nth ( log_file_index) . unwrap ( ) ;
209
- match process. try_wait ( ) {
210
- Ok ( Some ( status) ) => {
211
- let stdout = process. stdout . map_or ( String :: new ( ) , |mut out|{
212
- let mut str = String :: new ( ) ;
213
- out. read_to_string ( & mut str) . unwrap ( ) ;
214
- str
215
- } ) ;
216
- let stderr = process. stderr . map_or ( String :: new ( ) , |mut out|{
217
- let mut str = String :: new ( ) ;
218
- out. read_to_string ( & mut str) . unwrap ( ) ;
219
- str
220
- } ) ;
221
-
222
- let log_file_contents = std:: fs:: read_to_string ( log_file_path) . unwrap ( ) ;
223
- let err =
224
- format ! ( "redis server creation failed with status {status:?}.\n stdout: `{stdout}`.\n stderr: `{stderr}`\n log file: {log_file_contents}" ) ;
225
- if cur_attempts == max_attempts {
226
- panic ! ( "{err}" ) ;
227
- }
228
- eprintln ! ( "Retrying: {err}" ) ;
229
- cur_attempts += 1 ;
230
- }
231
- Ok ( None ) => {
232
- // wait for 10 seconds for the server to be available.
233
- let max_attempts = 200 ;
234
- let mut cur_attempts = 0 ;
235
- loop {
236
- if cur_attempts == max_attempts {
237
- let log_file_contents = std:: fs:: read_to_string ( log_file_path) . unwrap ( ) ;
238
- panic ! ( "redis server creation failed: Port {port} closed. {log_file_contents}" )
239
- }
240
- if port_in_use ( & addr) {
241
- return process;
242
- }
243
- eprintln ! ( "Waiting for redis process to initialize" ) ;
244
- sleep ( Duration :: from_millis ( 50 ) ) ;
245
- cur_attempts += 1 ;
246
- }
247
- }
248
- Err ( e) => {
249
- panic ! ( "Unexpected error in redis server creation {e}" ) ;
231
+ let servers = optional_ports
232
+ . into_iter ( )
233
+ . map ( |port_option| {
234
+ for _ in 0 ..5 {
235
+ let port = match port_option {
236
+ Some ( port) => port,
237
+ None => loop {
238
+ let port = get_random_available_port ( ) ;
239
+ if chosen_ports. contains ( & port) {
240
+ continue ;
250
241
}
242
+ chosen_ports. insert ( port) ;
243
+ break port;
244
+ } ,
245
+ } ;
246
+ let mut server = make_server ( port) ;
247
+ sleep ( Duration :: from_millis ( 50 ) ) ;
248
+
249
+ match verify_server ( & mut server) {
250
+ Ok ( _) => {
251
+ let addr = format ! ( "127.0.0.1:{port}" ) ;
252
+ addrs. push ( addr. clone ( ) ) ;
253
+ return server;
251
254
}
255
+ Err ( err) => eprintln ! ( "{err}" ) ,
252
256
}
253
- } ,
254
- ) ) ;
255
- }
257
+ }
258
+ panic ! ( "Exhausted retries" ) ;
259
+ } )
260
+ . collect ( ) ;
256
261
257
262
let mut cmd = process:: Command :: new ( "redis-cli" ) ;
258
263
cmd. stdout ( process:: Stdio :: piped ( ) )
0 commit comments