Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 24 additions & 51 deletions lib/iris/src/iris/cluster/worker/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,11 @@ def start(self) -> None:
timeout_ms=60_000,
interceptors=interceptors,
)
self._log_pusher = LogPusher(
"/system/log-server",
interceptors=interceptors,
resolver=self._resolve_log_service,
)

# Start lifecycle thread: register + serve + reset loop
self._threads.spawn(target=self._run_lifecycle, name="worker-lifecycle")
Expand Down Expand Up @@ -460,61 +465,29 @@ def _register(self, stop_event: threading.Event) -> str | None:

return None

def _resolve_log_service(self) -> str | None:
"""Resolve the LogService address via the /system/log-server endpoint.

Called before registration, so the controller may not yet be reachable.
Treats RPC errors and missing endpoints the same: log a warning and
return None so the caller can skip remote log attachment without
crashing the lifecycle thread.
"""
if not self._controller_client:
return None
try:
resp = self._controller_client.list_endpoints(
controller_pb2.Controller.ListEndpointsRequest(
prefix="/system/log-server",
exact=True,
),
)
except Exception as e:
logger.warning("Failed to resolve /system/log-server: %s", e)
return None
def _resolve_log_service(self, server_url: str) -> str:
"""Look up ``server_url`` on the controller's endpoint registry."""
if self._controller_client is None:
raise ConnectionError("worker controller client not yet initialized")
resp = self._controller_client.list_endpoints(
controller_pb2.Controller.ListEndpointsRequest(prefix=server_url, exact=True),
)
if not resp.endpoints:
logger.warning("No /system/log-server endpoint registered on controller")
return None
addr = resp.endpoints[0].address
logger.info("Resolved /system/log-server -> %s", addr)
return addr
raise ConnectionError(f"No {server_url!r} endpoint registered on controller")
return resp.endpoints[0].address

def _attach_log_handler(self) -> None:
"""Create LogPusher and attach RemoteLogHandler under ``worker_log_key``.

Always tears down any existing handler first so each lifecycle cycle
re-resolves /system/log-server (picking up log-server failover) and
rebuilds the LogPusher against the fresh address.

Skipped when ``self._worker_id`` is not yet known locally — in that
(rare) case the controller will assign an id during ``_register`` and
the lifecycle loop re-calls this method with the canonical id.
"""
self._detach_log_handler()
if not self._worker_id:
return
log_addr = self._resolve_log_service()
if not log_addr:
"""Attach or rename the remote log handler under ``worker_log_key(self._worker_id)``."""
if not self._worker_id or self._log_pusher is None:
return
log_interceptors = ()
if self._config.auth_token:
log_interceptors = (AuthTokenInjector(StaticTokenProvider(self._config.auth_token)),)
self._log_pusher = LogPusher(log_addr, interceptors=log_interceptors)
self._log_handler = RemoteLogHandler(
self._log_pusher,
key=worker_log_key(self._worker_id),
)
self._log_handler.setLevel(logging.INFO)
self._log_handler.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s"))
logging.getLogger().addHandler(self._log_handler)
key = worker_log_key(self._worker_id)
if self._log_handler is None:
self._log_handler = RemoteLogHandler(self._log_pusher, key=key)
self._log_handler.setLevel(logging.INFO)
self._log_handler.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s"))
logging.getLogger().addHandler(self._log_handler)
else:
self._log_handler.key = key

def _detach_log_handler(self) -> None:
"""Remove and close the current RemoteLogHandler and LogPusher if any."""
Expand Down
Loading
Loading