Skip to content

Commit dc010e2

Browse files
committed
sec-edgar: fold DuckDB read into a custom download step
Drops the standalone transform stage entirely. The new download_sec_edgar() lists upstream parquets via HfFileSystem and maps each through a worker that streams it via DuckDB and writes a PyArrow-readable shard at raw/sec-edgar/<form-type>/<file>.parquet. Normalize consumes that directly — chain collapses to (download, normalize) with one on-disk copy instead of two. Matches the nsf_awards pattern: download function + zephyr Dataset.map + write_jsonl manifest + provenance.json at the end. Carries a 20-try exponential-backoff retry around each per-file DuckDB read to absorb HF rate-limits and transient xet-bridge errors that download_hf_step got for free. Saves ~220 GiB of redundant staging copy compared to the prior download_hf + transform chain. hash_attrs change forces fresh fetches (orphans the existing raw/sec-edgar_62f8bccd and downstream cached output paths), but that's a one-time cost.
1 parent c623b10 commit dc010e2

1 file changed

Lines changed: 136 additions & 49 deletions

File tree

lib/marin/src/marin/datakit/download/sec_edgar.py

Lines changed: 136 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -1,104 +1,191 @@
11
# Copyright The Marin Authors
22
# SPDX-License-Identifier: Apache-2.0
33

4-
"""TeraflopAI/SEC-EDGAR download + transform + normalize helpers.
4+
"""TeraflopAI/SEC-EDGAR download + normalize helpers.
55
66
~8M filings (~335B marin_tokenizer tokens) from the SEC EDGAR database,
77
organized into per-filing-type subdirectories: 10-K, 10-Q, 8-K, 20-F,
88
S-1, S-8, 144, and Form 3/4/5. Text lives in the upstream ``content``
99
column.
1010
11-
A transform step sits between download and normalize as a workaround
12-
for https://github.com/marin-community/marin/issues/5334 — the upstream
13-
parquet shards trip ``apache/arrow#46404`` (PyArrow's parquet reader
11+
The download uses DuckDB instead of byte-streaming because the upstream
12+
parquet shards trip ``apache/arrow#46404`` — PyArrow's parquet reader
1413
can't decode page headers >8 MiB, which the multi-MB filings in the
15-
``content`` column overflow on per-page string statistics). The
16-
transform routes the read through ``read_parquet_via_duckdb`` (DuckDB
17-
has no such cap) so the upstream bytes survive into a re-write that
18-
PyArrow's own writer produces with safely-truncated stats — readable by
19-
downstream PyArrow consumers (normalize, tokenize) again.
20-
21-
Scoped to this source per the discussion in
22-
https://github.com/marin-community/marin/pull/5335 — if more datasets
23-
hit the page-header cap or we move off PyArrow wholesale, lift
24-
``read_parquet_via_duckdb`` into a shared helper.
14+
``content`` column overflow on per-page string statistics. DuckDB has
15+
no such cap, so we re-encode through it once at download time: each
16+
upstream shard is read via DuckDB and rewritten via PyArrow's default
17+
writer (whose stats are safely truncated for huge strings — verified
18+
locally on the offending SEC files). The result is a PyArrow-readable
19+
``raw/sec-edgar/<form-type>/<file>.parquet`` tree that normalize and
20+
tokenize consume directly, with no intermediate staging copy.
21+
22+
Scoped to this source per https://github.com/marin-community/marin/pull/5335
23+
— if more datasets hit the page-header cap or we move off PyArrow
24+
wholesale, lift ``read_parquet_via_duckdb`` into a shared helper.
25+
Tracking: https://github.com/marin-community/marin/issues/5334.
2526
"""
2627

2728
from __future__ import annotations
2829

30+
import logging
31+
import os
32+
import random
33+
import time
2934
from collections.abc import Iterator
3035

3136
import duckdb
37+
import pyarrow as pa
38+
import pyarrow.parquet as pq
3239
from fray import ResourceConfig
33-
from rigging.filesystem import url_to_fs
40+
from huggingface_hub import HfFileSystem
3441
from zephyr import Dataset, ZephyrContext, counters
42+
from zephyr.writers import atomic_rename, ensure_parent_dir
3543

36-
from marin.datakit.download.huggingface import download_hf_step
3744
from marin.datakit.normalize import normalize_step
3845
from marin.execution.step_spec import StepSpec
46+
from marin.utilities.validation_utils import write_provenance_json
47+
48+
logger = logging.getLogger(__name__)
3949

4050
HF_DATASET_ID = "TeraflopAI/SEC-EDGAR"
4151
HF_REVISION = "43de32c"
4252

4353
FILING_TYPES = ("10-K", "10-Q", "8-K", "20-F", "S-1", "S-8", "144", "3", "4", "5")
4454

55+
# HF rate-limits aggressively; a small batch reader keeps memory bounded
56+
# while one big SEC row group (~700 MB decompressed) is in flight.
57+
_ROWS_PER_BATCH = 8
58+
59+
# Per-file retry policy (HfHubHTTPError 429s, network blips, xet-bridge hiccups).
60+
_MAX_RETRIES = 20
61+
_BASE_WAIT_S = 5
62+
_MAX_WAIT_S = 15 * 60
63+
64+
65+
def read_parquet_via_duckdb(path: str, *, fs: object | None = None) -> Iterator[pa.RecordBatch]:
66+
"""Yield Arrow RecordBatches from a parquet via DuckDB.
4567
46-
def read_parquet_via_duckdb(path: str) -> Iterator[dict]:
47-
"""Yield records from a parquet file using DuckDB instead of PyArrow.
68+
Works around https://github.com/marin-community/marin/issues/5334
69+
(apache/arrow#46404) — PyArrow can't decode page headers >8 MiB,
70+
which SEC's multi-MB ``content`` column overflows. DuckDB has no
71+
such limit.
4872
49-
Drop-in replacement for ``zephyr.load_parquet`` for the SEC-EDGAR
50-
pipeline. DuckDB's reader has no 8 MiB page-header cap, so it can
51-
decode shards PyArrow rejects (see module docstring + #5334).
73+
``fs`` is the fsspec filesystem to register with the DuckDB
74+
connection. Defaults to an ``HfFileSystem`` so callers reading
75+
``hf://...`` paths don't have to wire one up.
5276
"""
53-
src_fs, _ = url_to_fs(path)
77+
if fs is None:
78+
fs = HfFileSystem()
5479
con = duckdb.connect(":memory:")
5580
try:
56-
con.register_filesystem(src_fs)
81+
con.register_filesystem(fs)
5782
result = con.execute("SELECT * FROM read_parquet(?)", [path])
58-
reader = result.fetch_record_batch(rows_per_batch=8)
59-
for batch in reader:
60-
counters.increment("sec_edgar/rows_read", batch.num_rows)
61-
rows = batch.to_pydict()
62-
for i in range(batch.num_rows):
63-
yield {k: rows[k][i] for k in rows}
83+
yield from result.fetch_record_batch(rows_per_batch=_ROWS_PER_BATCH)
6484
finally:
6585
con.close()
6686

