forked from aws-deadline/deadline-cloud-test-fixtures
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathworker.py
1084 lines (910 loc) · 43.2 KB
/
worker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
from __future__ import annotations
import abc
import botocore.client
import botocore.exceptions
import glob
import json
import logging
import os
import pathlib
import posixpath
import re
import shutil
import subprocess
import tempfile
import time
from dataclasses import dataclass, field, InitVar, replace
from typing import Any, ClassVar, Optional, cast
from ..models import (
PipInstall,
PosixSessionUser,
)
from .resources import Fleet
from ..util import call_api, wait_for
LOG = logging.getLogger(__name__)
DOCKER_CONTEXT_DIR = os.path.join(os.path.dirname(__file__), "..", "containers", "worker")
class DeadlineWorker(abc.ABC):
    """Abstract interface for a Deadline Cloud worker host used by test fixtures.

    Implementations provision a host (EC2 instance, Docker container, ...),
    run shell commands on it, and expose the worker's Deadline identity.
    """

    @abc.abstractmethod
    def start(self) -> None:
        """Provision and boot the worker host."""

    @abc.abstractmethod
    def stop(self) -> None:
        """Shut down the worker host and clean up its resources."""

    @abc.abstractmethod
    def send_command(self, command: str) -> CommandResult:
        """Run a shell command on the worker host and return its result."""

    @abc.abstractmethod
    def get_worker_id(self) -> str:
        """Return the Deadline worker ID assigned to this host."""
@dataclass(frozen=True)
class CommandResult:  # pragma: no cover
    """Outcome of a shell command run on a worker host.

    Attributes:
        exit_code: Process exit status.
        stdout: Captured standard output.
        stderr: Captured standard error, if any.
    """

    exit_code: int
    stdout: str
    stderr: Optional[str] = None

    def __str__(self) -> str:
        """Render the result as a banner-delimited, human-readable report."""

        def banner_block(stream_name: str, content: str) -> list[str]:
            # A labeled section framed by '=' rules sized to the label line.
            begin_label = f"========= BEGIN {stream_name} ========="
            end_label = f"========= END {stream_name} ========="
            return [
                "=" * len(begin_label),
                begin_label,
                "=" * len(begin_label),
                "",
                content,
                "",
                "=" * len(end_label),
                end_label,
                "=" * len(end_label),
            ]

        report = [f"exit_code: {self.exit_code}", ""]
        report.extend(banner_block("stdout", self.stdout))
        report.append("")
        # stderr may be None; render it the same way the default formatter would.
        report.extend(banner_block("stderr", str(self.stderr)))
        return "\n".join(report)
@dataclass(frozen=True)
class DeadlineWorkerConfiguration:
    """Immutable settings describing how to install and configure a Deadline
    Cloud worker agent on a test host (EC2 instance or Docker container)."""

    farm_id: str
    fleet: Fleet
    region: str
    allow_shutdown: bool
    worker_agent_install: PipInstall
    start_service: bool = True
    no_install_service: bool = False
    service_model_path: str | None = None
    # NOTE(review): the two fields below are only ever tested for truthiness by
    # the worker classes in this file; presumably any non-empty string enables
    # the flag — a bool may be the clearer type. TODO confirm before changing.
    no_local_session_logs: str | None = None
    disallow_instance_profile: str | None = None
    file_mappings: list[tuple[str, str]] | None = None
    """Mapping of files to copy from host environment to worker environment"""
    pre_install_commands: list[str] | None = None
    """Commands to run before installing the Worker agent"""
    job_user: str = field(default="job-user")
    agent_user: str = field(default="deadline-worker")
    job_user_group: str = field(default="deadline-job-users")
    job_users: list[PosixSessionUser] = field(
        default_factory=lambda: [PosixSessionUser("job-user", "job-user")]
    )
    """Additional job users to configure for Posix workers"""
    windows_job_users: list = field(default_factory=lambda: ["job-user"])
    """Additional job users to configure for Windows workers"""
    session_root_dir: str | None = None
    """Path to parent directory of worker session directories"""
