Skip to content

Commit b486a29

Browse files
lsaavedrLuis Saavedra
authored and
Luis Saavedra
committed
fix retry
1 parent 9821cbb commit b486a29

File tree

1 file changed

+22
-14
lines changed

1 file changed

+22
-14
lines changed

celery_singleton/singleton.py

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,10 @@ class Singleton(BaseTask):
2222
raise_on_duplicate = None
2323
lock_expiry = None
2424

25+
def __init__(self, *args, **kwargs):
26+
self._unlock_to_super_retry = False
27+
super(Singleton, self).__init__(*args, **kwargs)
28+
2529
@property
2630
def _raise_on_duplicate(self):
2731
if self.raise_on_duplicate is not None:
@@ -78,17 +82,8 @@ def generate_lock(self, task_name, task_args=None, task_kwargs=None):
7882
key_prefix=self.singleton_config.key_prefix,
7983
)
8084

81-
def apply_async(
82-
self,
83-
args=None,
84-
kwargs=None,
85-
task_id=None,
86-
producer=None,
87-
link=None,
88-
link_error=None,
89-
shadow=None,
90-
**options
91-
):
85+
def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
86+
link=None, link_error=None, shadow=None, **options):
9287
args = args or []
9388
kwargs = kwargs or {}
9489
task_id = task_id or uuid()
@@ -120,14 +115,15 @@ def apply_async(
120115

121116
def lock_and_run(self, lock, *args, task_id=None, **kwargs):
122117
lock_aquired = self.aquire_lock(lock, task_id)
123-
if lock_aquired:
118+
if lock_aquired or self._unlock_to_super_retry:
124119
try:
125120
return super(Singleton, self).apply_async(
126121
*args, task_id=task_id, **kwargs
127122
)
128123
except Exception:
129124
# Clear the lock if apply_async fails
130-
self.unlock(lock)
125+
if lock_aquired:
126+
self.unlock(lock)
131127
raise
132128

133129
def release_lock(self, task_args=None, task_kwargs=None):
@@ -140,7 +136,9 @@ def unlock(self, lock):
140136
def on_duplicate(self, existing_task_id):
141137
if self._raise_on_duplicate:
142138
raise DuplicateTaskError(
143-
"Attempted to queue a duplicate of task ID {}".format(existing_task_id),
139+
"Attempted to queue a duplicate of task ID {}".format(
140+
existing_task_id
141+
),
144142
task_id=existing_task_id,
145143
)
146144
return self.AsyncResult(existing_task_id)
@@ -150,3 +148,13 @@ def on_failure(self, exc, task_id, args, kwargs, einfo):
150148

151149
def on_success(self, retval, task_id, args, kwargs):
152150
self.release_lock(task_args=args, task_kwargs=kwargs)
151+
152+
def retry(self, args=None, kwargs=None, exc=None, throw=True,
153+
eta=None, countdown=None, max_retries=None, **options):
154+
self._unlock_to_super_retry = True
155+
retry_task = super(Singleton, self).retry(
156+
args, kwargs, exc, throw, eta, countdown, max_retries, **options
157+
)
158+
self._unlock_to_super_retry = False
159+
160+
return retry_task

0 commit comments

Comments
 (0)