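"""Validator entry point for the prompting subnet.

Configures logging, then launches and supervises the validator's worker
processes (scoring API, miner-availability checking, task sending, weight
setting) alongside the in-process task-creation and scoring loops.
"""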
import asyncio
import atexit
import os
import signal
import sys
import time
from multiprocessing.managers import AcquirerProxy
from multiprocessing.synchronize import Event
import netaddr
import psutil
import requests
import torch
import torch.multiprocessing as mp
import wandb
from bittensor.core.extrinsics.serving import serve_extrinsic
from loguru import logger
from prompting.llms.model_manager import AsyncModelScheduler, ModelManager
from prompting.rewards.scoring import task_scorer
# ruff: noqa: E402
from shared import settings
from shared.logging import init_wandb


def init_process_logging(name: str):
"""Initialize logging for a new process."""
logger.remove() # Remove any existing handlers
try:
# Add process-specific handlers
logger.add(
f"{name}_{os.getpid()}.log",
rotation="100 MB",
retention="10 days",
level="DEBUG",
enqueue=True, # Use queue for thread-safe logging
)
logger.add(
f"{name}_err_{os.getpid()}.log", rotation="100 MB", retention="10 days", level="WARNING", enqueue=True
)
logger.add(sys.stderr, level=settings.shared_settings.LOG_LEVEL, enqueue=True)
except Exception as e:
print(f"Failed to initialize logging for process {os.getpid()}: {e}")
settings.shared_settings = settings.SharedSettings.load(mode="validator")
from prompting.llms.utils import GPUInfo

# Configure logging for the main process.
logger.remove()
logger.add("logfile.log", rotation="100 MB", retention="10 days", level="DEBUG")
logger.add("err.log", rotation="100 MB", retention="10 days", level="WARNING")
logger.add(sys.stderr, level=settings.shared_settings.LOG_LEVEL)
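# CUDA state cannot be shared across fork()ed children, so each worker must be
# spawned with a fresh interpreter; force=True overrides any inherited setting.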
torch.multiprocessing.set_start_method("spawn", force=True)


async def create_loop_process(
model_scheduler: AsyncModelScheduler,
task_queue: list,
scoring_queue: list,
reward_events: list,
miners_dict: dict,
mp_lock: AcquirerProxy,
):
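    """Run the in-process loops: model scheduling, task creation, and task scoring.

    The queues, dict, and lock arguments are multiprocessing proxies shared with
    the worker processes spawned in main().
    """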
# Load settings and initialize external services.
settings.shared_settings = settings.SharedSettings.load(mode="validator")
if settings.shared_settings.WANDB_ON:
init_wandb(neuron="validator")
all_tasks: list[asyncio.Task] = []
async def cleanup(model_scheduler):
logger.info("Cleaning up resources...")
        # destroy_process_group() raises if no process group was ever initialized.
        if torch.distributed.is_initialized():
            torch.distributed.destroy_process_group()
await model_scheduler.llm_model.cleanup()
for t in all_tasks:
t.cancel()
await asyncio.gather(*all_tasks, return_exceptions=True)
if settings.shared_settings.WANDB_ON:
wandb.finish()
logger.info("WandB run finished.")
async def spawn_loops(task_queue: list, scoring_queue: list, reward_events: list, miners_dict: dict):
# Import modules that are local to this scope.
from prompting.tasks.task_creation import task_loop
from shared.profiling import profiler
logger.info("Starting loops...")
profile = asyncio.create_task(profiler.print_stats(), name="Profiler")
task_loop_task = asyncio.create_task(
task_loop.start(task_queue, scoring_queue, miners_dict, simultaneous_loops=1), name="TaskLoop"
)
model_scheduler_task = asyncio.create_task(model_scheduler.start(scoring_queue), name="ModelScheduler")
task_scorer_task = asyncio.create_task(
task_scorer.start(
model_scheduler,
scoring_queue,
reward_events,
mp_lock=mp_lock,
task_queue=task_queue,
simultaneous_loops=1,
),
name="TaskScorer",
)
all_tasks.extend([profile, task_loop_task, model_scheduler_task, task_scorer_task])
try:
while True:
await asyncio.sleep(10)
logger.debug(
f"Task Queue {len(task_queue)}. Scoring Queue {len(scoring_queue)}. Reward Events {len(reward_events)}"
)
if model_scheduler.memory_error is not None:
raise model_scheduler.memory_error
except asyncio.CancelledError:
logger.info("spawn_loops received cancellation signal.")
raise
try:
await spawn_loops(task_queue, scoring_queue, reward_events, miners_dict)
except MemoryError as e:
logger.error(f"MemoryError encountered. Terminating program: {e}")
await cleanup(model_scheduler)
sys.exit(1)
except Exception as e:
logger.exception(f"Terminating loop process: {e}")
finally:
        await cleanup(model_scheduler)


def start_api(
scoring_queue: list,
reward_events: list,
miners_dict: dict,
event_stop: Event,
):
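    """Serve the scoring API from a dedicated process and announce its endpoint on-chain."""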
init_process_logging(name="APIProcess")
from prompting.api.api import start_scoring_api
logger.info("Starting API process...")
async def start():
try:
            # Bound the lookup with a timeout so a hung request can't stall API startup.
            external_ip = requests.get("https://checkip.amazonaws.com", timeout=15).text.strip()
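            # netaddr.IPAddress raises on anything that isn't a valid IP address.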
netaddr.IPAddress(external_ip)
serve_success = serve_extrinsic(
subtensor=settings.shared_settings.SUBTENSOR,
wallet=settings.shared_settings.WALLET,
ip=external_ip,
port=settings.shared_settings.SCORING_API_PORT,
protocol=4,
netuid=settings.shared_settings.NETUID,
)
logger.debug(f"Serve success: {serve_success}")
except Exception as e:
logger.warning(f"Failed to serve scoring api to chain: {e}")
await start_scoring_api(task_scorer, scoring_queue, reward_events, miners_dict)
while not event_stop.is_set():
await asyncio.sleep(10)
    asyncio.run(start())


def start_task_sending_loop(
task_queue: list,
scoring_queue: list,
miners_dict: dict,
event_stop: Event,
):
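    """Run the loop that pulls tasks from the queue and sends them to miners."""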
init_process_logging(name="TaskSending") # Initialize logging for task sending process
async def spawn_loops(task_queue, scoring_queue, miners_dict: dict):
from prompting.tasks.task_sending import TaskSender
logger.info("Starting task sending loop in validator...")
task_sender = TaskSender()
        # Hold a reference so the task isn't garbage-collected while it runs.
        sender_task = asyncio.create_task(
            task_sender.start(task_queue, scoring_queue, miners_dict, simultaneous_loops=1)
        )
logger.debug("Task sending loop started")
while not event_stop.is_set():
await asyncio.sleep(5)
logger.debug("Task sending loop is running")
try:
logger.info("Starting task sending loop in validator...")
asyncio.run(spawn_loops(task_queue, scoring_queue, miners_dict))
except Exception as e:
logger.exception(f"Task sending loop error: {e}")
        raise


def start_availability_checking_loop(miners_dict: dict, event_stop: Event):
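    """Run the loop that periodically probes miners and records their availability."""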
init_process_logging(name="AvailabilityChecking") # Initialize logging for availability checking process
async def spawn_loops(miners_dict: dict):
from prompting.miner_availability.miner_availability import availability_checking_loop
logger.info("Starting availability checking loop in validator...")
        # Hold a reference so the task isn't garbage-collected while it runs.
        checking_task = asyncio.create_task(availability_checking_loop.start(miners_dict))
while not event_stop.is_set():
await asyncio.sleep(5)
logger.debug("Availability checking loop is running")
try:
logger.info("Starting availability checking loop in validator...")
asyncio.run(spawn_loops(miners_dict))
except Exception as e:
logger.exception(f"Availability checking loop error: {e}")
        raise


def start_weight_setter_loop(reward_events, event_stop: Event):
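    """Run the loop that periodically sets on-chain weights from accumulated reward events."""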
init_process_logging(name="WeightSetter") # Initialize logging for weight setter process
async def spawn_loops(reward_events):
from prompting.weight_setting.weight_setter import weight_setter
logger.info("Starting weight setter loop in validator...")
        # Hold a reference so the task isn't garbage-collected while it runs.
        setting_task = asyncio.create_task(weight_setter.start(reward_events))
while not event_stop.is_set():
await asyncio.sleep(5)
logger.debug("Weight setter loop is running")
try:
logger.info("Starting weight setter loop in validator...")
asyncio.run(spawn_loops(reward_events))
except Exception as e:
logger.exception(f"Weight setter loop error: {e}")
        raise


def health_check(parent_pid: int, event_stop: Event):
    """Monitor the parent process and kill all child processes in an emergency."""
    init_process_logging(name="HealthCheck")  # Initialize logging for health check process
    step = 0
    while True:
        try:
            if not psutil.pid_exists(parent_pid):
                event_stop.set()
                logger.warning("Parent process died, killing all child processes")
                os.killpg(0, signal.SIGKILL)

            block = settings.shared_settings.block
            last_update_block = settings.shared_settings.METAGRAPH.last_update[settings.shared_settings.UID]
            # If this UID's weights haven't been updated for over 320 blocks
            # (roughly an hour at ~12 s per block), assume the validator stalled.
            if block - last_update_block > 320 and step > 60:
                event_stop.set()
                logger.warning(
                    f"Metagraph hasn't been updated for {block - last_update_block} blocks. "
                    f"Current block: {block}, last update: {last_update_block}"
                )
                os.killpg(0, signal.SIGKILL)
            step += 1
        except Exception as e:
            # If the health check itself fails, supervision can no longer be guaranteed.
            logger.error(f"Health check failed: {e}")
            sys.exit(1)

        time.sleep(60)


async def main(
cache_rewards: list | None = None,
cache_scores: list | None = None,
cache_tasks: list | None = None,
cache_miners: dict | None = None,
):
    """Supervise the validator: share state through a multiprocessing Manager, start
    the API, availability-checking, task-sending, and weight-setting worker
    processes, and run the task-creation and scoring loops in-process."""
with mp.Manager() as manager:
reward_events = manager.list(list(cache_rewards) if cache_rewards else [])
scoring_queue = manager.list(list(cache_scores) if cache_scores else [])
task_queue = manager.list(list(cache_tasks) if cache_tasks else [])
miners_dict = manager.dict(dict(cache_miners) if cache_miners else {})
mp_lock = manager.Lock()
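        # Everything above is a Manager proxy, so workers see a shared view of state.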
processes: list[mp.Process] = []
tasks: list[asyncio.Task] = []
event_stop = mp.Event()
model_scheduler = AsyncModelScheduler(llm_model_manager=ModelManager(), mp_lock=mp_lock, sync=True)
try:
# Start checking the availability of miners at regular intervals
if settings.shared_settings.DEPLOY_SCORING_API and not settings.shared_settings.NEURON_DISABLE_SET_WEIGHTS:
# Use multiprocessing to bypass API blocking issue
api_process = mp.Process(
target=start_api,
args=(scoring_queue, reward_events, miners_dict, event_stop),
name="APIProcess",
daemon=True,
)
api_process.start()
processes.append(api_process)
availability_process = mp.Process(
target=start_availability_checking_loop,
args=(miners_dict, event_stop),
name="AvailabilityProcess",
daemon=True,
)
availability_process.start()
processes.append(availability_process)
loop_task = asyncio.create_task(
create_loop_process(
model_scheduler=model_scheduler,
task_queue=task_queue,
scoring_queue=scoring_queue,
reward_events=reward_events,
miners_dict=miners_dict,
mp_lock=mp_lock,
)
)
tasks.append(loop_task)
            sending_process = mp.Process(
                target=start_task_sending_loop,
                args=(task_queue, scoring_queue, miners_dict, event_stop),
                name="SendingTaskProcess",
                daemon=True,
            )
            sending_process.start()
            processes.append(sending_process)
weight_setter_process = mp.Process(
target=start_weight_setter_loop,
args=(reward_events, event_stop),
name="WeightSetterProcess",
daemon=True,
)
weight_setter_process.start()
processes.append(weight_setter_process)
# health_check_process = mp.Process(
# target=health_check,
# args=(os.getpid(), event_stop),
# name="HealthCheckProcess",
# daemon=True,
# )
# health_check_process.start()
# processes.append(health_check_process)
GPUInfo.log_gpu_info()
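            # Keep the parent process alive; the work happens in the child processes and tasks.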
while True:
await asyncio.sleep(30)
except KeyboardInterrupt:
event_stop.set()
logger.info("KeyboardInterrupt detected. Shutting down gracefully...")
except Exception as e:
logger.error(f"Main loop error: {e}")
raise
finally:
logger.warning("🚨 Force‑killing entire process‑group")
# 1. Cancel in‑process tasks so they stop touching the Manager.
for t in tasks:
t.cancel()
await asyncio.gather(*tasks, return_exceptions=True)
await asyncio.sleep(5)
# 2. Manager cleanup *first* (so its socket vanishes).
manager.shutdown()
# 3. Sledgehammer.
try:
os.killpg(0, signal.SIGKILL)
except Exception as e:
logger.error(f"Failed to kill process group: {e}")
            sys.exit(1)


def kill_process_group():
try:
os.killpg(os.getpgid(0), signal.SIGKILL)
except Exception as e:
logger.error(f"Failed to kill process group: {e}")
# The main function parses the configuration and runs the validator.
if __name__ == "__main__":
try:
os.setpgrp()
atexit.register(kill_process_group)
except BaseException:
logger.warning("Failed to set process group; emergency termination may not work.")
asyncio.run(main())