Skip to content

Commit 6c5afd8

Browse files
gl-puptanji
authored andcommitted
fix(redis sink): treat PUBLISH with 0 subscribers as success
For Channel data_type, Redis PUBLISH returns the number of subscribers that received the message. When no subscribers are connected (e.g. the consumer momentarily dropped its subscription), Redis returns 0. redis-rs deserializes this integer 0 as bool false into Vec<bool>, causing is_successful() to return false and Vector to retry the same request indefinitely — even though the connection and Redis are healthy. Fix: short-circuit is_successful() to true for DataType::Channel. A zero-subscriber publish is a valid outcome in pub/sub, not an error.
1 parent 92ee2b2 commit 6c5afd8

1 file changed

Lines changed: 8 additions & 2 deletions

File tree

src/sinks/redis/service.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::task::{Context, Poll};
22

33
use super::{
4-
RedisRequest, RedisSinkError,
4+
DataType, RedisRequest, RedisSinkError,
55
config::{ListMethod, SortedSetMethod},
66
sink::{ConnectionState, RedisConnection},
77
};
@@ -72,6 +72,7 @@ impl Service<RedisRequest> for RedisService {
7272
}
7373

7474
let byte_size = kvs.metadata.events_byte_size();
75+
let data_type = self.data_type;
7576

7677
Box::pin(async move {
7778
let ConnectionState {
@@ -82,6 +83,7 @@ impl Service<RedisRequest> for RedisService {
8283
match pipe.query_async(&mut conn).await {
8384
Ok(event_status) => Ok(RedisResponse {
8485
event_status,
86+
data_type,
8587
events_byte_size: kvs.metadata.into_events_estimated_json_encoded_byte_size(),
8688
byte_size,
8789
}),
@@ -96,13 +98,17 @@ impl Service<RedisRequest> for RedisService {
9698

9799
pub struct RedisResponse {
98100
pub event_status: Vec<bool>,
101+
pub data_type: DataType,
99102
pub events_byte_size: GroupedCountByteSize,
100103
pub byte_size: usize,
101104
}
102105

103106
impl RedisResponse {
104107
pub(super) fn is_successful(&self) -> bool {
105-
self.event_status.iter().all(|x| *x)
108+
// For Channel (pub/sub), PUBLISH returns the number of subscribers that received the
109+
// message. Zero subscribers is valid — the consumer may have momentarily dropped its
110+
// subscription — so we never treat it as a failure.
111+
matches!(self.data_type, DataType::Channel) || self.event_status.iter().all(|x| *x)
106112
}
107113
}
108114

0 commit comments

Comments
 (0)