Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions experiments/pretraining_datasets/starcoder2_extras.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# Copyright The Marin Authors
# SPDX-License-Identifier: Apache-2.0

"""StarCoder2 data extras: download and tokenize ir_cpp, ir_python, ir_rust, ir_low_resource, documentation."""
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 Nit: docstring missing "kaggle"

The docstring lists subsets but omits "kaggle", which is in SUBSETS.

Suggested change
"""StarCoder2 data extras: download and tokenize ir_cpp, ir_python, ir_rust, ir_low_resource, documentation."""
"""StarCoder2 data extras: download and tokenize ir_cpp, ir_python, ir_rust, ir_low_resource, documentation, kaggle."""


from experiments.defaults import default_tokenize
from experiments.marin_models import marin_tokenizer
from fray.v2 import ResourceConfig
from levanter.data.text.formats import TextLmDatasetFormat
from marin.datakit.download.starcoder2_extras import (
    SUBSETS,
    download_starcoder2_extras_step,
    reshard_starcoder2_extras_step,
)
from marin.execution.executor import ExecutorStep, executor_main, this_output_path, versioned
from marin.processing.tokenize import TokenizeConfig, tokenize
from marin.processing.tokenize.data_configs import TokenizerStep

WORKER_RAM = {"ir_low_resource": "80g"}
DEFAULT_WORKER_RAM = "40g"


def tokenize_starcoder2_extras(*, tokenizer: str = marin_tokenizer) -> list[TokenizerStep]:
    """Download and tokenize all selected starcoder2data-extras subsets.

    Subsets in ``reshard_subsets`` are first resharded into evenly-sized
    parquet files; all others are tokenized straight from the raw download.

    Args:
        tokenizer: Tokenizer name to use; defaults to the Marin tokenizer.

    Returns:
        One tokenize ExecutorStep per subset in SUBSETS.
    """
    # ir_low_resource ships unevenly-sized shards, so reshard it before
    # tokenizing (and give its workers extra RAM via WORKER_RAM).
    reshard_subsets = {"ir_low_resource"}
    steps: list[TokenizerStep] = []
    for subset in SUBSETS:
        if subset in reshard_subsets:
            download = reshard_starcoder2_extras_step(subset)
        else:
            download = download_starcoder2_extras_step(subset)
        ram = WORKER_RAM.get(subset, DEFAULT_WORKER_RAM)
        # default_tokenize() does not accept a worker_resources kwarg, so build
        # the TokenizeConfig + ExecutorStep directly (the pattern used by the
        # dolmino and nemotron_v2 experiments) to set per-subset worker RAM.
        steps.append(
            ExecutorStep(
                name=f"tokenized/starcoder2_extras/{subset}",
                fn=tokenize,
                config=TokenizeConfig(
                    train_paths=[download.as_executor_step()],
                    validation_paths=versioned([]),
                    cache_path=this_output_path(),
                    tokenizer=versioned(tokenizer),
                    format=TextLmDatasetFormat(text_key="content"),
                    worker_resources=ResourceConfig(ram=ram, disk="10g"),
                ),
            )
        )
    return steps


if __name__ == "__main__":
executor_main(steps=tokenize_starcoder2_extras())
84 changes: 84 additions & 0 deletions lib/marin/src/marin/datakit/download/starcoder2_extras.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
# Copyright The Marin Authors
# SPDX-License-Identifier: Apache-2.0

"""Download subsets of the bigcode/starcoder2data-extras dataset from HuggingFace.

Subsets: ir_cpp, ir_python, ir_rust, ir_low_resource, documentation, kaggle.
"""

from marin.datakit.download.huggingface import download_hf_step
from marin.execution.step_spec import StepSpec

HF_DATASET_ID = "bigcode/starcoder2data-extras"
HF_REVISION = "1ba0d4f"

SUBSETS = ["ir_cpp", "ir_python", "ir_rust", "ir_low_resource", "documentation", "kaggle"]


