-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathcleanup.py
executable file
·228 lines (169 loc) · 7.68 KB
/
cleanup.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
#!/usr/bin/env python3
import argparse
import sys
import time
from typing import Callable
import boto3
import botocore.exceptions
ECS_TASK_QUERY_BATCH_SIZE = 100
ECS_TASK_DEFINITION_DELETE_BATCH_SIZE = 10
def exit_error(message: str) -> None:
print(f"Error: {message}", file=sys.stderr)
sys.exit(1)
def write_warning(message: str) -> None:
print(f"Warning: {message}")
def read_arguments() -> tuple[str, bool, bool]:
# create parser
parser = argparse.ArgumentParser(
description="Amazon Elastic Container Service (ECS) task definition cleanup"
)
parser.add_argument(
"--set-inactive",
choices=["retain-versions", "aggressive"],
const="retain-versions",
nargs="?",
help="unused task definitions set inactive, 'aggressive' targets all definition versions (default: %(const)s)",
)
parser.add_argument(
"--delete-inactive",
action="store_true",
help="delete inactive task definitions",
)
parser.add_argument(
"--commit",
action="store_true",
help="apply changes to ECS, dry run if not provided",
)
arg_list = parser.parse_args()
if (arg_list.set_inactive is None) and (not arg_list.delete_inactive):
exit_error("must supply at least one of --set-inactive or --delete-inactive")
return (arg_list.set_inactive, arg_list.delete_inactive, arg_list.commit)
def dryrun_message(commit: bool) -> str:
if not commit:
return " (DRYRUN)"
return ""
def canonical_task_definition_arn(task_definition_arn: str) -> str:
end_cut = task_definition_arn.rfind(":")
return task_definition_arn[0:end_cut]
def ecs_cluster_arn_list(client) -> list[str]:
resp = client.list_clusters()
return sorted(resp["clusterArns"])
def ecs_cluster_task_arn_list(client, cluster_arn: str) -> list[str]:
resp = client.list_tasks(cluster=cluster_arn)
return resp["taskArns"]
def ecs_cluster_task_definition_arn_list(
client, cluster_arn: str, task_arn_list: list[str]
) -> list[str]:
arn_list = []
while task_arn_list:
# grab maximum batch of `ECS_TASK_QUERY_BATCH_SIZE` ARNs from `task_arn_list`
query_arn_list = task_arn_list[:ECS_TASK_QUERY_BATCH_SIZE]
del task_arn_list[:ECS_TASK_QUERY_BATCH_SIZE]
# query for tasks, add each task definition ARN used onto list
resp = client.describe_tasks(cluster=cluster_arn, tasks=query_arn_list)
for item in resp["tasks"]:
arn_list.append(item["taskDefinitionArn"])
return arn_list
def ecs_task_definition_arn_list(client, status: str) -> list[str]:
arn_list = []
next_token = ""
while True:
# query for next page of task definitions, append results onto return list
resp = client.list_task_definitions(status=status, nextToken=next_token)
arn_list.extend(resp["taskDefinitionArns"])
if not "nextToken" in resp:
# end of results
break
next_token = resp["nextToken"]
return arn_list
def ecs_task_definition_deregister(client, definition_arn: str) -> None:
client.deregister_task_definition(taskDefinition=definition_arn)
def ecs_task_definition_delete(client, definition_arn_list: list[str]) -> None:
client.delete_task_definitions(taskDefinitions=definition_arn_list)
def process_aws_api_batch_throttle(
process_list: list[str], batch_size: int, api_handler: Callable[[list[str]], None]
):
# process items in `process_list` in batches set by `batch_size`, passing batch to `api_handler`
# if `botocore.exceptions.ClientError` throttling exception thrown - pause and retry
while process_list:
try:
# fetch batch of items, pass to handler
api_handler(process_list[:batch_size])
del process_list[:batch_size]
except botocore.exceptions.ClientError as err:
# extract error code, determine if API throttling
error_code = err.response.get("Error", {}).get("Code")
if error_code == "ThrottlingException":
write_warning("API rate limit exceeded - sleeping for a moment")
time.sleep(2)
def main():
# read CLI arguments, create ECS client
(set_inactive_mode, delete_inactive, commit) = read_arguments()
client = boto3.client("ecs")
if set_inactive_mode is not None:
# task: execute active unused task definitions set inactive
print(
f"Setting unused ECS task definitions ACTIVE -> INACTIVE{dryrun_message(commit)}"
)
definition_in_use_arn_list = []
definition_in_use_canonical_arn_set = set()
# process each ECS cluster in turn
for cluster_arn in ecs_cluster_arn_list(client):
print(f"Processing ECS cluster: {cluster_arn}")
# pull task list for cluster and in turn, the in-use task definition for each of those tasks
task_arn_list = ecs_cluster_task_arn_list(client, cluster_arn)
in_use_arn_list = ecs_cluster_task_definition_arn_list(
client, cluster_arn, task_arn_list
)
# add in-use task definition/canonical ARNs to collections
definition_in_use_arn_list.extend(in_use_arn_list)
for definition_arn in in_use_arn_list:
definition_in_use_canonical_arn_set.add(
canonical_task_definition_arn(definition_arn)
)
print(f"Task count: {len(task_arn_list)}")
# fetch list of task definitions currently active
print("\nFetching ACTIVE ECS task definitions")
active_arn_list = ecs_task_definition_arn_list(client, "ACTIVE")
print(f"Definition count: {len(active_arn_list)}")
# determine task definitions not assigned to active ECS cluster tasks
unused_arn_list = []
for active_arn in active_arn_list:
if (set_inactive_mode == "retain-versions") and (
canonical_task_definition_arn(active_arn)
in definition_in_use_canonical_arn_set
):
# canonical ARN for task definition in use by cluster task - do not set INACTIVE
continue
if active_arn not in definition_in_use_arn_list:
unused_arn_list.append(active_arn)
print(f"Unused definition count: {len(unused_arn_list)}\n")
# deregister ECS task definitions determined to be unused by current cluster tasks
def _definition_deregister(batch_list: list[str]):
definition_arn = batch_list[0]
print(f"Deregister: {definition_arn}{dryrun_message(commit)}")
if commit:
ecs_task_definition_deregister(client, definition_arn)
process_aws_api_batch_throttle(unused_arn_list, 1, _definition_deregister)
if delete_inactive:
# task: delete inactive task definitions
print(
f"Mark INACTIVE ECS task definitions for deletion{dryrun_message(commit)}"
)
# fetch list of task definitions currently inactive
print("\nFetching INACTIVE ECS task definitions")
inactive_arn_list = ecs_task_definition_arn_list(client, "INACTIVE")
print(f"Definition count: {len(inactive_arn_list)}")
# delete each task definition currently inactive
def _definition_delete(batch_list: list[str]):
for definition_arn in batch_list:
print(f"Delete: {definition_arn}{dryrun_message(commit)}")
if commit:
ecs_task_definition_delete(client, batch_list)
process_aws_api_batch_throttle(
inactive_arn_list,
ECS_TASK_DEFINITION_DELETE_BATCH_SIZE,
_definition_delete,
)
if __name__ == "__main__":
main()