Skip to content

Commit 1651786

Browse files
Merge pull request #637 from laughingman7743/feature/polars-chunk-processing
Add chunk processing support for PolarsCursor
2 parents 5f75492 + a5a0c79 commit 1651786

File tree

5 files changed

+689
-53
lines changed

5 files changed

+689
-53
lines changed

docs/polars.rst

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,94 @@ SQLAlchemy allows this option to be specified in the connection string.
246246
247247
NOTE: PolarsCursor handles the CSV file on memory. Pay attention to the memory capacity.
248248

249+
Chunksize Options
250+
~~~~~~~~~~~~~~~~~
251+
252+
PolarsCursor supports memory-efficient chunked processing of large query results
253+
using Polars' native lazy evaluation APIs. This allows processing datasets that
254+
are too large to fit in memory.
255+
256+
The chunksize option can be enabled by specifying an integer value in the ``cursor_kwargs``
257+
argument of the connect method or as an argument to the cursor method.
258+
259+
.. code:: python
260+
261+
from pyathena import connect
262+
from pyathena.polars.cursor import PolarsCursor
263+
264+
cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
265+
region_name="us-west-2",
266+
cursor_class=PolarsCursor,
267+
cursor_kwargs={
268+
"chunksize": 50_000
269+
}).cursor()
270+
271+
.. code:: python
272+
273+
from pyathena import connect
274+
from pyathena.polars.cursor import PolarsCursor
275+
276+
cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
277+
region_name="us-west-2",
278+
cursor_class=PolarsCursor).cursor(chunksize=50_000)
279+
280+
When the chunksize option is enabled, data is loaded lazily in chunks. This applies
281+
to all data access methods:
282+
283+
**Standard DB-API fetch methods** - ``fetchone()`` and ``fetchmany()`` load data
284+
chunk by chunk as needed, keeping memory usage bounded:
285+
286+
.. code:: python
287+
288+
from pyathena import connect
289+
from pyathena.polars.cursor import PolarsCursor
290+
291+
cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
292+
region_name="us-west-2",
293+
cursor_class=PolarsCursor).cursor(chunksize=50_000)
294+
295+
cursor.execute("SELECT * FROM large_table")
296+
# Data is loaded in 50,000 row chunks as you iterate
297+
for row in cursor:
298+
process_row(row)
299+
300+
**iter_chunks() method** - Use this when you want to process data as Polars DataFrames
301+
in chunks, which is more efficient for batch processing:
302+
303+
.. code:: python
304+
305+
from pyathena import connect
306+
from pyathena.polars.cursor import PolarsCursor

import polars as pl
307+
308+
cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
309+
region_name="us-west-2",
310+
cursor_class=PolarsCursor).cursor(chunksize=50_000)
311+
312+
cursor.execute("SELECT * FROM large_table")
313+
for chunk in cursor.iter_chunks():
314+
# Process each chunk - chunk is a polars.DataFrame
315+
processed = chunk.group_by('category').agg(pl.sum('value'))
316+
print(f"Processed chunk with {chunk.height} rows")
317+
318+
This method uses Polars' ``scan_csv()`` and ``scan_parquet()`` with ``collect_batches()``
319+
for efficient lazy evaluation, minimizing memory usage when processing large datasets.
320+
321+
The chunked iteration also works with the unload option:
322+
323+
.. code:: python
324+
325+
from pyathena import connect
326+
from pyathena.polars.cursor import PolarsCursor
327+
328+
cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
329+
region_name="us-west-2",
330+
cursor_class=PolarsCursor).cursor(chunksize=100_000, unload=True)
331+
332+
cursor.execute("SELECT * FROM huge_table")
333+
for chunk in cursor.iter_chunks():
334+
# Process Parquet data in chunks
335+
process_chunk(chunk)
336+
249337
.. _async-polars-cursor:
250338

251339
AsyncPolarsCursor
@@ -414,6 +502,42 @@ As with AsyncPolarsCursor, the unload option is also available.
414502
region_name="us-west-2",
415503
cursor_class=AsyncPolarsCursor).cursor(unload=True)
416504
505+
As with PolarsCursor, the chunksize option is also available for memory-efficient processing.
506+
When chunksize is specified, data is loaded lazily in chunks for both standard fetch methods
507+
and ``iter_chunks()``.
508+
509+
.. code:: python
510+
511+
from pyathena import connect
512+
from pyathena.polars.async_cursor import AsyncPolarsCursor
513+
514+
cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
515+
region_name="us-west-2",
516+
cursor_class=AsyncPolarsCursor).cursor(chunksize=50_000)
517+
518+
query_id, future = cursor.execute("SELECT * FROM large_table")
519+
result_set = future.result()
520+
521+
# Standard iteration - data loaded in chunks
522+
for row in result_set:
523+
process_row(row)
524+
525+
.. code:: python
526+
527+
from pyathena import connect
528+
from pyathena.polars.async_cursor import AsyncPolarsCursor
529+
530+
cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
531+
region_name="us-west-2",
532+
cursor_class=AsyncPolarsCursor).cursor(chunksize=50_000)
533+
534+
query_id, future = cursor.execute("SELECT * FROM large_table")
535+
result_set = future.result()
536+
537+
# Process as DataFrame chunks
538+
for chunk in result_set.iter_chunks():
539+
process_chunk(chunk)
540+
417541
.. _`polars.DataFrame object`: https://docs.pola.rs/api/python/stable/reference/dataframe/index.html
418542
.. _`Polars`: https://pola.rs/
419543
.. _`Unload options`: arrow.html#unload-options

