Skip to content

Commit 97843f4

Browse files
committed
estuary-cdk: introduce IncrementalCSVProcessor
Many data sources provide CSV data in streaming formats or very large files that cannot be efficiently loaded into memory all at once. Processing these CSV streams while handling edge cases like multi-byte characters split across chunk boundaries can be challenging for connectors.

This adds IncrementalCSVProcessor, which can process CSV data from an async byte iterator incrementally, yielding complete records as dictionaries. The processor handles:

- Incremental decoding of multi-byte UTF-8 characters that may be split across chunk boundaries using Python's codecs.IncrementalDecoder
- Configurable CSV dialects through CSVConfig (delimiters, quote chars, encodings, line terminators, etc.)
- Integration with aiocsv for async CSV parsing

The implementation uses aiocsv.AsyncDictReader with a custom AsyncByteReader that wraps the incoming byte stream and handles incremental character decoding. This ensures that CSV records are parsed correctly even when the underlying byte chunks don't align with character or record boundaries.

This implementation is modeled after the incremental CSV processing developed for source-salesforce-native, but generalized and enhanced for broader use across connectors. The processor has been tested with various chunk sizes, encodings, CSV dialects, and edge cases including very large fields, Unicode content, and malformed data.

AI assisted with the initial code draft using Claude AI.
1 parent e45cf08 commit 97843f4

File tree

4 files changed

+695
-24
lines changed

4 files changed

+695
-24
lines changed
Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
import aiocsv
2+
import aiocsv.protocols
3+
import codecs
4+
import csv
5+
import sys
6+
from typing import AsyncIterator, Optional, Any
7+
from dataclasses import dataclass
8+
9+
10+
# Python's csv module has a default field size limit of 131,072 bytes,
# and it will raise an _csv.Error exception if a field value is larger
# than that limit. Some users have CSVs with fields larger than 131,072
# bytes, so we max out the limit.
#
# csv.field_size_limit() stores the value in a C long, so passing
# sys.maxsize raises OverflowError on platforms where a C long is
# 32 bits (e.g. 64-bit CPython on Windows). Back off by a factor of
# ten until the platform accepts the value.
_max_field_size = sys.maxsize
while True:
    try:
        csv.field_size_limit(_max_field_size)
        break
    except OverflowError:
        _max_field_size //= 10
15+
16+
17+
@dataclass
class CSVConfig:
    """Configuration for CSV processing.

    Field semantics mirror the dialect parameters accepted by Python's
    csv module (and by aiocsv, which uses the same options).
    """

    # Field separator character.
    delimiter: str = ','
    # Character used to quote fields containing special characters.
    quotechar: str = '"'
    # Character used to escape the delimiter; None disables escaping.
    escapechar: Optional[str] = None
    # Whether two consecutive quotechars inside a quoted field
    # represent one literal quotechar.
    doublequote: bool = True
    # Whether whitespace immediately after the delimiter is ignored.
    skipinitialspace: bool = False
    # Record separator expected between rows.
    lineterminator: str = '\r\n'
    # Quoting behavior. csv.QUOTE_MINIMAL names the previous magic
    # number 0, preserving the original default value exactly.
    quoting: int = csv.QUOTE_MINIMAL
    # Raise csv.Error on malformed input instead of parsing best-effort.
    strict: bool = True
    # Text encoding used to decode the incoming byte stream.
    encoding: str = 'utf-8'
29+
30+
31+
class CSVProcessingError(Exception):
    """Exception raised when CSV processing fails."""

    def __init__(self, message: str, config: Optional[dict] = None):
        # Keep the raw configuration around for programmatic inspection.
        self.config = config
        # When a configuration was supplied, append a rendered summary of
        # it to the message so logs show exactly which dialect failed.
        if config:
            details = ", ".join(f"{key}={value!r}" for key, value in config.items())
            message = f"{message}. CSV reader configuration: {details}"
        super().__init__(message)
