diff --git a/src/cloudwatch/modules/flusher.py b/src/cloudwatch/modules/flusher.py index 95ad4b9..2a939ca 100644 --- a/src/cloudwatch/modules/flusher.py +++ b/src/cloudwatch/modules/flusher.py @@ -27,6 +27,7 @@ def __init__(self, config_helper, dataset_resolver): self.client = None self.config = config_helper self.metric_map = {} + self.last_value = {} self.last_flush_time = time.time() self.nan_key_set = set() self.enable_high_resolution_metrics = config_helper.enable_high_resolution_metrics @@ -56,12 +57,24 @@ def _resolve_ds_names(self, value_list): return ds_names + def _resolve_ds_types(self, value_list): + ds_types = self._dataset_resolver.get_dataset_types(value_list.type) + if not ds_types: + return ['gauge' for i in range(len(value_list.values))] + + return ds_types + def _expand_value_list(self, value_list): if len(value_list.values) == 1: + ds_type = self._resolve_ds_types(value_list) + if value_list.meta: + value_list.meta['ds_type'] = ds_type[0] + else: + value_list.meta = {'ds_type': ds_type[0]} return [value_list] expanded = [] - for ds_name, value in zip(self._resolve_ds_names(value_list), value_list.values): + for ds_name, ds_type, value in zip(self._resolve_ds_names(value_list), self._resolve_ds_types(value_list), value_list.values): new_value = value_list.__class__( host=value_list.host, plugin=value_list.plugin, @@ -73,6 +86,10 @@ def _expand_value_list(self, value_list): meta=value_list.meta, values=[value] ) + if new_value.meta: + new_value.meta['ds_type'] = ds_type + else: + new_value.meta = {'ds_type': ds_type} expanded.append(new_value) return expanded @@ -101,7 +118,8 @@ def _flush_if_need(self, current_time): if self.config.debug and self.metric_map: state = "" for dimension_metrics in self.metric_map: - state += str(dimension_metrics) + "[" + str(self.metric_map[dimension_metrics][0].statistics.sample_count) + "] " + if self.metric_map[dimension_metrics][0].statistics: + state += str(dimension_metrics) + "[" + str(self.metric_map[dimension_metrics][0].statistics.sample_count) + "] " self._LOGGER.info("[debug] flushing metrics " + state) self._flush() @@ -138,7 +156,8 @@ def _aggregate_metric(self, value_list): if self.config.debug and self.metric_map: state = "" for dimension_metrics in self.metric_map: - state += str(dimension_metrics) + "[" + str(self.metric_map[dimension_metrics][0].statistics.sample_count) + "] " + if self.metric_map[dimension_metrics][0].statistics: + state += str(dimension_metrics) + "[" + str(self.metric_map[dimension_metrics][0].statistics.sample_count) + "] " self._LOGGER.info("[debug] flushing metrics " + state) self._flush() nan_value_count = self._add_metric_to_queue(value_list, adjusted_time, key) @@ -150,6 +169,10 @@ def _aggregate_metric(self, value_list): def _add_metric_to_queue(self, value_list, adjusted_time, key): nan_value_count = 0 metrics = MetricDataBuilder(self.config, value_list, adjusted_time).build() + if key in self.last_value: + for metric in metrics: + if metric.metric_name in self.last_value[key]: + metric.cumulative(last_update=self.last_value[key][metric.metric_name]['last_update'], last_value=self.last_value[key][metric.metric_name]['last_value']) nan_value_count = self._add_values_to_metrics(metrics, value_list) if nan_value_count != len(value_list.values): self.metric_map[key] = metrics @@ -174,7 +197,7 @@ def _add_values_to_metrics(self, dimension_metrics, value_list): nan_value_count = 0 for value in value_list.values: if self.is_numerical_value(value): - metric.add_value(value) + metric.add_value(value, value_list.time) else: nan_value_count += 1 return nan_value_count @@ -207,7 +230,15 @@ def _prepare_batch(self): metric_batch = [] while self.metric_map: key, dimension_metrics = self.metric_map.popitem() + if key not in self.last_value: + self.last_value[key] = {} for metric in dimension_metrics: + self.last_value[key][metric.metric_name] = { + 'last_update': metric.last_update, + 'last_value': metric.last_value + } + if metric.statistics is None: + continue if len(metric_batch) < self._MAX_METRICS_PER_PUT_REQUEST: metric_batch.append(metric) else: diff --git a/src/cloudwatch/modules/metricdata.py b/src/cloudwatch/modules/metricdata.py index 1669e66..93781ea 100644 --- a/src/cloudwatch/modules/metricdata.py +++ b/src/cloudwatch/modules/metricdata.py @@ -27,8 +27,40 @@ def __init__(self, metric_name='', unit="", dimensions={}, statistic_values=None self.timestamp = timestamp else: self.timestamp = awsutils.get_aws_timestamp() + self.ds_type = None + self.interval = None + self.last_update = None + self.last_value = None - def add_value(self, value): + def cumulative(self, ds_type=None, interval=None, last_update=None, last_value=None): + if ds_type: + self.ds_type = ds_type + if interval: + self.interval = interval + if last_update: + self.last_update = last_update + if last_value: + self.last_value = last_value + + def add_value(self, value, time=None): + + if self.ds_type in ['derive', 'counter']: + if self.last_update is None or self.last_value is None or \ + self.interval and time - self.last_update > 2*self.interval or \ + time - self.last_update == 0: + self.last_update, self.last_value = time, value + return + + self.last_update, self.last_value, value = \ + time, value, (value - self.last_value)/(time - self.last_update) + if self.ds_type == 'counter': + if value < 0: + value += 2**32 - 1 + if value < 0: + value += 2**64 - 2**32 + else: + self.last_update, self.last_value = time, value + if not self.statistics: self.statistics = self.Statistics(value) else: @@ -83,6 +115,9 @@ def __init__(self, config_helper, vl, adjusted_time=None): self.config = config_helper self.vl = vl self.adjusted_time = adjusted_time + self.ds_type = None + if vl.meta and 'ds_type' in vl.meta: + self.ds_type = vl.meta['ds_type'] def build(self): """ Builds metric data object with name and dimensions but without value or statistics """ @@ -91,6 +126,9 @@ def build(self): metric_array.append(MetricDataStatistic(metric_name=self._build_metric_name(), dimensions=self._build_asg_dimension(), timestamp=self._build_timestamp())) if self.config.push_constant: metric_array.append(MetricDataStatistic(metric_name=self._build_metric_name(), dimensions=self._build_constant_dimension(), timestamp=self._build_timestamp())) + if self.ds_type: + for metric in metric_array: + metric.cumulative(ds_type=self.ds_type, interval=self.vl.interval) return metric_array def _build_timestamp(self):