Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions python/ray/serve/_private/deployment_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,7 @@ def __init__(
self._routing_stats: Dict[str, Any] = {}
self._record_routing_stats_ref: Optional[ObjectRef] = None
self._last_record_routing_stats_time: float = 0.0
self._has_user_routing_stats_method: bool = False
self._ingress: bool = False

# Outbound deployments polling state
Expand Down Expand Up @@ -914,6 +915,7 @@ def check_ready(self) -> Tuple[ReplicaStartupStatus, Optional[str]]:
self._rank,
self._route_patterns,
self._outbound_deployments,
self._has_user_routing_stats_method,
) = ray.get(self._ready_obj_ref)
except RayTaskError as e:
logger.exception(
Expand Down Expand Up @@ -1075,14 +1077,22 @@ def _should_record_routing_stats(self) -> bool:
"""Determines if a new record routing stats should be kicked off.

A record routing stats will be started if:
1) There is not already an active record routing stats.
2) It has been more than request_routing_stats_period_s since
1) The user has defined a record_routing_stats method on their
deployment class. If not, skip entirely to avoid unnecessary
remote calls that would just return empty dicts.
2) There is not already an active record routing stats.
3) It has been more than request_routing_stats_period_s since
the previous record routing stats was *started*.

This assumes that self._record_routing_stats_ref is reset to `None`
when an active record routing stats succeeds or fails (due to
returning or timeout).
"""
if not self._has_user_routing_stats_method:
# The user hasn't defined a record_routing_stats method, so
# there's no point in making remote calls to collect stats.
return False

if self._record_routing_stats_ref is not None:
# There's already an active record routing stats.
return False
Expand Down
12 changes: 12 additions & 0 deletions python/ray/serve/_private/replica.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ async def gen_wrapper(
ReplicaRank, # rank
Optional[List[str]], # route_patterns
Optional[List[DeploymentID]], # outbound_deployments
bool, # has_user_routing_stats_method
]


Expand Down Expand Up @@ -1073,6 +1074,11 @@ def get_metadata(self) -> ReplicaMetadata:
if hasattr(self._user_callable_asgi_app, "routes"):
route_patterns = extract_route_patterns(self._user_callable_asgi_app)

has_user_routing_stats_method = (
self._user_callable_wrapper is not None
and self._user_callable_wrapper.has_user_routing_stats_method
)
Comment on lines +1077 to +1080
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

`self._user_callable_wrapper` is initialized in `ReplicaBase.__init__` and is not expected to be `None` at this point. The `is not None` check is therefore redundant and can be removed to simplify the code.

        has_user_routing_stats_method = (
            self._user_callable_wrapper.has_user_routing_stats_method
        )


return (
self._version.deployment_config,
self._version,
Expand All @@ -1084,6 +1090,7 @@ def get_metadata(self) -> ReplicaMetadata:
current_rank,
route_patterns,
self.list_outbound_deployments(),
has_user_routing_stats_method,
)

def get_dynamically_created_handles(self) -> Set[DeploymentID]:
Expand Down Expand Up @@ -3019,6 +3026,11 @@ def call_user_health_check(self) -> Optional[concurrent.futures.Future]:

return None

@property
def has_user_routing_stats_method(self) -> bool:
    """Report whether a user-defined record_routing_stats method exists.

    Returns:
        True when ``self._user_record_routing_stats`` holds something other
        than ``None``; False otherwise.
    """
    routing_stats_hook = self._user_record_routing_stats
    return not (routing_stats_hook is None)

def call_user_record_routing_stats(self) -> Optional[concurrent.futures.Future]:
self._raise_if_not_initialized("call_user_record_routing_stats")

Expand Down
4 changes: 2 additions & 2 deletions python/ray/serve/tests/test_controller_recovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def __call__(self, *args):
replica_version_hash = None
for replica in deployment_dict[id]:
ref = replica.get_actor_handle().initialize_and_get_metadata.remote()
_, version, _, _, _, _, _, _, _, _ = ray.get(ref)
_, version, _, _, _, _, _, _, _, _, _ = ray.get(ref)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This large tuple unpacking is fragile. If `ReplicaMetadata` is refactored to a dataclass as suggested in `python/ray/serve/_private/replica.py`, this can be simplified to be more readable and robust.

        metadata = ray.get(ref)
        version = metadata.version

if replica_version_hash is None:
replica_version_hash = hash(version)
assert replica_version_hash == hash(version), (
Expand Down Expand Up @@ -118,7 +118,7 @@ def __call__(self, *args):
for replica_name in recovered_replica_names:
actor_handle = ray.get_actor(replica_name, namespace=SERVE_NAMESPACE)
ref = actor_handle.initialize_and_get_metadata.remote()
_, version, _, _, _, _, _, _, _, _ = ray.get(ref)
_, version, _, _, _, _, _, _, _, _, _ = ray.get(ref)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This large tuple unpacking is fragile. If `ReplicaMetadata` is refactored to a dataclass as suggested in `python/ray/serve/_private/replica.py`, this can be simplified to be more readable and robust.

        metadata = ray.get(ref)
        version = metadata.version

assert replica_version_hash == hash(
version
), "Replica version hash should be the same after recover from actor names"
Expand Down
3 changes: 3 additions & 0 deletions python/ray/serve/tests/test_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -1221,6 +1221,9 @@ class Model:
def __call__(self):
return "hello"

async def record_routing_stats(self):
return {}

serve.run(Model.bind(), name="app")
timeseries = PrometheusTimeseries()

Expand Down