-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathenable_aws_adaptive_polling.py
More file actions
218 lines (199 loc) · 8.1 KB
/
enable_aws_adaptive_polling.py
File metadata and controls
218 lines (199 loc) · 8.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
#!/usr/bin/env python3
import argparse
import json
import sys
import urllib.error
import urllib.request
def http_request(method, url, token, body=None):
    """Send one JSON request authenticated with an ``X-SF-TOKEN`` header.

    Args:
        method: HTTP method name handed to ``urllib.request.Request``.
        url: Absolute URL to call.
        token: Value sent in the ``X-SF-TOKEN`` header.
        body: Optional JSON-serialisable object. When not ``None`` it is
            serialised with ``json.dumps`` and sent UTF-8 encoded.

    Returns:
        A ``(status, payload)`` tuple. ``payload`` is the decoded JSON
        response, or ``None`` when the response body is empty.

    Raises:
        RuntimeError: If the server answers with an HTTP error, the request
            fails at the transport level, or the response body is not
            valid UTF-8 encoded JSON.
    """
    data = None if body is None else json.dumps(body).encode("utf-8")
    req = urllib.request.Request(url, data=data, method=method)
    req.add_header("X-SF-TOKEN", token)
    req.add_header("Content-Type", "application/json")
    req.add_header("Accept", "application/json")
    try:
        with urllib.request.urlopen(req, timeout=30) as resp:
            status = resp.status
            raw = resp.read()
    except urllib.error.HTTPError as e:
        # Decode leniently so an oddly-encoded error page can never mask the
        # HTTP status we are trying to report.
        err_body = e.read().decode("utf-8", errors="replace") if e.fp else ""
        raise RuntimeError(f"HTTP {e.code} for {url}: {err_body}") from e
    except OSError as e:
        # URLError is an OSError subclass; catching the base class also wraps
        # socket timeouts and connection resets raised while reading the body.
        raise RuntimeError(f"Request failed for {url}: {e}") from e
    if not raw:
        return status, None
    try:
        # UnicodeDecodeError and json.JSONDecodeError are both ValueErrors.
        return status, json.loads(raw.decode("utf-8"))
    except ValueError as e:
        raise RuntimeError(f"Invalid JSON in response from {url}: {e}") from e
def format_minutes(value, missing_label=None):
    """Render a poll-rate value as a compact string.

    Args:
        value: Number (or numeric string) to render, or ``None``.
        missing_label: Text returned when ``value`` is ``None``; an empty
            string is used when this is falsy.

    Returns:
        ``""`` when ``value`` cannot be converted to ``float``. Otherwise the
        number as text, after dividing values of 1000 or more by 60000
        (presumably a milliseconds-to-minutes conversion - confirm with
        callers). Whole numbers are printed without a decimal part, all
        others rounded to two decimals.
    """
    if value is None:
        return missing_label if missing_label else ""
    try:
        amount = float(value)
    except (TypeError, ValueError):
        return ""
    minutes = amount / 60000 if amount >= 1000 else amount
    return str(int(minutes)) if minutes.is_integer() else str(round(minutes, 2))
def print_table(rows, show_inactive_metrics_poll_rate, missing_label=None):
    """Print ``rows`` to stdout as a left-aligned, space-separated table.

    Args:
        rows: Sequence of ``(id, name, poll_rate)`` tuples. Nothing is
            printed when it is empty.
        show_inactive_metrics_poll_rate: When true, a third
            ``INACTIVE_METRICS_POLL_RATE`` column is added, rendered through
            ``format_minutes``.
        missing_label: Passed to ``format_minutes`` for rows whose poll rate
            is ``None``.
    """
    if not rows:
        return

    # Build the table column by column so widths and cells share one code path.
    titles = ["ID", "NAME"]
    columns = [
        [rid for rid, _, _ in rows],
        [name or "" for _, name, _ in rows],
    ]
    if show_inactive_metrics_poll_rate:
        titles.append("INACTIVE_METRICS_POLL_RATE")
        columns.append(
            [format_minutes(minutes, missing_label) for _, _, minutes in rows]
        )

    # Each column is as wide as its longest cell or its title, whichever wins.
    widths = [
        max(len(title), *(len(cell) for cell in cells))
        for title, cells in zip(titles, columns)
    ]

    print(" ".join(title.ljust(width) for title, width in zip(titles, widths)))
    print(" ".join("-" * width for width in widths))
    for record in zip(*columns):
        print(" ".join(cell.ljust(width) for cell, width in zip(record, widths)))
def print_updated(rows, total_expected, show_inactive_metrics_poll_rate, target_minutes):
    """Print a summary of which integrations were updated.

    Args:
        rows: ``(id, name, poll_rate)`` tuples for the updated integrations.
        total_expected: Number of updates that were attempted. When it is
            non-zero and equals ``len(rows)`` a single success line is
            printed instead of a table.
        show_inactive_metrics_poll_rate: Forwarded to ``print_table``.
        target_minutes: Poll rate in minutes, shown in the success line.
    """
    # Complete success: one line, no per-integration listing.
    if total_expected and len(rows) == total_expected:
        print(f"All updates succeeded. Adaptive polling is enabled and set to {target_minutes} minutes")
        return

    if not rows:
        print("Updated 0 integrations")
        return

    # Partial success: list exactly what did get updated.
    count = len(rows)
    plural = "" if count == 1 else "s"
    print(f"Updated {count} integration{plural}:")
    print_table(rows, show_inactive_metrics_poll_rate)
def main():
    """Enable adaptive polling on eligible AWSCloudWatch integrations.

    Parses the command line, lists integrations of type ``AWSCloudWatch``
    from ``https://<domainName>/v2/integration``, filters out those that are
    disabled (unless ``--includeDisabled``), already have
    ``inactiveMetricsPollRate`` set (unless ``--overrideExisting``), or have
    ``metricStreamsSyncState == "ENABLED"``. After interactive confirmation
    it PUTs each remaining integration back with ``inactiveMetricsPollRate``
    set to the requested number of minutes multiplied by 60000.

    Raises:
        SystemExit: From ``argparse`` on invalid arguments.
        Exception: Whatever ``http_request`` raises is propagated; when that
            happens mid-update, a summary of the integrations already
            updated is printed first.
    """
    parser = argparse.ArgumentParser(
        description="Enable adaptive polling for eligible AWSCloudWatch integrations."
    )
    parser.add_argument(
        "domainName",
        help=(
            "full domain name (e.g. app.us1.signalfx.com)"
        ),
    )
    parser.add_argument(
        "apiToken",
        help="API token, found on UI in: [My Profile -> Show User API Access Token] OR [Settings -> Access Tokens]",
    )
    parser.add_argument(
        "--inactiveMetricsPollRateMinutes",
        type=int,
        default=15,
        metavar="minutes",
        help="inactive metrics poll rate to set (default: 15)",
    )
    parser.add_argument(
        "--includeDisabled",
        action="store_true",
        default=False,
        help="include disabled integrations (default: false)",
    )
    parser.add_argument(
        "--overrideExisting",
        action="store_true",
        default=False,
        help="override inactive metrics poll rate for integrations which already have adaptive polling configured (default: false)",
    )
    args = parser.parse_args()
    if args.inactiveMetricsPollRateMinutes < 1:
        parser.error("--inactiveMetricsPollRateMinutes must be a positive value")

    base = f"https://{args.domainName}"
    list_url = f"{base}/v2/integration?type=AWSCloudWatch"
    _, payload = http_request("GET", list_url, args.apiToken)
    # Tolerate a non-dict payload and an explicit `"results": null`.
    results = (payload.get("results") or []) if isinstance(payload, dict) else []
    suffix = "s" if len(results) != 1 else ""
    print(f"Found {len(results)} integration{suffix} of type AWSCloudWatch.")

    candidates = []
    filtered_disabled = []
    filtered_inactive_metrics_poll = []
    filtered_streaming = []
    for item in results:
        if not isinstance(item, dict):
            continue
        integ_id = item.get("id")
        if not integ_id:
            # Without an id the integration can be neither reported nor
            # updated, so drop it here; this keeps len(candidates) equal to
            # the number of updates that will actually be attempted.
            continue
        if not args.includeDisabled and not item.get("enabled", False):
            filtered_disabled.append(integ_id)
            continue
        if not args.overrideExisting and item.get("inactiveMetricsPollRate") is not None:
            filtered_inactive_metrics_poll.append(integ_id)
            continue
        if item.get("metricStreamsSyncState") == "ENABLED":
            filtered_streaming.append(integ_id)
            continue
        candidates.append(item)

    remaining = [
        (item["id"], item.get("name"), item.get("inactiveMetricsPollRate"))
        for item in candidates
    ]

    if not args.includeDisabled and filtered_disabled:
        print(
            f"Filtered out disabled integrations ({len(filtered_disabled)}): "
            f"{', '.join(filtered_disabled)}"
        )
    if not args.overrideExisting and filtered_inactive_metrics_poll:
        print(
            f"Filtered out integrations with inactive metrics poll rate already set ({len(filtered_inactive_metrics_poll)}): "
            f"{', '.join(filtered_inactive_metrics_poll)}"
        )
    if filtered_streaming:
        print(
            f"Filtered out integrations with metricStreamsSyncState==ENABLED ({len(filtered_streaming)}): "
            f"{', '.join(filtered_streaming)}"
        )

    if not remaining:
        print("No integrations meet the provided criteria. No updates performed.")
        return

    remaining_suffix = "s" if len(remaining) != 1 else ""
    print(f"\n{len(remaining)} integration{remaining_suffix} queued for update: ")
    print_table(remaining, args.overrideExisting, missing_label="none")
    try:
        reply = input("\nProceed with updates? [y/N]: ").strip().lower()
    except EOFError:
        # Non-interactive stdin (pipe, closed terminal): take the safe
        # default of the [y/N] prompt instead of failing.
        reply = ""
    print()
    if reply not in ("y", "yes"):
        print("Aborted. No updates performed.")
        return

    target_minutes = args.inactiveMetricsPollRateMinutes
    target_rate = target_minutes * 60000
    updated = []
    try:
        for item in candidates:
            integ_id = item["id"]
            body = dict(item)
            body["inactiveMetricsPollRate"] = target_rate
            put_url = f"{base}/v2/integration/{integ_id}"
            http_request("PUT", put_url, args.apiToken, body=body)
            # Record the value exactly as sent so the summary table renders
            # it the same way as values read back from the API; a raw minute
            # count of 1000 or more would otherwise be divided by 60000.
            updated.append((integ_id, item.get("name"), target_rate))
    except (Exception, KeyboardInterrupt):
        # Report partial progress (also on Ctrl-C) before propagating.
        print("Update FAILED before completion.")
        print_updated(updated, len(candidates), args.overrideExisting, target_minutes)
        raise
    print_updated(updated, len(candidates), args.overrideExisting, target_minutes)
if __name__ == "__main__":
    # Script entry point: turn failures into a short stderr message and a
    # non-zero exit status instead of a traceback.
    try:
        main()
    except KeyboardInterrupt:
        # Ctrl-C (e.g. at the confirmation prompt) is a deliberate abort;
        # 130 is the conventional shell exit status for SIGINT.
        print("\nInterrupted.", file=sys.stderr)
        sys.exit(130)
    except Exception as exc:
        print(f"\nError: {exc}", file=sys.stderr)
        sys.exit(1)