@dataclass
class EC2InstanceWorker(DeadlineWorker):
    """Abstract base for Deadline worker hosts backed by an EC2 instance.

    Subclasses supply the platform-specific pieces: AMI lookup parameter, SSM
    run document, EC2 user data, EBS layout, and worker-agent configuration
    commands. This base class handles staging bootstrap files to S3, launching
    the instance, sending SSM commands, and worker teardown.
    """

    subnet_id: str
    security_group_id: str
    instance_profile_name: str
    # S3 bucket used to stage file_mappings for the instance to download at boot
    bootstrap_bucket_name: str
    s3_client: botocore.client.BaseClient
    ec2_client: botocore.client.BaseClient
    ssm_client: botocore.client.BaseClient
    deadline_client: botocore.client.BaseClient
    configuration: DeadlineWorkerConfiguration
    instance_type: str
    instance_shutdown_behavior: str
    # Set by _launch_instance(); cleared again by stop()
    instance_id: Optional[str] = field(init=False, default=None)
    # Set by _start_worker_agent() once the agent has registered
    worker_id: Optional[str] = field(init=False, default=None)
    """
    Option to override the AMI ID for the EC2 instance. If no override is provided,
    the default will depend on the subclass being instantiated.
    """
    override_ami_id: InitVar[Optional[str]] = None

    def __post_init__(self, override_ami_id: Optional[str] = None):
        # An explicit override bypasses the SSM-parameter lookup in ami_id below.
        if override_ami_id:
            self._ami_id = override_ami_id

    @abc.abstractmethod
    def ami_ssm_param_name(self) -> str:
        """Name of the public SSM parameter holding the latest AMI ID for this platform."""
        raise NotImplementedError("'ami_ssm_param_name' was not implemented.")

    @abc.abstractmethod
    def ssm_document_name(self) -> str:
        """SSM document used to run shell commands on this platform."""
        raise NotImplementedError("'ssm_document_name' was not implemented.")

    @abc.abstractmethod
    def _start_worker_agent(self) -> None:  # pragma: no cover
        """Configure and start the worker agent on the launched instance."""
        raise NotImplementedError("'_start_worker_agent' was not implemented.")

    @abc.abstractmethod
    def configure_worker_command(
        self, *, config: DeadlineWorkerConfiguration
    ) -> str:  # pragma: no cover
        """Shell command string that installs and configures the worker agent."""
        raise NotImplementedError("'configure_worker_command' was not implemented.")

    @abc.abstractmethod
    def start_worker_service(self) -> None:  # pragma: no cover
        """Start the worker agent service on the instance."""
        # NOTE(review): message names '_start_worker_service' but the method is
        # 'start_worker_service' — misleading if this is ever raised.
        raise NotImplementedError("'_start_worker_service' was not implemented.")

    @abc.abstractmethod
    def stop_worker_service(self) -> None:  # pragma: no cover
        """Stop the worker agent service on the instance."""
        # NOTE(review): message names '_stop_worker_service' but the method is
        # 'stop_worker_service'.
        raise NotImplementedError("'_stop_worker_service' was not implemented.")

    @abc.abstractmethod
    def get_worker_id(self) -> str:
        """Read the registered worker ID off the instance."""
        raise NotImplementedError("'get_worker_id' was not implemented.")

    @abc.abstractmethod
    def userdata(self, s3_files) -> str:
        """EC2 user data script run at first boot. s3_files is the list of
        (s3_uri, destination_path) tuples returned by _stage_s3_bucket()."""
        raise NotImplementedError("'userdata' was not implemented.")

    @abc.abstractmethod
    def ebs_devices(self) -> dict[str, int] | None:
        """DeviceName -> VolumeSize (in GiBs) mapping"""
        raise NotImplementedError("'ebs_devices' was not implemented.")

    def start(self) -> None:
        """Stage bootstrap files to S3, launch the instance, and start the worker agent."""
        s3_files = self._stage_s3_bucket()
        self._launch_instance(s3_files=s3_files)
        self._start_worker_agent()

    def stop(self) -> None:
        """Terminate the EC2 instance; for non-autoscaling fleets also wait for the
        worker to reach STOPPED (forcing the status if necessary) and delete it."""
        LOG.info(f"Terminating EC2 instance {self.instance_id}")
        self.ec2_client.terminate_instances(InstanceIds=[self.instance_id])
        self.instance_id = None
        if not self.configuration.fleet.autoscaling:
            try:
                self.wait_until_stopped()
            except TimeoutError:
                LOG.warning(
                    f"{self.worker_id} did not transition to a STOPPED status, forcibly stopping..."
                )
                self.set_stopped_status()
            try:
                self.delete()
            except botocore.exceptions.ClientError as error:
                LOG.exception(f"Failed to delete worker: {error}")
                raise

    def delete(self):
        """Delete this worker's record from its Deadline fleet.

        Raises:
            botocore.exceptions.ClientError: if the DeleteWorker call fails.
        """
        try:
            self.deadline_client.delete_worker(
                farmId=self.configuration.farm_id,
                fleetId=self.configuration.fleet.id,
                workerId=self.worker_id,
            )
            LOG.info(f"{self.worker_id} has been deleted from {self.configuration.fleet.id}")
        except botocore.exceptions.ClientError as error:
            LOG.exception(f"Failed to delete worker: {error}")
            raise

    def wait_until_stopped(
        self, *, max_checks: int = 25, seconds_between_checks: float = 5
    ) -> None:
        """Poll GetWorker until the worker reports STOPPED.

        Raises:
            TimeoutError: if the worker is not STOPPED after max_checks polls.
        """
        for _ in range(max_checks):
            response = self.deadline_client.get_worker(
                farmId=self.configuration.farm_id,
                fleetId=self.configuration.fleet.id,
                workerId=self.worker_id,
            )
            if response["status"] == "STOPPED":
                LOG.info(f"{self.worker_id} is STOPPED")
                break
            time.sleep(seconds_between_checks)
            LOG.info(f"Waiting for {self.worker_id} to transition to STOPPED status")
        else:
            # Loop exhausted without hitting the break above
            raise TimeoutError

    def set_stopped_status(self):
        """Force the worker's status to STOPPED via UpdateWorker.

        Raises:
            botocore.exceptions.ClientError: if the UpdateWorker call fails.
        """
        LOG.info(f"Setting {self.worker_id} to STOPPED status")
        try:
            self.deadline_client.update_worker(
                farmId=self.configuration.farm_id,
                fleetId=self.configuration.fleet.id,
                workerId=self.worker_id,
                status="STOPPED",
            )
        except botocore.exceptions.ClientError as error:
            LOG.exception(f"Failed to update worker status: {error}")
            raise

    def send_command(self, command: str) -> CommandResult:
        """Send a command via SSM to a shell on a launched EC2 instance. Once the command has fully
        finished the result of the invocation is returned.
        """
        ssm_waiter = self.ssm_client.get_waiter("command_executed")
        # To successfully send an SSM Command to an instance the instance must:
        # 1) Be in RUNNING state;
        # 2) Have the AWS Systems Manager (SSM) Agent running; and
        # 3) Have had enough time for the SSM Agent to connect to System's Manager
        #
        # If we send an SSM command then we will get an InvalidInstanceId error
        # if the instance isn't in that state.
        NUM_RETRIES = 60
        SLEEP_INTERVAL_S = 10
        for i in range(0, NUM_RETRIES):
            LOG.info(f"Sending SSM command to instance {self.instance_id}")
            try:
                send_command_response = self.ssm_client.send_command(
                    InstanceIds=[self.instance_id],
                    DocumentName=self.ssm_document_name(),
                    Parameters={"commands": [command]},
                )
                break
            except botocore.exceptions.ClientError as error:
                error_code = error.response["Error"]["Code"]
                if error_code == "InvalidInstanceId" and i < NUM_RETRIES - 1:
                    LOG.warning(
                        f"Instance {self.instance_id} is not ready for SSM command (received InvalidInstanceId error). Retrying in {SLEEP_INTERVAL_S}s."
                    )
                    time.sleep(SLEEP_INTERVAL_S)
                    continue
                # Any other error, or out of retries: propagate.
                raise
        command_id = send_command_response["Command"]["CommandId"]
        LOG.info(f"Waiting for SSM command {command_id} to reach a terminal state")
        try:
            ssm_waiter.wait(
                InstanceId=self.instance_id,
                CommandId=command_id,
                WaiterConfig={"Delay": 5, "MaxAttempts": 30},
            )
        except botocore.exceptions.WaiterError:  # pragma: no cover
            # Swallow exception, we're going to check the result anyway
            pass
        ssm_command_result = self.ssm_client.get_command_invocation(
            InstanceId=self.instance_id,
            CommandId=command_id,
        )
        result = CommandResult(
            exit_code=ssm_command_result["ResponseCode"],
            stdout=ssm_command_result["StandardOutputContent"],
            stderr=ssm_command_result["StandardErrorContent"],
        )
        if result.exit_code == -1:  # pragma: no cover
            # Response code of -1 in a terminal state means the command was not received by the node
            LOG.error(f"Failed to send SSM command {command_id} to {self.instance_id}: {result}")
        LOG.info(f"SSM command {command_id} completed with exit code: {result.exit_code}")
        return result

    def _stage_s3_bucket(self) -> list[tuple[str, str]] | None:
        """Stages file_mappings to an S3 bucket and returns the mapping of S3 URI to dest path"""
        if not self.configuration.file_mappings:
            LOG.info("No file mappings to stage to S3")
            return None
        s3_to_src_mapping: dict[str, str] = {}
        s3_to_dst_mapping: dict[str, str] = {}
        for src_glob, dst in self.configuration.file_mappings:
            for src_file in glob.glob(src_glob):
                # All staged files share the "worker/" prefix, so base names
                # must be unique across all mappings.
                s3_key = f"worker/{os.path.basename(src_file)}"
                assert s3_key not in s3_to_src_mapping, (
                    "Duplicate S3 keys generated for file mappings. All source files must have unique "
                    + f"filenames. Mapping: {self.configuration.file_mappings}"
                )
                s3_to_src_mapping[s3_key] = src_file
                s3_to_dst_mapping[f"s3://{self.bootstrap_bucket_name}/{s3_key}"] = dst
        for key, local_path in s3_to_src_mapping.items():
            LOG.info(f"Uploading file {local_path} to s3://{self.bootstrap_bucket_name}/{key}")
            try:
                # self.s3_client.upload_file(local_path, self.bootstrap_bucket_name, key)
                with open(local_path, mode="rb") as f:
                    self.s3_client.put_object(
                        Bucket=self.bootstrap_bucket_name,
                        Key=key,
                        Body=f,
                    )
            except botocore.exceptions.ClientError as e:
                LOG.exception(
                    f"Failed to upload file {local_path} to s3://{self.bootstrap_bucket_name}/{key}: {e}"
                )
                raise
        return list(s3_to_dst_mapping.items())

    def _launch_instance(self, *, s3_files: list[tuple[str, str]] | None = None) -> None:
        """Launch the EC2 instance with this worker's userdata and wait until
        its status checks pass."""
        assert (
            not self.instance_id
        ), "Attempted to launch EC2 instance when one was already launched"
        LOG.info("Launching EC2 instance")
        LOG.info(
            json.dumps(
                {
                    "AMI_ID": self.ami_id,
                    "Instance Profile": self.instance_profile_name,
                    "User Data": self.userdata(s3_files),
                },
                indent=4,
                sort_keys=True,
            )
        )
        run_instance_request = {
            "MinCount": 1,
            "MaxCount": 1,
            "ImageId": self.ami_id,
            "InstanceType": self.instance_type,
            "IamInstanceProfile": {"Name": self.instance_profile_name},
            "SubnetId": self.subnet_id,
            "SecurityGroupIds": [self.security_group_id],
            # Require IMDSv2 on the instance metadata endpoint
            "MetadataOptions": {"HttpTokens": "required", "HttpEndpoint": "enabled"},
            "TagSpecifications": [
                {
                    "ResourceType": "instance",
                    "Tags": [
                        {
                            "Key": "InstanceIdentification",
                            "Value": "DeadlineScaffoldingWorker",
                        }
                    ],
                }
            ],
            "InstanceInitiatedShutdownBehavior": self.instance_shutdown_behavior,
            "UserData": self.userdata(s3_files),
        }
        # Subclasses may request larger root volumes via ebs_devices()
        devices = self.ebs_devices() or {}
        device_mappings = [
            {"DeviceName": name, "Ebs": {"VolumeSize": size}} for name, size in devices.items()
        ]
        if device_mappings:
            run_instance_request["BlockDeviceMappings"] = device_mappings
        run_instance_response = self.ec2_client.run_instances(**run_instance_request)
        self.instance_id = run_instance_response["Instances"][0]["InstanceId"]
        LOG.info(f"Launched EC2 instance {self.instance_id}")
        LOG.info(f"Waiting for EC2 instance {self.instance_id} status to be OK")
        instance_running_waiter = self.ec2_client.get_waiter("instance_status_ok")
        instance_running_waiter.wait(
            InstanceIds=[self.instance_id],
            WaiterConfig={"Delay": 15, "MaxAttempts": 75},
        )
        LOG.info(f"EC2 instance {self.instance_id} status is OK")

    @property
    def ami_id(self) -> str:
        """AMI ID to launch. Resolved lazily from the subclass's SSM parameter
        and cached; an override passed at construction time takes precedence."""
        if not hasattr(self, "_ami_id"):
            response = call_api(
                description=f"Getting latest {type(self)} AMI ID from SSM parameter {self.ami_ssm_param_name()}",
                fn=lambda: self.ssm_client.get_parameters(Names=[self.ami_ssm_param_name()]),
            )
            parameters = response.get("Parameters", [])
            assert (
                len(parameters) == 1
            ), f"Received incorrect number of SSM parameters. Expected 1, got response: {response}"
            self._ami_id = parameters[0]["Value"]
            LOG.info(f"Using latest {type(self)} AMI {self._ami_id}")
        return self._ami_id
