@@ -2,13 +2,15 @@
 
 import os
 import time
+from collections import deque
 from datetime import datetime, timezone
+from math import ceil
+
+import boto3
 import pytz
 
 from tap_cloudwatch.exception import InvalidQueryException
-from collections import deque
-import boto3
-from math import ceil
+
 
 class CloudwatchAPI:
     """Cloudwatch class for interacting with the API."""
@@ -66,7 +68,7 @@ def _create_client(self, config):
     def _request_more_records():
         return True
 
-    def split_batch_into_windows(self, start_time, end_time, batch_increment_s):
+    def _split_batch_into_windows(self, start_time, end_time, batch_increment_s):
        diff_s = end_time - start_time
        total_batches = ceil(diff_s / batch_increment_s)
        batch_windows = []
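
Note: the unchanged middle of this method is collapsed between these hunks. For context, here is a minimal standalone sketch of the windowing it performs; the loop body is an assumption reconstructed from the visible `ceil` and `append` lines, not the file's exact code:

    from math import ceil

    def split_batch_into_windows(start_time, end_time, batch_increment_s):
        # Split the [start_time, end_time) span (epoch seconds) into
        # fixed-size windows of batch_increment_s seconds each.
        diff_s = end_time - start_time
        total_batches = ceil(diff_s / batch_increment_s)
        batch_windows = []
        for batch_num in range(total_batches):
            # Each window starts where the previous one ended; the last
            # window is clamped so it does not overshoot end_time.
            query_start = int(start_time) + batch_increment_s * batch_num
            query_end = min(
                int(start_time) + batch_increment_s * (batch_num + 1),
                int(end_time),
            )
            batch_windows.append((query_start, query_end))
        return batch_windows

    # A 1-hour span in 25-minute windows -> three windows, the last one short:
    print(split_batch_into_windows(0, 3600, 1500))
    # [(0, 1500), (1500, 3000), (3000, 3600)]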
@@ -81,62 +83,86 @@ def split_batch_into_windows(self, start_time, end_time, batch_increment_s):
             batch_windows.append((query_start, query_end))
         return batch_windows
 
-    def validate_query(self, query):
+    def _validate_query(self, query):
         if "|sort" in query.replace(" ", ""):
             raise InvalidQueryException("sort not allowed")
         if "|limit" in query.replace(" ", ""):
             raise InvalidQueryException("limit not allowed")
         if "stats" in query:
             raise InvalidQueryException("stats not allowed")
         if "@timestamp" not in query.split("|")[0]:
-            raise InvalidQueryException("@timestamp field is used as the replication key so it must be selected")
+            raise InvalidQueryException(
+                "@timestamp field is used as the replication key so it must be selected"
+            )
 
     def get_records_iterator(self, bookmark, log_group, query, batch_increment_s):
         """Retrieve records from Cloudwatch."""
         end_time = datetime.now(timezone.utc).timestamp()
         start_time = bookmark.timestamp()
-        self.validate_query(query)
-        batch_windows = self.split_batch_into_windows(start_time, end_time, batch_increment_s)
+        self._validate_query(query)
+        batch_windows = self._split_batch_into_windows(
+            start_time, end_time, batch_increment_s
+        )
 
         queue = deque()
         for window in batch_windows:
             if len(queue) < (self.max_concurrent_queries - 1):
-                queue.append((self.start_query(window[0], window[1], log_group, query), window[0], window[1]))
+                queue.append(
+                    (
+                        self._start_query(window[0], window[1], log_group, query),
+                        window[0],
+                        window[1],
+                    )
+                )
             else:
                 query_id, start, end = queue.popleft()
-                queue.append((self.start_query(window[0], window[1], log_group, query), window[0], window[1]))
-                results = self.get_results(log_group, start, end, query, query_id)
+                queue.append(
+                    (
+                        self._start_query(window[0], window[1], log_group, query),
+                        window[0],
+                        window[1],
+                    )
+                )
+                results = self._get_results(log_group, start, end, query, query_id)
                 yield results
 
         while len(queue) > 0:
             query_id, start, end = queue.popleft()
-            results = self.get_results(log_group, start, end, query, query_id)
+            results = self._get_results(log_group, start, end, query, query_id)
             yield results
 
-    def handle_limit_exceeded(self, response, log_group, query_start, query_end, query):
+    def _handle_limit_exceeded(
+        self, response, log_group, query_start, query_end, query
+    ):
         results = response.get("results")
         last_record = results[-1]
 
-        latest_ts_str = [i["value"] for i in last_record if i["field"] == "@timestamp"][0]
+        latest_ts_str = [i["value"] for i in last_record if i["field"] == "@timestamp"][
+            0
+        ]
         # Include latest ts in query, this could cause duplicates but
         # without it we might miss ties
-        new_query_start = int(datetime.fromisoformat(latest_ts_str).replace(tzinfo=pytz.UTC).timestamp())
-        new_query_id = self.start_query(new_query_start, query_end, log_group, query)
-        return self.get_results(log_group, new_query_start, query_end, query, new_query_id)
+        new_query_start = int(
+            datetime.fromisoformat(latest_ts_str).replace(tzinfo=pytz.UTC).timestamp()
+        )
+        new_query_id = self._start_query(new_query_start, query_end, log_group, query)
+        return self._get_results(
+            log_group, new_query_start, query_end, query, new_query_id
+        )
 
-    def alter_query(self, query):
+    def _alter_query(self, query):
         query += " | sort @timestamp asc"
         return query
 
-    def start_query(self, query_start, query_end, log_group, query, prev_start=None):
+    def _start_query(self, query_start, query_end, log_group, query, prev_start=None):
         self.logger.info(
             (
                 "Submitting query for batch from:"
                 f" `{datetime.utcfromtimestamp(query_start).isoformat()} UTC` -"
                 f" `{datetime.utcfromtimestamp(query_end).isoformat()} UTC`"
             )
         )
-        query = self.alter_query(query)
+        query = self._alter_query(query)
         start_query_response = self.client.start_query(
             logGroupName=log_group,
             startTime=query_start,
@@ -146,7 +172,9 @@ def start_query(self, query_start, query_end, log_group, query, prev_start=None):
         )
         return start_query_response["queryId"]
 
-    def get_results(self, log_group, query_start, query_end, query, query_id, prev_start=None):
+    def _get_results(
+        self, log_group, query_start, query_end, query, query_id, prev_start=None
+    ):
         self.logger.info(
             (
                 "Retrieving results for batch from:"
@@ -161,15 +189,19 @@ def get_results(self, log_group, query_start, query_end, query, query_id, prev_start=None):
         if response.get("ResponseMetadata", {}).get("HTTPStatusCode") != 200:
             raise Exception(f"Failed: {response}")
         result_size = response.get("statistics", {}).get("recordsMatched")
-        results = response['results']
-        self.logger.info(
-            f"Result set size '{int(result_size)}' received."
-        )
+        results = response["results"]
+        self.logger.info(f"Result set size '{int(result_size)}' received.")
         if result_size > self.limit:
             if prev_start == query_start:
-                raise Exception("Stuck in a loop, smaller batch still exceeds limit. Reduce batch window.")
+                raise Exception(
+                    "Stuck in a loop, smaller batch still exceeds limit. "
+                    "Reduce batch window."
+                )
             self.logger.info(
-                f"Result set size '{int(result_size)}' exceeded limit '{self.limit}'. Re-running sub-batch..."
+                f"Result set size '{int(result_size)}' exceeded limit "
+                f"'{self.limit}'. Re-running sub-batch..."
+            )
+            results += self._handle_limit_exceeded(
+                response, log_group, query_start, query_end, query
             )
-            results += self.handle_limit_exceeded(response, log_group, query_start, query_end, query)
         return results
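
Note: the queue handling in `get_records_iterator` is a small pipelining pattern: keep up to `max_concurrent_queries - 1` Logs Insights queries in flight, and only wait on the oldest one's results when submitting the next window, so query submission and result retrieval overlap. A condensed sketch of the same pattern, where `submit` and `fetch` are hypothetical stand-ins for `_start_query` and `_get_results`:

    from collections import deque

    def pipelined(windows, submit, fetch, max_concurrent):
        queue = deque()  # (query_id, window) pairs currently in flight
        for window in windows:
            if len(queue) >= max_concurrent - 1:
                # At capacity: drain the oldest query before adding another.
                query_id, done_window = queue.popleft()
                yield fetch(query_id, done_window)
            queue.append((submit(window), window))
        # Drain whatever is still in flight once all windows are submitted.
        while queue:
            query_id, done_window = queue.popleft()
            yield fetch(query_id, done_window)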
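Note: `_handle_limit_exceeded` recovers from a batch whose matched records exceed `self.limit` by re-querying from the last returned row's `@timestamp`; as the inline comment says, re-including that timestamp can duplicate rows but avoids dropping ties. Logs Insights rows come back as lists of `{"field": ..., "value": ...}` dicts, so the timestamp has to be fished out by field name. A self-contained illustration with a made-up row:

    from datetime import datetime

    import pytz

    # A fabricated last row, shaped like a Logs Insights result record.
    last_record = [
        {"field": "@timestamp", "value": "2022-12-05 13:45:10.000"},
        {"field": "@message", "value": "..."},
    ]

    latest_ts_str = [i["value"] for i in last_record if i["field"] == "@timestamp"][0]
    # Interpret the naive timestamp as UTC and convert to epoch seconds,
    # mirroring the replace(tzinfo=pytz.UTC) call in the diff.
    new_query_start = int(
        datetime.fromisoformat(latest_ts_str).replace(tzinfo=pytz.UTC).timestamp()
    )
    print(new_query_start)  # 1670247910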