def download_starcoder2_extras_step(subset: str) -> StepSpec:
    """Create the download step for one starcoder2data-extras subset.

    Fetches only the parquet files under the subset's directory, pinned to
    HF_REVISION, and writes them under a revision-stamped output path.
    """
    step_name = f"raw/starcoder2_extras/{subset}"
    output_path = f"raw/starcoder2_extras-{HF_REVISION}/{subset}"
    return download_hf_step(
        step_name,
        hf_dataset_id=HF_DATASET_ID,
        revision=HF_REVISION,
        hf_urls_glob=[f"{subset}/*.parquet"],
        override_output_path=output_path,
    )


def reshard_starcoder2_extras_step(subset: str, target_shard_mb: int = 200) -> StepSpec:
    """Reshard a downloaded subset into more evenly-sized parquet files.

    Args:
        subset: Name of the starcoder2data-extras subset to reshard.
        target_shard_mb: Target uncompressed data size per output shard, in MB.

    Returns:
        A StepSpec that depends on the raw download step and writes
        ``shard-NNNNN.parquet`` files to its output path.
    """
    raw = download_starcoder2_extras_step(subset)
    raw_output_path = raw.output_path

    def _run(output_path: str) -> None:
        import logging
        from urllib.parse import urlparse

        import pyarrow.parquet as pq
        from rigging.filesystem import url_to_fs

        logger = logging.getLogger(__name__)
        input_path = raw_output_path
        fs, _ = url_to_fs(input_path)
        # fs.glob() returns paths without the protocol prefix; derive the
        # prefix from input_path rather than hardcoding gs:// so this also
        # works on non-GCS filesystems.
        scheme = urlparse(input_path).scheme
        prefix = f"{scheme}://" if scheme else ""
        files = sorted(
            f"{prefix}{f}" for f in fs.glob(f"{input_path}/**/*.parquet") if not f.endswith("/.parquet")
        )

        # Read all files, split into evenly-sized output shards
        target_bytes = target_shard_mb * 1024 * 1024
        shard_idx = 0
        for file_path in files:
            meta = pq.read_metadata(file_path)
            # meta.serialized_size is only the size of the metadata footer
            # (a few KB); sum the row-group sizes to get the actual
            # uncompressed data size of the file.
            data_size = sum(meta.row_group(i).total_byte_size for i in range(meta.num_row_groups))
            if data_size <= target_bytes:
                # Small file — copy as-is
                out = f"{output_path}/shard-{shard_idx:05d}.parquet"
                table = pq.read_table(file_path)
                pq.write_table(table, out)
                logger.info(f"Copied {file_path} -> {out} ({table.num_rows} rows)")
                shard_idx += 1
            else:
                # Big file — split into roughly target_bytes-sized row slices
                table = pq.read_table(file_path)
                rows_per_shard = max(1, (table.num_rows * target_bytes) // data_size)
                offset = 0
                while offset < table.num_rows:
                    chunk = table.slice(offset, min(rows_per_shard, table.num_rows - offset))
                    out = f"{output_path}/shard-{shard_idx:05d}.parquet"
                    pq.write_table(chunk, out)
                    logger.info(
                        f"Split {file_path}[{offset}:{offset + chunk.num_rows}] -> {out} ({chunk.num_rows} rows)"
                    )
                    shard_idx += 1
                    offset += chunk.num_rows
                # Release the table before reading the next file to keep peak
                # memory bounded.
                del table

        logger.info(f"Resharded {len(files)} files into {shard_idx} shards")

    return StepSpec(
        name=f"resharded/starcoder2_extras/{subset}",
        fn=_run,
        deps=[raw],
        # Include target_shard_mb in the step hash so changing the shard size
        # invalidates previously cached outputs.
        hash_attrs={"target_shard_mb": target_shard_mb},
    )



def download_all_starcoder2_extras_steps() -> list[StepSpec]:
    """Build one download step for every subset of starcoder2data-extras."""
    return list(map(download_starcoder2_extras_step, SUBSETS))
Loading