Skip to content

Commit dd36fa0

Browse files
committed
fix: better logic for retrying redis connection
1 parent 05c31ed commit dd36fa0

1 file changed

Lines changed: 51 additions & 16 deletions

File tree

src/redis_logs.rs

Lines changed: 51 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
1+
use crate::config::RedisConfig;
12
use chrono::TimeZone;
23
use redis::Commands;
34
use rmp_serde;
45
use serde::{Deserialize, Serialize};
5-
use std::error::Error;
6+
use std::{error::Error, thread, time::Duration};
67
use tokio::sync::mpsc;
78

8-
use crate::config::RedisConfig;
9-
109
#[derive(Debug, PartialEq, Deserialize, Serialize, Clone)]
1110
pub struct Elapsed {
1211
pub repr: String,
@@ -144,7 +143,7 @@ fn str_error(err: &str) -> Box<dyn Error> {
144143
Box::<dyn Error>::from(err)
145144
}
146145

147-
fn redis_conn(url: &str) -> Result<redis::Connection, redis::RedisError> {
146+
fn create_redis_conn(url: &str) -> Result<redis::Connection, redis::RedisError> {
148147
let client = redis::Client::open(url)?;
149148
client.get_connection()
150149
}
@@ -166,6 +165,10 @@ fn read_logs(
166165
let raw_reply: redis::streams::StreamReadReply =
167166
redis_conn.xread_options(&LOGGING_ENDPOINT, &[last_id], &stream_read_opts(config))?;
168167

168+
if raw_reply.keys.len() == 0 {
169+
return Ok((Some(last_id.to_owned()), vec![]));
170+
}
171+
169172
let log_key = raw_reply
170173
.keys
171174
.get(0)
@@ -236,28 +239,60 @@ fn setup_consumer_group(conn: &mut redis::Connection, config: &RedisConfig) {
236239
));
237240
}
238241

239-
pub async fn producer_loop(tx: mpsc::UnboundedSender<LogMsg>, config: RedisConfig) {
240-
let mut redis_conn = redis_conn(&config.url.full_url()).expect("Could not connect to Redis!");
242+
fn check_connection(redis_conn: &mut redis::Connection, config: &RedisConfig) -> Result<(), ()> {
241243
if let Ok(key_exists) = redis_conn.exists::<&str, bool>(&LOGGING_ENDPOINT[0]) {
242244
if !key_exists {
243245
println!("Logging endpoint doesn't exist, exiting.");
244-
return;
246+
return Err(());
245247
}
246248
} else {
247249
panic!("Something went wrong checking the logs endpoint")
248250
}
251+
setup_consumer_group(redis_conn, &config);
252+
Ok(())
253+
}
254+
pub async fn producer_loop(tx: mpsc::UnboundedSender<LogMsg>, config: RedisConfig) {
255+
let mut redis_conn =
256+
create_redis_conn(&config.url.full_url()).expect("Could not connect to Redis!");
257+
if check_connection(&mut redis_conn, &config).is_err() {
258+
return;
259+
}
260+
249261
let stream_read_id: String = ">".into();
250-
setup_consumer_group(&mut redis_conn, &config);
251262

252263
'main: loop {
253-
if let Ok((Some(_), packed)) = read_logs(&mut redis_conn, &stream_read_id, &config) {
254-
let unpacked = process_data(packed).unwrap_or(vec![error_log_item()]);
255-
let records = extract_records(unpacked);
256-
257-
for record in records {
258-
if tx.send(record).is_err() {
259-
println!("Receiver dropped, stopping...");
260-
break 'main;
264+
let raw_read = read_logs(&mut redis_conn, &stream_read_id, &config);
265+
if let Ok(response) = raw_read {
266+
if let (Some(_), packed) = response {
267+
if packed.len() == 0 {
268+
continue;
269+
}
270+
let unpacked = process_data(packed).unwrap_or(vec![error_log_item()]);
271+
let records = extract_records(unpacked);
272+
273+
for record in records {
274+
if tx.send(record).is_err() {
275+
println!("Receiver dropped, stopping...");
276+
break 'main;
277+
}
278+
}
279+
}
280+
} else {
281+
println!("{:?}", raw_read);
282+
redis_conn = loop {
283+
{
284+
let new_conn = create_redis_conn(&config.url.full_url());
285+
if new_conn.is_err() {
286+
println!("Error reading from redis, retrying connection in 1s");
287+
thread::sleep(Duration::from_millis(1000));
288+
} else {
289+
println!("Reconnected");
290+
let mut conn = new_conn.unwrap();
291+
if check_connection(&mut conn, &config).is_err() {
292+
return;
293+
}
294+
break conn;
295+
}
261296
}
262297
}
263298
}

0 commit comments

Comments
 (0)