Skip to content

Commit d925c65

Browse files
authored
feat: Support running configuration scripts after worker reaches STARTED state (#601)
Signed-off-by: David Leong <[email protected]>
1 parent 5935efc commit d925c65

16 files changed

+1150
-27
lines changed

docs/logging.md

+2-1
Original file line numberDiff line numberDiff line change
@@ -53,17 +53,18 @@ Log events may also contain a `type`, `subtype`, icon (`ti`), and additional fie
5353
| AgentInfo | None | None | platform; python[interpreter,version]; agent[version,installedAt,runningAs]; dependencies | Information about the running Agent software. |
5454
| API | Req | 📤 | operation; request_url; params; resource (optional) | A request to an AWS API. Only requests to AWS Deadline Cloud APIs contain a resource field. |
5555
| API | Resp | 📥 | operation; params; status_code, request_id; error (optional) | A response from an AWS API request. |
56-
| FileSystem | Read/Write/Create/Delete | 💾 | filepath; message | A filesystem operation. |
5756
| AWSCreds | Load/Install/Delete | 🔑 | resource; message; role_arn (optional) | Related to an operation for AWS Credentials. |
5857
| AWSCreds | Query | 🔑 | resource; message; role_arn (optional); expiry (optional) | Related to an operation for AWS Credentials. |
5958
| AWSCreds | Refresh | 🔑 | resource; message; role_arn (optional); expiry (optional); scheduled_time (optional) | Related to an operation for AWS Credentials. |
59+
| FileSystem | Read/Write/Create/Delete | 💾 | filepath; message | A filesystem operation. |
6060
| Metrics | System | 📊 | many | System metrics. |
6161
| Session | Starting/Failed/AWSCreds/Complete/Info | 🔷 | queue_id; job_id; session_id | An update or information related to a Session. |
6262
| Session | Add/Remove | 🔷 | queue_id; job_id; session_id; action_ids; queued_actions | Adding or removing SessionActions in a Session. |
6363
| Session | Logs | 🔷 | queue_id; job_id; session_id; log_dest | Information regarding where the Session logs are located. |
6464
| Session | User | 🔷 | queue_id; job_id; session_id; user | The user that a Session is running Actions as. |
6565
| Session | Runtime | 🔷 | queue_id; job_id; session_id | Information related to the running Session. This includes information about the host, process control, and encountered Exceptions which could contain information like filepaths. |
6666
| Worker | Create/Load/ID/Status/Delete | 💻 | farm_id; fleet_id; worker_id (optional); message | A notification related to a Worker resource within AWS Deadline Cloud. |
67+
| Worker | HostConfiguration | 📜 | farm_id; fleet_id; worker_id (optional); message; status; exit_code (optional); success (optional) | Worker Host configuration event. |
6768

6869
If you prefer structured logs to be emitted on your host, then you can configure your Worker Agent to emit structured logs instead. Please see the
6970
`structured_logs` option in the [`worker.toml.example`](../src/deadline_worker_agent/installer/worker.toml.example)

docs/state.md

+5-1
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,8 @@ The worker maintains a worker state which is a JSON representation of the follow
4545
```jsonc
4646
{
4747
"worker_id": "<WORKER_ID>",
48-
"instance_id": "<INSTANCE_ID>" # OPTIONAL
48+
"instance_id": "<INSTANCE_ID>", # OPTIONAL
49+
"host_configuration_succeeded": bool # OPTIONAL
4950
}
5051
```
5152

@@ -58,6 +59,9 @@ This contains a unique identifier that represents the worker in your AWS Deadlin
5859

5960
This is an optional parameter which contains an identifier of the EC2 instance running the worker, if applicable.
6061

62+
#### `host_configuration_succeeded`
63+
64+
This is an optional parameter which indicates whether the host configuration script has already run successfully on the worker. The host configuration script is only run once per worker; after it has succeeded, later startups skip it.
6165

6266
#### How the State File Affects Startup Behavior
6367

pyproject.toml

+2-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@ dependencies = [
1414
"requests ~= 2.31",
1515
"boto3 >= 1.34.75",
1616
"deadline == 0.49.*",
17-
"openjd-sessions == 0.10.*",
17+
# Pinned to patch version due to Host Config Script runner usage of private OpenJD Sessions API.
18+
"openjd-sessions == 0.10.1",
1819
# tomli became tomllib in standard library in Python 3.11
1920
"tomli == 2.0.* ; python_version<'3.11'",
2021
"tomlkit == 0.13.*",

src/deadline_worker_agent/api_models.py

+9
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,14 @@ class AttachmentUploadAction(TypedDict):
103103
taskId: str
104104

105105

106+
class HostConfiguration(TypedDict):
    """Host configuration script details, as carried in the optional
    ``hostConfiguration`` field of an UpdateWorker response."""

    # Host Configuration Script Body.
    scriptBody: str

    # Customer Supplied timeout, in seconds.
    scriptTimeoutSeconds: int
113+
106114
class LogConfiguration(TypedDict):
107115
error: NotRequired[str]
108116
logDriver: str
@@ -414,6 +422,7 @@ class UpdateWorkerScheduleRequest(TypedDict):
414422

415423
class UpdateWorkerResponse(TypedDict):
416424
log: NotRequired[LogConfiguration]
425+
hostConfiguration: NotRequired[HostConfiguration]
417426

418427

419428
class IpAddresses(TypedDict):

src/deadline_worker_agent/aws/deadline/__init__.py

+11
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,17 @@ class DeadlineRequestInterrupted(Exception):
112112
pass
113113

114114

115+
@dataclass(frozen=True)
class WorkerHostConfiguration:
    """Immutable host configuration to apply after a worker has started."""

    # Host Configuration Script Body.
    script_body: str

    # Customer Supplied timeout, in seconds.
    script_timeout_seconds: int
124+
125+
115126
@dataclass(frozen=True)
116127
class WorkerLogConfig:
117128
"""The destination where the Worker Agent should synchronize its logs to"""

src/deadline_worker_agent/feature_flag.py

+2
Original file line numberDiff line numberDiff line change
@@ -7,3 +7,5 @@
77
ASSET_SYNC_JOB_USER_FEATURE = (
88
os.environ.get("ASSET_SYNC_JOB_USER_FEATURE", "false").lower() == "true"
99
)
10+
11+
HOST_CONFIGURATION_FEATURE = os.environ.get("HOST_CONFIGURATION_FEATURE", "false").lower() == "true"

src/deadline_worker_agent/log_messages.py

+48
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,7 @@ class WorkerLogEventOp(str, Enum):
164164
ID = "ID" # The ID that the Agent is running as
165165
STATUS = "Status"
166166
DELETE = "Delete"
167+
HOST_CONFIGURATION = "HostConfiguration"
167168

168169

169170
class WorkerLogEvent(BaseLogEvent):
@@ -201,6 +202,53 @@ def asdict(self) -> dict[str, Any]:
201202
return self.add_exception_to_dict(dd)
202203

203204

205+
class WorkerHostConfigurationStatus(str, Enum):
    """Status reported in the ``status`` field of a host configuration log event."""

    # The host configuration script is about to be run.
    RUNNING = "Running"
    # The host configuration script finished successfully.
    SUCCEEDED = "Succeeded"
    # The host configuration script did not finish successfully.
    FAILED = "Failed"
    # The host configuration script was not run.
    SKIPPED = "Skipped"
210+
211+
212+
class WorkerHostConfigurationLogEvent(WorkerLogEvent):
    """Structured log event for the worker host configuration script.

    Emitted with the ``HostConfiguration`` worker operation. In addition to the
    fields of ``WorkerLogEvent`` it carries the host configuration ``status``
    and, when provided, the script ``exit_code`` and a ``success`` flag.
    """

    ti = "📜"
    type = "Worker"
    status: WorkerHostConfigurationStatus
    exit_code: Optional[int]
    success: Optional[bool]

    def __init__(
        self,
        *,
        farm_id: str,
        fleet_id: str,
        message: str,
        status: WorkerHostConfigurationStatus,
        worker_id: Optional[str] = None,
        exit_code: Optional[int] = None,
        success: Optional[bool] = None,
    ) -> None:
        """Create a host configuration log event.

        Args:
            farm_id: ID of the farm the worker belongs to.
            fleet_id: ID of the fleet the worker belongs to.
            message: Human-readable description of the event.
            status: Current status of the host configuration.
            worker_id: ID of the worker, if known.
            exit_code: Exit code of the host configuration script, if it ran.
            success: Whether host configuration succeeded, if known.
        """
        # Set the event-specific fields before delegating to the base class,
        # preserving the original initialization order.
        self.exit_code = exit_code
        self.success = success
        self.status = status

        super().__init__(
            op=WorkerLogEventOp.HOST_CONFIGURATION,
            farm_id=farm_id,
            fleet_id=fleet_id,
            worker_id=worker_id,
            message=message,
        )

    def asdict(self) -> dict[str, Any]:
        """Return the event as a dictionary for structured logging.

        ``exit_code`` and ``success`` are included whenever they were provided
        (i.e. are not ``None``).
        """
        dd = super().asdict()
        dd.update(status=self.status)
        # Compare against None instead of relying on truthiness: an exit code
        # of 0 (the success case) and success=False are both meaningful values
        # that must not be dropped from the log record.
        if self.exit_code is not None:
            dd.update(exit_code=self.exit_code)
        if self.success is not None:
            dd.update(success=self.success)
        return self.add_exception_to_dict(dd)
250+
251+
204252
class FilesystemLogEventOp(str, Enum):
205253
READ = "Read"
206254
WRITE = "Write"

src/deadline_worker_agent/startup/bootstrap.py

+26-8
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
from dataclasses import dataclass, asdict, fields, field
66
from time import sleep
7-
from typing import Optional
7+
from typing import Any, Dict, Optional, Tuple, cast
88
import json
99
import logging as _logging
1010
import stat
@@ -16,6 +16,7 @@
1616
from ..aws.deadline import (
1717
DeadlineRequestConditionallyRecoverableError,
1818
DeadlineRequestUnrecoverableError,
19+
WorkerHostConfiguration,
1920
WorkerLogConfig,
2021
construct_worker_log_config,
2122
create_worker,
@@ -100,6 +101,8 @@ class WorkerPersistenceInfo:
100101
)
101102
"""The EC2 instance ID of the Worker, if applicable"""
102103

104+
host_configuration_succeeded: bool | None = field(default=None)
105+
103106
@classmethod
104107
def load(cls, *, config: Configuration) -> Optional[WorkerPersistenceInfo]:
105108
"""Load the Worker Bootstrap from the Worker Agent state persistence file"""
@@ -115,7 +118,7 @@ def load(cls, *, config: Configuration) -> Optional[WorkerPersistenceInfo]:
115118
)
116119

117120
with config.worker_state_file.open("r", encoding="utf8") as fh:
118-
data: dict[str, str] = json.load(fh)
121+
data: dict[str, str | bool] = json.load(fh)
119122

120123
own_fields = set(f.name for f in fields(class_or_instance=WorkerPersistenceInfo))
121124
selected_data = {key: value for key, value in data.items() if key in own_fields}
@@ -129,7 +132,7 @@ def load(cls, *, config: Configuration) -> Optional[WorkerPersistenceInfo]:
129132
)
130133
)
131134

