3737 RunningReplicaInfo ,
3838)
3939from ray .serve ._private .config import DeploymentConfig
40- from ray .serve .config import GangRuntimeFailurePolicy
4140from ray .serve ._private .constants import (
4241 DEFAULT_LATENCY_BUCKET_MS ,
4342 MAX_PER_REPLICA_RETRY_COUNT ,
7271 msgpack_serialize ,
7372)
7473from ray .serve ._private .version import DeploymentVersion
74+ from ray .serve .config import GangRuntimeFailurePolicy
7575from ray .serve .generated .serve_pb2 import DeploymentLanguage
7676from ray .serve .schema import (
7777 DeploymentDetails ,
@@ -2486,7 +2486,11 @@ def get_num_running_replicas(self, version: DeploymentVersion = None) -> int:
24862486 return self ._replicas .count (states = [ReplicaState .RUNNING ], version = version )
24872487
24882488 def get_gang_config (self ):
2489- return self ._target_state .info .deployment_config .gang_scheduling_config if self ._target_state is not None else None
2489+ return (
2490+ self ._target_state .info .deployment_config .gang_scheduling_config
2491+ if self ._target_state is not None
2492+ else None
2493+ )
24902494
24912495 def get_num_replicas_to_add (self ) -> int :
24922496 """Calculate the number of replicas to be added to reach the target state."""
@@ -2502,7 +2506,11 @@ def get_num_replicas_to_add(self) -> int:
25022506 return max (0 , delta )
25032507
25042508 def get_replica_resource_dict (self ) -> Dict [str , float ]:
2505- return self ._target_state .info .replica_config .resource_dict .copy () if self ._target_state is not None else {}
2509+ return (
2510+ self ._target_state .info .replica_config .resource_dict .copy ()
2511+ if self ._target_state is not None
2512+ else {}
2513+ )
25062514
25072515 def get_active_node_ids (self ) -> Set [str ]:
25082516 """Get the node ids of all running replicas in this deployment.
@@ -2957,7 +2965,9 @@ def _check_and_stop_outdated_version_replicas(self) -> bool:
29572965
29582966 def scale_deployment_replicas (
29592967 self ,
2960- gang_placement_groups : Optional [Dict [DeploymentID , GangPreparationResult ]] = None ,
2968+ gang_placement_groups : Optional [
2969+ Dict [DeploymentID , GangPreparationResult ]
2970+ ] = None ,
29612971 ) -> Tuple [List [ReplicaSchedulingRequest ], DeploymentDownscaleRequest ]:
29622972 """Scale the given deployment to the number of replicas.
29632973
@@ -3020,7 +3030,9 @@ def scale_deployment_replicas(
30203030 assign_rank_callback = self ._rank_manager .assign_rank ,
30213031 )
30223032 upscale .append (scheduling_request )
3023- self ._replicas .add (ReplicaState .STARTING , new_deployment_replica )
3033+ self ._replicas .add (
3034+ ReplicaState .STARTING , new_deployment_replica
3035+ )
30243036
30253037 elif delta_replicas < 0 :
30263038 to_remove = - delta_replicas
@@ -3100,9 +3112,7 @@ def _add_replicas_with_gang_scheduling(
31003112 gang_id = gang_id ,
31013113 rank = bundle_index ,
31023114 world_size = gang_size ,
3103- member_replica_ids = [
3104- r .unique_id for r in member_replica_ids
3105- ],
3115+ member_replica_ids = [r .unique_id for r in member_replica_ids ],
31063116 )
31073117
31083118 new_deployment_replica = DeploymentReplica (
@@ -3425,13 +3435,11 @@ def check_and_update_replicas(self):
34253435 )
34263436 self ._stop_replica (replica , graceful_stop = False )
34273437 if replica .version == self ._target_state .version :
3428- self ._curr_status_info = (
3429- self ._curr_status_info .handle_transition (
3430- trigger = DeploymentStatusInternalTrigger .HEALTH_CHECK_FAILED ,
3431- message = "A replica's health check failed. This "
3432- "deployment will be UNHEALTHY until the replica "
3433- "recovers or a new deploy happens." ,
3434- )
3438+ self ._curr_status_info = self ._curr_status_info .handle_transition (
3439+ trigger = DeploymentStatusInternalTrigger .HEALTH_CHECK_FAILED ,
3440+ message = "A replica's health check failed. This "
3441+ "deployment will be UNHEALTHY until the replica "
3442+ "recovers or a new deploy happens." ,
34353443 )
34363444 else :
34373445 self ._replicas .add (replica .actor_details .state , replica )
@@ -4382,9 +4390,7 @@ def _prepare_gang_placement_groups(
43824390 # Skip if deployment has replicas still stopping. Their resources
43834391 # haven't been released yet, so PG creation would likely fail or
43844392 # block waiting for resources. We'll retry next reconciliation loop.
4385- if deployment_state ._replicas .count (
4386- states = [ReplicaState .STOPPING ]
4387- ) > 0 :
4393+ if deployment_state ._replicas .count (states = [ReplicaState .STOPPING ]) > 0 :
43884394 continue
43894395
43904396 gang_requests [deployment_id ] = GangPlacementGroupRequest (
@@ -4402,7 +4408,6 @@ def _prepare_gang_placement_groups(
44024408
44034409 return self ._deployment_scheduler .schedule_gang_placement_groups (gang_requests )
44044410
4405-
44064411 def record_request_routing_info (self , info : RequestRoutingInfo ) -> None :
44074412 """
44084413 Record request routing information for a replica.
0 commit comments