Skip to content

Commit 527d227

Browse files
committed
fix retry
1 parent 4ba5054 commit 527d227

File tree

1 file changed

+22
-14
lines changed

1 file changed

+22
-14
lines changed

celery_singleton/singleton.py

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,10 @@ class Singleton(BaseTask):
2222
raise_on_duplicate = None
2323
lock_expiry = None
2424

25+
def __init__(self, *args, **kwargs):
26+
self._unlock_to_super_retry = False
27+
super(Singleton, self).__init__(*args, **kwargs)
28+
2529
@property
2630
def _raise_on_duplicate(self):
2731
if self.raise_on_duplicate is not None:
@@ -75,17 +79,8 @@ def generate_lock(self, task_name, task_args=None, task_kwargs=None):
7579
key_prefix=self.singleton_config.key_prefix,
7680
)
7781

78-
def apply_async(
79-
self,
80-
args=None,
81-
kwargs=None,
82-
task_id=None,
83-
producer=None,
84-
link=None,
85-
link_error=None,
86-
shadow=None,
87-
**options
88-
):
82+
def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
83+
link=None, link_error=None, shadow=None, **options):
8984
args = args or []
9085
kwargs = kwargs or {}
9186
task_id = task_id or uuid()
@@ -117,14 +112,15 @@ def apply_async(
117112

118113
def lock_and_run(self, lock, *args, task_id=None, **kwargs):
119114
lock_aquired = self.aquire_lock(lock, task_id)
120-
if lock_aquired:
115+
if lock_aquired or self._unlock_to_super_retry:
121116
try:
122117
return super(Singleton, self).apply_async(
123118
*args, task_id=task_id, **kwargs
124119
)
125120
except Exception:
126121
# Clear the lock if apply_async fails
127-
self.unlock(lock)
122+
if lock_aquired:
123+
self.unlock(lock)
128124
raise
129125

130126
def release_lock(self, task_args=None, task_kwargs=None):
@@ -137,7 +133,9 @@ def unlock(self, lock):
137133
def on_duplicate(self, existing_task_id):
138134
if self._raise_on_duplicate:
139135
raise DuplicateTaskError(
140-
"Attempted to queue a duplicate of task ID {}".format(existing_task_id),
136+
"Attempted to queue a duplicate of task ID {}".format(
137+
existing_task_id
138+
),
141139
task_id=existing_task_id,
142140
)
143141
return self.AsyncResult(existing_task_id)
@@ -147,3 +145,13 @@ def on_failure(self, exc, task_id, args, kwargs, einfo):
147145

148146
def on_success(self, retval, task_id, args, kwargs):
149147
self.release_lock(task_args=args, task_kwargs=kwargs)
148+
149+
def retry(self, args=None, kwargs=None, exc=None, throw=True,
150+
eta=None, countdown=None, max_retries=None, **options):
151+
self._unlock_to_super_retry = True
152+
retry_task = super(Singleton, self).retry(
153+
args, kwargs, exc, throw, eta, countdown, max_retries, **options
154+
)
155+
self._unlock_to_super_retry = False
156+
157+
return retry_task

0 commit comments

Comments
 (0)