Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions infra/perf/pr_scoped_probes.json
Original file line number Diff line number Diff line change
Expand Up @@ -3273,8 +3273,8 @@
"scripts/dataset_registry_preview_limit_probe.py",
"scripts/changed_scope_coverage.py"
],
"test_command": "PYTHONPATH=\"$PWD:$PWD/services/mlx-worker-python\" uv run --project services/mlx-worker-python pytest -q services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_row_reader_respects_limit services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_parquet_limit_uses_batched_reader services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_arrow_limit_uses_first_batches services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_row_reader_zero_limit_returns_empty services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_row_reader_stops_file_scan_after_unsplit_limit services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_limit_one_preview_avoids_full_supported_file_iterator services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_first_preview_file_preserves_sorted_depth_first_edges services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_row_reader_keeps_split_filtering_eager_for_missing_split services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_reads_json_and_csv_snapshots services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_selected_split_filters_during_iteration services/mlx-worker-python/tests/test_pr_scoped_performance.py::test_scope_report_selects_dataset_registry_preview_probe services/mlx-worker-python/tests/test_pr_scoped_performance.py::test_registered_probes_expose_focused_commands services/mlx-worker-python/tests/test_pr_scoped_performance.py::test_dataset_registry_preview_limit_probe_script_emits_metrics",
"coverage_command": "PYTHONPATH=\"$PWD:$PWD/services/mlx-worker-python\" uv run --project services/mlx-worker-python coverage run -m pytest -q services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_row_reader_respects_limit services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_parquet_limit_uses_batched_reader services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_arrow_limit_uses_first_batches services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_row_reader_zero_limit_returns_empty services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_row_reader_stops_file_scan_after_unsplit_limit services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_limit_one_preview_avoids_full_supported_file_iterator services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_first_preview_file_preserves_sorted_depth_first_edges services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_row_reader_keeps_split_filtering_eager_for_missing_split services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_reads_json_and_csv_snapshots services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_selected_split_filters_during_iteration services/mlx-worker-python/tests/test_pr_scoped_performance.py::test_scope_report_selects_dataset_registry_preview_probe services/mlx-worker-python/tests/test_pr_scoped_performance.py::test_registered_probes_expose_focused_commands services/mlx-worker-python/tests/test_pr_scoped_performance.py::test_dataset_registry_preview_limit_probe_script_emits_metrics && PYTHONPATH=\"$PWD:$PWD/services/mlx-worker-python\" uv run --project services/mlx-worker-python coverage json -o coverage.json && python3 scripts/changed_scope_coverage.py --coverage-json coverage.json services/mlx-worker-python/worker/dataset_registry/catalog.py 
services/mlx-worker-python/tests/test_dataset_registry.py services/mlx-worker-python/tests/test_pr_scoped_performance.py scripts/dataset_registry_preview_limit_probe.py",
"test_command": "PYTHONPATH=\"$PWD:$PWD/services/mlx-worker-python\" uv run --project services/mlx-worker-python pytest -q services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_row_reader_respects_limit services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_json_row_reader_limit_uses_incremental_decode services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_limited_json_text_helper_edges services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_parquet_limit_uses_batched_reader services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_arrow_limit_uses_first_batches services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_row_reader_zero_limit_returns_empty services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_row_reader_stops_file_scan_after_unsplit_limit services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_limit_one_preview_avoids_full_supported_file_iterator services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_first_preview_file_preserves_sorted_depth_first_edges services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_row_reader_keeps_split_filtering_eager_for_missing_split services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_reads_json_and_csv_snapshots services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_selected_split_filters_during_iteration services/mlx-worker-python/tests/test_pr_scoped_performance.py::test_scope_report_selects_dataset_registry_preview_probe services/mlx-worker-python/tests/test_pr_scoped_performance.py::test_registered_probes_expose_focused_commands services/mlx-worker-python/tests/test_pr_scoped_performance.py::test_dataset_registry_preview_limit_probe_script_emits_metrics",
"coverage_command": "PYTHONPATH=\"$PWD:$PWD/services/mlx-worker-python\" uv run --project services/mlx-worker-python coverage run -m pytest -q services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_row_reader_respects_limit services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_json_row_reader_limit_uses_incremental_decode services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_limited_json_text_helper_edges services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_parquet_limit_uses_batched_reader services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_arrow_limit_uses_first_batches services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_row_reader_zero_limit_returns_empty services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_row_reader_stops_file_scan_after_unsplit_limit services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_limit_one_preview_avoids_full_supported_file_iterator services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_first_preview_file_preserves_sorted_depth_first_edges services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_row_reader_keeps_split_filtering_eager_for_missing_split services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_reads_json_and_csv_snapshots services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_selected_split_filters_during_iteration services/mlx-worker-python/tests/test_pr_scoped_performance.py::test_scope_report_selects_dataset_registry_preview_probe services/mlx-worker-python/tests/test_pr_scoped_performance.py::test_registered_probes_expose_focused_commands services/mlx-worker-python/tests/test_pr_scoped_performance.py::test_dataset_registry_preview_limit_probe_script_emits_metrics && 
PYTHONPATH=\"$PWD:$PWD/services/mlx-worker-python\" uv run --project services/mlx-worker-python coverage json -o coverage.json && python3 scripts/changed_scope_coverage.py --coverage-json coverage.json services/mlx-worker-python/worker/dataset_registry/catalog.py services/mlx-worker-python/tests/test_dataset_registry.py services/mlx-worker-python/tests/test_pr_scoped_performance.py scripts/dataset_registry_preview_limit_probe.py",
"probe_command": "PYTHONPATH=\"$PWD:$PWD/services/mlx-worker-python\" uv run --project services/mlx-worker-python bash -c 'if [ -f scripts/dataset_registry_preview_limit_probe.py ]; then python3 scripts/dataset_registry_preview_limit_probe.py; else python3 - <<\"PY\"\nimport json, os, statistics, sys, tempfile, time, tracemalloc\nfrom pathlib import Path\nrepo_root = Path.cwd()\nsys.path.insert(0, str(repo_root))\nsys.path.insert(0, str(repo_root / \"services/mlx-worker-python\"))\nfrom worker.dataset_registry.catalog import read_hf_dataset_snapshot_rows\ndef int_env(name, default):\n value = os.environ.get(name, \"\")\n return int(value) if value else default\nfile_count = int_env(\"MELIX_DATASET_PREVIEW_PROBE_FILES\", 1500)\nsample_count = int_env(\"MELIX_DATASET_PREVIEW_PROBE_SAMPLES\", 7)\nelapsed = []\npeaks = []\nwith tempfile.TemporaryDirectory(prefix=\"melix-dataset-preview-probe-\") as temp_dir:\n snapshot_dir = Path(temp_dir) / \"snapshot\"\n data_dir = snapshot_dir / \"data\"\n data_dir.mkdir(parents=True)\n for index in range(file_count):\n (data_dir / f\"part-{index:05d}.jsonl\").write_text(json.dumps({\"prompt\": f\"prompt-{index}\", \"answer\": f\"answer-{index}\"}) + \"\\n\", encoding=\"utf-8\")\n (snapshot_dir / \"README.md\").write_text(\"# Synthetic dataset\\n\", encoding=\"utf-8\")\n expected = [{\"prompt\": \"prompt-0\", \"answer\": \"answer-0\"}]\n for _ in range(sample_count):\n tracemalloc.start()\n started = time.perf_counter()\n rows = read_hf_dataset_snapshot_rows(snapshot_dir, limit=1)\n elapsed.append((time.perf_counter() - started) * 1000.0)\n _, peak = tracemalloc.get_traced_memory()\n tracemalloc.stop()\n peaks.append(float(peak))\n if rows != expected:\n raise SystemExit(f\"unexpected preview rows: {rows!r}\")\nprint(json.dumps({\"elapsed_ms_mean\": round(statistics.fmean(elapsed), 6), \"elapsed_ms_min\": round(min(elapsed), 6), \"peak_bytes_mean\": round(statistics.fmean(peaks), 3), \"file_count\": float(file_count), 
\"rows_returned\": 1.0, \"sample_count\": float(sample_count)}, sort_keys=True))\nPY\nfi'",
"probe_impl": "command_json",
"coverage_replays_tests": true,
Expand Down
58 changes: 58 additions & 0 deletions services/mlx-worker-python/tests/test_dataset_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,64 @@ def test_dataset_catalog_row_reader_respects_limit(tmp_path: Path) -> None:
assert rows == [{"prompt": "first", "answer": "a"}]


