Skip to content

Commit 7f61ca5

Browse files
JasonW404jason
andauthored
feat: add parallel file copy for ratio task execution (#475)
* feat: add parallel file copy for ratio task execution - Add dynamic concurrency calculation based on CPU and memory resources - Refactor handle_ratio_relations to use asyncio.Semaphore for parallel execution - Use hard links for file copying (fallback to symlink/copy2) - Add resource_utils.py for system resource detection - Add 8 new config parameters for concurrency tuning (all-flash defaults) - Add psutil dependency for accurate resource detection Performance improvement: concurrent file copying significantly reduces execution time for ratio tasks with large files, especially on all-flash storage systems. * fix: update psutil version constraint to allow v7.x The poetry.lock file contains psutil 7.2.2 which was outside the previous constraint >=6.0.0,<7.0.0. This mismatch caused CI build failure with 'pyproject.toml changed significantly' error. Update constraint to >=6.0.0,<8.0.0 to match existing lock file. * chore: update poetry.lock for psutil dependency * fix: regenerate poetry.lock with poetry 2.2.1 for CI compatibility --------- Co-authored-by: jason <jason@example.com>
1 parent 864c69d commit 7f61ca5

5 files changed

Lines changed: 339 additions & 58 deletions

File tree

runtime/datamate-python/app/core/config.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,5 +84,29 @@ def build_database_url(self):
8484
# 文件存储配置(共享文件系统)
8585
file_storage_path: str = "/data/files"
8686

87+
# ==================== 配比任务并行复制配置 ====================
88+
# 动态并发计算参数(全闪存储高性能场景默认值)
89+
90+
# 并发下限(最少并发数)
91+
ratio_copy_min_concurrent: int = 8
92+
93+
# 并发上限(最多并发数,防止资源耗尽)
94+
ratio_copy_max_concurrent: int = 128
95+
96+
# CPU核心系数(每个核心贡献的并发数,全闪存储建议4.0)
97+
ratio_copy_cpu_factor: float = 4.0
98+
99+
# 每并发任务预估内存占用(MB)
100+
ratio_copy_memory_per_task_mb: int = 32
101+
102+
# 内存安全保留比例(保留给其他进程)
103+
ratio_copy_memory_reserve_ratio: float = 0.2
104+
105+
# 是否启用动态计算(False则使用固定值)
106+
ratio_copy_dynamic_concurrent: bool = True
107+
108+
# 固定并发数(当 dynamic_concurrent=False 时使用)
109+
ratio_copy_fixed_concurrent: int = 10
110+
87111
# 全局设置实例
88112
settings = Settings()

runtime/datamate-python/app/module/ratio/service/ratio_task.py

Lines changed: 129 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from datetime import datetime
2-
from typing import List, Optional, Dict, Any
2+
from typing import List, Optional, Dict, Any, Tuple
33
import random
44
import json
55
import os
@@ -10,6 +10,7 @@
1010
from sqlalchemy.ext.asyncio import AsyncSession
1111

1212
from app.core.logging import get_logger
13+
from app.core.config import settings
1314
from app.db.models.base_entity import LineageNode, LineageEdge
1415
from app.db.models.ratio_task import RatioInstance, RatioRelation
1516
from app.db.models import Dataset, DatasetFiles
@@ -18,6 +19,7 @@
1819
from app.module.shared.common.lineage import LineageService
1920
from app.module.shared.schema import TaskStatus, NodeType, EdgeType
2021
from app.module.ratio.schema.ratio_task import FilterCondition
22+
from app.module.shared.util.resource_utils import get_concurrent_for_ratio_copy, get_system_resource_info
2123

2224
logger = get_logger(__name__)
2325

@@ -78,66 +80,55 @@ async def create_task(
7880

7981
@staticmethod
8082
async def execute_dataset_ratio_task(instance_id: str) -> None:
81-
"""Execute a ratio task in background.
82-
83-
Supported ratio_method:
84-
- DATASET: randomly select counts files from each source dataset
85-
- TAG: randomly select counts files matching relation.filter_conditions tags
86-
87-
Steps:
88-
- Mark instance RUNNING
89-
- For each relation: fetch ACTIVE files, optionally filter by tags
90-
- Copy selected files into target dataset
91-
- Update dataset statistics and mark instance SUCCESS/FAILED
92-
"""
93-
async with AsyncSessionLocal() as session: # type: AsyncSession
83+
async with AsyncSessionLocal() as session:
9484
try:
95-
# Load instance and relations
9685
inst_res = await session.execute(select(RatioInstance).where(RatioInstance.id == instance_id))
9786
instance: Optional[RatioInstance] = inst_res.scalar_one_or_none()
9887
if not instance:
9988
logger.error(f"Ratio instance not found: {instance_id}")
10089
return
101-
logger.info(f"start execute ratio task: {instance_id}")
90+
91+
logger.info(f"Starting ratio task {instance_id}")
92+
logger.info(f"System resources: {get_system_resource_info()}")
10293

10394
rel_res = await session.execute(
10495
select(RatioRelation).where(RatioRelation.ratio_instance_id == instance_id)
10596
)
10697
relations: List[RatioRelation] = list(rel_res.scalars().all())
10798

108-
# Mark running
10999
instance.status = TaskStatus.RUNNING.name
110100

111-
# Load target dataset
112101
ds_res = await session.execute(select(Dataset).where(Dataset.id == instance.target_dataset_id))
113102
target_ds: Optional[Dataset] = ds_res.scalar_one_or_none()
114103
if not target_ds:
115104
logger.error(f"Target dataset not found for instance {instance_id}")
116105
instance.status = TaskStatus.FAILED.name
117106
return
118107

119-
added_count, added_size = await RatioTaskService.handle_ratio_relations(relations,session, target_ds)
108+
max_concurrent = get_concurrent_for_ratio_copy(settings)
109+
logger.info(f"Using {max_concurrent} concurrent workers for ratio task {instance_id}")
120110

121-
# Update target dataset statistics
122-
target_ds.file_count = (target_ds.file_count or 0) + added_count # type: ignore
123-
target_ds.size_bytes = (target_ds.size_bytes or 0) + added_size # type: ignore
124-
# If target dataset has files, mark it ACTIVE
125-
if (target_ds.file_count or 0) > 0: # type: ignore
111+
added_count, added_size = await RatioTaskService.handle_ratio_relations_parallel(
112+
relations, session, target_ds, max_concurrent
113+
)
114+
115+
target_ds.file_count = (target_ds.file_count or 0) + added_count
116+
target_ds.size_bytes = (target_ds.size_bytes or 0) + added_size
117+
if (target_ds.file_count or 0) > 0:
126118
target_ds.status = "ACTIVE"
127119

128-
# Done
129120
instance.status = TaskStatus.COMPLETED.name
130-
logger.info(f"Dataset ratio execution completed: instance={instance_id}, files={added_count}, size={added_size}, {instance.status}")
121+
logger.info(f"Ratio task completed: {instance_id}, files={added_count}, size={added_size}")
122+
131123
await RatioTaskService._add_task_to_graph(
132124
session=session,
133125
src_relations=relations,
134126
task=instance,
135127
dst_dataset=target_ds,
136128
)
137129
except Exception as e:
138-
logger.exception(f"Dataset ratio execution failed for {instance_id}: {e}")
130+
logger.exception(f"Ratio task failed for {instance_id}: {e}")
139131
try:
140-
# Try mark failed
141132
inst_res = await session.execute(select(RatioInstance).where(RatioInstance.id == instance_id))
142133
instance = inst_res.scalar_one_or_none()
143134
if instance:
@@ -147,6 +138,116 @@ async def execute_dataset_ratio_task(instance_id: str) -> None:
147138
finally:
148139
await session.commit()
149140

141+
@staticmethod
142+
async def handle_ratio_relations_parallel(
143+
relations: list[RatioRelation],
144+
session: AsyncSession,
145+
target_ds: Dataset,
146+
max_concurrent: int = 10
147+
) -> Tuple[int, int]:
148+
existing_path_rows = await session.execute(
149+
select(DatasetFiles.file_path).where(DatasetFiles.dataset_id == target_ds.id)
150+
)
151+
existing_paths = set(p for p in existing_path_rows.scalars().all() if p)
152+
source_paths = set()
153+
154+
all_copy_tasks: List[Tuple[DatasetFiles, str, str]] = []
155+
156+
for rel in relations:
157+
if not rel.source_dataset_id or not rel.counts or rel.counts <= 0:
158+
continue
159+
160+
files = await RatioTaskService.get_files(rel, session)
161+
if not files:
162+
continue
163+
164+
pick_n = min(rel.counts or 0, len(files))
165+
chosen = random.sample(files, pick_n) if pick_n < len(files) else files
166+
167+
for file in chosen:
168+
if file.file_path in source_paths:
169+
continue
170+
171+
dst_prefix = f"/dataset/{target_ds.id}/"
172+
file_name = RatioTaskService.get_new_file_name(dst_prefix, existing_paths, file)
173+
new_path = dst_prefix + file_name
174+
175+
file_record = DatasetFiles(
176+
dataset_id=target_ds.id,
177+
file_name=file_name,
178+
file_path=new_path,
179+
file_type=file.file_type,
180+
file_size=file.file_size,
181+
check_sum=file.check_sum,
182+
tags=file.tags,
183+
tags_updated_at=datetime.now(),
184+
dataset_filemetadata=file.dataset_filemetadata,
185+
status="ACTIVE",
186+
)
187+
188+
all_copy_tasks.append((file_record, file.file_path, new_path))
189+
existing_paths.add(new_path)
190+
source_paths.add(file.file_path)
191+
192+
if not all_copy_tasks:
193+
return 0, 0
194+
195+
dst_dir = f"/dataset/{target_ds.id}/"
196+
await asyncio.to_thread(os.makedirs, dst_dir, exist_ok=True)
197+
198+
semaphore = asyncio.Semaphore(max_concurrent)
199+
successful_records: List[DatasetFiles] = []
200+
added_count = 0
201+
added_size = 0
202+
203+
async def copy_with_semaphore(
204+
file_record: DatasetFiles,
205+
src_path: str,
206+
dst_path: str
207+
) -> Tuple[bool, DatasetFiles]:
208+
async with semaphore:
209+
try:
210+
file_dst_dir = os.path.dirname(dst_path)
211+
if file_dst_dir != dst_dir:
212+
await asyncio.to_thread(os.makedirs, file_dst_dir, exist_ok=True)
213+
214+
try:
215+
await asyncio.to_thread(os.link, src_path, dst_path)
216+
except OSError:
217+
try:
218+
await asyncio.to_thread(os.symlink, src_path, dst_path)
219+
except OSError:
220+
await asyncio.to_thread(shutil.copy2, src_path, dst_path)
221+
222+
return True, file_record
223+
except Exception as e:
224+
logger.error(f"Copy failed: {src_path} -> {dst_path}: {e}")
225+
return False, file_record
226+
227+
tasks = [copy_with_semaphore(rec, src, dst) for rec, src, dst in all_copy_tasks]
228+
results = await asyncio.gather(*tasks, return_exceptions=True)
229+
230+
for i, result in enumerate(results):
231+
if isinstance(result, Exception):
232+
logger.error(f"Copy task {i} raised exception: {result}")
233+
continue
234+
success, file_record = result
235+
if success:
236+
added_count += 1
237+
added_size += int(file_record.file_size or 0)
238+
successful_records.append(file_record)
239+
240+
if successful_records:
241+
session.add_all(successful_records)
242+
await session.flush()
243+
244+
logger.info(
245+
f"Parallel copy completed: {added_count}/{len(all_copy_tasks)} files, "
246+
f"{added_size} bytes, {len(results) - added_count} failures"
247+
)
248+
249+
return added_count, added_size
250+
150251
@staticmethod
151252
async def handle_ratio_relations(relations: list[RatioRelation], session, target_ds: Dataset) -> tuple[int, int]:
152253
# Preload existing target file paths for deduplication

0 commit comments

Comments
 (0)