Skip to content

Commit b7ec931

Browse files
Merge pull request #64 from glassflow/fix/dlq-empty-response
ETL-772: Fix/dlq empty response
2 parents 133f6af + 3d50bb9 commit b7ec931

File tree

4 files changed

+51
-5
lines changed

4 files changed

+51
-5
lines changed

VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
3.7.2
1+
3.7.3

src/glassflow/etl/dlq.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,9 @@ def consume(self, batch_size: int = 100) -> List[Dict[str, Any]]:
4040
"GET", f"{self.endpoint}/consume", params={"batch_size": batch_size}
4141
)
4242
response.raise_for_status()
43-
if response.status_code != 204:
44-
return response.json()
45-
return []
43+
if response.status_code == 204 or not response.content:
44+
return []
45+
return response.json()
4646
except errors.UnprocessableContentError as e:
4747
raise InvalidBatchSizeError(
4848
f"Invalid batch size: batch size should be larger than 1 "

src/glassflow/etl/models/stateless_transformation.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@ class Transformation(BaseModel):
1616

1717

1818
class ExpressionConfig(BaseModel):
19-
transform: List[Transformation] = Field(description="The transformation expression")
19+
transform: Optional[List[Transformation]] = Field(
20+
description="The transformation expression", default=None
21+
)
2022

2123

2224
class StatelessTransformationConfig(BaseModel):
@@ -50,6 +52,11 @@ def validate(self) -> "StatelessTransformationConfig":
5052
raise ValueError(
5153
"config is required when stateless transformation is enabled"
5254
)
55+
else:
56+
if not self.config.transform:
57+
raise ValueError(
58+
"transform is required when stateless transformation is enabled"
59+
)
5360
return self
5461

5562
def update(

tests/test_dlq.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,45 @@ def test_consume_success(self, dlq, mock_success):
3333
{"id": "msg2", "content": "test message 2"},
3434
]
3535

36+
def test_consume_returns_empty_list_when_no_messages(self, dlq, mock_success):
37+
"""Test DLQ consume returns empty list when there are no messages to consume."""
38+
with mock_success(json_payloads=[[]]) as mock_get:
39+
result = dlq.consume(batch_size=50)
40+
41+
mock_get.assert_called_once_with(
42+
"GET", f"{dlq.endpoint}/consume", params={"batch_size": 50}
43+
)
44+
assert result == []
45+
46+
def test_consume_returns_empty_list_on_204_no_content(self, dlq):
47+
"""Test DLQ consume returns empty list when API responds with 204 No Content."""
48+
mock_response = mock_responses.create_mock_response_factory()(
49+
status_code=204,
50+
json_data=None,
51+
)
52+
with patch("httpx.Client.request", return_value=mock_response) as mock_get:
53+
result = dlq.consume(batch_size=50)
54+
55+
mock_get.assert_called_once_with(
56+
"GET", f"{dlq.endpoint}/consume", params={"batch_size": 50}
57+
)
58+
assert result == []
59+
60+
def test_consume_returns_empty_list_on_200_empty_body(self, dlq):
61+
"""Test DLQ consume returns empty list when API returns 200 with empty body."""
62+
mock_response = mock_responses.create_mock_response_factory()(
63+
status_code=200,
64+
json_data=None,
65+
)
66+
mock_response.content = b""
67+
with patch("httpx.Client.request", return_value=mock_response) as mock_get:
68+
result = dlq.consume(batch_size=50)
69+
70+
mock_get.assert_called_once_with(
71+
"GET", f"{dlq.endpoint}/consume", params={"batch_size": 50}
72+
)
73+
assert result == []
74+
3675
@pytest.mark.parametrize(
3776
"scenario",
3877
[

0 commit comments

Comments
 (0)