Skip to content

Commit a8fac5e

Browse files
[CIVIS-1949] PERF more efficient polling at future objects (#492)
* REF _ResultPollingThread doesn't actually take poller and poller_args
* ENH exponential polling
* ENH switch to backoff factor of 1.2
* TST geometric polling
* DOC update docstrings and changelog
* DOC update docs
* DOC update docs
1 parent 423d5f1 commit a8fac5e

File tree

8 files changed

+113
-85
lines changed

8 files changed

+113
-85
lines changed

CHANGELOG.md

+4
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
2020
- Improved the startup time of `import civis` with a 5x speed boost. (#490)
2121
- The downloaded API spec due to the `civis.APIClient` instantiation is now
2222
a time-to-live cache in memory (15 minutes for interactive Python, or 24 hours in scripts). (#491)
23+
- Polling by `PollableResult` (and consequently by its subclasses `CivisFuture`,
24+
`ContainerFuture`, and `ModelFuture`) now defaults to geometrically increasing polling
25+
intervals. Short-running jobs' `future.result()` can now return faster, while
26+
longer-running jobs have a capped polling interval of 15 seconds. (#492)
2327

2428
### Deprecated
2529
### Removed

docs/source/user_guide.rst

+7-17
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ uploads the data back into Civis:
3434
... table="my_schema.my_correlations")
3535
>>> fut.result()
3636
37+
.. _civis_futures:
3738

3839
Civis Futures
3940
=============
@@ -157,28 +158,17 @@ data to export. Then we create the export job and run it.
157158
remote_host_id=db_id,
158159
credential_id=cred_id,
159160
sql=generate_table)
160-
>>> export_run = client.scripts.post_sql_runs(export_job.id)
161+
>>> export_future = civis.utils.run_job(export_job.id)
161162
162-
We can then poll and wait for the export to be completed.
163+
``export_future`` is a :class:`CivisFuture <civis.futures.CivisFuture>` object
164+
(see :ref:`civis_futures` above). Calling ``.result()`` on ``export_future``
165+
blocks the program until the SQL run has completed.
163166

164167
.. code:: python
165168
166-
>>> import time
167-
>>> export_state = client.scripts.get_sql_runs(export_job.id,
168-
... export_run.id)
169-
>>> while export_state.state in ['queued', 'running']:
170-
... time.sleep(60)
171-
... export_state = client.scripts.get_sql_runs(export_job.id,
172-
... export_run.id)
173-
174-
Now, we can get the URL of the exported csv. First, we grab the result of our
175-
export job.
176-
177-
.. code:: python
178-
179-
>>> export_result = client.scripts.get_sql_runs(export_job.id,
180-
... export_run.id)
169+
>>> export_result = export_future.result()
181170
171+
Now, we can get the URL of the exported csv.
182172
In the future, a script may export multiple jobs, so the output of this is a
183173
list.
184174

src/civis/__init__.py

+1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ def _lazy_import(name):
1919
return module
2020

2121

22+
futures = _lazy_import("civis.futures")
2223
io = _lazy_import("civis.io")
2324
ml = _lazy_import("civis.ml")
2425
parallel = _lazy_import("civis.parallel")

src/civis/base.py

-32
Original file line numberDiff line numberDiff line change
@@ -173,40 +173,8 @@ class CivisAsyncResultBase(futures.Future):
173173
the result. The `_result_` object needs to be set to an object with a
174174
``state`` attribute. Alternatively the `_check_result` method can be
175175
overwritten to change how the state of the object is returned.
176-
177-
Parameters
178-
----------
179-
poller : func
180-
A function which returns an object that has a ``state`` attribute.
181-
poller_args : tuple
182-
The arguments with which to call the poller function.
183-
polling_interval : int or float
184-
The number of seconds between API requests to check whether a result
185-
is ready.
186-
client : :class:`civis.APIClient`, optional
187-
If not provided, an :class:`civis.APIClient` object will be
188-
created from the :envvar:`CIVIS_API_KEY`.
189-
poll_on_creation : bool, optional
190-
If ``True`` (the default), it will poll upon calling ``result()`` the
191-
first time. If ``False``, it will wait the number of seconds specified
192-
in `polling_interval` from object creation before polling.
193176
"""
194177

195-
def __init__(
196-
self,
197-
poller,
198-
poller_args,
199-
polling_interval=None,
200-
client=None,
201-
poll_on_creation=True,
202-
):
203-
super().__init__()
204-
self.poller = poller
205-
self.poller_args = poller_args
206-
self.polling_interval = polling_interval
207-
self.client = client
208-
self.poll_on_creation = poll_on_creation
209-
210178
def __repr__(self):
211179
# Almost the same as the superclass's __repr__, except we use
212180
# the `_civis_state` rather than the `_state`.

src/civis/futures.py

+13-3
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,12 @@ class CivisFuture(PollableResult):
3232
The arguments with which to call the poller function.
3333
polling_interval : int or float, optional
3434
The number of seconds between API requests to check whether a result
35-
is ready.
35+
is ready. If an integer or float is provided, this number will be used
36+
as the polling interval. If ``None`` (the default), the polling interval will
37+
start at 1 second and increase geometrically up to 15 seconds. The ratio of
38+
the increase is 1.2, resulting in polling intervals in seconds of
39+
1, 1.2, 1.44, 1.728, etc. This default behavior allows for a faster return for
40+
a short-running job and a capped polling interval for longer-running jobs.
3641
client : :class:`civis.APIClient`, optional
3742
poll_on_creation : bool, optional
3843
If ``True`` (the default), it will poll upon calling ``result()`` the
@@ -231,9 +236,14 @@ class ContainerFuture(CivisFuture):
231236
The ID for the run to monitor
232237
max_n_retries : int, optional
233238
If the job generates an exception, retry up to this many times
234-
polling_interval: int or float, optional
239+
polling_interval : int or float, optional
235240
The number of seconds between API requests to check whether a result
236-
is ready.
241+
is ready. If an integer or float is provided, this number will be used
242+
as the polling interval. If ``None`` (the default), the polling interval will
243+
start at 1 second and increase geometrically up to 15 seconds. The ratio of
244+
the increase is 1.2, resulting in polling intervals in seconds of
245+
1, 1.2, 1.44, 1.728, etc. This default behavior allows for a faster return for
246+
a short-running job and a capped polling interval for longer-running jobs.
237247
client : :class:`civis.APIClient`, optional
238248
If not provided, an :class:`civis.APIClient` object will be
239249
created from the :envvar:`CIVIS_API_KEY`.

src/civis/ml/_model.py

+8-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
"""Run CivisML jobs and retrieve the results"""
22

33
import builtins
4-
from builtins import super
54
import collections
65
from functools import lru_cache
76
import io
@@ -394,7 +393,12 @@ class ModelFuture(ContainerFuture):
394393
run, and ``train_run_id`` will equal ``run_id``.
395394
polling_interval : int or float, optional
396395
The number of seconds between API requests to check whether a result
397-
is ready.
396+
is ready. If an integer or float is provided, this number will be used
397+
as the polling interval. If ``None`` (the default), the polling interval will
398+
start at 1 second and increase geometrically up to 15 seconds. The ratio of
399+
the increase is 1.2, resulting in polling intervals in seconds of
400+
1, 1.2, 1.44, 1.728, etc. This default behavior allows for a faster return for
401+
a short-running job and a capped polling interval for longer-running jobs.
398402
client : :class:`civis.APIClient`, optional
399403
If not provided, an :class:`civis.APIClient` object will be
400404
created from the :envvar:`CIVIS_API_KEY`.
@@ -555,6 +559,8 @@ def __setstate__(self, state):
555559
self._condition = threading.Condition()
556560
self.client = APIClient()
557561
self.poller = self.client.scripts.get_containers_runs
562+
self._next_polling_interval = 1
563+
self._use_geometric_polling = True
558564
self._begin_tracking()
559565
self.add_done_callback(self._set_job_exception)
560566

src/civis/polling.py

+41-26
Original file line numberDiff line numberDiff line change
@@ -5,19 +5,17 @@
55
from civis.response import Response
66

77

8-
_DEFAULT_POLLING_INTERVAL = 15
8+
_MAX_POLLING_INTERVAL = 15
99

1010

1111
class _ResultPollingThread(threading.Thread):
1212
"""Poll a function until it returns a Response with a DONE state"""
1313

1414
# Inspired by `threading.Timer`
1515

16-
def __init__(self, poller, poller_args, polling_interval):
16+
def __init__(self, pollable_result):
1717
super().__init__(daemon=True)
18-
self.polling_interval = polling_interval
19-
self.poller = poller
20-
self.poller_args = poller_args
18+
self.pollable_result = pollable_result
2119
self.finished = threading.Event()
2220

2321
def cancel(self):
@@ -31,11 +29,11 @@ def join(self, timeout=None):
3129

3230
def run(self):
3331
"""Poll until done."""
34-
while not self.finished.wait(self.polling_interval):
32+
while not self.finished.wait(self.pollable_result._next_polling_interval):
3533
# Spotty internet connectivity can result in polling functions
3634
# returning None. This treats None responses like responses which
3735
# have a non-DONE state.
38-
poller_result = self.poller(*self.poller_args)
36+
poller_result = self.pollable_result._check_result()
3937
if poller_result is not None and poller_result.state in DONE:
4038
self.finished.set()
4139

@@ -53,9 +51,14 @@ class PollableResult(CivisAsyncResultBase):
5351
A function which returns an object that has a ``state`` attribute.
5452
poller_args : tuple
5553
The arguments with which to call the poller function.
56-
polling_interval : int or float
54+
polling_interval : int or float, optional
5755
The number of seconds between API requests to check whether a result
58-
is ready.
56+
is ready. If an integer or float is provided, this number will be used
57+
as the polling interval. If ``None`` (the default), the polling interval will
58+
start at 1 second and increase geometrically up to 15 seconds. The ratio of
59+
the increase is 1.2, resulting in polling intervals in seconds of
60+
1, 1.2, 1.44, 1.728, etc. This default behavior allows for a faster return for
61+
a short-running job and a capped polling interval for longer-running jobs.
5962
client : :class:`civis.APIClient`, optional
6063
If not provided, an :class:`civis.APIClient` object will be
6164
created from the :envvar:`CIVIS_API_KEY`.
@@ -103,18 +106,20 @@ def __init__(
103106
client=None,
104107
poll_on_creation=True,
105108
):
106-
if polling_interval is None:
107-
polling_interval = _DEFAULT_POLLING_INTERVAL
108-
super().__init__(
109-
poller=poller,
110-
poller_args=poller_args,
111-
polling_interval=polling_interval,
112-
client=client,
113-
poll_on_creation=poll_on_creation,
114-
)
115-
if self.polling_interval <= 0:
109+
super().__init__()
110+
111+
self.poller = poller
112+
self.poller_args = poller_args
113+
self.polling_interval = polling_interval
114+
self.client = client
115+
self.poll_on_creation = poll_on_creation
116+
117+
if self.polling_interval is not None and self.polling_interval <= 0:
116118
raise ValueError("The polling interval must be positive.")
117119

120+
self._next_polling_interval = 1
121+
self._use_geometric_polling = True
122+
118123
# Polling arguments. Never poll more often than the requested interval.
119124
if poll_on_creation:
120125
self._last_polled = None
@@ -153,8 +158,20 @@ def _check_result(self):
153158
now = time.time()
154159
if (
155160
not self._last_polled
156-
or (now - self._last_polled) >= self.polling_interval
161+
or (now - self._last_polled) >= self._next_polling_interval
157162
):
163+
if self._use_geometric_polling:
164+
# Choosing a common ratio of 1.2 for these polling intervals:
165+
# 1, 1.2, 1.44, 1.73, 2.07, 2.49, 2.99, ..., and capped at 15.
166+
# Within the first 15 secs by wall time, we call the poller 7 times,
167+
# which gives a short-running job's future.result()
168+
# a higher chance to return faster.
169+
# For longer running jobs, the polling interval will be capped
170+
# at 15 secs when by wall time 87 secs have passed.
171+
self._next_polling_interval *= 1.2
172+
if self._next_polling_interval > _MAX_POLLING_INTERVAL:
173+
self._next_polling_interval = _MAX_POLLING_INTERVAL
174+
self._use_geometric_polling = False
158175
# Poll for a new result
159176
self._last_polled = now
160177
try:
@@ -204,18 +221,16 @@ def cleanup(self):
204221
if self._polling_thread.is_alive():
205222
self._polling_thread.cancel()
206223

207-
def _reset_polling_thread(
208-
self, polling_interval=_DEFAULT_POLLING_INTERVAL, start_thread=False
209-
):
224+
def _reset_polling_thread(self, polling_interval, start_thread=False):
210225
with self._condition:
211226
if (
212227
getattr(self, "_polling_thread", None) is not None
213228
and self._polling_thread.is_alive()
214229
):
215230
self._polling_thread.cancel()
216231
self.polling_interval = polling_interval
217-
self._polling_thread = _ResultPollingThread(
218-
self._check_result, (), polling_interval
219-
)
232+
self._next_polling_interval = 1 if (pi := polling_interval) is None else pi
233+
self._use_geometric_polling = polling_interval is None
234+
self._polling_thread = _ResultPollingThread(self)
220235
if start_thread:
221236
self._polling_thread.start()

tests/test_polling.py

+39-5
Original file line numberDiff line numberDiff line change
@@ -76,10 +76,15 @@ def test_poll_on_creation(self):
7676
assert poller.call_count > 0
7777

7878
def test_poller_returns_none(self):
79-
poller = mock.Mock(side_effect=[None, None, Response({"state": "success"})])
80-
polling_thread = _ResultPollingThread(poller, (), polling_interval=0.01)
79+
check_result = mock.Mock(
80+
side_effect=[None, None, Response({"state": "success"})]
81+
)
82+
pollable_result = mock.Mock()
83+
pollable_result._check_result = check_result
84+
pollable_result._next_polling_interval = 0.01
85+
polling_thread = _ResultPollingThread(pollable_result)
8186
polling_thread.run()
82-
assert poller.call_count == 3
87+
assert check_result.call_count == 3
8388

8489
def test_reset_polling_thread(self):
8590
pollable = PollableResult(
@@ -89,16 +94,45 @@ def test_reset_polling_thread(self):
8994
)
9095
initial_polling_thread = pollable._polling_thread
9196
assert pollable.polling_interval == 0.1
92-
assert pollable._polling_thread.polling_interval == 0.1
97+
assert pollable._next_polling_interval == 0.1
9398
pollable._reset_polling_thread(0.2)
9499
# Check that the polling interval was updated
95100
assert pollable.polling_interval == 0.2
96-
assert pollable._polling_thread.polling_interval == 0.2
101+
assert pollable._next_polling_interval == 0.2
97102
# Check that the _polling_thread is a new thread
98103
assert pollable._polling_thread != initial_polling_thread
99104
# Check that the old thread was stopped
100105
assert not initial_polling_thread.is_alive()
101106

107+
def test_geometric_polling(self):
108+
# To test polling, we make the poller function spit out a timestamp every time
109+
# it is called. Then we check if these timestamps are what we'd expect.
110+
poller_timestamps = []
111+
112+
def append_new_timestamp(*args, **kwargs):
113+
nonlocal poller_timestamps
114+
poller_timestamps.append(time.time())
115+
if len(poller_timestamps) < 5:
116+
return Response({"state": "running"})
117+
else:
118+
return Response({"state": "succeeded"})
119+
120+
poller = mock.Mock()
121+
poller.side_effect = append_new_timestamp
122+
123+
pollable = PollableResult(poller, (), poll_on_creation=False)
124+
start_time = time.time()
125+
pollable.result()
126+
127+
assert len(poller_timestamps) == 5
128+
expected_intervals = [1, 1.2, 1.44, 1.728, 2.0736]
129+
actual_intervals = []
130+
for i, timestamp in enumerate(poller_timestamps):
131+
actual_intervals.append(
132+
timestamp - (poller_timestamps[i - 1] if i else start_time)
133+
)
134+
assert actual_intervals == pytest.approx(expected_intervals, abs=0.02)
135+
102136

103137
if __name__ == "__main__":
104138
unittest.main()

0 commit comments

Comments
 (0)