Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 35 additions & 4 deletions src/cloudwatch/modules/flusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ def __init__(self, config_helper, dataset_resolver):
self.client = None
self.config = config_helper
self.metric_map = {}
self.last_value = {}
self.last_flush_time = time.time()
self.nan_key_set = set()
self.enable_high_resolution_metrics = config_helper.enable_high_resolution_metrics
Expand Down Expand Up @@ -56,12 +57,24 @@ def _resolve_ds_names(self, value_list):

return ds_names

def _resolve_ds_types(self, value_list):
ds_types = self._dataset_resolver.get_dataset_types(value_list.type)
if not ds_types:
return ['gauge' for i in range(len(value_list.values))]

return ds_types

def _expand_value_list(self, value_list):
if len(value_list.values) == 1:
ds_type = self._resolve_ds_types(value_list)
if value_list.meta:
value_list.meta['ds_type'] = ds_type[0]
else:
value_list.meta = {'ds_type': ds_type[0]}
return [value_list]

expanded = []
for ds_name, value in zip(self._resolve_ds_names(value_list), value_list.values):
for ds_name, ds_type, value in zip(self._resolve_ds_names(value_list), self._resolve_ds_types(value_list), value_list.values):
new_value = value_list.__class__(
host=value_list.host,
plugin=value_list.plugin,
Expand All @@ -73,6 +86,10 @@ def _expand_value_list(self, value_list):
meta=value_list.meta,
values=[value]
)
if new_value.meta:
new_value.meta['ds_type'] = ds_type
else:
new_value.meta = {'ds_type': ds_type}
expanded.append(new_value)

return expanded
Expand Down Expand Up @@ -101,7 +118,8 @@ def _flush_if_need(self, current_time):
if self.config.debug and self.metric_map:
state = ""
for dimension_metrics in self.metric_map:
state += str(dimension_metrics) + "[" + str(self.metric_map[dimension_metrics][0].statistics.sample_count) + "] "
if self.metric_map[dimension_metrics][0].statistics:
state += str(dimension_metrics) + "[" + str(self.metric_map[dimension_metrics][0].statistics.sample_count) + "] "
self._LOGGER.info("[debug] flushing metrics " + state)
self._flush()

Expand Down Expand Up @@ -138,7 +156,8 @@ def _aggregate_metric(self, value_list):
if self.config.debug and self.metric_map:
state = ""
for dimension_metrics in self.metric_map:
state += str(dimension_metrics) + "[" + str(self.metric_map[dimension_metrics][0].statistics.sample_count) + "] "
if self.metric_map[dimension_metrics][0].statistics:
state += str(dimension_metrics) + "[" + str(self.metric_map[dimension_metrics][0].statistics.sample_count) + "] "
self._LOGGER.info("[debug] flushing metrics " + state)
self._flush()
nan_value_count = self._add_metric_to_queue(value_list, adjusted_time, key)
Expand All @@ -150,6 +169,10 @@ def _aggregate_metric(self, value_list):
def _add_metric_to_queue(self, value_list, adjusted_time, key):
nan_value_count = 0
metrics = MetricDataBuilder(self.config, value_list, adjusted_time).build()
if key in self.last_value:
for metric in metrics:
if metric.metric_name in self.last_value[key]:
metric.cumulative(last_update=self.last_value[key][metric.metric_name]['last_update'], last_value=self.last_value[key][metric.metric_name]['last_value'])
nan_value_count = self._add_values_to_metrics(metrics, value_list)
if nan_value_count != len(value_list.values):
self.metric_map[key] = metrics
Expand All @@ -174,7 +197,7 @@ def _add_values_to_metrics(self, dimension_metrics, value_list):
nan_value_count = 0
for value in value_list.values:
if self.is_numerical_value(value):
metric.add_value(value)
metric.add_value(value, value_list.time)
else:
nan_value_count += 1
return nan_value_count
Expand Down Expand Up @@ -207,7 +230,15 @@ def _prepare_batch(self):
metric_batch = []
while self.metric_map:
key, dimension_metrics = self.metric_map.popitem()
if key not in self.last_value:
self.last_value[key] = {}
for metric in dimension_metrics:
self.last_value[key][metric.metric_name] = {
'last_update': metric.last_update,
'last_value': metric.last_value
}
if metric.statistics is None:
continue
if len(metric_batch) < self._MAX_METRICS_PER_PUT_REQUEST:
metric_batch.append(metric)
else:
Expand Down
40 changes: 39 additions & 1 deletion src/cloudwatch/modules/metricdata.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,40 @@ def __init__(self, metric_name='', unit="", dimensions={}, statistic_values=None
self.timestamp = timestamp
else:
self.timestamp = awsutils.get_aws_timestamp()
self.ds_type = None
self.interval = None
self.last_update = None
self.last_value = None

def add_value(self, value):
def cumulative(self, ds_type=None, interval=None, last_update=None, last_value=None):
    """Store state needed to convert cumulative (derive/counter) readings to rates.

    Only arguments that were explicitly supplied (i.e. not None)
    overwrite the stored state, so a caller may update a single field
    without clobbering the others.
    """
    # Use 'is not None' rather than truthiness: 0 is a legitimate
    # last_update timestamp and a legitimate last_value counter
    # reading, and a plain 'if last_value:' would silently drop it.
    if ds_type is not None:
        self.ds_type = ds_type
    if interval is not None:
        self.interval = interval
    if last_update is not None:
        self.last_update = last_update
    if last_value is not None:
        self.last_value = last_value

def add_value(self, value, time=None):

if self.ds_type in ['derive', 'counter']:
if self.last_update is None or self.last_value is None or \
self.interval and time - self.last_update > 2*self.interval or \
time - self.last_update == 0:
self.last_update, self.last_value = time, value
return

self.last_update, self.last_value, value = \
time, value, (value - self.last_value)/(time - self.last_update)
if self.ds_type == 'counter':
if value < 0:
value += 2**32 - 1
if value < 0:
value += 2**64 - 2**32
else:
self.last_update, self.last_value = time, value

if not self.statistics:
self.statistics = self.Statistics(value)
else:
Expand Down Expand Up @@ -83,6 +115,9 @@ def __init__(self, config_helper, vl, adjusted_time=None):
self.config = config_helper
self.vl = vl
self.adjusted_time = adjusted_time
self.ds_type = None
if vl.meta and 'ds_type' in vl.meta:
self.ds_type = vl.meta['ds_type']

def build(self):
""" Builds metric data object with name and dimensions but without value or statistics """
Expand All @@ -91,6 +126,9 @@ def build(self):
metric_array.append(MetricDataStatistic(metric_name=self._build_metric_name(), dimensions=self._build_asg_dimension(), timestamp=self._build_timestamp()))
if self.config.push_constant:
metric_array.append(MetricDataStatistic(metric_name=self._build_metric_name(), dimensions=self._build_constant_dimension(), timestamp=self._build_timestamp()))
if self.ds_type:
for metric in metric_array:
metric.cumulative(ds_type=self.ds_type, interval=self.vl.interval)
return metric_array

def _build_timestamp(self):
Expand Down