@dataclass
class WindowsInstanceWorkerBase(EC2InstanceWorker):
    """Base class from which Windows ec2 test instances are derived.

    The methods in this base class are written with two cases of worker hosts in mind:
    1. A host that is based on a stock Windows server AMI, with no Deadline-anything
       installed, that must install the worker agent and the like during boot-up.
    2. A host that already has the worker agent, job/agent users, and the like baked into
       the host AMI in a location & manner that may differ from case (1).
    """

    def ebs_devices(self) -> dict[str, int] | None:
        """DeviceName -> VolumeSize (in GiBs) mapping"""
        # defaults to 60GB to match SMF, aws gives 30GB by default
        return {"/dev/sda1": 60}

    def ssm_document_name(self) -> str:
        """Commands sent to Windows hosts run under PowerShell."""
        return "AWS-RunPowerShellScript"

    def _start_worker_agent(self) -> None:
        """Configure the worker agent over SSM, then poll until the agent's
        'pythonservice' process is confirmed running, and capture the worker ID."""
        assert self.instance_id
        LOG.info(f"Sending SSM command to configure Worker agent on instance {self.instance_id}")
        # NOTE(review): this result is overwritten below without checking its
        # exit code, unlike the Posix implementation which asserts exit_code == 0
        # after configuring — confirm whether that is intentional.
        cmd_result = self.send_command(
            f"{self.configure_worker_command(config=self.configuration)}"
        )
        LOG.info("Successfully configured Worker agent")
        LOG.info("Sending SSM Command to check if Worker Agent is running")
        cmd_result = self.send_command(
            " ; ".join(
                [
                    "echo 'Running Get-Process to check if the agent is running'",
                    'for($i=1; $i -le 30 -and "" -ne $err ; $i++){sleep $i; Get-Process pythonservice -ErrorVariable err}',
                    "IF(Get-Process pythonservice){echo '+++SERVICE IS RUNNING+++'}ELSE{echo '+++SERVICE NOT RUNNING+++'; Get-Content -Encoding utf8 C:\ProgramData\Amazon\Deadline\Logs\worker-agent-bootstrap.log,C:\ProgramData\Amazon\Deadline\Logs\worker-agent.log; exit 1}",
                ]
            ),
        )
        assert cmd_result.exit_code == 0, f"Failed to start Worker agent: {cmd_result}"
        LOG.info("Successfully started Worker agent")
        self.worker_id = self.get_worker_id()

    def configure_worker_common(self, *, config: DeadlineWorkerConfiguration) -> str:
        """Get the command to configure the Worker. This must be run as Administrator.

        This cannot assume that the agent user exists.
        """
        cmds = ["$ErrorActionPreference = 'Stop'"]
        if config.service_model_path:
            # Register the custom service model, then copy it for Administrator
            # and the job user.
            cmds.append(
                f"aws configure add-model --service-model file://{config.service_model_path} --service-name deadline; "
                f"Copy-Item -Path ~\\.aws\\* -Destination C:\\Users\\Administrator\\.aws\\models -Recurse; "
                f"Copy-Item -Path ~\\.aws\\* -Destination C:\\Users\\{config.job_user}\\.aws\\models -Recurse"
            )
        if config.no_local_session_logs:
            # Set machine-wide, then re-read into the current session so the
            # value applies without a new shell.
            cmds.append(
                "[System.Environment]::SetEnvironmentVariable('DEADLINE_WORKER_LOCAL_SESSION_LOGS', 'false', [System.EnvironmentVariableTarget]::Machine); "
                "$env:DEADLINE_WORKER_LOCAL_SESSION_LOGS = [System.Environment]::GetEnvironmentVariable('DEADLINE_WORKER_LOCAL_SESSION_LOGS','Machine')",
            )
        if os.environ.get("DEADLINE_WORKER_ALLOW_INSTANCE_PROFILE"):
            LOG.info(
                f"Using DEADLINE_WORKER_ALLOW_INSTANCE_PROFILE: {os.environ.get('DEADLINE_WORKER_ALLOW_INSTANCE_PROFILE')}"
            )
            cmds.append(
                f"[System.Environment]::SetEnvironmentVariable('DEADLINE_WORKER_ALLOW_INSTANCE_PROFILE', '{os.environ.get('DEADLINE_WORKER_ALLOW_INSTANCE_PROFILE')}', [System.EnvironmentVariableTarget]::Machine); "
                "$env:DEADLINE_WORKER_ALLOW_INSTANCE_PROFILE = [System.Environment]::GetEnvironmentVariable('DEADLINE_WORKER_ALLOW_INSTANCE_PROFILE','Machine')",
            )
        if os.environ.get("AWS_ENDPOINT_URL_DEADLINE"):
            LOG.info(
                f"Using AWS_ENDPOINT_URL_DEADLINE: {os.environ.get('AWS_ENDPOINT_URL_DEADLINE')}"
            )
            cmds.append(
                f"[System.Environment]::SetEnvironmentVariable('AWS_ENDPOINT_URL_DEADLINE', '{os.environ.get('AWS_ENDPOINT_URL_DEADLINE')}', [System.EnvironmentVariableTarget]::Machine); "
                "$env:AWS_ENDPOINT_URL_DEADLINE = [System.Environment]::GetEnvironmentVariable('AWS_ENDPOINT_URL_DEADLINE','Machine')",
            )
        return "; ".join(cmds)

    def start_worker_service(self):
        """Start the Windows 'DeadlineWorker' service on the host."""
        LOG.info("Sending command to start the Worker Agent service")
        cmd_result = self.send_command('Start-Service -Name "DeadlineWorker"')
        assert cmd_result.exit_code == 0, f"Failed to start Worker Agent service: : {cmd_result}"

    def stop_worker_service(self):
        """Stop the Windows 'DeadlineWorker' service on the host."""
        LOG.info("Sending command to stop the Worker Agent service")
        cmd_result = self.send_command('Stop-Service -Name "DeadlineWorker"')
        assert cmd_result.exit_code == 0, f"Failed to stop Worker Agent service: : {cmd_result}"

    def get_worker_id(self) -> str:
        """Read the worker ID from the agent's cached worker.json, waiting for
        the file to appear first (the agent writes it asynchronously)."""
        cmd_result = self.send_command(
            " ; ".join(
                [
                    'for($i=1; $i -le 20 -and "" -ne $err ; $i++){sleep $i; Get-Item C:\ProgramData\Amazon\Deadline\Cache\worker.json -ErrorVariable err 1>$null}',
                    "$worker=Get-Content -Raw C:\ProgramData\Amazon\Deadline\Cache\worker.json | ConvertFrom-Json",
                    "echo $worker.worker_id",
                ]
            )
        )
        assert cmd_result.exit_code == 0, f"Failed to get Worker ID: {cmd_result}"
        worker_id = cmd_result.stdout.rstrip("\n\r")
        # Worker IDs look like "worker-" followed by 32 hex characters.
        assert re.match(
            r"^worker-[0-9a-f]{32}$", worker_id
        ), f"Got nonvalid Worker ID from command stdout: {cmd_result}"
        return worker_id
