
Commit 1434646

Pickle dicts (#123)
* dicts when pickling and better error catching
* returning JobResult and JobDef
* fix linting, tests for unpicklable
* more tests for pickling
* correct docs
1 parent 3183b25 commit 1434646
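
The heart of the change: job payloads were previously pickled as positional tuples, which made the stored format order-dependent and hard to extend; they are now pickled as dicts with short keys. A minimal sketch of the two formats, with illustrative values (the short keys 't'/'f'/'a'/'k'/'et' come from pickle_job in arq/jobs.py below):

import pickle

# illustrative values, not taken from the commit
function, args, kwargs = 'the_task', (1, 2), {'retry': True}
job_try, enqueue_time_ms = 1, 1556027936781

# before this commit: an order-dependent tuple
old_payload = pickle.dumps((enqueue_time_ms, job_try, function, args, kwargs))

# after this commit: a self-describing dict with short keys, as built by pickle_job()
new_payload = pickle.dumps({'t': job_try, 'f': function, 'a': args, 'k': kwargs, 'et': enqueue_time_ms})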

File tree

10 files changed: +206 additions, -89 deletions


.gitignore

Lines changed: 1 addition & 0 deletions
@@ -16,3 +16,4 @@ __pycache__/
 /demo/tmp/
 .vscode/
 .venv/
+/.auto-format

HISTORY.rst

Lines changed: 4 additions & 0 deletions
@@ -3,6 +3,10 @@
 History
 -------
 
+v0.16.0b1 (2019-04-23)
+......................
+* use dicts for pickling not tuples, better handling of pickling errors, #123
+
 v0.16.0a5 (2019-04-22)
 ......................
 * use ``pipeline`` in ``enqueue_job``

arq/connections.py

Lines changed: 7 additions & 8 deletions
@@ -1,17 +1,16 @@
 import asyncio
 import logging
-import pickle
 from dataclasses import dataclass
 from datetime import datetime, timedelta
-from operator import itemgetter
-from typing import Any, Dict, List, Optional, Union
+from operator import attrgetter
+from typing import Any, List, Optional, Union
 from uuid import uuid4
 
 import aioredis
 from aioredis import MultiExecError, Redis
 
 from .constants import job_key_prefix, queue_name, result_key_prefix
-from .jobs import Job
+from .jobs import Job, JobResult, pickle_job
 from .utils import timestamp_ms, to_ms, to_unix_ms
 
 logger = logging.getLogger('arq.connections')
@@ -96,7 +95,7 @@ async def enqueue_job(
 
             expires_ms = expires_ms or score - enqueue_time_ms + expires_extra_ms
 
-            job = pickle.dumps((enqueue_time_ms, _job_try, function, args, kwargs))
+            job = pickle_job(function, args, kwargs, _job_try, enqueue_time_ms)
             tr = conn.multi_exec()
             tr.psetex(job_key, expires_ms, job)
             tr.zadd(queue_name, score, job_id)
@@ -111,16 +110,16 @@ async def _get_job_result(self, key):
         job_id = key[len(result_key_prefix) :]
         job = Job(job_id, self)
         r = await job.result_info()
-        r['job_id'] = job_id
+        r.job_id = job_id
         return r
 
-    async def all_job_results(self) -> List[Dict]:
+    async def all_job_results(self) -> List[JobResult]:
         """
         Get results for all jobs in redis.
         """
         keys = await self.keys(result_key_prefix + '*')
         results = await asyncio.gather(*[self._get_job_result(k) for k in keys])
-        return sorted(results, key=itemgetter('enqueue_time'))
+        return sorted(results, key=attrgetter('enqueue_time'))
 
 
 async def create_pool(settings: RedisSettings = None, *, _retry: int = 0) -> ArqRedis:
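
The itemgetter to attrgetter swap follows from the return-type change: all_job_results() now yields JobResult dataclass instances rather than dicts, so sort keys move from subscription to attribute access. A standalone sketch with a stand-in dataclass:

from dataclasses import dataclass
from datetime import datetime
from operator import attrgetter

@dataclass
class FakeResult:  # stand-in for arq.jobs.JobResult, fields trimmed for illustration
    enqueue_time: datetime

results = [FakeResult(datetime(2019, 4, 23, 14, 0)), FakeResult(datetime(2019, 4, 23, 13, 0))]
# dicts were sorted with itemgetter('enqueue_time'); dataclasses need attrgetter
print(sorted(results, key=attrgetter('enqueue_time')))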

arq/jobs.py

Lines changed: 107 additions & 26 deletions
@@ -1,11 +1,16 @@
 import asyncio
+import logging
 import pickle
+from dataclasses import dataclass
+from datetime import datetime
 from enum import Enum
-from typing import Any, Dict, Optional
+from typing import Any, Optional
 
 from .constants import in_progress_key_prefix, job_key_prefix, queue_name, result_key_prefix
 from .utils import ms_to_datetime, poll, timestamp_ms
 
+logger = logging.getLogger('arq.jobs')
+
 
 class JobStatus(str, Enum):
     """
@@ -24,6 +29,25 @@ class JobStatus(str, Enum):
     not_found = 'not_found'
 
 
+@dataclass
+class JobDef:
+    function: str
+    args: tuple
+    kwargs: dict
+    job_try: int
+    enqueue_time: datetime
+    score: Optional[int]
+
+
+@dataclass
+class JobResult(JobDef):
+    success: bool
+    result: Any
+    start_time: datetime
+    finish_time: datetime
+    job_id: Optional[str] = None
+
+
 class Job:
     """
     Holds data a reference to a job.
@@ -46,53 +70,35 @@ async def result(self, timeout: Optional[float] = None, *, pole_delay: float = 0
         async for delay in poll(pole_delay):
             info = await self.result_info()
             if info:
-                result = info['result']
-                if info['success']:
+                result = info.result
+                if info.success:
                     return result
                 else:
                     raise result
             if timeout is not None and delay > timeout:
                 raise asyncio.TimeoutError()
 
-    async def info(self) -> Optional[Dict[str, Any]]:
+    async def info(self) -> Optional[JobDef]:
         """
         All information on a job, including its result if it's available, does not wait for the result.
         """
         info = await self.result_info()
         if not info:
             v = await self._redis.get(job_key_prefix + self.job_id, encoding=None)
             if v:
-                enqueue_time_ms, job_try, function, args, kwargs = pickle.loads(v)
-                info = dict(
-                    enqueue_time=ms_to_datetime(enqueue_time_ms),
-                    job_try=job_try,
-                    function=function,
-                    args=args,
-                    kwargs=kwargs,
-                )
+                info = unpickle_job(v)
         if info:
-            info['score'] = await self._redis.zscore(queue_name, self.job_id)
+            info.score = await self._redis.zscore(queue_name, self.job_id)
         return info
 
-    async def result_info(self) -> Optional[Dict[str, Any]]:
+    async def result_info(self) -> Optional[JobResult]:
         """
         Information about the job result if available, does not wait for the result. Does not raise an exception
         even if the job raised one.
         """
         v = await self._redis.get(result_key_prefix + self.job_id, encoding=None)
         if v:
-            enqueue_time_ms, job_try, function, args, kwargs, s, r, start_time_ms, finish_time_ms = pickle.loads(v)
-            return dict(
-                enqueue_time=ms_to_datetime(enqueue_time_ms),
-                job_try=job_try,
-                function=function,
-                args=args,
-                kwargs=kwargs,
-                result=r,
-                success=s,
-                start_time=ms_to_datetime(start_time_ms),
-                finish_time=ms_to_datetime(finish_time_ms),
-            )
+            return unpickle_result(v)
 
     async def status(self) -> JobStatus:
         """
@@ -110,3 +116,78 @@ async def status(self) -> JobStatus:
 
     def __repr__(self):
         return f'<arq job {self.job_id}>'
+
+
+class PickleError(RuntimeError):
+    pass
+
+
+def pickle_job(function_name: str, args: tuple, kwargs: dict, job_try: int, enqueue_time_ms: int):
+    data = {'t': job_try, 'f': function_name, 'a': args, 'k': kwargs, 'et': enqueue_time_ms}
+    try:
+        return pickle.dumps(data)
+    except Exception as e:
+        raise PickleError(f'unable to pickle job "{function_name}"') from e
+
+
+def pickle_result(
+    function: str,
+    args: tuple,
+    kwargs: dict,
+    job_try: int,
+    enqueue_time_ms: int,
+    success: bool,
+    result: Any,
+    start_ms: int,
+    finished_ms: int,
+    ref: str,
+) -> Optional[bytes]:
+    data = {
+        't': job_try,
+        'f': function,
+        'a': args,
+        'k': kwargs,
+        'et': enqueue_time_ms,
+        's': success,
+        'r': result,
+        'st': start_ms,
+        'ft': finished_ms,
+    }
+    try:
+        return pickle.dumps(data)
+    except Exception:
+        logger.warning('error pickling result of %s', ref, exc_info=True)
+
+    data.update(r=PickleError('unable to pickle result'), s=False)
+    try:
+        return pickle.dumps(data)
+    except Exception:
+        logger.critical('error pickling result of %s even after replacing result', ref, exc_info=True)
+
+
+def unpickle_job(r: bytes) -> JobDef:
+    d = pickle.loads(r)
+    return JobDef(
+        function=d['f'], args=d['a'], kwargs=d['k'], job_try=d['t'], enqueue_time=ms_to_datetime(d['et']), score=None
+    )
+
+
+def unpickle_job_raw(r: bytes) -> tuple:
+    d = pickle.loads(r)
+    return d['f'], d['a'], d['k'], d['t'], d['et']
+
+
+def unpickle_result(r: bytes) -> JobResult:
+    d = pickle.loads(r)
+    return JobResult(
+        job_try=d['t'],
+        function=d['f'],
+        args=d['a'],
+        kwargs=d['k'],
+        enqueue_time=ms_to_datetime(d['et']),
+        score=None,
+        success=d['s'],
+        result=d['r'],
+        start_time=ms_to_datetime(d['st']),
+        finish_time=ms_to_datetime(d['ft']),
+    )
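
Taken together, the new helpers give a symmetric round trip and two levels of error handling: pickle_job raises PickleError at enqueue time, while pickle_result falls back to storing a PickleError as the result so the record itself still serializes. A quick sketch, assuming the module above is importable as arq.jobs:

from arq.jobs import PickleError, pickle_job, pickle_result, unpickle_job, unpickle_result

# round trip of a job definition
raw = pickle_job('the_task', (1,), {}, 1, 1556027936781)
job = unpickle_job(raw)
assert job.function == 'the_task' and job.job_try == 1

# unpicklable arguments now fail loudly at enqueue time
try:
    pickle_job('the_task', (lambda: None,), {}, 1, 1556027936781)  # lambdas can't be pickled
except PickleError as e:
    print(e)  # unable to pickle job "the_task"

# an unpicklable *result* is swapped for a PickleError so the record still round-trips
# (a warning with traceback is logged by pickle_result here)
raw = pickle_result('the_task', (), {}, 1, 0, True, lambda: None, 0, 1, 'the_task:1')
res = unpickle_result(raw)
assert res.success is False and isinstance(res.result, PickleError)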

arq/version.py

Lines changed: 1 addition & 1 deletion
@@ -2,4 +2,4 @@
 
 __all__ = ['VERSION']
 
-VERSION = StrictVersion('0.16a5')
+VERSION = StrictVersion('0.16b1')

arq/worker.py

Lines changed: 7 additions & 9 deletions
@@ -1,7 +1,6 @@
 import asyncio
 import inspect
 import logging
-import pickle
 import signal
 from dataclasses import dataclass
 from datetime import datetime
@@ -15,6 +14,7 @@
 from pydantic.utils import import_string
 
 from arq.cron import CronJob
+from arq.jobs import pickle_result, unpickle_job_raw
 
 from .connections import ArqRedis, RedisSettings, create_pool, log_redis_info
 from .constants import (
@@ -107,7 +107,7 @@ def __init__(self, count, job_results):
 
     def __str__(self):
         if self.count == 1 and self.job_results:
-            exc = self.job_results[0]['result']
+            exc = self.job_results[0].result
             return f'1 job failed "{exc.__class__.__name__}: {exc}"'
         else:
             return f'{self.count} jobs failed'
@@ -219,7 +219,7 @@ async def run_check(self) -> int:
         """
         await self.async_run()
         if self.jobs_failed:
-            failed_job_results = [r for r in await self.pool.all_job_results() if not r['success']]
+            failed_job_results = [r for r in await self.pool.all_job_results() if not r.success]
             raise FailedJobs(self.jobs_failed, failed_job_results)
         else:
             return self.jobs_complete
@@ -290,7 +290,7 @@ async def run_job(self, job_id, score): # noqa: C901
             self.jobs_failed += 1
             return await asyncio.shield(self.abort_job(job_id))
 
-        enqueue_time_ms, enqueue_job_try, function_name, args, kwargs = pickle.loads(v)
+        function_name, args, kwargs, enqueue_job_try, enqueue_time_ms = unpickle_job_raw(v)
 
         try:
             function: Union[Function, CronJob] = self.functions[function_name]
@@ -376,11 +376,9 @@ async def run_job(self, job_id, score): # noqa: C901
         result_timeout_s = self.keep_result_s if function.keep_result_s is None else function.keep_result_s
         result_data = None
         if result is not no_result and result_timeout_s > 0:
-            d = enqueue_time_ms, job_try, function_name, args, kwargs, success, result, start_ms, finished_ms
-            try:
-                result_data = pickle.dumps(d)
-            except Exception:
-                logger.critical('error pickling result of %s', ref, exc_info=True)
+            result_data = pickle_result(
+                function_name, args, kwargs, job_try, enqueue_time_ms, success, result, start_ms, finished_ms, ref
+            )
 
         await asyncio.shield(self.finish_job(job_id, finish, result_data, result_timeout_s, incr_score))
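
On the worker side the same change surfaces in run_check(): failed results are now filtered and reported via attributes instead of dict keys. A hedged sketch of how a caller might inspect failures under the new API (worker construction elided; `worker` is assumed to be an already-configured arq Worker):

from arq.worker import FailedJobs

async def report_failures(worker):
    try:
        await worker.run_check()
    except FailedJobs as e:
        for r in e.job_results:  # JobResult dataclasses, not dicts
            print(r.function, r.result)  # previously r['function'] / r['result']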

docs/examples/job_results.py

Lines changed: 8 additions & 7 deletions
@@ -25,13 +25,14 @@ async def main():
     debug(await job.info())
     """
     > docs/examples/job_results.py:23 main
-    {
-        'enqueue_time': datetime.datetime(2019, 3, 3, 12, 32, 19, 975000),
-        'function': 'the_task',
-        'args': (),
-        'kwargs': {},
-        'score': 1551616339975,
-    } (dict) len=5
+    JobDef(
+        function='the_task',
+        args=(),
+        kwargs={},
+        job_try=None,
+        enqueue_time=datetime.datetime(2019, 4, 23, 13, 58, 56, 781000),
+        score=1556027936781
+    ) (JobDef)
     """
 
     # get the Job's status
