Commit fbfb5f6

Author: Stephen Hoover
PERF Replace PollableResult with CivisFuture
Now that the `futures.CivisFuture` class exists, replace all uses of `PollableResult` with `CivisFuture`. This improves performance in two ways: we learn that a run has completed as soon as it finishes, rather than at the next poll, and fewer API calls are needed.
1 parent 06ed469 commit fbfb5f6

6 files changed

+83
-80
lines changed

CHANGELOG.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,11 @@ This project adheres to [Semantic Versioning](http://semver.org/).
44

55
## [Unreleased]
66
### Added
7-
- `civis.futures.CivisFuture` for tracking future results
7+
- `civis.futures.CivisFuture` for tracking future results
88

99
### Performance Enhancements
1010
- ``civis.io.file_to_civis`` will perform a streaming upload to Platform if the optional ``requests-toolbelt`` package is installed.
11+
- Replace all ``PollableResult`` return values with ``CivisFuture`` to reduce the number of API calls and increase speed
1112

1213
## 1.2.0 - 2017-02-08
1314
### Added

README.md

Lines changed: 7 additions & 1 deletion
@@ -24,12 +24,18 @@ requests directly to the Civis API. See the
2424
```
2525
pip install civis
2626
```
27-
6. Optionally, install `pandas` and `requests-toolbelt` to enable some functionality in `civis-python`
27+
6. Optionally, install `pandas`, `pubnub`, and `requests-toolbelt` to enable some functionality in `civis-python`
2828
2929
```
3030
pip install pandas
31+
pip install pubnub
3132
pip install requests-toolbelt
3233
```
34+
Installation of `pandas` will allow some functions to return `DataFrame` outputs.
35+
Installation of `pubnub` will improve performance in all functions which
36+
wait for a Civis Platform job to complete.
37+
Installation of `requests-toolbelt` will allow streaming file uploads to
38+
Civis via `civis.io.file_to_civis`.
3339
3440
# Usage
3541
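The README change above lists optional packages that enable extra behavior when they are installed. A generic way to detect such packages at runtime is sketched below. This is not necessarily how `civis-python` does it, and `optional_features` is a hypothetical helper introduced only for illustration.

```python
import importlib.util


def optional_features(packages=("pandas", "pubnub", "requests_toolbelt")):
    """Report which optional packages can be imported in this environment.

    Hypothetical helper: the package names mirror the README, but the
    function itself is not part of civis-python.
    """
    return {
        name: importlib.util.find_spec(name) is not None
        for name in packages
    }


if __name__ == "__main__":
    for name, available in optional_features().items():
        status = "installed" if available else "not installed"
        print(f"{name}: {status}")
```

Checking with `find_spec` avoids importing a heavy package just to learn whether it is present.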

civis/base.py

Lines changed: 1 addition & 1 deletion
@@ -192,7 +192,7 @@ def _civis_state(self):
192192

193193
@property
194194
def _state(self):
195-
"""State of the PollableResult in `future` language."""
195+
"""State of the CivisAsyncResultBase in `future` language."""
196196
with self._condition:
197197
return STATE_TRANS[self._civis_state]
198198

civis/io/_databases.py

Lines changed: 13 additions & 15 deletions
@@ -1,12 +1,10 @@
11
from civis import APIClient
22
from civis._utils import maybe_get_random_name
3-
from civis.polling import PollableResult, _DEFAULT_POLLING_INTERVAL
3+
from civis.futures import CivisFuture
44

55

66
def query_civis(sql, database, api_key=None, credential_id=None,
7-
preview_rows=10,
8-
polling_interval=_DEFAULT_POLLING_INTERVAL,
9-
hidden=True):
7+
preview_rows=10, polling_interval=None, hidden=True):
108
"""Execute a SQL statement as a Civis query.
119
1210
Run a query that may return no results or where only a small
@@ -35,8 +33,8 @@ def query_civis(sql, database, api_key=None, credential_id=None,
3533
3634
Returns
3735
-------
38-
results : :class:`~civis.polling.PollableResult`
39-
A `PollableResult` object.
36+
results : :class:`~civis.futures.CivisFuture`
37+
A `CivisFuture` object.
4038
4139
Examples
4240
--------
@@ -51,13 +49,13 @@ def query_civis(sql, database, api_key=None, credential_id=None,
5149
preview_rows,
5250
credential=cred_id,
5351
hidden=hidden)
54-
return PollableResult(client.queries.get, (resp.id, ), polling_interval)
52+
return CivisFuture(client.queries.get, (resp.id, ), polling_interval,
53+
api_key=api_key, poll_on_creation=False)
5554

5655

5756
def transfer_table(source_db, dest_db, source_table, dest_table,
5857
job_name=None, api_key=None, source_credential_id=None,
59-
dest_credential_id=None,
60-
polling_interval=_DEFAULT_POLLING_INTERVAL,
58+
dest_credential_id=None, polling_interval=None,
6159
**advanced_options):
6260
"""Transfer a table from one location to another.
6361
@@ -94,8 +92,8 @@ def transfer_table(source_db, dest_db, source_table, dest_table,
9492
9593
Returns
9694
-------
97-
results : :class:`~civis.polling.PollableResult`
98-
A `PollableResult` object.
95+
results : :class:`~civis.futures.CivisFuture`
96+
A `CivisFuture` object.
9997
10098
Examples
10199
--------
@@ -123,7 +121,7 @@ def transfer_table(source_db, dest_db, source_table, dest_table,
123121
advanced_options=advanced_options)
124122
run_id = client.imports.post_runs(id=job_id).run_id
125123

126-
poll = PollableResult(client.imports.get_files_runs,
127-
(job_id, run_id),
128-
polling_interval)
129-
return poll
124+
fut = CivisFuture(client.imports.get_files_runs, (job_id, run_id),
125+
polling_interval=polling_interval, api_key=api_key,
126+
poll_on_creation=False)
127+
return fut

civis/io/_tables.py

Lines changed: 50 additions & 52 deletions
@@ -6,7 +6,7 @@
66
from civis.io import civis_to_file
77
from civis._utils import maybe_get_random_name
88
from civis.base import EmptyResultError
9-
from civis.polling import PollableResult, _DEFAULT_POLLING_INTERVAL
9+
from civis.futures import CivisFuture
1010
import requests
1111
import warnings
1212

@@ -30,8 +30,7 @@
3030

3131
def read_civis(table, database, columns=None, use_pandas=False,
3232
job_name=None, api_key=None, credential_id=None,
33-
polling_interval=_DEFAULT_POLLING_INTERVAL,
34-
archive=False, hidden=True, **kwargs):
33+
polling_interval=None, archive=False, hidden=True, **kwargs):
3534
"""Read data from a Civis table.
3635
3736
Parameters
@@ -115,8 +114,8 @@ def read_civis(table, database, columns=None, use_pandas=False,
115114

116115
def read_civis_sql(sql, database, use_pandas=False, job_name=None,
117116
api_key=None, credential_id=None,
118-
polling_interval=_DEFAULT_POLLING_INTERVAL,
119-
archive=False, hidden=True, **kwargs):
117+
polling_interval=None, archive=False,
118+
hidden=True, **kwargs):
120119
"""Read data from Civis using a custom SQL string.
121120
122121
Parameters
@@ -192,16 +191,16 @@ def read_civis_sql(sql, database, use_pandas=False, job_name=None,
192191
script_id, run_id = _sql_script(client, sql, database,
193192
job_name, credential_id,
194193
hidden=hidden)
195-
poll = PollableResult(client.scripts.get_sql_runs,
196-
(script_id, run_id),
197-
polling_interval)
194+
fut = CivisFuture(client.scripts.get_sql_runs, (script_id, run_id),
195+
polling_interval=polling_interval, api_key=api_key,
196+
poll_on_creation=False)
198197
if archive:
199198

200199
def f(x):
201200
return client.scripts.put_sql_archive(script_id, True)
202201

203-
poll.add_done_callback(f)
204-
poll.result()
202+
fut.add_done_callback(f)
203+
fut.result()
205204
outputs = client.scripts.get_sql_runs(script_id, run_id)["output"]
206205
if not outputs:
207206
raise EmptyResultError("Query {} returned no output."
@@ -218,7 +217,7 @@ def f(x):
218217

219218
def civis_to_csv(filename, sql, database, job_name=None, api_key=None,
220219
credential_id=None, archive=False, hidden=True,
221-
polling_interval=_DEFAULT_POLLING_INTERVAL):
220+
polling_interval=None):
222221
"""Export data from Civis to a local CSV file.
223222
224223
Parameters
@@ -247,14 +246,14 @@ def civis_to_csv(filename, sql, database, job_name=None, api_key=None,
247246
248247
Returns
249248
-------
250-
results : :class:`~civis.polling.PollableResult`
251-
A `PollableResult` object.
249+
results : :class:`~civis.futures.CivisFuture`
250+
A `CivisFuture` object.
252251
253252
Examples
254253
--------
255254
>>> sql = "SELECT * FROM schema.table"
256-
>>> poll = civis_to_csv("file.csv", sql, "my_database")
257-
>>> poll.result() # Wait for job to complete
255+
>>> fut = civis_to_csv("file.csv", sql, "my_database")
256+
>>> fut.result() # Wait for job to complete
258257
259258
See Also
260259
--------
@@ -268,27 +267,26 @@ def civis_to_csv(filename, sql, database, job_name=None, api_key=None,
268267
script_id, run_id = _sql_script(client, sql, database,
269268
job_name, credential_id,
270269
hidden=hidden)
271-
poll = PollableResult(client.scripts.get_sql_runs,
272-
(script_id, run_id),
273-
polling_interval)
270+
fut = CivisFuture(client.scripts.get_sql_runs, (script_id, run_id),
271+
polling_interval=polling_interval, api_key=api_key,
272+
poll_on_creation=False)
274273
download = _download_callback(script_id, run_id, client, filename)
275-
poll.add_done_callback(download)
274+
fut.add_done_callback(download)
276275
if archive:
277276

278277
def f(x):
279278
return client.scripts.put_sql_archive(script_id, True)
280279

281-
poll.add_done_callback(f)
280+
fut.add_done_callback(f)
282281

283-
return poll
282+
return fut
284283

285284

286285
def civis_to_multifile_csv(sql, database, job_name=None, api_key=None,
287286
credential_id=None, include_header=True,
288287
compression='none', delimiter='|',
289288
unquoted=False, prefix=None,
290-
polling_interval=_DEFAULT_POLLING_INTERVAL,
291-
hidden=True):
289+
polling_interval=None, hidden=True):
292290
"""Unload the result of SQL query and return presigned urls.
293291
294292
This function is intended for unloading large queries/tables from redshift
@@ -372,11 +370,10 @@ def civis_to_multifile_csv(sql, database, job_name=None, api_key=None,
372370
credential_id, hidden,
373371
csv_settings=csv_settings)
374372

375-
poll = PollableResult(client.scripts.get_sql_runs,
376-
(script_id, run_id),
377-
polling_interval)
378-
poll.result()
379-
outputs = client.scripts.get_sql_runs(script_id, run_id)["output"]
373+
fut = CivisFuture(client.scripts.get_sql_runs, (script_id, run_id),
374+
polling_interval=polling_interval, api_key=api_key,
375+
poll_on_creation=False)
376+
outputs = fut.result()["output"]
380377
if not outputs:
381378
raise EmptyResultError("Unload query {} returned no manifest."
382379
.format(script_id))
@@ -394,7 +391,7 @@ def dataframe_to_civis(df, database, table, api_key=None,
394391
max_errors=None, existing_table_rows="fail",
395392
distkey=None, sortkey1=None, sortkey2=None,
396393
headers=None, credential_id=None,
397-
polling_interval=_DEFAULT_POLLING_INTERVAL,
394+
polling_interval=None,
398395
archive=False, hidden=True, **kwargs):
399396
"""Upload a `pandas` `DataFrame` into a Civis table.
400397
@@ -442,16 +439,16 @@ def dataframe_to_civis(df, database, table, api_key=None,
442439
443440
Returns
444441
-------
445-
poll : :class:`~civis.polling.PollableResult`
446-
A `PollableResult` object.
442+
fut : :class:`~civis.futures.CivisFuture`
443+
A `CivisFuture` object.
447444
448445
Examples
449446
--------
450447
>>> import pandas as pd
451448
>>> df = pd.DataFrame({'a': [1, 2, 3], 'b': [4, 5, 6]})
452-
>>> poller = civis.io.dataframe_to_civis(df, 'my-database',
453-
... 'scratch.df_table')
454-
>>> poller.result()
449+
>>> fut = civis.io.dataframe_to_civis(df, 'my-database',
450+
... 'scratch.df_table')
451+
>>> fut.result()
455452
"""
456453
if archive:
457454
warnings.warn("`archive` is deprecated and will be removed in v2.0.0. "
@@ -472,8 +469,7 @@ def csv_to_civis(filename, database, table, api_key=None,
472469
max_errors=None, existing_table_rows="fail",
473470
distkey=None, sortkey1=None, sortkey2=None,
474471
delimiter=",", headers=None,
475-
credential_id=None,
476-
polling_interval=_DEFAULT_POLLING_INTERVAL,
472+
credential_id=None, polling_interval=None,
477473
archive=False, hidden=True):
478474
"""Upload the contents of a local CSV file to Civis.
479475
@@ -520,8 +516,8 @@ def csv_to_civis(filename, database, table, api_key=None,
520516
521517
Returns
522518
-------
523-
results : :class:`~civis.polling.PollableResult`
524-
A `PollableResult` object.
519+
results : :class:`~civis.futures.CivisFuture`
520+
A `CivisFuture` object.
525521
526522
Notes
527523
-----
@@ -531,20 +527,20 @@ def csv_to_civis(filename, database, table, api_key=None,
531527
--------
532528
>>> with open('input_file.csv', 'w') as _input:
533529
... _input.write('a,b,c\\n1,2,3')
534-
>>> poller = civis.io.csv_to_civis('input_file.csv',
535-
... 'my-database',
536-
... 'scratch.my_data')
537-
>>> poller.result()
530+
>>> fut = civis.io.csv_to_civis('input_file.csv',
531+
... 'my-database',
532+
... 'scratch.my_data')
533+
>>> fut.result()
538534
"""
539535
if archive:
540536
warnings.warn("`archive` is deprecated and will be removed in v2.0.0. "
541537
"Use `hidden` instead.", FutureWarning)
542538
with open(filename, "rb") as data:
543-
poll = _import_bytes(data, database, table, api_key, max_errors,
544-
existing_table_rows, distkey, sortkey1, sortkey2,
545-
delimiter, headers, credential_id,
546-
polling_interval, archive, hidden=hidden)
547-
return poll
539+
fut = _import_bytes(data, database, table, api_key, max_errors,
540+
existing_table_rows, distkey, sortkey1, sortkey2,
541+
delimiter, headers, credential_id,
542+
polling_interval, archive, hidden=hidden)
543+
return fut
548544

549545

550546
def _sql_script(client, sql, database, job_name, credential_id, hidden=False,
@@ -615,13 +611,15 @@ def _import_bytes(buf, database, table, api_key, max_errors,
615611
run_job_result = client._session.post(import_job.run_uri)
616612
run_job_result.raise_for_status()
617613
run_info = run_job_result.json()
618-
poll = PollableResult(client.imports.get_files_runs,
619-
(run_info['importId'], run_info['id']),
620-
polling_interval=polling_interval)
614+
fut = CivisFuture(client.imports.get_files_runs,
615+
(run_info['importId'], run_info['id']),
616+
polling_interval=polling_interval,
617+
api_key=api_key,
618+
poll_on_creation=False)
621619
if archive:
622620

623621
def f(x):
624622
return client.imports.put_archive(import_job.id, True)
625623

626-
poll.add_done_callback(f)
627-
return poll
624+
fut.add_done_callback(f)
625+
return fut
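Several functions in this file attach work to the returned future with `add_done_callback` (a download step, an archive step). The sketch below shows how such callbacks behave on a plain `concurrent.futures.Future` from the standard library, used here as a stand-in because the user guide says `CivisFuture` follows that API fairly closely. The callback names and the result dictionary are illustrative assumptions, not Civis API responses.

```python
from concurrent.futures import Future


def run_with_callbacks():
    """Attach two callbacks to a future and record the order they fire in."""
    events = []

    def download(future):
        # A done-callback receives the finished future as its only argument.
        events.append(("download", future.result()["state"]))

    def archive(future):
        events.append(("archive", future.result()["state"]))

    fut = Future()
    fut.add_done_callback(download)
    fut.add_done_callback(archive)

    # Nothing has run yet: callbacks fire only once the future completes.
    assert events == []

    # Completing the future triggers the callbacks in registration order.
    fut.set_result({"state": "succeeded"})
    return events


if __name__ == "__main__":
    print(run_with_callbacks())
```

With a standard-library `Future`, a callback added after the future is already done is called immediately, so registration order relative to completion matters only for timing, not for whether the callback runs.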

docs/source/user_guide.rst

Lines changed: 10 additions & 10 deletions
@@ -29,27 +29,27 @@ uploads the data back into Civis:
2929
... use_pandas=True)
3030
>>> correlation_matrix = df.corr()
3131
>>> correlation_matrix["corr_var"] = correlation_matrix.index
32-
>>> poller = civis.io.dataframe_to_civis(df=correlation_matrix,
33-
... database="database",
34-
... table="my_schema.my_correlations")
35-
>>> poller.result()
32+
>>> fut = civis.io.dataframe_to_civis(df=correlation_matrix,
33+
... database="database",
34+
... table="my_schema.my_correlations")
35+
>>> fut.result()
3636
3737
38-
Pollable Results
39-
================
38+
Civis Futures
39+
=============
4040

4141
In the code above, :func:`~civis.io.dataframe_to_civis` returns a special
42-
:class:`~civis.polling.PollableResult` object. Making a request to the Civis
42+
:class:`~civis.futures.CivisFuture` object. Making a request to the Civis
4343
API usually results in a long running job. To account for this, various
4444
functions in the ``civis`` namespace return a
45-
:class:`PollableResult <civis.polling.PollableResult>` to allow you to
45+
:class:`CivisFuture <civis.futures.CivisFuture>` to allow you to
4646
process multiple long running jobs simultaneously. For instance, you may
4747
want to start many jobs in parallel and wait for them all to finish rather
4848
than wait for each job to finish before starting the next one.
4949

50-
The :class:`PollableResult <civis.polling.PollableResult>` follows the
50+
The :class:`CivisFuture <civis.futures.CivisFuture>` follows the
5151
:class:`python:concurrent.futures.Future` API fairly closely. For example,
52-
calling ``result()`` on ``poller`` above forces the program to wait for the
52+
calling ``result()`` on ``fut`` above forces the program to wait for the
5353
job started with :func:`~civis.io.dataframe_to_civis` to finish and
5454
returns the result.
5555
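The user guide text above describes starting many jobs in parallel and waiting for all of them, rather than finishing each job before starting the next. A minimal sketch of that pattern follows, using only the standard library: `fake_job` is a hypothetical stand-in for a long-running Platform job, and the futures come from a `ThreadPoolExecutor` rather than from `civis`.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait


def fake_job(n, delay=0.05):
    """Stand-in for a long-running job; sleeps, then returns n squared."""
    time.sleep(delay)
    return n * n


def run_all(inputs):
    """Start one job per input, wait for all of them, return the results."""
    with ThreadPoolExecutor(max_workers=max(1, len(inputs))) as pool:
        # Start every job first; none of these calls blocks on completion.
        futures = [pool.submit(fake_job, n) for n in inputs]
        # Block until all jobs have finished.
        wait(futures)
        # result() returns immediately because each future is already done.
        return [f.result() for f in futures]


if __name__ == "__main__":
    print(run_all([1, 2, 3]))
```

Because the jobs overlap, the total wait is close to the longest single job rather than the sum of all of them.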