def test_dataset_catalog_json_row_reader_limit_uses_incremental_decode(
    monkeypatch: pytest.MonkeyPatch,
    tmp_path: Path,
) -> None:
    """A limited read of a ``{"rows": [...]}`` file must not call ``json.loads``."""
    preview_rows = [
        {"prompt": "first", "answer": "a"},
        {"prompt": "second", "answer": "b"},
    ]
    json_path = tmp_path / "preview.json"
    json_path.write_text(json.dumps({"rows": preview_rows}), encoding="utf-8")

    def _reject_full_decode(_payload):
        # Any call to ``catalog.json.loads`` during the limited read fails the test.
        raise AssertionError("limited canonical JSON previews should not fully decode")

    monkeypatch.setattr(catalog.json, "loads", _reject_full_decode)

    limited_rows = catalog._read_rows_from_file(json_path, limit=1)
    assert limited_rows == [{"prompt": "first", "answer": "a"}]


def test_dataset_catalog_limited_json_text_helper_edges() -> None:
    """Pin edge-case results of the limited JSON preview helpers."""
    limited_rows = catalog._limited_rows_from_json_text
    first_array_start = catalog._json_text_first_array_start

    # Inputs for which the limited reader produces a definite row list.
    decoded_cases = [
        ("[]", 0, []),
        (" []", 1, []),
        ("[1]", 1, []),
        (
            '[{"prompt":"first"}, {"prompt":"second"}]',
            2,
            [{"prompt": "first"}, {"prompt": "second"}],
        ),
        ('{"data" : [{"prompt":"first"}]}', 1, [{"prompt": "first"}]),
    ]
    for json_text, limit, expected_rows in decoded_cases:
        assert limited_rows(json_text, limit=limit) == expected_rows

    # Inputs the limited reader declines by returning None.
    for json_text in ("{}", "[", "[invalid", "[1 x"):
        assert limited_rows(json_text, limit=1) is None

    # Inputs in which no usable array start is located.
    texts_without_array_start = (
        "",
        "1",
        '{"rows" []}',
        '{"rows" }',
        '{ }',
        '{invalid',
        '{1: 2}',
        '{"metadata": bad}',
        '{"metadata": 1 x}',
        "{",
        '{"items": []}',
    )
    for json_text in texts_without_array_start:
        assert first_array_start(json_text) is None

    # A scalar member ahead of "rows" is stepped over and the array is still found.
    assert first_array_start('{"metadata": 1 , "rows": []}') is not None