@dataclass
class WindowsInstanceBuildWorker(WindowsInstanceWorkerBase):
    """
    This class represents a Windows EC2 Worker Host.
    Any commands must be written in Powershell.
    """

    # Stock Windows Server 2022 AMI name used for the SSM latest-AMI lookup
    WIN2022_AMI_NAME: ClassVar[str] = "Windows_Server-2022-English-Full-Base"

    def configure_worker_command(self, *, config: DeadlineWorkerConfiguration) -> str:
        """Get the command to configure the Worker. This must be run as Administrator."""
        cmds = [
            "Set-PSDebug -trace 1",
            self.configure_worker_common(config=config),
            config.worker_agent_install.install_command_for_windows,
            *(config.pre_install_commands or []),
            # fmt: off
            (
                "install-deadline-worker "
                + "-y "
                + f"--farm-id {config.farm_id} "
                + f"--fleet-id {config.fleet.id} "
                + f"--region {config.region} "
                + f"--user {config.agent_user} "
                + f"{'--allow-shutdown ' if config.allow_shutdown else ''}"
                + f"{'--disallow-instance-profile ' if config.disallow_instance_profile else ''}"
                + (f"--session-root-dir {config.session_root_dir} " if config.session_root_dir is not None else '')
            ),
            # fmt: on
        ]
        if config.service_model_path:
            # Make the custom service model available to the agent user as well
            cmds.append(
                f"Copy-Item -Path ~\\.aws\\* -Destination C:\\Users\\{config.agent_user}\\.aws\\models -Recurse; "
            )
        if config.start_service:
            cmds.append('Start-Service -Name "DeadlineWorker"')
        return "; ".join(cmds)

    def userdata(self, s3_files) -> str:
        """EC2 user data: install Python and the AWS CLI, fetch the job-user
        password from Secrets Manager, copy staged S3 files, and create the
        configured Windows job users."""
        copy_s3_command = ""
        job_users_cmds = []
        if s3_files:
            copy_s3_command = " ; ".join([f"aws s3 cp {s3_uri} {dst}" for s3_uri, dst in s3_files])
        if self.configuration.windows_job_users:
            for job_user in self.configuration.windows_job_users:
                job_users_cmds.append(
                    f"New-LocalUser -Name {job_user} -Password $password -FullName {job_user} -Description {job_user}"
                )
                job_users_cmds.append(
                    f"$Cred = New-Object System.Management.Automation.PSCredential {job_user}, $password"
                )
                # Spawning a process as the user forces creation of the user profile
                job_users_cmds.append(
                    'Start-Process cmd.exe -Credential $Cred -ArgumentList "/C" -LoadUserProfile -NoNewWindow'
                )
        configure_job_users = "\n".join(job_users_cmds)
        userdata = f"""<powershell>
$ProgressPreference = 'SilentlyContinue'
Invoke-WebRequest -Uri "https://www.python.org/ftp/python/3.12.9/python-3.12.9-amd64.exe" -OutFile "C:\python-3.12.9-amd64.exe"
$installerHash=(Get-FileHash "C:\python-3.12.9-amd64.exe" -Algorithm "MD5")
$expectedHash="1cfb1bbf96007b12b98db895dcd86487"
if ($installerHash.Hash -ne $expectedHash) {{ throw "Could not verify Python installer." }}
Start-Process -FilePath "C:\python-3.12.9-amd64.exe" -ArgumentList "/quiet InstallAllUsers=1 PrependPath=1 AppendPath=1" -Wait
Invoke-WebRequest -Uri "https://awscli.amazonaws.com/AWSCLIV2.msi" -Outfile "C:\AWSCLIV2.msi"
Start-Process msiexec.exe -ArgumentList "/i C:\AWSCLIV2.msi /quiet" -Wait
$env:Path = [System.Environment]::GetEnvironmentVariable("Path","Machine")
$secret = aws secretsmanager get-secret-value --secret-id WindowsPasswordSecret --query SecretString --output text | ConvertFrom-Json
$password = ConvertTo-SecureString -String $($secret.password) -AsPlainText -Force
{copy_s3_command}
{configure_job_users}
</powershell>"""
        return userdata

    def ami_ssm_param_name(self) -> str:
        """Public SSM parameter tracking the latest Windows Server 2022 AMI."""
        # Grab the latest Windows Server 2022 AMI
        # https://aws.amazon.com/blogs/mt/query-for-the-latest-windows-ami-using-systems-manager-parameter-store/
        ami_ssm_param: str = (
            f"/aws/service/ami-windows-latest/{WindowsInstanceBuildWorker.WIN2022_AMI_NAME}"
        )
        return ami_ssm_param