40+
41+
42+
class IncrementalCSVProcessor:
    """
    Process a stream of CSV bytes incrementally, yielding rows as dictionaries.

    This processor handles CSV data that arrives in chunks and uses incremental
    decoding to handle multi-byte characters that may be split across
    chunk boundaries.

    Example usage with default configuration:
    ```python
    async for row in IncrementalCSVProcessor(byte_iterator):
        do_something_with(row)
    ```

    Example usage with custom configuration:
    ```python
    # For tab-separated values with different encoding
    config = CSVConfig(
        delimiter='\t',
        encoding='utf-8-sig',
        lineterminator='\n'
    )

    async for row in IncrementalCSVProcessor(byte_iterator, config):
        do_something_with(row)
    ```
    """

    def __init__(self, byte_iterator: AsyncIterator[bytes], config: Optional[CSVConfig] = None):
        """
        Initialize the processor with byte iterator and optional CSV configuration.

        Args:
            byte_iterator: Async iterator of CSV byte chunks
            config: CSV configuration options; defaults to CSVConfig()
        """
        self.byte_iterator = byte_iterator
        self.config = config or CSVConfig()
        # Lazily-created iterator over parsed rows; built on first __anext__
        # so constructing the processor never touches the byte stream.
        self._row_iterator: Optional[AsyncIterator[dict[str, Any]]] = None

    def __aiter__(self) -> AsyncIterator[dict[str, Any]]:
        return self

    async def __anext__(self) -> dict[str, Any]:
        if self._row_iterator is None:
            self._row_iterator = self._process_stream()

        # StopAsyncIteration from the underlying iterator propagates
        # naturally, ending iteration of this processor as well.
        return await self._row_iterator.__anext__()

    async def _process_stream(self) -> AsyncIterator[dict[str, Any]]:
        """
        Internal method to process the byte stream and yield CSV records.

        Yields:
            dict[str, Any]: Complete CSV records as dictionaries

        Raises:
            CSVProcessingError: When CSV data is malformed or cannot be parsed
        """

        class AsyncByteReader(aiocsv.protocols.WithAsyncRead):
            """Internal class that handles incremental decoding for aiocsv."""

            def __init__(self, byte_gen: AsyncIterator[bytes], encoding: str):
                self.byte_gen = byte_gen
                # 'strict' surfaces UnicodeDecodeError rather than silently
                # replacing or dropping undecodable bytes.
                self.decoder = codecs.getincrementaldecoder(encoding)(errors='strict')
                self._exhausted = False

            async def read(self, size: int = -1) -> str:
                """Read and incrementally decode data from the byte stream.

                Note: `size` is accepted for protocol compatibility; the
                amount returned is governed by the upstream chunk size.
                """
                if self._exhausted:
                    return ""

                # A chunk may decode to zero characters (e.g. it ends in the
                # middle of a multi-byte sequence, so every byte is buffered
                # by the incremental decoder). Returning "" there would be
                # misread as EOF by the CSV reader and silently truncate the
                # stream, so keep pulling chunks until we have text or the
                # stream genuinely ends.
                while True:
                    try:
                        chunk = await self.byte_gen.__anext__()
                    except StopAsyncIteration:
                        self._exhausted = True
                        # Finalize the decoder to flush any buffered bytes.
                        # Raises UnicodeDecodeError if the stream ends with
                        # an incomplete multi-byte character.
                        return self.decoder.decode(b'', final=True)

                    # Use incremental decoder to handle multi-byte characters
                    # that may be split across chunk boundaries.
                    decoded = self.decoder.decode(chunk, final=False)
                    if decoded:
                        return decoded

        async_reader = AsyncByteReader(self.byte_iterator, self.config.encoding)

        reader_kwargs = {
            'delimiter': self.config.delimiter,
            'quotechar': self.config.quotechar,
            'doublequote': self.config.doublequote,
            'skipinitialspace': self.config.skipinitialspace,
            'lineterminator': self.config.lineterminator,
            'quoting': self.config.quoting,
            'strict': self.config.strict,
        }

        # Only forward escapechar when configured, matching the csv module's
        # default of "no escape character".
        if self.config.escapechar is not None:
            reader_kwargs['escapechar'] = self.config.escapechar

        try:
            async for row in aiocsv.AsyncDictReader(async_reader, **reader_kwargs):
                yield row
        except csv.Error as e:
            raise CSVProcessingError(f"Failed to parse CSV data: {str(e)}", config=reader_kwargs) from e

0 commit comments

Comments
 (0)