-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathexport.py
More file actions
317 lines (257 loc) · 11.5 KB
/
export.py
File metadata and controls
317 lines (257 loc) · 11.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
import argparse
import gc
import os
import pandas as pd
from dotenv import load_dotenv
from sqlalchemy import create_engine, text
import pyarrow as pa
import pyarrow.parquet as pq
load_dotenv()
DB_USER = os.environ.get("DB_USER")
DB_PASSWORD = os.environ.get("DB_PASSWORD")
if not DB_USER or not DB_PASSWORD:
raise ValueError(
"DB_USER and DB_PASSWORD environment variables must be set. "
"Please create a .env file or export them in your shell."
)
DB_HOST = os.environ.get("DB_HOST", "localhost")
DB_PORT = os.environ.get("DB_PORT", "5432")
DB_NAME = os.environ.get("DB_NAME", "postgres")
DATABASE_URL = f"postgresql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}"
# The leaderboard IDs to export
LEADERBOARD_IDS = [463, 430, 399, 398, 563, 564, 565]
def fetch_and_save_leaderboards(engine, leaderboard_ids, output_path):
"""
Fetches leaderboard data from the database and saves it directly to parquet.
This function queries the database for specific leaderboards, selecting
key fields and fetching all associated GPU types for each leaderboard
using a subquery. It saves the leaderboards directly to parquet.
Args:
engine: The SQLAlchemy engine instance for database connection.
leaderboard_ids: A list of integer IDs for the leaderboards to fetch.
Returns:
The number of leaderboards.
"""
print("Fetching and saving leaderboards...")
query = text("""
SELECT
id,
name,
deadline AT TIME ZONE 'UTC' AS deadline,
task->>'lang' AS lang,
description,
task->'files'->>'reference.py' AS reference,
(
SELECT array_agg(gpu_type)
FROM leaderboard.gpu_type
WHERE leaderboard_id = leaderboard.leaderboard.id
) AS gpu_types
FROM leaderboard.leaderboard
WHERE id = ANY(:leaderboard_ids)
""")
df = pd.read_sql_query(query, engine, params={'leaderboard_ids': leaderboard_ids})
df.to_parquet(output_path, index=False)
print(f"Leaderboards saved to {output_path}")
def anonymize_users_in_db(engine, leaderboard_ids):
"""Create a temporary mapping table in the database."""
with engine.begin() as conn:
# Create temporary table with anonymized IDs
conn.execute(text("""
CREATE TEMP TABLE user_mapping AS
SELECT
user_id as original_user_id,
ROW_NUMBER() OVER (ORDER BY RANDOM()) as anonymized_user_id
FROM (
SELECT DISTINCT user_id
FROM leaderboard.submission
WHERE leaderboard_id = ANY(:leaderboard_ids)
) AS distinct_users
"""), {'leaderboard_ids': leaderboard_ids})
def handle_empty_structs(df):
"""
Replace empty struct/dict values with None to avoid PyArrow serialization errors.
PyArrow cannot write empty struct types to Parquet. This function checks
columns that contain dict/struct values and replaces empty ones with None.
"""
for col in df.columns:
if df[col].dtype == 'object':
# Check if column contains dict-like objects
sample = df[col].dropna().head(1)
if len(sample) > 0 and isinstance(sample.iloc[0], dict):
# Replace empty dicts with None
df[col] = df[col].apply(lambda x: None if isinstance(x, dict) and len(x) == 0 else x)
return df
def fetch_and_save_submissions(engine, leaderboard_ids, output_path, chunksize=8192):
"""
Fetches and processes submission data from the database.
This function queries the database for submissions associated with specific
leaderboards. It performs a join across `submission`, `runs`, and
`code_files` tables to create a denormalized view of the submission,
its execution details, and the source code.
Args:
engine: The SQLAlchemy engine instance for database connection.
leaderboard_ids: A list of integer IDs for the leaderboards whose
submissions are to be fetched.
"""
print("Fetching submissions...")
query = """
SELECT
s.id AS submission_id,
s.leaderboard_id,
um.anonymized_user_id AS user_id,
s.submission_time AT TIME ZONE 'UTC' AS submission_time,
s.file_name,
c.code,
c.id AS code_id,
r.id AS run_id,
r.start_time AT TIME ZONE 'UTC' AS run_start_time,
r.end_time AT TIME ZONE 'UTC' AS run_end_time,
r.mode AS run_mode,
r.score AS run_score,
r.passed AS run_passed,
r.result AS run_result,
r.compilation as run_compilation,
r.meta as run_meta,
r.system_info AS run_system_info
FROM leaderboard.submission AS s
LEFT JOIN leaderboard.runs AS r ON s.id = r.submission_id
JOIN leaderboard.code_files AS c ON s.code_id = c.id
LEFT JOIN user_mapping um ON s.user_id = um.original_user_id
WHERE s.leaderboard_id = ANY(:leaderboard_ids)
"""
part = 0
with engine.connect().execution_options(stream_results=True) as conn:
for chunk_df in pd.read_sql_query(
text(query),
conn,
params={'leaderboard_ids': leaderboard_ids},
chunksize=chunksize
):
# Decode hex values code column
if 'code' in chunk_df.columns:
chunk_df['code'] = chunk_df['code'].apply(decode_hex_if_needed)
# Convert nullable integer columns to consistent types
# This prevents type mismatches when some chunks have all NULLs
nullable_int_cols = ['run_id', 'code_id', 'submission_id', 'leaderboard_id', 'user_id']
for col in nullable_int_cols:
if col in chunk_df.columns:
chunk_df[col] = chunk_df[col].astype('Int64')
# Handle empty structs that PyArrow can't serialize
chunk_df = handle_empty_structs(chunk_df)
# Convert to arrow table
table = pa.Table.from_pandas(chunk_df)
# Write chunk as separate parquet file
filename = os.path.join(output_path, f"submissions_part_{part:05d}.parquet")
pq.write_table(table, filename)
print(f" Wrote {len(chunk_df)} submissions to part {part}")
# Filter for and save successful submissions
if 'run_passed' in chunk_df.columns:
success_mask = chunk_df['run_passed'] == True
if success_mask.any():
success_df = chunk_df[success_mask]
success_table = pa.Table.from_pandas(success_df)
success_filename = os.path.join(output_path, f"successful_submissions_part_{part:05d}.parquet")
pq.write_table(success_table, success_filename)
print(f" Wrote {len(success_df)} successful submissions to part {part}")
del success_df, success_table, success_mask
del chunk_df, table
gc.collect()
part += 1
print(f"Submissions saved to {part} parquet files in {output_path}")
def decode_hex_if_needed(code_val: str) -> str:
"""Decodes a string from hexadecimal if it starts with '\\x'.
Args:
code_val: The value from the 'code' column, expected to be a string.
Returns:
The decoded UTF-8 string, or the original value if it's not a
hex-encoded string or if decoding fails.
"""
if isinstance(code_val, str) and code_val.startswith('\\x'):
try:
# Strip the '\\x' prefix and convert from hex to bytes
hex_string = code_val[2:]
byte_data = bytes.fromhex(hex_string)
# Decode bytes to a UTF-8 string, replacing errors
return byte_data.decode('utf-8', 'replace')
except ValueError:
# Handles errors like non-hex characters in the string
return code_val
return code_val
def consolidate_parquet_files(input_dir, pattern, output_file):
"""
Consolidates multiple parquet part files into a single parquet file.
Args:
input_dir: Directory containing the parquet part files
pattern: Glob pattern to match the part files (e.g., "submissions_part_*.parquet")
output_file: Path to the output consolidated parquet file
"""
import glob
# Find all matching parquet files
part_files = sorted(glob.glob(os.path.join(input_dir, pattern)))
if not part_files:
print(f" No files found matching pattern {pattern}")
return
print(f" Consolidating {len(part_files)} {pattern} files into {output_file}...")
# First pass: Read only schemas (not data) from all files to unify them
schemas = []
for part_file in part_files:
parquet_file = pq.ParquetFile(part_file)
schemas.append(parquet_file.schema_arrow)
# Unify schemas across all tables to handle struct field variations
unified_schema = pa.unify_schemas(schemas)
# Second pass: Read each file, cast to unified schema, and write incrementally
total_rows = 0
with pq.ParquetWriter(output_file, unified_schema) as writer:
for part_file in part_files:
# Read one file at a time
table = pq.read_table(part_file)
# Cast to unified schema (fills missing fields with nulls)
unified_table = table.cast(unified_schema)
# Write to output file
writer.write_table(unified_table)
total_rows += len(unified_table)
print(f" Done! Consolidated {len(part_files)} files ({total_rows} total rows)")
def main(output_dir):
"""
Orchestrates the data export process.
This function initializes the database connection, fetches leaderboard
and submission data, anonymizes user IDs, and saves the results to
separate Parquet files: `leaderboards.parquet`, `submissions.parquet`,
and `successful_submissions.parquet`. The user ID mapping is not saved.
Temporary files are not deleted and should be manually removed if
desired.
Args:
output_dir (str): The local directory path to save the Parquet files.
"""
engine = create_engine(DATABASE_URL)
# Ensure the output directory exists
os.makedirs(output_dir, exist_ok=True)
# Fetch and save leaderboards
leaderboards_output_path = os.path.join(output_dir, "leaderboards.parquet")
fetch_and_save_leaderboards(engine, LEADERBOARD_IDS, leaderboards_output_path)
anonymize_users_in_db(engine, LEADERBOARD_IDS)
# Fetch submissions
submissions_output_path = os.path.join(output_dir, "submissions")
os.makedirs(submissions_output_path, exist_ok=True)
fetch_and_save_submissions(engine, LEADERBOARD_IDS, submissions_output_path)
# Consolidate part files into single parquet files
consolidate_parquet_files(
submissions_output_path,
"submissions_part_*.parquet",
os.path.join(output_dir, "submissions.parquet")
)
consolidate_parquet_files(
submissions_output_path,
"successful_submissions_part_*.parquet",
os.path.join(output_dir, "successful_submissions.parquet")
)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Export leaderboard data to a Hugging Face dataset.")
parser.add_argument(
"--output_dir",
type=str,
default="dataset",
help="Directory to save the Hugging Face dataset."
)
args = parser.parse_args()
main(args.output_dir)