1616from ray .serve ._private .test_utils import (
1717 Counter ,
1818 check_num_replicas_eq ,
19- get_application_url ,
2019 get_deployment_details ,
20+ request_with_retries ,
2121 tlog ,
2222)
2323
2424
25- def request_with_retries (endpoint , timeout = 30 , app_name = SERVE_DEFAULT_APP_NAME ):
26- start = time .time ()
27- while True :
28- try :
29- return httpx .get (
30- get_application_url ("HTTP" , app_name = app_name ) + endpoint ,
31- timeout = timeout ,
32- )
33- except (httpx .RequestError , IndexError ):
34- if time .time () - start > timeout :
35- raise TimeoutError
36- time .sleep (0.1 )
37-
38-
3925@pytest .mark .skip (reason = "Consistently failing." )
4026def test_controller_failure (serve_instance ):
4127 @serve .deployment (name = "controller_failure" )
@@ -44,16 +30,16 @@ def function(_):
4430
4531 serve .run (function .bind ())
4632
47- assert request_with_retries ("/controller_failure/" , timeout = 1 ).text == "hello1"
33+ assert request_with_retries (timeout = 1 ).text == "hello1"
4834
4935 for _ in range (10 ):
50- response = request_with_retries ("/controller_failure/" , timeout = 30 )
36+ response = request_with_retries (timeout = 30 )
5137 assert response .text == "hello1"
5238
5339 ray .kill (serve .context ._global_client ._controller , no_restart = False )
5440
5541 for _ in range (10 ):
56- response = request_with_retries ("/controller_failure/" , timeout = 30 )
42+ response = request_with_retries (timeout = 30 )
5743 assert response .text == "hello1"
5844
5945 def function2 (_ ):
@@ -64,7 +50,7 @@ def function2(_):
6450 serve .run (function .options (func_or_class = function2 ).bind ())
6551
6652 def check_controller_failure ():
67- response = request_with_retries ("/controller_failure/" , timeout = 30 )
53+ response = request_with_retries (timeout = 30 )
6854 return response .text == "hello2"
6955
7056 wait_for_condition (check_controller_failure )
@@ -78,50 +64,12 @@ def function3(_):
7864 ray .kill (serve .context ._global_client ._controller , no_restart = False )
7965
8066 for _ in range (10 ):
81- response = request_with_retries ("/controller_failure/" , timeout = 30 )
67+ response = request_with_retries (timeout = 30 )
8268 assert response .text == "hello2"
83- response = request_with_retries ("/controller_failure_2/" , timeout = 30 )
69+ response = request_with_retries (timeout = 30 )
8470 assert response .text == "hello3"
8571
8672
87- def _kill_http_proxies ():
88- http_proxies = ray .get (
89- serve .context ._global_client ._controller .get_proxies .remote ()
90- )
91- for http_proxy in http_proxies .values ():
92- ray .kill (http_proxy , no_restart = False )
93-
94-
95- def test_http_proxy_failure (serve_instance ):
96- @serve .deployment (name = "proxy_failure" )
97- def function (_ ):
98- return "hello1"
99-
100- serve .run (function .bind ())
101-
102- assert request_with_retries ("/proxy_failure/" , timeout = 1.0 ).text == "hello1"
103-
104- for _ in range (10 ):
105- response = request_with_retries ("/proxy_failure/" , timeout = 30 )
106- assert response .text == "hello1"
107-
108- _kill_http_proxies ()
109-
110- def function2 (_ ):
111- return "hello2"
112-
113- serve .run (function .options (func_or_class = function2 ).bind ())
114-
115- def check_new ():
116- for _ in range (10 ):
117- response = request_with_retries ("/proxy_failure/" , timeout = 30 )
118- if response .text != "hello2" :
119- return False
120- return True
121-
122- wait_for_condition (check_new )
123-
124-
12573def _get_worker_handles (deployment_name : str , app_name : str = SERVE_DEFAULT_APP_NAME ):
12674 id = DeploymentID (name = deployment_name , app_name = app_name )
12775 controller = serve .context ._global_client ._controller
@@ -141,7 +89,7 @@ def __call__(self, *args):
14189 serve .run (Worker1 .bind ())
14290
14391 # Get the PID of the worker.
144- old_pid = request_with_retries ("/worker_failure/" , timeout = 1 ).text
92+ old_pid = request_with_retries (timeout = 1 ).text
14593
14694 # Kill the worker.
14795 handles = _get_worker_handles ("worker_failure" )
@@ -151,7 +99,7 @@ def __call__(self, *args):
152100 # Wait until the worker is killed and a new one is started.
152100 start = time .time ()
153101 while time .time () - start < 30 :
154- response = request_with_retries ("/worker_failure/" , timeout = 30 )
102+ response = request_with_retries (timeout = 30 )
155103 if response .text != old_pid :
156104 break
157105 else :
@@ -192,7 +140,7 @@ def __call__(self, *args):
192140 start = time .time ()
193141 while time .time () - start < 30 :
194142 time .sleep (0.1 )
195- response = request_with_retries ("/replica_failure/" , timeout = 1 ).text
143+ response = request_with_retries (timeout = 1 ).text
196144 assert response in ["1" , "2" ]
197145 responses .add (response )
198146 if len (responses ) > 1 :
@@ -211,7 +159,7 @@ def __call__(self, *args):
211159 try :
212160 # The timeout needs to be small here because the request to
213161 # the restarting worker will hang.
214- request_with_retries ("/replica_failure/" , timeout = 0.1 )
162+ request_with_retries (timeout = 0.1 )
215163 break
216164 except TimeoutError :
217165 time .sleep (0.1 )
0 commit comments