Skip to content

Commit f51a976

Browse files
authored
Merge pull request #9 from b97pla/download_option
Download option
2 parents 9b6fec9 + b88030f commit f51a976

File tree

10 files changed

+248
-86
lines changed

10 files changed

+248
-86
lines changed

archive_verify/app.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313

1414

1515
def setup_routes(app):
16-
app.router.add_post(app["config"]["base_url"] + "/verify", handlers.verify)
16+
app.router.add_post(app["config"]["base_url"] + r"/{endpoint:(verify|download)}", handlers.verify)
1717
app.router.add_get(app["config"]["base_url"] + "/status/{job_id}", handlers.status)
1818

1919

@@ -56,6 +56,7 @@ def start():
5656
log.info("Starting archive-verify-ws on {}...".format(conf["port"]))
5757
app = web.Application()
5858
app['config'] = conf
59+
app.cleanup_ctx.append(handlers.redis_context)
5960
setup_routes(app)
6061
web.run_app(app, port=conf["port"])
6162

archive_verify/handlers.py

Lines changed: 54 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,14 @@
22
import os
33

44
from aiohttp import web
5-
import archive_verify.redis_client as redis_client
65
from rq import Queue
76

87
from archive_verify.workers import verify_archive
8+
import archive_verify.redis_client as redis_client
99

1010
log = logging.getLogger(__name__)
1111

12+
1213
async def verify(request):
1314
"""
1415
Handler accepts a POST call with JSON parameters in the body. Upon a request it will
@@ -21,6 +22,8 @@ async def verify(request):
2122
:return JSON containing job id and link which we can poll for current job status
2223
"""
2324
body = await request.json()
25+
endpoint = request.match_info["endpoint"]
26+
keep_download = (endpoint == "download")
2427
host = body["host"]
2528
archive = body["archive"]
2629
description = body["description"]
@@ -30,28 +33,38 @@ async def verify(request):
3033
# use a supplied path if available, otherwise construct it from the src_root and archive
3134
archive_path = path or os.path.join(src_root, archive)
3235

33-
redis_conn = redis_client.get_redis_instance()
34-
q = Queue(connection=redis_conn)
36+
q = request.app['redis_q']
3537

3638
# Enqueue the verify_archive function with the user supplied input parameters.
37-
# Note that the TTL and timeout parameters are important for e.g. how long
38-
# the jobs and their results will be kept in the Redis queue. By default our
39-
# config e.g. setups the queue to keep the job results indefinately,
40-
# therefore they we will have to remove them ourselves afterwards.
41-
job = q.enqueue_call(verify_archive,
42-
args=(archive, archive_path, description, request.app["config"]),
43-
timeout=request.app["config"]["job_timeout"],
44-
result_ttl=request.app["config"]["job_result_ttl"],
45-
ttl=request.app["config"]["job_ttl"])
39+
# Note that the TTL and timeout parameters are important for e.g. how long
40+
# the jobs and their results will be kept in the Redis queue. By default our
41+
# config e.g. sets up the queue to keep the job results indefinitely,
42+
# therefore we will have to remove them ourselves afterwards.
43+
job = q.enqueue_call(verify_archive,
44+
args=(
45+
archive,
46+
archive_path,
47+
description,
48+
keep_download,
49+
request.app["config"]),
50+
timeout=request.app["config"]["job_timeout"],
51+
result_ttl=request.app["config"]["job_result_ttl"],
52+
ttl=request.app["config"]["job_ttl"])
4653

4754
url = request.url
4855
url_base = request.app["config"]["base_url"]
4956

5057
status_end_point = "{0}://{1}:{2}{3}/status/{4}".format(url.scheme, url.host, url.port, url_base, job.id)
51-
response = { "status": "pending", "job_id": job.id, "link": status_end_point, "path": archive_path }
52-
58+
response = {
59+
"status": "pending",
60+
"job_id": job.id,
61+
"link": status_end_point,
62+
"path": archive_path,
63+
"action": endpoint}
64+
5365
return web.json_response(response)
5466

67+
5568
async def status(request):
5669
"""
5770
Handler accepts a GET call with an URL parameter which corresponds to a previously
@@ -62,34 +75,47 @@ async def status(request):
6275
"""
6376
job_id = str(request.match_info['job_id'])
6477