@dataclass
class PosixInstanceWorkerBase(EC2InstanceWorker):
    """Base class from which posix (i.e. Linux) ec2 test instances are derived.

    The methods in this base class are written with two cases of worker hosts in mind:
    1. A host that is based on a stock linux AMI, with no Deadline-anything installed,
       that must install the worker agent and the like during boot-up.
    2. A host that already has the worker agent, job/agent users, and the like baked into
       the host AMI in a location & manner that may differ from case (1).
    """

    def ebs_devices(self) -> dict[str, int] | None:
        """DeviceName -> VolumeSize (in GiBs) mapping"""
        # defaults to 30GB to match SMF, aws gives 8GB by default
        return {"/dev/xvda": 30}

    def ssm_document_name(self) -> str:
        """Commands sent to posix hosts run under a shell."""
        return "AWS-RunShellScript"

    def send_command(self, command: str) -> CommandResult:
        # Fail fast on errors, unset variables, and pipeline failures.
        return super().send_command("set -eou pipefail; " + command)

    def _start_worker_agent(self) -> None:
        """Configure the worker agent over SSM, optionally start its systemd
        service, and capture the registered worker ID."""
        assert self.instance_id
        LOG.info(
            f"Starting worker for farm: {self.configuration.farm_id} and fleet: {self.configuration.fleet.id}"
        )
        LOG.info(f"Sending SSM command to configure Worker agent on instance {self.instance_id}")
        cmd_result = self.send_command(self.configure_worker_command(config=self.configuration))
        assert cmd_result.exit_code == 0, f"Failed to configure Worker agent: {cmd_result}"
        LOG.info("Successfully configured Worker agent")
        if self.configuration.start_service:
            LOG.info(
                f"Sending SSM command to configure Worker agent on instance {self.instance_id}"
            )
            self.start_worker_service()
        LOG.info("Successfully started worker agent")
        self.worker_id = self.get_worker_id()

    def configure_agent_user_environment(
        self, config: DeadlineWorkerConfiguration
    ) -> str:  # pragma: no cover
        """Get the command to configure the Worker. This must be run as root.

        This can assume that the agent user exists.
        """
        cmds = []
        if config.service_model_path:
            # Register the custom service model as the agent user
            cmds.append(
                f"runuser -l {config.agent_user} -s /bin/bash -c 'aws configure add-model --service-model file://{config.service_model_path}'"
            )
        allow_instance_profile = os.environ.get("DEADLINE_WORKER_ALLOW_INSTANCE_PROFILE", None)
        endpoint_url_deadline = os.environ.get("AWS_ENDPOINT_URL_DEADLINE", None)
        # Create a systemd drop-in config file to apply the configuration
        # See https://wiki.archlinux.org/title/Systemd#Drop-in_files
        cmds.extend(
            [
                "mkdir -p /etc/systemd/system/deadline-worker.service.d/",
                'echo "[Service]" > /etc/systemd/system/deadline-worker.service.d/config.conf',
                # Configure the region
                f'echo "Environment=AWS_REGION={config.region}" >> /etc/systemd/system/deadline-worker.service.d/config.conf',
                f'echo "Environment=AWS_DEFAULT_REGION={config.region}" >> /etc/systemd/system/deadline-worker.service.d/config.conf',
            ]
        )
        if allow_instance_profile is not None:
            LOG.info(f"Using DEADLINE_WORKER_ALLOW_INSTANCE_PROFILE: {allow_instance_profile}")
            cmds.append(
                f'echo "Environment=DEADLINE_WORKER_ALLOW_INSTANCE_PROFILE={allow_instance_profile}" >> /etc/systemd/system/deadline-worker.service.d/config.conf',
            )
        if endpoint_url_deadline is not None:
            LOG.info(f"Using AWS_ENDPOINT_URL_DEADLINE: {endpoint_url_deadline}")
            cmds.append(
                f'echo "Environment=AWS_ENDPOINT_URL_DEADLINE={endpoint_url_deadline}" >> /etc/systemd/system/deadline-worker.service.d/config.conf',
            )
        if config.no_local_session_logs:
            cmds.append(
                'echo "Environment=DEADLINE_WORKER_LOCAL_SESSION_LOGS=false" >> /etc/systemd/system/deadline-worker.service.d/config.conf',
            )
        # Make systemd pick up the drop-in file
        cmds.append("systemctl daemon-reload")
        return " && ".join(cmds)

    def start_worker_service(self):
        """Start the deadline-worker systemd service, dumping agent logs on failure."""
        LOG.info("Sending command to start the Worker Agent service")
        cmd_result = self.send_command(
            " && ".join(
                [
                    "systemctl start deadline-worker",
                    "sleep 5",
                    "systemctl is-active deadline-worker",
                    "if test $? -ne 0; then echo '+++AGENT NOT RUNNING+++'; cat /var/log/amazon/deadline/worker-agent-bootstrap.log /var/log/amazon/deadline/worker-agent.log; exit 1; fi",
                ]
            )
        )
        assert cmd_result.exit_code == 0, f"Failed to start Worker Agent service: {cmd_result}"

    def stop_worker_service(self):
        """Stop the deadline-worker systemd service."""
        LOG.info("Sending command to stop the Worker Agent service")
        cmd_result = self.send_command("systemctl stop deadline-worker")
        assert cmd_result.exit_code == 0, f"Failed to stop Worker Agent service: {cmd_result}"

    def get_worker_id(self) -> str:
        # There can be a race condition, so we may need to wait a little bit for the status file to be written.
        worker_state_filename = "/var/lib/deadline/worker.json"
        cmd_result = self.send_command(
            " && ".join(
                [
                    f"t=0 && while [ $t -le 10 ] && ! (test -f {worker_state_filename}); do sleep $t; t=$[$t+1]; done",
                    f"cat {worker_state_filename} | jq -r '.worker_id'",
                ]
            )
        )
        assert cmd_result.exit_code == 0, f"Failed to get Worker ID: {cmd_result}"
        worker_id = cmd_result.stdout.rstrip("\n\r")
        LOG.info(f"Worker ID: {worker_id}")
        # Worker IDs look like "worker-" followed by 32 hex characters.
        assert re.match(
            r"^worker-[0-9a-f]{32}$", worker_id
        ), f"Got nonvalid Worker ID from command stdout: {cmd_result}"
        return worker_id
