
Commit 37449e5

Merge pull request #21 from laughingman7743/async_cursor
Implement asynchronous cursor (refs #19)
2 parents 576c568 + 24ab2b8 commit 37449e5

12 files changed: +1065 -395 lines

README.rst

Lines changed: 129 additions & 0 deletions
@@ -171,6 +171,135 @@ As Pandas DataFrame:
    df = as_pandas(cursor)
    print(df.describe())

Asynchronous Cursor
~~~~~~~~~~~~~~~~~~~

The asynchronous cursor is a simple implementation using the concurrent.futures package.
On Python 2.7 it uses the `backport of the concurrent.futures`_ package.
This cursor is not `DB API 2.0 (PEP 249)`_ compliant.

You can use the asynchronous cursor by specifying ``cursor_class``
with the connect method or the connection object.

.. code:: python

    from pyathena import connect
    from pyathena.async_cursor import AsyncCursor

    cursor = connect(s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',
                     region_name='us-west-2',
                     cursor_class=AsyncCursor).cursor()

.. code:: python

    from pyathena.connection import Connection
    from pyathena.async_cursor import AsyncCursor

    cursor = Connection(s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',
                        region_name='us-west-2',
                        cursor_class=AsyncCursor).cursor()

It can also be used by specifying the cursor class when calling the connection object's cursor method.

.. code:: python

    from pyathena import connect
    from pyathena.async_cursor import AsyncCursor

    cursor = connect(s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',
                     region_name='us-west-2').cursor(AsyncCursor)

.. code:: python

    from pyathena.connection import Connection
    from pyathena.async_cursor import AsyncCursor

    cursor = Connection(s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',
                        region_name='us-west-2').cursor(AsyncCursor)

The default number of worker threads is five times the number of CPUs, or 5 if the number of CPUs cannot be determined.
If you want to change the number of workers, you can specify it as follows.

.. code:: python

    from pyathena import connect
    from pyathena.async_cursor import AsyncCursor

    cursor = connect(s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',
                     region_name='us-west-2',
                     cursor_class=AsyncCursor).cursor(max_workers=10)

The execute method of the asynchronous cursor returns a tuple of the query ID and a `future object`_.

.. code:: python

    from pyathena import connect
    from pyathena.async_cursor import AsyncCursor

    cursor = connect(s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',
                     region_name='us-west-2',
                     cursor_class=AsyncCursor).cursor()

    query_id, future = cursor.execute("SELECT * FROM many_rows")

The return value of the `future object`_ is an ``AthenaResultSet`` object.
This object has an interface for fetching and iterating over query results, similar to a synchronous cursor.
It also holds information about the query execution.

.. code:: python

    from pyathena import connect
    from pyathena.async_cursor import AsyncCursor

    cursor = connect(s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',
                     region_name='us-west-2',
                     cursor_class=AsyncCursor).cursor()

    query_id, future = cursor.execute("SELECT * FROM many_rows")
    result_set = future.result()
    print(result_set.state)
    print(result_set.state_change_reason)
    print(result_set.completion_date_time)
    print(result_set.submission_date_time)
    print(result_set.data_scanned_in_bytes)
    print(result_set.execution_time_in_millis)
    print(result_set.output_location)
    print(result_set.description)
    for row in result_set:
        print(row)

.. code:: python

    from pyathena import connect
    from pyathena.async_cursor import AsyncCursor

    cursor = connect(s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',
                     region_name='us-west-2',
                     cursor_class=AsyncCursor).cursor()

    query_id, future = cursor.execute("SELECT * FROM many_rows")
    result_set = future.result()
    print(result_set.fetchall())

A query ID is required to cancel a query with the asynchronous cursor.

.. code:: python

    from pyathena import connect
    from pyathena.async_cursor import AsyncCursor

    cursor = connect(s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',
                     region_name='us-west-2',
                     cursor_class=AsyncCursor).cursor()

    query_id, future = cursor.execute("SELECT * FROM many_rows")
    cursor.cancel(query_id)

NOTE: The cancel method of the `future object`_ does not cancel the query.

.. _`backport of the concurrent.futures`: https://pypi.python.org/pypi/futures
.. _`future object`: https://docs.python.org/3/library/concurrent.futures.html#future-objects

Credentials
-----------
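
The futures returned by the execute method above are ordinary ``concurrent.futures`` futures, so several queries can be submitted at once and handled as they finish. A minimal sketch, assuming the tables ``many_rows`` and ``one_row`` exist in the default schema:

.. code:: python

    from concurrent.futures import as_completed

    from pyathena import connect
    from pyathena.async_cursor import AsyncCursor

    cursor = connect(s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',
                     region_name='us-west-2',
                     cursor_class=AsyncCursor).cursor()

    # Submit both queries without blocking; each call returns immediately.
    futures = [cursor.execute("SELECT * FROM many_rows")[1],
               cursor.execute("SELECT * FROM one_row")[1]]

    # Process each AthenaResultSet as its query completes.
    for future in as_completed(futures):
        result_set = future.result()
        print(result_set.output_location)
        print(result_set.fetchall())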

pyathena/__init__.py

Lines changed: 2 additions & 12 deletions
@@ -47,16 +47,6 @@ def __cmp__(self, other):
Timestamp = datetime.datetime


-def connect(s3_staging_dir=None, region_name=None, schema_name='default',
-            poll_interval=1, encryption_option=None, kms_key=None, profile_name=None,
-            converter=None, formatter=None,
-            retry_exceptions=('ThrottlingException', 'TooManyRequestsException'),
-            retry_attempt=5, retry_multiplier=1,
-            retry_max_delay=1800, retry_exponential_base=2,
-            **kwargs):
+def connect(*args, **kwargs):
    from pyathena.connection import Connection
-    return Connection(s3_staging_dir, region_name, schema_name,
-                      poll_interval, encryption_option, kms_key, profile_name,
-                      converter, formatter, retry_exceptions, retry_attempt,
-                      retry_multiplier, retry_max_delay, retry_exponential_base,
-                      **kwargs)
+    return Connection(*args, **kwargs)
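
The simplified ``connect`` above just forwards every positional and keyword argument to ``Connection``, so options such as ``cursor_class`` reach the connection without ``connect`` having to mirror its signature. A minimal sketch of the two equivalent call paths (bucket and region are placeholders):

.. code:: python

    from pyathena import connect
    from pyathena.connection import Connection
    from pyathena.async_cursor import AsyncCursor

    # Both construct the same Connection; connect() is only a thin wrapper
    # that passes *args and **kwargs straight through.
    conn_a = connect(s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',
                     region_name='us-west-2',
                     cursor_class=AsyncCursor)
    conn_b = Connection(s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',
                        region_name='us-west-2',
                        cursor_class=AsyncCursor)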

pyathena/async_cursor.py

Lines changed: 80 additions & 0 deletions
@@ -0,0 +1,80 @@
# -*- coding: utf-8 -*-
from __future__ import absolute_import
from __future__ import unicode_literals
import logging

from concurrent.futures.thread import ThreadPoolExecutor

from pyathena.common import CursorIterator
from pyathena.cursor import BaseCursor
from pyathena.error import ProgrammingError, NotSupportedError
from pyathena.result_set import AthenaResultSet

try:
    from multiprocessing import cpu_count
except ImportError:
    def cpu_count():
        return None


_logger = logging.getLogger(__name__)


class AsyncCursor(BaseCursor):

    def __init__(self, client, s3_staging_dir, schema_name, poll_interval,
                 encryption_option, kms_key, converter, formatter,
                 retry_exceptions, retry_attempt, retry_multiplier,
                 retry_max_delay, retry_exponential_base,
                 max_workers=(cpu_count() or 1) * 5,
                 arraysize=CursorIterator.DEFAULT_FETCH_SIZE):
        super(AsyncCursor, self).__init__(client, s3_staging_dir, schema_name, poll_interval,
                                          encryption_option, kms_key, converter, formatter,
                                          retry_exceptions, retry_attempt, retry_multiplier,
                                          retry_max_delay, retry_exponential_base)
        self._executor = ThreadPoolExecutor(max_workers=max_workers)
        self._arraysize = arraysize

    @property
    def arraysize(self):
        return self._arraysize

    @arraysize.setter
    def arraysize(self, value):
        if value <= 0 or value > CursorIterator.DEFAULT_FETCH_SIZE:
            raise ProgrammingError('MaxResults is more than maximum allowed length {0}.'.format(
                CursorIterator.DEFAULT_FETCH_SIZE))
        self._arraysize = value

    def close(self, wait=False):
        self._executor.shutdown(wait=wait)

    def _description(self, query_id):
        result_set = self._collect_result_set(query_id)
        return result_set.description

    def description(self, query_id):
        return self._executor.submit(self._description, query_id)

    def query_execution(self, query_id):
        return self._executor.submit(self._query_execution, query_id)

    def poll(self, query_id):
        return self._executor.submit(self._poll, query_id)

    def _collect_result_set(self, query_id):
        query_execution = self._poll(query_id)
        return AthenaResultSet(
            self._connection, self._converter, query_execution, self._arraysize,
            self.retry_exceptions, self.retry_attempt, self.retry_multiplier,
            self.retry_max_delay, self.retry_exponential_base)

    def execute(self, operation, parameters=None):
        query_id = self._execute(operation, parameters)
        return query_id, self._executor.submit(self._collect_result_set, query_id)

    def executemany(self, operation, seq_of_parameters):
        raise NotSupportedError

    def cancel(self, query_id):
        return self._executor.submit(self._cancel, query_id)
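
Beyond ``execute``, the class above submits ``poll``, ``description``, ``query_execution`` and ``cancel`` to the same thread pool, and ``close(wait=True)`` shuts the pool down only after pending work has finished. A minimal sketch using only the methods shown above and the result-set attributes documented in the README:

.. code:: python

    from pyathena import connect
    from pyathena.async_cursor import AsyncCursor

    cursor = connect(s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',
                     region_name='us-west-2',
                     cursor_class=AsyncCursor).cursor()

    query_id, future = cursor.execute("SELECT * FROM many_rows")

    # The future can be inspected without blocking.
    if not future.done():
        print('query {0} is still running'.format(query_id))

    # result() blocks until the AthenaResultSet is available.
    result_set = future.result()
    print(result_set.state)

    # Shut the thread pool down, waiting for queries still in flight.
    cursor.close(wait=True)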
