Commit 32f0c84

Add pluggable CSV reader architecture for S3FSCursor

- Add AthenaCSVReader (default): Custom parser that distinguishes NULL (unquoted empty) from empty string (quoted empty "")
- Add DefaultCSVReader: Python's standard csv module wrapper for backward compatibility (both NULL and empty string become empty string)
- Support multi-line quoted fields in AthenaCSVReader with optimized incremental quote state tracking (O(n) complexity)
- Add csv_reader parameter to S3FSCursor and AsyncS3FSCursor
- Refactor result_set.py to remove unnecessary instance variables
- Move header skipping to _init_csv_reader() for cleaner initialization
- Update documentation with CSV reader options and NULL handling details
- Add comprehensive unit tests for both CSV readers

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

1 parent 7807aa6 commit 32f0c84
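
The commit message mentions multi-line quoted fields handled with incremental quote state tracking. The sketch below shows one way such tracking can work. It is not the code from this commit: the function name `iter_logical_records` is hypothetical, and it assumes quotes appear only as field delimiters or as doubled escapes. Each physical line is scanned once and only the parity of its quote count updates the state, so the total work stays linear instead of rescanning the accumulated buffer.

```python
from typing import Iterable, Iterator, List


def iter_logical_records(lines: Iterable[str]) -> Iterator[str]:
    # Hypothetical helper: joins physical lines into logical CSV records.
    # Assumption: a quote character is either a field delimiter or half of
    # a doubled escape, so an odd count on a line flips the quoted state.
    pending: List[str] = []
    in_quotes = False
    for line in lines:
        # Scan only the new line; earlier lines are never recounted.
        if line.count('"') % 2 == 1:
            in_quotes = not in_quotes
        pending.append(line)
        if not in_quotes:
            # Outside a quoted field at end of line: the record is complete.
            yield "".join(pending)
            pending = []
    if pending:
        # Unterminated quote at end of input: emit what was collected.
        yield "".join(pending)
```

A record that spans two physical lines comes back as one string, which a field splitter can then process.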

8 files changed: +783 -33 lines changed

docs/api/s3fs.rst

Lines changed: 13 additions & 1 deletion
@@ -3,7 +3,7 @@
33
S3FS Integration
44
================
55

6-
This section covers lightweight S3FS-based cursors and data converters that use Python's built-in ``csv`` module.
6+
This section covers lightweight S3FS-based cursors, CSV readers, and data converters.
77

88
S3FS Cursors
99
------------
@@ -16,6 +16,18 @@ S3FS Cursors
1616
:members:
1717
:inherited-members:
1818

19+
S3FS CSV Readers
20+
----------------
21+
22+
S3FSCursor supports pluggable CSV reader implementations to control how NULL values
23+
and empty strings are handled when parsing Athena's CSV output.
24+
25+
.. autoclass:: pyathena.s3fs.reader.AthenaCSVReader
26+
:members:
27+
28+
.. autoclass:: pyathena.s3fs.reader.DefaultCSVReader
29+
:members:
30+
1931
S3FS Data Converters
2032
--------------------
2133

docs/s3fs.rst

Lines changed: 80 additions & 3 deletions
@@ -9,13 +9,13 @@ S3FSCursor
99
----------
1010

1111
S3FSCursor is a lightweight cursor that directly handles the CSV file of the query execution result output to S3.
12-
Unlike ArrowCursor or PandasCursor, this cursor uses Python's built-in ``csv`` module to parse results,
13-
making it ideal for environments where installing pandas or pyarrow is not desirable.
12+
Unlike ArrowCursor or PandasCursor, this cursor does not require pandas or pyarrow dependencies,
13+
making it ideal for environments where installing these libraries is not desirable.
1414

1515
**Key features:**
1616

1717
- No pandas or pyarrow dependencies required
18-
- Uses Python's built-in ``csv`` module for parsing
18+
- Lightweight CSV parsing (custom parser or Python's built-in ``csv`` module)
1919
- Lower memory footprint for simple query results
2020
- Full DB API 2.0 compatibility
2121

@@ -172,6 +172,83 @@ Then specify an instance of this class in the converter argument when creating a
172172
cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
173173
region_name="us-west-2").cursor(S3FSCursor, converter=CustomS3FSTypeConverter())
174174
175+
CSV Reader Options
176+
~~~~~~~~~~~~~~~~~~
177+
178+
S3FSCursor supports pluggable CSV reader implementations to control how NULL values and empty strings
179+
are handled. Two readers are provided:
180+
181+
- ``AthenaCSVReader`` (default): Custom parser that distinguishes between NULL and empty string
182+
- ``DefaultCSVReader``: Uses Python's built-in ``csv`` module (treats both NULL and empty string as empty string)
183+
184+
**Default behavior (AthenaCSVReader):**
185+
186+
By default, ``AthenaCSVReader`` is used, which correctly distinguishes between NULL
187+
values and empty strings in query results.
188+
189+
.. code:: python
190+
191+
from pyathena import connect
192+
from pyathena.s3fs.cursor import S3FSCursor
193+
194+
cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
195+
region_name="us-west-2",
196+
cursor_class=S3FSCursor).cursor()
197+
198+
cursor.execute("SELECT NULL AS null_col, '' AS empty_col")
199+
row = cursor.fetchone()
200+
print(row) # (None, '') - NULL is None, empty string is ''
201+
202+
**Switching to Python's built-in csv module (DefaultCSVReader):**
203+
204+
If you prefer to use Python's built-in ``csv`` module, you can switch to ``DefaultCSVReader``.
205+
Note that this reader cannot distinguish between NULL and empty string - both become empty strings
206+
in the parsed result, which are then converted to ``None`` by the type converter.
207+
208+
.. code:: python
209+
210+
from pyathena import connect
211+
from pyathena.s3fs.cursor import S3FSCursor
212+
from pyathena.s3fs.reader import DefaultCSVReader
213+
214+
cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
215+
region_name="us-west-2",
216+
cursor_class=S3FSCursor,
217+
cursor_kwargs={"csv_reader": DefaultCSVReader}).cursor()
218+
219+
cursor.execute("SELECT NULL AS null_col, '' AS empty_col")
220+
row = cursor.fetchone()
221+
print(row) # (None, None) - Both NULL and empty string become None
222+
223+
**Comparison of CSV readers:**
224+
225+
.. list-table:: CSV Reader Behavior
226+
:header-rows: 1
227+
:widths: 30 20 25 25
228+
229+
* - Reader
230+
- Implementation
231+
- NULL value
232+
- Empty string
233+
* - AthenaCSVReader (default)
234+
- Custom parser
235+
- None
236+
- '' (empty string)
237+
* - DefaultCSVReader
238+
- Python csv module
239+
- None
240+
- None
241+
242+
**Why the difference?**
243+
244+
Athena's CSV output format distinguishes between NULL values and empty strings:
245+
246+
- NULL: unquoted empty field (e.g., ``a,,b`` → the middle field is NULL)
247+
- Empty string: quoted empty field (e.g., ``a,"",b`` → the middle field is an empty string)
248+
249+
Python's standard ``csv`` module parses both cases as empty strings, losing this distinction.
250+
The ``AthenaCSVReader`` implements a custom parser that preserves the difference.
251+
175252
Limitations
176253
~~~~~~~~~~~
177254
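
The documentation added above says that Python's ``csv`` module parses unquoted and quoted empty fields identically. The sketch below shows that behaviour next to a minimal quote-aware splitter. It is an illustration only, not the ``AthenaCSVReader`` implementation: `parse_with_csv_module` and `split_athena_line` are hypothetical names, and the splitter handles a single line with comma separators and doubled quotes.

```python
import csv
import io
from typing import List, Optional

# Athena-style sample: first row has an unquoted empty field (NULL),
# second row has a quoted empty field (empty string).
SAMPLE = 'a,,b\r\na,"",b\r\n'


def parse_with_csv_module(text: str) -> List[List[str]]:
    # The standard library returns '' for both kinds of empty field.
    return list(csv.reader(io.StringIO(text, newline="")))


def split_athena_line(line: str) -> List[Optional[str]]:
    # Hypothetical single-line splitter that keeps NULL and '' apart.
    # It does not handle fields that span multiple lines.
    fields: List[Optional[str]] = []
    buf: List[str] = []
    in_quotes = False
    was_quoted = False  # True once the current field has seen a quote

    def finish() -> None:
        # An unquoted field with no characters is NULL; anything else is text.
        fields.append("".join(buf) if (buf or was_quoted) else None)

    i = 0
    while i < len(line):
        ch = line[i]
        if in_quotes:
            if ch == '"' and i + 1 < len(line) and line[i + 1] == '"':
                buf.append('"')  # doubled quote is an escaped quote
                i += 1
            elif ch == '"':
                in_quotes = False
            else:
                buf.append(ch)
        elif ch == '"':
            in_quotes = True
            was_quoted = True
        elif ch == ",":
            finish()
            buf, was_quoted = [], False
        else:
            buf.append(ch)
        i += 1
    finish()
    return fields
```

With this sketch, ``parse_with_csv_module(SAMPLE)`` yields two identical rows, while ``split_athena_line`` returns ``None`` for the middle field of ``a,,b`` and ``''`` for the middle field of ``a,"",b``.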

pyathena/s3fs/async_cursor.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
from pyathena.error import ProgrammingError
1212
from pyathena.model import AthenaQueryExecution
1313
from pyathena.s3fs.converter import DefaultS3FSTypeConverter
14-
from pyathena.s3fs.result_set import AthenaS3FSResultSet
14+
from pyathena.s3fs.result_set import AthenaS3FSResultSet, CSVReaderType
1515

1616
_logger = logging.getLogger(__name__)
1717

@@ -20,12 +20,12 @@ class AsyncS3FSCursor(AsyncCursor):
2020
"""Asynchronous cursor that reads CSV results via S3FileSystem.
2121
2222
This cursor extends AsyncCursor to provide asynchronous query execution
23-
with results read via Python's standard csv module and PyAthena's S3FileSystem.
23+
with results read via PyAthena's S3FileSystem.
2424
It's a lightweight alternative when pandas/pyarrow are not needed.
2525
2626
Features:
2727
- Asynchronous query execution with concurrent futures
28-
- Uses Python's standard csv module for parsing
28+
- Lightweight CSV parsing via pluggable readers
2929
- Uses PyAthena's S3FileSystem for S3 access
3030
- No external dependencies beyond boto3
3131
- Memory-efficient streaming for large datasets
@@ -61,6 +61,7 @@ def __init__(
6161
arraysize: int = CursorIterator.DEFAULT_FETCH_SIZE,
6262
result_reuse_enable: bool = False,
6363
result_reuse_minutes: int = CursorIterator.DEFAULT_RESULT_REUSE_MINUTES,
64+
csv_reader: Optional[CSVReaderType] = None,
6465
**kwargs,
6566
) -> None:
6667
"""Initialize an AsyncS3FSCursor.
@@ -78,6 +79,11 @@ def __init__(
7879
arraysize: Number of rows to fetch per batch.
7980
result_reuse_enable: Enable Athena query result reuse.
8081
result_reuse_minutes: Minutes to reuse cached results.
82+
csv_reader: CSV reader class to use for parsing results.
83+
Use AthenaCSVReader (default) to distinguish between NULL
84+
(unquoted empty) and empty string (quoted empty "").
85+
Use DefaultCSVReader for backward compatibility where empty
86+
strings are treated as NULL.
8187
**kwargs: Additional connection parameters.
8288
8389
Example:
@@ -99,6 +105,7 @@ def __init__(
99105
result_reuse_minutes=result_reuse_minutes,
100106
**kwargs,
101107
)
108+
self._csv_reader = csv_reader
102109

103110
@staticmethod
104111
def get_default_converter(
@@ -156,6 +163,7 @@ def _collect_result_set(
156163
query_execution=query_execution,
157164
arraysize=self._arraysize,
158165
retry_config=self._retry_config,
166+
csv_reader=self._csv_reader,
159167
**kwargs,
160168
)
161169
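
The hunks above store ``csv_reader`` on the cursor and forward it to the result set. The sketch below shows that wiring in isolation, including header skipping at reader initialization as the commit message describes for ``_init_csv_reader()``. All classes here (`StdlibReader`, `EmptyAsNoneReader`, `ResultSetSketch`, `CursorSketch`) are hypothetical stand-ins, not PyAthena classes, and treating ``None`` as "use the default reader" is an assumption based on the ``Optional`` type in the signature.

```python
import csv
from typing import Iterable, Iterator, List, Optional, Type


class StdlibReader:
    """Hypothetical reader: a thin iterator over csv.reader."""

    def __init__(self, lines: Iterable[str]) -> None:
        self._rows = csv.reader(lines)

    def __iter__(self) -> Iterator[List[Optional[str]]]:
        return self

    def __next__(self) -> List[Optional[str]]:
        return list(next(self._rows))


class EmptyAsNoneReader(StdlibReader):
    """Hypothetical alternative: maps every empty field to None."""

    def __next__(self) -> List[Optional[str]]:
        return [field or None for field in super().__next__()]


class ResultSetSketch:
    def __init__(
        self,
        lines: Iterable[str],
        csv_reader: Optional[Type[StdlibReader]] = None,
    ) -> None:
        # Assumption: None selects the default reader class.
        self._reader_class = csv_reader or StdlibReader
        self._reader = self._init_csv_reader(lines)

    def _init_csv_reader(self, lines: Iterable[str]) -> StdlibReader:
        reader = self._reader_class(lines)
        next(reader, None)  # skip the header row once, at initialization
        return reader

    def fetchall(self) -> List[List[Optional[str]]]:
        return list(self._reader)


class CursorSketch:
    def __init__(self, csv_reader: Optional[Type[StdlibReader]] = None) -> None:
        self._csv_reader = csv_reader  # stored, then forwarded on execute

    def execute(self, lines: Iterable[str]) -> ResultSetSketch:
        return ResultSetSketch(lines, csv_reader=self._csv_reader)
```

Passing the reader as a class rather than an instance lets the result set construct it once the result file is available, which appears to be the shape of the ``csv_reader`` parameter in this commit.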

pyathena/s3fs/cursor.py

Lines changed: 13 additions & 1 deletion
@@ -9,7 +9,7 @@
99
from pyathena.model import AthenaQueryExecution
1010
from pyathena.result_set import WithResultSet
1111
from pyathena.s3fs.converter import DefaultS3FSTypeConverter
12-
from pyathena.s3fs.result_set import AthenaS3FSResultSet
12+
from pyathena.s3fs.result_set import AthenaS3FSResultSet, CSVReaderType
1313

1414
_logger = logging.getLogger(__name__)
1515

@@ -59,6 +59,7 @@ def __init__(
5959
result_reuse_enable: bool = False,
6060
result_reuse_minutes: int = CursorIterator.DEFAULT_RESULT_REUSE_MINUTES,
6161
on_start_query_execution: Optional[Callable[[str], None]] = None,
62+
csv_reader: Optional[CSVReaderType] = None,
6263
**kwargs,
6364
) -> None:
6465
"""Initialize an S3FSCursor.
@@ -75,11 +76,20 @@ def __init__(
7576
result_reuse_enable: Enable Athena query result reuse.
7677
result_reuse_minutes: Minutes to reuse cached results.
7778
on_start_query_execution: Callback invoked when query starts.
79+
csv_reader: CSV reader class to use for parsing results.
80+
Use AthenaCSVReader (default) to distinguish between NULL
81+
(unquoted empty) and empty string (quoted empty "").
82+
Use DefaultCSVReader for backward compatibility where empty
83+
strings are treated as NULL.
7884
**kwargs: Additional connection parameters.
7985
8086
Example:
8187
>>> cursor = connection.cursor(S3FSCursor)
8288
>>> cursor.execute("SELECT * FROM my_table")
89+
>>>
90+
>>> # Use DefaultCSVReader for backward compatibility
91+
>>> from pyathena.s3fs.reader import DefaultCSVReader
92+
>>> cursor = connection.cursor(S3FSCursor, csv_reader=DefaultCSVReader)
8393
"""
8494
super().__init__(
8595
s3_staging_dir=s3_staging_dir,
@@ -95,6 +105,7 @@ def __init__(
95105
**kwargs,
96106
)
97107
self._on_start_query_execution = on_start_query_execution
108+
self._csv_reader = csv_reader
98109
self._query_id: Optional[str] = None
99110
self._result_set: Optional[AthenaS3FSResultSet] = None
100111

@@ -232,6 +243,7 @@ def execute(
232243
query_execution=query_execution,
233244
arraysize=self.arraysize,
234245
retry_config=self._retry_config,
246+
csv_reader=self._csv_reader,
235247
**kwargs,
236248
)
237249
else:
