
Commit b45c434

Add S3FS Cursor for lightweight CSV-based result reading
Implements Issue #272: Add a new cursor type that reads CSV results from S3 using Python's standard csv module and PyAthena's S3FileSystem, without requiring pandas or pyarrow dependencies.

New features:
- S3FSCursor: Synchronous cursor for reading CSV/TXT results from S3
- AsyncS3FSCursor: Asynchronous cursor using concurrent.futures
- AthenaS3FSResultSet: Streaming CSV reader with type conversion
- DefaultS3FSTypeConverter: Type converter for CSV-based results
- SQLAlchemy dialect: awsathena+s3fs:// connection URL support

Also adds a rowcount property to the WithResultSet mixin for CTAS support, benefiting all cursor types (base, pandas, arrow, s3fs).

Closes #272

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 8d4e41c commit b45c434

16 files changed: 1,388 additions, 0 deletions

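As a rough sketch of how the new pieces fit together: the S3FSCursor import path below is assumed by analogy with pyathena/s3fs/async_cursor.py (the synchronous cursor file is not shown in this excerpt), the awsathena+s3fs:// URL shape mirrors the existing awsathena dialects rather than being taken verbatim from this diff, and the bucket, region, and schema are placeholders.

# Hedged sketch: the new S3FS cursor via the DBAPI, plus the awsathena+s3fs://
# SQLAlchemy URL mentioned in the commit message. S3FSCursor's import path and
# the URL parameters are assumptions; bucket, region, and schema are placeholders.
from urllib.parse import quote_plus

from sqlalchemy import create_engine

from pyathena import connect
from pyathena.s3fs.cursor import S3FSCursor  # assumed module path

conn = connect(
    s3_staging_dir="s3://example-bucket/athena-results/",  # placeholder
    region_name="us-east-1",  # placeholder
)
cursor = conn.cursor(S3FSCursor)
cursor.execute("SELECT * FROM my_table LIMIT 10")
print(cursor.fetchall())

# SQLAlchemy dialect: awsathena+s3fs:// (same query parameters as the other
# awsathena dialects are assumed here).
staging = quote_plus("s3://example-bucket/athena-results/")
engine = create_engine(
    f"awsathena+s3fs://:@athena.us-east-1.amazonaws.com:443/default?s3_staging_dir={staging}"
)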

pyathena/result_set.py

Lines changed: 13 additions & 0 deletions
@@ -736,3 +736,16 @@ def result_reuse_minutes(self) -> Optional[int]:
         if not self.result_set:
             return None
         return self.result_set.result_reuse_minutes
+
+    @property
+    def rowcount(self) -> int:
+        """Get the number of rows affected by the last operation.
+
+        For SELECT statements, this returns -1 as per DB API 2.0 specification.
+        For DML operations (INSERT, UPDATE, DELETE) and CTAS, this returns
+        the number of affected rows.
+
+        Returns:
+            The number of rows, or -1 if not applicable or unknown.
+        """
+        return self.result_set.rowcount if self.result_set else -1
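To illustrate the new property, a minimal hedged sketch (connection parameters and table names are placeholders; the behavior shown is taken from the docstring above):

# Hedged sketch: rowcount after CTAS/DML vs. SELECT. Table names are
# placeholders; semantics follow the docstring added in this hunk.
from pyathena import connect

cursor = connect(
    s3_staging_dir="s3://example-bucket/athena-results/",  # placeholder
    region_name="us-east-1",  # placeholder
).cursor()

cursor.execute(
    "CREATE TABLE new_table WITH (format = 'PARQUET') AS SELECT * FROM source_table"
)
print(cursor.rowcount)  # rows written by the CTAS

cursor.execute("SELECT * FROM new_table")
print(cursor.rowcount)  # -1 for SELECT, per DB API 2.0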

pyathena/s3fs/__init__.py

Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
+# -*- coding: utf-8 -*-

pyathena/s3fs/async_cursor.py

Lines changed: 218 additions & 0 deletions
@@ -0,0 +1,218 @@
+# -*- coding: utf-8 -*-
+from __future__ import annotations
+
+import logging
+from concurrent.futures import Future
+from multiprocessing import cpu_count
+from typing import Any, Dict, List, Optional, Tuple, Union, cast
+
+from pyathena.async_cursor import AsyncCursor
+from pyathena.common import CursorIterator
+from pyathena.error import ProgrammingError
+from pyathena.model import AthenaQueryExecution
+from pyathena.s3fs.converter import DefaultS3FSTypeConverter
+from pyathena.s3fs.result_set import AthenaS3FSResultSet
+
+_logger = logging.getLogger(__name__)
+
+
+class AsyncS3FSCursor(AsyncCursor):
+    """Asynchronous cursor that reads CSV results via S3FileSystem.
+
+    This cursor extends AsyncCursor to provide asynchronous query execution
+    with results read via Python's standard csv module and PyAthena's S3FileSystem.
+    It's a lightweight alternative when pandas/pyarrow are not needed.
+
+    Features:
+        - Asynchronous query execution with concurrent futures
+        - Uses Python's standard csv module for parsing
+        - Uses PyAthena's S3FileSystem for S3 access
+        - No external dependencies beyond boto3
+        - Memory-efficient streaming for large datasets
+
+    Attributes:
+        arraysize: Number of rows to fetch per batch (configurable).
+
+    Example:
+        >>> from pyathena.s3fs.async_cursor import AsyncS3FSCursor
+        >>>
+        >>> cursor = connection.cursor(AsyncS3FSCursor)
+        >>> query_id, future = cursor.execute("SELECT * FROM my_table")
+        >>>
+        >>> # Get result when ready
+        >>> result_set = future.result()
+        >>> rows = result_set.fetchall()
+
+    Note:
+        This cursor does not require pandas or pyarrow.
+    """
+
+    def __init__(
+        self,
+        s3_staging_dir: Optional[str] = None,
+        schema_name: Optional[str] = None,
+        catalog_name: Optional[str] = None,
+        work_group: Optional[str] = None,
+        poll_interval: float = 1,
+        encryption_option: Optional[str] = None,
+        kms_key: Optional[str] = None,
+        kill_on_interrupt: bool = True,
+        max_workers: int = (cpu_count() or 1) * 5,
+        arraysize: int = CursorIterator.DEFAULT_FETCH_SIZE,
+        result_reuse_enable: bool = False,
+        result_reuse_minutes: int = CursorIterator.DEFAULT_RESULT_REUSE_MINUTES,
+        **kwargs,
+    ) -> None:
+        """Initialize an AsyncS3FSCursor.
+
+        Args:
+            s3_staging_dir: S3 location for query results.
+            schema_name: Default schema name.
+            catalog_name: Default catalog name.
+            work_group: Athena workgroup name.
+            poll_interval: Query status polling interval in seconds.
+            encryption_option: S3 encryption option (SSE_S3, SSE_KMS, CSE_KMS).
+            kms_key: KMS key ARN for encryption.
+            kill_on_interrupt: Cancel running query on keyboard interrupt.
+            max_workers: Maximum number of workers for concurrent execution.
+            arraysize: Number of rows to fetch per batch.
+            result_reuse_enable: Enable Athena query result reuse.
+            result_reuse_minutes: Minutes to reuse cached results.
+            **kwargs: Additional connection parameters.
+
+        Example:
+            >>> cursor = connection.cursor(AsyncS3FSCursor)
+            >>> query_id, future = cursor.execute("SELECT * FROM my_table")
+        """
+        super().__init__(
+            s3_staging_dir=s3_staging_dir,
+            schema_name=schema_name,
+            catalog_name=catalog_name,
+            work_group=work_group,
+            poll_interval=poll_interval,
+            encryption_option=encryption_option,
+            kms_key=kms_key,
+            kill_on_interrupt=kill_on_interrupt,
+            max_workers=max_workers,
+            arraysize=arraysize,
+            result_reuse_enable=result_reuse_enable,
+            result_reuse_minutes=result_reuse_minutes,
+            **kwargs,
+        )
+
+    @staticmethod
+    def get_default_converter(
+        unload: bool = False,  # noqa: ARG004
+    ) -> DefaultS3FSTypeConverter:
+        """Get the default type converter for S3FS cursor.
+
+        Args:
+            unload: Unused. S3FS cursor does not support UNLOAD operations.
+
+        Returns:
+            DefaultS3FSTypeConverter instance.
+        """
+        return DefaultS3FSTypeConverter()
+
+    @property
+    def arraysize(self) -> int:
+        """Get the number of rows to fetch at a time."""
+        return self._arraysize
+
+    @arraysize.setter
+    def arraysize(self, value: int) -> None:
+        """Set the number of rows to fetch at a time.
+
+        Args:
+            value: Number of rows (must be positive).
+
+        Raises:
+            ProgrammingError: If value is not positive.
+        """
+        if value <= 0:
+            raise ProgrammingError("arraysize must be a positive integer value.")
+        self._arraysize = value
+
+    def _collect_result_set(
+        self,
+        query_id: str,
+        kwargs: Optional[Dict[str, Any]] = None,
+    ) -> AthenaS3FSResultSet:
+        """Collect result set after query execution.
+
+        Args:
+            query_id: The Athena query execution ID.
+            kwargs: Additional keyword arguments for result set.
+
+        Returns:
+            AthenaS3FSResultSet containing the query results.
+        """
+        if kwargs is None:
+            kwargs = {}
+        query_execution = cast(AthenaQueryExecution, self._poll(query_id))
+        return AthenaS3FSResultSet(
+            connection=self._connection,
+            converter=self._converter,
+            query_execution=query_execution,
+            arraysize=self._arraysize,
+            retry_config=self._retry_config,
+            **kwargs,
+        )
+
+    def execute(
+        self,
+        operation: str,
+        parameters: Optional[Union[Dict[str, Any], List[str]]] = None,
+        work_group: Optional[str] = None,
+        s3_staging_dir: Optional[str] = None,
+        cache_size: Optional[int] = 0,
+        cache_expiration_time: Optional[int] = 0,
+        result_reuse_enable: Optional[bool] = None,
+        result_reuse_minutes: Optional[int] = None,
+        paramstyle: Optional[str] = None,
+        **kwargs,
+    ) -> Tuple[str, "Future[Union[AthenaS3FSResultSet, Any]]"]:
+        """Execute a SQL query asynchronously.
+
+        Submits the query to Athena and returns immediately with a query ID
+        and a Future that will contain the result set when complete.
+
+        Args:
+            operation: SQL query string to execute.
+            parameters: Query parameters for parameterized queries.
+            work_group: Athena workgroup to use for this query.
+            s3_staging_dir: S3 location for query results.
+            cache_size: Number of queries to check for result caching.
+            cache_expiration_time: Cache expiration time in seconds.
+            result_reuse_enable: Enable Athena result reuse for this query.
+            result_reuse_minutes: Minutes to reuse cached results.
+            paramstyle: Parameter style ('qmark' or 'pyformat').
+            **kwargs: Additional execution parameters.
+
+        Returns:
+            Tuple of (query_id, Future[AthenaS3FSResultSet]).
+
+        Example:
+            >>> query_id, future = cursor.execute("SELECT * FROM my_table")
+            >>> result_set = future.result()
+            >>> rows = result_set.fetchall()
+        """
+        query_id = self._execute(
+            operation,
+            parameters=parameters,
+            work_group=work_group,
+            s3_staging_dir=s3_staging_dir,
+            cache_size=cache_size,
+            cache_expiration_time=cache_expiration_time,
+            result_reuse_enable=result_reuse_enable,
+            result_reuse_minutes=result_reuse_minutes,
+            paramstyle=paramstyle,
+        )
+        return (
+            query_id,
+            self._executor.submit(
+                self._collect_result_set,
+                query_id,
+                kwargs,
+            ),
+        )
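Beyond the docstring examples, a hedged sketch of dispatching several queries concurrently with this cursor and gathering results as they complete (connection parameters and queries are placeholders):

# Hedged sketch: running several queries concurrently with AsyncS3FSCursor.
# Connection parameters, queries, and the max_workers value are placeholders.
from concurrent.futures import as_completed

from pyathena import connect
from pyathena.s3fs.async_cursor import AsyncS3FSCursor

conn = connect(
    s3_staging_dir="s3://example-bucket/athena-results/",  # placeholder
    region_name="us-east-1",  # placeholder
)
cursor = conn.cursor(AsyncS3FSCursor, max_workers=4)

queries = ["SELECT 1", "SELECT 2", "SELECT 3"]  # placeholders
futures = {}
for sql in queries:
    query_id, future = cursor.execute(sql)
    futures[future] = query_id

# Each future resolves to an AthenaS3FSResultSet once Athena finishes the query.
for future in as_completed(futures):
    result_set = future.result()
    print(futures[future], result_set.fetchall())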

pyathena/s3fs/converter.py

Lines changed: 50 additions & 0 deletions
@@ -0,0 +1,50 @@
+# -*- coding: utf-8 -*-
+from __future__ import annotations
+
+import logging
+from copy import deepcopy
+from typing import Any, Optional
+
+from pyathena.converter import (
+    _DEFAULT_CONVERTERS,
+    Converter,
+    _to_default,
+)
+
+_logger = logging.getLogger(__name__)
+
+
+class DefaultS3FSTypeConverter(Converter):
+    """Type converter for S3FS Cursor results.
+
+    This converter is specifically designed for the S3FSCursor and provides
+    type conversion for CSV-based result files read via the S3 FileSystem.
+    It converts Athena data types to Python types using the standard
+    converter mappings.
+
+    The converter uses the same mappings as DefaultTypeConverter, providing
+    consistent behavior with the standard Cursor while using the S3FileSystem
+    for file access.
+
+    Example:
+        >>> from pyathena.s3fs.converter import DefaultS3FSTypeConverter
+        >>> converter = DefaultS3FSTypeConverter()
+        >>>
+        >>> # Used automatically by S3FSCursor
+        >>> cursor = connection.cursor(S3FSCursor)
+        >>> # converter is applied automatically to results
+
+    Note:
+        This converter is used by default in S3FSCursor.
+        Most users don't need to instantiate it directly.
+    """
+
+    def __init__(self) -> None:
+        super().__init__(
+            mappings=deepcopy(_DEFAULT_CONVERTERS),
+            default=_to_default,
+        )
+
+    def convert(self, type_: str, value: Optional[str]) -> Optional[Any]:
+        converter = self.get(type_)
+        return converter(value)
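For completeness, a hedged sketch of calling the converter directly; the type keys are assumed to come from PyAthena's standard _DEFAULT_CONVERTERS mapping, and the exact outputs shown are illustrative assumptions:

# Hedged sketch: DefaultS3FSTypeConverter applies the standard PyAthena
# mappings to raw CSV string values. Type keys and expected outputs are
# illustrative assumptions, not taken verbatim from this diff.
from pyathena.s3fs.converter import DefaultS3FSTypeConverter

converter = DefaultS3FSTypeConverter()
print(converter.convert("integer", "123"))        # expected: 123 (int)
print(converter.convert("boolean", "true"))       # expected: True (bool)
print(converter.convert("date", "2024-01-01"))    # expected: datetime.date(2024, 1, 1)
print(converter.convert("unknown_type", "text"))  # unmapped types fall back to _to_default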
