Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 27 additions & 1 deletion apex/common/async_chain.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,32 @@ def netuid(self) -> int:
def network(self) -> list[str]:
return self._network

async def full_burn(self) -> bool:
    """Assign 100% of the weight to the subnet owner's hotkey ("full burn").

    Queries the chain for the subnet owner's hotkey, resolves it to a UID via
    the metagraph, and submits a single weight of 1.0 on that UID, waiting for
    both inclusion and finalization.

    Returns:
        True when the set-weights extrinsic was accepted, False on any failure
        (including any exception, which is logged and swallowed).
    """
    try:
        metagraph = await self.metagraph()
        subtensor = await self.subtensor()
        # Use the chain's configured netuid rather than a hard-coded 1 so the
        # burn targets the same subnet this validator instance operates on
        # (a literal 1 would be wrong on testnets / other deployments).
        netuid = self.netuid
        owner_hotkey = await subtensor.query_subtensor("SubnetOwnerHotkey", params=[netuid])
        # Raises ValueError if the owner hotkey is not registered in the
        # metagraph; caught below and reported as a failed burn.
        owner_uid = metagraph.hotkeys.index(owner_hotkey)
        logger.info(f"Burning to {owner_hotkey} hotkey, {owner_uid} UID")
        uids: list[int] = [owner_uid]
        weights: list[float] = [1.0]
        success, message = await subtensor.set_weights(
            self.wallet,
            netuid,
            uids,
            weights,
            version_key=__spec_version__,
            wait_for_inclusion=True,
            wait_for_finalization=True,
        )
        if not success:
            # Report through loguru for consistency with set_weights below;
            # print() would bypass the configured log sinks.
            logger.error(f"Failed to apply full burn: {message}")
        return bool(success)
    except Exception as exc:
        logger.exception(f"Error during full burn: {exc}")
        return False

async def set_weights(self, rewards: dict[str, float]) -> bool:
try:
metagraph = await self.metagraph()
Expand Down Expand Up @@ -138,7 +164,7 @@ async def set_weights(self, rewards: dict[str, float]) -> bool:
if not success:
logger.error(f"Error during weight set: {err}")
return bool(success)
except BaseException as exc:
except Exception as exc:
logger.exception(f"Error during weight set: {exc}")
return False

Expand Down
184 changes: 91 additions & 93 deletions apex/validator/miner_scorer.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,12 @@
import asyncio
import json
import time
from collections.abc import AsyncGenerator, Iterable
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager
from datetime import datetime
from pathlib import Path

import aiosqlite
import numpy as np
from loguru import logger

from apex.common.async_chain import AsyncChain
from apex.common.constants import VALIDATOR_REFERENCE_LABEL
from apex.validator.weight_syncer import WeightSyncer

# Scoring moving average in hours. Set to be: immunity_period - post_reg_threshold.
Expand Down Expand Up @@ -90,90 +85,93 @@ async def set_scores(self) -> bool:
expose each one as plain python objects so that downstream code can work with them,
and remove rows that are older than the time window.
"""
logger.debug("Retrieving miner's performance history")
async with self._db() as conn: # type: aiosqlite.Connection
# Calculate the cutoff timestamp (current time - window hours).
cutoff_timestamp = int(time.time() - SCORE_MA_WINDOW_HOURS * 3600)

# 1. Fetch every row from the last SCORE_MA_WINDOW_HOURS.
try:
async with conn.execute(
"""
SELECT generator_hotkey, generator_score, discriminator_hotkeys, discriminator_scores
FROM discriminator_results
WHERE timestamp >= ?
""",
(cutoff_timestamp,),
) as cursor:
rows: Iterable[aiosqlite.Row] = await cursor.fetchall()
except BaseException as exc:
logger.exception(f"Exception during DB fetch: {exc}")
return False

# 2. Iterate over the in-memory list so that the caller can process freely.
hkey_agg_rewards: dict[str, float] = {}
rows_count = 0
for generator_hotkey, generator_score, disc_hotkeys_json, disc_scores_json in rows:
rows_count += 1
# Deserialize JSON columns.
disc_hotkeys = json.loads(disc_hotkeys_json)
disc_scores = json.loads(disc_scores_json)

# Create reward dictionary with generator and discriminator scores.
reward_dict = dict(zip(disc_hotkeys, disc_scores, strict=False))

if generator_hotkey != VALIDATOR_REFERENCE_LABEL:
# Skip validator generated references in score calculation.
reward_dict[generator_hotkey] = generator_score

# Update the aggregate rewards.
for hotkey, reward in reward_dict.items():
hkey_agg_rewards[hotkey] = float(hkey_agg_rewards.get(hotkey, 0.0)) + float(reward)

logger.debug(f"Fetched {rows_count} rows for scoring")
logger.debug(f"Total hotkeys to score: {len(hkey_agg_rewards)}")

# 3. Delete rows that are older than the time window.
logger.debug("Cleaning up expired miner's history")
try:
deleted_total = await asyncio.wait_for(self._delete_expired(conn, cutoff_timestamp), timeout=15)
logger.debug(f"Expired rows cleanup done: {deleted_total} rows")
except TimeoutError:
logger.warning("Timed out deleting expired rows; will retry next loop")

if self._debug:
record: dict[str, str | dict[str, float]] = {
"date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"rewards": hkey_agg_rewards,
}
with self._debug_rewards_path.open("a+") as fh:
record_str: str = json.dumps(record)
fh.write(f"{record_str}\n")

if self._weight_syncer is not None:
logger.debug("Attempting to perform weight synchronization")
try:
hkey_agg_rewards = await self._weight_syncer.compute_weighted_rewards(hkey_agg_rewards)
logger.debug(f"Total hotkeys to score after weight sync: {len(hkey_agg_rewards)}")
except BaseException as exc:
logger.error(f"Failed to compute weighted average rewards over the network, skipping: {exc}")

if hkey_agg_rewards:
rewards_array = np.array(list(hkey_agg_rewards.values()))
if rewards_array.min() < 0:
logger.warning(f"Negative reward detected: {rewards_array.min():.4f}, assigning zero value instead")
hkey_agg_rewards = {hkey: max(reward, 0) for hkey, reward in hkey_agg_rewards.items()}
logger.debug(
f"Setting weights to {len(hkey_agg_rewards)} hotkeys; "
f"reward mean={rewards_array.mean():.4f} min={rewards_array.min():.4f}"
)
else:
logger.warning(f"Setting empty rewards: {hkey_agg_rewards}")

# TODO: Flush the db only on set_weights_result is True.
set_weights_result = await self.chain.set_weights(hkey_agg_rewards)

# 4. Flush all deletions in a single commit.
logger.debug("Updating rewards DB")
await conn.commit()
return set_weights_result
set_weights_result = await self.chain.full_burn()
return set_weights_result

# logger.debug("Retrieving miner's performance history")
# async with self._db() as conn: # type: aiosqlite.Connection
# # Calculate the cutoff timestamp (current time - window hours).
# cutoff_timestamp = int(time.time() - SCORE_MA_WINDOW_HOURS * 3600)

# # 1. Fetch every row from the last SCORE_MA_WINDOW_HOURS.
# try:
# async with conn.execute(
# """
# SELECT generator_hotkey, generator_score, discriminator_hotkeys, discriminator_scores
# FROM discriminator_results
# WHERE timestamp >= ?
# """,
# (cutoff_timestamp,),
# ) as cursor:
# rows: Iterable[aiosqlite.Row] = await cursor.fetchall()
# except BaseException as exc:
# logger.exception(f"Exception during DB fetch: {exc}")
# return False

