7373from iris .cluster .controller .dashboard import ControllerDashboard
7474from iris .cluster .providers .k8s .tasks import K8sTaskProvider
7575from iris .cluster .controller .provider import TaskProvider
76+ from iris .cluster .controller .worker_provider import WorkerProvider
7677from iris .cluster .controller .scheduler import (
7778 JobRequirements ,
7879 Scheduler ,
9495 ReservationClaim ,
9596 SchedulingEvent ,
9697)
97- from iris .cluster .log_store import PROCESS_LOG_KEY , LogStore , LogStoreHandler
98+ from iris .cluster .log_store import CONTROLLER_LOG_KEY , LogStoreHandler
99+ from iris .log_server .server import LogServiceImpl
98100from iris .cluster .types import (
99101 JobName ,
100102 WorkerStatus ,
105107)
106108from rigging .log_setup import slow_log
107109from iris .managed_thread import ManagedThread , ThreadContainer , get_thread_container
108- from iris .rpc import cluster_pb2
110+ from iris .rpc import cluster_pb2 , logging_pb2
109111from iris .rpc .auth import TokenVerifier
110112from rigging .timing import Duration , ExponentialBackoff , RateLimiter , Timer , Timestamp , TokenBucket
111113
123125_HISTORY_CLEANUP_INTERVAL_S = 60.0
124126
125127
class _InProcessLogPusher:
    """Bridges LogServiceImpl to the LogPusherProtocol without RPC.

    When the K8s provider runs co-hosted with the controller there is no
    need for a network round-trip: log entries are handed straight to the
    local service implementation via push_logs().
    """

    def __init__(self, log_service: LogServiceImpl) -> None:
        # Co-hosted service instance; push() delegates to it directly.
        self._log_service = log_service

    def push(self, key: str, entries: list[logging_pb2.LogEntry]) -> None:
        # Skip the call entirely for empty batches — nothing to forward.
        if not entries:
            return
        request = logging_pb2.PushLogsRequest(key=key, entries=entries)
        self._log_service.push_logs(request, ctx=None)
142+
126143class SchedulingOutcome (enum .Enum ):
127144 """Result of a scheduling cycle, used to drive adaptive backoff."""
128145
@@ -998,19 +1015,25 @@ def __init__(
9981015 self ._db = db
9991016 else :
10001017 self ._db = ControllerDB (db_dir = config .local_state_dir / "db" )
1001- self ._log_store = LogStore (
1018+
1019+ self ._log_service = LogServiceImpl (
10021020 log_dir = config .local_state_dir / "logs" ,
10031021 remote_log_dir = f"{ config .remote_state_dir .rstrip ('/' )} /logs" ,
10041022 )
10051023
1006- # Wire log store into the K8s provider so its LogCollector can write logs directly.
1007- # Collectors are created lazily on first sync(), so just setting the field is enough.
1024+ # Wire an in-process log pusher into providers so log entries are
1025+ # forwarded through the LogService without a network hop.
1026+ # - K8sTaskProvider: its LogCollector pushes logs directly.
1027+ # - WorkerProvider: forwards log_entries piggybacked on heartbeat
1028+ # responses from old workers that predate push-based logging.
1029+ in_process_log_pusher = _InProcessLogPusher (self ._log_service )
10081030 if isinstance (self ._provider , K8sTaskProvider ):
1009- self ._provider .log_store = self ._log_store
1031+ self ._provider .log_pusher = in_process_log_pusher
1032+ elif isinstance (self ._provider , WorkerProvider ):
1033+ self ._provider .log_pusher = in_process_log_pusher
10101034
10111035 self ._transitions = ControllerTransitions (
10121036 db = self ._db ,
1013- log_store = self ._log_store ,
10141037 heartbeat_failure_threshold = config .heartbeat_failure_threshold ,
10151038 user_budget_defaults = config .user_budget_defaults ,
10161039 )
@@ -1023,20 +1046,23 @@ def __init__(
10231046 self ._db ,
10241047 controller = self ,
10251048 bundle_store = self ._bundle_store ,
1026- log_store = self ._log_store ,
1049+ log_service = self ._log_service ,
10271050 auth = config .auth ,
1051+ system_endpoints = {},
10281052 )
10291053 self ._dashboard = ControllerDashboard (
10301054 self ._service ,
1055+ log_service = self ._log_service ,
10311056 host = config .host ,
10321057 port = config .port ,
10331058 auth_verifier = config .auth_verifier ,
10341059 auth_provider = config .auth_provider ,
10351060 auth_optional = config .auth .optional if config .auth else False ,
10361061 )
10371062
1038- # Ingest process logs into the LogStore so they are available via FetchLogs.
1039- self ._log_store_handler = LogStoreHandler (self ._log_store , key = PROCESS_LOG_KEY )
1063+ # Ingest controller process logs into the LogStore via LogStoreHandler.
1064+ # This writes directly to the co-hosted LogStore (no RPC round-trip).
1065+ self ._log_store_handler = LogStoreHandler (self ._log_service .log_store , key = CONTROLLER_LOG_KEY )
10401066 self ._log_store_handler .setLevel (logging .DEBUG )
10411067 self ._log_store_handler .setFormatter (logging .Formatter ("%(asctime)s %(name)s %(message)s" ))
10421068 logging .getLogger ("iris" ).addHandler (self ._log_store_handler )
@@ -1144,6 +1170,12 @@ def start(self) -> None:
11441170 timeout = Duration .from_seconds (5.0 ),
11451171 )
11461172
1173+ # Register system endpoints. The address here is used for in-process
1174+ # and local-network callers (e.g. CLI, tests). Remote workers fall back
1175+ # to their configured controller_address when the resolved endpoint
1176+ # is unreachable, since the log service is co-hosted on the controller.
1177+ self ._service ._system_endpoints ["/system/log-server" ] = self .url
1178+
11471179 def stop (self ) -> None :
11481180 """Stop all background components gracefully.
11491181
@@ -1183,7 +1215,7 @@ def stop(self) -> None:
11831215 # sqlite3.ProgrammingError spam from late log records.
11841216 logging .getLogger ("iris" ).removeHandler (self ._log_store_handler )
11851217 self ._log_store_handler .close ()
1186- self ._log_store .close ()
1218+ self ._log_service .close ()
11871219 self ._db .close ()
11881220 self ._bundle_store .close ()
11891221
0 commit comments