65-
redis_conn = redis_client.get_redis_instance()
66-
q = Queue(connection=redis_conn)
78+
q = request.app['redis_q']
6779
job = q.fetch_job(job_id)
6880

6981
if job:
7082
if job.is_started:
71-
payload = {"state": "started", "msg": "Job {} is currently running.".format(job_id)}
83+
payload = {
84+
"state": "started",
85+
"msg": f"Job {job_id} is currently running."}
7286
code = 200
73-
elif job.is_finished:
87+
elif job.is_finished or job.is_failed:
7488
result = job.result
7589

76-
if result and result["state"] == "done":
77-
payload = {"state": "done", "msg": "Job {} has returned with result: {}".format(job_id, job.result)}
90+
if result["state"] == "done":
91+
payload = {
92+
"state": result["state"],
93+
"msg": f"Job {job_id} has returned with result: {result['msg']}"}
7894
code = 200
79-
else:
80-
payload = {"state": "error", "msg": "Job {} has returned with result: {}".format(job_id, job.result), "debug": job.exc_info}
95+
else:
96+
payload = {
97+
"state": result["state"],
98+
"msg": f"Job {job_id} has returned with result: {result['msg']}",
99+
"debug": job.exc_info if job.exc_info else result}
81100
code = 500
82101

83102
job.delete()
84-
elif job.is_failed:
85-
payload = {"state": "error", "msg": "Job {} failed with error: {}".format(job_id, job.exc_info)}
86-
job.delete()
87-
code = 500
88103
else:
89-
payload = {"state": "pending", "msg": "Job {} has not started yet.".format(job_id)}
104+
payload = {
105+
"state": job.get_status(),
106+
"msg": f"Job {job_id} is {job.get_status()}"}
90107
code = 200
91108
else:
92-
payload = {"state": "error", "msg": "No such job {} found!".format(job_id)}
109+
payload = {
110+
"state": "error",
111+
"msg": f"No such job {job_id} found!"}
93112
code = 400
94113

95114
return web.json_response(payload, status=code)
115+
116+
117+
async def redis_context(app):
118+
app["redis_q"] = Queue(
119+
connection=redis_client.get_redis_instance(),
120+
is_async=app["config"].get("async_redis", True))
121+
yield

archive_verify/pdc_client.py

Lines changed: 63 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
log = logging.getLogger('archive_verify.workers')
1010

1111