class _FakeColumnarBatch:
def __init__(self, rows: list[object]) -> None:
self._rows = rows
Expand Down
90 changes: 89 additions & 1 deletion services/mlx-worker-python/worker/dataset_registry/catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -545,7 +545,12 @@ def _read_rows_from_file(path: Path, *, limit: int | None = None) -> list[dict[s
break
return rows
if suffix == ".json":
payload = json.loads(path.read_text(encoding="utf-8"))
json_text = path.read_text(encoding="utf-8")
if limit is not None:
limited_rows = _limited_rows_from_json_text(json_text, limit=limit)
if limited_rows is not None:
return limited_rows
payload = json.loads(json_text)
return _rows_from_json_payload(payload, limit=limit)
if suffix == ".csv":
rows: list[dict[str, Any]] = []
Expand Down Expand Up @@ -624,6 +629,89 @@ def _limit_rows(rows: list[dict[str, Any]], limit: int | None) -> list[dict[str,
return rows[:limit]


def _limited_rows_from_json_text(json_text: str, *, limit: int) -> list[dict[str, Any]] | None:
if limit <= 0:
return []
cursor = _json_text_first_array_start(json_text)
if cursor is None:
return None
rows: list[dict[str, Any]] = []
decoder = json.JSONDecoder()
text_length = len(json_text)
cursor += 1
while cursor < text_length:
while cursor < text_length and json_text[cursor].isspace():
cursor += 1
if cursor >= text_length or json_text[cursor] == "]":
return rows
try:
value, cursor = decoder.raw_decode(json_text, cursor)
except json.JSONDecodeError:
return None
if isinstance(value, dict):
rows.append(value)
if len(rows) >= limit:
return rows
while cursor < text_length and json_text[cursor].isspace():
cursor += 1
if cursor < text_length and json_text[cursor] == ",":
cursor += 1
continue
if cursor < text_length and json_text[cursor] == "]":
return rows
return None
return None


def _json_text_first_array_start(json_text: str) -> int | None:
cursor = 0
text_length = len(json_text)
while cursor < text_length and json_text[cursor].isspace():
cursor += 1
if cursor >= text_length:
return None
if json_text[cursor] == "[":
return cursor
if json_text[cursor] != "{":
return None

decoder = json.JSONDecoder()
cursor += 1
while cursor < text_length:
while cursor < text_length and json_text[cursor].isspace():
cursor += 1
if cursor >= text_length or json_text[cursor] == "}":
return None
try:
key, cursor = decoder.raw_decode(json_text, cursor)
except json.JSONDecodeError:
return None
if not isinstance(key, str):
return None
while cursor < text_length and json_text[cursor].isspace():
cursor += 1
if cursor >= text_length or json_text[cursor] != ":":
return None
cursor += 1
while cursor < text_length and json_text[cursor].isspace():
cursor += 1
if key in {"rows", "data"} and cursor < text_length and json_text[cursor] == "[":
return cursor
try:
_, cursor = decoder.raw_decode(json_text, cursor)
except json.JSONDecodeError:
return None
while cursor < text_length and json_text[cursor].isspace():
cursor += 1
if cursor < text_length and json_text[cursor] == ",":
cursor += 1
continue
if cursor < text_length and json_text[cursor] == "}":
return None
return None
return None
Comment on lines +666 to +712

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The whitespace-skipping loop, `while cursor < text_length and json_text[cursor].isspace(): cursor += 1`, is repeated several times in this function and again in `_limited_rows_from_json_text`. To improve maintainability and reduce duplication, consider extracting it into a small helper function.

For example:

def _skip_whitespace(text: str, cursor: int, text_length: int) -> int:
    while cursor < text_length and text[cursor].isspace():
        cursor += 1
    return cursor

You could then replace each repeated `while` loop with a call to this helper, for example `cursor = _skip_whitespace(json_text, cursor, text_length)`.



def _append_limited_dict_rows(
rows: list[dict[str, Any]],
candidates: list[Any],
Expand Down
Loading