feat: Add support to fetch and match worker logs #183
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -16,15 +16,18 @@ | |
| import tempfile | ||
| import time | ||
| from dataclasses import dataclass, field, InitVar, replace | ||
| from typing import Any, ClassVar, Optional, cast | ||
| from typing import TYPE_CHECKING, Any, ClassVar, Optional, cast | ||
|
|
||
| from ..models import ( | ||
| PipInstall, | ||
| PosixSessionUser, | ||
| ) | ||
| from .resources import Fleet | ||
| from .resources import CloudWatchLogEvent, Fleet, WorkerLog | ||
| from ..util import call_api, wait_for | ||
|
|
||
| if TYPE_CHECKING: | ||
| from botocore.paginate import PageIterator, Paginator | ||
|
|
||
| LOG = logging.getLogger(__name__) | ||
|
|
||
| DOCKER_CONTEXT_DIR = os.path.join(os.path.dirname(__file__), "..", "containers", "worker") | ||
|
|
@@ -48,6 +51,15 @@ def get_worker_id(self) -> str: | |
| pass | ||
|
|
||
|
|
||
| @dataclass(frozen=True) | ||
| class WorkerLogConfig: | ||
| cloudwatch_log_group: str | ||
| """The name of the CloudWatch Log Group that the Agent log should be streamed to""" | ||
|
|
||
| cloudwatch_log_stream: str | ||
| """The name of the CloudWatch Log Stream that the Agent log should be streamed to""" | ||
|
|
||
|
|
||
| @dataclass(frozen=True) | ||
| class CommandResult: # pragma: no cover | ||
| exit_code: int | ||
|
|
@@ -252,6 +264,46 @@ def set_stopped_status(self): | |
| LOG.exception(f"Failed to update worker status: {error}") | ||
| raise | ||
|
|
||
| def _get_worker_logs(self) -> Optional[WorkerLogConfig]: | ||
|
Contributor
nit - what about
Contributor
Author
Arrrg, I had it as a non-private name before but went for
||
| """Get the log group and log stream for the worker. Retain the API structure""" | ||
| response = self.deadline_client.get_worker( | ||
|
nit: type
Q: do we have a data class for GetWorker response?
Contributor
Author
We don't have typing in this library for boto classes :(
Contributor
Author
The data class comes from the design and structure of this library. There's no use of boto3 generated types.
Could we type it as
||
| farmId=self.configuration.farm_id, | ||
| fleetId=self.configuration.fleet.id, | ||
| workerId=self.worker_id, | ||
| ) | ||
| if log_config := response.get("log"): | ||
| LOG.info(f"Log Config structure {log_config}") | ||
| if log_config_options := log_config.get("options"): | ||
| log_group_name = log_config_options.get("logGroupName") | ||
| log_stream_name = log_config_options.get("logStreamName") | ||
| if log_group_name and log_stream_name: | ||
| return WorkerLogConfig( | ||
| cloudwatch_log_group=log_group_name, cloudwatch_log_stream=log_stream_name | ||
| ) | ||
| # Default, no log config yet. | ||
| return None | ||
|
|
||
| def get_logs(self, *, logs_client: botocore.client.BaseClient) -> WorkerLog: | ||
| # Get the worker log group and stream from the service. | ||
| log_config: Optional[WorkerLogConfig] = self._get_worker_logs() | ||
| if not log_config: | ||
| return WorkerLog(worker_id=self.worker_id, logs=[]) # type: ignore[arg-type] | ||
|
|
||
| filter_log_events_paginator: Paginator = logs_client.get_paginator("filter_log_events") | ||
|
Contributor
Please make sure caller has the
Contributor
Author
This is the same in the rest of the library and we don't check
Contributor
This introduces a FilterLogEvents call on fleet logs, while the prior code filtered on session logs, no? The permission for CWL is configured at the log group level, which is why I suggested checking. I took a quick look, and it seems we have this permission on all deadline log groups.
||
| filter_log_events_pages: PageIterator = call_api( | ||
| description=f"Fetching log events for worker {self.worker_id} in log group {log_config.cloudwatch_log_group}", | ||
| fn=lambda: filter_log_events_paginator.paginate( | ||
| logGroupName=log_config.cloudwatch_log_group, | ||
| logStreamNames=[log_config.cloudwatch_log_stream], | ||
| ), | ||
| ) | ||
| log_events = filter_log_events_pages.build_full_result() | ||
|
nit: type
Contributor
Author
There's no typing for boto in this package.
||
| log_events = [CloudWatchLogEvent.from_api_response(e) for e in log_events["events"]] | ||
| # For debugging test cases. | ||
|
Could we remove this comment? Doesn't seem to be adding any value.
Contributor
Author
We do, otherwise everyone wants to remove LOG.info :)
||
| # LOG.info(log_events) | ||
|
|
||
| return WorkerLog(worker_id=self.worker_id, logs=log_events) # type: ignore[arg-type] | ||
|
|
||
| def send_command(self, command: str) -> CommandResult: | ||
| """Send a command via SSM to a shell on a launched EC2 instance. Once the command has fully | ||
| finished the result of the invocation is returned. | ||
|
|
||
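The nested lookups in `_get_worker_logs` can be tried out without calling the service. The following is a minimal sketch, assuming the GetWorker response has the shape the diff reads (`log`, then `options`, then `logGroupName` and `logStreamName`). The function name `parse_worker_log_config` and the sample values are hypothetical and not part of this PR.

```python
from dataclasses import dataclass
from typing import Any, Mapping, Optional


@dataclass(frozen=True)
class WorkerLogConfig:
    cloudwatch_log_group: str
    cloudwatch_log_stream: str


def parse_worker_log_config(response: Mapping[str, Any]) -> Optional[WorkerLogConfig]:
    """Hypothetical helper: pull the log group and stream out of a GetWorker-style dict."""
    # Each level may be absent until the worker has a log configuration.
    options = (response.get("log") or {}).get("options") or {}
    log_group_name = options.get("logGroupName")
    log_stream_name = options.get("logStreamName")
    if log_group_name and log_stream_name:
        return WorkerLogConfig(
            cloudwatch_log_group=log_group_name,
            cloudwatch_log_stream=log_stream_name,
        )
    # Default: no usable log config yet.
    return None


# Made-up sample values, for illustration only.
sample = {"log": {"options": {"logGroupName": "example-group", "logStreamName": "example-stream"}}}
config = parse_worker_log_config(sample)
```

Returning `None` for any incomplete response matches the diff's behaviour, where `get_logs` then returns a `WorkerLog` with an empty list of events.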
curious - why only import those during type checking?
This was done in the rest of the library too; there are some differences between versions of Python, I think.
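The thread does not settle why the imports are guarded, so the following is only a general illustration of what `if TYPE_CHECKING:` does. `TYPE_CHECKING` is `False` at runtime, so the guarded import never executes and the names exist only for static type checkers. The sketch uses the standard-library `decimal.Decimal` as a stand-in for `Paginator` and `PageIterator`; the function names are hypothetical.

```python
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Only static type checkers follow this branch; at runtime the
    # import is skipped, so Decimal is not bound in this module.
    from decimal import Decimal


def describe(value: "Decimal") -> str:
    # The quoted annotation is stored as a string and is not looked up
    # when the function is defined.
    return f"value={value}"


def local_annotation() -> str:
    # Annotations on local variables inside a function are never
    # evaluated, which is why the diff can write
    # `filter_log_events_paginator: Paginator = ...` without a runtime import.
    amount: Decimal = 3  # type: ignore[assignment]
    return describe(amount)


result = local_annotation()
```

One consequence of this pattern is that a guarded name cannot be used in code that runs, such as an `isinstance` check, without a real import.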