6787

68-
def transform(input_path: str, output_path: str) -> None:
88+
def _download_one(task: dict) -> dict:
89+
"""Stream one upstream parquet via DuckDB, write a PyArrow-readable shard at ``dst``."""
90+
hf_path = task["hf_path"]
91+
dst = task["dst"]
92+
93+
last_exc: BaseException | None = None
94+
for attempt in range(_MAX_RETRIES):
95+
try:
96+
ensure_parent_dir(dst)
97+
count = 0
98+
batches = read_parquet_via_duckdb(hf_path)
99+
first = next(batches, None)
100+
if first is None:
101+
counters.increment("sec_edgar/empty_input")
102+
return {"hf_path": hf_path, "dst": dst, "rows": 0}
103+
with atomic_rename(dst) as tmp:
104+
with pq.ParquetWriter(tmp, first.schema) as writer:
105+
writer.write_batch(first)
106+
count += first.num_rows
107+
for batch in batches:
108+
writer.write_batch(batch)
109+
count += batch.num_rows
110+
counters.increment("sec_edgar/rows_downloaded", count)
111+
return {"hf_path": hf_path, "dst": dst, "rows": count}
112+
except Exception as e:
113+
last_exc = e
114+
wait = min(_MAX_WAIT_S, _BASE_WAIT_S * (2**attempt)) + random.uniform(0, 10)
115+
logger.warning(
116+
"Attempt %d/%d failed for %s: %s: %s; retrying in %.1fs",
117+
attempt + 1,
118+
_MAX_RETRIES,
119+
hf_path,
120+
type(e).__name__,
121+
e,
122+
wait,
123+
)
124+
time.sleep(wait)
125+
raise RuntimeError(f"Failed to download {hf_path} after {_MAX_RETRIES} attempts") from last_exc
126+
127+
128+
def _list_hf_parquets() -> list[str]:
129+
"""List all upstream parquet paths in ``hf://datasets/...`` form, pinned to revision."""
130+
hf = HfFileSystem()
131+
paths: list[str] = []
132+
for ftype in FILING_TYPES:
133+
pattern = f"datasets/{HF_DATASET_ID}/{ftype}/*.parquet"
134+
for p in hf.glob(pattern, revision=HF_REVISION):
135+
paths.append(f"hf://{p}")
136+
paths.sort()
137+
return paths
138+
139+
140+
def download_sec_edgar(output_path: str) -> None:
141+
"""Pull SEC-EDGAR from HF via DuckDB, write PyArrow-readable shards under ``output_path``."""
142+
files = _list_hf_parquets()
143+
if not files:
144+
raise ValueError(f"No parquet files matched for {HF_DATASET_ID}@{HF_REVISION}")
145+
logger.info("Found %d upstream parquet files", len(files))
146+
147+
base = f"hf://datasets/{HF_DATASET_ID}/"
148+
tasks = [{"hf_path": p, "dst": os.path.join(output_path, p.removeprefix(base))} for p in files]
149+
69150
pipeline = (
70-
Dataset.from_files(f"{input_path}/**/*.parquet")
71-
.flat_map(read_parquet_via_duckdb)
72-
.write_parquet(f"{output_path}/data-{{shard:05d}}-of-{{total:05d}}.parquet", skip_existing=True)
151+
Dataset.from_list(tasks)
152+
.map(_download_one)
153+
.write_jsonl(
154+
f"{output_path}/.metrics/download-{{shard:05d}}-of-{{total:05d}}.jsonl",
155+
skip_existing=True,
156+
)
73157
)
74-
ctx = ZephyrContext(name="sec-edgar-transform", resources=ResourceConfig(cpu=1, ram="32g"))
158+
ctx = ZephyrContext(name="download-sec-edgar", resources=ResourceConfig(cpu=1, ram="16g"))
75159
ctx.execute(pipeline)
76160