# # 2. Iterate over the in-memory list so that the caller can process freely.
# hkey_agg_rewards: dict[str, float] = {}
# rows_count = 0
# for generator_hotkey, generator_score, disc_hotkeys_json, disc_scores_json in rows:
# rows_count += 1
# # Deserialize JSON columns.
# disc_hotkeys = json.loads(disc_hotkeys_json)
# disc_scores = json.loads(disc_scores_json)

# # Create reward dictionary with generator and discriminator scores.
# reward_dict = dict(zip(disc_hotkeys, disc_scores, strict=False))

# if generator_hotkey != VALIDATOR_REFERENCE_LABEL:
# # Skip validator generated references in score calculation.
# reward_dict[generator_hotkey] = generator_score

# # Update the aggregate rewards.
# for hotkey, reward in reward_dict.items():
# hkey_agg_rewards[hotkey] = float(hkey_agg_rewards.get(hotkey, 0.0)) + float(reward)

# logger.debug(f"Fetched {rows_count} rows for scoring")
# logger.debug(f"Total hotkeys to score: {len(hkey_agg_rewards)}")

# # 3. Delete rows that are older than the time window.
# logger.debug("Cleaning up expired miner's history")
# try:
# deleted_total = await asyncio.wait_for(self._delete_expired(conn, cutoff_timestamp), timeout=15)
# logger.debug(f"Expired rows cleanup done: {deleted_total} rows")
# except TimeoutError:
# logger.warning("Timed out deleting expired rows; will retry next loop")

# if self._debug:
# record: dict[str, str | dict[str, float]] = {
# "date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
# "rewards": hkey_agg_rewards,
# }
# with self._debug_rewards_path.open("a+") as fh:
# record_str: str = json.dumps(record)
# fh.write(f"{record_str}\n")

# if self._weight_syncer is not None:
# logger.debug("Attempting to perform weight synchronization")
# try:
# hkey_agg_rewards = await self._weight_syncer.compute_weighted_rewards(hkey_agg_rewards)
# logger.debug(f"Total hotkeys to score after weight sync: {len(hkey_agg_rewards)}")
# except BaseException as exc:
# logger.error(f"Failed to compute weighted average rewards over the network, skipping: {exc}")

# if hkey_agg_rewards:
# rewards_array = np.array(list(hkey_agg_rewards.values()))
# if rewards_array.min() < 0:
# logger.warning(f"Negative reward detected: {rewards_array.min():.4f}, assigning zero values")
# hkey_agg_rewards = {hkey: max(reward, 0) for hkey, reward in hkey_agg_rewards.items()}
# logger.debug(
# f"Setting weights to {len(hkey_agg_rewards)} hotkeys; "
# f"reward mean={rewards_array.mean():.4f} min={rewards_array.min():.4f}"
# )
# else:
# logger.warning(f"Setting empty rewards: {hkey_agg_rewards}")

# # TODO: Flush the db only on set_weights_result is True.
# set_weights_result = await self.chain.set_weights(hkey_agg_rewards)

# # 4. Flush all deletions in a single commit.
# logger.debug("Updating rewards DB")
# await conn.commit()
# return set_weights_result
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "apex"
version = "3.0.6"
version = "3.0.7"
description = "Bittensor Subnet 1: Apex"
readme = "README.md"
requires-python = "~=3.11"
Expand Down
Loading