Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 1 addition & 29 deletions redbeat/schedulers.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,31 +26,6 @@

logger = get_logger('celery.beat')

# Copied from:
# https://github.com/andymccurdy/redis-py/blob/master/redis/lock.py#L33
# Changes:
# The second line from the bottom: The original Lua script intends
# to extend time to (lock remaining time + additional time); while
# the script here extend time to a expected expiration time.
# KEYS[1] - lock name
# ARGS[1] - token
# ARGS[2] - additional milliseconds
# return 1 if the locks time was extended, otherwise 0
LUA_EXTEND_TO_SCRIPT = """
local token = redis.call('get', KEYS[1])
if not token or token ~= ARGV[1] then
return 0
end
local expiration = redis.call('pttl', KEYS[1])
if not expiration then
expiration = 0
end
if expiration < 0 then
return 0
end
redis.call('pexpire', KEYS[1], ARGV[2])
return 1
"""

REDBEAT_REDIS_KEY = "redbeat_redis"
REDBEAT_SENTINEL_KEY = "redbeat_sentinel"
Expand Down Expand Up @@ -535,7 +510,7 @@ def maybe_due(self, entry, **kwargs):
def tick(self, min=min, **kwargs):
if self.lock_key:
logger.debug('beat: Extending lock...')
self.lock.extend(int(self.lock_timeout))
self.lock.extend(int(self.lock_timeout), replace_ttl=True)

remaining_times = []
try:
Expand Down Expand Up @@ -593,9 +568,6 @@ def acquire_distributed_beat_lock(sender=None, **kwargs):
timeout=scheduler.lock_timeout,
sleep=scheduler.max_interval,
)
# overwrite redis-py's extend script
# which will add additional timeout instead of extend to a new timeout
lock.lua_extend = redis_client.register_script(LUA_EXTEND_TO_SCRIPT)
lock.acquire()
logger.info('beat: Acquired lock')
scheduler.lock = lock