132-
return cls(**selected_data)
135+
return cls(**cast(Dict[str, Any], selected_data))
133136

134137
def save(self, *, config: Configuration) -> None:
135138
"""Save the Worker Bootstrap to the Worker Agent state persistence file"""
@@ -185,6 +188,9 @@ class WorkerBootstrap:
185188
log_config: Optional[WorkerLogConfig] = None
186189
"""The log configuration for the Worker"""
187190

191+
host_config: Optional[WorkerHostConfiguration] = None
192+
"""The host configuration for the Worker"""
193+
188194

189195
def bootstrap_worker(config: Configuration, *, use_existing_worker: bool = True) -> WorkerBootstrap:
190196
"""Contains startup logic to ensure that the Worker is created and started"""
@@ -217,7 +223,7 @@ def bootstrap_worker(config: Configuration, *, use_existing_worker: bool = True)
217223

218224
try:
219225
# raises: BootstrapWithoutWorkerLoad, SystemExit
220-
log_config = _start_worker(
226+
log_config, host_config = _start_worker(
221227
deadline_client=deadline_client,
222228
config=config,
223229
worker_id=worker_info.worker_id,
@@ -246,6 +252,7 @@ def bootstrap_worker(config: Configuration, *, use_existing_worker: bool = True)
246252
worker_info=worker_info,
247253
session=worker_session,
248254
log_config=log_config,
255+
host_config=host_config,
249256
)
250257

251258

@@ -437,15 +444,16 @@ def _start_worker(
437444
config: Configuration,
438445
worker_id: str,
439446
has_existing_worker: bool,
440-
) -> Optional[WorkerLogConfig]:
447+
) -> Tuple[Optional[WorkerLogConfig], Optional[WorkerHostConfiguration]]:
441448
"""Updates the Worker in the service to the STARTED state.
442449
443450
Returns:
444451
Optional[WorkerLogConfig] -- Non-None only if the UpdateWorker request
445452
contained a log configuration for the Worker Agent to use for writing
446453
its own logs. The returned WorkerLogConfig is the configuration that it
447454
should use.
448-
455+
Optional[WorkerHostConfiguration] -- Non-None if UpdateWorker request contains a
456+
host configuration for the Worker Agent to configure the host.
449457
Raises:
450458
BootstrapWithoutWorkerLoad
451459
SystemExit
@@ -498,9 +506,19 @@ def _start_worker(
498506
)
499507
)
500508

509+
logging_config = None
501510
if log_config := response.get("log"):
502-
return construct_worker_log_config(log_config=log_config)
503-
return None
511+
logging_config = construct_worker_log_config(log_config=log_config)
512+
513+
host_config = None
514+
if response_host_config := response.get("hostConfiguration"):
515+
# scriptTimeoutSeconds is technically always there from the backend.
516+
host_config = WorkerHostConfiguration(
517+
script_body=response_host_config.get("scriptBody", ""),
518+
script_timeout_seconds=response_host_config["scriptTimeoutSeconds"],
519+
)
520+
521+
return logging_config, host_config
504522

505523

506524
def _enforce_no_instance_profile_or_stop_worker(

src/deadline_worker_agent/startup/entrypoint.py

+77
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
from typing import Optional
1616
from pathlib import Path
1717

18+
from ..feature_flag import HOST_CONFIGURATION_FEATURE
19+
1820
from ..api_models import WorkerStatus
1921
from ..aws.deadline import (
2022
update_worker,
@@ -30,13 +32,16 @@
3032
from ..log_messages import (
3133
AgentInfoLogEvent,
3234
LogRecordStringTranslationFilter,
35+
WorkerHostConfigurationStatus,
3336
WorkerLogEvent,
3437
WorkerLogEventOp,
38+
WorkerHostConfigurationLogEvent,
3539
)
3640
from ..log_sync.cloudwatch import stream_cloudwatch_logs
3741
from ..log_sync.loggers import ROOT_LOGGER, logger as log_sync_logger
3842
from ..worker import Worker
3943
from .bootstrap import bootstrap_worker
44+
from .host_configuration_script import HostConfigurationScriptRunner
4045

4146
__all__ = ["entrypoint"]
4247
_logger = logging.getLogger(__name__)
@@ -163,6 +168,78 @@ def filter(self, record: logging.LogRecord) -> bool:
163168
# logs that we forward to CloudWatch.
164169
_log_agent_info()
165170

171+
# If there was a host config but the feature is disabled, or host configuration already succeeded on a previous run, only log a message.
172+
if worker_bootstrap.host_config and not HOST_CONFIGURATION_FEATURE:
173+
_logger.info(
174+
WorkerHostConfigurationLogEvent(
175+
farm_id=config.farm_id,
176+
fleet_id=config.fleet_id,
177+
worker_id=worker_id,
178+
message="Host Configuration Feature is not enabled.",
179+
status=WorkerHostConfigurationStatus.SKIPPED,
180+
)
181+
)
182+
elif (
183+
worker_bootstrap.host_config
184+
and worker_bootstrap.worker_info.host_configuration_succeeded
185+
):
186+
_logger.info(
187+
WorkerHostConfigurationLogEvent(
188+
farm_id=config.farm_id,
189+
fleet_id=config.fleet_id,
190+
worker_id=worker_id,
191+
message="Host Configuration has been setup before. Not running config scripts.",
192+
status=WorkerHostConfigurationStatus.SKIPPED,
193+
)
194+
)
195+
# Before the run loop starts, run the Host Configuration script.
196+
elif worker_bootstrap.host_config:
197+
_logger.info(
198+
WorkerHostConfigurationLogEvent(
199+
farm_id=config.farm_id,
200+
fleet_id=config.fleet_id,
201+
worker_id=worker_id,
202+
message="Running host configuration script.",
203+
status=WorkerHostConfigurationStatus.RUNNING,
204+
)
205+
)
206+
host_config_runner = HostConfigurationScriptRunner(
207+
logger=_logger,
208+
configuration=config,
209+
worker_id=worker_id,
210+
session_directory=config.worker_persistence_dir,
211+
worker_boto3_session=session,
212+
host_configuration_script=worker_bootstrap.host_config.script_body,
213+
host_configuration_timeout_seconds=worker_bootstrap.host_config.script_timeout_seconds,
214+
)
215+
exit_code = host_config_runner.run()
216+
if exit_code == 0:
217+
_logger.info(
218+
WorkerHostConfigurationLogEvent(
219+
farm_id=config.farm_id,
220+
fleet_id=config.fleet_id,
221+
worker_id=worker_id,
222+
message="Worker Agent host configuration succeeded. Starting worker session loop.",
223+
status=WorkerHostConfigurationStatus.SUCCEEDED,
224+
exit_code=exit_code,
225+
)
226+
)
227+
# Persist host configuration has been performed.
228+
worker_bootstrap.worker_info.host_configuration_succeeded = True
229+
worker_bootstrap.worker_info.save(config=config)
230+
else:
231+
_logger.critical(
232+
WorkerHostConfigurationLogEvent(
233+
farm_id=config.farm_id,
234+
fleet_id=config.fleet_id,
235+
worker_id=worker_id,
236+
message=f"Worker Agent host configuration failed with exit code {exit_code}. Cannot run jobs, exiting.",
237+
status=WorkerHostConfigurationStatus.FAILED,
238+
exit_code=exit_code,
239+
)
240+
)
241+
sys.exit(1)
242+
166243
worker_sessions = Worker(
167244
farm_id=config.farm_id,
168245
fleet_id=config.fleet_id,

0 commit comments

Comments
 (0)