Skip to content

Commit e743f11

Browse files
authored
New version of concurrent upload (#45)
The implementation now uses a queue instead of threading. With this implementation, only one connection is made at a time.
1 parent 69eec1d commit e743f11

5 files changed

Lines changed: 77 additions & 90 deletions

File tree

astrometry_net_client/client.py

Lines changed: 69 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import logging
22
import time
3-
from concurrent.futures import ThreadPoolExecutor, as_completed
3+
from queue import Queue
44

55
from astrometry_net_client.session import Session
66
from astrometry_net_client.settings import Settings
@@ -62,9 +62,7 @@ def __init__(self, session=None, settings=None, **kwargs):
6262
def upload_files_gen(
6363
self,
6464
files_iter,
65-
filter_func=None,
66-
filter_args=None,
67-
workers=MAX_WORKERS,
65+
queue_size=MAX_WORKERS,
6866
):
6967
"""
7068
Generator which uploads a number of files concurrently, yielding the
@@ -79,15 +77,11 @@ def upload_files_gen(
7977
files_iter: iterable
8078
Some iterable containing paths to the files which will be uploaded.
8179
Is consumed lazily: after the first ``queue_size`` files, a new file is taken each time a job finishes.
82-
workers: int, optional
83-
A positive integer, controlling the amount of workers to use for
84-
the processing. Will not exceed the value of
80+
queue_size: int, optional
81+
A positive integer, controlling the size of the queue. This will
82+
determine the maximum number of simultaneous submissions. Must be
83+
greater than 0 and at most :py:const:`MAX_WORKERS`. Default is
8584
:py:const:`MAX_WORKERS`.
86-
filter_func: Callable
87-
Predicate filter function which takes in the `filename` and
88-
optionally some argument (`filter_args`).
89-
filter_args: List
90-
Arguments which are to be passed to the filter function.
9185
9286
Yields
9387
------
@@ -97,75 +91,80 @@ def upload_files_gen(
9791
corresponding filename. Yields when the Job is finished.
9892
NOTE: Order of yielded filenames can (and likely will) be different
9993
from the given ``files_iter``
94+
95+
Raises
96+
------
97+
ValueError
98+
When the queue_size is invalid.
10099
"""
101-
workers = min(MAX_WORKERS, workers)
102-
with ThreadPoolExecutor(max_workers=workers) as executor:
103-
log.info("Spawned executor {}".format(executor))
104-
105-
# submit the files & save which future corresponds to which
106-
# filename
107-
future_to_file = {
108-
executor.submit(
109-
self.filtered_upload_wrapper,
110-
filename,
111-
filter_func=filter_func,
112-
filter_args=filter_args,
113-
): filename
114-
for filename in files_iter
115-
}
100+
SLEEP_TIME = 0.3 # seconds
101+
102+
files_iter = iter(files_iter)
103+
if queue_size < 1 or queue_size > MAX_WORKERS:
104+
raise ValueError(
105+
"queue_size must be greater than 0 and less or equal to ",
106+
f"{MAX_WORKERS}, was: {queue_size}",
107+
)
108+
processing_queue = Queue(maxsize=queue_size)
109+
110+
# Populate queue initially
111+
for _, filename in zip(range(queue_size), files_iter):
112+
self._insert_submission(filename, processing_queue)
113+
114+
while not processing_queue.empty():
115+
filename, submission, job = processing_queue.get()
116+
log_msg = "Checking file {}, job exists: {}"
117+
log.debug(log_msg.format(filename, job is not None))
118+
# The item in the queue has 2 states; if it is still only a
119+
# submission, job will be None and we have to create a job out of
120+
# it. When the job is made, we can check if the job is done. When
121+
# the job is finished return (yield) the value, otherwise put it
122+
# back in the queue.
123+
124+
if job is None:
125+
submission.status()
126+
if submission.done():
127+
job = submission.jobs[0]
128+
else:
129+
processing_queue.put((filename, submission, job))
130+
continue
116131

117-
# iterate over the results once they are completed.
118-
for future in as_completed(future_to_file):
119-
result_filename = future_to_file[future]
132+
job.status()
133+
if job.done():
120134
try:
121-
res_job = future.result()
122-
except Exception as e:
123-
# This exception is thrown inside the computed function.
124-
err_msg = "File {} stopped with exception {}"
125-
log.error(err_msg.format(result_filename, e))
135+
filename = next(files_iter)
136+
except StopIteration:
137+
pass
126138
else:
127-
if res_job is not None: # ignore if file was filtered out
128-
yield res_job, result_filename
139+
self._insert_submission(filename, processing_queue)
140+
log_msg = "FINISHED submission {}, yielding..."
141+
log.info(log_msg.format(filename))
142+
yield (job, filename)
143+
else:
144+
processing_queue.put((filename, submission, job))
129145

130-
def filtered_upload_wrapper(
131-
self, filename, filter_func=None, filter_args=None, *args, **kwargs
132-
):
146+
time.sleep(SLEEP_TIME)
147+
148+
def _insert_submission(self, filename, queue):
133149
"""
134-
Wrapper around :py:func:`upload_file` which filters the given file
135-
based on a specified filter function. Main use for this is a
136-
computationally heavy filter function, like counting number of sources
137-
locally, and only uploading if not enough are detected.
150+
Helper function which creates an upload for the given filename, and
151+
inserts the submission in a queue. This is not intended to be used
152+
by a user.
138153
139154
Parameters
140155
----------
141156
filename: str
142-
File to be uploaded. See :py:func:`upload_file`.
143-
filter_func: Callable
144-
Predicate filter function which takes in the `filename` and
145-
optionally some argument (`filter_args`).
146-
filter_args: List
147-
Arguments which are to be passed to the filter function.
148-
args: other arguments
149-
Directly passed to :py:func:`upload_file`
150-
kwargs: keyword arguments
151-
Directly passed to :py:func:`upload_file`
152-
153-
Returns
154-
-------
155-
Job or None: :py:class:`astrometry_net_client.statusables.Job`, `None`
156-
Will be the job of the resulting upload (see
157-
:py:func:`upload_file`), or `None` when `filter_func` evaluated to
158-
`False`.
157+
The filename of the file to be submitted into the queue.
158+
queue: Queue
159+
The queue in which to insert the submission.
159160
"""
160-
if filter_args is None:
161-
# allow arguments to be unpackable if it is not specified
162-
filter_args = []
163-
164-
if filter_func is not None and not filter_func(filename, *filter_args):
165-
log.info("Filter function failed, skipping upload")
166-
return None
167-
168-
return self.upload_file(filename, *args, **kwargs)
161+
log_msg = "Submitting file {}"
162+
log.info(log_msg.format(filename))
163+
upl = FileUpload(
164+
filename, session=self.session, settings=self.settings
165+
)
166+
submission = upl.submit()
167+
queue.put((filename, submission, None))
169168

170169
def upload_file(self, filename, settings=None):
171170
"""

examples/concurrent_upload.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
from astropy.io import fits
77
from astropy.stats import sigma_clipped_stats
8-
from photutils import DAOStarFinder
8+
from photutils.detection import DAOStarFinder
99

1010
from astrometry_net_client import Client
1111

@@ -98,12 +98,13 @@ def main():
9898

9999
# iterate over all the fits files in the specified directory
100100
fits_files = filter(is_fits, files)
101+
fits_files = filter(
102+
lambda f: enough_sources(f, min_sources=10), fits_files
103+
)
101104

102105
# give the iterable of filenames to the function, which returns a
103106
# generator, generating pairs containing the finished job and filename.
104-
result_iter = c.upload_files_gen(
105-
fits_files, filter_func=enough_sources, filter_args=(10,)
106-
)
107+
result_iter = c.upload_files_gen(fits_files)
107108

108109
for job, filename in result_iter:
109110

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
setuptools.setup(
77
name="astrometry_net_client",
8-
version="0.2.10",
8+
version="0.3.0",
99
author="Sten Sipma",
1010
author_email="sten.sipma@ziggo.nl",
1111
description="A Python interface for the Astrometry.net API.",

tests/test_client.py

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -46,13 +46,6 @@ def test_client_settings(mock_server):
4646
assert client.settings == client2.settings
4747

4848

49-
def test_client_upload_filter(mock_server):
50-
client = Client(api_key=VALID_KEY)
51-
52-
job = client.filtered_upload_wrapper(FILE, filter_func=lambda x: False)
53-
assert job is None
54-
55-
5649
@pytest.mark.long
5750
def test_client_upload(mock_server):
5851
client = Client(api_key=VALID_KEY)
@@ -61,17 +54,11 @@ def test_client_upload(mock_server):
6154
assert job.success()
6255
assert job.done()
6356

64-
# should be normal
65-
job = client.filtered_upload_wrapper(FILE)
66-
assert job is not None
67-
assert job.success()
68-
assert job.done()
69-
7057

7158
@pytest.mark.long
7259
def test_client_upload_multiple(mock_server):
7360
client = Client(api_key=VALID_KEY)
74-
jobs = client.upload_files_gen([FILE] * 5, workers=3)
61+
jobs = client.upload_files_gen([FILE] * 5, queue_size=3)
7562

7663
assert jobs is not None
7764

tests/test_online.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ def test_old():
4848

4949
# TODO include some test data in the repository
5050
upl = FileUpload(
51-
"../test-data/target.200417.00000088.3x3.FR.fits", session=session
51+
"../test/data/target.200417.00000088.3x3.FR.fits", session=session
5252
)
5353

5454
submission = upl.submit()

0 commit comments

Comments
 (0)