-
Notifications
You must be signed in to change notification settings - Fork 8
Expand file tree
/
Copy pathleader.py
More file actions
147 lines (122 loc) · 4.56 KB
/
leader.py
File metadata and controls
147 lines (122 loc) · 4.56 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
"""
MoltGrid Leader Election -- Redis-based leader election for multi-worker Uvicorn.
Only one worker should run background threads (scheduler, uptime, liveness,
usage reset, email queue, webhook delivery). This module uses Redis SET NX
with a TTL to elect a single leader among workers.
Graceful fallback: if Redis is unavailable, assumes leadership so that
single-worker deployments continue to work without Redis.
"""
import logging
import os
import socket
import threading
import time
logger = logging.getLogger("moltgrid.leader")
# Identifier this worker writes into the leader key; acquire_leadership()
# recomputes it in case the PID changed (e.g. after fork). The hostname is
# part of the ID because PIDs are only unique per host: worker processes in
# separate containers or machines sharing one Redis often have identical PIDs.
WORKER_ID = f"worker-{socket.gethostname()}-{os.getpid()}"
LEADER_KEY = "moltgrid:leader"  # Redis key holding the current leader's WORKER_ID
LEADER_TTL = 30  # seconds; lease lifetime if the leader stops renewing it
RENEW_INTERVAL = 15  # seconds; kept below LEADER_TTL so renewals outpace expiry
_is_leader = False  # this process's own view of whether it is the leader
_renew_thread = None  # the "leader-renew" thread, started once elected
_stop_event = threading.Event()  # set to ask the renewal thread to exit
def _get_redis_client():
    """Return a connected, synchronous Redis client, or None if unusable.

    A blocking (non-async) client is used on purpose: election and lease
    renewal run in background threads and during startup, before any event
    loop exists. A missing ``redis`` package, a missing ``config`` module,
    or a failed connection/ping all yield None after a logged warning. An
    empty ``REDIS_URL`` also yields None, but silently.
    """
    try:
        import redis
        from config import REDIS_URL

        if REDIS_URL:
            connection = redis.from_url(
                REDIS_URL,
                decode_responses=True,
                socket_connect_timeout=2,
                socket_timeout=2,
            )
            # Fail fast here rather than on the first real command.
            connection.ping()
            return connection
        return None
    except Exception as exc:
        logger.warning(f"Redis unavailable for leader election: {exc}")
        return None
def _renew_loop():
    """Background thread body: keep this worker's leader lease alive.

    Until ``_stop_event`` is set, every RENEW_INTERVAL seconds the lease is
    refreshed to LEADER_TTL seconds by one atomic Lua script, so a key that
    now belongs to another worker is never modified. Outcomes per tick:

    * key still ours -> TTL extended;
    * key expired and free -> reclaimed for this worker (it never stopped
      acting as leader, so re-taking the key avoids a window with no leader);
    * key held by another worker -> ``_is_leader`` is cleared and the loop
      exits;
    * Redis unreachable or erroring -> leadership is kept (graceful
      degradation) and the renewal is retried on the next tick instead of
      being abandoned for the rest of the process lifetime.
    """
    global _is_leader
    # KEYS[1] = leader key, ARGV[1] = worker id, ARGV[2] = TTL in seconds.
    # Returns integer 1 when we hold the lease afterwards, otherwise the
    # current holder's id (a string). A missing key reads as false in Lua.
    renew_script = """
    local current = redis.call("get", KEYS[1])
    if current == ARGV[1] then
        redis.call("expire", KEYS[1], ARGV[2])
        return 1
    elseif not current then
        redis.call("set", KEYS[1], ARGV[1], "EX", ARGV[2])
        return 1
    end
    return current
    """
    client = None
    while not _stop_event.is_set():
        if client is None:
            # Connect lazily and reuse the same client while it keeps working,
            # instead of opening a fresh connection pool on every tick.
            client = _get_redis_client()
        if _stop_event.is_set():
            # Released while (re)connecting: never touch the key after that.
            break
        if client is not None:
            try:
                holder = client.eval(renew_script, 1, LEADER_KEY, WORKER_ID, LEADER_TTL)
            except Exception as e:
                logger.error(f"Leader renewal error: {e}")
                client = None  # drop the connection; reconnect next tick
            else:
                if holder != 1:
                    # Another worker owns the key now.
                    _is_leader = False
                    logger.warning(f"Lost leadership to {holder}")
                    break
        # client is None here means Redis is gone: keep the leadership
        # assumption for graceful degradation and simply retry next tick.
        _stop_event.wait(RENEW_INTERVAL)
