
Commit cc1674d

Improve Ray submission logic
1 parent 0e553e0 commit cc1674d

2 files changed: +21 -19

src/fmcore/framework/_chain/Chain.py

+1
@@ -847,6 +847,7 @@ def stop(self, force: bool = True):
 ## Needs to be done after ChainExecution is defined to avoid circular dependencies.
 Chain.run = safe_validate_arguments()(Chain.run)
 
+
 class FunctionStep(Step):
     fn: Any
 
src/fmcore/framework/_evaluator/RayEvaluator.py

+20 -19
@@ -432,16 +432,16 @@ def _kill_actors(self):
     @property
     def max_num_actors(self) -> int:
         cluster_resources: Dict = ray.cluster_resources()
-        RAY_NUM_CPUS: int = int(cluster_resources["CPU"])
-        RAY_NUM_GPUS: int = int(cluster_resources.get("GPU", 0))
+        ray_cluster_num_cpus: int = int(cluster_resources["CPU"])
+        ray_cluster_num_gpus: int = int(cluster_resources.get("GPU", 0))
 
         max_num_cpu_actors: int = max_num_resource_actors(
             self.resources_per_model.num_cpus,
-            RAY_NUM_CPUS,
+            ray_cluster_num_cpus,
         )
         max_num_gpu_actors: Union[int, float] = max_num_resource_actors(
             self.resources_per_model.num_gpus,
-            RAY_NUM_GPUS,
+            ray_cluster_num_gpus,
         )
         max_num_actors: int = min(max_num_gpu_actors, max_num_cpu_actors)
         return max_num_actors
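
Note on the renamed variables above: they feed a simple capacity calculation in which the number of model-copy actors is capped by whichever resource (CPU or GPU) runs out first. A minimal sketch of the idea, assuming max_num_resource_actors divides the cluster total by the per-copy requirement (the helper body below is an illustration, not the fmcore implementation):

import math
from typing import Union

def max_num_resource_actors(
    model_requirement: Union[int, float],
    cluster_total: int,
) -> Union[int, float]:
    ## Hypothetical helper: a model-copy that needs none of this resource
    ## does not bound the actor count at all.
    if model_requirement == 0:
        return math.inf
    ## Fractional requirements (e.g. 0.5 GPUs per copy) divide the total evenly.
    return (
        int(cluster_total // model_requirement)
        if model_requirement >= 1
        else int(cluster_total / model_requirement)
    )

## Example: a cluster with 16 CPUs and 4 GPUs, each model-copy needing 2 CPUs and 0.5 GPUs:
max_cpu_actors = max_num_resource_actors(2, 16)   # 8
max_gpu_actors = max_num_resource_actors(0.5, 4)  # 8
print(min(max_cpu_actors, max_gpu_actors))        # at most 8 model-copies
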
@@ -457,8 +457,8 @@ def model_num_gpus(self) -> Union[conint(ge=0), confloat(ge=0.0, lt=1.0)]:
     @property
     def num_actors(self) -> int:
         cluster_resources: Dict = ray.cluster_resources()
-        RAY_NUM_CPUS: int = int(cluster_resources["CPU"])
-        RAY_NUM_GPUS: int = int(cluster_resources.get("GPU", 0))
+        ray_cluster_num_cpus: int = int(cluster_resources["CPU"])
+        ray_cluster_num_gpus: int = int(cluster_resources.get("GPU", 0))
 
         model_num_cpus: Union[conint(ge=1), confloat(ge=0.0, lt=1.0)] = self.model_num_cpus
         model_num_gpus: Union[conint(ge=0), confloat(ge=0.0, lt=1.0)] = self.model_num_gpus
@@ -468,14 +468,14 @@ def num_actors(self) -> int:
             warnings.warn(
                 f"`num_models` is not specified. Since each model-copy requires "
                 f"{model_num_cpus} cpus and {model_num_gpus} gpus, we create {max_num_actors} model-copies so as "
-                f"to utilize the entire Ray cluster (having {RAY_NUM_CPUS} cpus and {RAY_NUM_GPUS} gpus). "
+                f"to utilize the entire Ray cluster (having {ray_cluster_num_cpus} cpus and {ray_cluster_num_gpus} gpus). "
                 f"To reduce the cluster-utilization, explicitly pass `num_models`."
             )
             num_actors: int = max_num_actors
         elif num_actors > max_num_actors:
             warnings.warn(
                 f"Requested {num_actors} model-copies (each with {model_num_cpus} cpus and {model_num_gpus} gpus); "
-                f"however, the Ray cluster only has {RAY_NUM_CPUS} cpus and {RAY_NUM_GPUS} gpus, "
+                f"however, the Ray cluster only has {ray_cluster_num_cpus} cpus and {ray_cluster_num_gpus} gpus, "
                 f"thus we can create at most {max_num_actors} model-copies."
             )
             num_actors: int = min(num_actors, max_num_actors)
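
The branches above (unchanged except for the variable names) default to full cluster utilization when num_models is unspecified, and clamp an oversized request with a warning instead of failing. A rough standalone sketch of that behaviour, using plain ints in place of the evaluator's properties:

import warnings
from typing import Optional

def resolve_num_actors(requested: Optional[int], max_num_actors: int) -> int:
    ## Mirrors the diff's branches: default to the whole cluster,
    ## or clamp an over-sized request with a warning.
    if requested is None:
        warnings.warn(
            f"`num_models` is not specified; creating {max_num_actors} model-copies "
            f"to utilize the entire Ray cluster."
        )
        return max_num_actors
    if requested > max_num_actors:
        warnings.warn(
            f"Requested {requested} model-copies, but the cluster can hold at most "
            f"{max_num_actors}; clamping."
        )
    return min(requested, max_num_actors)

print(resolve_num_actors(None, 8))  # 8, with a warning
print(resolve_num_actors(12, 8))    # 8, with a warning
print(resolve_num_actors(4, 8))     # 4
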
@@ -512,7 +512,8 @@ def _create_nested_evaluator_params(self, **kwargs) -> Dict:
                 ## we don't need to initialize it:
                 init=False,
                 init_model=False,
-                verbosity=0,  ## Ensures we do not print anything from the nested evaluator.
+                ## Ensures we do not print anything from the nested evaluator:
+                verbosity=0,
             ),
         }
     ).dict()
@@ -548,7 +549,7 @@ def _run_evaluation(
         load_balancing_strategy: LoadBalancingStrategy = LoadBalancingStrategy.LEAST_USED,
         read_as: Optional[DataLayout] = DataLayout.PANDAS,
         submission_batch_size: Optional[conint(ge=1)] = None,
-        submission_max_queued: conint(ge=0) = 2,
+        worker_queue_len: conint(ge=0) = 2,
         submission_batch_wait: confloat(ge=0) = 15,
         evaluation_timeout: confloat(ge=0, allow_inf_nan=True) = math.inf,
         allow_partial_predictions: bool = False,
@@ -644,7 +645,7 @@ def _run_evaluation(
             num_cpus=0.1,
             max_concurrency=max(
                 num_actors_created + 2,
-                submission_max_queued * num_actors_created + 2,
+                worker_queue_len * num_actors_created + 2,
             ),
         ).remote()
         dataset_params: Dict = dataset.dict(exclude={"data"})
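
For context, row_counter is a lightweight Ray actor that workers report completed rows into, so its max_concurrency is sized for the worst case of worker_queue_len queued batches per worker, plus headroom for the driver's polling calls. A toy version of the pattern (the RowCounter class and its methods are assumptions inferred from the calls visible in this diff):

import ray

@ray.remote
class RowCounter:
    ## Hypothetical counter actor: workers add completed row counts,
    ## the driver polls the running total to drive its progress bar.
    def __init__(self) -> None:
        self._rows_completed: int = 0

    def increment_rows(self, num_rows: int) -> None:
        self._rows_completed += num_rows

    def get_rows_completed(self) -> int:
        return self._rows_completed

ray.init(ignore_reinit_error=True)
num_actors_created, worker_queue_len = 4, 2
row_counter = RowCounter.options(
    num_cpus=0.1,
    ## One slot per queued batch per worker, plus a couple for polling:
    max_concurrency=max(num_actors_created + 2, worker_queue_len * num_actors_created + 2),
).remote()
row_counter.increment_rows.remote(128)
print(ray.get(row_counter.get_rows_completed.remote()))  # 128

Sizing max_concurrency off worker_queue_len keeps the counter from becoming a bottleneck when every worker has a full queue of batches reporting in.
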
@@ -681,7 +682,7 @@ def _run_evaluation(
                 )
                 submissions_progress_bar.update(1)
             ## Initialize with number of rows completed so far:
-            rows_completed: int = ray.get(row_counter.get_rows_completed.remote())
+            rows_completed: int = get_result(row_counter.get_rows_completed.remote())
             rows_completed_progress_bar: ProgressBar = ProgressBar.of(
                 progress_bar,
                 total=input_len,
@@ -700,7 +701,7 @@ def _run_evaluation(
                 unit="batch",
             )
             ## Initialize to zero:
-            rows_completed: int = ray.get(row_counter.get_rows_completed.remote())
+            rows_completed: int = 0
             rows_completed_progress_bar: ProgressBar = ProgressBar.of(
                 progress_bar,
                 total=input_len,
@@ -716,8 +717,8 @@ def _run_evaluation(
                     fetch_partitions=1,
                 )
             ):
-                ## When using DataLoadingStrategy.LOCAL, we can pick which actor to send the data to based on
-                ## the LoadBalancingStrategy.
+                ## When using DataLoadingStrategy.LOCAL, we pick which
+                ## actor to send the data to based on LoadBalancingStrategy.
                 if load_balancing_strategy is LoadBalancingStrategy.ROUND_ROBIN:
                     actor_comp: RayActorComposite = self.model[part_i % num_actors_created]
                 elif load_balancing_strategy is LoadBalancingStrategy.RANDOM:
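
For reference, the two branches shown above just index into the actor pool, while LEAST_USED (further down) consults per-actor usage. A minimal sketch of the three selection rules, assuming usage is reported as (queued_batches, last_completed_timestamp, actor_id) tuples as the type hints in this diff suggest:

import random
from typing import List, Tuple

def pick_actor_index(
    strategy: str,
    part_i: int,
    actor_usages: List[Tuple[int, float, str]],  # (queued_batches, last_completed_ts, actor_id)
) -> int:
    num_actors = len(actor_usages)
    if strategy == "round_robin":
        ## Deterministic: batch i goes to actor i modulo the pool size.
        return part_i % num_actors
    if strategy == "random":
        return random.randrange(num_actors)
    ## least_used: smallest queue wins; ties broken by most recent completion.
    return min(range(num_actors), key=lambda i: (actor_usages[i][0], -actor_usages[i][1]))

usages = [(3, 10.0, "a"), (1, 50.0, "b"), (1, 20.0, "c")]
print(pick_actor_index("least_used", part_i=7, actor_usages=usages))  # 1 ("b")
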
@@ -728,10 +729,10 @@ def _run_evaluation(
                     ## After that, we will pick the actor with the least load which has most-recently completed
                     actor_usages: List[Tuple[int, float, str]] = self._get_actor_usages()
                     min_actor_usage: int = min([actor_usage for actor_usage, _, _ in actor_usages])
-                    while min_actor_usage > submission_max_queued:
+                    while min_actor_usage > worker_queue_len:
                         debug_logger(
                             f"Actor usages:\n{actor_usages}\n"
-                            f"(All are above submission_least_used_threshold={submission_max_queued}, "
+                            f"(All are above submission_least_used_threshold={worker_queue_len}, "
                             f"waiting for {submission_batch_wait} seconds)."
                         )
                         time.sleep(submission_batch_wait)
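
The renamed worker_queue_len drives a simple back-pressure loop: when even the least-loaded worker already has more than worker_queue_len batches queued, the driver sleeps for submission_batch_wait seconds and re-checks rather than piling up pending submissions. A standalone sketch of that throttle, with a hypothetical get_usages callable standing in for self._get_actor_usages():

import time
from typing import Callable, List, Tuple

def wait_for_free_worker(
    get_usages: Callable[[], List[Tuple[int, float, str]]],
    worker_queue_len: int = 2,
    submission_batch_wait: float = 15.0,
) -> List[Tuple[int, float, str]]:
    ## Block until at least one worker's queue is at or below worker_queue_len.
    actor_usages = get_usages()
    min_actor_usage = min(usage for usage, _, _ in actor_usages)
    while min_actor_usage > worker_queue_len:
        print(f"All workers above worker_queue_len={worker_queue_len}; "
              f"waiting {submission_batch_wait}s.")
        time.sleep(submission_batch_wait)
        actor_usages = get_usages()
        min_actor_usage = min(usage for usage, _, _ in actor_usages)
    return actor_usages

## Example with a static snapshot (returns immediately since one queue is short):
print(wait_for_free_worker(lambda: [(3, 1.0, "a"), (1, 2.0, "b")], worker_queue_len=2))
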
@@ -793,7 +794,7 @@ def _run_evaluation(
                 )
                 submissions_progress_bar.update(1)
                 ## Track progress while submitting, since submitting can take upto an hour:
-                new_rows_completed: int = ray.get(row_counter.get_rows_completed.remote())
+                new_rows_completed: int = get_result(row_counter.get_rows_completed.remote())
                 rows_completed_progress_bar.update(new_rows_completed - rows_completed)
                 rows_completed: int = new_rows_completed
         else:
@@ -806,7 +807,7 @@ def _run_evaluation(
             and time.time() < rows_completed_start_time + evaluation_timeout
         ):
             time.sleep(self.progress_update_frequency)
-            new_rows_completed: int = ray.get(row_counter.get_rows_completed.remote())
+            new_rows_completed: int = get_result(row_counter.get_rows_completed.remote())
             rows_completed_progress_bar.update(new_rows_completed - rows_completed)
             rows_completed: int = new_rows_completed
         rows_completed_progress_bar.success(f"Evaluated {input_len_str} rows")
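
The ray.get(...) to get_result(...) swap routes all result fetching through one helper. Its definition is not part of this diff; a plausible minimal wrapper (an assumption, not fmcore's actual code) would resolve Ray ObjectRefs and pass plain values through unchanged:

import ray
from typing import Any

def get_result(obj: Any) -> Any:
    ## Hypothetical helper: resolve Ray futures, return plain values as-is.
    ## Calling code stays agnostic to whether a value is local or remote.
    if isinstance(obj, ray.ObjectRef):
        return ray.get(obj)
    return obj

Centralizing the call like this also gives a single place to add timeouts or retries later without touching the submission loop.
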
