|
18 | 18 | sleep, |
19 | 19 | ) |
20 | 20 | from anyio.from_thread import BlockingPortal |
21 | | -from grpc.aio import Channel |
| 21 | +from grpc.aio import AioRpcError, Channel |
22 | 22 | from jumpstarter_protocol import jumpstarter_pb2, jumpstarter_pb2_grpc |
23 | 23 |
|
24 | 24 | from .exceptions import LeaseError |
25 | 25 | from jumpstarter.client import client_from_path |
26 | 26 | from jumpstarter.client.grpc import ClientService |
27 | 27 | from jumpstarter.common import TemporaryUnixListener |
28 | 28 | from jumpstarter.common.condition import condition_false, condition_message, condition_present_and_equal, condition_true |
| 29 | +from jumpstarter.common.exceptions import ConnectionError |
29 | 30 | from jumpstarter.common.grpc import translate_grpc_exceptions |
30 | 31 | from jumpstarter.common.streams import connect_router_stream |
31 | 32 | from jumpstarter.config.tls import TLSConfigV1Alpha1 |
@@ -189,8 +190,34 @@ async def handle_async(self, stream): |
189 | 190 | @asynccontextmanager |
190 | 191 | async def serve_unix_async(self): |
191 | 192 | async with TemporaryUnixListener(self.handle_async) as path: |
| 193 | + logger.debug("Serving Unix socket at %s", path) |
| 194 | + await self._wait_for_ready_connection(path) |
| 195 | + # TODO: talk to the exporter to make sure it's ready.... (once we have the hooks) |
192 | 196 | yield path |
193 | 197 |
|
| 198 | + async def _wait_for_ready_connection(self, path: str): |
| 199 | + retries_left = 5 |
| 200 | + logger.info("Waiting for ready connection at %s", path) |
| 201 | + while True: |
| 202 | + try: |
| 203 | + with ExitStack() as stack: |
| 204 | + async with client_from_path(path, self.portal, stack, allow=self.allow, unsafe=self.unsafe) as _: |
| 205 | + break |
| 206 | + except AioRpcError as e: |
| 207 | + if retries_left > 1: |
| 208 | + retries_left -= 1 |
| 209 | + else: |
| 210 | + logger.error("Max retries reached while waiting for ready connection at %s", path) |
| 211 | + raise ConnectionError("Max retries reached while waiting for ready connection at %s" % path) from e |
| 212 | + if e.code().name == "UNAVAILABLE": |
| 213 | + logger.warning("Still waiting for connection to be ready at %s", path) |
| 214 | + else: |
| 215 | + logger.warning("Waiting for ready connection to %s: %s", path, e) |
| 216 | + await sleep(5) |
| 217 | + except Exception as e: |
| 218 | + logger.error("Unexpected error while waiting for ready connection to %s: %s", path, e) |
| 219 | + raise ConnectionError("Unexpected error while waiting for ready connection to %s" % path) from e |
| 220 | + |
194 | 221 | @asynccontextmanager |
195 | 222 | async def monitor_async(self, threshold: timedelta = timedelta(minutes=5)): |
196 | 223 | async def _monitor(): |
|
0 commit comments