def acquire_leadership() -> bool:
    """Try to become the leader worker.

    Returns True if this worker is the leader after the call, False if
    another worker holds the leader key.

    Fallbacks (both return True): if Redis is unavailable, or if the election
    commands raise, this worker assumes leadership so that single-worker
    deployments keep working without Redis.

    Safe to call again from the worker that already holds the key. ``SET NX``
    fails on our own key, so the stored holder is compared with WORKER_ID and
    the worker stays leader instead of demoting itself. No duplicate renewal
    thread is started while one is running.
    """
    global _is_leader, _renew_thread, WORKER_ID
    # Refresh worker ID in case PID changed (e.g. after fork). The hostname
    # keeps IDs distinct across hosts/containers, where PIDs often coincide.
    WORKER_ID = f"worker-{socket.gethostname()}-{os.getpid()}"
    client = _get_redis_client()
    if client is None:
        logger.info(f"No Redis available. Worker {WORKER_ID} assumes leadership (single-worker fallback).")
        _is_leader = True
        return True
    try:
        # SET NX: only set if key does not exist; EX makes the lease lapse if
        # the leader stops renewing it.
        acquired = client.set(LEADER_KEY, WORKER_ID, nx=True, ex=LEADER_TTL)
        current = WORKER_ID if acquired else client.get(LEADER_KEY)
    except Exception as e:
        logger.warning(f"Leader election failed: {e}. Assuming leadership (fallback).")
        _is_leader = True
        return True
    if current != WORKER_ID:
        _is_leader = False
        logger.info(f"Worker {WORKER_ID} is follower. Leader is {current}.")
        return False
    _is_leader = True
    if acquired:
        logger.info(f"Worker {WORKER_ID} elected as leader.")
    else:
        logger.info(f"Worker {WORKER_ID} already holds leadership.")
    # (Re)start the renewal thread unless one is already running and has not
    # been asked to stop.
    if _renew_thread is None or not _renew_thread.is_alive() or _stop_event.is_set():
        _stop_event.clear()
        _renew_thread = threading.Thread(target=_renew_loop, daemon=True, name="leader-renew")
        _renew_thread.start()
    return True
def release_leadership() -> None:
    """Release leadership on shutdown.

    The renewal thread is stopped first, with a brief, bounded wait for it to
    exit so an in-flight renewal cannot race the delete below. Then the leader
    key is deleted, but only if this worker still owns it; the check-and-delete
    is atomic via Lua. Redis errors are logged, not raised. ``_is_leader`` is
    False when this returns.
    """
    global _is_leader
    _stop_event.set()  # Stop the renewal thread
    thread = _renew_thread
    if thread is not None and thread.is_alive() and thread is not threading.current_thread():
        # Bounded so shutdown cannot hang on a stuck Redis socket.
        thread.join(timeout=5)
    if not _is_leader:
        # Never leader, or the renewal thread saw another worker take over.
        return
    client = _get_redis_client()
    if client is None:
        _is_leader = False
        return
    try:
        # Only delete if we still own the key (atomic check-and-delete via Lua)
        lua_script = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
        """
        deleted = client.eval(lua_script, 1, LEADER_KEY, WORKER_ID)
        if deleted:
            logger.info(f"Worker {WORKER_ID} released leadership.")
        else:
            logger.info(f"Worker {WORKER_ID} no longer owned the leader key; nothing to release.")
    except Exception as e:
        logger.warning(f"Leader release error: {e}")
    finally:
        _is_leader = False
def is_leader() -> bool:
    """Report whether this worker currently believes it is the leader.

    Reads the module-level flag set by acquire_leadership() and cleared by
    release_leadership() or the renewal thread; performs no Redis call.
    """
    leader_flag = _is_leader
    return leader_flag