12-
class PdcClient():
12+
class PdcClient:
1313
"""
1414
Base class representing a PDC client.
1515
Staging and production environments should instantiate PdcClient (default).
@@ -19,13 +19,15 @@ def __init__(self, archive_name, archive_pdc_path, archive_pdc_description, job_
1919
"""
2020
:param archive_name: The name of the archive we shall download
2121
:param archive_pdc_path: The path in PDC TSM to the archive that we want to download
22-
:param archive_pdc_description: The unique description that was used when uploading the archive to PDC
22+
:param archive_pdc_description: The unique description that was used when uploading the
23+
archive to PDC
2324
:param job_id: The current rq worker job id
2425
:param config: A dict containing the apps configuration
2526
"""
2627
self.dest_root = config["verify_root_dir"]
2728
self.dsmc_log_dir = config["dsmc_log_dir"]
2829
self.whitelisted_warnings = config["whitelisted_warnings"]
30+
self.dsmc_extra_args = config.get("dsmc_extra_args", {})
2931
self.archive_name = archive_name
3032
self.archive_pdc_path = archive_pdc_path
3133
self.archive_pdc_description = archive_pdc_description
@@ -35,27 +37,50 @@ def dest(self):
3537
"""
3638
:returns The unique path where the archive will be downloaded.
3739
"""
38-
return "{}_{}".format(os.path.join(self.dest_root, self.archive_name), self.job_id)
40+
return f"{os.path.join(self.dest_root, self.archive_name)}_{self.job_id}"
41+
42+
def dsmc_args(self):
43+
"""
44+
Fetch a list of arguments that will be passed to the dsmc command line. If there are
45+
extra arguments specified in the config, with "dsmc_extra_args", these are included as well.
46+
If arguments specified in dsmc_extra_args has the same key as the default arguments, the
47+
defaults will be overridden.
48+
49+
:return: a string with arguments that should be appended to the dsmc command line
50+
"""
51+
key_values = {
52+
"subdir": "yes",
53+
"description": self.archive_pdc_description
54+
}
55+
key_values.update(self.dsmc_extra_args)
56+
args = [f"-{k}='{v}'" for k, v in key_values.items() if v is not None]
57+
args.extend([f"-{k}" for k, v in key_values.items() if v is None])
58+
return " ".join(args)
3959

4060
def download(self):
4161
"""
4262
Downloads the specified archive from PDC to a unique location.
4363
:returns True if no errors or only whitelisted warnings were encountered, False otherwise
4464
"""
45-
log.info("Download_from_pdc started for {}".format(self.archive_pdc_path))
46-
cmd = "export DSM_LOG={} && dsmc retr {}/ {}/ -subdir=yes -description='{}'".format(self.dsmc_log_dir,
47-
self.archive_pdc_path,
48-
self.dest(),
49-
self.archive_pdc_description)
50-
p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
65+
log.info(f"Download_from_pdc started for {self.archive_pdc_path}")
66+
cmd = f"export DSM_LOG={self.dsmc_log_dir} && " \
67+
f"dsmc retr {self.archive_pdc_path}/ {self.dest()}/ {self.dsmc_args()}"
68+
69+
p = subprocess.Popen(
70+
cmd,
71+
shell=True,
72+
stdout=subprocess.PIPE,
73+
stderr=subprocess.STDOUT,
74+
text=True)
5175

5276
dsmc_output, _ = p.communicate()
5377
dsmc_exit_code = p.returncode
5478

5579
if dsmc_exit_code != 0:
56-
return PdcClient._parse_dsmc_return_code(dsmc_exit_code, dsmc_output, self.whitelisted_warnings)
80+
return PdcClient._parse_dsmc_return_code(
81+
dsmc_exit_code, dsmc_output, self.whitelisted_warnings)
5782

58-
log.info("Download_from_pdc completed successfully for {}".format(self.archive_pdc_path))
83+
log.info(f"Download_from_pdc completed successfully for {self.archive_pdc_path}")
5984
return True
6085

6186
def downloaded_archive_path(self):
@@ -67,46 +92,42 @@ def cleanup(self):
6792
@staticmethod
6893
def _parse_dsmc_return_code(exit_code, output, whitelist):
6994
"""
70-
Parses the dsmc output when we've encountered a non-zero exit code. For some certain exit codes,
71-
warnings and errors we still want to return successfully.
95+
Parses the dsmc output when we've encountered a non-zero exit code. For some certain exit
96+
codes, warnings and errors we still want to return successfully.
7297
7398
:param exit_code: The exit code received from the failing dsmc process
7499
:param output: The text output from the dsmc process
75100
:param whitelist: A list of whitelisted warnings
76101
:returns True if only whitelisted warnings was encountered in the output, otherwise False
77102
"""
78-
log.info("DSMC process returned an error!")
79103

80104
# DSMC sets return code to 8 when a warning was encountered.
81-
if exit_code == 8:
82-
log.info("DSMC process actually returned a warning.")
105+
log_fn = log.warning if exit_code == 8 else log.error
106+
log_fn(f"DSMC process returned a{' warning' if exit_code == 8 else 'n error'}!")
83107

84-
output = output.splitlines()
108+
# parse the DSMC output and extract error/warning codes and messages
109+
codes = []
110+
for line in output.splitlines():
111+
if line.startswith("ANS"):
112+
log_fn(line)
85113

86-
# Search through the DSMC log and see if we only have
87-
# whitelisted warnings. If that is the case, change the
88-
# return code to 0 instead. Otherwise keep the error state.
89-
warnings = []
114+
matches = re.findall(r'ANS[0-9]+[EW]', line)
115+
for match in matches:
116+
codes.append(match)
90117

91-
for line in output:
92-
matches = re.findall(r'ANS[0-9]+W', line)
118+
unique_codes = set(sorted(codes))
119+
if unique_codes:
120+
log_fn(f"ANS codes found in DSMC output: {', '.join(unique_codes)}")
93121

94-
for match in matches:
95-
warnings.append(match)
122+
# if we only have whitelisted warnings, change the return code to 0 instead
123+
if unique_codes.issubset(set(whitelist)):
124+
log.info("Only whitelisted DSMC ANS code(s) were encountered. Everything is OK.")
125+
return True
96126

97-
log.info("Warnings found in DSMC output: {}".format(set(warnings)))
98-
99-
for warning in warnings:
100-
if warning not in whitelist:
101-
log.error("A non-whitelisted DSMC warning was encountered. Reporting it as an error! ('{}')".format(
102-
warning))
103-
return False
104-
105-
log.info("Only whitelisted DSMC warnings were encountered. Everything is OK.")
106-
return True
107-
else:
108-
log.error("An uncaught DSMC error code was encountered!")
109-
return False
127+
log.error(
128+
f"Non-whitelisted DSMC ANS code(s) encountered: "
129+
f"{', '.join(unique_codes.difference(set(whitelist)))}")
130+
return False
110131

111132

112133
class MockPdcClient(PdcClient):
@@ -138,8 +159,10 @@ def dest(self):
138159

139160
def download(self):
140161
if not self.predownloaded_archive_path:
141-
log.error(f"No archive containing the name {self.archive_name} found in {self.dest_root}")
162+
log.error(
163+
f"No archive containing the name {self.archive_name} found in {self.dest_root}")
142164
return False
143165
else:
144-
log.info(f"Found pre-downloaded archive at {self.predownloaded_archive_path}")
166+
log.info(
167+
f"Found pre-downloaded archive at {self.predownloaded_archive_path}")
145168
return True

archive_verify/workers.py

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -51,14 +51,21 @@ def configure_log(dsmc_log_dir, archive_pdc_description):
5151
log.addHandler(fh)
5252

5353

54-
def verify_archive(archive_name, archive_pdc_path, archive_pdc_description, config):
54+
def verify_archive(
55+
archive_name,
56+
archive_pdc_path,
57+
archive_pdc_description,
58+
keep_downloaded_archive,
59+
config):
5560
"""
5661
Our main worker function. This will be put into the RQ/Redis queue when the /verify endpoint gets called.
5762
Downloads the specified archive from PDC and then verifies the MD5 sums.
5863
5964
:param archive_name: The name of the archive we shall download
6065
:param archive_pdc_path: The path in PDC TSM to the archive that we want to download
6166
:param archive_pdc_description: The unique description that was used when uploading the archive to PDC
67+
:param keep_downloaded_archive: If True, the downloaded archive will not be removed from local
68+
storage
6269
:param config: A dict containing the apps configuration
6370
:returns A JSON with the result that will be kept in the Redis queue
6471
"""
@@ -70,13 +77,22 @@ def verify_archive(archive_name, archive_pdc_path, archive_pdc_description, conf
7077
log.debug(f"Using PDC Client of type: {pdc_class.__name__}")
7178

7279
job_id = rq.get_current_job().id
73-
pdc_client = pdc_class(archive_name, archive_pdc_path, archive_pdc_description, job_id, config)
80+
pdc_client = pdc_class(
81+
archive_name,
82+
archive_pdc_path,
83+
archive_pdc_description,
84+
job_id,
85+
config)
7486
dest = pdc_client.dest()
7587
download_ok = pdc_client.download()
7688

7789
if not download_ok:
7890
log.debug("Download of {} failed.".format(archive_name))
79-
return {"state": "error", "msg": "failed to properly download archive from pdc", "path": dest}
91+
return {
92+
"state": "error",
93+
"msg": "failed to properly download archive from pdc",
94+
"path": dest
95+
}
8096
else:
8197
log.debug("Verifying {}...".format(archive_name))
8298
archive = pdc_client.downloaded_archive_path()
@@ -85,8 +101,17 @@ def verify_archive(archive_name, archive_pdc_path, archive_pdc_description, conf
85101

86102
if verified_ok:
87103
log.info("Verify of {} succeeded.".format(archive))
88-
pdc_client.cleanup()
89-
return {"state": "done", "path": output_file, "msg": "Successfully verified archive md5sums."}
104+
if not keep_downloaded_archive:
105+
pdc_client.cleanup()
106+
return {
107+
"state": "done",
108+
"path": output_file,
109+
"msg": "Successfully verified archive md5sums."
110+
}
90111
else:
91112
log.info("Verify of {} failed.".format(archive))
92-
return {"state": "error", "path": output_file, "msg": "Failed to verify archive md5sums."}
113+
return {
114+
"state": "error",
115+
"path": output_file,
116+
"msg": "Failed to verify archive md5sums."
117+
}

0 commit comments

Comments
 (0)