@dataclass
class PosixInstanceBuildWorker(PosixInstanceWorkerBase):
"""
This class represents a Linux EC2 Worker Host.
Any commands must be written in Bash.
"""
AL2023_AMI_NAME: ClassVar[str] = "al2023-ami-kernel-6.1-x86_64"
def configure_worker_command(
self, config: DeadlineWorkerConfiguration
) -> str: # pragma: no cover
"""Get the command to configure the Worker. This must be run as root."""
cmds = [
"set -x",
"source /opt/deadline/worker/bin/activate",
f"AWS_DEFAULT_REGION={self.configuration.region}",
config.worker_agent_install.install_command_for_linux,
*(config.pre_install_commands or []),
# fmt: off
(
"install-deadline-worker "
+ "-y "
+ f"--farm-id {config.farm_id} "
+ f"--fleet-id {config.fleet.id} "
+ f"--region {config.region} "
+ f"--user {config.agent_user} "
+ f"--group {config.job_user_group} "
+ f"{'--allow-shutdown ' if config.allow_shutdown else ''}"
+ f"{'--no-install-service ' if config.no_install_service else ''}"
+ f"{'--disallow-instance-profile ' if config.disallow_instance_profile else ''}"
+ (f"--session-root-dir {config.session_root_dir} " if config.session_root_dir is not None else '')
),
# fmt: on
f"runuser --login {self.configuration.agent_user} --command 'echo \"source /opt/deadline/worker/bin/activate\" >> $HOME/.bashrc'",
]
for job_user in self.configuration.job_users:
cmds.append(f"usermod -a -G {job_user.group} {self.configuration.agent_user}")
sudoer_rule_users = ",".join(
[
self.configuration.agent_user,
*[job_user.user for job_user in self.configuration.job_users],
]
)
cmds.append(
f'echo "{self.configuration.agent_user} ALL=({sudoer_rule_users}) NOPASSWD: ALL" > /etc/sudoers.d/{self.configuration.agent_user}'
)
cmds.append(self.configure_agent_user_environment(config))
return " && ".join(cmds)
def userdata(self, s3_files) -> str:
copy_s3_command = ""
job_users_cmds = []
if s3_files:
copy_s3_command = " && ".join(
[f"aws s3 cp {s3_uri} {dst} && chmod o+rx {dst}" for s3_uri, dst in s3_files]
)
for job_user in self.configuration.job_users:
job_users_cmds.append(f"groupadd {job_user.group}")
job_users_cmds.append(
f"useradd --create-home --system --shell=/bin/bash --groups={self.configuration.job_user_group} -g {job_user.group} {job_user.user}"
)
configure_job_users = "\n".join(job_users_cmds)
userdata = f"""#!/bin/bash
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
set -x
groupadd --system {self.configuration.job_user_group}
{configure_job_users}
{copy_s3_command}
mkdir /opt/deadline
python3 -m venv /opt/deadline/worker
"""
return userdata
def ami_ssm_param_name(self) -> str:
    """Return the public SSM parameter path that resolves to the latest AL2023 AMI ID.

    See: https://aws.amazon.com/blogs/compute/query-for-the-latest-amazon-linux-ami-ids-using-aws-systems-manager-parameter-store/
    """
    return f"/aws/service/ami-amazon-linux-latest/{PosixInstanceBuildWorker.AL2023_AMI_NAME}"
