5 changes: 5 additions & 0 deletions pyproject.toml
@@ -23,6 +23,8 @@ dependencies = [
"fastapi[all]",
"uvicorn",
"jinja2",
"huggingface-hub>=0.20",
"pyarrow>=14.0",
]

[project.optional-dependencies]
@@ -37,6 +39,9 @@ dev = [
[tool.setuptools.packages.find]
where = ["src"]

[tool.setuptools.package-data]
libkernelbot = ["sql/*.sql"]

[tool.coverage.run]
omit = ["src/libkernelbot/run_eval.py", "src/libkernelbot/launchers/*.py"]
relative_files = true
47 changes: 47 additions & 0 deletions src/kernelbot/api/main.py
@@ -675,6 +675,53 @@ async def admin_update_problems(
}


@app.post("/admin/export-hf")
async def admin_export_hf(
payload: dict,
_: Annotated[None, Depends(require_admin)],
db_context=Depends(get_db),
) -> dict:
"""Export competition submissions to a Hugging Face dataset as parquet.

Payload:
leaderboard_ids: list[int] - IDs of leaderboards to export
filename: str - parquet filename in the repo (e.g. "nvidia_nvfp4_submissions.parquet")
private: bool - if true, upload to private live repo; if false, upload to public repo (default: true)
"""
from libkernelbot.hf_export import export_to_hf

leaderboard_ids = payload.get("leaderboard_ids")
filename = payload.get("filename")
private = payload.get("private", True)

if not isinstance(leaderboard_ids, list) or not leaderboard_ids:
raise HTTPException(status_code=400, detail="leaderboard_ids must be a non-empty list of integers")
if not all(isinstance(leaderboard_id, int) for leaderboard_id in leaderboard_ids):
raise HTTPException(status_code=400, detail="leaderboard_ids must be a non-empty list of integers")
if not isinstance(filename, str) or not filename.endswith(".parquet"):
raise HTTPException(status_code=400, detail="filename must end with .parquet")
if not env.HF_TOKEN:
raise HTTPException(status_code=500, detail="HF_TOKEN not configured")

repo_id = env.HF_PRIVATE_DATASET if private else env.HF_PUBLIC_DATASET

try:
with db_context as db:
result = export_to_hf(
db=db,
leaderboard_ids=leaderboard_ids,
repo_id=repo_id,
filename=filename,
token=env.HF_TOKEN,
private=private,
)
return {"status": "ok", **result}
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e)) from e
except Exception as e:
raise HTTPException(status_code=500, detail=f"Export failed: {e}") from e

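A minimal sketch of how an admin client might call the new endpoint; the payload shape and error handling follow the handler above, while the base URL, the example leaderboard IDs, and the bearer-token header are assumptions about the deployment (credentials are whatever require_admin actually checks).

import requests  # client-side dependency, not part of this PR

resp = requests.post(
    "http://localhost:8000/admin/export-hf",  # placeholder base URL
    json={
        "leaderboard_ids": [12, 17],  # hypothetical leaderboard IDs
        "filename": "nvidia_nvfp4_submissions.parquet",
        "private": True,  # False targets the public repo instead
    },
    headers={"Authorization": "Bearer <ADMIN_TOKEN>"},  # assumed auth scheme
    timeout=60,
)
resp.raise_for_status()
print(resp.json())  # {"status": "ok", "rows": ..., "repo_id": ..., "filename": ...}
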

@app.get("/leaderboards")
async def get_leaderboards(db_context=Depends(get_db)):
"""An endpoint that returns all leaderboards.
106 changes: 106 additions & 0 deletions src/kernelbot/cogs/admin_cog.py
@@ -123,7 +123,13 @@ def __init__(self, bot: "ClusterBot"):
name="set-forum-ids", description="Sets forum IDs"
)(self.set_forum_ids)

self.export_to_hf = bot.admin_group.command(
name="export-hf", description="Export competition data to Hugging Face dataset"
)(self.export_to_hf)

self._scheduled_cleanup_temp_users.start()
if env.HF_TOKEN:
self._scheduled_hf_export.start()

# --------------------------------------------------------------------------
# | HELPER FUNCTIONS |
@@ -873,6 +879,106 @@ async def _scheduled_cleanup_temp_users(self):
db.cleanup_temp_users()
logger.info("Temporary users cleanup completed")

@tasks.loop(hours=24)
async def _scheduled_hf_export(self):
"""Daily export of active competition submissions to private HF dataset.

Once a competition expires, it drops out of the scheduled export set. If
there are still results settling after the deadline, a manual export is
needed once the queue drains. Currently public HF dataset releases are
handled manually.
"""
from libkernelbot.hf_export import export_to_hf, get_active_competition_leaderboards

try:
with self.bot.leaderboard_db as db:
leaderboards = db.get_leaderboards()
active = get_active_competition_leaderboards(
leaderboards,
now=datetime.now(timezone.utc),
)

if not active:
logger.info("HF export: no active competitions, skipping")
return

leaderboard_ids = [lb["id"] for lb in active]
result = export_to_hf(
db=db,
leaderboard_ids=leaderboard_ids,
repo_id=env.HF_PRIVATE_DATASET,
filename="active_submissions.parquet",
token=env.HF_TOKEN,
private=True,
)
logger.info("Scheduled HF export complete: %s", result)
except Exception:
logger.exception("Scheduled HF export failed")

@_scheduled_hf_export.before_loop
async def _before_hf_export(self):
await self.bot.wait_until_ready()

@discord.app_commands.describe(
leaderboard_name="Name of the competition to export",
filename="Parquet filename (default: <leaderboard_name>.parquet)",
private="Upload to private repo (default: true)",
)
@discord.app_commands.autocomplete(leaderboard_name=leaderboard_name_autocomplete)
@with_error_handling
async def export_to_hf(
self,
interaction: discord.Interaction,
leaderboard_name: str,
filename: Optional[str] = None,
private: bool = True,
):
from libkernelbot.hf_export import export_to_hf as do_export

is_admin = await self.admin_check(interaction)
if not is_admin:
await send_discord_message(
interaction,
"You need to have Admin permissions to run this command",
ephemeral=True,
)
return

if not env.HF_TOKEN:
await send_discord_message(interaction, "HF_TOKEN not configured.", ephemeral=True)
return

await interaction.response.defer(ephemeral=True)

if filename is None:
filename = f"{leaderboard_name}.parquet"
if not filename.endswith(".parquet"):
filename += ".parquet"

repo_id = env.HF_PRIVATE_DATASET if private else env.HF_PUBLIC_DATASET

try:
with self.bot.leaderboard_db as db:
lb_id = db.get_leaderboard_id(leaderboard_name)
result = do_export(
db=db,
leaderboard_ids=[lb_id],
repo_id=repo_id,
filename=filename,
token=env.HF_TOKEN,
private=private,
)
await send_discord_message(
interaction,
f"Exported {result['rows']} rows to `{repo_id}/{filename}`.",
ephemeral=True,
)
except ValueError as e:
await send_discord_message(interaction, str(e), ephemeral=True)
except Exception as e:
logger.error("HF export failed: %s", e, exc_info=True)
await send_discord_message(interaction, f"Export failed: {e}", ephemeral=True)

####################################################################################################################
# MIGRATION COMMANDS --- TO BE DELETED LATER
####################################################################################################################
3 changes: 3 additions & 0 deletions src/kernelbot/env.py
@@ -17,6 +17,9 @@
env.DISCORD_DEBUG_CLUSTER_STAGING_ID = os.getenv("DISCORD_DEBUG_CLUSTER_STAGING_ID")

env.ADMIN_TOKEN = os.getenv("ADMIN_TOKEN")
env.HF_TOKEN = os.getenv("HF_TOKEN")
env.HF_PRIVATE_DATASET = os.getenv("HF_PRIVATE_DATASET", "GPUMODE/kernelbot-data-live")
env.HF_PUBLIC_DATASET = os.getenv("HF_PUBLIC_DATASET", "GPUMODE/kernelbot-data")

# Only required to run the CLI against this instance
# setting these is required only to run the CLI against local instance
186 changes: 186 additions & 0 deletions src/libkernelbot/hf_export.py
@@ -0,0 +1,186 @@
"""Export competition submissions to Hugging Face datasets as parquet files."""

import io
import tempfile
from datetime import datetime, timedelta, timezone
from importlib.resources import files

import pyarrow as pa
import pyarrow.parquet as pq
from huggingface_hub import HfApi

from libkernelbot.leaderboard_db import LeaderboardDB
from libkernelbot.utils import setup_logging

logger = setup_logging(__name__)
HF_EXPORT_ROWS_SQL = files("libkernelbot").joinpath("sql/get_hf_export_rows.sql").read_text(
encoding="utf-8"
)

# Explicit schema matching GPUMODE/kernelbot-data nvidia_nvfp4_submissions.parquet
SUBMISSIONS_SCHEMA = pa.schema([
("submission_id", pa.int64()),
("leaderboard_id", pa.int64()),
("problem_name", pa.large_string()),
("user_id", pa.large_string()),
("user_name", pa.large_string()),
("code_id", pa.int64()),
("file_name", pa.large_string()),
("submission_time", pa.timestamp("us", tz="UTC")),
("status", pa.large_string()),
("score", pa.float64()),
("passed", pa.bool_()),
("mode", pa.large_string()),
("runner", pa.large_string()),
("code", pa.large_string()),
])


def _normalize_deadline(deadline: datetime) -> datetime:
"""Ensure deadlines are timezone-aware before comparing them."""
if deadline.tzinfo is None:
return deadline.replace(tzinfo=timezone.utc)
return deadline


MAX_COMPETITION_HORIZON_DAYS = 365


def get_active_competition_leaderboards(
leaderboards: list[dict],
*,
now: datetime | None = None,
) -> list[dict]:
"""Return leaderboards that belong to real, active competitions.

Filters out:
- Expired leaderboards (deadline <= now)
- Dev leaderboards (name ending with "-dev")
- Permanent/practice leaderboards (deadline > 1 year from now, e.g. year 2100)
"""
if now is None:
now = datetime.now(timezone.utc)

horizon = now + timedelta(days=MAX_COMPETITION_HORIZON_DAYS)

active_competitions = []
for leaderboard in leaderboards:
deadline = _normalize_deadline(leaderboard["deadline"])
if deadline > now and deadline < horizon and not leaderboard["name"].endswith("-dev"):
active_competitions.append(leaderboard)
return active_competitions

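A small sketch of the filter rules above, using made-up leaderboard dicts that carry only the keys the function reads plus an id field:

from datetime import datetime, timezone

from libkernelbot.hf_export import get_active_competition_leaderboards

boards = [
    {"id": 1, "name": "nvfp4-comp", "deadline": datetime(2025, 6, 1, tzinfo=timezone.utc)},
    {"id": 2, "name": "nvfp4-comp-dev", "deadline": datetime(2025, 6, 1, tzinfo=timezone.utc)},
    {"id": 3, "name": "practice", "deadline": datetime(2100, 1, 1, tzinfo=timezone.utc)},
    {"id": 4, "name": "finished-comp", "deadline": datetime(2020, 1, 1, tzinfo=timezone.utc)},
]
active = get_active_competition_leaderboards(boards, now=datetime(2025, 1, 1, tzinfo=timezone.utc))
# Only board 1 survives: the "-dev" board, the far-future practice board, and the expired board are dropped.
assert [lb["id"] for lb in active] == [1]
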

def ensure_public_export_allowed(
db: LeaderboardDB,
leaderboard_ids: list[int],
*,
now: datetime | None = None,
) -> None:
"""Block public exports while any selected leaderboard is still active."""
if now is None:
now = datetime.now(timezone.utc)

selected_ids = set(leaderboard_ids)
active_names = []
for leaderboard in db.get_leaderboards():
if leaderboard["id"] not in selected_ids:
continue
deadline = _normalize_deadline(leaderboard["deadline"])
if deadline > now:
active_names.append(leaderboard["name"])

if active_names:
active_names.sort()
raise ValueError(
"Cannot export active leaderboards to the public dataset: "
+ ", ".join(active_names)
)


def get_hf_export_rows(db: LeaderboardDB, leaderboard_ids: list[int]) -> list[dict]:
"""Fetch deduplicated submissions for export."""
if not leaderboard_ids:
return []

db.cursor.execute(HF_EXPORT_ROWS_SQL, (leaderboard_ids,))

columns = [
"submission_id", "leaderboard_id", "problem_name", "user_id", "user_name",
"code_id", "file_name", "submission_time", "status", "score", "passed",
"mode", "runner", "code",
]
return [dict(zip(columns, row, strict=True)) for row in db.cursor.fetchall()]


def rows_to_parquet_bytes(rows: list[dict]) -> bytes:
"""Convert a list of row dicts to parquet bytes using the canonical schema."""
if not rows:
table = pa.table({field.name: pa.array([], type=field.type) for field in SUBMISSIONS_SCHEMA})
else:
for row in rows:
if row.get("user_id") is not None:
row["user_id"] = str(row["user_id"])
if row.get("user_name") is None:
row["user_name"] = ""
if row.get("score") is not None:
row["score"] = float(row["score"])
table = pa.Table.from_pylist(rows, schema=SUBMISSIONS_SCHEMA)

buf = io.BytesIO()
pq.write_table(table, buf, compression="snappy")
return buf.getvalue()

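A quick roundtrip sketch of the normalization above, with invented row values; it only exercises the coercions visible in the function (user_id stringified, missing user_name replaced with an empty string):

import io
from datetime import datetime, timezone

import pyarrow.parquet as pq

from libkernelbot.hf_export import rows_to_parquet_bytes

row = {
    "submission_id": 1, "leaderboard_id": 7, "problem_name": "vectoradd",
    "user_id": 1234567890, "user_name": None, "code_id": 42,
    "file_name": "solution.py",
    "submission_time": datetime(2025, 1, 1, tzinfo=timezone.utc),
    "status": "done", "score": 1.5, "passed": True,
    "mode": "leaderboard", "runner": "NVIDIA", "code": "print('hi')",
}
table = pq.read_table(io.BytesIO(rows_to_parquet_bytes([row])))
assert table.num_rows == 1
assert table.column("user_id")[0].as_py() == "1234567890"  # numeric ID stringified
assert table.column("user_name")[0].as_py() == ""  # missing name becomes ""
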

def export_to_hf(
db: LeaderboardDB,
leaderboard_ids: list[int],
repo_id: str,
filename: str,
token: str,
private: bool = True,
) -> dict:
"""Export deduplicated submissions to a HF dataset repo as a parquet file.

Returns a summary dict with row count and repo info.
"""
if not private:
ensure_public_export_allowed(db, leaderboard_ids)

api = HfApi(token=token)
api.create_repo(repo_id, repo_type="dataset", private=private, exist_ok=True)

rows = get_hf_export_rows(db, leaderboard_ids)
parquet_bytes = rows_to_parquet_bytes(rows)
with tempfile.NamedTemporaryFile(suffix=".parquet") as tmp:
tmp.write(parquet_bytes)
tmp.flush()
api.upload_file(
path_or_fileobj=tmp.name,
path_in_repo=filename,
repo_id=repo_id,
repo_type="dataset",
)

logger.info("Exported %d rows to %s/%s", len(rows), repo_id, filename)
return {"rows": len(rows), "repo_id": repo_id, "filename": filename}


def publish_to_public_repo(
db: LeaderboardDB,
leaderboard_ids: list[int],
public_repo_id: str,
filename: str,
token: str,
) -> dict:
"""Export final competition data to the public dataset repo."""
return export_to_hf(
db=db,
leaderboard_ids=leaderboard_ids,
repo_id=public_repo_id,
filename=filename,
token=token,
private=False,
)
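
Since the scheduled task only feeds the private dataset, a manual public release after a competition closes might look like the sketch below. It is meant to run inside the bot process, mirroring the cog usage above, so bot.leaderboard_db and env come from there; the leaderboard ID and filename are placeholders, and ensure_public_export_allowed will reject the call while any selected board is still active.

from libkernelbot.hf_export import publish_to_public_repo

with bot.leaderboard_db as db:  # LeaderboardDB context manager from the running bot
    summary = publish_to_public_repo(
        db=db,
        leaderboard_ids=[12],  # hypothetical ID of a finished competition
        public_repo_id=env.HF_PUBLIC_DATASET,  # defaults to GPUMODE/kernelbot-data
        filename="nvidia_nvfp4_submissions.parquet",
        token=env.HF_TOKEN,
    )
print(summary)  # {"rows": ..., "repo_id": ..., "filename": ...}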