Skip to content

Commit 87439f7

Browse files
gmr and claude
committed
Address CodeRabbit review: codec and connection hardening
- Pass content_type to _load_bs4 so XML content uses the xml parser instead of silently applying html.parser to text/xml
- Wrap encode() in try/except to maintain EncodeError contract, matching the existing pattern in decode()
- Extract _serialize and _compress helpers to reduce encode complexity
- Guard against missing umsgpack with explicit EncodeError
- Catch ConnectionWrongStateError alongside ConnectionClosed in on_open
- Treat AMQP reply codes 403, 405, 406 as terminal channel errors to prevent infinite reopen loops on configuration/permission failures

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 7639306 commit 87439f7

2 files changed

Lines changed: 60 additions & 32 deletions

File tree

rejected/codecs.py

Lines changed: 54 additions & 30 deletions
Original file line number | Diff line number | Diff line change
@@ -119,7 +119,7 @@ async def decode(
119119
if content_type == 'text/csv':
120120
return _load_csv(body)
121121
if bs4 and content_type in BS4_MIME_TYPES:
122-
return _load_bs4(body)
122+
return _load_bs4(body, content_type)
123123
if content_type in YAML_MIME_TYPES:
124124
return yaml.safe_load(body)
125125
except DecodeError:
@@ -142,34 +142,46 @@ async def encode(
142142
str/bytes), then content_encoding compression.
143143
144144
"""
145-
if not isinstance(body, (str, bytes)):
146-
if content_type == AVRO_DATUM_MIME_TYPE:
147-
body = await self._encode_avro(body, message_type)
148-
elif content_type == 'application/json':
149-
body = json.dumps(body, ensure_ascii=True).encode('utf-8')
150-
elif umsgpack and content_type == 'application/msgpack':
151-
body = umsgpack.packb(body)
152-
elif content_type == 'application/x-plist':
153-
body = plistlib.dumps(body)
154-
elif content_type == 'text/csv':
155-
body = _dump_csv(body)
156-
elif (
157-
bs4
158-
and isinstance(body, bs4.BeautifulSoup)
159-
and content_type in BS4_MIME_TYPES
160-
):
161-
body = str(body)
162-
elif content_type in YAML_MIME_TYPES:
163-
body = yaml.dump(body)
164-
165-
if content_encoding:
166-
if isinstance(body, str):
167-
body = body.encode('utf-8')
168-
if content_encoding == 'gzip':
169-
body = gzip.compress(body)
170-
elif content_encoding == 'bzip2':
171-
body = bz2.compress(body)
145+
try:
146+
if not isinstance(body, (str, bytes)):
147+
body = await self._serialize(body, content_type, message_type)
148+
if content_encoding:
149+
body = _compress(body, content_encoding)
150+
except EncodeError:
151+
raise
152+
except Exception as error:
153+
raise EncodeError(str(error)) from error
154+
return body
172155

156+
async def _serialize(
157+
self,
158+
body: typing.Any,
159+
content_type: str | None,
160+
message_type: str | None,
161+
) -> typing.Any:
162+
"""Serialize a non-string/bytes body by content type."""
163+
if content_type == AVRO_DATUM_MIME_TYPE:
164+
return await self._encode_avro(body, message_type)
165+
if content_type == 'application/json':
166+
return json.dumps(body, ensure_ascii=True).encode('utf-8')
167+
if content_type == 'application/msgpack':
168+
if umsgpack is None:
169+
raise EncodeError(
170+
'umsgpack is required for MessagePack encoding'
171+
)
172+
return umsgpack.packb(body)
173+
if content_type == 'application/x-plist':
174+
return plistlib.dumps(body)
175+
if content_type == 'text/csv':
176+
return _dump_csv(body)
177+
if (
178+
bs4
179+
and isinstance(body, bs4.BeautifulSoup)
180+
and content_type in BS4_MIME_TYPES
181+
):
182+
return str(body)
183+
if content_type in YAML_MIME_TYPES:
184+
return yaml.dump(body)
173185
return body
174186

175187
async def _encode_avro(
@@ -282,6 +294,17 @@ async def _load_http_schema(
282294
# --- Internal helpers (stateless, sync) ---
283295

284296

297+
def _compress(body: typing.Any, content_encoding: str) -> bytes:
298+
"""Apply content-encoding compression to a body."""
299+
if isinstance(body, str):
300+
body = body.encode('utf-8')
301+
if content_encoding == 'gzip':
302+
return gzip.compress(body)
303+
if content_encoding == 'bzip2':
304+
return bz2.compress(body)
305+
return body
306+
307+
285308
def _load_json(value: bytes | str) -> typing.Any:
286309
"""Deserialize a JSON value."""
287310
if isinstance(value, bytes):
@@ -310,13 +333,14 @@ def _load_csv(value: bytes | str) -> csv.DictReader[str]:
310333
return csv.DictReader(csv_buffer, dialect=dialect)
311334

312335

313-
def _load_bs4(value: bytes | str) -> typing.Any:
336+
def _load_bs4(value: bytes | str, content_type: str) -> typing.Any:
314337
"""Parse HTML or XML into a BeautifulSoup object."""
315338
if not bs4:
316339
raise DecodeError('BeautifulSoup4 is not enabled')
317340
if isinstance(value, bytes):
318341
value = value.decode('utf-8')
319-
return bs4.BeautifulSoup(value, 'html.parser')
342+
parser = 'xml' if content_type == 'text/xml' else 'html.parser'
343+
return bs4.BeautifulSoup(value, parser)
320344

321345

322346
def _dump_csv(value: list[list[typing.Any]]) -> str:

rejected/connection.py

Lines changed: 6 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -104,7 +104,10 @@ def on_open(
104104
return
105105
try:
106106
self.connection.channel(on_open_callback=self.on_channel_open)
107-
except pika.exceptions.ConnectionClosed:
107+
except (
108+
pika.exceptions.ConnectionWrongStateError,
109+
pika.exceptions.ConnectionClosed,
110+
):
108111
LOGGER.warning('Channel open on closed connection')
109112
self.set_state(self.STATE_CLOSED)
110113
self.callbacks.on_closed(self.name)
@@ -202,7 +205,8 @@ def on_channel_closed(
202205
reply_code = 0
203206
reply_text = str(closing_reason) or 'unknown'
204207

205-
if reply_code <= 0 or reply_code == 404:
208+
terminal_codes = {0, 403, 404, 405, 406}
209+
if reply_code <= 0 or reply_code in terminal_codes:
206210
LOGGER.error(
207211
'Channel Error (%r): %s', reply_code, reply_text or 'unknown'
208212
)

0 commit comments

Comments
 (0)