From 8d18e915b1e770efa0684c6f90f594c0a5a0b4dc Mon Sep 17 00:00:00 2001 From: Zigfreidish <261910051+Zigfreidish@users.noreply.github.com> Date: Tue, 12 May 2026 12:36:07 +0000 Subject: [PATCH] perf: short-circuit limited dataset JSON previews --- infra/perf/pr_scoped_probes.json | 4 +- .../tests/test_dataset_registry.py | 58 ++++++++++++ .../worker/dataset_registry/catalog.py | 90 ++++++++++++++++++- 3 files changed, 149 insertions(+), 3 deletions(-) diff --git a/infra/perf/pr_scoped_probes.json b/infra/perf/pr_scoped_probes.json index 52f49b17e..e4477dccf 100644 --- a/infra/perf/pr_scoped_probes.json +++ b/infra/perf/pr_scoped_probes.json @@ -3273,8 +3273,8 @@ "scripts/dataset_registry_preview_limit_probe.py", "scripts/changed_scope_coverage.py" ], - "test_command": "PYTHONPATH=\"$PWD:$PWD/services/mlx-worker-python\" uv run --project services/mlx-worker-python pytest -q services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_row_reader_respects_limit services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_parquet_limit_uses_batched_reader services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_arrow_limit_uses_first_batches services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_row_reader_zero_limit_returns_empty services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_row_reader_stops_file_scan_after_unsplit_limit services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_limit_one_preview_avoids_full_supported_file_iterator services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_first_preview_file_preserves_sorted_depth_first_edges services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_row_reader_keeps_split_filtering_eager_for_missing_split services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_reads_json_and_csv_snapshots services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_selected_split_filters_during_iteration services/mlx-worker-python/tests/test_pr_scoped_performance.py::test_scope_report_selects_dataset_registry_preview_probe services/mlx-worker-python/tests/test_pr_scoped_performance.py::test_registered_probes_expose_focused_commands services/mlx-worker-python/tests/test_pr_scoped_performance.py::test_dataset_registry_preview_limit_probe_script_emits_metrics", - "coverage_command": "PYTHONPATH=\"$PWD:$PWD/services/mlx-worker-python\" uv run --project services/mlx-worker-python coverage run -m pytest -q services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_row_reader_respects_limit services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_parquet_limit_uses_batched_reader services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_arrow_limit_uses_first_batches services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_row_reader_zero_limit_returns_empty services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_row_reader_stops_file_scan_after_unsplit_limit services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_limit_one_preview_avoids_full_supported_file_iterator services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_first_preview_file_preserves_sorted_depth_first_edges services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_row_reader_keeps_split_filtering_eager_for_missing_split services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_reads_json_and_csv_snapshots services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_selected_split_filters_during_iteration services/mlx-worker-python/tests/test_pr_scoped_performance.py::test_scope_report_selects_dataset_registry_preview_probe services/mlx-worker-python/tests/test_pr_scoped_performance.py::test_registered_probes_expose_focused_commands services/mlx-worker-python/tests/test_pr_scoped_performance.py::test_dataset_registry_preview_limit_probe_script_emits_metrics && PYTHONPATH=\"$PWD:$PWD/services/mlx-worker-python\" uv run --project services/mlx-worker-python coverage json -o coverage.json && python3 scripts/changed_scope_coverage.py --coverage-json coverage.json services/mlx-worker-python/worker/dataset_registry/catalog.py services/mlx-worker-python/tests/test_dataset_registry.py services/mlx-worker-python/tests/test_pr_scoped_performance.py scripts/dataset_registry_preview_limit_probe.py", + "test_command": "PYTHONPATH=\"$PWD:$PWD/services/mlx-worker-python\" uv run --project services/mlx-worker-python pytest -q services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_row_reader_respects_limit services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_json_row_reader_limit_uses_incremental_decode services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_limited_json_text_helper_edges services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_parquet_limit_uses_batched_reader services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_arrow_limit_uses_first_batches services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_row_reader_zero_limit_returns_empty services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_row_reader_stops_file_scan_after_unsplit_limit services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_limit_one_preview_avoids_full_supported_file_iterator services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_first_preview_file_preserves_sorted_depth_first_edges services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_row_reader_keeps_split_filtering_eager_for_missing_split services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_reads_json_and_csv_snapshots services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_selected_split_filters_during_iteration services/mlx-worker-python/tests/test_pr_scoped_performance.py::test_scope_report_selects_dataset_registry_preview_probe services/mlx-worker-python/tests/test_pr_scoped_performance.py::test_registered_probes_expose_focused_commands services/mlx-worker-python/tests/test_pr_scoped_performance.py::test_dataset_registry_preview_limit_probe_script_emits_metrics", + "coverage_command": "PYTHONPATH=\"$PWD:$PWD/services/mlx-worker-python\" uv run --project services/mlx-worker-python coverage run -m pytest -q services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_row_reader_respects_limit services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_json_row_reader_limit_uses_incremental_decode services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_limited_json_text_helper_edges services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_parquet_limit_uses_batched_reader services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_arrow_limit_uses_first_batches services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_row_reader_zero_limit_returns_empty services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_row_reader_stops_file_scan_after_unsplit_limit services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_limit_one_preview_avoids_full_supported_file_iterator services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_first_preview_file_preserves_sorted_depth_first_edges services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_row_reader_keeps_split_filtering_eager_for_missing_split services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_reads_json_and_csv_snapshots services/mlx-worker-python/tests/test_dataset_registry.py::test_dataset_catalog_selected_split_filters_during_iteration services/mlx-worker-python/tests/test_pr_scoped_performance.py::test_scope_report_selects_dataset_registry_preview_probe services/mlx-worker-python/tests/test_pr_scoped_performance.py::test_registered_probes_expose_focused_commands services/mlx-worker-python/tests/test_pr_scoped_performance.py::test_dataset_registry_preview_limit_probe_script_emits_metrics && PYTHONPATH=\"$PWD:$PWD/services/mlx-worker-python\" uv run --project services/mlx-worker-python coverage json -o coverage.json && python3 scripts/changed_scope_coverage.py --coverage-json coverage.json services/mlx-worker-python/worker/dataset_registry/catalog.py services/mlx-worker-python/tests/test_dataset_registry.py services/mlx-worker-python/tests/test_pr_scoped_performance.py scripts/dataset_registry_preview_limit_probe.py", "probe_command": "PYTHONPATH=\"$PWD:$PWD/services/mlx-worker-python\" uv run --project services/mlx-worker-python bash -c 'if [ -f scripts/dataset_registry_preview_limit_probe.py ]; then python3 scripts/dataset_registry_preview_limit_probe.py; else python3 - <<\"PY\"\nimport json, os, statistics, sys, tempfile, time, tracemalloc\nfrom pathlib import Path\nrepo_root = Path.cwd()\nsys.path.insert(0, str(repo_root))\nsys.path.insert(0, str(repo_root / \"services/mlx-worker-python\"))\nfrom worker.dataset_registry.catalog import read_hf_dataset_snapshot_rows\ndef int_env(name, default):\n value = os.environ.get(name, \"\")\n return int(value) if value else default\nfile_count = int_env(\"MELIX_DATASET_PREVIEW_PROBE_FILES\", 1500)\nsample_count = int_env(\"MELIX_DATASET_PREVIEW_PROBE_SAMPLES\", 7)\nelapsed = []\npeaks = []\nwith tempfile.TemporaryDirectory(prefix=\"melix-dataset-preview-probe-\") as temp_dir:\n snapshot_dir = Path(temp_dir) / \"snapshot\"\n data_dir = snapshot_dir / \"data\"\n data_dir.mkdir(parents=True)\n for index in range(file_count):\n (data_dir / f\"part-{index:05d}.jsonl\").write_text(json.dumps({\"prompt\": f\"prompt-{index}\", \"answer\": f\"answer-{index}\"}) + \"\\n\", encoding=\"utf-8\")\n (snapshot_dir / \"README.md\").write_text(\"# Synthetic dataset\\n\", encoding=\"utf-8\")\n expected = [{\"prompt\": \"prompt-0\", \"answer\": \"answer-0\"}]\n for _ in range(sample_count):\n tracemalloc.start()\n started = time.perf_counter()\n rows = read_hf_dataset_snapshot_rows(snapshot_dir, limit=1)\n elapsed.append((time.perf_counter() - started) * 1000.0)\n _, peak = tracemalloc.get_traced_memory()\n tracemalloc.stop()\n peaks.append(float(peak))\n if rows != expected:\n raise SystemExit(f\"unexpected preview rows: {rows!r}\")\nprint(json.dumps({\"elapsed_ms_mean\": round(statistics.fmean(elapsed), 6), \"elapsed_ms_min\": round(min(elapsed), 6), \"peak_bytes_mean\": round(statistics.fmean(peaks), 3), \"file_count\": float(file_count), \"rows_returned\": 1.0, \"sample_count\": float(sample_count)}, sort_keys=True))\nPY\nfi'", "probe_impl": "command_json", "coverage_replays_tests": true, diff --git a/services/mlx-worker-python/tests/test_dataset_registry.py b/services/mlx-worker-python/tests/test_dataset_registry.py index 05f3800f4..5d23043bf 100644 --- a/services/mlx-worker-python/tests/test_dataset_registry.py +++ b/services/mlx-worker-python/tests/test_dataset_registry.py @@ -225,6 +225,64 @@ def test_dataset_catalog_row_reader_respects_limit(tmp_path: Path) -> None: assert rows == [{"prompt": "first", "answer": "a"}] +def test_dataset_catalog_json_row_reader_limit_uses_incremental_decode( + monkeypatch: pytest.MonkeyPatch, + tmp_path: Path, +) -> None: + json_path = tmp_path / "preview.json" + json_path.write_text( + json.dumps( + { + "rows": [ + {"prompt": "first", "answer": "a"}, + {"prompt": "second", "answer": "b"}, + ] + } + ), + encoding="utf-8", + ) + + monkeypatch.setattr( + catalog.json, + "loads", + lambda _payload: (_ for _ in ()).throw( + AssertionError("limited canonical JSON previews should not fully decode") + ), + ) + + assert catalog._read_rows_from_file(json_path, limit=1) == [ + {"prompt": "first", "answer": "a"} + ] + + +def test_dataset_catalog_limited_json_text_helper_edges() -> None: + assert catalog._limited_rows_from_json_text("[]", limit=0) == [] + assert catalog._limited_rows_from_json_text("{}", limit=1) is None + assert catalog._limited_rows_from_json_text(" []", limit=1) == [] + assert catalog._limited_rows_from_json_text("[", limit=1) is None + assert catalog._limited_rows_from_json_text("[invalid", limit=1) is None + assert catalog._limited_rows_from_json_text("[1]", limit=1) == [] + assert catalog._limited_rows_from_json_text("[1 x", limit=1) is None + assert catalog._limited_rows_from_json_text( + '[{"prompt":"first"}, {"prompt":"second"}]', limit=2 + ) == [{"prompt": "first"}, {"prompt": "second"}] + assert catalog._limited_rows_from_json_text( + '{"data" : [{"prompt":"first"}]}', limit=1 + ) == [{"prompt": "first"}] + assert catalog._json_text_first_array_start("") is None + assert catalog._json_text_first_array_start("1") is None + assert catalog._json_text_first_array_start('{"rows" []}') is None + assert catalog._json_text_first_array_start('{"rows" }') is None + assert catalog._json_text_first_array_start('{ }') is None + assert catalog._json_text_first_array_start('{invalid') is None + assert catalog._json_text_first_array_start('{1: 2}') is None + assert catalog._json_text_first_array_start('{"metadata": bad}') is None + assert catalog._json_text_first_array_start('{"metadata": 1 , "rows": []}') is not None + assert catalog._json_text_first_array_start('{"metadata": 1 x}') is None + assert catalog._json_text_first_array_start("{") is None + assert catalog._json_text_first_array_start('{"items": []}') is None + + class _FakeColumnarBatch: def __init__(self, rows: list[object]) -> None: self._rows = rows diff --git a/services/mlx-worker-python/worker/dataset_registry/catalog.py b/services/mlx-worker-python/worker/dataset_registry/catalog.py index afbe863d3..d35e0d3b0 100644 --- a/services/mlx-worker-python/worker/dataset_registry/catalog.py +++ b/services/mlx-worker-python/worker/dataset_registry/catalog.py @@ -545,7 +545,12 @@ def _read_rows_from_file(path: Path, *, limit: int | None = None) -> list[dict[s break return rows if suffix == ".json": - payload = json.loads(path.read_text(encoding="utf-8")) + json_text = path.read_text(encoding="utf-8") + if limit is not None: + limited_rows = _limited_rows_from_json_text(json_text, limit=limit) + if limited_rows is not None: + return limited_rows + payload = json.loads(json_text) return _rows_from_json_payload(payload, limit=limit) if suffix == ".csv": rows: list[dict[str, Any]] = [] @@ -624,6 +629,89 @@ def _limit_rows(rows: list[dict[str, Any]], limit: int | None) -> list[dict[str, return rows[:limit] +def _limited_rows_from_json_text(json_text: str, *, limit: int) -> list[dict[str, Any]] | None: + if limit <= 0: + return [] + cursor = _json_text_first_array_start(json_text) + if cursor is None: + return None + rows: list[dict[str, Any]] = [] + decoder = json.JSONDecoder() + text_length = len(json_text) + cursor += 1 + while cursor < text_length: + while cursor < text_length and json_text[cursor].isspace(): + cursor += 1 + if cursor >= text_length or json_text[cursor] == "]": + return rows + try: + value, cursor = decoder.raw_decode(json_text, cursor) + except json.JSONDecodeError: + return None + if isinstance(value, dict): + rows.append(value) + if len(rows) >= limit: + return rows + while cursor < text_length and json_text[cursor].isspace(): + cursor += 1 + if cursor < text_length and json_text[cursor] == ",": + cursor += 1 + continue + if cursor < text_length and json_text[cursor] == "]": + return rows + return None + return None + + +def _json_text_first_array_start(json_text: str) -> int | None: + cursor = 0 + text_length = len(json_text) + while cursor < text_length and json_text[cursor].isspace(): + cursor += 1 + if cursor >= text_length: + return None + if json_text[cursor] == "[": + return cursor + if json_text[cursor] != "{": + return None + + decoder = json.JSONDecoder() + cursor += 1 + while cursor < text_length: + while cursor < text_length and json_text[cursor].isspace(): + cursor += 1 + if cursor >= text_length or json_text[cursor] == "}": + return None + try: + key, cursor = decoder.raw_decode(json_text, cursor) + except json.JSONDecodeError: + return None + if not isinstance(key, str): + return None + while cursor < text_length and json_text[cursor].isspace(): + cursor += 1 + if cursor >= text_length or json_text[cursor] != ":": + return None + cursor += 1 + while cursor < text_length and json_text[cursor].isspace(): + cursor += 1 + if key in {"rows", "data"} and cursor < text_length and json_text[cursor] == "[": + return cursor + try: + _, cursor = decoder.raw_decode(json_text, cursor) + except json.JSONDecodeError: + return None + while cursor < text_length and json_text[cursor].isspace(): + cursor += 1 + if cursor < text_length and json_text[cursor] == ",": + cursor += 1 + continue + if cursor < text_length and json_text[cursor] == "}": + return None + return None + return None + + def _append_limited_dict_rows( rows: list[dict[str, Any]], candidates: list[Any],