16
16
import tempfile
17
17
import time
18
18
from dataclasses import dataclass , field , InitVar , replace
19
- from typing import Any , ClassVar , Optional , cast
19
+ from typing import TYPE_CHECKING , Any , ClassVar , Optional , cast
20
20
21
21
from ..models import (
22
22
PipInstall ,
23
23
PosixSessionUser ,
24
24
)
25
- from .resources import Fleet
25
+ from .resources import CloudWatchLogEvent , Fleet , WorkerLog
26
26
from ..util import call_api , wait_for
27
27
28
+ if TYPE_CHECKING :
29
+ from botocore .paginate import PageIterator , Paginator
30
+
28
31
LOG = logging .getLogger (__name__ )
29
32
30
33
DOCKER_CONTEXT_DIR = os .path .join (os .path .dirname (__file__ ), ".." , "containers" , "worker" )
@@ -48,6 +51,15 @@ def get_worker_id(self) -> str:
48
51
pass
49
52
50
53
54
@dataclass(frozen=True)
class WorkerLogConfig:
    """Immutable pair of CloudWatch names identifying where the Agent log is streamed.

    Attributes:
        cloudwatch_log_group: The name of the CloudWatch Log Group that the Agent log
            should be streamed to.
        cloudwatch_log_stream: The name of the CloudWatch Log Stream that the Agent log
            should be streamed to.
    """

    cloudwatch_log_group: str
    cloudwatch_log_stream: str
61
+
62
+
51
63
@dataclass (frozen = True )
52
64
class CommandResult : # pragma: no cover
53
65
exit_code : int
@@ -252,6 +264,46 @@ def set_stopped_status(self):
252
264
LOG .exception (f"Failed to update worker status: { error } " )
253
265
raise
254
266
267
def get_worker_logs(self) -> Optional[WorkerLogConfig]:
    """Get the log group and log stream for the worker. Retain the API structure.

    Calls ``get_worker`` on ``self.deadline_client`` for this worker's farm, fleet and
    worker id, then reads ``logGroupName`` and ``logStreamName`` from the ``options``
    mapping of the response's ``log`` entry.

    Returns:
        A ``WorkerLogConfig`` when both names are present and non-empty, otherwise
        ``None`` (no log group yet).
    """
    response = self.deadline_client.get_worker(
        farmId=self.configuration.farm_id,
        fleetId=self.configuration.fleet.id,
        workerId=self.worker_id,
    )

    # The "log" member can be absent from the response; using .get() routes that case
    # to the "no log group yet" result instead of raising KeyError.
    log_config = response.get("log")
    if not log_config:
        return None

    LOG.info(f"LogGroup structure {log_config} ")

    log_config_options = log_config.get("options")
    if not log_config_options:
        return None

    log_group_name = log_config_options.get("logGroupName")
    log_stream_name = log_config_options.get("logStreamName")
    if not (log_group_name and log_stream_name):
        # Default, no log group yet.
        return None

    return WorkerLogConfig(
        cloudwatch_log_group=log_group_name,
        cloudwatch_log_stream=log_stream_name,
    )
285
+
286
def get_all_worker_logs(self, *, logs_client: botocore.client.BaseClient) -> WorkerLog:
    """Fetch every CloudWatch log event recorded for this worker.

    Args:
        logs_client: Client used to obtain the ``filter_log_events`` paginator.

    Returns:
        A ``WorkerLog`` for ``self.worker_id``. Its ``logs`` list is empty when
        ``get_worker_logs()`` reports no log group/stream for the worker.
    """
    # Get the worker log group and stream from the service.
    log_config = self.get_worker_logs()
    if not log_config:
        return WorkerLog(worker_id=self.worker_id, logs=[])  # type: ignore[arg-type]

    filter_log_events_paginator: Paginator = logs_client.get_paginator("filter_log_events")

    def _fetch_all_events() -> dict:
        # paginate() only builds a lazy iterator; the requests are issued while
        # build_full_result() consumes it. Running both inside the call_api callable
        # means the whole fetch happens under call_api rather than only the setup.
        pages: PageIterator = filter_log_events_paginator.paginate(
            logGroupName=log_config.cloudwatch_log_group,
            logStreamNames=[log_config.cloudwatch_log_stream],
        )
        return pages.build_full_result()

    full_result = call_api(
        # The id interpolated here is a worker id, so the description names a worker.
        description=f"Fetching log events for worker {self.worker_id} in log group {log_config.cloudwatch_log_group} ",
        fn=_fetch_all_events,
    )
    log_events = [CloudWatchLogEvent.from_api_response(e) for e in full_result["events"]]

    return WorkerLog(worker_id=self.worker_id, logs=log_events)  # type: ignore[arg-type]
306
+
255
307
def send_command (self , command : str ) -> CommandResult :
256
308
"""Send a command via SSM to a shell on a launched EC2 instance. Once the command has fully
257
309
finished the result of the invocation is returned.
0 commit comments