Skip to content

Commit 98b71ad

Browse files
bkumaranclaude
andcommitted
Fix DCP test failures and log noise for magma buckets
self.vbuckets was hard-coded to range(1024) but magma buckets default to 128 vbuckets; accessing vb_map[128+] raised IndexError in select_dcp_client for every DCP collection test. Fix by deriving the range from the actual vb_map length after fetching the cluster config. Fix TypeError in handleSystemEvent where response['key'] arrives as bytes in Python 3 and was stored raw into the manifest, causing json.dumps(vb['manifest']) to fail. Decode with surrogateescape so non-UTF-8 binary keys are round-tripped safely rather than raising UnicodeDecodeError, consistent with the existing guard in handleMutation. Fix recv_op to suppress hundreds of spurious "recv_op Exception: timed out" lines that appeared during disk-checkpoint streams. Socket poll timeouts are expected when the server is flushing to disk and are already handled by the retry logic in process_dcp_traffic; printing them on every poll obscures real errors. Also fix a dead-code branch where 'Timeout' (capital T) never matched the actual socket exception message 'timed out' (lowercase), and remove the _stream_timeout assignment from this path since a transient poll timeout should not permanently short-circuit the connection. Co-authored-by: Claude <noreply@anthropic.com> Change-Id: Iecb5d78aaeafd435d7da9882e9c2e2d3f7ee6e7f Reviewed-on: https://review.couchbase.org/c/TAF/+/245571 Reviewed-by: Ashwin <ashwin.govindarajulu@couchbase.com> Tested-by: Balakumaran G <balakumaran.gopal@couchbase.com> Tested-by: Couchbase QE Reviewed-by: Balakumaran G <balakumaran.gopal@couchbase.com>
1 parent f63c053 commit 98b71ad

2 files changed

Lines changed: 9 additions & 6 deletions

File tree

lib/dcp_bin_client.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -152,12 +152,10 @@ def stream_req(self, vbucket, takeover, start_seqno, end_seqno,
152152
response = self._handle_op(op)
153153

154154
def __generator(response):
155-
156155
yield response
157156
last_by_seqno = 0
158157

159158
while True:
160-
161159
if not op.queue.empty():
162160
response = op.queue.get()
163161
else:
@@ -247,14 +245,16 @@ def recv_op(self, op):
247245
self.ack_dcp_noop_req(opaque)
248246

249247
except Exception as ex:
250-
print("recv_op Exception:", ex)
251-
if 'died' in str(ex):
248+
ex_str = str(ex)
249+
if 'died' in ex_str:
250+
print("recv_op Exception:", ex)
252251
return {'opcode': op.opcode,
253252
'status': 0xff}
254-
elif 'Timeout' in str(ex):
255-
self._stream_timeout = True
253+
elif 'timed out' in ex_str or 'Timeout' in ex_str:
254+
# Socket poll timeout — expected on slow streams, not an error
256255
return None
257256
else:
257+
print("recv_op Exception:", ex)
258258
return None
259259

260260
def ack_stream_req(self, opaque):

pytests/dcp_new/dcp_base.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ def initialise_cluster_connections(self):
5959

6060
config_json = json.loads(DcpClient.get_config(init_dcp_client)[2])
6161
self.vb_map = config_json['vBucketServerMap']['vBucketMap']
62+
self.vbuckets = range(len(self.vb_map))
6263

6364
self.dcp_client_dict = dict()
6465

@@ -183,6 +184,8 @@ def handle_stream_create_response(self, dcpStream):
183184

184185
def handleSystemEvent(self, response, manifest):
185186
# Unpack a DCP system event
187+
if isinstance(response.get('key'), bytes):
188+
response['key'] = response['key'].decode('utf-8', errors='surrogateescape')
186189
if response['event'] == EVENT_CREATE_COLLECTION:
187190
if response['version'] == 0:
188191
uid, sid, cid = struct.unpack(">QII", response['value'])

0 commit comments

Comments
 (0)