Skip to content

Commit 2f5d953

Browse files
fix(resp3): accept push frames; count nil _ as miss in arbitrary command tracking (#435)
* fix(resp3): accept push frames; count nil _ as miss in arbitrary command tracking Two RESP3 correctness bugs surfaced by the 2.4 readiness review. #27: aggregate_type() lacked '>'. A server-pushed message (pubsub, keyspace notifications, client tracking invalidation) on a RESP3 connection hit the "unsupported response" path and dropped the connection. Now accepts and drains push frames inline. #25: SingleNullBulk and ArrayPerElementNulls miss sentinels only recognised RESP2 nil shapes ($-1/*-1/*0). RESP3 nil (_\r\n) was miscounted as a hit, silently understating miss rate and suppressing --miss-rate-threshold warnings. Refs: 2.4 review findings #25 + #27 Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * fix(resp3): flag RESP3 null at construction; distinguish from literal "_" Bugbot flagged that the ArrayPerElementNulls walker used a content heuristic (value_len==1 && value[0]=='_') that couldn't tell a RESP3 null parsed via single_type from a real bulk value of literal "_" parsed via blob_type. Both produce identical bulk_el. Added bulk_el::is_resp3_null flag, set only when single_type constructs the element for type byte '_'. Walker now reads the flag directly instead of inferring from content. Test added that stores "_" values and asserts they are NOT counted as misses. Refs: bugbot LOW on PR #435 Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * fix(resp3): drain server-pushed frames without delivering as reply Reviewer caught that the prior commit accepted RESP3 push (>) into aggregate_type() but didn't suppress its effect on m_last_response. The push frame fully parsed AS a reply, response_ended() returned 1, and memtier dequeued the push as the reply to the next in-flight command -- corrupting latency attribution + hit/miss accounting. Adds an m_push drain-mode flag: - Set when a push aggregate is encountered at top level - Suppresses every mutation to m_last_response while true - Cleared in response_ended(), which returns false so the parser continues to read the actual reply - Reset on rs_initial transition Refs: 2.4 review of PR #435 (R1 CRITICAL). * fix(resp3): reset m_response_len when draining push frame Bugbot caught that response_ended()'s m_push drain returned false without resetting m_response_len. The accumulated push frame bytes stayed in the counter and contaminated set_total_len() for the actual reply, skewing bandwidth and per-op size statistics. Resetting m_response_len to 0 at the push-drain-complete site matches the behaviour at rs_initial (the only other site where m_response_len is reset). Refs: bugbot LOW on PR #435 (follow-up). Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * fix(resp3): clear both m_push and m_attribute together in response_ended() Bugbot caught that if an attribute (|) appears nested inside a push frame (>), and both complete at the same response_ended() call, the m_attribute branch fired first and returned false, leaving m_push set. The parser then consumed the actual command reply as push-drain data, desynchronizing the connection. response_ended() now clears whichever flags are set in a single call and returns false if either was set. Refs: bugbot LOW on PR #435 (follow-up). Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> --------- Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 6288b5f commit 2f5d953

4 files changed

Lines changed: 280 additions & 20 deletions

File tree

client.cpp

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -795,11 +795,13 @@ void client::handle_response(unsigned int conn_id, struct timeval timestamp, req
795795
// bulk), *-1 (null array), or anything else (a value).
796796
const char *status = response->get_status();
797797
// Miss sentinels: $-1 (null bulk), *-1 (null array, RESP2),
798-
// and *0 (empty array - returned by SPOP/SRANDMEMBER with a
799-
// count argument when the key is absent). Anything else is
800-
// a value (or a non-empty container) and counts as a hit.
798+
// *0 (empty array - returned by SPOP/SRANDMEMBER with a
799+
// count argument when the key is absent), and "_" (RESP3
800+
// null, parsed via single_type and stored verbatim as the
801+
// status line without the CRLF). Anything else is a value
802+
// (or a non-empty container) and counts as a hit.
801803
bool is_null = (status != NULL && (strcmp(status, "$-1") == 0 || strcmp(status, "*-1") == 0 ||
802-
strcmp(status, "*0") == 0));
804+
strcmp(status, "*0") == 0 || strcmp(status, "_") == 0));
803805
// Always one bucket for SingleNullBulk: variadic-key blocking
804806
// commands like BLPOP carry the winning key in the reply but
805807
// we don't parse it, so per-key attribution beyond hit/miss
@@ -853,7 +855,15 @@ void client::handle_response(unsigned int conn_id, struct timeval timestamp, req
853855
// convention; the value-pointer check is the
854856
// semantically correct one and is forward-
855857
// compatible with a parser that distinguishes.
856-
if (bel != NULL && bel->value != NULL) {
858+
//
859+
// RESP3 nil "_\r\n" goes through single_type
860+
// and is stored as a bulk_el with is_resp3_null
861+
// set to true. Using the flag (set at parse time)
862+
// avoids false positives from a legitimate bulk
863+
// string whose content happens to be the single
864+
// character '_' (parsed via blob_type, which
865+
// never sets is_resp3_null). Treat it as a miss.
866+
if (bel != NULL && bel->value != NULL && !bel->is_resp3_null) {
857867
h = true;
858868
}
859869
} else if (el->is_mbulk_size()) {

protocol.cpp

Lines changed: 82 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,15 @@ class redis_protocol : public abstract_protocol
167167
mbulk_size_el *m_current_mbulk;
168168
bool m_resp3;
169169
bool m_attribute;
170+
// Drain mode for RESP3 server-pushed frames ('>'). Pushes are
171+
// out-of-band (pubsub, keyspace notifications, client tracking
172+
// invalidations) and must not be delivered as a reply to any
173+
// in-flight command. While true the parser still walks the frame
174+
// to advance the read buffer, but suppresses all writes into
175+
// m_last_response (status, mbulk tree, value, hits).
176+
// TODO: full out-of-band routing — surface push frames to a
177+
// listener instead of silently dropping them.
178+
bool m_push;
170179

171180
bool aggregate_type(char c);
172181
bool blob_type(char c);
@@ -181,7 +190,8 @@ class redis_protocol : public abstract_protocol
181190
m_total_bulks_count(0),
182191
m_current_mbulk(NULL),
183192
m_resp3(false),
184-
m_attribute(false)
193+
m_attribute(false),
194+
m_push(false)
185195
{
186196
}
187197
virtual redis_protocol *clone(void) { return new redis_protocol(); }
@@ -471,7 +481,16 @@ bool redis_protocol::aggregate_type(char c)
471481
{
472482
if (c == '*') return true;
473483

474-
if (m_resp3 && (c == '%' || c == '~' || c == '|')) return true;
484+
// RESP3: map (%), set (~), attribute (|), and push (>) are all aggregate
485+
// types. Push frames (">N\r\n" followed by N elements) are structurally
486+
// identical to arrays and can be emitted by Redis 7+ at any point on a
487+
// RESP3 connection (pubsub, keyspace notifications, client-tracking
488+
// invalidation). We parse them as ordinary aggregates and discard the
489+
// content; the alternative -- returning -1 from parse_response -- tears
490+
// down the connection unnecessarily.
491+
// TODO: route drained push frames to an out-of-band callback so callers
492+
// can observe server-initiated events without disrupting the reply stream.
493+
if (m_resp3 && (c == '%' || c == '~' || c == '|' || c == '>')) return true;
475494

476495
return false;
477496
}
@@ -498,11 +517,22 @@ bool redis_protocol::response_ended()
498517
{
499518
if (m_total_bulks_count != 0) return false;
500519

501-
if (m_attribute) {
502-
m_attribute = false;
503-
return false;
520+
bool was_push = m_push;
521+
bool was_attribute = m_attribute;
522+
523+
if (was_attribute) m_attribute = false;
524+
// A RESP3 push frame just finished draining. The actual reply to
525+
// the in-flight command (if any) is still pending — keep reading
526+
// and do NOT signal a complete reply for the push.
527+
// Also handles the case where an attribute (|) is nested inside a
528+
// push frame (>) and both complete at the same call: clear BOTH
529+
// flags together so m_push does not remain set and eat the next reply.
530+
if (was_push) {
531+
m_push = false;
532+
m_response_len = 0; // reset: push bytes don't count toward the next reply
504533
}
505534

535+
if (was_push || was_attribute) return false;
506536
return true;
507537
}
508538

@@ -519,6 +549,7 @@ int redis_protocol::parse_response(void)
519549
m_response_len = 0;
520550
m_total_bulks_count = 0;
521551
m_attribute = 0;
552+
m_push = 0;
522553
m_response_state = rs_read_line;
523554

524555
break;
@@ -553,12 +584,22 @@ int redis_protocol::parse_response(void)
553584
m_attribute = true;
554585
}
555586

587+
if (line[0] == '>') {
588+
// Top-level push frame: enter drain mode. Nested
589+
// pushes (push inside push) are not defined by RESP3
590+
// and not expected; only set the flag once.
591+
m_push = true;
592+
}
593+
556594
// Map or Attribute contain key-value pair
557595
if (line[0] == '%' || line[0] == '|') {
558596
count *= 2;
559597
}
560598

561-
if (m_keep_value) {
599+
// Suppress mbulk allocation while draining a push frame:
600+
// the contents are out-of-band and must not be delivered
601+
// as the reply to any in-flight command.
602+
if (m_keep_value && !m_push) {
562603
mbulk_size_el *new_mbulk_size = new mbulk_size_el();
563604
new_mbulk_size->bulks_count = count;
564605
new_mbulk_size->upper_level = m_current_mbulk;
@@ -574,7 +615,11 @@ int redis_protocol::parse_response(void)
574615
m_current_mbulk = new_mbulk_size->get_next_mbulk();
575616
}
576617

577-
m_last_response.set_status(line);
618+
if (!m_push) {
619+
m_last_response.set_status(line);
620+
} else {
621+
free(line);
622+
}
578623
m_total_bulks_count += count;
579624

580625
if (response_ended()) {
@@ -589,9 +634,14 @@ int redis_protocol::parse_response(void)
589634
}
590635

591636
m_bulk_len = strtol(line + 1, NULL, 10);
592-
m_last_response.set_status(line);
637+
// Suppress reply mutation while draining a push frame.
638+
if (!m_push) {
639+
m_last_response.set_status(line);
593640

594-
if (line[0] == '!') m_last_response.set_error();
641+
if (line[0] == '!') m_last_response.set_error();
642+
} else {
643+
free(line);
644+
}
595645

596646
/*
597647
* only on negative bulk, the data ends right after the first CRLF ($-1\r\n), so
@@ -609,22 +659,31 @@ int redis_protocol::parse_response(void)
609659
}
610660

611661
// if we are not inside mbulk, the status will be kept in m_status anyway
612-
if (m_keep_value && m_current_mbulk) {
662+
if (m_keep_value && m_current_mbulk && !m_push) {
613663
char *bulk_value = strdup(line);
614664
assert(bulk_value != NULL);
615665

616666
bulk_el *new_bulk = new bulk_el();
617667
new_bulk->value = bulk_value;
618668
new_bulk->value_len = strlen(bulk_value);
669+
// Mark RESP3 null elements explicitly so the walker can
670+
// distinguish them from a legitimate bulk string whose
671+
// content happens to be the single character '_' (which
672+
// would arrive via blob_type / rs_read_bulk, not here).
673+
new_bulk->is_resp3_null = (line[0] == '_');
619674

620675
// insert it to current mbulk
621676
m_current_mbulk->add_new_element(new_bulk);
622677
m_current_mbulk = m_current_mbulk->get_next_mbulk();
623678
}
624679

625-
if (line[0] == '-') m_last_response.set_error();
680+
if (!m_push) {
681+
if (line[0] == '-') m_last_response.set_error();
626682

627-
m_last_response.set_status(line);
683+
m_last_response.set_status(line);
684+
} else {
685+
free(line);
686+
}
628687
m_total_bulks_count--;
629688

630689
if (response_ended()) {
@@ -650,7 +709,9 @@ int redis_protocol::parse_response(void)
650709
* such key as well as non existing key or existing key without data
651710
* in the requested range
652711
*/
653-
if (m_bulk_len > 0) {
712+
// Suppress hit accounting while draining a push frame; the
713+
// bulk belongs to an out-of-band message, not the reply.
714+
if (m_bulk_len > 0 && !m_push) {
654715
m_last_response.incr_hits();
655716
}
656717

@@ -660,7 +721,14 @@ int redis_protocol::parse_response(void)
660721
}
661722
break;
662723
case rs_end_bulk:
663-
if (m_keep_value) {
724+
// Push-frame drain: read+discard the bulk bytes without
725+
// touching m_last_response so the reply remains clean.
726+
if (m_push) {
727+
if (m_bulk_len >= 0) {
728+
int ret = evbuffer_drain(m_read_buf, m_bulk_len + 2);
729+
assert(ret != -1);
730+
}
731+
} else if (m_keep_value) {
664732
/*
665733
* keep bulk value - in case we need to save bulk value it depends
666734
* if it's inside a mbulk or not.

protocol.h

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ class mbulk_size_el : public mbulk_element
116116
class bulk_el : public mbulk_element
117117
{
118118
public:
119-
bulk_el() : mbulk_element(mbulk_element_bulk), value(NULL), value_len(0) { ; }
119+
bulk_el() : mbulk_element(mbulk_element_bulk), value(NULL), value_len(0), is_resp3_null(false) { ; }
120120
virtual ~bulk_el()
121121
{
122122
free(value);
@@ -129,6 +129,11 @@ class bulk_el : public mbulk_element
129129

130130
char *value;
131131
unsigned int value_len;
132+
// True only when this element was produced by the single_type parser
133+
// branch for the RESP3 null type byte '_'. Distinguishes a genuine
134+
// RESP3 null from a legitimate bulk string whose content happens to be
135+
// the single character '_' (parsed via blob_type / rs_read_bulk).
136+
bool is_resp3_null;
132137
};
133138

134139
struct protocol_response

0 commit comments

Comments
 (0)