161+
write_provenance_json(
162+
output_path,
163+
metadata={"dataset": HF_DATASET_ID, "version": HF_REVISION, "links": files},
164+
)
165+
logger.info("SEC-EDGAR download complete.")
166+
77167

78168
def download_sec_edgar_step() -> StepSpec:
79-
"""Download SEC-EDGAR from HF and rewrite each shard via DuckDB."""
80-
dl = download_hf_step(
81-
"raw/sec-edgar",
82-
hf_dataset_id=HF_DATASET_ID,
83-
revision=HF_REVISION,
84-
hf_urls_glob=[f"{f}/*.parquet" for f in FILING_TYPES],
85-
)
86169
return StepSpec(
87-
name="processed/sec-edgar",
88-
deps=[dl],
89-
fn=lambda output_path: transform(input_path=dl.output_path, output_path=output_path),
90-
hash_attrs={"version": "v1"},
170+
name="raw/sec-edgar",
171+
fn=download_sec_edgar,
172+
hash_attrs={
173+
"hf_dataset_id": HF_DATASET_ID,
174+
"revision": HF_REVISION,
175+
"filing_types": list(FILING_TYPES),
176+
"reader": "duckdb",
177+
},
91178
)
92179

93180

94181
def sec_edgar_normalize_steps() -> tuple[StepSpec, ...]:
95-
"""Return the ``(download+transform, normalize)`` chain for SEC-EDGAR."""
96-
processed = download_sec_edgar_step()
182+
"""Return the ``(download, normalize)`` chain for SEC-EDGAR."""
183+
download = download_sec_edgar_step()
97184
return (
98-
processed,
185+
download,
99186
normalize_step(
100187
name="normalized/sec-edgar",
101-
download=processed,
188+
download=download,
102189
text_field="content",
103190
file_extensions=(".parquet",),
104191
),

0 commit comments

Comments
 (0)