data_juicer/config/config.py (3 additions, 1 deletion)
@@ -28,6 +28,7 @@
 from data_juicer.utils.constant import RAY_JOB_ENV_VAR
 from data_juicer.utils.logger_utils import setup_logger
 from data_juicer.utils.mm_utils import SpecialTokens
+from data_juicer.utils.ray_utils import is_ray_mode
 
 global_cfg = None
 global_parser = None
@@ -749,7 +750,6 @@ def init_setup_from_cfg(cfg: Namespace, load_configs_only=False):
         "audio_key": cfg.get("audio_key", "audios"),
         "video_key": cfg.get("video_key", "videos"),
         "image_bytes_key": cfg.get("image_bytes_key", "image_bytes"),
-        "num_proc": cfg.get("np", None),
         "turbo": cfg.get("turbo", False),
         "skip_op_error": cfg.get("skip_op_error", True),
         "work_dir": cfg.work_dir,
@@ -758,6 +758,8 @@
         "video_special_token": cfg.get("video_special_token", SpecialTokens.video),
         "eoc_special_token": cfg.get("eoc_special_token", SpecialTokens.eoc),
     }
+    if not is_ray_mode():
+        op_attrs.update({"num_proc": cfg.get("np", None)})
     cfg.process = update_op_attr(cfg.process, op_attrs)
 
     return cfg
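
Net effect of this file: the global `np` setting is injected into op configs only in standalone mode, while Ray mode leaves `num_proc` unset so that `calculate_ray_np` can size each op. A minimal sketch of the two paths, with illustrative values (the real attribute merging happens inside `update_op_attr`):

    # Sketch: how an op's num_proc default is (not) injected per mode.
    # cfg values are made up; is_ray_mode is the helper imported above.
    from data_juicer.utils.ray_utils import is_ray_mode

    cfg_np = 4  # the global `np` from the YAML config
    op_attrs = {"turbo": False, "skip_op_error": True}

    if not is_ray_mode():
        # Standalone executor: every op inherits the global process count.
        op_attrs.update({"num_proc": cfg_np})
    # Ray mode: num_proc is omitted here, so BaseOp.__init__ falls back to
    # -1 ("auto") and calculate_ray_np later decides the actual concurrency.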
data_juicer/core/data/ray_dataset.py (11 additions, 28 deletions)
@@ -1,7 +1,6 @@
 from __future__ import annotations
 
 import os
-import sys
 from functools import partial
 from typing import Any, Dict, List, Literal, Optional, Union
 
@@ -16,7 +15,6 @@
 from data_juicer.utils.constant import Fields
 from data_juicer.utils.file_utils import is_remote_path
 from data_juicer.utils.lazy_loader import LazyLoader
-from data_juicer.utils.process_utils import calculate_np
 from data_juicer.utils.resource_utils import cuda_device_count
 from data_juicer.utils.webdataset_utils import _custom_default_decoder
 
@@ -148,27 +146,16 @@ def process(self, operators, *, exporter=None, checkpointer=None, tracer=None) -
             return self
         if not isinstance(operators, list):
             operators = [operators]
+
+        from data_juicer.utils.process_utils import calculate_ray_np
+
+        calculate_ray_np(operators)
+
         for op in operators:
             self._run_single_op(op)
         return self
 
     def _run_single_op(self, op):
-        # TODO: optimize auto proc
-        auto_parallel = False
-        if op.num_proc:
-            op_proc = op.num_proc
-        else:
-            auto_parallel = True
-            op_proc = sys.maxsize
-            auto_op_proc = calculate_np(op._name, op.mem_required, op.cpu_required, op.use_cuda(), op.gpu_required)
-            op_proc = min(op_proc, auto_op_proc)
-
-        # use ray default parallelism in cpu mode if op.num_proc is not specified
-        if op.use_cuda() or not auto_parallel:
-            logger.info(f"Op [{op._name}] running with number of procs:{op_proc}")
-
-        num_gpus = op.gpu_required if op.gpu_required else get_num_gpus(op, op_proc)
-
         if op._name in TAGGING_OPS.modules and Fields.meta not in self.data.columns():
 
             def process_batch_arrow(table: pyarrow.Table):
@@ -193,8 +180,8 @@ def process_batch_arrow(table: pyarrow.Table):
                     fn_constructor_kwargs=op_kwargs,
                     batch_size=batch_size,
                     num_cpus=op.cpu_required,
-                    num_gpus=num_gpus,
-                    concurrency=op_proc,
+                    num_gpus=op.gpu_required,
+                    concurrency=op.num_proc,
                     batch_format="pyarrow",
                 )
             else:
@@ -203,9 +190,7 @@ def process_batch_arrow(table: pyarrow.Table):
                     batch_size=batch_size,
                     batch_format="pyarrow",
                     num_cpus=op.cpu_required,
-                    concurrency=(
-                        None if auto_parallel else op_proc
-                    ),  # use ray default parallelism in cpu mode if num_proc is not specified
+                    concurrency=op.num_proc,
                 )
         elif isinstance(op, Filter):
             columns = self.data.columns()
@@ -229,8 +214,8 @@ def process_batch_arrow(table: pyarrow.Table):
                     fn_constructor_kwargs=op_kwargs,
                     batch_size=batch_size,
                     num_cpus=op.cpu_required,
-                    num_gpus=num_gpus,
-                    concurrency=op_proc,
+                    num_gpus=op.gpu_required,
+                    concurrency=op.num_proc,
                     batch_format="pyarrow",
                 )
             else:
@@ -239,9 +224,7 @@ def process_batch_arrow(table: pyarrow.Table):
                     batch_size=batch_size,
                     batch_format="pyarrow",
                     num_cpus=op.cpu_required,
-                    concurrency=(
-                        None if auto_parallel else op_proc
-                    ),  # use ray default parallelism in cpu mode if num_proc is not specified
+                    concurrency=op.num_proc,
                 )
         if op.stats_export_path is not None:
             self.data.write_json(op.stats_export_path, force_ascii=False)
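
With per-op sizing moved up front into `calculate_ray_np`, `_run_single_op` now forwards `op.num_proc` and `op.gpu_required` verbatim to Ray Data. A minimal, self-contained sketch of the `map_batches` call pattern the new code relies on (illustrative values; the real code passes the op class plus `fn_constructor_kwargs` for cuda ops and uses batch_format="pyarrow"):

    # Sketch: concurrency/num_cpus as resolved per op drive Ray Data directly.
    import ray

    ds = ray.data.from_items([{"text": "hello"}, {"text": "world"}])

    def to_upper(batch):  # stand-in for an op's batched process function
        batch["text"] = [t.upper() for t in batch["text"]]
        return batch

    ds = ds.map_batches(
        to_upper,
        batch_size=1,
        num_cpus=1,      # op.cpu_required
        concurrency=2,   # op.num_proc, as resolved by calculate_ray_np
    )
    print(ds.take_all())  # two rows, text upper-cased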
data_juicer/ops/base_op.py (15 additions, 6 deletions)
@@ -8,6 +8,7 @@
 from data_juicer.utils.mm_utils import SpecialTokens, size_to_bytes
 from data_juicer.utils.model_utils import free_models
 from data_juicer.utils.process_utils import calculate_np
+from data_juicer.utils.ray_utils import is_ray_mode
 from data_juicer.utils.registry import Registry
 from data_juicer.utils.resource_utils import is_cuda_available
 
@@ -191,10 +192,10 @@ def __init__(self, *args, **kwargs):
         self.accelerator = self._accelerator
 
         # parameters to determine the number of procs for this op
-        self.num_proc = kwargs.get("num_proc", None)
-        self.cpu_required = kwargs.get("cpu_required", 1)
-        self.gpu_required = kwargs.get("gpu_required", 0)
-        self.mem_required = kwargs.get("mem_required", 0)
+        self.num_proc = kwargs.get("num_proc", -1)  # -1 means automatic calculation of concurrency
+        self.cpu_required = kwargs.get("cpu_required", None)
+        self.gpu_required = kwargs.get("gpu_required", None)
+        self.mem_required = kwargs.get("mem_required", None)
         if isinstance(self.mem_required, str):
             self.mem_required = size_to_bytes(self.mem_required) / 1024**3
 
@@ -215,6 +216,12 @@ def __init__(self, *args, **kwargs):
                 method = wrap_func_with_nested_access(method)
                 setattr(self, name, method)
 
+    def use_auto_proc(self):
+        if is_ray_mode() and not self.use_cuda():  # ray task
+            return self.num_proc == -1
+        else:
+            return not self.num_proc or self.num_proc == -1
+
     def is_batched_op(self):
         return self._batched_op
 
@@ -228,8 +235,10 @@ def runtime_np(self):
         # Local import to avoid logger being serialized in multiprocessing
         from loguru import logger
 
-        op_proc = calculate_np(self._name, self.mem_required, self.cpu_required, self.use_cuda(), self.gpu_required)
-        if self.num_proc is not None:
+        op_proc = calculate_np(
+            self._name, self.mem_required, self.cpu_required or 1, self.use_cuda(), self.gpu_required
+        )
+        if not self.use_auto_proc():
             op_proc = min(op_proc, self.num_proc)
         logger.debug(f"Op [{self._name}] running with number of procs:{op_proc}")
         return op_proc
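
In short, `num_proc=-1` now means "size me automatically": for Ray cpu ops it is the only auto trigger, while everywhere else `None`, `0`, and `-1` all fall back to `calculate_np`, and an explicit `num_proc` acts as a cap. A condensed sketch of the standalone-mode semantics (assuming `is_ray_mode()` is False; `NoopMapper` is a hypothetical op defined only to exercise the base class):

    # Sketch: how the new defaults interact with use_auto_proc()/runtime_np().
    from data_juicer.ops.base_op import Mapper

    class NoopMapper(Mapper):  # hypothetical op, for illustration only
        def process_single(self, sample):
            return sample

    auto_op = NoopMapper(num_proc=-1)     # the new default
    assert auto_op.use_auto_proc()        # runtime_np() uses calculate_np alone

    capped_op = NoopMapper(num_proc=8)    # explicit user setting
    assert not capped_op.use_auto_proc()  # runtime_np() returns min(auto, 8)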