Skip to content

Commit 1551963

Browse files
piochelepiotr and claude committed
Apply review fixes: degrade highwater offset collection gracefully
Mirror cluster_metadata.fetch_earliest_offsets in get_partition_offsets: broaden the per-future handler to catch any Exception so one bad partition does not abort the loop, and wrap the outer list_offsets call so a request/broker-level failure logs a warning and returns [] instead of aborting the whole highwater collection.

Strengthen unit tests: assert list_offsets is called with isolation_level=READ_UNCOMMITTED and the request timeout, cover the non-Kafka per-partition error and request-level failure paths, and add an empty-partitions test that issues no request.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent bfad118 commit 1551963

2 files changed

Lines changed: 64 additions & 7 deletions

File tree

kafka_consumer/datadog_checks/kafka_consumer/client.py

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -159,16 +159,21 @@ def get_partition_offsets(self, partitions, offset=-1):
159159
if not request:
160160
return []
161161

162-
futures = self.kafka_client.list_offsets(
163-
request,
164-
isolation_level=IsolationLevel.READ_UNCOMMITTED,
165-
request_timeout=self.config._request_timeout,
166-
)
162+
try:
163+
futures = self.kafka_client.list_offsets(
164+
request,
165+
isolation_level=IsolationLevel.READ_UNCOMMITTED,
166+
request_timeout=self.config._request_timeout,
167+
)
168+
except Exception as e:
169+
self.log.warning("Failed to issue list_offsets request; highwater offsets will be skipped this run: %s", e)
170+
return []
171+
167172
results = []
168173
for tp, future in futures.items():
169174
try:
170175
results.append((tp.topic, tp.partition, future.result().offset))
171-
except KafkaException as e:
176+
except Exception as e:
172177
self.log.debug("Skipping offsets for %s/%s: %s", tp.topic, tp.partition, e)
173178
return results
174179

kafka_consumer/tests/test_unit.py

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1093,9 +1093,58 @@ def test_get_partition_offsets_skips_unqueryable_partitions():
10931093
assert results == [("healthy_topic", 0, 100)]
10941094

10951095

1096+
def test_get_partition_offsets_skips_partition_on_non_kafka_error():
1097+
"""A non-Kafka error on one partition's future is skipped, not propagated, so the loop survives."""
1098+
from confluent_kafka import TopicPartition
1099+
1100+
config = mock.MagicMock()
1101+
config._request_timeout = 5
1102+
1103+
client = KafkaClient(config, logging.getLogger(__name__))
1104+
1105+
futures = {
1106+
TopicPartition(topic="healthy_topic", partition=0): _offset_future(100),
1107+
TopicPartition(topic="bad_topic", partition=0): _raising_future(RuntimeError("unexpected")),
1108+
}
1109+
client._kafka_client = mock.MagicMock()
1110+
client._kafka_client.list_offsets.return_value = futures
1111+
1112+
results = client.get_partition_offsets([("healthy_topic", 0), ("bad_topic", 0)])
1113+
1114+
assert results == [("healthy_topic", 0, 100)]
1115+
1116+
1117+
def test_get_partition_offsets_degrades_when_list_offsets_request_fails():
1118+
"""A request/broker-level list_offsets failure degrades to [] instead of aborting collection."""
1119+
config = mock.MagicMock()
1120+
config._request_timeout = 5
1121+
1122+
client = KafkaClient(config, logging.getLogger(__name__))
1123+
client._kafka_client = mock.MagicMock()
1124+
client._kafka_client.list_offsets.side_effect = RuntimeError("connection dropped")
1125+
1126+
results = client.get_partition_offsets([("topic_a", 0)])
1127+
1128+
assert results == []
1129+
1130+
1131+
def test_get_partition_offsets_empty_partitions_returns_empty_without_request():
1132+
"""No partitions means no list_offsets request is issued and an empty result is returned."""
1133+
config = mock.MagicMock()
1134+
config._request_timeout = 5
1135+
1136+
client = KafkaClient(config, logging.getLogger(__name__))
1137+
client._kafka_client = mock.MagicMock()
1138+
1139+
results = client.get_partition_offsets([])
1140+
1141+
assert results == []
1142+
assert client._kafka_client.list_offsets.call_count == 0
1143+
1144+
10961145
def test_get_partition_offsets_returns_all_healthy_partitions():
10971146
"""When every list_offsets future succeeds, all partition offsets are returned."""
1098-
from confluent_kafka import TopicPartition
1147+
from confluent_kafka import IsolationLevel, TopicPartition
10991148

11001149
config = mock.MagicMock()
11011150
config._request_timeout = 5
@@ -1113,3 +1162,6 @@ def test_get_partition_offsets_returns_all_healthy_partitions():
11131162

11141163
assert sorted(results) == [("topic_a", 0, 42), ("topic_b", 1, 7)]
11151164
assert client._kafka_client.list_offsets.call_count == 1
1165+
# READ_UNCOMMITTED is load-bearing: READ_COMMITTED would return the LSO, not the true high watermark.
1166+
assert client._kafka_client.list_offsets.call_args.kwargs["isolation_level"] == IsolationLevel.READ_UNCOMMITTED
1167+
assert client._kafka_client.list_offsets.call_args.kwargs["request_timeout"] == 5

0 commit comments

Comments
 (0)