Skip to content

Commit 7d61d24

Browse files
committed
estuary-cdk: add support for NDJSON processing in IncrementalJsonProcessor
1 parent 2cfd74c commit 7d61d24

File tree

2 files changed

+154
-19
lines changed

2 files changed

+154
-19
lines changed

estuary-cdk/estuary_cdk/incremental_json_processor.py

Lines changed: 40 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -15,17 +15,16 @@ class _NoopRemainder(BaseModel):
1515

1616
class IncrementalJsonProcessor(Generic[StreamedItem, Remainder]):
1717
"""
18-
Processes a stream of JSON bytes incrementally, yielding objects of type
19-
`streamed_item_cls` along the way. Once iteration is complete, the
18+
Processes a stream of JSON or NDJSON bytes incrementally, yielding objects of
19+
type `streamed_item_cls` along the way. Once iteration is complete, the
2020
"remainder" of the input that was not present under the specific prefix can
2121
be obtained using the `get_remainder` method.
2222
2323
The prefix is a path within the JSON document where objects to parse reside.
2424
Usually it will end with the ".item" suffix, which allows iteration through
25-
objects in an array. More nuanced scenarios are also possible.
26-
27-
Does not currently support parsing NDJSON since that can be more efficiently
28-
processed via other means.
25+
objects in an array. Prefixes can be used with NDJSON to extract specific
26+
nested objects from each newline-delimited JSON object. More nuanced
27+
scenarios are also possible.
2928
3029
Example usage if you only want the items and don't care about the remainder:
3130
```python
@@ -37,6 +36,16 @@ class IncrementalJsonProcessor(Generic[StreamedItem, Remainder]):
3736
do_something_with(item)
3837
```
3938
39+
Example usage for NDJSON with no prefix:
40+
```python
41+
async for item in IncrementalJsonProcessor(
42+
input,
43+
prefix="", # Use "" to yield the whole object per line
44+
streamed_item_cls=MyItem,
45+
):
46+
do_something_with(item)
47+
```
48+
4049
For using the remainder after iteration is complete, keep a reference to the
4150
processor:
4251
```python
@@ -47,7 +56,7 @@ class IncrementalJsonProcessor(Generic[StreamedItem, Remainder]):
4756
remainder_cls=MyRemainder,
4857
)
4958
50-
async for item processor:
59+
async for item in processor:
5160
do_something_with(item)
5261
5362
do_something_with(processor.get_remainder())
@@ -69,7 +78,10 @@ def __init__(
6978
self.prefix = prefix
7079
self.streamed_item_cls = streamed_item_cls
7180
self.remainder_cls = remainder_cls
72-
self.parser = ijson.parse_async(_AsyncStreamWrapper(input))
81+
self.parser = ijson.parse_async(
82+
_AsyncStreamWrapper(input),
83+
multiple_values=True,
84+
)
7385
self.remainder_builder = _ObjectBuilder()
7486
self.done = False
7587

@@ -121,18 +133,31 @@ class _AsyncStreamWrapper:
121133
def __init__(self, gen: AsyncGenerator[bytes, None]):
122134
self.gen = gen
123135
self.buf = b""
136+
self.had_data = False
137+
self.eof = False
124138

125139
async def read(self, size: int = -1) -> bytes:
126140
if size == -1:
127-
return self.buf + b"".join([chunk async for chunk in self.gen])
128-
129-
while len(self.buf) < size:
130-
try:
131-
self.buf += await anext(self.gen)
132-
except StopAsyncIteration:
133-
break
141+
data = self.buf + b"".join([chunk async for chunk in self.gen])
142+
self.eof = True
143+
else:
144+
while len(self.buf) < size:
145+
try:
146+
self.buf += await anext(self.gen)
147+
except StopAsyncIteration:
148+
self.eof = True
149+
break
134150

135151
data, self.buf = self.buf[:size], self.buf[size:]
152+
if not self.had_data:
153+
data = data.lstrip(b" \t\n\r")
154+
155+
if self.eof and not data and not self.had_data:
156+
raise StopAsyncIteration
157+
158+
if not self.had_data and data:
159+
self.had_data = True
160+
136161
return data
137162

138163

estuary-cdk/tests/test_incremental_json_processor.py

Lines changed: 114 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
from typing import AsyncGenerator
22

33
import pytest
4-
from pydantic import BaseModel, TypeAdapter
4+
from pydantic import BaseModel
5+
from ijson.common import IncompleteJSONError
56

67
from estuary_cdk.incremental_json_processor import IncrementalJsonProcessor
78

@@ -238,17 +239,106 @@ class Timestamp(BaseModel):
238239
],
239240
None,
240241
),
242+
# NDJSON (Newline Delimited JSON)
243+
(
244+
b"""{"id": 1, "value": "test1"}
245+
{"id": 2, "value": "test2"}
246+
{"id": 3, "value": "test3"}""",
247+
"", # Note: No prefix typically for NDJSON if you want the whole record.
248+
SimpleRecord,
249+
None,
250+
[
251+
SimpleRecord(id=1, value="test1"),
252+
SimpleRecord(id=2, value="test2"),
253+
SimpleRecord(id=3, value="test3"),
254+
],
255+
None,
256+
),
257+
(
258+
b"""{"record": {"id": 1, "value": "test1"}}
259+
{"record": {"id": 2, "value": "test2"}}
260+
{"not_a_record": {"something": "else"}}
261+
{"record": {"id": 3, "value": "test3"}}""",
262+
"record", # Note: Prefix here is the key to the object in NDJSON.
263+
SimpleRecord,
264+
None,
265+
[
266+
SimpleRecord(id=1, value="test1"),
267+
SimpleRecord(id=2, value="test2"),
268+
SimpleRecord(id=3, value="test3"),
269+
],
270+
None,
271+
),
272+
# empty NDJSON
273+
(
274+
b"",
275+
"",
276+
SimpleRecord,
277+
None,
278+
[],
279+
None,
280+
),
281+
# whitespace only NDJSON
282+
(
283+
b" \n\t\n ",
284+
"",
285+
SimpleRecord,
286+
None,
287+
[],
288+
None,
289+
),
290+
# NDJSON with trailing newline
291+
(
292+
b"""{"id": 1, "value": "test1"}
293+
{"id": 2, "value": "test2"}
294+
295+
""",
296+
"",
297+
SimpleRecord,
298+
None,
299+
[
300+
SimpleRecord(id=1, value="test1"),
301+
SimpleRecord(id=2, value="test2"),
302+
],
303+
None,
304+
),
305+
# NDJSON with multiple trailing newlines
306+
(
307+
b"""{"id": 1, "value": "test1"}
308+
309+
310+
""",
311+
"",
312+
SimpleRecord,
313+
None,
314+
[
315+
SimpleRecord(id=1, value="test1"),
316+
],
317+
None,
318+
),
241319
],
242320
)
243321
async def test_incremental_json_processor(
244-
input_data, prefix, record_cls, remainder_cls, want_records, want_remainder
322+
input_data,
323+
prefix,
324+
record_cls,
325+
remainder_cls,
326+
want_records,
327+
want_remainder,
245328
):
246329
if remainder_cls:
247330
processor = IncrementalJsonProcessor(
248-
bytes_gen(input_data), prefix, record_cls, remainder_cls
331+
bytes_gen(input_data),
332+
prefix,
333+
record_cls,
334+
remainder_cls,
249335
)
250336
else:
251-
processor = IncrementalJsonProcessor(bytes_gen(input_data), prefix, record_cls)
337+
processor = IncrementalJsonProcessor(
338+
bytes_gen(input_data),
339+
prefix,
340+
record_cls,
341+
)
252342

253343
got = []
254344
async for record in processor:
@@ -257,3 +347,23 @@ async def test_incremental_json_processor(
257347
assert want_records == got
258348
if remainder_cls:
259349
assert want_remainder == processor.get_remainder()
350+
351+
352+
@pytest.mark.asyncio
353+
async def test_malformed_ndjson_fails():
354+
# When ijson encounters malformed JSON in NDJSON mode, it fails immediately
355+
# This is the expected behavior - we want to fail fast on malformed data
356+
malformed_data = b"""
357+
{"id": 1, "value": "test1"}
358+
{"id": 2, "value": "test2"
359+
{"id": 3, "value": "test3"}"""
360+
361+
processor = IncrementalJsonProcessor(
362+
bytes_gen(malformed_data),
363+
"",
364+
SimpleRecord,
365+
)
366+
367+
with pytest.raises(IncompleteJSONError):
368+
async for record in processor:
369+
pass # We expect the error before any records are yielded due to ijson's buffering

0 commit comments

Comments (0)