Skip to content

Commit 68e118c

Browse files
committed
estuary-cdk: add UnzipStream for stream processing ZIP files
Introduces a new utility that wraps `stream-unzip`, providing the ability to stream decompressed chunks from a ZIP file without loading the entire archive into memory. - Added `stream-unzip` dependency to `pyproject.toml`. - Created `test_unzip_stream.py` to test the `UnzipStream` functionality. - Implemented tests for various zip file contents, including handling of empty lines and nested directories. - Added a test for an empty zip file to ensure proper handling. This was introduced as part of `source-qualtrics` connector to handle survey responses exported from the Qualtrics API.
1 parent 7d61d24 commit 68e118c

File tree

4 files changed

+451
-20
lines changed

4 files changed

+451
-20
lines changed
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
from typing import AsyncGenerator
2+
3+
from stream_unzip import async_stream_unzip
4+
5+
6+
class UnzipStream:
7+
"""
8+
Incrementally decompresses a stream of ZIP file bytes, yielding uncompressed file contents
9+
as bytes in an asynchronous generator.
10+
11+
Example:
12+
```python
13+
async for chunk in UnzipStream(input_bytes, flatten=True):
14+
... # process chunk
15+
```
16+
17+
Example usage with IncrementalJsonProcessor:
18+
```python
19+
async for chunk in UnzipStream(input_bytes):
20+
async for obj in IncrementalJsonProcessor(chunk, "optional_prefix", streamed_item_cls=MyModel):
21+
... # process each parsed object
22+
```
23+
24+
Notes:
25+
- If the ZIP archive is empty, yields nothing.
26+
- If a file in the archive is empty, yields an empty generator for that file.
27+
- Filenames are yielded as bytes (per stream_unzip); decode as needed.
28+
- This is a thin wrapper around stream_unzip.async_stream_unzip and inherits its limitations.
29+
"""
30+
31+
def __init__(self, input: AsyncGenerator[bytes, None]):
32+
self.input = input
33+
self.done = False
34+
self._stream_iterator = None
35+
36+
def __aiter__(self):
37+
return self
38+
39+
async def __anext__(self) -> bytes:
40+
if self._stream_iterator is None:
41+
self._stream_iterator = self._stream()
42+
43+
try:
44+
return await anext(self._stream_iterator)
45+
except StopAsyncIteration:
46+
self.done = True
47+
raise
48+
49+
async def _stream(self) -> bytes:
50+
async for _, _, uncompressed_chunks in async_stream_unzip(self.input):
51+
async for chunk in uncompressed_chunks:
52+
yield chunk

0 commit comments

Comments
 (0)