feat: switch to parquet data format for rollout communication #54
Conversation
- **Add get_parquet_file function**: New async function to download and parse Parquet files from S3/R2, enhancing data retrieval efficiency.
- **Update sink_window_inferences**: Change to upload window inferences in Parquet format, improving storage efficiency with snappy compression.
- **Modify fetch_miner_window_data**: Adjust to fetch Parquet-formatted window files, optimizing data transfer for miners.
- **Revise MinerValidator**: Update to validate and download miner's window files in Parquet format for better performance.
- **Change MinerSampler**: Update file checks to use Parquet format, ensuring consistency across data handling.
Caution: Review failed. The pull request is closed.

Note: Other AI code review bot(s) detected. CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

Walkthrough
Window data storage was migrated from JSON to Parquet: a new Parquet I/O module was added, comms gained Parquet download/sink functions, and validation and miner code now read/write Parquet window files.

Changes
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
Possibly related PRs
Poem
Pre-merge checks and finishing touches
✅ Passed checks (3 passed)
📜 Recent review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
grail/validation/miner_validator.py (1)
884-944: GRPO env-seed check is now effectively neutered by Parquet schema.

`_validate_grpo_groups` reads `r.get("commit", {}).get("env", {}).get("seed")`, but the new Parquet schema/converters in `grail/infrastructure/parquet_io.py` never serialize or restore a `commit["env"]` field. After a Parquet round-trip, these seeds will all be `None`, so this check will always pass and no longer detect mismatched env seeds across a group. Recommend either:

- Adding an `env` struct (with `seed`) to `COMMIT_SCHEMA` and wiring it through `_convert_inference_to_row` / `_convert_row_to_inference` (a minimal sketch of this option follows below), or
- Explicitly removing this check and replacing it with a validator-side derivation/consistency check that doesn't depend on miner-supplied `env`.
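A minimal sketch of the first option, assuming pyarrow is available; the exact field set of the env struct and the converter helper names are illustrative assumptions, not the actual grail code:

```python
import pyarrow as pa

# Hypothetical env struct; only "seed" is known to be needed by
# _validate_grpo_groups, any other fields are assumptions.
ENV_SCHEMA = pa.struct([("seed", pa.int64())])

# COMMIT_SCHEMA would gain one more entry, e.g. ("env", ENV_SCHEMA).

def _env_to_row(commit: dict) -> dict | None:
    """Serialize commit["env"] into the Parquet row (sketch)."""
    env = commit.get("env") or {}
    return {"seed": env.get("seed")} if env else None

def _row_to_env(row_commit: dict) -> dict | None:
    """Restore commit["env"] from a deserialized Parquet row (sketch)."""
    env = row_commit.get("env")
    return {"seed": env.get("seed")} if env else None
```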
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (6)
- grail/infrastructure/comms.py (3 hunks)
- grail/infrastructure/miner_data.py (4 hunks)
- grail/infrastructure/parquet_io.py (1 hunks)
- grail/validation/miner_validator.py (3 hunks)
- grail/validation/sampling.py (2 hunks)
- tests/test_parquet_io.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (4)
grail/validation/miner_validator.py (1)
grail/infrastructure/comms.py (2)
- get_parquet_file (1111-1138)
- file_exists_with_deadline (916-981)
grail/infrastructure/comms.py (1)
grail/infrastructure/parquet_io.py (2)
- deserialize_parquet_to_window (285-321)
- serialize_window_to_parquet (239-282)
grail/infrastructure/miner_data.py (1)
grail/infrastructure/comms.py (2)
- file_exists (887-913)
- get_parquet_file (1111-1138)
tests/test_parquet_io.py (1)
grail/infrastructure/parquet_io.py (2)
- deserialize_parquet_to_window (285-321)
- serialize_window_to_parquet (239-282)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: Run Tests (3.11)
- GitHub Check: Run Tests (3.10)
- GitHub Check: Cursor Bugbot
🔇 Additional comments (6)
grail/infrastructure/miner_data.py (1)
12-61: Parquet migration wiring here looks consistent with the new sink.

Importing `get_parquet_file`, switching the window filename to `.parquet`, and delegating download through `get_parquet_file` all line up with `sink_window_inferences`' new key format. No issues from a correctness or compatibility standpoint.

grail/validation/miner_validator.py (1)
21-21: Parquet-based window fetch path looks sound.

The updated filename (`.parquet`), deadline-aware existence check, and use of `get_parquet_file` align with the new Parquet sink in `comms.sink_window_inferences`. No behavioral regressions spotted in the fetch/validation entry point.

Also applies to: 247-255, 288-295
grail/validation/sampling.py (1)
74-75: Active-miner discovery correctly updated to Parquet filenames.

The `.parquet` filename in both the docstring and `_check` aligns with the new sink and validator paths, preserving the existing deadline/size semantics.

Also applies to: 95-96
tests/test_parquet_io.py (1)
1-220: Parquet IO test coverage is solid and matches the schema behavior.

The helpers and test cases exercise window-level metadata, nested commit/rollout structures, JSON-encoded fields, empty inferences, and basic compression behavior. This should catch most regressions in `serialize_window_to_parquet` / `deserialize_parquet_to_window`.

grail/infrastructure/comms.py (2)
1111-1138: get_parquet_file correctly layers Parquet deserialization on top of existing download logic.

Reusing `download_file_chunked` and delegating to `deserialize_parquet_to_window`, with the same "log-and-return-None" behavior as `get_file`, keeps error semantics consistent across formats.
1152-1175: Parquet sink for window inferences is consistent with the new read paths.

Switching the key to `.parquet`, constructing `window_data` as before, and using `serialize_window_to_parquet` before `upload_file_chunked` lines up with the consumer expectations in miner fetch/validation and sampling. No issues spotted in the upload path.
COMMIT_SCHEMA = pa.struct(
    [
        ("tokens", pa.list_(pa.int32())),
        ("commitments", pa.string()),  # JSON-encoded for variable structure
        ("proof_version", pa.string()),
        ("model", MODEL_SCHEMA),
        ("signature", pa.string()),
        ("beacon", pa.string()),  # JSON-encoded for variable structure
        ("rollout", ROLLOUT_SCHEMA),
    ]
)
Parquet schema currently drops commit["env"] and any other extra commit fields.
COMMIT_SCHEMA plus _convert_inference_to_row / _convert_row_to_inference explicitly round-trip only:
`tokens`, `commitments`, `proof_version`, `model`, `signature`, `beacon`, and `rollout` (with its own fixed fields).
Any additional keys in commit (e.g. env, sat_problem, s_vals, or future fields) will be lost once written to Parquet. This is already observable for env: MinerValidator._validate_grpo_groups expects commit["env"]["seed"], but after a Parquet round-trip that field will be missing, so downstream code only ever sees None.
Suggest extending the schema and converters to preserve at least the fields that validators depend on, for example:
- Add an `env` struct to `COMMIT_SCHEMA` (including `seed` and any other required fields), and
- Map it through `_convert_inference_to_row` / `_convert_row_to_inference`.
Optionally, consider preserving unknown extra keys under a generic JSON-encoded blob if forward-compatibility is important.
Also applies to: 87-162, 166-231
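A hedged sketch of the JSON-blob option mentioned above; the column name `extra_json` and the helper names are assumptions for illustration, not the existing grail schema:

```python
import json

KNOWN_COMMIT_FIELDS = {
    "tokens", "commitments", "proof_version", "model", "signature", "beacon", "rollout",
}

# COMMIT_SCHEMA would gain a catch-all column, e.g. ("extra_json", pa.string()).

def _pack_extra_commit_fields(commit: dict) -> str:
    """JSON-encode any commit keys the fixed schema does not cover (sketch)."""
    extras = {k: v for k, v in commit.items() if k not in KNOWN_COMMIT_FIELDS}
    return json.dumps(extras)

def _unpack_extra_commit_fields(row_commit: dict) -> dict:
    """Restore the extra keys on deserialization (sketch)."""
    return json.loads(row_commit.get("extra_json") or "{}")
```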
Actionable comments posted: 2
♻️ Duplicate comments (1)
grail/infrastructure/parquet_io.py (1)
58-68: Schema omits `env`, `sat_problem`, and `s_vals` fields from commit.

As noted in a previous review, the current schema only preserves a subset of commit fields. Fields like `env` (containing `seed`), `sat_problem`, and `s_vals` are not round-tripped through Parquet. If downstream validation code (e.g., `MinerValidator._validate_grpo_groups`) expects `commit["env"]["seed"]`, it will receive `None` after deserialization.

Consider extending `COMMIT_SCHEMA` to include these fields, or adding a fallback JSON blob for unknown keys to maintain forward-compatibility.
🧹 Nitpick comments (1)
grail/infrastructure/parquet_io.py (1)
24-28: Minor: Redundant `pass` in exception class.

The `pass` statement is not needed when a docstring is present.

 class ParquetError(ValueError):
     """Raised when Parquet data is invalid or corrupt."""
-
-    pass
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
- `uv.lock` is excluded by `!**/*.lock`
📒 Files selected for processing (4)
- grail/infrastructure/comms.py (3 hunks)
- grail/infrastructure/parquet_io.py (1 hunks)
- pyproject.toml (2 hunks)
- tests/test_parquet_io.py (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- tests/test_parquet_io.py
🧰 Additional context used
🧬 Code graph analysis (1)
grail/infrastructure/comms.py (1)
grail/infrastructure/parquet_io.py (3)
- ParquetError (24-27)
- deserialize_parquet_to_window (295-340)
- serialize_window_to_parquet (249-292)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
- GitHub Check: Cursor Bugbot
- GitHub Check: Run Tests (3.10)
- GitHub Check: Run Tests (3.11)
- GitHub Check: Run Tests (3.11)
- GitHub Check: Run Tests (3.10)
🔇 Additional comments (5)
pyproject.toml (1)
55-55: LGTM! Dependency and mypy configuration properly added.

The `pyarrow>=14.0.0` dependency is correctly added to enable Parquet serialization. The mypy overrides for `pyarrow` and `pyarrow.*` are appropriate since pyarrow's type stubs may be incomplete.

Also applies to: 159-160
grail/infrastructure/parquet_io.py (2)
176-241: LGTM!

The reverse conversion properly decodes JSON-encoded fields and reconstructs the original inference dictionary structure. The `isinstance` checks before `json.loads` are appropriate defensive coding.
249-292: LGTM!

The serialization implementation is well-designed (a minimal sketch of the pattern follows the list below):
- Handles empty inference lists correctly
- Stores window-level metadata in Parquet schema metadata
- Uses snappy compression with dictionary encoding for efficient storage
- In-memory serialization avoids filesystem overhead
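A minimal sketch of that pattern using pyarrow; the function name and metadata keys are illustrative, not the actual grail implementation:

```python
import pyarrow as pa
import pyarrow.parquet as pq

def serialize_rows(rows: list[dict], wallet: str, window_start: int) -> bytes:
    """Write rows to an in-memory Parquet buffer with snappy compression (sketch)."""
    # rows assumed non-empty here; the real code also handles the empty-window case.
    table = pa.Table.from_pylist(rows)
    # Window-level metadata lives in the Parquet schema metadata (bytes -> bytes).
    table = table.replace_schema_metadata(
        {b"wallet": wallet.encode(), b"window_start": str(window_start).encode()}
    )
    sink = pa.BufferOutputStream()
    pq.write_table(table, sink, compression="snappy", use_dictionary=True)
    return sink.getvalue().to_pybytes()
```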
grail/infrastructure/comms.py (2)
1111-1141: LGTM!

The `get_parquet_file` function follows the same pattern as the existing `get_file` function (a minimal sketch follows the list below). Good separation of concerns with:
- Local import to keep Parquet dependencies localized
- Appropriate error handling (warning for corrupt files, debug for other failures)
- Consistent return type with `get_file`
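A minimal sketch of that pattern; `download_file_chunked` is assumed to be the existing async download helper in comms.py, and its exact signature here is an assumption:

```python
import logging

logger = logging.getLogger(__name__)

async def get_parquet_file(key: str) -> dict | None:
    """Download a Parquet window file and deserialize it, or return None (sketch)."""
    # Local import keeps the pyarrow dependency scoped to Parquet callers.
    from grail.infrastructure.parquet_io import ParquetError, deserialize_parquet_to_window

    data = await download_file_chunked(key)  # assumed existing helper returning bytes | None
    if data is None:
        return None
    try:
        return deserialize_parquet_to_window(data)
    except ParquetError as exc:
        logger.warning("Corrupt Parquet file at %s: %s", key, exc)
        return None
    except Exception as exc:  # keep error semantics consistent with get_file
        logger.debug("Failed to read Parquet file at %s: %s", key, exc)
        return None
```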
1149-1183: LGTM! Parquet serialization correctly replaces JSON.

The changes are well-implemented:
- File extension correctly changed to `.parquet`
- Local import keeps Parquet dependency scoped
- No redundant compression since Parquet already uses snappy compression internally
- The `upload_file_chunked` function won't apply gzip since the key doesn't end in `.json` (per line 429)
token_logprobs = rollout_data.get("token_logprobs")
if token_logprobs is None:
    token_logprobs = []
elif not isinstance(token_logprobs, list):
    token_logprobs = list(token_logprobs) if hasattr(token_logprobs, "__iter__") else []

# Handle assignment - ensure it's a list of bools
assignment = rollout_data.get("assignment")
if assignment is None:
    assignment = []
elif not isinstance(assignment, list):
    assignment = list(assignment) if hasattr(assignment, "__iter__") else []
Potential issue: iterable check would incorrectly process strings.
The hasattr(token_logprobs, "__iter__") check would return True for strings, converting a string like "1.0" into ['1', '.', '0']. Same issue applies to assignment.
Consider using explicit type checks:
  token_logprobs = rollout_data.get("token_logprobs")
  if token_logprobs is None:
      token_logprobs = []
  elif not isinstance(token_logprobs, list):
-     token_logprobs = list(token_logprobs) if hasattr(token_logprobs, "__iter__") else []
+     token_logprobs = list(token_logprobs) if hasattr(token_logprobs, "__iter__") and not isinstance(token_logprobs, str) else []

  # Handle assignment - ensure it's a list of bools
  assignment = rollout_data.get("assignment")
  if assignment is None:
      assignment = []
  elif not isinstance(assignment, list):
-     assignment = list(assignment) if hasattr(assignment, "__iter__") else []
+     assignment = list(assignment) if hasattr(assignment, "__iter__") and not isinstance(assignment, str) else []

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
token_logprobs = rollout_data.get("token_logprobs")
if token_logprobs is None:
    token_logprobs = []
elif not isinstance(token_logprobs, list):
    token_logprobs = list(token_logprobs) if hasattr(token_logprobs, "__iter__") and not isinstance(token_logprobs, str) else []

# Handle assignment - ensure it's a list of bools
assignment = rollout_data.get("assignment")
if assignment is None:
    assignment = []
elif not isinstance(assignment, list):
    assignment = list(assignment) if hasattr(assignment, "__iter__") and not isinstance(assignment, str) else []
🤖 Prompt for AI Agents
In grail/infrastructure/parquet_io.py around lines 114 to 125, the current
hasattr(..., "__iter__") guard will treat strings as iterables and split them
into characters; change the logic to explicitly reject str/bytes before
converting iterables and instead: if value is None set [], elif
isinstance(value, (str, bytes)) treat it as a single scalar and wrap it in a
single-element list (or attempt to parse to the target type if appropriate),
elif it's already a list leave as-is, else if it is iterable convert to list;
additionally coerce token_logprobs elements to floats and assignment elements to
bools (with safe conversion/fallback) so the final variables are lists of the
expected types.
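A hypothetical helper along the lines the prompt above describes: reject str/bytes, convert other iterables, and coerce element types (this sketch drops strings entirely, matching the diff above, rather than wrapping them as the prompt alternatively suggests):

```python
from typing import Any, Callable, TypeVar

T = TypeVar("T")

def _as_typed_list(value: Any, cast: Callable[[Any], T]) -> list[T]:
    """Coerce a Parquet-derived value into a typed list, or fall back to [] (sketch)."""
    if value is None:
        return []
    if isinstance(value, (str, bytes)):
        return []  # do not split strings/bytes into characters
    if not isinstance(value, list):
        if not hasattr(value, "__iter__"):
            return []
        value = list(value)
    out: list[T] = []
    for item in value:
        try:
            out.append(cast(item))
        except (TypeError, ValueError):
            return []  # malformed element: treat the whole field as invalid
    return out

# Usage:
# token_logprobs = _as_typed_list(rollout_data.get("token_logprobs"), float)
# assignment = _as_typed_list(rollout_data.get("assignment"), bool)
```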
# Extract window-level metadata
metadata = table.schema.metadata or {}
wallet = metadata.get(b"wallet", b"").decode()
window_start = int(metadata.get(b"window_start", b"0").decode())
window_length = int(metadata.get(b"window_length", b"0").decode())
inference_count = int(metadata.get(b"inference_count", b"0").decode())
timestamp = float(metadata.get(b"timestamp", b"0.0").decode())
Metadata parsing could raise ValueError on corrupt data.
If schema metadata contains non-numeric values (e.g., from corruption), int() or float() will raise ValueError which is not caught and converted to ParquetError.
  # Extract window-level metadata
  metadata = table.schema.metadata or {}
- wallet = metadata.get(b"wallet", b"").decode()
- window_start = int(metadata.get(b"window_start", b"0").decode())
- window_length = int(metadata.get(b"window_length", b"0").decode())
- inference_count = int(metadata.get(b"inference_count", b"0").decode())
- timestamp = float(metadata.get(b"timestamp", b"0.0").decode())
+ try:
+     wallet = metadata.get(b"wallet", b"").decode()
+     window_start = int(metadata.get(b"window_start", b"0").decode())
+     window_length = int(metadata.get(b"window_length", b"0").decode())
+     inference_count = int(metadata.get(b"inference_count", b"0").decode())
+     timestamp = float(metadata.get(b"timestamp", b"0.0").decode())
+ except (ValueError, UnicodeDecodeError) as e:
+     raise ParquetError(f"Invalid metadata in Parquet file: {e}") from e

📝 Committable suggestion
+ raise ParquetError(f"Invalid metadata in Parquet file: {e}") from e📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
# Extract window-level metadata
metadata = table.schema.metadata or {}
try:
    wallet = metadata.get(b"wallet", b"").decode()
    window_start = int(metadata.get(b"window_start", b"0").decode())
    window_length = int(metadata.get(b"window_length", b"0").decode())
    inference_count = int(metadata.get(b"inference_count", b"0").decode())
    timestamp = float(metadata.get(b"timestamp", b"0.0").decode())
except (ValueError, UnicodeDecodeError) as e:
    raise ParquetError(f"Invalid metadata in Parquet file: {e}") from e
🤖 Prompt for AI Agents
In grail/infrastructure/parquet_io.py around lines 319 to 325, the metadata
parsing uses int()/float() directly and can raise ValueError on
corrupted/non-numeric values; wrap the conversions in a try/except that catches
ValueError and TypeError and re-raises a ParquetError (preserving the original
exception via "from") with a clear message that includes the metadata key and
offending value so callers get a ParquetError instead of an uncaught ValueError.
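A hedged sketch of one way to do that; `_meta_int` is a hypothetical helper name, and `ParquetError` is assumed to be the exception class defined earlier in parquet_io.py:

```python
def _meta_int(metadata: dict, key: bytes, default: bytes = b"0") -> int:
    """Parse an integer metadata value, naming the key and value on failure (sketch)."""
    raw = metadata.get(key, default)
    try:
        return int(raw.decode())
    except (ValueError, TypeError, UnicodeDecodeError) as exc:
        raise ParquetError(f"Invalid metadata value for {key!r}: {raw!r}") from exc

# Usage:
# window_start = _meta_int(metadata, b"window_start")
```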
| """ | ||
| commit = row.get("commit", {}) | ||
| rollout_data = commit.get("rollout", {}) | ||
| model_data = commit.get("model", {}) |
Bug: Null struct values cause AttributeError on deserialization
The _convert_row_to_inference function uses row.get("commit", {}) which only returns {} when the key is missing, not when the value is None. In PyArrow, struct fields are nullable by default, so if a malformed Parquet file contains null struct values, commit would be None and the subsequent commit.get("rollout", {}) call would raise an AttributeError. This could occur when validators process Parquet files from malicious miners. Using row.get("commit") or {} instead would handle both missing keys and null values safely.
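A small sketch of the safer lookup the comment suggests, assuming `row` is one dict produced by `table.to_pylist()`:

```python
# "or {}" covers both a missing key and an explicit null struct from a malformed file.
commit = row.get("commit") or {}
rollout_data = commit.get("rollout") or {}
model_data = commit.get("model") or {}
```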
| "prompt_length": rollout_data.get("prompt_length"), | ||
| "completion_length": rollout_data.get("completion_length"), | ||
| "satisfied_clauses": rollout_data.get("satisfied_clauses"), | ||
| "assignment": list(rollout_data.get("assignment", [])), |
Bug: Null list values cause TypeError during deserialization
The _convert_row_to_inference function calls list() on values retrieved via .get("key", []) at lines 216, 230, and 234. If these list fields (tokens, token_logprobs, assignment) exist in the Parquet data with explicit None values rather than missing keys, the .get() returns None instead of the default [], and list(None) raises TypeError: 'NoneType' object is not iterable. This is distinct from the null struct issue since it affects list columns within valid structs. In PyArrow, null list values are represented as None when deserialized with to_pylist().
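A sketch of the corresponding guard for list columns, assuming the field names shown in the excerpt above:

```python
# "or []" protects list() from an explicit None stored in the Parquet column.
tokens = list(commit.get("tokens") or [])
token_logprobs = list(rollout_data.get("token_logprobs") or [])
assignment = list(rollout_data.get("assignment") or [])
```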
),
digest_counter=digest_counter,
total_inferences_in_file=total_inferences,
download_seconds=download_seconds,
Bug: Passing undefined field to MinerResults dataclass
The _create_success_result method passes download_seconds=download_seconds to the MinerResults constructor, but the MinerResults dataclass in types.py doesn't have a download_seconds field. This will cause a runtime TypeError: __init__() got an unexpected keyword argument 'download_seconds' when creating successful validation results.
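One possible fix is adding the missing field; a hypothetical sketch, assuming `MinerResults` is a dataclass in `types.py` (the default value is an assumption, and alternatively the call site could simply stop passing `download_seconds`):

```python
from dataclasses import dataclass

@dataclass
class MinerResults:
    # ... existing fields elided ...
    download_seconds: float = 0.0  # new field so _create_success_result can pass it
```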
Overview
This PR introduces Parquet-based serialization for window data communication between trainers and miners, replacing text-based formats with snappy-compressed binary format.
Note
Migrate window rollout files from JSON to Parquet across infra and validation, adding Arrow-based serialization, tests, and download-time monitoring.
- Add `grail/infrastructure/parquet_io.py` with Arrow schemas and `serialize_window_to_parquet` / `deserialize_parquet_to_window` (with `ParquetError`); a usage sketch follows below.
- Add `get_parquet_file` in `grail/infrastructure/comms.py` and switch `sink_window_inferences` to write `*.parquet`.
- `miner_validator` / `miner_data` now use `get_parquet_file` and `*.parquet` keys; `sampling` checks `*.parquet` existence.
- `window_processor` updated as well.
- Add `tests/test_parquet_io.py` covering round-trip integrity, JSON-encoded fields, empty tables, compression, and error cases.
- Add `pyarrow` to `pyproject.toml`.

Written by Cursor Bugbot for commit 2e0fa5c. This will update automatically on new commits. Configure here.
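A hedged usage sketch of that round-trip; the exact signatures and the window dict keys shown here are assumptions based on this PR's description, not the verified grail API:

```python
from grail.infrastructure.parquet_io import (
    deserialize_parquet_to_window,
    serialize_window_to_parquet,
)

# Assumed shape of a window payload; key names mirror the metadata fields
# read back in deserialize_parquet_to_window (wallet, window_start, ...).
window_data = {
    "wallet": "hotkey-placeholder",
    "window_start": 1000,
    "window_length": 20,
    "inferences": [],  # list of inference dicts in practice
}

blob = serialize_window_to_parquet(window_data)   # snappy-compressed Parquet bytes
restored = deserialize_parquet_to_window(blob)    # back to the dict form
assert restored["window_start"] == window_data["window_start"]
```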
Summary by CodeRabbit
New Features
Refactor
Tests