From e8f8cd7617c1b0a6fba843108f8f38e5a8f2ce83 Mon Sep 17 00:00:00 2001 From: "jiangnana.jnn" Date: Fri, 19 Sep 2025 12:24:37 +0800 Subject: [PATCH 01/12] Optimize the auto num_proc calculation of operators in ray mode --- data_juicer/config/config.py | 3 +- data_juicer/core/data/ray_dataset.py | 293 +++++++++++++++++++++++--- data_juicer/utils/ray_utils.py | 23 +- tests/core/data/test_ray_aoto_proc.py | 215 +++++++++++++++++++ 4 files changed, 503 insertions(+), 31 deletions(-) create mode 100644 tests/core/data/test_ray_aoto_proc.py diff --git a/data_juicer/config/config.py b/data_juicer/config/config.py index 8d62e820f4..f5bf62182e 100644 --- a/data_juicer/config/config.py +++ b/data_juicer/config/config.py @@ -28,6 +28,7 @@ from data_juicer.utils.constant import RAY_JOB_ENV_VAR from data_juicer.utils.logger_utils import setup_logger from data_juicer.utils.mm_utils import SpecialTokens +from data_juicer.utils.ray_utils import is_ray_mode global_cfg = None global_parser = None @@ -749,7 +750,7 @@ def init_setup_from_cfg(cfg: Namespace, load_configs_only=False): "audio_key": cfg.get("audio_key", "audios"), "video_key": cfg.get("video_key", "videos"), "image_bytes_key": cfg.get("image_bytes_key", "image_bytes"), - "num_proc": cfg.get("np", None), + "num_proc": cfg.get("np", None) if not is_ray_mode() else None, "turbo": cfg.get("turbo", False), "skip_op_error": cfg.get("skip_op_error", True), "work_dir": cfg.work_dir, diff --git a/data_juicer/core/data/ray_dataset.py b/data_juicer/core/data/ray_dataset.py index 8bb62d5834..8854562c60 100644 --- a/data_juicer/core/data/ray_dataset.py +++ b/data_juicer/core/data/ray_dataset.py @@ -1,7 +1,8 @@ from __future__ import annotations +import itertools +import math import os -import sys from functools import partial from typing import Any, Dict, List, Literal, Optional, Union @@ -16,11 +17,11 @@ from data_juicer.utils.constant import Fields from data_juicer.utils.file_utils import is_remote_path from data_juicer.utils.lazy_loader import LazyLoader -from data_juicer.utils.process_utils import calculate_np from data_juicer.utils.resource_utils import cuda_device_count from data_juicer.utils.webdataset_utils import _custom_default_decoder ray = LazyLoader("ray") +_OPS_MEMORY_LIMIT_FRACTION = 0.7 def get_abs_path(path, dataset_dir): @@ -90,6 +91,73 @@ def filter_batch(batch, filter_func): return batch.filter(mask) +def find_optimal_concurrency(resource_ratios, total_resource): + """ + Search for the optimal concurrency allocation to achieve the + highest total resource utilization and the most balanced processing capacity. 
+ + Args: + resource_ratios (list[float]): List of single-process resource ratios for each operator + total_resource (float): Total resource + + Return: + tuple: (list of optimal concurrency, total resource usage, standard deviation of processing capacity) + If there is no valid combination, return (None, 0, 0) + """ + n = len(resource_ratios) + if n == 0: + return (None, 0, 0) + + sum_r_squared = sum(r * r for r in resource_ratios) + if sum_r_squared == 0: + return (None, 0, 0) + + c_floats = [(total_resource * r) / sum_r_squared for r in resource_ratios] + + # generate candidate concurrency + candidates = [] + for cf in c_floats: + floor_cf = math.floor(cf) + ceil_cf = math.ceil(cf) + possible = set() + if floor_cf >= 1: + possible.add(floor_cf) + possible.add(ceil_cf) + possible = [max(1, v) for v in possible] + candidates.append(sorted(list(set(possible)))) + + # traverse all combinations + best_combination = None + max_resource_usage = 0 + min_std = float("inf") + + for combo in itertools.product(*candidates): + total_used = sum(c * r for c, r in zip(combo, resource_ratios)) + if total_used > total_resource: + continue + + # calculate the standard deviation of processing capacity + processing_powers = [c / r for c, r in zip(combo, resource_ratios)] + mean = sum(processing_powers) / n + variance = sum((x - mean) ** 2 for x in processing_powers) / n + std = math.sqrt(variance) + + # update the optimal solution (priority resource utilization, suboptimal standard deviation) + if total_used > max_resource_usage: + max_resource_usage = total_used + best_combination = combo + min_std = std + elif total_used == max_resource_usage and std < min_std: + best_combination = combo + min_std = std + + return ( + list(best_combination) if best_combination else None, + max_resource_usage, + min_std if best_combination else 0, + ) + + class RayDataset(DJDataset): def __init__(self, dataset: ray.data.Dataset, dataset_path: str = None, cfg: Optional[Namespace] = None) -> None: self.data = preprocess_dataset(dataset, dataset_path, cfg) @@ -143,32 +211,205 @@ def get_column(self, column: str, k: Optional[int] = None) -> List[Any]: return [row[column] for row in self.data.take()] + @staticmethod + def set_resource_for_ops(operators): + """ + Automatically calculates optimal concurrency for Ray Data operator. + This function handles both task and actor based operators, considering + resource requirements and user specifications. The computation follows Ray Data's + concurrency semantics while optimizing resource utilization. + + Key Concepts: + - Resource Ratio: Individual operator's resource requirement (GPU/CPU/memory) + compared to total cluster resources, using max(cpu_ratio, gpu_ratio, adjusted_mem_ratio) + - Fixed Allocation: Portion of resources reserved by operators with user-specified num_proc + - Dynamic Allocation: Remaining resources distributed among auto-scaling operators + + Design Logic: + 1. User Specification Priority: + - If user provides concurrency setting, directly return it + - Applies to both task and actor based operators + 2. Task Operators (equivalent to a cpu operator in dj): + a. When unspecified: Return None to let Ray determine implicitly + b. Auto-calculation: Returns maximum concurrency based on available + resources and operator requirements + 3. Actor Operators (equivalent to a gpu operator in dj): + a. Mandatory concurrency - set required gpus to 1 if unspecified, and then refer to the following `b` + to calculate automatically based on this setting + b. 
Auto-calculation returns tuple (min_concurrency, max_concurrency): + i. Minimum: Ensures baseline resource allocation in remaining resources + when all operators are active simultaneously (proportionally) + ii. Maximum: Allows full utilization of remaining resources by single + operator when others are idle + """ + from data_juicer.utils.ray_utils import ( + ray_available_gpu_memories, + ray_available_memories, + ray_cpu_count, + ray_gpu_count, + ) + from data_juicer.utils.resource_utils import is_cuda_available + + cuda_available = is_cuda_available() + + total_cpu = ray_cpu_count() + total_gpu = ray_gpu_count() + + available_mem = sum(ray_available_memories()) * _OPS_MEMORY_LIMIT_FRACTION / 1024 # Convert MB to GB + available_gpu_mem = sum(ray_available_gpu_memories()) * _OPS_MEMORY_LIMIT_FRACTION / 1024 # Convert MB to GB + + resource_configs = {} + + for op in operators: + cpu_req = op.cpu_required + mem_req = op.mem_required + gpu_req = 0 + gpu_mem_req = 0 + base_resource_frac = 0.0 + + if op.gpu_required: + if not op.use_cuda(): + raise ValueError( + f"Op[{op._name}] attempted to request GPU resources (gpu_required={op.gpu_required}), " + "but appears to lack GPU support. If you have verified this operator support GPU acceleration, " + 'please explicitly set its property: `_accelerator = "cuda"`.' + ) + if not cuda_available: + raise ValueError( + f"Op[{op._name}] attempted to request GPU resources (gpu_required={op.gpu_required}), " + "but the gpu is unavailable. Please check whether your environment is installed correctly" + " and whether there is a gpu in the resource pool." + ) + + # if it is a cuda operator, mem_required will be calculated as gpu memory; + # if it is a cpu, it will be calculated as memory. + + auto_proc = False if op.num_proc else True + + # GPU operator calculations + if op.use_cuda(): + gpu_req = op.gpu_required + gpu_mem_req = op.mem_required + if not gpu_req and not gpu_mem_req: + logger.warning( + f"The required cuda memory and gpu of Op[{op._name}] " + f"has not been specified. " + f"Please specify the `mem_required` field or `gpu_required` field in the " + f"config file. You can reference the `config_all.yaml` file." + f"Set the `gpu_required` to 1 now." + ) + gpu_req = 1 + + base_resource_frac = max( + cpu_req / total_cpu if cpu_req else 0, + gpu_req / total_gpu if gpu_req else 0, + gpu_mem_req / available_gpu_mem if gpu_mem_req else 0, + ) + + if not gpu_req: + gpu_req = math.ceil(base_resource_frac * total_gpu * 100) / 100 + # CPU operator calculations + else: + if cpu_req or mem_req: + base_resource_frac = max( + cpu_req / total_cpu if cpu_req else 0, mem_req / available_mem if mem_req else 0 + ) + else: + logger.warning( + f"The required memory and cpu of Op[{op._name}] " + f"has not been specified. " + f"We recommend specifying the `mem_required` field or `cpu_required` field in the " + f"config file. You can reference the `config_all.yaml` file." 
+ ) + # Default to single CPU if no requirements specified + base_resource_frac = 1 / total_cpu + + resource_configs[op._name] = { + "cpu_required": cpu_req, + "gpu_required": gpu_req, + "mem_required": mem_req, + "gpu_mem_required": gpu_mem_req, + "base_resource_frac": base_resource_frac, + "num_proc": tuple(op.num_proc) if isinstance(op.num_proc, list) else op.num_proc, + "auto_proc": auto_proc, + } + + fixed_min_resources = 0 + fixed_max_resources = 0 + auto_resource_frac_map = {} + for op_name, cfg in resource_configs.items(): + if cfg["auto_proc"]: + auto_resource_frac_map[op_name] = cfg["base_resource_frac"] + else: + num_proc = cfg["num_proc"] + min_proc = num_proc[0] if isinstance(num_proc, (tuple, list)) else num_proc + max_proc = num_proc[1] if isinstance(num_proc, (tuple, list)) else num_proc + fixed_min_resources += cfg["base_resource_frac"] * min_proc + fixed_max_resources += cfg["base_resource_frac"] * max_proc + + # Validate resource availability + total_auto_base_resource = sum(list(auto_resource_frac_map.values())) + total_required_min = fixed_min_resources + total_auto_base_resource + if total_required_min > 1: + raise ValueError( + f"Insufficient cluster resources: " + f"At least {total_required_min:.2f}x the current resource is required. " + f"Add resources or reduce operator requirements." + ) + if len(auto_resource_frac_map) > 0: + remaining_min_frac = 1 - fixed_max_resources + remaining_max_frac = 1 - fixed_min_resources + + op_names, op_resources = [], [] + for k, v in auto_resource_frac_map.items(): + op_names.append(k) + op_resources.append(v) + best_combination, _, _ = find_optimal_concurrency(op_resources, remaining_min_frac) + best_combination = dict(zip(op_names, best_combination)) + + for op_name, cfg in resource_configs.items(): + if cfg["auto_proc"]: + min_proc = best_combination[op_name] + max_proc = int(max(1, remaining_max_frac / cfg["base_resource_frac"])) + cfg["num_proc"] = min_proc if min_proc == max_proc else (min_proc, max_proc) + + for op in operators: + cfg = resource_configs[op._name] + auto_proc, num_proc = cfg["auto_proc"], cfg["num_proc"] + if op.use_cuda(): + op.cpu_required = cfg["cpu_required"] + op.gpu_required = cfg["gpu_required"] + op.num_proc = num_proc + else: + # * If ``fn`` is a function and ``concurrency`` is an int ``n``, Ray Data + # launches *at most* ``n`` concurrent tasks. 
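# A rough sketch of how the resolved fields are consumed downstream by
# `_run_single_op` in this same file (other arguments abridged): GPU (actor) ops
# are forwarded as
#   self.data.map_batches(op.process, fn_constructor_kwargs=op_kwargs,
#                         num_cpus=op.cpu_required, num_gpus=op.gpu_required,
#                         concurrency=op.num_proc, batch_format="pyarrow")
# while CPU (task) ops omit `num_gpus` and pass a single int `concurrency`,
# i.e. an upper bound on the number of concurrent tasks.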
+ op.cpu_required = cfg["cpu_required"] + op.gpu_required = None + # if concurrency left to None, the automatic concurrency of ray may be slightly higher, which could lead to OOM + op.num_proc = num_proc[1] if (auto_proc and isinstance(num_proc, (tuple, list))) else num_proc + # op.num_proc = None if auto_proc else num_proc + + logger.info( + f"Op[{op._name}] will be executed with the following resources: " + f"num_cpus: {op.cpu_required}, " + f"num_gpus: {op.gpu_required}, " + f"concurrency: {op.num_proc}, " + ) + return operators + def process(self, operators, *, exporter=None, checkpointer=None, tracer=None) -> DJDataset: if operators is None: return self if not isinstance(operators, list): operators = [operators] + + RayDataset.set_resource_for_ops(operators) + for op in operators: self._run_single_op(op) return self def _run_single_op(self, op): - # TODO: optimize auto proc - auto_parallel = False - if op.num_proc: - op_proc = op.num_proc - else: - auto_parallel = True - op_proc = sys.maxsize - auto_op_proc = calculate_np(op._name, op.mem_required, op.cpu_required, op.use_cuda(), op.gpu_required) - op_proc = min(op_proc, auto_op_proc) - - # use ray default parallelism in cpu mode if op.num_proc is not specified - if op.use_cuda() or not auto_parallel: - logger.info(f"Op [{op._name}] running with number of procs:{op_proc}") - - num_gpus = op.gpu_required if op.gpu_required else get_num_gpus(op, op_proc) - if op._name in TAGGING_OPS.modules and Fields.meta not in self.data.columns(): def process_batch_arrow(table: pyarrow.Table): @@ -193,8 +434,8 @@ def process_batch_arrow(table: pyarrow.Table): fn_constructor_kwargs=op_kwargs, batch_size=batch_size, num_cpus=op.cpu_required, - num_gpus=num_gpus, - concurrency=op_proc, + num_gpus=op.gpu_required, + concurrency=op.num_proc, batch_format="pyarrow", ) else: @@ -203,9 +444,7 @@ def process_batch_arrow(table: pyarrow.Table): batch_size=batch_size, batch_format="pyarrow", num_cpus=op.cpu_required, - concurrency=( - None if auto_parallel else op_proc - ), # use ray default parallelism in cpu mode if num_proc is not specified + concurrency=op.num_proc, ) elif isinstance(op, Filter): columns = self.data.columns() @@ -229,8 +468,8 @@ def process_batch_arrow(table: pyarrow.Table): fn_constructor_kwargs=op_kwargs, batch_size=batch_size, num_cpus=op.cpu_required, - num_gpus=num_gpus, - concurrency=op_proc, + num_gpus=op.gpu_required, + concurrency=op.num_proc, batch_format="pyarrow", ) else: @@ -239,9 +478,7 @@ def process_batch_arrow(table: pyarrow.Table): batch_size=batch_size, batch_format="pyarrow", num_cpus=op.cpu_required, - concurrency=( - None if auto_parallel else op_proc - ), # use ray default parallelism in cpu mode if num_proc is not specified + concurrency=op.num_proc, ) if op.stats_export_path is not None: self.data.write_json(op.stats_export_path, force_ascii=False) diff --git a/data_juicer/utils/ray_utils.py b/data_juicer/utils/ray_utils.py index cd6ec1f3df..e6da4606fc 100644 --- a/data_juicer/utils/ray_utils.py +++ b/data_juicer/utils/ray_utils.py @@ -56,7 +56,7 @@ def collect_node_info(): cpu_count = psutil.cpu_count() try: - free_gpus_memory = [] + gpus_memory, free_gpus_memory = [], [] nvidia_smi_output = subprocess.check_output( ["nvidia-smi", "--query-gpu=memory.free", "--format=csv,noheader,nounits"] ).decode("utf-8") @@ -64,14 +64,22 @@ def collect_node_info(): for line in nvidia_smi_output.strip().split("\n"): free_gpus_memory.append(int(line)) + nvidia_smi_output = subprocess.check_output( + ["nvidia-smi", 
"--query-gpu=memory.total", "--format=csv,noheader,nounits"] + ).decode("utf-8") + + for line in nvidia_smi_output.strip().split("\n"): + gpus_memory.append(int(line)) + except Exception: # no gpu - free_gpus_memory = [] + gpus_memory, free_gpus_memory = [], [] return { "free_memory": free_mem, # MB "cpu_count": cpu_count, "gpu_count": len(free_gpus_memory), + "gpus_memory": gpus_memory, "free_gpus_memory": free_gpus_memory, # MB } @@ -135,3 +143,14 @@ def ray_available_gpu_memories(): available_gpu_mems.extend(info["free_gpus_memory"]) return available_gpu_mems + + +def ray_gpu_memories(): + """Total gpu memory of each gpu card for each alive node in MB.""" + ray_nodes_info = get_ray_nodes_info() + + gpu_mems = [] + for nodeid, info in ray_nodes_info.items(): + gpu_mems.extend(info["gpus_memory"]) + + return gpu_mems diff --git a/tests/core/data/test_ray_aoto_proc.py b/tests/core/data/test_ray_aoto_proc.py new file mode 100644 index 0000000000..10fc24c343 --- /dev/null +++ b/tests/core/data/test_ray_aoto_proc.py @@ -0,0 +1,215 @@ +import unittest +from unittest.mock import MagicMock, patch +from data_juicer.core.data.ray_dataset import RayDataset + + +class TestRayDataset(unittest.TestCase): + def setUp(self): + self.mock_op = lambda use_cuda: MagicMock( + cpu_required=0, + mem_required=0, + gpu_required=None, + num_proc=None, + _name="test_op", + use_cuda=lambda: use_cuda + ) + + # Common patchers + self.ray_cpu_patcher = patch( + 'data_juicer.utils.ray_utils.ray_cpu_count') + self.ray_gpu_patcher = patch( + 'data_juicer.utils.ray_utils.ray_gpu_count') + self.ray_mem_patcher = patch( + 'data_juicer.utils.ray_utils.ray_available_memories') + self.ray_gpu_mem_patcher = patch( + 'data_juicer.utils.ray_utils.ray_available_gpu_memories') + self.cuda_available_patcher = patch( + 'data_juicer.utils.resource_utils.is_cuda_available') + + self.mock_cpu = self.ray_cpu_patcher.start() + self.mock_gpu = self.ray_gpu_patcher.start() + self.mock_mem = self.ray_mem_patcher.start() + self.mock_gpu_mem = self.ray_gpu_mem_patcher.start() + self.mock_cuda_available = self.cuda_available_patcher.start() + + # Default cluster resources (4 CPUs, 8GB RAM, 1 GPU 16GB) + self.mock_cpu.return_value = 4 + self.mock_gpu.return_value = 1 + self.mock_mem.return_value = [8192] # 8GB + self.mock_gpu_mem.return_value = [16384] # 16GB + self.mock_cuda_available.return_value = True + + def tearDown(self): + self.ray_cpu_patcher.stop() + self.ray_gpu_patcher.stop() + self.ray_mem_patcher.stop() + self.ray_gpu_mem_patcher.stop() + self.cuda_available_patcher.stop() + + def test_cpu_op_auto_scaling(self): + """Test CPU operator with auto scaling""" + op = self.mock_op(use_cuda=False) + op.cpu_required = 1 + + RayDataset.set_resource_for_ops([op]) + self.assertEqual(op.num_proc, 4) # 4 CPUs / 1 per op + self.assertEqual(op.cpu_required, 1) + self.assertEqual(op.gpu_required, None) + + def test_gpu_op_auto_scaling(self): + """Test GPU operator with auto scaling""" + op = self.mock_op(use_cuda=True) + op.gpu_required = 1 + + RayDataset.set_resource_for_ops([op]) + self.assertEqual(op.num_proc, 1) # Only 1 GPU available + self.assertEqual(op.gpu_required, 1) + self.assertEqual(op.cpu_required, 0) + + def test_user_specified_num_proc(self): + """Test user-specified num_proc takes priority""" + op = self.mock_op(use_cuda=False) + op.num_proc = 2 + op.cpu_required = 1 + + RayDataset.set_resource_for_ops([op]) + self.assertEqual(op.num_proc, 2) + self.assertEqual(op.cpu_required, 1) + self.assertEqual(op.gpu_required, None) + + def 
test_mixed_ops_resource_allocation(self): + """Test mixed operators with fixed and auto scaling""" + fixed_op = self.mock_op(use_cuda=False) + fixed_op._name = 'op1' + fixed_op.num_proc = (1, 2) + fixed_op.cpu_required = 1 + + auto_op = self.mock_op(use_cuda=False) + auto_op._name = 'op2' + auto_op.cpu_required = 1 + + self.mock_cpu.return_value = 8 # 8 CPUs total + + RayDataset.set_resource_for_ops([fixed_op, auto_op]) + # Fixed min: 1 core * 1 proc = 1 core + # Fixed max: 1 core * 2 proc = 2 core + # Auto: each needs 1 core, remaining (8-2)-(8-1) cores + # Min proc = 6 // 1 = 6, Max = 7 // 1 = 7 + self.assertEqual(auto_op.num_proc, 7) + self.assertEqual(auto_op.cpu_required, 1) + self.assertEqual(fixed_op.cpu_required, 1) + self.assertEqual(fixed_op.num_proc, (1, 2)) + + def test_insufficient_resources(self): + """Test resource overallocation exception""" + op1 = self.mock_op(use_cuda=False) + op1._name = 'op1' + op1.num_proc = 5 # 1 core per proc + op1.cpu_required = 1 + + op2 = self.mock_op(use_cuda=False) + op2._name = 'op2' + op2.cpu_required = 1 + + self.mock_cpu.return_value = 4 # Only 4 cores available + + with self.assertRaises(ValueError) as cm: + RayDataset.set_resource_for_ops([op1, op2]) + + # required cpus: 1*5 + 1, 6/4=1.5 + self.assertEqual(str(cm.exception), + "Insufficient cluster resources: At least 1.50x the current resource is required. " + "Add resources or reduce operator requirements.") + + def test_gpu_op_without_cuda(self): + """Test GPU operator when CUDA is unavailable""" + self.mock_cuda_available.return_value = False + op = self.mock_op(use_cuda=True) + op.gpu_required = 1 + + with self.assertRaises(ValueError) as cm: + RayDataset.set_resource_for_ops([op]) + + self.assertEqual(str(cm.exception), + "Op[test_op] attempted to request GPU resources (gpu_required=1), " + "but the gpu is unavailable. 
Please check whether your environment is installed correctly" + " and whether there is a gpu in the resource pool.") + + def test_multi_ops_with_cpu_gpu(self): + """Test operator with no resource requirements""" + + op1_cuda = self.mock_op(use_cuda=True) + op1_cuda.mem_required = 2 + op1_cuda.cpu_required = 1 + op1_cuda._name = 'op1_cuda' + + op2_cuda = self.mock_op(use_cuda=True) + op2_cuda.gpu_required = 0.5 + op2_cuda._name = 'op2_cuda' + + op3_cuda = self.mock_op(use_cuda=True) + op3_cuda.gpu_required = 0.2 + op3_cuda.num_proc = (5, 10) + op3_cuda._name = 'op3_cuda' + + op1_cpu = self.mock_op(use_cuda=False) + op1_cpu.mem_required = 2 + op1_cpu._name = 'op1_cpu' + + op2_cpu = self.mock_op(use_cuda=False) + op2_cpu.cpu_required = 0.5 + op2_cpu._name = 'op2_cpu' + + op3_cpu = self.mock_op(use_cuda=False) + op3_cpu.cpu_required = 0.2 + op3_cpu.num_proc = 10 + op3_cpu._name = 'op3_cpu' + + op4_cpu = self.mock_op(use_cuda=False) + op4_cpu._name = 'op4_cpu' + + self.mock_cpu.return_value = 100 + self.mock_gpu.return_value = 5 + self.mock_mem.return_value = [131072] # 128 GB + self.mock_gpu_mem.return_value = [51200] # 5 * 10GB + + RayDataset.set_resource_for_ops([op1_cuda, op2_cuda, op3_cuda, op1_cpu, op2_cpu, op3_cpu, op4_cpu]) + + self.assertEqual(op1_cuda.num_proc, (2, 13)) + self.assertEqual(op1_cuda.cpu_required, 1) + self.assertEqual(op1_cuda.gpu_required, 0.29) + self.assertEqual(op1_cuda.mem_required, 2) + + self.assertEqual(op2_cuda.num_proc, (2, 7)) + self.assertEqual(op2_cuda.cpu_required, 0) + self.assertEqual(op2_cuda.gpu_required, 0.5) + self.assertEqual(op2_cuda.mem_required, 0) + + self.assertEqual(op3_cuda.num_proc, (5, 10)) + self.assertEqual(op3_cuda.cpu_required, 0) + self.assertEqual(op3_cuda.gpu_required, 0.2) + self.assertEqual(op3_cuda.mem_required, 0) + + self.assertEqual(op1_cpu.num_proc, 34) + self.assertEqual(op1_cpu.cpu_required, 0) + self.assertEqual(op1_cpu.gpu_required, None) + self.assertEqual(op1_cpu.mem_required, 2) + + self.assertEqual(op2_cpu.num_proc, 156) + self.assertEqual(op2_cpu.cpu_required, 0.5) + self.assertEqual(op2_cpu.gpu_required, None) + self.assertEqual(op2_cpu.mem_required, 0) + + self.assertEqual(op3_cpu.num_proc, 10) + self.assertEqual(op3_cpu.cpu_required, 0.2) + self.assertEqual(op3_cpu.gpu_required, None) + self.assertEqual(op3_cpu.mem_required, 0) + + self.assertEqual(op4_cpu.num_proc, 78) + self.assertEqual(op4_cpu.cpu_required, 0) + self.assertEqual(op4_cpu.gpu_required, None) + self.assertEqual(op4_cpu.mem_required, 0) + + +if __name__ == '__main__': + unittest.main() From 8751361ee162396776c777081f70ac38e76d7a57 Mon Sep 17 00:00:00 2001 From: "jiangnana.jnn" Date: Fri, 19 Sep 2025 16:48:07 +0800 Subject: [PATCH 02/12] add todo --- data_juicer/core/data/ray_dataset.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/data_juicer/core/data/ray_dataset.py b/data_juicer/core/data/ray_dataset.py index 8854562c60..0b0b7bff7c 100644 --- a/data_juicer/core/data/ray_dataset.py +++ b/data_juicer/core/data/ray_dataset.py @@ -250,14 +250,12 @@ def set_resource_for_ops(operators): ) from data_juicer.utils.resource_utils import is_cuda_available + # TODO: split to cpu resources and gpu resources cuda_available = is_cuda_available() - total_cpu = ray_cpu_count() total_gpu = ray_gpu_count() - available_mem = sum(ray_available_memories()) * _OPS_MEMORY_LIMIT_FRACTION / 1024 # Convert MB to GB available_gpu_mem = sum(ray_available_gpu_memories()) * _OPS_MEMORY_LIMIT_FRACTION / 1024 # Convert MB to GB - resource_configs = 
{} for op in operators: @@ -280,10 +278,8 @@ def set_resource_for_ops(operators): "but the gpu is unavailable. Please check whether your environment is installed correctly" " and whether there is a gpu in the resource pool." ) - # if it is a cuda operator, mem_required will be calculated as gpu memory; # if it is a cpu, it will be calculated as memory. - auto_proc = False if op.num_proc else True # GPU operator calculations @@ -369,8 +365,12 @@ def set_resource_for_ops(operators): for op_name, cfg in resource_configs.items(): if cfg["auto_proc"]: + # TODO: min_proc = best_combination[op_name] + # issue: https://github.com/ray-project/ray/issues/55307 + # or min_proc = 1 ? max_proc = int(max(1, remaining_max_frac / cfg["base_resource_frac"])) + # or max_proc = int(max(1, 1 / cfg["base_resource_frac"])) ? use all resources cfg["num_proc"] = min_proc if min_proc == max_proc else (min_proc, max_proc) for op in operators: From 2c092fb3161efd8bb9ef9aa275fc5a0ca3dd4fd8 Mon Sep 17 00:00:00 2001 From: "jiangnana.jnn" Date: Mon, 22 Sep 2025 11:43:54 +0800 Subject: [PATCH 03/12] update test file name --- tests/core/data/{test_ray_aoto_proc.py => test_ray_auto_proc.py} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename tests/core/data/{test_ray_aoto_proc.py => test_ray_auto_proc.py} (100%) diff --git a/tests/core/data/test_ray_aoto_proc.py b/tests/core/data/test_ray_auto_proc.py similarity index 100% rename from tests/core/data/test_ray_aoto_proc.py rename to tests/core/data/test_ray_auto_proc.py From 43b5f79e05f10d61ce2962bd55fbeb0ada1a0b78 Mon Sep 17 00:00:00 2001 From: "jiangnana.jnn" Date: Tue, 23 Sep 2025 17:03:01 +0800 Subject: [PATCH 04/12] optimize --- data_juicer/core/data/ray_dataset.py | 260 +-------------------- data_juicer/ops/base_op.py | 10 +- data_juicer/utils/process_utils.py | 325 ++++++++++++++++++++++++++ data_juicer/utils/ray_utils.py | 14 +- tests/core/data/test_ray_auto_proc.py | 215 ----------------- tests/utils/test_process_utils.py | 242 ++++++++++++++++++- 6 files changed, 579 insertions(+), 487 deletions(-) delete mode 100644 tests/core/data/test_ray_auto_proc.py diff --git a/data_juicer/core/data/ray_dataset.py b/data_juicer/core/data/ray_dataset.py index 0b0b7bff7c..ba3ca2bafd 100644 --- a/data_juicer/core/data/ray_dataset.py +++ b/data_juicer/core/data/ray_dataset.py @@ -1,7 +1,5 @@ from __future__ import annotations -import itertools -import math import os from functools import partial from typing import Any, Dict, List, Literal, Optional, Union @@ -21,7 +19,6 @@ from data_juicer.utils.webdataset_utils import _custom_default_decoder ray = LazyLoader("ray") -_OPS_MEMORY_LIMIT_FRACTION = 0.7 def get_abs_path(path, dataset_dir): @@ -91,73 +88,6 @@ def filter_batch(batch, filter_func): return batch.filter(mask) -def find_optimal_concurrency(resource_ratios, total_resource): - """ - Search for the optimal concurrency allocation to achieve the - highest total resource utilization and the most balanced processing capacity. 
- - Args: - resource_ratios (list[float]): List of single-process resource ratios for each operator - total_resource (float): Total resource - - Return: - tuple: (list of optimal concurrency, total resource usage, standard deviation of processing capacity) - If there is no valid combination, return (None, 0, 0) - """ - n = len(resource_ratios) - if n == 0: - return (None, 0, 0) - - sum_r_squared = sum(r * r for r in resource_ratios) - if sum_r_squared == 0: - return (None, 0, 0) - - c_floats = [(total_resource * r) / sum_r_squared for r in resource_ratios] - - # generate candidate concurrency - candidates = [] - for cf in c_floats: - floor_cf = math.floor(cf) - ceil_cf = math.ceil(cf) - possible = set() - if floor_cf >= 1: - possible.add(floor_cf) - possible.add(ceil_cf) - possible = [max(1, v) for v in possible] - candidates.append(sorted(list(set(possible)))) - - # traverse all combinations - best_combination = None - max_resource_usage = 0 - min_std = float("inf") - - for combo in itertools.product(*candidates): - total_used = sum(c * r for c, r in zip(combo, resource_ratios)) - if total_used > total_resource: - continue - - # calculate the standard deviation of processing capacity - processing_powers = [c / r for c, r in zip(combo, resource_ratios)] - mean = sum(processing_powers) / n - variance = sum((x - mean) ** 2 for x in processing_powers) / n - std = math.sqrt(variance) - - # update the optimal solution (priority resource utilization, suboptimal standard deviation) - if total_used > max_resource_usage: - max_resource_usage = total_used - best_combination = combo - min_std = std - elif total_used == max_resource_usage and std < min_std: - best_combination = combo - min_std = std - - return ( - list(best_combination) if best_combination else None, - max_resource_usage, - min_std if best_combination else 0, - ) - - class RayDataset(DJDataset): def __init__(self, dataset: ray.data.Dataset, dataset_path: str = None, cfg: Optional[Namespace] = None) -> None: self.data = preprocess_dataset(dataset, dataset_path, cfg) @@ -211,199 +141,15 @@ def get_column(self, column: str, k: Optional[int] = None) -> List[Any]: return [row[column] for row in self.data.take()] - @staticmethod - def set_resource_for_ops(operators): - """ - Automatically calculates optimal concurrency for Ray Data operator. - This function handles both task and actor based operators, considering - resource requirements and user specifications. The computation follows Ray Data's - concurrency semantics while optimizing resource utilization. - - Key Concepts: - - Resource Ratio: Individual operator's resource requirement (GPU/CPU/memory) - compared to total cluster resources, using max(cpu_ratio, gpu_ratio, adjusted_mem_ratio) - - Fixed Allocation: Portion of resources reserved by operators with user-specified num_proc - - Dynamic Allocation: Remaining resources distributed among auto-scaling operators - - Design Logic: - 1. User Specification Priority: - - If user provides concurrency setting, directly return it - - Applies to both task and actor based operators - 2. Task Operators (equivalent to a cpu operator in dj): - a. When unspecified: Return None to let Ray determine implicitly - b. Auto-calculation: Returns maximum concurrency based on available - resources and operator requirements - 3. Actor Operators (equivalent to a gpu operator in dj): - a. Mandatory concurrency - set required gpus to 1 if unspecified, and then refer to the following `b` - to calculate automatically based on this setting - b. 
Auto-calculation returns tuple (min_concurrency, max_concurrency): - i. Minimum: Ensures baseline resource allocation in remaining resources - when all operators are active simultaneously (proportionally) - ii. Maximum: Allows full utilization of remaining resources by single - operator when others are idle - """ - from data_juicer.utils.ray_utils import ( - ray_available_gpu_memories, - ray_available_memories, - ray_cpu_count, - ray_gpu_count, - ) - from data_juicer.utils.resource_utils import is_cuda_available - - # TODO: split to cpu resources and gpu resources - cuda_available = is_cuda_available() - total_cpu = ray_cpu_count() - total_gpu = ray_gpu_count() - available_mem = sum(ray_available_memories()) * _OPS_MEMORY_LIMIT_FRACTION / 1024 # Convert MB to GB - available_gpu_mem = sum(ray_available_gpu_memories()) * _OPS_MEMORY_LIMIT_FRACTION / 1024 # Convert MB to GB - resource_configs = {} - - for op in operators: - cpu_req = op.cpu_required - mem_req = op.mem_required - gpu_req = 0 - gpu_mem_req = 0 - base_resource_frac = 0.0 - - if op.gpu_required: - if not op.use_cuda(): - raise ValueError( - f"Op[{op._name}] attempted to request GPU resources (gpu_required={op.gpu_required}), " - "but appears to lack GPU support. If you have verified this operator support GPU acceleration, " - 'please explicitly set its property: `_accelerator = "cuda"`.' - ) - if not cuda_available: - raise ValueError( - f"Op[{op._name}] attempted to request GPU resources (gpu_required={op.gpu_required}), " - "but the gpu is unavailable. Please check whether your environment is installed correctly" - " and whether there is a gpu in the resource pool." - ) - # if it is a cuda operator, mem_required will be calculated as gpu memory; - # if it is a cpu, it will be calculated as memory. - auto_proc = False if op.num_proc else True - - # GPU operator calculations - if op.use_cuda(): - gpu_req = op.gpu_required - gpu_mem_req = op.mem_required - if not gpu_req and not gpu_mem_req: - logger.warning( - f"The required cuda memory and gpu of Op[{op._name}] " - f"has not been specified. " - f"Please specify the `mem_required` field or `gpu_required` field in the " - f"config file. You can reference the `config_all.yaml` file." - f"Set the `gpu_required` to 1 now." - ) - gpu_req = 1 - - base_resource_frac = max( - cpu_req / total_cpu if cpu_req else 0, - gpu_req / total_gpu if gpu_req else 0, - gpu_mem_req / available_gpu_mem if gpu_mem_req else 0, - ) - - if not gpu_req: - gpu_req = math.ceil(base_resource_frac * total_gpu * 100) / 100 - # CPU operator calculations - else: - if cpu_req or mem_req: - base_resource_frac = max( - cpu_req / total_cpu if cpu_req else 0, mem_req / available_mem if mem_req else 0 - ) - else: - logger.warning( - f"The required memory and cpu of Op[{op._name}] " - f"has not been specified. " - f"We recommend specifying the `mem_required` field or `cpu_required` field in the " - f"config file. You can reference the `config_all.yaml` file." 
- ) - # Default to single CPU if no requirements specified - base_resource_frac = 1 / total_cpu - - resource_configs[op._name] = { - "cpu_required": cpu_req, - "gpu_required": gpu_req, - "mem_required": mem_req, - "gpu_mem_required": gpu_mem_req, - "base_resource_frac": base_resource_frac, - "num_proc": tuple(op.num_proc) if isinstance(op.num_proc, list) else op.num_proc, - "auto_proc": auto_proc, - } - - fixed_min_resources = 0 - fixed_max_resources = 0 - auto_resource_frac_map = {} - for op_name, cfg in resource_configs.items(): - if cfg["auto_proc"]: - auto_resource_frac_map[op_name] = cfg["base_resource_frac"] - else: - num_proc = cfg["num_proc"] - min_proc = num_proc[0] if isinstance(num_proc, (tuple, list)) else num_proc - max_proc = num_proc[1] if isinstance(num_proc, (tuple, list)) else num_proc - fixed_min_resources += cfg["base_resource_frac"] * min_proc - fixed_max_resources += cfg["base_resource_frac"] * max_proc - - # Validate resource availability - total_auto_base_resource = sum(list(auto_resource_frac_map.values())) - total_required_min = fixed_min_resources + total_auto_base_resource - if total_required_min > 1: - raise ValueError( - f"Insufficient cluster resources: " - f"At least {total_required_min:.2f}x the current resource is required. " - f"Add resources or reduce operator requirements." - ) - if len(auto_resource_frac_map) > 0: - remaining_min_frac = 1 - fixed_max_resources - remaining_max_frac = 1 - fixed_min_resources - - op_names, op_resources = [], [] - for k, v in auto_resource_frac_map.items(): - op_names.append(k) - op_resources.append(v) - best_combination, _, _ = find_optimal_concurrency(op_resources, remaining_min_frac) - best_combination = dict(zip(op_names, best_combination)) - - for op_name, cfg in resource_configs.items(): - if cfg["auto_proc"]: - # TODO: - min_proc = best_combination[op_name] - # issue: https://github.com/ray-project/ray/issues/55307 - # or min_proc = 1 ? - max_proc = int(max(1, remaining_max_frac / cfg["base_resource_frac"])) - # or max_proc = int(max(1, 1 / cfg["base_resource_frac"])) ? use all resources - cfg["num_proc"] = min_proc if min_proc == max_proc else (min_proc, max_proc) - - for op in operators: - cfg = resource_configs[op._name] - auto_proc, num_proc = cfg["auto_proc"], cfg["num_proc"] - if op.use_cuda(): - op.cpu_required = cfg["cpu_required"] - op.gpu_required = cfg["gpu_required"] - op.num_proc = num_proc - else: - # * If ``fn`` is a function and ``concurrency`` is an int ``n``, Ray Data - # launches *at most* ``n`` concurrent tasks. 
- op.cpu_required = cfg["cpu_required"] - op.gpu_required = None - # if concurrency left to None, the automatic concurrency of ray may be slightly higher, which could lead to OOM - op.num_proc = num_proc[1] if (auto_proc and isinstance(num_proc, (tuple, list))) else num_proc - # op.num_proc = None if auto_proc else num_proc - - logger.info( - f"Op[{op._name}] will be executed with the following resources: " - f"num_cpus: {op.cpu_required}, " - f"num_gpus: {op.gpu_required}, " - f"concurrency: {op.num_proc}, " - ) - return operators - def process(self, operators, *, exporter=None, checkpointer=None, tracer=None) -> DJDataset: if operators is None: return self if not isinstance(operators, list): operators = [operators] - RayDataset.set_resource_for_ops(operators) + from data_juicer.utils.process_utils import calculate_ray_np + + calculate_ray_np(operators) for op in operators: self._run_single_op(op) diff --git a/data_juicer/ops/base_op.py b/data_juicer/ops/base_op.py index 5d6ffd4b58..d1831a9156 100644 --- a/data_juicer/ops/base_op.py +++ b/data_juicer/ops/base_op.py @@ -191,8 +191,8 @@ def __init__(self, *args, **kwargs): self.accelerator = self._accelerator # parameters to determine the number of procs for this op - self.num_proc = kwargs.get("num_proc", None) - self.cpu_required = kwargs.get("cpu_required", 1) + self.num_proc = kwargs.get("num_proc", -1) # -1 means automatic calculation of concurrency + self.cpu_required = kwargs.get("cpu_required", 0) self.gpu_required = kwargs.get("gpu_required", 0) self.mem_required = kwargs.get("mem_required", 0) if isinstance(self.mem_required, str): @@ -228,8 +228,10 @@ def runtime_np(self): # Local import to avoid logger being serialized in multiprocessing from loguru import logger - op_proc = calculate_np(self._name, self.mem_required, self.cpu_required, self.use_cuda(), self.gpu_required) - if self.num_proc is not None: + op_proc = calculate_np( + self._name, self.mem_required, self.cpu_required or 1, self.use_cuda(), self.gpu_required + ) + if self.num_proc is not None and self.num_proc != -1: op_proc = min(op_proc, self.num_proc) logger.debug(f"Op [{self._name}] running with number of procs:{op_proc}") return op_proc diff --git a/data_juicer/utils/process_utils.py b/data_juicer/utils/process_utils.py index 343b0c9a58..9da9c4be55 100644 --- a/data_juicer/utils/process_utils.py +++ b/data_juicer/utils/process_utils.py @@ -1,6 +1,8 @@ +import itertools import math import os import subprocess +import sys import multiprocess as mp from loguru import logger @@ -12,6 +14,10 @@ cuda_device_count, ) +# A safety fraction to avoid OOM by not allocating all available memory to operators. +# This leaves some memory for Ray's overhead and other system processes. +_OPS_MEMORY_LIMIT_FRACTION = 0.7 + def setup_mp(method=None): if mp.current_process().name != "MainProcess": @@ -118,3 +124,322 @@ def calculate_np(name, mem_required, cpu_required, use_cuda=False, gpu_required= f"and required cpu: {cpu_required}." ) return auto_num_proc + + +def _find_optimal_concurrency(resource_ratios, total_resource): + """ + Search for the optimal concurrency allocation to achieve the + highest total resource utilization and the most balanced processing capacity. 
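The continuous optimum is c_i = total_resource * r_i / sum_j(r_j ** 2), which exhausts the
budget with equal per-operator throughput c_i / r_i; the search below rounds each c_i to its
floor/ceil neighborhood and keeps the feasible integer combination with the highest total
usage, breaking ties by the most balanced throughput. A toy example: resource_ratios=[0.25, 0.5]
with total_resource=1.0 gives c = [0.8, 1.6]; the only feasible rounded combination is (1, 1),
so the function returns ([1, 1], 0.75, 1.0).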
+ + Args: + resource_ratios (list[float]): List of single-process resource ratios for each operator + total_resource (float): Total resource + + Return: + tuple: (list of optimal concurrency, total resource usage, standard deviation of processing capacity) + If there is no valid combination, return (None, 0, 0) + """ + n = len(resource_ratios) + if n == 0: + return (None, 0, 0) + + sum_r_squared = sum(r * r for r in resource_ratios) + if sum_r_squared == 0: + return (None, 0, 0) + + c_floats = [(total_resource * r) / sum_r_squared for r in resource_ratios] + + # generate candidate concurrency + candidates = [] + for cf in c_floats: + floor_cf = math.floor(cf) + ceil_cf = math.ceil(cf) + possible = set() + if floor_cf >= 1: + possible.add(floor_cf) + possible.add(ceil_cf) + possible = [max(1, v) for v in possible] + candidates.append(sorted(list(set(possible)))) + + # traverse all combinations + best_combination = None + max_resource_usage = 0 + min_std = float("inf") + + for combo in itertools.product(*candidates): + total_used = sum(c * r for c, r in zip(combo, resource_ratios)) + if total_used > total_resource: + continue + + # calculate the standard deviation of processing capacity + processing_powers = [c / r for c, r in zip(combo, resource_ratios)] + mean = sum(processing_powers) / n + variance = sum((x - mean) ** 2 for x in processing_powers) / n + std = math.sqrt(variance) + + # update the optimal solution (priority resource utilization, suboptimal standard deviation) + if total_used > max_resource_usage: + max_resource_usage = total_used + best_combination = combo + min_std = std + elif total_used == max_resource_usage and std < min_std: + best_combination = combo + min_std = std + + return ( + list(best_combination) if best_combination else None, + max_resource_usage, + min_std if best_combination else 0, + ) + + +def calculate_ray_np(operators): + """ + Automatically calculates optimal concurrency for Ray Data operator. + This function handles both task and actor based operators, considering + resource requirements and user specifications. The computation follows Ray Data's + concurrency semantics while optimizing resource utilization. + + Key Concepts: + - Resource Ratio: Individual operator's resource requirement (GPU/CPU/memory) + compared to total cluster resources, using max(cpu_ratio, gpu_ratio, adjusted_mem_ratio) + - Fixed Allocation: Portion of resources reserved by operators with user-specified num_proc + - Dynamic Allocation: Remaining resources distributed among auto-scaling operators + + Design Logic: + 1. User Specification Priority: + - If user provides concurrency setting, directly return it + - Applies to both task and actor based operators + 2. Task Operators (equivalent to a cpu operator in dj): + a. When unspecified: Return None to let Ray determine implicitly + b. Auto-calculation: Returns maximum concurrency based on available + resources and operator requirements + 3. Actor Operators (equivalent to a gpu operator in dj): + a. Mandatory concurrency - set required gpus to 1 if unspecified, and then refer to the following `b` + to calculate automatically based on this setting + b. Auto-calculation returns tuple (min_concurrency, max_concurrency): + i. Minimum: Ensures baseline resource allocation in remaining resources + when all operators are active simultaneously (proportionally) + ii. 
Maximum: Allows full utilization of remaining resources by single + operator when others are idle + """ + from data_juicer.utils.ray_utils import ( + ray_available_gpu_memories, + ray_available_memories, + ray_cpu_count, + ray_gpu_count, + ) + from data_juicer.utils.resource_utils import is_cuda_available + + cuda_available = is_cuda_available() + total_cpu = ray_cpu_count() + total_gpu = ray_gpu_count() + available_mem = sum(ray_available_memories()) * _OPS_MEMORY_LIMIT_FRACTION / 1024 # Convert MB to GB + available_gpu_mem = sum(ray_available_gpu_memories()) * _OPS_MEMORY_LIMIT_FRACTION / 1024 # Convert MB to GB + resource_configs = {} + + for op in operators: + cpu_req = op.cpu_required + mem_req = op.mem_required + gpu_req = 0 + gpu_mem_req = 0 + + if op.gpu_required: + if not op.use_cuda(): + raise ValueError( + f"Op[{op._name}] attempted to request GPU resources (gpu_required={op.gpu_required}), " + "but appears to lack GPU support. If you have verified this operator support GPU acceleration, " + 'please explicitly set its property: `_accelerator = "cuda"`.' + ) + if not cuda_available: + raise ValueError( + f"Op[{op._name}] attempted to request GPU resources (gpu_required={op.gpu_required}), " + "but the gpu is unavailable. Please check whether your environment is installed correctly" + " and whether there is a gpu in the resource pool." + ) + # if it is a cuda operator, mem_required will be calculated as gpu memory; + # if it is a cpu, it will be calculated as memory. + cpu_required_frac, gpu_required_frac = 0, 0 + # GPU operator calculations + if op.use_cuda(): + gpu_req = op.gpu_required + gpu_mem_req = op.mem_required + if not gpu_req and not gpu_mem_req: + logger.warning( + f"The required cuda memory and gpu of Op[{op._name}] " + f"has not been specified. " + f"Please specify the `mem_required` field or `gpu_required` field in the " + f"config file. You can reference the `config_all.yaml` file." + f"Set the `gpu_required` to 1 now." + ) + gpu_req = 1 + + cpu_required_frac = cpu_req / total_cpu if cpu_req else 0 + gpu_required_frac = max( + gpu_req / total_gpu if gpu_req else 0, + gpu_mem_req / available_gpu_mem if gpu_mem_req else 0, + ) + + if not gpu_req: + gpu_req = math.ceil(gpu_required_frac * total_gpu * 100) / 100 + + auto_proc = False if (op.num_proc and op.num_proc != -1) else True + + # CPU operator calculations + else: + if cpu_req or mem_req: + cpu_required_frac = max( + cpu_req / total_cpu if cpu_req else 0, mem_req / available_mem if mem_req else 0 + ) + else: + logger.warning( + f"The required memory and cpu of Op[{op._name}] " + f"has not been specified. " + f"We recommend specifying the `mem_required` field or `cpu_required` field in the " + f"config file. You can reference the `config_all.yaml` file." + ) + # Default to single CPU if no requirements specified + cpu_required_frac = 1 / total_cpu + if op.num_proc: + if not isinstance(op.num_proc, int): + raise ValueError( + f"Op[{op._name}] is running with cpu resource, ``num_proc`` is expected to be set as an integer. " + f"Use ``concurrency=n`` to control maximum number of workers to use, but got: {op.num_proc}." 
) + + auto_proc = False if op.num_proc != -1 else True + + resource_configs[op._name] = { + "cpu_required": cpu_req, + "gpu_required": gpu_req, + "mem_required": mem_req, + "gpu_mem_required": gpu_mem_req, + "cpu_required_frac": cpu_required_frac, + "gpu_required_frac": gpu_required_frac, + "num_proc": tuple(op.num_proc) if isinstance(op.num_proc, list) else op.num_proc, + "auto_proc": auto_proc, + "is_actor": op.use_cuda(), + } + + fixed_min_cpu = 0 + fixed_max_cpu = 0 + fixed_min_gpu = 0 + fixed_max_gpu = 0 + auto_resource_frac_map = {} + for op_name, cfg in resource_configs.items(): + if cfg["auto_proc"]: + auto_resource_frac_map[op_name] = (cfg["cpu_required_frac"], cfg["gpu_required_frac"]) + else: + num_proc = cfg["num_proc"] + + # support the task concurrency to be specified as None value + if not cfg["is_actor"] and not num_proc: + continue + + if cfg["is_actor"]: + min_proc = num_proc[0] if isinstance(num_proc, (tuple, list)) else num_proc + else: + min_proc = 1 # when ``fn`` is a function, only the maximum concurrency can be specified + max_proc = num_proc[1] if isinstance(num_proc, (tuple, list)) else num_proc + fixed_min_cpu += cfg["cpu_required_frac"] * min_proc + fixed_max_cpu += cfg["cpu_required_frac"] * max_proc + fixed_min_gpu += cfg["gpu_required_frac"] * min_proc + fixed_max_gpu += cfg["gpu_required_frac"] * max_proc + + # Validate resource availability + total_auto_base_cpu = sum([i[0] for i in list(auto_resource_frac_map.values())]) + total_auto_base_gpu = sum([i[1] for i in list(auto_resource_frac_map.values())]) + total_required_min_cpu = fixed_min_cpu + total_auto_base_cpu + if total_required_min_cpu > 1: + raise ValueError( + f"Insufficient cpu resources: " + f"At least {total_required_min_cpu * total_cpu} cpus are required, but only {total_cpu} are available. " + f"Please add resources to ray cluster or reduce operator requirements." + ) + total_required_min_gpu = fixed_min_gpu + total_auto_base_gpu + if total_required_min_gpu > 1: + raise ValueError( + f"Insufficient gpu resources: " + f"At least {total_required_min_gpu * total_gpu} gpus are required, but only {total_gpu} are available. " + f"Please add resources to ray cluster or reduce operator requirements."
+ ) + if len(auto_resource_frac_map) > 0: + remaining_min_frac_cpu = 1 - fixed_max_cpu + remaining_max_frac_cpu = 1 - fixed_min_cpu + remaining_min_frac_gpu = 1 - fixed_max_gpu + remaining_max_frac_gpu = 1 - fixed_min_gpu + + op_resources_cpu, op_resources_gpu = {}, {} + # if both cpu and gpu are required, the allocation will be prioritized based on the gpu fraction + for k, v in auto_resource_frac_map.items(): + if v[1] > 0: # (cpu, gpu) + op_resources_gpu[k] = v[1] + elif v[0] > 0: + op_resources_cpu[k] = v[0] + + best_combination_cpu, best_combination_gpu = {}, {} + if len(op_resources_gpu) > 0: + _gpu_names, _gpu_resources = [], [] + for k, v in op_resources_gpu.items(): + _gpu_names.append(k) + _gpu_resources.append(v) + _best_combination_gpu, _, _ = _find_optimal_concurrency(_gpu_resources, remaining_min_frac_gpu) + best_combination_gpu = dict(zip(_gpu_names, _best_combination_gpu)) + if len(op_resources_cpu) > 0: + _cpu_names, _cpu_resources = [], [] + for k, v in op_resources_cpu.items(): + _cpu_names.append(k) + _cpu_resources.append(v) + _best_combination_cpu, _, _ = _find_optimal_concurrency(_cpu_resources, remaining_min_frac_cpu) + best_combination_cpu = dict(zip(_cpu_names, _best_combination_cpu)) + + best_combination = {} + for k in list(auto_resource_frac_map.keys()): + best_combination[k] = min( + best_combination_gpu.get(k, sys.maxsize), best_combination_cpu.get(k, sys.maxsize) + ) + for op_name, cfg in resource_configs.items(): + if cfg["auto_proc"]: + # TODO: + # issue: https://github.com/ray-project/ray/issues/55307 + # or min_proc = 1 ? + # or max_proc = int(max(1, 1 / cfg["base_resource_frac"])) ? use all resources + + # max_frac_cpu, max_frac_gpu = 1, 1 + max_frac_cpu, max_frac_gpu = remaining_max_frac_cpu, remaining_max_frac_gpu + min_proc = best_combination[op_name] + + if cfg["cpu_required_frac"] and cfg["gpu_required_frac"]: + max_proc = min( + int(max(1, max_frac_cpu / cfg["cpu_required_frac"])), + int(max(1, max_frac_gpu / cfg["gpu_required_frac"])), + ) + elif cfg["gpu_required_frac"]: + max_proc = int(max(1, max_frac_gpu / cfg["gpu_required_frac"])) + else: + max_proc = int(max(1, max_frac_cpu / cfg["cpu_required_frac"])) + + cfg["num_proc"] = min_proc if min_proc == max_proc else (min_proc, max_proc) + + for op in operators: + cfg = resource_configs[op._name] + auto_proc, num_proc = cfg["auto_proc"], cfg["num_proc"] + if cfg["is_actor"]: + op.cpu_required = cfg["cpu_required"] + op.gpu_required = cfg["gpu_required"] + op.num_proc = num_proc + else: + # * If ``fn`` is a function and ``concurrency`` is an int ``n``, Ray Data + # launches *at most* ``n`` concurrent tasks. 
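# A worked example of the overall allocation, taken from the accompanying unit
# test (toy cluster with 100 CPUs, 5 GPUs and 5 x 10GB of GPU memory, with
# available memory capped by _OPS_MEMORY_LIMIT_FRACTION):
#   op1_cuda: cpu_required=1, mem_required=2GB   -> gpu_required=0.29, num_proc=(3, 14)
#   op2_cuda: gpu_required=0.5                   -> num_proc=(4, 8)
#   op3_cuda: gpu_required=0.2, num_proc=(5, 10) set by the user -> kept as-is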
+ op.cpu_required = cfg["cpu_required"] + op.gpu_required = None + # if concurrency left to None, the automatic concurrency of ray may be slightly higher, which could lead to OOM + op.num_proc = num_proc[1] if (auto_proc and isinstance(num_proc, (tuple, list))) else num_proc + + logger.info( + f"Op[{op._name}] will be executed with the following resources: " + f"num_cpus: {op.cpu_required}, " + f"num_gpus: {op.gpu_required}, " + f"concurrency: {op.num_proc}, " + ) + return operators diff --git a/data_juicer/utils/ray_utils.py b/data_juicer/utils/ray_utils.py index e6da4606fc..e7560cbc96 100644 --- a/data_juicer/utils/ray_utils.py +++ b/data_juicer/utils/ray_utils.py @@ -58,19 +58,13 @@ def collect_node_info(): try: gpus_memory, free_gpus_memory = [], [] nvidia_smi_output = subprocess.check_output( - ["nvidia-smi", "--query-gpu=memory.free", "--format=csv,noheader,nounits"] + ["nvidia-smi", "--query-gpu=memory.free,memory.total", "--format=csv,noheader,nounits"] ).decode("utf-8") for line in nvidia_smi_output.strip().split("\n"): - free_gpus_memory.append(int(line)) - - nvidia_smi_output = subprocess.check_output( - ["nvidia-smi", "--query-gpu=memory.total", "--format=csv,noheader,nounits"] - ).decode("utf-8") - - for line in nvidia_smi_output.strip().split("\n"): - gpus_memory.append(int(line)) - + free_mem_str, total_mem_str = line.split(", ") + free_gpus_memory.append(int(free_mem_str)) + gpus_memory.append(int(total_mem_str)) except Exception: # no gpu gpus_memory, free_gpus_memory = [], [] diff --git a/tests/core/data/test_ray_auto_proc.py b/tests/core/data/test_ray_auto_proc.py deleted file mode 100644 index 10fc24c343..0000000000 --- a/tests/core/data/test_ray_auto_proc.py +++ /dev/null @@ -1,215 +0,0 @@ -import unittest -from unittest.mock import MagicMock, patch -from data_juicer.core.data.ray_dataset import RayDataset - - -class TestRayDataset(unittest.TestCase): - def setUp(self): - self.mock_op = lambda use_cuda: MagicMock( - cpu_required=0, - mem_required=0, - gpu_required=None, - num_proc=None, - _name="test_op", - use_cuda=lambda: use_cuda - ) - - # Common patchers - self.ray_cpu_patcher = patch( - 'data_juicer.utils.ray_utils.ray_cpu_count') - self.ray_gpu_patcher = patch( - 'data_juicer.utils.ray_utils.ray_gpu_count') - self.ray_mem_patcher = patch( - 'data_juicer.utils.ray_utils.ray_available_memories') - self.ray_gpu_mem_patcher = patch( - 'data_juicer.utils.ray_utils.ray_available_gpu_memories') - self.cuda_available_patcher = patch( - 'data_juicer.utils.resource_utils.is_cuda_available') - - self.mock_cpu = self.ray_cpu_patcher.start() - self.mock_gpu = self.ray_gpu_patcher.start() - self.mock_mem = self.ray_mem_patcher.start() - self.mock_gpu_mem = self.ray_gpu_mem_patcher.start() - self.mock_cuda_available = self.cuda_available_patcher.start() - - # Default cluster resources (4 CPUs, 8GB RAM, 1 GPU 16GB) - self.mock_cpu.return_value = 4 - self.mock_gpu.return_value = 1 - self.mock_mem.return_value = [8192] # 8GB - self.mock_gpu_mem.return_value = [16384] # 16GB - self.mock_cuda_available.return_value = True - - def tearDown(self): - self.ray_cpu_patcher.stop() - self.ray_gpu_patcher.stop() - self.ray_mem_patcher.stop() - self.ray_gpu_mem_patcher.stop() - self.cuda_available_patcher.stop() - - def test_cpu_op_auto_scaling(self): - """Test CPU operator with auto scaling""" - op = self.mock_op(use_cuda=False) - op.cpu_required = 1 - - RayDataset.set_resource_for_ops([op]) - self.assertEqual(op.num_proc, 4) # 4 CPUs / 1 per op - self.assertEqual(op.cpu_required, 1) - 
self.assertEqual(op.gpu_required, None) - - def test_gpu_op_auto_scaling(self): - """Test GPU operator with auto scaling""" - op = self.mock_op(use_cuda=True) - op.gpu_required = 1 - - RayDataset.set_resource_for_ops([op]) - self.assertEqual(op.num_proc, 1) # Only 1 GPU available - self.assertEqual(op.gpu_required, 1) - self.assertEqual(op.cpu_required, 0) - - def test_user_specified_num_proc(self): - """Test user-specified num_proc takes priority""" - op = self.mock_op(use_cuda=False) - op.num_proc = 2 - op.cpu_required = 1 - - RayDataset.set_resource_for_ops([op]) - self.assertEqual(op.num_proc, 2) - self.assertEqual(op.cpu_required, 1) - self.assertEqual(op.gpu_required, None) - - def test_mixed_ops_resource_allocation(self): - """Test mixed operators with fixed and auto scaling""" - fixed_op = self.mock_op(use_cuda=False) - fixed_op._name = 'op1' - fixed_op.num_proc = (1, 2) - fixed_op.cpu_required = 1 - - auto_op = self.mock_op(use_cuda=False) - auto_op._name = 'op2' - auto_op.cpu_required = 1 - - self.mock_cpu.return_value = 8 # 8 CPUs total - - RayDataset.set_resource_for_ops([fixed_op, auto_op]) - # Fixed min: 1 core * 1 proc = 1 core - # Fixed max: 1 core * 2 proc = 2 core - # Auto: each needs 1 core, remaining (8-2)-(8-1) cores - # Min proc = 6 // 1 = 6, Max = 7 // 1 = 7 - self.assertEqual(auto_op.num_proc, 7) - self.assertEqual(auto_op.cpu_required, 1) - self.assertEqual(fixed_op.cpu_required, 1) - self.assertEqual(fixed_op.num_proc, (1, 2)) - - def test_insufficient_resources(self): - """Test resource overallocation exception""" - op1 = self.mock_op(use_cuda=False) - op1._name = 'op1' - op1.num_proc = 5 # 1 core per proc - op1.cpu_required = 1 - - op2 = self.mock_op(use_cuda=False) - op2._name = 'op2' - op2.cpu_required = 1 - - self.mock_cpu.return_value = 4 # Only 4 cores available - - with self.assertRaises(ValueError) as cm: - RayDataset.set_resource_for_ops([op1, op2]) - - # required cpus: 1*5 + 1, 6/4=1.5 - self.assertEqual(str(cm.exception), - "Insufficient cluster resources: At least 1.50x the current resource is required. " - "Add resources or reduce operator requirements.") - - def test_gpu_op_without_cuda(self): - """Test GPU operator when CUDA is unavailable""" - self.mock_cuda_available.return_value = False - op = self.mock_op(use_cuda=True) - op.gpu_required = 1 - - with self.assertRaises(ValueError) as cm: - RayDataset.set_resource_for_ops([op]) - - self.assertEqual(str(cm.exception), - "Op[test_op] attempted to request GPU resources (gpu_required=1), " - "but the gpu is unavailable. 
Please check whether your environment is installed correctly" - " and whether there is a gpu in the resource pool.") - - def test_multi_ops_with_cpu_gpu(self): - """Test operator with no resource requirements""" - - op1_cuda = self.mock_op(use_cuda=True) - op1_cuda.mem_required = 2 - op1_cuda.cpu_required = 1 - op1_cuda._name = 'op1_cuda' - - op2_cuda = self.mock_op(use_cuda=True) - op2_cuda.gpu_required = 0.5 - op2_cuda._name = 'op2_cuda' - - op3_cuda = self.mock_op(use_cuda=True) - op3_cuda.gpu_required = 0.2 - op3_cuda.num_proc = (5, 10) - op3_cuda._name = 'op3_cuda' - - op1_cpu = self.mock_op(use_cuda=False) - op1_cpu.mem_required = 2 - op1_cpu._name = 'op1_cpu' - - op2_cpu = self.mock_op(use_cuda=False) - op2_cpu.cpu_required = 0.5 - op2_cpu._name = 'op2_cpu' - - op3_cpu = self.mock_op(use_cuda=False) - op3_cpu.cpu_required = 0.2 - op3_cpu.num_proc = 10 - op3_cpu._name = 'op3_cpu' - - op4_cpu = self.mock_op(use_cuda=False) - op4_cpu._name = 'op4_cpu' - - self.mock_cpu.return_value = 100 - self.mock_gpu.return_value = 5 - self.mock_mem.return_value = [131072] # 128 GB - self.mock_gpu_mem.return_value = [51200] # 5 * 10GB - - RayDataset.set_resource_for_ops([op1_cuda, op2_cuda, op3_cuda, op1_cpu, op2_cpu, op3_cpu, op4_cpu]) - - self.assertEqual(op1_cuda.num_proc, (2, 13)) - self.assertEqual(op1_cuda.cpu_required, 1) - self.assertEqual(op1_cuda.gpu_required, 0.29) - self.assertEqual(op1_cuda.mem_required, 2) - - self.assertEqual(op2_cuda.num_proc, (2, 7)) - self.assertEqual(op2_cuda.cpu_required, 0) - self.assertEqual(op2_cuda.gpu_required, 0.5) - self.assertEqual(op2_cuda.mem_required, 0) - - self.assertEqual(op3_cuda.num_proc, (5, 10)) - self.assertEqual(op3_cuda.cpu_required, 0) - self.assertEqual(op3_cuda.gpu_required, 0.2) - self.assertEqual(op3_cuda.mem_required, 0) - - self.assertEqual(op1_cpu.num_proc, 34) - self.assertEqual(op1_cpu.cpu_required, 0) - self.assertEqual(op1_cpu.gpu_required, None) - self.assertEqual(op1_cpu.mem_required, 2) - - self.assertEqual(op2_cpu.num_proc, 156) - self.assertEqual(op2_cpu.cpu_required, 0.5) - self.assertEqual(op2_cpu.gpu_required, None) - self.assertEqual(op2_cpu.mem_required, 0) - - self.assertEqual(op3_cpu.num_proc, 10) - self.assertEqual(op3_cpu.cpu_required, 0.2) - self.assertEqual(op3_cpu.gpu_required, None) - self.assertEqual(op3_cpu.mem_required, 0) - - self.assertEqual(op4_cpu.num_proc, 78) - self.assertEqual(op4_cpu.cpu_required, 0) - self.assertEqual(op4_cpu.gpu_required, None) - self.assertEqual(op4_cpu.mem_required, 0) - - -if __name__ == '__main__': - unittest.main() diff --git a/tests/utils/test_process_utils.py b/tests/utils/test_process_utils.py index 315e0aa0bd..a0764289d3 100644 --- a/tests/utils/test_process_utils.py +++ b/tests/utils/test_process_utils.py @@ -6,7 +6,7 @@ import torch import ray -from data_juicer.utils.process_utils import setup_mp, get_min_cuda_memory, calculate_np +from data_juicer.utils.process_utils import setup_mp, get_min_cuda_memory, calculate_np, calculate_ray_np from data_juicer.utils.unittest_utils import DataJuicerTestCaseBase, TEST_TAG from data_juicer.utils.constant import RAY_JOB_ENV_VAR @@ -147,5 +147,245 @@ def test_cpu_num_proc_unset_and_mem_unlimited(self): ) +class CalculateRayNPTest(DataJuicerTestCaseBase): + + def setUp(self): + self.mock_op = lambda use_cuda: MagicMock( + cpu_required=0, + mem_required=0, + gpu_required=None, + num_proc=-1, # auto + _name="test_op", + use_cuda=lambda: use_cuda + ) + + # Common patchers + self.ray_cpu_patcher = patch( + 
'data_juicer.utils.ray_utils.ray_cpu_count') + self.ray_gpu_patcher = patch( + 'data_juicer.utils.ray_utils.ray_gpu_count') + self.ray_mem_patcher = patch( + 'data_juicer.utils.ray_utils.ray_available_memories') + self.ray_gpu_mem_patcher = patch( + 'data_juicer.utils.ray_utils.ray_available_gpu_memories') + self.cuda_available_patcher = patch( + 'data_juicer.utils.resource_utils.is_cuda_available') + + self.mock_cpu = self.ray_cpu_patcher.start() + self.mock_gpu = self.ray_gpu_patcher.start() + self.mock_mem = self.ray_mem_patcher.start() + self.mock_gpu_mem = self.ray_gpu_mem_patcher.start() + self.mock_cuda_available = self.cuda_available_patcher.start() + + # Default cluster resources (64 CPUs, 256GB RAM, 8 GPU 32GB) + self.mock_cpu.return_value = 64 + self.mock_gpu.return_value = 8 + self.mock_mem.return_value = [256 * 1024] # 256GB + self.mock_gpu_mem.return_value = [32 * 1024] * 8 # 32GB * 8 + self.mock_cuda_available.return_value = True + + def tearDown(self): + self.ray_cpu_patcher.stop() + self.ray_gpu_patcher.stop() + self.ray_mem_patcher.stop() + self.ray_gpu_mem_patcher.stop() + self.cuda_available_patcher.stop() + + def test_cpu_op_auto_scaling(self): + """Test CPU operator with auto scaling""" + op = self.mock_op(use_cuda=False) + op.cpu_required = 1 + + calculate_ray_np([op]) + self.assertEqual(op.num_proc, 64) # 64 CPUs / 1 per op + self.assertEqual(op.cpu_required, 1) + self.assertEqual(op.gpu_required, None) + + def test_gpu_op_auto_scaling(self): + """Test GPU operator with auto scaling""" + op = self.mock_op(use_cuda=True) + op.gpu_required = 1 + + calculate_ray_np([op]) + self.assertEqual(op.num_proc, 8) # Only 1 op and 8 GPU available + self.assertEqual(op.gpu_required, 1) + self.assertEqual(op.cpu_required, 0) + + def test_user_specified_num_proc(self): + """Test user-specified num_proc takes priority""" + op = self.mock_op(use_cuda=False) + op.num_proc = 2 + op.cpu_required = 1 + + calculate_ray_np([op]) + self.assertEqual(op.num_proc, 2) + self.assertEqual(op.cpu_required, 1) + self.assertEqual(op.gpu_required, None) + + def test_user_specified_num_proc_to_none_in_task(self): + """Test user-specified num_proc takes priority""" + op = self.mock_op(use_cuda=False) + op.num_proc = None + op.cpu_required = 1 + + calculate_ray_np([op]) + self.assertEqual(op.num_proc, None) + self.assertEqual(op.cpu_required, 1) + self.assertEqual(op.gpu_required, None) + + def test_num_proc_check(self): + op = self.mock_op(use_cuda=False) + op._name = 'op1' + op.num_proc = (1, 2) + op.cpu_required = 1 + + with self.assertRaises(ValueError) as cm: + calculate_ray_np([op]) + + self.assertEqual(str(cm.exception), + "Op[op1] is running with cpu resource, ``num_proc`` is expected to be set as an integer. 
" + "Use ``concurrency=n`` to control maximum number of workers to use, but got: (1, 2).") + + def test_mixed_ops_resource_allocation(self): + """Test mixed operators with fixed and auto scaling""" + fixed_op = self.mock_op(use_cuda=False) + fixed_op._name = 'op1' + fixed_op.num_proc = 4 # concurrency max=4, min=1 + fixed_op.cpu_required = 1 + + auto_op = self.mock_op(use_cuda=False) + auto_op._name = 'op2' + auto_op.cpu_required = 1 + + calculate_ray_np([fixed_op, auto_op]) + + self.assertEqual(fixed_op.cpu_required, 1) + self.assertEqual(fixed_op.num_proc, 4) + self.assertEqual(auto_op.num_proc, 63) + self.assertEqual(auto_op.cpu_required, 1) + + def test_insufficient_resources(self): + """Test resource overallocation exception""" + op1 = self.mock_op(use_cuda=False) + op1._name = 'op1' + op1.num_proc = 5 # concurrency max=5, min=1 + op1.cpu_required = 2 + + op2 = self.mock_op(use_cuda=False) + op2._name = 'op2' + op2.cpu_required = 3 + + self.mock_cpu.return_value = 4 # Only 4 cores available + + with self.assertRaises(ValueError) as cm: + calculate_ray_np([op1, op2]) + + self.assertEqual(str(cm.exception), + "Insufficient cpu resources: At least 5.0 cpus are required, but only 4 are available. " + "Please add resources to ray cluster or reduce operator requirements.") + + def test_gpu_op_without_cuda(self): + """Test GPU operator when CUDA is unavailable""" + self.mock_cuda_available.return_value = False + op = self.mock_op(use_cuda=True) + op.gpu_required = 1 + + with self.assertRaises(ValueError) as cm: + calculate_ray_np([op]) + + self.assertEqual(str(cm.exception), + "Op[test_op] attempted to request GPU resources (gpu_required=1), " + "but the gpu is unavailable. Please check whether your environment is installed correctly" + " and whether there is a gpu in the resource pool.") + + def test_multi_ops_with_cpu_gpu(self): + """Test operator with no resource requirements""" + + op1_cuda = self.mock_op(use_cuda=True) + op1_cuda.mem_required = 2 + op1_cuda.cpu_required = 1 + op1_cuda._name = 'op1_cuda' + + op2_cuda = self.mock_op(use_cuda=True) + op2_cuda.gpu_required = 0.5 + op2_cuda._name = 'op2_cuda' + + op3_cuda = self.mock_op(use_cuda=True) + op3_cuda.gpu_required = 0.2 + op3_cuda.num_proc = (5, 10) + op3_cuda._name = 'op3_cuda' + + op1_cpu = self.mock_op(use_cuda=False) + op1_cpu.mem_required = 8 + op1_cpu._name = 'op1_cpu' + + op2_cpu = self.mock_op(use_cuda=False) + op2_cpu.cpu_required = 5 + op2_cpu._name = 'op2_cpu' + + op3_cpu = self.mock_op(use_cuda=False) + op3_cpu.cpu_required = 0.2 + op3_cpu.num_proc = 10 # concurrency max=10, min=1 + op3_cpu._name = 'op3_cpu' + + op4_cpu = self.mock_op(use_cuda=False) + op4_cpu._name = 'op4_cpu' + + self.mock_cpu.return_value = 100 + self.mock_gpu.return_value = 5 + self.mock_mem.return_value = [131072] # 128 GB + self.mock_gpu_mem.return_value = [10240] * 5 # 10GB * 5 + + calculate_ray_np([op1_cuda, op2_cuda, op3_cuda, op1_cpu, op2_cpu, op3_cpu, op4_cpu]) + + # fixed cpu: + # op3_cpu: 0.2 + # fixed gpu: + # op3_cuda: (1, 2) # (5*0.2, 10*0.2) + + # remaining max cpu: 100-0.2=99.8 + # remaining gpu: (3, 4) + + # auto gpu: 0.29: 0.5 remaining min gpu = 3 + # find_optimal_concurrency([0.29, 0.5], 3) = [3, 4] + + self.assertEqual(op1_cuda.num_proc, (3, 14)) # min=3, max=4/(2/7) + self.assertEqual(op1_cuda.cpu_required, 1) + self.assertEqual(op1_cuda.gpu_required, 0.29) # 2GB / 10GB * 0.7 + self.assertEqual(op1_cuda.mem_required, 2) + + self.assertEqual(op2_cuda.num_proc, (4, 8)) # min=4, max=4/0.5 + self.assertEqual(op2_cuda.cpu_required, 
0) + self.assertEqual(op2_cuda.gpu_required, 0.5) + self.assertEqual(op2_cuda.mem_required, 0) + + # fixed gpu + self.assertEqual(op3_cuda.num_proc, (5, 10)) + self.assertEqual(op3_cuda.cpu_required, 0) + self.assertEqual(op3_cuda.gpu_required, 0.2) + self.assertEqual(op3_cuda.mem_required, 0) + + self.assertEqual(op1_cpu.num_proc, 11) # 99.8 / (8/(128 * 0.7) * 100 ) + self.assertEqual(op1_cpu.cpu_required, 0) + self.assertEqual(op1_cpu.gpu_required, None) + self.assertEqual(op1_cpu.mem_required, 8) + + self.assertEqual(op2_cpu.num_proc, 19) # 99.8 / 5 + self.assertEqual(op2_cpu.cpu_required, 5) + self.assertEqual(op2_cpu.gpu_required, None) + self.assertEqual(op2_cpu.mem_required, 0) + + # fixed cpu + self.assertEqual(op3_cpu.num_proc, 10) + self.assertEqual(op3_cpu.cpu_required, 0.2) + self.assertEqual(op3_cpu.gpu_required, None) + self.assertEqual(op3_cpu.mem_required, 0) + + self.assertEqual(op4_cpu.num_proc, 99) + self.assertEqual(op4_cpu.cpu_required, 0) + self.assertEqual(op4_cpu.gpu_required, None) + self.assertEqual(op4_cpu.mem_required, 0) + + if __name__ == '__main__': unittest.main() From 237cc700c40ff4641a3d3c229452e6a6c9f7e549 Mon Sep 17 00:00:00 2001 From: "jiangnana.jnn" Date: Tue, 23 Sep 2025 17:47:44 +0800 Subject: [PATCH 05/12] optimize --- data_juicer/config/config.py | 3 ++- data_juicer/ops/base_op.py | 9 +++++++- data_juicer/utils/process_utils.py | 6 +---- tests/utils/test_process_utils.py | 35 +++++++++++++++--------------- 4 files changed, 29 insertions(+), 24 deletions(-) diff --git a/data_juicer/config/config.py b/data_juicer/config/config.py index f5bf62182e..e368784288 100644 --- a/data_juicer/config/config.py +++ b/data_juicer/config/config.py @@ -750,7 +750,6 @@ def init_setup_from_cfg(cfg: Namespace, load_configs_only=False): "audio_key": cfg.get("audio_key", "audios"), "video_key": cfg.get("video_key", "videos"), "image_bytes_key": cfg.get("image_bytes_key", "image_bytes"), - "num_proc": cfg.get("np", None) if not is_ray_mode() else None, "turbo": cfg.get("turbo", False), "skip_op_error": cfg.get("skip_op_error", True), "work_dir": cfg.work_dir, @@ -759,6 +758,8 @@ def init_setup_from_cfg(cfg: Namespace, load_configs_only=False): "video_special_token": cfg.get("video_special_token", SpecialTokens.video), "eoc_special_token": cfg.get("eoc_special_token", SpecialTokens.eoc), } + if not is_ray_mode(): + op_attrs.update({"num_proc": cfg.get("np", None)}) cfg.process = update_op_attr(cfg.process, op_attrs) return cfg diff --git a/data_juicer/ops/base_op.py b/data_juicer/ops/base_op.py index d1831a9156..883d07df65 100644 --- a/data_juicer/ops/base_op.py +++ b/data_juicer/ops/base_op.py @@ -8,6 +8,7 @@ from data_juicer.utils.mm_utils import SpecialTokens, size_to_bytes from data_juicer.utils.model_utils import free_models from data_juicer.utils.process_utils import calculate_np +from data_juicer.utils.ray_utils import is_ray_mode from data_juicer.utils.registry import Registry from data_juicer.utils.resource_utils import is_cuda_available @@ -215,6 +216,12 @@ def __init__(self, *args, **kwargs): method = wrap_func_with_nested_access(method) setattr(self, name, method) + def use_auto_proc(self): + if is_ray_mode() and not self.use_cuda(): # ray task + return self.num_proc == -1 + else: + return not self.num_proc or self.num_proc == -1 + def is_batched_op(self): return self._batched_op @@ -231,7 +238,7 @@ def runtime_np(self): op_proc = calculate_np( self._name, self.mem_required, self.cpu_required or 1, self.use_cuda(), self.gpu_required ) - if self.num_proc is not 
None and self.num_proc != -1: + if not self.use_auto_proc(): op_proc = min(op_proc, self.num_proc) logger.debug(f"Op [{self._name}] running with number of procs:{op_proc}") return op_proc diff --git a/data_juicer/utils/process_utils.py b/data_juicer/utils/process_utils.py index 9da9c4be55..e6e0d16944 100644 --- a/data_juicer/utils/process_utils.py +++ b/data_juicer/utils/process_utils.py @@ -283,8 +283,6 @@ def calculate_ray_np(operators): if not gpu_req: gpu_req = math.ceil(gpu_required_frac * total_gpu * 100) / 100 - auto_proc = False if (op.num_proc and op.num_proc != -1) else True - # CPU operator calculations else: if cpu_req or mem_req: @@ -307,8 +305,6 @@ def calculate_ray_np(operators): f"Use ``concurrency=n`` to control maximum number of workers to use, but got: {op.num_proc}." ) - auto_proc = False if op.num_proc != -1 else True - resource_configs[op._name] = { "cpu_required": cpu_req, "gpu_required": gpu_req, @@ -317,7 +313,7 @@ def calculate_ray_np(operators): "cpu_required_frac": cpu_required_frac, "gpu_required_frac": gpu_required_frac, "num_proc": tuple(op.num_proc) if isinstance(op.num_proc, list) else op.num_proc, - "auto_proc": auto_proc, + "auto_proc": op.use_auto_proc(), "is_actor": op.use_cuda(), } diff --git a/tests/utils/test_process_utils.py b/tests/utils/test_process_utils.py index a0764289d3..65190cac18 100644 --- a/tests/utils/test_process_utils.py +++ b/tests/utils/test_process_utils.py @@ -150,13 +150,21 @@ def test_cpu_num_proc_unset_and_mem_unlimited(self): class CalculateRayNPTest(DataJuicerTestCaseBase): def setUp(self): - self.mock_op = lambda use_cuda: MagicMock( + + def _use_auto_proc(num_proc, use_cuda): + if not use_cuda: # ray task + return num_proc == -1 + else: + return not num_proc or num_proc == -1 + + self.mock_op = lambda use_cuda, num_proc=-1: MagicMock( cpu_required=0, mem_required=0, gpu_required=None, - num_proc=-1, # auto + num_proc=num_proc, _name="test_op", - use_cuda=lambda: use_cuda + use_cuda=lambda: use_cuda, + use_auto_proc=lambda: _use_auto_proc(num_proc, use_cuda) ) # Common patchers @@ -213,8 +221,7 @@ def test_gpu_op_auto_scaling(self): def test_user_specified_num_proc(self): """Test user-specified num_proc takes priority""" - op = self.mock_op(use_cuda=False) - op.num_proc = 2 + op = self.mock_op(use_cuda=False, num_proc=2) op.cpu_required = 1 calculate_ray_np([op]) @@ -224,8 +231,7 @@ def test_user_specified_num_proc(self): def test_user_specified_num_proc_to_none_in_task(self): """Test user-specified num_proc takes priority""" - op = self.mock_op(use_cuda=False) - op.num_proc = None + op = self.mock_op(use_cuda=False, num_proc=None) op.cpu_required = 1 calculate_ray_np([op]) @@ -234,9 +240,8 @@ def test_user_specified_num_proc_to_none_in_task(self): self.assertEqual(op.gpu_required, None) def test_num_proc_check(self): - op = self.mock_op(use_cuda=False) + op = self.mock_op(use_cuda=False, num_proc=(1, 2)) op._name = 'op1' - op.num_proc = (1, 2) op.cpu_required = 1 with self.assertRaises(ValueError) as cm: @@ -248,9 +253,8 @@ def test_num_proc_check(self): def test_mixed_ops_resource_allocation(self): """Test mixed operators with fixed and auto scaling""" - fixed_op = self.mock_op(use_cuda=False) + fixed_op = self.mock_op(use_cuda=False, num_proc=4) # concurrency max=4, min=1 fixed_op._name = 'op1' - fixed_op.num_proc = 4 # concurrency max=4, min=1 fixed_op.cpu_required = 1 auto_op = self.mock_op(use_cuda=False) @@ -266,9 +270,8 @@ def test_mixed_ops_resource_allocation(self): def test_insufficient_resources(self): """Test 
resource overallocation exception""" - op1 = self.mock_op(use_cuda=False) + op1 = self.mock_op(use_cuda=False, num_proc=5) op1._name = 'op1' - op1.num_proc = 5 # concurrency max=5, min=1 op1.cpu_required = 2 op2 = self.mock_op(use_cuda=False) @@ -310,9 +313,8 @@ def test_multi_ops_with_cpu_gpu(self): op2_cuda.gpu_required = 0.5 op2_cuda._name = 'op2_cuda' - op3_cuda = self.mock_op(use_cuda=True) + op3_cuda = self.mock_op(use_cuda=True, num_proc=(5, 10)) op3_cuda.gpu_required = 0.2 - op3_cuda.num_proc = (5, 10) op3_cuda._name = 'op3_cuda' op1_cpu = self.mock_op(use_cuda=False) @@ -323,9 +325,8 @@ def test_multi_ops_with_cpu_gpu(self): op2_cpu.cpu_required = 5 op2_cpu._name = 'op2_cpu' - op3_cpu = self.mock_op(use_cuda=False) + op3_cpu = self.mock_op(use_cuda=False, num_proc=10) # concurrency max=10, min=1 op3_cpu.cpu_required = 0.2 - op3_cpu.num_proc = 10 # concurrency max=10, min=1 op3_cpu._name = 'op3_cpu' op4_cpu = self.mock_op(use_cuda=False) From c3692a51f4942f0e60a4a78537f05dca2629be9c Mon Sep 17 00:00:00 2001 From: "jiangnana.jnn" Date: Wed, 24 Sep 2025 17:21:20 +0800 Subject: [PATCH 06/12] update --- data_juicer/ops/base_op.py | 6 +++--- data_juicer/utils/process_utils.py | 17 ++++++++--------- 2 files changed, 11 insertions(+), 12 deletions(-) diff --git a/data_juicer/ops/base_op.py b/data_juicer/ops/base_op.py index 883d07df65..f350f21741 100644 --- a/data_juicer/ops/base_op.py +++ b/data_juicer/ops/base_op.py @@ -193,9 +193,9 @@ def __init__(self, *args, **kwargs): # parameters to determine the number of procs for this op self.num_proc = kwargs.get("num_proc", -1) # -1 means automatic calculation of concurrency - self.cpu_required = kwargs.get("cpu_required", 0) - self.gpu_required = kwargs.get("gpu_required", 0) - self.mem_required = kwargs.get("mem_required", 0) + self.cpu_required = kwargs.get("cpu_required", None) + self.gpu_required = kwargs.get("gpu_required", None) + self.mem_required = kwargs.get("mem_required", None) if isinstance(self.mem_required, str): self.mem_required = size_to_bytes(self.mem_required) / 1024**3 diff --git a/data_juicer/utils/process_utils.py b/data_juicer/utils/process_utils.py index e6e0d16944..6aa9bb7d17 100644 --- a/data_juicer/utils/process_utils.py +++ b/data_juicer/utils/process_utils.py @@ -266,9 +266,8 @@ def calculate_ray_np(operators): gpu_mem_req = op.mem_required if not gpu_req and not gpu_mem_req: logger.warning( - f"The required cuda memory and gpu of Op[{op._name}] " - f"has not been specified. " - f"Please specify the `mem_required` field or `gpu_required` field in the " + f"Neither the required cuda memory nor gpu of Op[{op._name}] is specified. " + f"We recommend specifying the `mem_required` field or `gpu_required` field in the " f"config file. You can reference the `config_all.yaml` file." f"Set the `gpu_required` to 1 now." ) @@ -290,12 +289,12 @@ def calculate_ray_np(operators): cpu_req / total_cpu if cpu_req else 0, mem_req / available_mem if mem_req else 0 ) else: - logger.warning( - f"The required memory and cpu of Op[{op._name}] " - f"has not been specified. " - f"We recommend specifying the `mem_required` field or `cpu_required` field in the " - f"config file. You can reference the `config_all.yaml` file." - ) + if op.use_auto_proc(): + logger.warning( + f"Neither the required memory nor cpu of Op[{op._name}] is specified. " + f"We recommend specifying the `cpu_required` field in the " + f"config file. You can reference the `config_all.yaml` file." 
+ ) # Default to single CPU if no requirements specified cpu_required_frac = 1 / total_cpu if op.num_proc: From 55d0658df8fa5bb1304a0976377eca292d91acec Mon Sep 17 00:00:00 2001 From: "jiangnana.jnn" Date: Wed, 24 Sep 2025 20:12:21 +0800 Subject: [PATCH 07/12] fix bug --- data_juicer/utils/process_utils.py | 25 ++++++++++++++++--------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/data_juicer/utils/process_utils.py b/data_juicer/utils/process_utils.py index 6aa9bb7d17..c9383b01b2 100644 --- a/data_juicer/utils/process_utils.py +++ b/data_juicer/utils/process_utils.py @@ -61,15 +61,15 @@ def get_min_cuda_memory(): def calculate_np(name, mem_required, cpu_required, use_cuda=False, gpu_required=0): """Calculate the optimum number of processes for the given OP automatically。""" - if not use_cuda and gpu_required > 0: + if not use_cuda and gpu_required: raise ValueError( f"Op[{name}] attempted to request GPU resources (gpu_required={gpu_required}), " "but appears to lack GPU support. If you have verified this operator support GPU acceleration, " 'please explicitly set its property: `_accelerator = "cuda"`.' ) - eps = 1e-9 # about 1 byte cpu_num = cpu_count() + auto_proc_from_mem = auto_proc_from_gpu = auto_proc_from_cpu = sys.maxsize if use_cuda: cuda_mems_available = [m / 1024 for m in available_gpu_memories()] # GB @@ -84,11 +84,15 @@ def calculate_np(name, mem_required, cpu_required, use_cuda=False, gpu_required= f"Set the auto `num_proc` to number of GPUs {auto_num_proc}." ) else: - auto_proc_from_mem = sum( - [math.floor(mem_available / (mem_required + eps)) for mem_available in cuda_mems_available] - ) - auto_proc_from_gpu = math.floor(gpu_count / (gpu_required + eps)) - auto_proc_from_cpu = math.floor(cpu_num / (cpu_required + eps)) + if mem_required: + auto_proc_from_mem = sum( + [math.floor(mem_available / mem_required) for mem_available in cuda_mems_available] + ) + if gpu_required: + auto_proc_from_gpu = math.floor(gpu_count / gpu_required) + if cpu_required: + auto_proc_from_cpu = math.floor(cpu_num / cpu_required) + auto_num_proc = min(auto_proc_from_mem, auto_proc_from_gpu, auto_proc_from_cpu) if auto_num_proc < 1: auto_num_proc = len(available_memories()) # set to the number of available nodes @@ -101,8 +105,11 @@ def calculate_np(name, mem_required, cpu_required, use_cuda=False, gpu_required= return auto_num_proc else: mems_available = [m / 1024 for m in available_memories()] # GB - auto_proc_from_mem = sum([math.floor(mem_available / (mem_required + eps)) for mem_available in mems_available]) - auto_proc_from_cpu = math.floor(cpu_num / (cpu_required + eps)) + + if mem_required: + auto_proc_from_mem = sum([math.floor(mem_available / mem_required) for mem_available in mems_available]) + if cpu_required: + auto_proc_from_cpu = math.floor(cpu_num / cpu_required) auto_num_proc = min(cpu_num, auto_proc_from_mem, auto_proc_from_cpu) From 1f2e866483c0c7978bdb6bdfb67df243b3ec97b3 Mon Sep 17 00:00:00 2001 From: "jiangnana.jnn" Date: Thu, 25 Sep 2025 17:24:27 +0800 Subject: [PATCH 08/12] fix ut --- tests/config/test_config.py | 42 +++++++++---------- .../test_ray_bts_minhash_deduplicator.py | 1 + .../test_ray_document_deduplicator.py | 1 + .../test_ray_image_deduplicator.py | 1 + .../test_ray_video_deduplicator.py | 1 + 5 files changed, 25 insertions(+), 21 deletions(-) diff --git a/tests/config/test_config.py b/tests/config/test_config.py index 7847b27a39..ab1a7f34df 100644 --- a/tests/config/test_config.py +++ b/tests/config/test_config.py @@ -77,9 +77,9 @@ def 
test_yaml_cfg_file(self): 'video_special_token': '<__dj__video>', 'accelerator': None, 'num_proc': 4, - 'cpu_required': 1, - 'mem_required': 0, - 'gpu_required': 0, + 'cpu_required': None, + 'mem_required': None, + 'gpu_required': None, 'turbo': False, 'index_key': None, 'skip_op_error': True, @@ -112,10 +112,10 @@ def test_yaml_cfg_file(self): 'accelerator': None, 'num_proc': 4, 'stats_export_path': None, - 'cpu_required': 1, - 'mem_required': 0, + 'cpu_required': None, + 'mem_required': None, 'turbo': False, - 'gpu_required': 0, + 'gpu_required': None, 'index_key': None, 'skip_op_error': True, 'work_dir': WORKDIR, @@ -196,9 +196,9 @@ def test_mixture_cfg(self): 'accelerator': None, 'num_proc': 4, 'stats_export_path': None, - 'cpu_required': 1, - 'mem_required': 0, - 'gpu_required': 0, + 'cpu_required': None, + 'mem_required': None, + 'gpu_required': None, 'turbo': False, 'index_key': None, 'skip_op_error': True, @@ -231,10 +231,10 @@ def test_mixture_cfg(self): 'accelerator': None, 'num_proc': 4, 'stats_export_path': None, - 'cpu_required': 1, - 'mem_required': 0, + 'cpu_required': None, + 'mem_required': None, 'turbo': False, - 'gpu_required': 0, + 'gpu_required': None, 'index_key': None, 'skip_op_error': True, 'work_dir': WORKDIR, @@ -266,10 +266,10 @@ def test_mixture_cfg(self): 'accelerator': None, 'num_proc': 4, 'stats_export_path': None, - 'cpu_required': 1, - 'mem_required': 0, + 'cpu_required': None, + 'mem_required': None, 'turbo': False, - 'gpu_required': 0, + 'gpu_required': None, 'index_key': None, 'skip_op_error': True, 'work_dir': WORKDIR, @@ -301,10 +301,10 @@ def test_mixture_cfg(self): 'accelerator': None, 'num_proc': 4, 'stats_export_path': None, - 'cpu_required': 1, - 'mem_required': 0, + 'cpu_required': None, + 'mem_required': None, 'turbo': False, - 'gpu_required': 0, + 'gpu_required': None, 'index_key': None, 'skip_op_error': True, 'work_dir': WORKDIR, @@ -336,10 +336,10 @@ def test_mixture_cfg(self): 'accelerator': None, 'num_proc': 4, 'stats_export_path': None, - 'cpu_required': 1, - 'mem_required': 0, + 'cpu_required': None, + 'mem_required': None, 'turbo': False, - 'gpu_required': 0, + 'gpu_required': None, 'index_key': None, 'skip_op_error': True, 'work_dir': WORKDIR, diff --git a/tests/ops/deduplicator/test_ray_bts_minhash_deduplicator.py b/tests/ops/deduplicator/test_ray_bts_minhash_deduplicator.py index b19bc8db0e..b0391e1b4c 100644 --- a/tests/ops/deduplicator/test_ray_bts_minhash_deduplicator.py +++ b/tests/ops/deduplicator/test_ray_bts_minhash_deduplicator.py @@ -7,6 +7,7 @@ from data_juicer.utils.unittest_utils import DataJuicerTestCaseBase, TEST_TAG +@unittest.skip('avoid oom') class RayBTSMinhashDeduplicatorTest(DataJuicerTestCaseBase): def _run_minhash_dedup(self, dataset: Dataset, target_list, op): diff --git a/tests/ops/deduplicator/test_ray_document_deduplicator.py b/tests/ops/deduplicator/test_ray_document_deduplicator.py index e8bf23183a..9a3cfeb67e 100644 --- a/tests/ops/deduplicator/test_ray_document_deduplicator.py +++ b/tests/ops/deduplicator/test_ray_document_deduplicator.py @@ -7,6 +7,7 @@ from data_juicer.utils.unittest_utils import DataJuicerTestCaseBase, TEST_TAG +@unittest.skip('avoid oom') class RayDocumentDeduplicatorTest(DataJuicerTestCaseBase): def _run_doc_dedup(self, dataset: Dataset, target_list, op): diff --git a/tests/ops/deduplicator/test_ray_image_deduplicator.py b/tests/ops/deduplicator/test_ray_image_deduplicator.py index 0495178458..64d91cb248 100644 --- a/tests/ops/deduplicator/test_ray_image_deduplicator.py +++ 
b/tests/ops/deduplicator/test_ray_image_deduplicator.py @@ -9,6 +9,7 @@ from data_juicer.utils.unittest_utils import DataJuicerTestCaseBase, TEST_TAG +@unittest.skip('avoid oom') class RayImageDeduplicatorTest(DataJuicerTestCaseBase): data_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), '..', diff --git a/tests/ops/deduplicator/test_ray_video_deduplicator.py b/tests/ops/deduplicator/test_ray_video_deduplicator.py index 5bef4431a1..aae447334b 100644 --- a/tests/ops/deduplicator/test_ray_video_deduplicator.py +++ b/tests/ops/deduplicator/test_ray_video_deduplicator.py @@ -9,6 +9,7 @@ from data_juicer.utils.unittest_utils import DataJuicerTestCaseBase, TEST_TAG +@unittest.skip('avoid oom') class RayVideoDeduplicatorTest(DataJuicerTestCaseBase): data_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), '..', From d23382ca6e36fa232bd4b07caccf61847ab5d4f2 Mon Sep 17 00:00:00 2001 From: "jiangnana.jnn" Date: Sun, 28 Sep 2025 10:26:44 +0800 Subject: [PATCH 09/12] update _OPS_MEMORY_LIMIT_FRACTION to 1.0 --- data_juicer/utils/process_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/data_juicer/utils/process_utils.py b/data_juicer/utils/process_utils.py index c9383b01b2..b5a9097464 100644 --- a/data_juicer/utils/process_utils.py +++ b/data_juicer/utils/process_utils.py @@ -16,7 +16,7 @@ # A safety fraction to avoid OOM by not allocating all available memory to operators. # This leaves some memory for Ray's overhead and other system processes. -_OPS_MEMORY_LIMIT_FRACTION = 0.7 +_OPS_MEMORY_LIMIT_FRACTION = 1.0 def setup_mp(method=None): From 2f95b7b9a15ab925a67a83f505a9e1cb0e446bd5 Mon Sep 17 00:00:00 2001 From: "jiangnana.jnn" Date: Tue, 30 Sep 2025 11:56:19 +0800 Subject: [PATCH 10/12] set concurrency to none for cpu operator, using the default autoscaler of ray to ensure performance --- data_juicer/utils/process_utils.py | 17 ++++---- tests/utils/test_process_utils.py | 63 ++++++++++++++++-------------- 2 files changed, 42 insertions(+), 38 deletions(-) diff --git a/data_juicer/utils/process_utils.py b/data_juicer/utils/process_utils.py index b5a9097464..084f21c61f 100644 --- a/data_juicer/utils/process_utils.py +++ b/data_juicer/utils/process_utils.py @@ -280,7 +280,8 @@ def calculate_ray_np(operators): ) gpu_req = 1 - cpu_required_frac = cpu_req / total_cpu if cpu_req else 0 + # if no cpu is specified, ray will apply for 1 cpu by default + cpu_required_frac = cpu_req / total_cpu if cpu_req else 1 / total_cpu gpu_required_frac = max( gpu_req / total_gpu if gpu_req else 0, gpu_mem_req / available_gpu_mem if gpu_mem_req else 0, @@ -302,7 +303,7 @@ def calculate_ray_np(operators): f"We recommend specifying the `cpu_required` field in the " f"config file. You can reference the `config_all.yaml` file." ) - # Default to single CPU if no requirements specified + # if no cpu is specified, ray will apply for 1 cpu by default cpu_required_frac = 1 / total_cpu if op.num_proc: if not isinstance(op.num_proc, int): @@ -310,6 +311,9 @@ def calculate_ray_np(operators): f"Op[{op._name}] is running with cpu resource, ``num_proc`` is expected to be set as an integer. " f"Use ``concurrency=n`` to control maximum number of workers to use, but got: {op.num_proc}." 
) + # set concurrency to none, using the default autoscaler of ray to ensure performance + if op.num_proc == -1: + op.num_proc = None resource_configs[op._name] = { "cpu_required": cpu_req, @@ -333,19 +337,16 @@ def calculate_ray_np(operators): auto_resource_frac_map[op_name] = (cfg["cpu_required_frac"], cfg["gpu_required_frac"]) else: num_proc = cfg["num_proc"] - - # support the task concurrency to be specified as None value - if not cfg["is_actor"] and not num_proc: - continue - if cfg["is_actor"]: min_proc = num_proc[0] if isinstance(num_proc, (tuple, list)) else num_proc else: min_proc = 1 # when ``fn`` is a function, , only the maximum concurrency can be specified max_proc = num_proc[1] if isinstance(num_proc, (tuple, list)) else num_proc fixed_min_cpu += cfg["cpu_required_frac"] * min_proc - fixed_max_cpu += cfg["cpu_required_frac"] * max_proc fixed_min_gpu += cfg["gpu_required_frac"] * min_proc + if not max_proc: # when num_proc is none, at least one process will be started + max_proc = min_proc # 1 + fixed_max_cpu += cfg["cpu_required_frac"] * max_proc fixed_max_gpu += cfg["gpu_required_frac"] * max_proc # Validate resource availability diff --git a/tests/utils/test_process_utils.py b/tests/utils/test_process_utils.py index 65190cac18..2197837b6a 100644 --- a/tests/utils/test_process_utils.py +++ b/tests/utils/test_process_utils.py @@ -156,16 +156,20 @@ def _use_auto_proc(num_proc, use_cuda): return num_proc == -1 else: return not num_proc or num_proc == -1 + + def create_mock_op(use_cuda, num_proc=-1): + op = MagicMock( + cpu_required=None, + mem_required=None, + gpu_required=None, + num_proc=num_proc, + _name="test_op", + use_cuda=lambda: use_cuda, + ) + op.use_auto_proc = lambda: _use_auto_proc(op.num_proc, use_cuda) + return op - self.mock_op = lambda use_cuda, num_proc=-1: MagicMock( - cpu_required=0, - mem_required=0, - gpu_required=None, - num_proc=num_proc, - _name="test_op", - use_cuda=lambda: use_cuda, - use_auto_proc=lambda: _use_auto_proc(num_proc, use_cuda) - ) + self.mock_op = create_mock_op # Common patchers self.ray_cpu_patcher = patch( @@ -205,7 +209,7 @@ def test_cpu_op_auto_scaling(self): op.cpu_required = 1 calculate_ray_np([op]) - self.assertEqual(op.num_proc, 64) # 64 CPUs / 1 per op + self.assertEqual(op.num_proc, None) self.assertEqual(op.cpu_required, 1) self.assertEqual(op.gpu_required, None) @@ -217,7 +221,7 @@ def test_gpu_op_auto_scaling(self): calculate_ray_np([op]) self.assertEqual(op.num_proc, 8) # Only 1 op and 8 GPU available self.assertEqual(op.gpu_required, 1) - self.assertEqual(op.cpu_required, 0) + self.assertEqual(op.cpu_required, None) def test_user_specified_num_proc(self): """Test user-specified num_proc takes priority""" @@ -265,7 +269,7 @@ def test_mixed_ops_resource_allocation(self): self.assertEqual(fixed_op.cpu_required, 1) self.assertEqual(fixed_op.num_proc, 4) - self.assertEqual(auto_op.num_proc, 63) + self.assertEqual(auto_op.num_proc, None) self.assertEqual(auto_op.cpu_required, 1) def test_insufficient_resources(self): @@ -344,48 +348,47 @@ def test_multi_ops_with_cpu_gpu(self): # fixed gpu: # op3_cuda: (1, 2) # (5*0.2, 10*0.2) - # remaining max cpu: 100-0.2=99.8 # remaining gpu: (3, 4) - # auto gpu: 0.29: 0.5 remaining min gpu = 3 - # find_optimal_concurrency([0.29, 0.5], 3) = [3, 4] + # auto gpu: 0.2: 0.5 remaining min gpu = 3 + # find_optimal_concurrency([0.2, 0.5], 3) = [2, 5] - self.assertEqual(op1_cuda.num_proc, (3, 14)) # min=3, max=4/(2/7) + self.assertEqual(op1_cuda.num_proc, (2, 20)) # min=2, max=4/(2/10) 
self.assertEqual(op1_cuda.cpu_required, 1) - self.assertEqual(op1_cuda.gpu_required, 0.29) # 2GB / 10GB * 0.7 + self.assertEqual(op1_cuda.gpu_required, 0.2) # 2GB / 10GB * 1.0 self.assertEqual(op1_cuda.mem_required, 2) - self.assertEqual(op2_cuda.num_proc, (4, 8)) # min=4, max=4/0.5 - self.assertEqual(op2_cuda.cpu_required, 0) + self.assertEqual(op2_cuda.num_proc, (5, 8)) # min=4, max=4/0.5 + self.assertEqual(op2_cuda.cpu_required, None) self.assertEqual(op2_cuda.gpu_required, 0.5) - self.assertEqual(op2_cuda.mem_required, 0) + self.assertEqual(op2_cuda.mem_required, None) # fixed gpu self.assertEqual(op3_cuda.num_proc, (5, 10)) - self.assertEqual(op3_cuda.cpu_required, 0) + self.assertEqual(op3_cuda.cpu_required, None) self.assertEqual(op3_cuda.gpu_required, 0.2) - self.assertEqual(op3_cuda.mem_required, 0) + self.assertEqual(op3_cuda.mem_required, None) - self.assertEqual(op1_cpu.num_proc, 11) # 99.8 / (8/(128 * 0.7) * 100 ) - self.assertEqual(op1_cpu.cpu_required, 0) + self.assertEqual(op1_cpu.num_proc, None) + self.assertEqual(op1_cpu.cpu_required, None) self.assertEqual(op1_cpu.gpu_required, None) self.assertEqual(op1_cpu.mem_required, 8) - self.assertEqual(op2_cpu.num_proc, 19) # 99.8 / 5 + self.assertEqual(op2_cpu.num_proc, None) self.assertEqual(op2_cpu.cpu_required, 5) self.assertEqual(op2_cpu.gpu_required, None) - self.assertEqual(op2_cpu.mem_required, 0) + self.assertEqual(op2_cpu.mem_required, None) # fixed cpu self.assertEqual(op3_cpu.num_proc, 10) self.assertEqual(op3_cpu.cpu_required, 0.2) self.assertEqual(op3_cpu.gpu_required, None) - self.assertEqual(op3_cpu.mem_required, 0) + self.assertEqual(op3_cpu.mem_required, None) - self.assertEqual(op4_cpu.num_proc, 99) - self.assertEqual(op4_cpu.cpu_required, 0) + self.assertEqual(op4_cpu.num_proc, None) + self.assertEqual(op4_cpu.cpu_required, None) self.assertEqual(op4_cpu.gpu_required, None) - self.assertEqual(op4_cpu.mem_required, 0) + self.assertEqual(op4_cpu.mem_required, None) if __name__ == '__main__': From 6195ebe4aa1d2971909e8dc0b48fbf6c3bd68ce6 Mon Sep 17 00:00:00 2001 From: Yilun Huang Date: Thu, 9 Oct 2025 14:53:50 +0800 Subject: [PATCH 11/12] Fix OOM issue in unittest/dist (#793) * * start & stop ray cluster in setUp and tearDown method of test cases for ray mode * * start & stop ray cluster in setUp and tearDown method of test cases for ray mode * * try another garbage collection method * * clean up extra code --- data_juicer/utils/unittest_utils.py | 45 ++++++++++--------- tests/analysis/test_column_wise_analysis.py | 2 + tests/analysis/test_correlation_analysis.py | 2 + tests/analysis/test_diversity_analysis.py | 2 + tests/analysis/test_measure.py | 2 + tests/analysis/test_overall_analysis.py | 2 + tests/core/data/test_load_strategy.py | 2 + tests/core/data/test_ray_dataset.py | 1 + tests/core/test_monitor.py | 1 + tests/core/test_ray_exporter.py | 2 + tests/core/test_tracer.py | 1 + tests/download/test_download.py | 1 + tests/format/test_load_formatter.py | 2 + tests/ops/mapper/test_download_file_mapper.py | 2 + tests/utils/test_asset_utils.py | 1 + tests/utils/test_ckpt_utils.py | 1 + tests/utils/test_compress.py | 2 + tests/utils/test_file_utils.py | 1 + tests/utils/test_logger_utils.py | 1 + tests/utils/test_mm_utils.py | 1 + tests/utils/test_process_utils.py | 2 +- 21 files changed, 54 insertions(+), 22 deletions(-) diff --git a/data_juicer/utils/unittest_utils.py b/data_juicer/utils/unittest_utils.py index 6226aa7339..f4cb6a608f 100644 --- a/data_juicer/utils/unittest_utils.py +++ 
b/data_juicer/utils/unittest_utils.py @@ -83,22 +83,6 @@ def setUpClass(cls): # clear models in memory free_models() - # start ray - current_tag = getattr(cls, "current_tag", "standalone") - if current_tag.startswith("ray"): - ray = LazyLoader("ray") - if not ray.is_initialized(): - logger.info(f">>>>>>>>>>>>>>>>>>>> [Init Ray]: dj_dist_unittest_{cls.__name__}") - ray.init( - "auto", - ignore_reinit_error=True, - namespace=f"dj_dist_unittest_{cls.__name__}", - ) - - # erase existing resources - cls._cleanup_ray_data_state() - gc.collect() - @classmethod def tearDownClass(cls, hf_model_name=None) -> None: import multiprocess @@ -120,11 +104,6 @@ def tearDownClass(cls, hf_model_name=None) -> None: logger.info("CLEAN all TRANSFORMERS_CACHE") shutil.rmtree(transformers.TRANSFORMERS_CACHE) - current_tag = getattr(cls, "current_tag", "standalone") - if current_tag.startswith("ray"): - cls._cleanup_ray_data_state() - gc.collect() - @classmethod def _cleanup_ray_data_state(cls): """clean up the global states of Ray Data""" @@ -136,6 +115,9 @@ def _cleanup_ray_data_state(cls): if hasattr(ray.data._internal.execution.streaming_executor, "_execution_context"): ray.data._internal.execution.streaming_executor._execution_context = None + # trigger gc.collect() on all workers in the cluster + ray._private.internal_api.global_gc() + # clean up stats manager from ray.data._internal.stats import StatsManager @@ -148,10 +130,31 @@ def _cleanup_ray_data_state(cls): def setUp(self): logger.info(f">>>>>>>>>> [Start Test]: {self.id()}") + # start ray + current_tag = getattr(self, "current_tag", "standalone") + if current_tag.startswith("ray"): + ray = LazyLoader("ray") + if not ray.is_initialized(): + logger.info(f">>>>>>>>>>>>>>>>>>>> [Init Ray]: dj_dist_unittest_{self.id()}") + ray.init( + "auto", + ignore_reinit_error=True, + namespace=f"dj_dist_unittest_{self.id()}", + ) + + # erase existing resources + self._cleanup_ray_data_state() + gc.collect() + def tearDown(self) -> None: # clear models in memory free_models() + current_tag = getattr(self, "current_tag", "standalone") + if current_tag.startswith("ray"): + self._cleanup_ray_data_state() + gc.collect() + def generate_dataset(self, data) -> DJDataset: """Generate dataset for a specific executor. 
diff --git a/tests/analysis/test_column_wise_analysis.py b/tests/analysis/test_column_wise_analysis.py index 701cadd8dc..a2114b1e6f 100644 --- a/tests/analysis/test_column_wise_analysis.py +++ b/tests/analysis/test_column_wise_analysis.py @@ -103,6 +103,8 @@ def tearDown(self): if os.path.exists(self.temp_output_path): os.system(f'rm -rf {self.temp_output_path}') + super().tearDown() + def test_init(self): column_wise_analysis = ColumnWiseAnalysis( self.dataset_3_sample, self.temp_output_path) diff --git a/tests/analysis/test_correlation_analysis.py b/tests/analysis/test_correlation_analysis.py index 837573e68e..a7a7524cff 100644 --- a/tests/analysis/test_correlation_analysis.py +++ b/tests/analysis/test_correlation_analysis.py @@ -32,6 +32,8 @@ def tearDown(self): if os.path.exists(self.temp_output_path): os.system(f'rm -rf {self.temp_output_path}') + super().tearDown() + def test_is_numeric_list_series(self): res = { 'A': False, diff --git a/tests/analysis/test_diversity_analysis.py b/tests/analysis/test_diversity_analysis.py index c7cb810873..e58daf9a5a 100644 --- a/tests/analysis/test_diversity_analysis.py +++ b/tests/analysis/test_diversity_analysis.py @@ -130,6 +130,8 @@ def tearDown(self): if os.path.exists(self.temp_output_path): os.system(f'rm -rf {self.temp_output_path}') + super().tearDown() + def test_analyze(self): diversity_analysis = DiversityAnalysis(self.test_data_en, self.temp_output_path) df_en = diversity_analysis.analyze() diff --git a/tests/analysis/test_measure.py b/tests/analysis/test_measure.py index c175f30439..57e4efa3f2 100644 --- a/tests/analysis/test_measure.py +++ b/tests/analysis/test_measure.py @@ -22,6 +22,8 @@ def tearDown(self): if os.path.exists(self.temp_output_path): os.system(f'rm -rf {self.temp_output_path}') + super().tearDown() + def test_convert_to_tensor(self): measure = Measure() diff --git a/tests/analysis/test_overall_analysis.py b/tests/analysis/test_overall_analysis.py index c9544aff98..c0dbac445f 100644 --- a/tests/analysis/test_overall_analysis.py +++ b/tests/analysis/test_overall_analysis.py @@ -71,6 +71,8 @@ def tearDown(self): if os.path.exists(self.temp_output_path): os.system(f'rm -rf {self.temp_output_path}') + super().tearDown() + def test_single_column_analysis(self): df = self.dataset.flatten().to_pandas() res = _single_column_analysis(df[f'{Fields.stats}.stats_num']) diff --git a/tests/core/data/test_load_strategy.py b/tests/core/data/test_load_strategy.py index 667824e909..5b22ab1687 100644 --- a/tests/core/data/test_load_strategy.py +++ b/tests/core/data/test_load_strategy.py @@ -277,6 +277,8 @@ def tearDown(self): if osp.exists(self.tmp_dir): shutil.rmtree(self.tmp_dir) + super().tearDown() + @TEST_TAG('ray') def test_absolute_path_resolution(self): diff --git a/tests/core/data/test_ray_dataset.py b/tests/core/data/test_ray_dataset.py index ba6d7653c5..00151c2ca5 100644 --- a/tests/core/data/test_ray_dataset.py +++ b/tests/core/data/test_ray_dataset.py @@ -36,6 +36,7 @@ def setUp(self): def tearDown(self): """Clean up test data""" self.dataset = None + super().tearDown() @TEST_TAG('ray') def test_get_column_basic(self): diff --git a/tests/core/test_monitor.py b/tests/core/test_monitor.py index 38b4e77c31..3291b8f26f 100644 --- a/tests/core/test_monitor.py +++ b/tests/core/test_monitor.py @@ -15,6 +15,7 @@ def setUp(self) -> None: def tearDown(self): if os.path.exists(self.work_dir): os.system(f'rm -rf {self.work_dir}') + super().tearDown() def test_monitor_current_resources(self): resource_dict = 
Monitor.monitor_current_resources() diff --git a/tests/core/test_ray_exporter.py b/tests/core/test_ray_exporter.py index 2c591240c8..fdfb6189b7 100644 --- a/tests/core/test_ray_exporter.py +++ b/tests/core/test_ray_exporter.py @@ -36,6 +36,8 @@ def tearDown(self): if osp.exists(self.tmp_dir): shutil.rmtree(self.tmp_dir) + super().tearDown() + def _pop_raw_data_keys(self, keys): res = copy.deepcopy(self.data) for d_i in res: diff --git a/tests/core/test_tracer.py b/tests/core/test_tracer.py index 5c365d02f8..7bd9a51e57 100644 --- a/tests/core/test_tracer.py +++ b/tests/core/test_tracer.py @@ -15,6 +15,7 @@ def setUp(self) -> None: def tearDown(self): if os.path.exists(self.work_dir): os.system(f'rm -rf {self.work_dir}') + super().tearDown() def test_trace_mapper(self): prev_ds = Dataset.from_list([ diff --git a/tests/download/test_download.py b/tests/download/test_download.py index 33cc0bf37f..d3e9a15e90 100644 --- a/tests/download/test_download.py +++ b/tests/download/test_download.py @@ -21,6 +21,7 @@ def tearDown(self): # Clean up the temporary directory after each test if os.path.exists(self.temp_dir): shutil.rmtree(self.temp_dir) + super().tearDown() def test_wikipedia_urls(self): dump_date = "20241101" diff --git a/tests/format/test_load_formatter.py b/tests/format/test_load_formatter.py index 204c72c781..995a036ba8 100644 --- a/tests/format/test_load_formatter.py +++ b/tests/format/test_load_formatter.py @@ -46,6 +46,8 @@ def tearDown(self): if os.path.exists(self._temp_dir): shutil.rmtree(self._temp_dir) + super().tearDown() + def test_load_formatter_with_json_file(self): """Test loading a JSONL file directly""" formatter = load_formatter(self._json_file) diff --git a/tests/ops/mapper/test_download_file_mapper.py b/tests/ops/mapper/test_download_file_mapper.py index 554b1419e6..abdd915cd4 100644 --- a/tests/ops/mapper/test_download_file_mapper.py +++ b/tests/ops/mapper/test_download_file_mapper.py @@ -46,6 +46,8 @@ def tearDown(self): self.httpd.server_close() shutil.rmtree(self.temp_dir) + super().tearDown() + def _test_image_download(self, ds_list, save_field=None): op = DownloadFileMapper( save_dir=self.temp_dir, diff --git a/tests/utils/test_asset_utils.py b/tests/utils/test_asset_utils.py index ad22bc3b75..55fed5bf44 100644 --- a/tests/utils/test_asset_utils.py +++ b/tests/utils/test_asset_utils.py @@ -15,6 +15,7 @@ def setUp(self) -> None: def tearDown(self): if os.path.exists(self.temp_output_path): os.system(f'rm -rf {self.temp_output_path}') + super().tearDown() def test_basic_func(self): # download assets from the remote server diff --git a/tests/utils/test_ckpt_utils.py b/tests/utils/test_ckpt_utils.py index 1415304749..56a5a0191b 100644 --- a/tests/utils/test_ckpt_utils.py +++ b/tests/utils/test_ckpt_utils.py @@ -15,6 +15,7 @@ def setUp(self) -> None: def tearDown(self): if os.path.exists(self.temp_output_path): os.system(f'rm -rf {self.temp_output_path}') + super().tearDown() def test_basic_func(self): ckpt_path = os.path.join(self.temp_output_path, 'ckpt_1') diff --git a/tests/utils/test_compress.py b/tests/utils/test_compress.py index cba98d273e..f6d1b7bbc5 100644 --- a/tests/utils/test_compress.py +++ b/tests/utils/test_compress.py @@ -26,6 +26,8 @@ def tearDown(self): os.system(f'rm -rf {self.temp_output_path}') config.HF_DATASETS_CACHE = self.ori_cache_dir + super().tearDown() + def test_basic_func(self): cache_utils.CACHE_COMPRESS = 'zstd' ds = load_dataset('json', data_files=self.test_data_path, split='train') diff --git a/tests/utils/test_file_utils.py 
b/tests/utils/test_file_utils.py index 07ba7598e4..721870ba2c 100644 --- a/tests/utils/test_file_utils.py +++ b/tests/utils/test_file_utils.py @@ -21,6 +21,7 @@ def setUp(self) -> None: def tearDown(self): if os.path.exists(self.temp_output_path): os.system(f'rm -rf {self.temp_output_path}') + super().tearDown() def test_find_files_with_suffix(self): # prepare test files diff --git a/tests/utils/test_logger_utils.py b/tests/utils/test_logger_utils.py index 7ce5a7c1dd..7c76c4cb2a 100644 --- a/tests/utils/test_logger_utils.py +++ b/tests/utils/test_logger_utils.py @@ -20,6 +20,7 @@ def setUp(self) -> None: def tearDown(self): if os.path.exists(self.temp_output_path): os.system(f'rm -rf {self.temp_output_path}') + super().tearDown() def get_log_messages(self, content): lines = content.strip().split('\n') diff --git a/tests/utils/test_mm_utils.py b/tests/utils/test_mm_utils.py index 7d86c1bbeb..24ad85017c 100644 --- a/tests/utils/test_mm_utils.py +++ b/tests/utils/test_mm_utils.py @@ -41,6 +41,7 @@ def setUp(self) -> None: def tearDown(self): if os.path.exists(self.temp_output_path): os.system(f'rm -rf {self.temp_output_path}') + super().tearDown() def test_special_tokens(self): self.assertEqual( diff --git a/tests/utils/test_process_utils.py b/tests/utils/test_process_utils.py index 2197837b6a..c3757dc6fb 100644 --- a/tests/utils/test_process_utils.py +++ b/tests/utils/test_process_utils.py @@ -41,7 +41,7 @@ def setUp(self): def tearDown(self): os.environ[RAY_JOB_ENV_VAR] = self._ori_ray_job_env_value - return super().tearDown() + super().tearDown() def enable_ray_mode(self): os.environ[RAY_JOB_ENV_VAR] = '1' From 04e9efa4eb0f141b8b9a2150ca14edb5533a9257 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=83=88=E9=9C=96?= Date: Thu, 9 Oct 2025 14:58:14 +0800 Subject: [PATCH 12/12] * remove skipping tags of some dist test cases for avoiding oom --- tests/ops/deduplicator/test_ray_bts_minhash_deduplicator.py | 1 - tests/ops/deduplicator/test_ray_document_deduplicator.py | 1 - tests/ops/deduplicator/test_ray_image_deduplicator.py | 1 - tests/ops/deduplicator/test_ray_video_deduplicator.py | 1 - 4 files changed, 4 deletions(-) diff --git a/tests/ops/deduplicator/test_ray_bts_minhash_deduplicator.py b/tests/ops/deduplicator/test_ray_bts_minhash_deduplicator.py index b0391e1b4c..b19bc8db0e 100644 --- a/tests/ops/deduplicator/test_ray_bts_minhash_deduplicator.py +++ b/tests/ops/deduplicator/test_ray_bts_minhash_deduplicator.py @@ -7,7 +7,6 @@ from data_juicer.utils.unittest_utils import DataJuicerTestCaseBase, TEST_TAG -@unittest.skip('avoid oom') class RayBTSMinhashDeduplicatorTest(DataJuicerTestCaseBase): def _run_minhash_dedup(self, dataset: Dataset, target_list, op): diff --git a/tests/ops/deduplicator/test_ray_document_deduplicator.py b/tests/ops/deduplicator/test_ray_document_deduplicator.py index 9a3cfeb67e..e8bf23183a 100644 --- a/tests/ops/deduplicator/test_ray_document_deduplicator.py +++ b/tests/ops/deduplicator/test_ray_document_deduplicator.py @@ -7,7 +7,6 @@ from data_juicer.utils.unittest_utils import DataJuicerTestCaseBase, TEST_TAG -@unittest.skip('avoid oom') class RayDocumentDeduplicatorTest(DataJuicerTestCaseBase): def _run_doc_dedup(self, dataset: Dataset, target_list, op): diff --git a/tests/ops/deduplicator/test_ray_image_deduplicator.py b/tests/ops/deduplicator/test_ray_image_deduplicator.py index 64d91cb248..0495178458 100644 --- a/tests/ops/deduplicator/test_ray_image_deduplicator.py +++ b/tests/ops/deduplicator/test_ray_image_deduplicator.py @@ -9,7 +9,6 @@ from 
data_juicer.utils.unittest_utils import DataJuicerTestCaseBase, TEST_TAG -@unittest.skip('avoid oom') class RayImageDeduplicatorTest(DataJuicerTestCaseBase): data_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), '..', diff --git a/tests/ops/deduplicator/test_ray_video_deduplicator.py b/tests/ops/deduplicator/test_ray_video_deduplicator.py index aae447334b..5bef4431a1 100644 --- a/tests/ops/deduplicator/test_ray_video_deduplicator.py +++ b/tests/ops/deduplicator/test_ray_video_deduplicator.py @@ -9,7 +9,6 @@ from data_juicer.utils.unittest_utils import DataJuicerTestCaseBase, TEST_TAG -@unittest.skip('avoid oom') class RayVideoDeduplicatorTest(DataJuicerTestCaseBase): data_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), '..',