@dataclass
class DockerContainerWorker(DeadlineWorker):
    """DeadlineWorker implementation that runs the Worker agent inside a Docker container.

    The container is built and launched by the "run_container.sh" script from a staged
    copy of DOCKER_CONTEXT_DIR (see start()). The agent service is never installed as a
    systemd unit; __post_init__ forces no_install_service=True.
    """

    # Worker configuration; rewritten in __post_init__ with no_install_service=True
    configuration: DeadlineWorkerConfiguration

    # Docker container ID read from the --cidfile after start(); None until started
    _container_id: Optional[str] = field(init=False, default=None)
def __post_init__(self) -> None:
    """Force-disable installation of the Worker agent service.

    systemd usage is best avoided inside Docker containers, so the configuration
    is rewritten with no_install_service=True regardless of what was passed in.
    """
    patched = replace(self.configuration, no_install_service=True)
    self.configuration = patched
def start(self) -> None:
    """Build and start the Worker agent Docker container.

    Stages a Docker build context in a fresh temp directory, optionally stages
    file mappings for the container to copy into their final destinations, then
    runs "run_container.sh" (logging its output live) and records the started
    container's ID from the .container_id cidfile.

    Raises:
        Exception: re-raised if the container fails to build/start or the
            container ID cannot be read.
    """
    self._tmpdir = pathlib.Path(tempfile.mkdtemp())

    # The container setup only wires up a single JOB_USER environment variable,
    # so exactly one job user is supported
    assert (
        len(self.configuration.job_users) == 1
    ), f"Multiple job users not supported on Docker worker: {self.configuration.job_users}"

    # Environment variables for "run_container.sh"
    run_container_env = {
        **os.environ,
        "FARM_ID": self.configuration.farm_id,
        "FLEET_ID": self.configuration.fleet.id,
        "AGENT_USER": self.configuration.agent_user,
        "SHARED_GROUP": self.configuration.job_user_group,
        "JOB_USER": self.configuration.job_users[0].user,
        "CONFIGURE_WORKER_AGENT_CMD": self.configure_worker_command(
            config=self.configuration,
        ),
    }

    LOG.info(f"Staging Docker build context directory {str(self._tmpdir)}")
    shutil.copytree(DOCKER_CONTEXT_DIR, str(self._tmpdir), dirs_exist_ok=True)

    if self.configuration.file_mappings:
        # Stage a special dir with files to copy over to a temp folder in the Docker container
        # The container is responsible for copying files from that temp folder into the final destinations
        file_mappings_dir = self._tmpdir / "file_mappings"
        os.makedirs(str(file_mappings_dir))

        # Mapping of files in temp Docker container folder to their final destination
        docker_file_mappings: dict[str, str] = {}

        for src, dst in self.configuration.file_mappings:
            src_file_name = os.path.basename(src)

            # The Dockerfile copies the file_mappings dir in the build context to "/file_mappings" in the container
            # Build up an array of mappings from "/file_mappings" to their final destination
            src_docker_path = posixpath.join("/file_mappings", src_file_name)
            # Basenames must be unique because all staged files land in one flat directory
            assert src_docker_path not in docker_file_mappings, (
                "Duplicate paths generated for file mappings. All source files must have unique "
                + f"filenames. Mapping: {self.configuration.file_mappings}"
            )
            docker_file_mappings[src_docker_path] = dst

            # Copy the file over to the stage directory
            staged_dst = str(file_mappings_dir / src_file_name)
            LOG.info(f"Copying file {src} to {staged_dst}")
            shutil.copyfile(src, staged_dst)

        run_container_env["FILE_MAPPINGS"] = json.dumps(docker_file_mappings)

    # Build and start the container
    LOG.info("Starting Docker container")
    try:
        proc = subprocess.Popen(
            args="./run_container.sh",
            cwd=str(self._tmpdir),
            env=run_container_env,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            encoding="utf-8",
        )

        # Live logging of Docker build
        assert proc.stdout
        with proc.stdout:
            for line in iter(proc.stdout.readline, ""):
                LOG.info(line.rstrip("\r\n"))
    except Exception as e:  # pragma: no cover
        LOG.exception(f"Failed to start Worker agent Docker container: {e}")
        _handle_subprocess_error(e)
        raise
    else:
        # stdout already drained above, so the process should be done or nearly so
        exit_code = proc.wait(timeout=60)
        assert exit_code == 0, f"Process failed with exit code {exit_code}"

    # Grab the container ID from --cidfile
    try:
        self._container_id = subprocess.check_output(
            args=["cat", ".container_id"],
            cwd=str(self._tmpdir),
            text=True,
            encoding="utf-8",
            timeout=1,
        ).rstrip("\r\n")
    except Exception as e:  # pragma: no cover
        LOG.exception(f"Failed to get Docker container ID: {e}")
        _handle_subprocess_error(e)
        raise
    else:
        LOG.info(f"Started Docker container {self._container_id}")
