Skip to content

Commit d22b034

Browse files
gmrclaude
andcommitted
Address CodeRabbit nitpicks: retry, state alias, test result handling
codecs.py: HTTP schema fetch retries 3 times with exponential backoff, catching httpx.RequestError and non-200 responses before raising DecodeError. connection.py: STATE_CLOSED explicitly assigned from state.State.STATE_STOPPED with comment clarifying the intentional alias for is_closed/is_stopped equivalence. testing.py: process_message now raises AssertionError on MESSAGE_REQUEUE (unexpected in tests). MESSAGE_DROP returns normally (valid test outcome). Uses match/case for clarity. Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
1 parent b8d6db9 commit d22b034

3 files changed

Lines changed: 37 additions & 19 deletions

File tree

rejected/codecs.py

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -262,14 +262,25 @@ async def _load_http_schema(
262262
)
263263
if self._http_client is None:
264264
self._http_client = httpx.AsyncClient(timeout=30)
265-
response = await self._http_client.get(uri)
266-
if response.status_code != 200:
267-
raise DecodeError(
268-
f'Failed to fetch Avro schema for {message_type}: '
269-
f'HTTP {response.status_code}'
270-
)
271-
schema: dict[str, typing.Any] = response.json()
272-
return schema
265+
last_err: Exception | None = None
266+
for attempt in range(3):
267+
try:
268+
response = await self._http_client.get(uri)
269+
if response.status_code == 200:
270+
schema: dict[str, typing.Any] = response.json()
271+
return schema
272+
last_err = DecodeError(
273+
f'Failed to fetch Avro schema for '
274+
f'{message_type}: HTTP {response.status_code}'
275+
)
276+
except httpx.RequestError as err:
277+
last_err = err
278+
if attempt < 2:
279+
await asyncio.sleep(0.5 * (2**attempt))
280+
raise DecodeError(
281+
f'Failed to fetch Avro schema for {message_type} '
282+
f'after 3 attempts: {last_err}'
283+
) from last_err
273284

274285

275286
# --- Internal helpers (stateless, sync) ---

rejected/connection.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@
1919

2020
class Connection(state.State):
2121
HB_INTERVAL: typing.ClassVar[int] = 300
22-
STATE_CLOSED: typing.ClassVar[int] = 0x08
22+
# Intentionally aliased to STATE_STOPPED — is_closed delegates
23+
# to is_stopped so both report the same terminal state.
24+
STATE_CLOSED: typing.ClassVar[int] = state.State.STATE_STOPPED
2325
STATES: typing.ClassVar[dict[int, str]] = {
2426
**state.State.STATES,
2527
STATE_CLOSED: 'Closed',

rejected/testing.py

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -237,16 +237,21 @@ def _capture_log(
237237

238238
await self.consumer.execute(ctx)
239239

240-
if ctx.result == models.Result.CONSUMER_EXCEPTION:
241-
raise exceptions.ConsumerException()
242-
elif ctx.result == models.Result.MESSAGE_EXCEPTION:
243-
raise exceptions.MessageException()
244-
elif ctx.result == models.Result.PROCESSING_EXCEPTION:
245-
raise exceptions.ProcessingException()
246-
elif ctx.result == models.Result.UNHANDLED_EXCEPTION:
247-
if self.exc_info:
248-
raise self.exc_info[1]
249-
raise AssertionError('UNHANDLED_EXCEPTION')
240+
match ctx.result:
241+
case models.Result.CONSUMER_EXCEPTION:
242+
raise exceptions.ConsumerException()
243+
case models.Result.MESSAGE_EXCEPTION:
244+
raise exceptions.MessageException()
245+
case models.Result.PROCESSING_EXCEPTION:
246+
raise exceptions.ProcessingException()
247+
case models.Result.UNHANDLED_EXCEPTION:
248+
if self.exc_info:
249+
raise self.exc_info[1]
250+
raise AssertionError('UNHANDLED_EXCEPTION')
251+
case models.Result.MESSAGE_REQUEUE:
252+
raise AssertionError(
253+
'Message was requeued — consumer returned MESSAGE_REQUEUE'
254+
)
250255
return ctx.measurement
251256

252257
@staticmethod

0 commit comments

Comments
 (0)