pyathena/polars/async_cursor.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ def __init__(
7373
result_reuse_minutes: int = CursorIterator.DEFAULT_RESULT_REUSE_MINUTES,
7474
block_size: Optional[int] = None,
7575
cache_type: Optional[str] = None,
76+
chunksize: Optional[int] = None,
7677
**kwargs,
7778
) -> None:
7879
"""Initialize an AsyncPolarsCursor.
@@ -93,10 +94,15 @@ def __init__(
9394
result_reuse_minutes: Minutes to reuse cached results.
9495
block_size: S3 read block size.
9596
cache_type: S3 caching strategy.
97+
chunksize: Number of rows per chunk for memory-efficient processing.
98+
If specified, data is loaded lazily in chunks for all data
99+
access methods including fetchone(), fetchmany(), and iter_chunks().
96100
**kwargs: Additional connection parameters.
97101
98102
Example:
99103
>>> cursor = connection.cursor(AsyncPolarsCursor, unload=True)
104+
>>> # With chunked processing
105+
>>> cursor = connection.cursor(AsyncPolarsCursor, chunksize=50000)
100106
"""
101107
super().__init__(
102108
s3_staging_dir=s3_staging_dir,
@@ -116,6 +122,7 @@ def __init__(
116122
self._unload = unload
117123
self._block_size = block_size
118124
self._cache_type = cache_type
125+
self._chunksize = chunksize
119126

120127
@staticmethod
121128
def get_default_converter(
@@ -172,6 +179,7 @@ def _collect_result_set(
172179
block_size=self._block_size,
173180
cache_type=self._cache_type,
174181
max_workers=self._max_workers,
182+
chunksize=self._chunksize,
175183
**kwargs,
176184
)
177185

pyathena/polars/cursor.py

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,18 @@
33

44
import logging
55
from multiprocessing import cpu_count
6-
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Tuple, Union, cast
6+
from typing import (
7+
TYPE_CHECKING,
8+
Any,
9+
Callable,
10+
Dict,
11+
Iterator,
12+
List,
13+
Optional,
14+
Tuple,
15+
Union,
16+
cast,
17+
)
718

819
from pyathena.common import BaseCursor, CursorIterator
920
from pyathena.error import OperationalError, ProgrammingError
@@ -74,6 +85,7 @@ def __init__(
7485
block_size: Optional[int] = None,
7586
cache_type: Optional[str] = None,
7687
max_workers: int = (cpu_count() or 1) * 5,
88+
chunksize: Optional[int] = None,
7789
**kwargs,
7890
) -> None:
7991
"""Initialize a PolarsCursor.
@@ -94,10 +106,15 @@ def __init__(
94106
block_size: S3 read block size.
95107
cache_type: S3 caching strategy.
96108
max_workers: Maximum worker threads for parallel S3 operations.
109+
chunksize: Number of rows per chunk for memory-efficient processing.
110+
If specified, data is loaded lazily in chunks for all data
111+
access methods including fetchone(), fetchmany(), and iter_chunks().
97112
**kwargs: Additional connection parameters.
98113
99114
Example:
100115
>>> cursor = connection.cursor(PolarsCursor, unload=True)
116+
>>> # With chunked processing
117+
>>> cursor = connection.cursor(PolarsCursor, chunksize=50000)
101118
"""
102119
super().__init__(
103120
s3_staging_dir=s3_staging_dir,
@@ -117,6 +134,7 @@ def __init__(
117134
self._block_size = block_size
118135
self._cache_type = cache_type
119136
self._max_workers = max_workers
137+
self._chunksize = chunksize
120138
self._query_id: Optional[str] = None
121139
self._result_set: Optional[AthenaPolarsResultSet] = None
122140

@@ -272,6 +290,7 @@ def execute(
272290
block_size=self._block_size,
273291
cache_type=self._cache_type,
274292
max_workers=self._max_workers,
293+
chunksize=self._chunksize,
275294
**kwargs,
276295
)
277296
else:
@@ -404,3 +423,37 @@ def as_arrow(self) -> "Table":
404423
raise ProgrammingError("No result set.")
405424
result_set = cast(AthenaPolarsResultSet, self.result_set)
406425
return result_set.as_arrow()
426+
427+
def iter_chunks(self) -> Iterator["pl.DataFrame"]:
    """Yield the query result as a sequence of Polars DataFrames.

    Provides a single iteration interface regardless of the chunking
    configuration: with ``chunksize`` set, each yielded DataFrame holds one
    lazily-loaded chunk of rows; without it, a single DataFrame containing
    the full result is yielded.

    Yields:
        A ``polars.DataFrame`` per chunk of rows, or one DataFrame with the
        entire result when ``chunksize`` was not specified.

    Raises:
        ProgrammingError: If no result set is available.

    Example:
        >>> # Chunked processing of a large result
        >>> cursor = connection.cursor(PolarsCursor, chunksize=50000)
        >>> cursor.execute("SELECT * FROM large_table")
        >>> for chunk in cursor.iter_chunks():
        ...     process_chunk(chunk)
        >>>
        >>> # No chunksize: the whole result arrives as one DataFrame
        >>> cursor = connection.cursor(PolarsCursor)
        >>> cursor.execute("SELECT * FROM small_table")
        >>> for df in cursor.iter_chunks():
        ...     process(df)
    """
    # Guard first: delegating without a result set would raise an opaque
    # AttributeError instead of the DB-API-style ProgrammingError.
    if not self.has_result_set:
        raise ProgrammingError("No result set.")
    rs = cast(AthenaPolarsResultSet, self.result_set)
    # Delegate chunk slicing to the result set, which owns the lazy scan.
    yield from rs.iter_chunks()

0 commit comments

Comments
 (0)