def stop(self) -> None:
    """Stop the Worker agent process and then the Docker container it runs in.

    Requires start() to have been called first (self._container_id must be set).
    On success self._container_id is reset to None.

    Raises:
        Exception: re-raised if the agent process cannot be signalled or the
            container cannot be stopped.
    """
    assert (
        self._container_id
    ), "Cannot stop Docker container: Container ID is not set. Has the Docker container been started yet?"

    # First ask the agent to shut down gracefully: SIGTERM via pkill -f, matching
    # the agent user's name against the full command line
    LOG.info(f"Terminating Worker agent process in Docker container {self._container_id}")
    try:
        self.send_command(f"pkill --signal term -f {self.configuration.agent_user}")
    except Exception as e:  # pragma: no cover
        LOG.exception(f"Failed to terminate Worker agent process: {e}")
        raise
    else:
        LOG.info("Worker agent process terminated")

    # Then stop the container itself
    LOG.info(f"Stopping Docker container {self._container_id}")
    try:
        subprocess.check_output(
            args=["docker", "container", "stop", self._container_id],
            cwd=str(self._tmpdir),
            text=True,
            encoding="utf-8",
            timeout=30,
        )
    except Exception as e:  # pragma: no cover
        LOG.exception(f"Failed to stop Docker container {self._container_id}: {e}")
        _handle_subprocess_error(e)
        raise
    else:
        LOG.info(f"Stopped Docker container {self._container_id}")
        self._container_id = None
def configure_worker_command(
self, config: DeadlineWorkerConfiguration
) -> str: # pragma: no cover
"""Get the command to configure the Worker. This must be run as root."""