From 51349f035464765ec8c85294570b3c3043ad180b Mon Sep 17 00:00:00 2001 From: Rachel Chen Date: Wed, 15 Jan 2025 19:15:04 -0800 Subject: [PATCH 1/7] c --- requirements.txt | 2 +- snuba/web/rpc/common/common.py | 16 +++++++++++++--- .../R_eap_spans/resolver_trace_item_table.py | 2 ++ 3 files changed, 16 insertions(+), 4 deletions(-) diff --git a/requirements.txt b/requirements.txt index fbb75c3d8a..a84a969df2 100644 --- a/requirements.txt +++ b/requirements.txt @@ -29,7 +29,7 @@ python-rapidjson==1.8 redis==4.5.4 sentry-arroyo==2.19.4 sentry-kafka-schemas==0.1.129 -sentry-protos==0.1.49 +sentry-protos==0.1.51 sentry-redis-tools==0.3.0 sentry-relay==0.9.4 sentry-sdk==2.18.0 diff --git a/snuba/web/rpc/common/common.py b/snuba/web/rpc/common/common.py index ad56136d42..a62a9ed995 100644 --- a/snuba/web/rpc/common/common.py +++ b/snuba/web/rpc/common/common.py @@ -124,7 +124,10 @@ def _build_label_mapping_key(attr_key: AttributeKey) -> str: ) if attr_key.type == AttributeKey.Type.TYPE_INT: return f.CAST(column(attr_key.name[len("sentry.") :]), "Int64", alias=alias) - if attr_key.type == AttributeKey.Type.TYPE_FLOAT: + if ( + attr_key.type == AttributeKey.Type.TYPE_FLOAT + or attr_key.type == AttributeKey.Type.TYPE_DOUBLE + ): return f.CAST( column(attr_key.name[len("sentry.") :]), "Float64", alias=alias ) @@ -133,7 +136,11 @@ def _build_label_mapping_key(attr_key: AttributeKey) -> str: ) if attr_key.name in NORMALIZED_COLUMNS: - if NORMALIZED_COLUMNS[attr_key.name] == attr_key.type: + # the second if statement allows Sentry to send TYPE_DOUBLE to Snuba when Snuba still has to be backward compatible with TYPE_FLOATS + if NORMALIZED_COLUMNS[attr_key.name] == attr_key.type or ( + attr_key.type == AttributeKey.Type.TYPE_DOUBLE + and NORMALIZED_COLUMNS[attr_key.name] == AttributeKey.Type.TYPE_FLOAT + ): return column(attr_key.name[len("sentry.") :], alias=attr_key.name) raise BadSnubaRPCRequestException( f"Attribute {attr_key.name} must be requested as {NORMALIZED_COLUMNS[attr_key.name]}, got {attr_key.type}" @@ -144,7 +151,10 @@ def _build_label_mapping_key(attr_key: AttributeKey) -> str: return SubscriptableReference( alias=alias, column=column("attr_str"), key=literal(attr_key.name) ) - if attr_key.type == AttributeKey.Type.TYPE_FLOAT: + if ( + attr_key.type == AttributeKey.Type.TYPE_FLOAT + or attr_key.type == AttributeKey.Type.TYPE_DOUBLE + ): return SubscriptableReference( alias=alias, column=column("attr_num"), key=literal(attr_key.name) ) diff --git a/snuba/web/rpc/v1/resolvers/R_eap_spans/resolver_trace_item_table.py b/snuba/web/rpc/v1/resolvers/R_eap_spans/resolver_trace_item_table.py index 3640fc35ff..d47d77a215 100644 --- a/snuba/web/rpc/v1/resolvers/R_eap_spans/resolver_trace_item_table.py +++ b/snuba/web/rpc/v1/resolvers/R_eap_spans/resolver_trace_item_table.py @@ -250,6 +250,8 @@ def _convert_results( converters[column.label] = lambda x: AttributeValue(val_int=int(x)) elif column.key.type == AttributeKey.TYPE_FLOAT: converters[column.label] = lambda x: AttributeValue(val_float=float(x)) + elif column.key.type == AttributeKey.TYPE_DOUBLE: + converters[column.label] = lambda x: AttributeValue(val_double=float(x)) elif column.HasField("aggregation"): converters[column.label] = lambda x: AttributeValue(val_float=float(x)) else: From e77192c1ee791708dd6a40f3d0cf0d1e0c7c0a83 Mon Sep 17 00:00:00 2001 From: Rachel Chen Date: Wed, 15 Jan 2025 21:00:23 -0800 Subject: [PATCH 2/7] ok this method is jank but its still functional --- snuba/web/rpc/common/common.py | 6 +++ snuba/web/rpc/v1/endpoint_get_traces.py | 54 +++++++++++++++++-- .../R_eap_spans/resolver_trace_item_table.py | 5 +- tests/web/rpc/v1/test_endpoint_get_traces.py | 2 +- .../test_endpoint_trace_item_table.py | 46 ++++++++-------- ...endpoint_trace_item_table_extrapolation.py | 20 +++---- 6 files changed, 93 insertions(+), 40 deletions(-) diff --git a/snuba/web/rpc/common/common.py b/snuba/web/rpc/common/common.py index a62a9ed995..f05c9c4605 100644 --- a/snuba/web/rpc/common/common.py +++ b/snuba/web/rpc/common/common.py @@ -294,6 +294,8 @@ def trace_item_filters_to_expression(item_filter: TraceItemFilter) -> Expression v_expression = literal(v.val_str) case "val_float": v_expression = literal(v.val_float) + case "val_double": + v_expression = literal(v.val_double) case "val_int": v_expression = literal(v.val_int) case "val_null": @@ -310,6 +312,10 @@ def trace_item_filters_to_expression(item_filter: TraceItemFilter) -> Expression v_expression = literals_array( None, list(map(lambda x: literal(x), v.val_float_array.values)) ) + case "val_double_array": + v_expression = literals_array( + None, list(map(lambda x: literal(x), v.val_double_array.values)) + ) case default: raise NotImplementedError( f"translation of AttributeValue type {default} is not implemented" diff --git a/snuba/web/rpc/v1/endpoint_get_traces.py b/snuba/web/rpc/v1/endpoint_get_traces.py index 572ede9fc9..105f74eefb 100644 --- a/snuba/web/rpc/v1/endpoint_get_traces.py +++ b/snuba/web/rpc/v1/endpoint_get_traces.py @@ -44,6 +44,13 @@ _DEFAULT_ROW_LIMIT = 10_000 _BUFFER_WINDOW = 2 * 3600 # 2 hours + +def _convert_key_to_support_doubles_and_floats_for_backward_compat( + key: TraceAttribute.Key.ValueType, +) -> TraceAttribute.Key.ValueType: + return TraceAttribute.Key.ValueType(-1 * key) + + _ATTRIBUTES: dict[ TraceAttribute.Key.ValueType, tuple[str, AttributeKey.Type.ValueType], @@ -69,6 +76,16 @@ AttributeKey.Type.TYPE_STRING, ), } +# for every AttributeKey of TYPE_FLOAT a user may add during the backward compat period, this adds the TYPE_DOUBLE equivalent +_attributes_backward_compat = dict() +for k in _ATTRIBUTES: + v = _ATTRIBUTES[k] + if v[1] == AttributeKey.Type.TYPE_FLOAT: + _attributes_backward_compat[ + _convert_key_to_support_doubles_and_floats_for_backward_compat(k) + ] = (v[0], AttributeKey.Type.TYPE_DOUBLE) +_ATTRIBUTES.update(_attributes_backward_compat) + _TYPES_TO_CLICKHOUSE: dict[ AttributeKey.Type.ValueType, tuple[str, Callable[[Any], AttributeValue]], @@ -85,6 +102,10 @@ "Float64", lambda x: AttributeValue(val_float=float(x)), ), + AttributeKey.Type.TYPE_DOUBLE: ( + "Float64", + lambda x: AttributeValue(val_double=float(x)), + ), } @@ -102,11 +123,19 @@ def _attribute_to_expression( alias=_ATTRIBUTES[trace_attribute.key][0], ) if trace_attribute.key == TraceAttribute.Key.KEY_START_TIMESTAMP: - attribute = _ATTRIBUTES[trace_attribute.key] + attribute = ( + _ATTRIBUTES[ + _convert_key_to_support_doubles_and_floats_for_backward_compat( + trace_attribute.key + ) + ] + if trace_attribute.type == AttributeKey.Type.TYPE_DOUBLE + else _ATTRIBUTES[trace_attribute.key] + ) return f.cast( f.min(column("start_timestamp")), _TYPES_TO_CLICKHOUSE[attribute[1]][0], - alias=_ATTRIBUTES[trace_attribute.key][0], + alias=attribute[0], ) if trace_attribute.key == TraceAttribute.Key.KEY_ROOT_SPAN_NAME: # TODO: Change to return the root span name instead of the trace's first span's name. @@ -116,7 +145,15 @@ def _attribute_to_expression( alias=_ATTRIBUTES[trace_attribute.key][0], ) if trace_attribute.key in _ATTRIBUTES: - attribute = _ATTRIBUTES[trace_attribute.key] + attribute = ( + _ATTRIBUTES[ + _convert_key_to_support_doubles_and_floats_for_backward_compat( + trace_attribute.key + ) + ] + if trace_attribute.type == AttributeKey.Type.TYPE_DOUBLE + else _ATTRIBUTES[trace_attribute.key] + ) return f.cast( column(attribute[0]), _TYPES_TO_CLICKHOUSE[attribute[1]][0], @@ -165,8 +202,15 @@ def _convert_results( TraceAttribute, ] = defaultdict(TraceAttribute) for attribute in request.attributes: - value = row[_ATTRIBUTES[attribute.key][0]] - type = _ATTRIBUTES[attribute.key][1] + backward_compat_attribute_key = ( + _convert_key_to_support_doubles_and_floats_for_backward_compat( + attribute.key + ) + if attribute.type == AttributeKey.Type.TYPE_DOUBLE + else attribute.key + ) + value = row[_ATTRIBUTES[backward_compat_attribute_key][0]] + type = _ATTRIBUTES[backward_compat_attribute_key][1] values[attribute.key] = TraceAttribute( key=attribute.key, value=_TYPES_TO_CLICKHOUSE[type][1](value), diff --git a/snuba/web/rpc/v1/resolvers/R_eap_spans/resolver_trace_item_table.py b/snuba/web/rpc/v1/resolvers/R_eap_spans/resolver_trace_item_table.py index d47d77a215..06485d1bfd 100644 --- a/snuba/web/rpc/v1/resolvers/R_eap_spans/resolver_trace_item_table.py +++ b/snuba/web/rpc/v1/resolvers/R_eap_spans/resolver_trace_item_table.py @@ -253,7 +253,10 @@ def _convert_results( elif column.key.type == AttributeKey.TYPE_DOUBLE: converters[column.label] = lambda x: AttributeValue(val_double=float(x)) elif column.HasField("aggregation"): - converters[column.label] = lambda x: AttributeValue(val_float=float(x)) + if column.key.type == AttributeKey.TYPE_FLOAT: + converters[column.label] = lambda x: AttributeValue(val_float=float(x)) + if column.key.type == AttributeKey.TYPE_DOUBLE: + converters[column.label] = lambda x: AttributeValue(val_double=float(x)) else: raise BadSnubaRPCRequestException( "column is neither an attribute or aggregation" diff --git a/tests/web/rpc/v1/test_endpoint_get_traces.py b/tests/web/rpc/v1/test_endpoint_get_traces.py index ed1d733186..2505fd63a0 100644 --- a/tests/web/rpc/v1/test_endpoint_get_traces.py +++ b/tests/web/rpc/v1/test_endpoint_get_traces.py @@ -413,7 +413,7 @@ def test_with_data_and_aggregated_fields(self, setup_teardown: Any) -> None: key=TraceAttribute.Key.KEY_START_TIMESTAMP, type=AttributeKey.TYPE_FLOAT, value=AttributeValue( - val_float=start_timestamp_per_trace_id[ + val_double=start_timestamp_per_trace_id[ trace_id_per_start_timestamp[start_timestamp] ], ), diff --git a/tests/web/rpc/v1/test_endpoint_trace_item_table/test_endpoint_trace_item_table.py b/tests/web/rpc/v1/test_endpoint_trace_item_table/test_endpoint_trace_item_table.py index 16424f79c7..0049f8915e 100644 --- a/tests/web/rpc/v1/test_endpoint_trace_item_table/test_endpoint_trace_item_table.py +++ b/tests/web/rpc/v1/test_endpoint_trace_item_table/test_endpoint_trace_item_table.py @@ -290,21 +290,21 @@ def test_booleans_and_number_compares(self, setup_teardown: Any) -> None: TraceItemFilter( comparison_filter=ComparisonFilter( key=AttributeKey( - type=AttributeKey.TYPE_FLOAT, + type=AttributeKey.TYPE_DOUBLE, name="eap.measurement", ), op=ComparisonFilter.OP_LESS_THAN_OR_EQUALS, - value=AttributeValue(val_float=101), + value=AttributeValue(val_double=101), ), ), TraceItemFilter( comparison_filter=ComparisonFilter( key=AttributeKey( - type=AttributeKey.TYPE_FLOAT, + type=AttributeKey.TYPE_DOUBLE, name="eap.measurement", ), op=ComparisonFilter.OP_GREATER_THAN, - value=AttributeValue(val_float=999), + value=AttributeValue(val_double=999), ), ), ] @@ -527,7 +527,7 @@ def test_table_with_aggregates(self, setup_teardown: Any) -> None: aggregation=AttributeAggregation( aggregate=Function.FUNCTION_MAX, key=AttributeKey( - type=AttributeKey.TYPE_FLOAT, name="my.float.field" + type=AttributeKey.TYPE_DOUBLE, name="my.float.field" ), label="max(my.float.field)", extrapolation_mode=ExtrapolationMode.EXTRAPOLATION_MODE_NONE, @@ -537,7 +537,7 @@ def test_table_with_aggregates(self, setup_teardown: Any) -> None: aggregation=AttributeAggregation( aggregate=Function.FUNCTION_AVG, key=AttributeKey( - type=AttributeKey.TYPE_FLOAT, name="my.float.field" + type=AttributeKey.TYPE_DOUBLE, name="my.float.field" ), label="avg(my.float.field)", extrapolation_mode=ExtrapolationMode.EXTRAPOLATION_MODE_NONE, @@ -567,17 +567,17 @@ def test_table_with_aggregates(self, setup_teardown: Any) -> None: TraceItemColumnValues( attribute_name="max(my.float.field)", results=[ - AttributeValue(val_float=101.2), - AttributeValue(val_float=101.2), - AttributeValue(val_float=101.2), + AttributeValue(val_double=101.2), + AttributeValue(val_double=101.2), + AttributeValue(val_double=101.2), ], ), TraceItemColumnValues( attribute_name="avg(my.float.field)", results=[ - AttributeValue(val_float=101.2), - AttributeValue(val_float=101.2), - AttributeValue(val_float=101.2), + AttributeValue(val_double=101.2), + AttributeValue(val_double=101.2), + AttributeValue(val_double=101.2), ], ), ] @@ -603,7 +603,7 @@ def test_table_with_columns_not_in_groupby(self, setup_teardown: Any) -> None: aggregation=AttributeAggregation( aggregate=Function.FUNCTION_MAX, key=AttributeKey( - type=AttributeKey.TYPE_FLOAT, name="my.float.field" + type=AttributeKey.TYPE_DOUBLE, name="my.float.field" ), label="max(my.float.field)", extrapolation_mode=ExtrapolationMode.EXTRAPOLATION_MODE_NONE, @@ -651,7 +651,7 @@ def test_order_by_non_selected(self) -> None: aggregation=AttributeAggregation( aggregate=Function.FUNCTION_AVG, key=AttributeKey( - type=AttributeKey.TYPE_FLOAT, name="eap.measurement" + type=AttributeKey.TYPE_DOUBLE, name="eap.measurement" ), label="avg(eap.measurment)", extrapolation_mode=ExtrapolationMode.EXTRAPOLATION_MODE_NONE, @@ -665,7 +665,7 @@ def test_order_by_non_selected(self) -> None: aggregation=AttributeAggregation( aggregate=Function.FUNCTION_MAX, key=AttributeKey( - type=AttributeKey.TYPE_FLOAT, name="my.float.field" + type=AttributeKey.TYPE_DOUBLE, name="my.float.field" ), label="max(my.float.field)", extrapolation_mode=ExtrapolationMode.EXTRAPOLATION_MODE_NONE, @@ -706,7 +706,7 @@ def test_order_by_aggregation(self, setup_teardown: Any) -> None: aggregation=AttributeAggregation( aggregate=Function.FUNCTION_AVG, key=AttributeKey( - type=AttributeKey.TYPE_FLOAT, name="eap.measurement" + type=AttributeKey.TYPE_DOUBLE, name="eap.measurement" ), label="avg(eap.measurment)", extrapolation_mode=ExtrapolationMode.EXTRAPOLATION_MODE_NONE, @@ -720,7 +720,7 @@ def test_order_by_aggregation(self, setup_teardown: Any) -> None: aggregation=AttributeAggregation( aggregate=Function.FUNCTION_AVG, key=AttributeKey( - type=AttributeKey.TYPE_FLOAT, name="eap.measurement" + type=AttributeKey.TYPE_DOUBLE, name="eap.measurement" ), label="avg(eap.measurment)", extrapolation_mode=ExtrapolationMode.EXTRAPOLATION_MODE_NONE, @@ -731,7 +731,7 @@ def test_order_by_aggregation(self, setup_teardown: Any) -> None: limit=5, ) response = EndpointTraceItemTable().execute(message) - measurements = [v.val_float for v in response.column_values[1].results] + measurements = [v.val_double for v in response.column_values[1].results] assert sorted(measurements) == measurements def test_aggregation_on_attribute_column(self) -> None: @@ -768,7 +768,7 @@ def test_aggregation_on_attribute_column(self) -> None: aggregation=AttributeAggregation( aggregate=Function.FUNCTION_AVG, key=AttributeKey( - type=AttributeKey.TYPE_FLOAT, name="custom_measurement" + type=AttributeKey.TYPE_DOUBLE, name="custom_measurement" ), label="avg(custom_measurement)", extrapolation_mode=ExtrapolationMode.EXTRAPOLATION_MODE_NONE, @@ -779,7 +779,7 @@ def test_aggregation_on_attribute_column(self) -> None: limit=5, ) response = EndpointTraceItemTable().execute(message) - measurement_avg = [v.val_float for v in response.column_values[0].results][0] + measurement_avg = [v.val_double for v in response.column_values[0].results][0] assert measurement_avg == 420 def test_different_column_label_and_attr_name(self, setup_teardown: Any) -> None: @@ -804,7 +804,7 @@ def test_different_column_label_and_attr_name(self, setup_teardown: Any) -> None aggregation=AttributeAggregation( aggregate=Function.FUNCTION_COUNT, key=AttributeKey( - type=AttributeKey.TYPE_FLOAT, name="sentry.duration_ms" + type=AttributeKey.TYPE_DOUBLE, name="sentry.duration_ms" ), ), label="count()", @@ -1444,7 +1444,7 @@ def test_apply_labels_to_columns(self) -> None: aggregation=AttributeAggregation( aggregate=Function.FUNCTION_AVG, key=AttributeKey( - type=AttributeKey.TYPE_FLOAT, name="custom_measurement" + type=AttributeKey.TYPE_DOUBLE, name="custom_measurement" ), label="avg(custom_measurement)", extrapolation_mode=ExtrapolationMode.EXTRAPOLATION_MODE_NONE, @@ -1454,7 +1454,7 @@ def test_apply_labels_to_columns(self) -> None: aggregation=AttributeAggregation( aggregate=Function.FUNCTION_AVG, key=AttributeKey( - type=AttributeKey.TYPE_FLOAT, name="custom_measurement" + type=AttributeKey.TYPE_DOUBLE, name="custom_measurement" ), label="avg(custom_measurement_2)", extrapolation_mode=ExtrapolationMode.EXTRAPOLATION_MODE_NONE, diff --git a/tests/web/rpc/v1/test_endpoint_trace_item_table/test_endpoint_trace_item_table_extrapolation.py b/tests/web/rpc/v1/test_endpoint_trace_item_table/test_endpoint_trace_item_table_extrapolation.py index 2592fa0c01..d423621b25 100644 --- a/tests/web/rpc/v1/test_endpoint_trace_item_table/test_endpoint_trace_item_table_extrapolation.py +++ b/tests/web/rpc/v1/test_endpoint_trace_item_table/test_endpoint_trace_item_table_extrapolation.py @@ -208,15 +208,15 @@ def test_aggregation_on_attribute_column(self) -> None: limit=5, ) response = EndpointTraceItemTable().execute(message) - measurement_sum = [v.val_float for v in response.column_values[0].results][0] - measurement_avg = [v.val_float for v in response.column_values[1].results][0] + measurement_sum = [v.val_double for v in response.column_values[0].results][0] + measurement_avg = [v.val_double for v in response.column_values[1].results][0] measurement_count_custom_measurement = [ - v.val_float for v in response.column_values[2].results + v.val_double for v in response.column_values[2].results ][0] measurement_count_duration = [ - v.val_float for v in response.column_values[3].results + v.val_double for v in response.column_values[3].results ][0] - measurement_p90 = [v.val_float for v in response.column_values[4].results][0] + measurement_p90 = [v.val_double for v in response.column_values[4].results][0] assert measurement_sum == 98 # weighted sum - 0*1 + 1*2 + 2*4 + 3*8 + 4*16 assert ( abs(measurement_avg - 3.16129032) < 0.000001 @@ -279,7 +279,7 @@ def test_count_reliability(self) -> None: limit=5, ) response = EndpointTraceItemTable().execute(message) - measurement_count = [v.val_float for v in response.column_values[0].results][0] + measurement_count = [v.val_double for v in response.column_values[0].results][0] measurement_reliability = [v for v in response.column_values[0].reliabilities][ 0 ] @@ -383,7 +383,7 @@ def test_count_reliability_with_group_by(self) -> None: measurement_tags = [v.val_str for v in response.column_values[0].results] assert measurement_tags == ["foo", "bar"] - measurement_sums = [v.val_float for v in response.column_values[1].results] + measurement_sums = [v.val_double for v in response.column_values[1].results] measurement_reliabilities = [v for v in response.column_values[1].reliabilities] assert measurement_sums == [sum(range(5)), 0] assert measurement_reliabilities == [ @@ -391,7 +391,7 @@ def test_count_reliability_with_group_by(self) -> None: Reliability.RELIABILITY_UNSPECIFIED, ] # low reliability due to low sample count - measurement_avgs = [v.val_float for v in response.column_values[2].results] + measurement_avgs = [v.val_double for v in response.column_values[2].results] measurement_reliabilities = [v for v in response.column_values[2].reliabilities] assert len(measurement_avgs) == 2 assert measurement_avgs[0] == sum(range(5)) / 5 @@ -401,7 +401,7 @@ def test_count_reliability_with_group_by(self) -> None: Reliability.RELIABILITY_UNSPECIFIED, ] # low reliability due to low sample count - measurement_counts = [v.val_float for v in response.column_values[3].results] + measurement_counts = [v.val_double for v in response.column_values[3].results] measurement_reliabilities = [v for v in response.column_values[3].reliabilities] assert measurement_counts == [5, 0] assert measurement_reliabilities == [ @@ -409,7 +409,7 @@ def test_count_reliability_with_group_by(self) -> None: Reliability.RELIABILITY_UNSPECIFIED, ] # low reliability due to low sample count - measurement_p90s = [v.val_float for v in response.column_values[4].results] + measurement_p90s = [v.val_double for v in response.column_values[4].results] measurement_reliabilities = [v for v in response.column_values[4].reliabilities] assert len(measurement_p90s) == 2 assert measurement_p90s[0] == 4 From 31232bb216db503157f0c042b6be2d88c837bb6f Mon Sep 17 00:00:00 2001 From: Rachel Chen Date: Thu, 16 Jan 2025 11:43:04 -0800 Subject: [PATCH 3/7] aggregation bug --- snuba/web/rpc/common/common.py | 1 + .../v1/endpoint_trace_item_attribute_names.py | 1 + .../R_eap_spans/resolver_trace_item_table.py | 13 +- tests/web/rpc/v1/test_endpoint_get_traces.py | 120 +++++++++++++++++- 4 files changed, 132 insertions(+), 3 deletions(-) diff --git a/snuba/web/rpc/common/common.py b/snuba/web/rpc/common/common.py index f05c9c4605..3bd67d6344 100644 --- a/snuba/web/rpc/common/common.py +++ b/snuba/web/rpc/common/common.py @@ -137,6 +137,7 @@ def _build_label_mapping_key(attr_key: AttributeKey) -> str: if attr_key.name in NORMALIZED_COLUMNS: # the second if statement allows Sentry to send TYPE_DOUBLE to Snuba when Snuba still has to be backward compatible with TYPE_FLOATS + print("attr_keyyy", attr_key.name, attr_key.type) if NORMALIZED_COLUMNS[attr_key.name] == attr_key.type or ( attr_key.type == AttributeKey.Type.TYPE_DOUBLE and NORMALIZED_COLUMNS[attr_key.name] == AttributeKey.Type.TYPE_FLOAT diff --git a/snuba/web/rpc/v1/endpoint_trace_item_attribute_names.py b/snuba/web/rpc/v1/endpoint_trace_item_attribute_names.py index 187f67708a..5a52baddb9 100644 --- a/snuba/web/rpc/v1/endpoint_trace_item_attribute_names.py +++ b/snuba/web/rpc/v1/endpoint_trace_item_attribute_names.py @@ -55,6 +55,7 @@ def convert_to_snuba_request(req: TraceItemAttributeNamesRequest) -> SnubaReques ) elif req.type in ( AttributeKey.Type.TYPE_FLOAT, + AttributeKey.Type.TYPE_DOUBLE, AttributeKey.Type.TYPE_INT, AttributeKey.Type.TYPE_BOOLEAN, ): diff --git a/snuba/web/rpc/v1/resolvers/R_eap_spans/resolver_trace_item_table.py b/snuba/web/rpc/v1/resolvers/R_eap_spans/resolver_trace_item_table.py index 06485d1bfd..38e4457e1b 100644 --- a/snuba/web/rpc/v1/resolvers/R_eap_spans/resolver_trace_item_table.py +++ b/snuba/web/rpc/v1/resolvers/R_eap_spans/resolver_trace_item_table.py @@ -242,6 +242,7 @@ def _convert_results( for column in request.columns: if column.HasField("key"): + print("columnnnnkey", column, column.key, column.key.type) if column.key.type == AttributeKey.TYPE_BOOLEAN: converters[column.label] = lambda x: AttributeValue(val_bool=bool(x)) elif column.key.type == AttributeKey.TYPE_STRING: @@ -253,15 +254,20 @@ def _convert_results( elif column.key.type == AttributeKey.TYPE_DOUBLE: converters[column.label] = lambda x: AttributeValue(val_double=float(x)) elif column.HasField("aggregation"): - if column.key.type == AttributeKey.TYPE_FLOAT: + print("columnnn", column) + print("columnnnn.key", column.key) + print("columnnn.key.type", column.key.type) + if column.aggregation.key.type == AttributeKey.TYPE_FLOAT: converters[column.label] = lambda x: AttributeValue(val_float=float(x)) - if column.key.type == AttributeKey.TYPE_DOUBLE: + if column.aggregation.key.type == AttributeKey.TYPE_DOUBLE: converters[column.label] = lambda x: AttributeValue(val_double=float(x)) else: raise BadSnubaRPCRequestException( "column is neither an attribute or aggregation" ) + print("covertersss", converters) + res: defaultdict[str, TraceItemColumnValues] = defaultdict(TraceItemColumnValues) for row in data: for column_name, value in row.items(): @@ -299,13 +305,16 @@ def trace_item_type(cls) -> TraceItemType.ValueType: return TraceItemType.TRACE_ITEM_TYPE_SPAN def resolve(self, in_msg: TraceItemTableRequest) -> TraceItemTableResponse: + print("in_msgggg", in_msg) snuba_request = _build_snuba_request(in_msg) res = run_query( dataset=PluggableDataset(name="eap", all_entities=[]), request=snuba_request, timer=self._timer, ) + print("ressss", res) column_values = _convert_results(in_msg, res.result.get("data", [])) + print("column_valuesss", column_values) response_meta = extract_response_meta( in_msg.meta.request_id, in_msg.meta.debug, diff --git a/tests/web/rpc/v1/test_endpoint_get_traces.py b/tests/web/rpc/v1/test_endpoint_get_traces.py index 2505fd63a0..2cd34a284c 100644 --- a/tests/web/rpc/v1/test_endpoint_get_traces.py +++ b/tests/web/rpc/v1/test_endpoint_get_traces.py @@ -336,7 +336,9 @@ def test_with_data_and_filter(self, setup_teardown: Any) -> None: ) assert MessageToDict(response) == MessageToDict(expected_response) - def test_with_data_and_aggregated_fields(self, setup_teardown: Any) -> None: + def test_with_data_and_aggregated_fields_backward_compat( + self, setup_teardown: Any + ) -> None: ts = Timestamp(seconds=int(_BASE_TIME.timestamp())) three_hours_later = int((_BASE_TIME + timedelta(hours=3)).timestamp()) start_timestamp_per_trace_id: dict[str, float] = defaultdict(lambda: 2 * 1e10) @@ -412,6 +414,122 @@ def test_with_data_and_aggregated_fields(self, setup_teardown: Any) -> None: TraceAttribute( key=TraceAttribute.Key.KEY_START_TIMESTAMP, type=AttributeKey.TYPE_FLOAT, + value=AttributeValue( + val_float=start_timestamp_per_trace_id[ + trace_id_per_start_timestamp[start_timestamp] + ], + ), + ), + TraceAttribute( + key=TraceAttribute.Key.KEY_TOTAL_ITEM_COUNT, + type=AttributeKey.TYPE_INT, + value=AttributeValue( + val_int=_SPAN_COUNT // len(_TRACE_IDS), + ), + ), + TraceAttribute( + key=TraceAttribute.Key.KEY_FILTERED_ITEM_COUNT, + type=AttributeKey.TYPE_INT, + value=AttributeValue( + val_int=(_SPAN_COUNT // len(_TRACE_IDS)) - 1, + ), + ), + TraceAttribute( + key=TraceAttribute.Key.KEY_ROOT_SPAN_NAME, + type=AttributeKey.TYPE_STRING, + value=AttributeValue( + val_str="root", + ), + ), + ], + ) + for start_timestamp in reversed( + sorted(trace_id_per_start_timestamp.keys()) + ) + ], + page_token=PageToken(offset=len(_TRACE_IDS)), + meta=ResponseMeta(request_id=_REQUEST_ID), + ) + for start_timestamp in reversed(sorted(trace_id_per_start_timestamp.keys())): + print(start_timestamp, trace_id_per_start_timestamp[start_timestamp]) + assert MessageToDict(response) == MessageToDict(expected_response) + + def test_with_data_and_aggregated_fields(self, setup_teardown: Any) -> None: + ts = Timestamp(seconds=int(_BASE_TIME.timestamp())) + three_hours_later = int((_BASE_TIME + timedelta(hours=3)).timestamp()) + start_timestamp_per_trace_id: dict[str, float] = defaultdict(lambda: 2 * 1e10) + for s in _SPANS: + start_timestamp_per_trace_id[s["trace_id"]] = min( + start_timestamp_per_trace_id[s["trace_id"]], + s["start_timestamp_precise"], + ) + trace_id_per_start_timestamp: dict[float, str] = { + timestamp: trace_id + for trace_id, timestamp in start_timestamp_per_trace_id.items() + } + message = GetTracesRequest( + meta=RequestMeta( + project_ids=[1, 2, 3], + organization_id=1, + cogs_category="something", + referrer="something", + start_timestamp=ts, + end_timestamp=Timestamp(seconds=three_hours_later), + request_id=_REQUEST_ID, + ), + attributes=[ + TraceAttribute( + key=TraceAttribute.Key.KEY_TRACE_ID, + type=AttributeKey.TYPE_STRING, + ), + TraceAttribute( + key=TraceAttribute.Key.KEY_START_TIMESTAMP, + type=AttributeKey.TYPE_DOUBLE, + ), + TraceAttribute( + key=TraceAttribute.Key.KEY_TOTAL_ITEM_COUNT, + type=AttributeKey.TYPE_INT, + ), + TraceAttribute( + key=TraceAttribute.Key.KEY_FILTERED_ITEM_COUNT, + type=AttributeKey.TYPE_INT, + ), + TraceAttribute( + key=TraceAttribute.Key.KEY_ROOT_SPAN_NAME, + type=AttributeKey.TYPE_STRING, + ), + ], + filters=[ + GetTracesRequest.TraceFilter( + item_type=TraceItemType.TRACE_ITEM_TYPE_SPAN, + filter=TraceItemFilter( + comparison_filter=ComparisonFilter( + key=AttributeKey( + name="sentry.op", + type=AttributeKey.TYPE_STRING, + ), + op=ComparisonFilter.OP_EQUALS, + value=AttributeValue(val_str="db"), + ), + ), + ), + ], + ) + response = EndpointGetTraces().execute(message) + expected_response = GetTracesResponse( + traces=[ + GetTracesResponse.Trace( + attributes=[ + TraceAttribute( + key=TraceAttribute.Key.KEY_TRACE_ID, + type=AttributeKey.TYPE_STRING, + value=AttributeValue( + val_str=trace_id_per_start_timestamp[start_timestamp], + ), + ), + TraceAttribute( + key=TraceAttribute.Key.KEY_START_TIMESTAMP, + type=AttributeKey.TYPE_DOUBLE, value=AttributeValue( val_double=start_timestamp_per_trace_id[ trace_id_per_start_timestamp[start_timestamp] From ddd872fa9ee9682588be1ca294256f6a67bc1954 Mon Sep 17 00:00:00 2001 From: Rachel Chen Date: Thu, 16 Jan 2025 14:57:09 -0800 Subject: [PATCH 4/7] 3steps --- snuba/web/rpc/common/common.py | 14 ++++---- .../R_eap_spans/resolver_trace_item_table.py | 8 +---- .../test_endpoint_trace_item_table.py | 33 +++++++++++++++++++ 3 files changed, 41 insertions(+), 14 deletions(-) diff --git a/snuba/web/rpc/common/common.py b/snuba/web/rpc/common/common.py index 3bd67d6344..2495d37ad4 100644 --- a/snuba/web/rpc/common/common.py +++ b/snuba/web/rpc/common/common.py @@ -82,12 +82,12 @@ def transform(exp: Expression) -> Expression: "sentry.segment_id": AttributeKey.Type.TYPE_STRING, # this is converted by a processor on the storage "sentry.segment_name": AttributeKey.Type.TYPE_STRING, "sentry.is_segment": AttributeKey.Type.TYPE_BOOLEAN, - "sentry.duration_ms": AttributeKey.Type.TYPE_FLOAT, - "sentry.exclusive_time_ms": AttributeKey.Type.TYPE_FLOAT, + "sentry.duration_ms": AttributeKey.Type.TYPE_DOUBLE, + "sentry.exclusive_time_ms": AttributeKey.Type.TYPE_DOUBLE, "sentry.retention_days": AttributeKey.Type.TYPE_INT, "sentry.name": AttributeKey.Type.TYPE_STRING, - "sentry.sampling_weight": AttributeKey.Type.TYPE_FLOAT, - "sentry.sampling_factor": AttributeKey.Type.TYPE_FLOAT, + "sentry.sampling_weight": AttributeKey.Type.TYPE_DOUBLE, + "sentry.sampling_factor": AttributeKey.Type.TYPE_DOUBLE, "sentry.timestamp": AttributeKey.Type.TYPE_UNSPECIFIED, "sentry.start_timestamp": AttributeKey.Type.TYPE_UNSPECIFIED, "sentry.end_timestamp": AttributeKey.Type.TYPE_UNSPECIFIED, @@ -136,11 +136,11 @@ def _build_label_mapping_key(attr_key: AttributeKey) -> str: ) if attr_key.name in NORMALIZED_COLUMNS: - # the second if statement allows Sentry to send TYPE_DOUBLE to Snuba when Snuba still has to be backward compatible with TYPE_FLOATS + # the second if statement allows Sentry to send TYPE_FLOAT to Snuba when Snuba still has to be backward compatible with TYPE_FLOATS print("attr_keyyy", attr_key.name, attr_key.type) if NORMALIZED_COLUMNS[attr_key.name] == attr_key.type or ( - attr_key.type == AttributeKey.Type.TYPE_DOUBLE - and NORMALIZED_COLUMNS[attr_key.name] == AttributeKey.Type.TYPE_FLOAT + attr_key.type == AttributeKey.Type.TYPE_FLOAT + and NORMALIZED_COLUMNS[attr_key.name] == AttributeKey.Type.TYPE_DOUBLE ): return column(attr_key.name[len("sentry.") :], alias=attr_key.name) raise BadSnubaRPCRequestException( diff --git a/snuba/web/rpc/v1/resolvers/R_eap_spans/resolver_trace_item_table.py b/snuba/web/rpc/v1/resolvers/R_eap_spans/resolver_trace_item_table.py index 38e4457e1b..79c19c58f7 100644 --- a/snuba/web/rpc/v1/resolvers/R_eap_spans/resolver_trace_item_table.py +++ b/snuba/web/rpc/v1/resolvers/R_eap_spans/resolver_trace_item_table.py @@ -254,13 +254,7 @@ def _convert_results( elif column.key.type == AttributeKey.TYPE_DOUBLE: converters[column.label] = lambda x: AttributeValue(val_double=float(x)) elif column.HasField("aggregation"): - print("columnnn", column) - print("columnnnn.key", column.key) - print("columnnn.key.type", column.key.type) - if column.aggregation.key.type == AttributeKey.TYPE_FLOAT: - converters[column.label] = lambda x: AttributeValue(val_float=float(x)) - if column.aggregation.key.type == AttributeKey.TYPE_DOUBLE: - converters[column.label] = lambda x: AttributeValue(val_double=float(x)) + converters[column.label] = lambda x: AttributeValue(val_double=float(x)) else: raise BadSnubaRPCRequestException( "column is neither an attribute or aggregation" diff --git a/tests/web/rpc/v1/test_endpoint_trace_item_table/test_endpoint_trace_item_table.py b/tests/web/rpc/v1/test_endpoint_trace_item_table/test_endpoint_trace_item_table.py index 0049f8915e..b3a36fd241 100644 --- a/tests/web/rpc/v1/test_endpoint_trace_item_table/test_endpoint_trace_item_table.py +++ b/tests/web/rpc/v1/test_endpoint_trace_item_table/test_endpoint_trace_item_table.py @@ -555,6 +555,39 @@ def test_table_with_aggregates(self, setup_teardown: Any) -> None: limit=5, ) response = EndpointTraceItemTable().execute(message) + + print("responseee", response.column_values) + + print( + "RESULTSSS", + [ + TraceItemColumnValues( + attribute_name="location", + results=[ + AttributeValue(val_str="backend"), + AttributeValue(val_str="frontend"), + AttributeValue(val_str="mobile"), + ], + ), + TraceItemColumnValues( + attribute_name="max(my.float.field)", + results=[ + AttributeValue(val_double=101.2), + AttributeValue(val_double=101.2), + AttributeValue(val_double=101.2), + ], + ), + TraceItemColumnValues( + attribute_name="avg(my.float.field)", + results=[ + AttributeValue(val_double=101.2), + AttributeValue(val_double=101.2), + AttributeValue(val_double=101.2), + ], + ), + ], + ) + assert response.column_values == [ TraceItemColumnValues( attribute_name="location", From 298b46c14a6891cd2e7f5f5e6ebcaee1c6617abf Mon Sep 17 00:00:00 2001 From: Rachel Chen Date: Fri, 17 Jan 2025 00:04:27 -0800 Subject: [PATCH 5/7] round --- .../logical/calculated_average_processor.py | 2 ++ .../logical/filter_in_select_optimizer.py | 3 ++ .../logical/optional_attribute_aggregation.py | 1 + snuba/web/db_query.py | 4 +++ .../R_eap_spans/common/aggregation.py | 2 +- .../R_eap_spans/resolver_trace_item_table.py | 4 ++- tests/web/rpc/test_common.py | 14 +++++++++ ...est_endpoint_trace_item_attribute_names.py | 31 ++++++++++++++++++- .../test_endpoint_trace_item_table.py | 1 + ...endpoint_trace_item_table_extrapolation.py | 2 +- 10 files changed, 60 insertions(+), 4 deletions(-) diff --git a/snuba/query/processors/logical/calculated_average_processor.py b/snuba/query/processors/logical/calculated_average_processor.py index d696b9cc12..c1d658f89b 100644 --- a/snuba/query/processors/logical/calculated_average_processor.py +++ b/snuba/query/processors/logical/calculated_average_processor.py @@ -16,6 +16,7 @@ class CalculatedAverageProcessor(LogicalQueryProcessor): """ def process_query(self, query: Query, query_settings: QuerySettings) -> None: + print("huh_process_query") # use a matcher to find something like avg(value) matcher = FunctionCallMatch( Or([String("avg"), String("avgIf")]), @@ -24,6 +25,7 @@ def process_query(self, query: Query, query_settings: QuerySettings) -> None: ) def transform_expression(exp: Expression) -> Expression: + print("huh_transform_expression") match = matcher.match(exp) if isinstance(exp, FunctionCall) and match is not None: suffix = "If" if exp.function_name == "avgIf" else "" diff --git a/snuba/query/processors/logical/filter_in_select_optimizer.py b/snuba/query/processors/logical/filter_in_select_optimizer.py index e5766af27d..6391740365 100644 --- a/snuba/query/processors/logical/filter_in_select_optimizer.py +++ b/snuba/query/processors/logical/filter_in_select_optimizer.py @@ -45,6 +45,7 @@ class FindConditionalAggregateFunctionsVisitor( """ def __init__(self) -> None: + print("huh_FindConditionalAggregateFunctionsVisitor") self._matches: list[FunctionCall | CurriedFunctionCall] = [] def visit_literal(self, exp: Literal) -> list[FunctionCall | CurriedFunctionCall]: @@ -100,6 +101,7 @@ class FilterInSelectOptimizer(LogicalQueryProcessor): """ def process_query(self, query: LogicalQuery, query_settings: QuerySettings) -> None: + print("huh_FilterInSelectOptimizer") try: new_condition = self.get_select_filter(query) except Exception: @@ -115,6 +117,7 @@ def get_select_filter( self, query: LogicalQuery | CompositeQuery[QueryEntity], ) -> FunctionCall | None: + print("huh_get_select_filter") """ Given a query, grabs all the conditions from conditional aggregates and lifts into one condition. diff --git a/snuba/query/processors/logical/optional_attribute_aggregation.py b/snuba/query/processors/logical/optional_attribute_aggregation.py index d69c5a03c9..67636f33a1 100644 --- a/snuba/query/processors/logical/optional_attribute_aggregation.py +++ b/snuba/query/processors/logical/optional_attribute_aggregation.py @@ -32,6 +32,7 @@ def __init__( aggregation_names: list[str], curried_aggregation_names: list[str], ): + print("huh_OptionalAttributeAggregationTransformer") self._attribute_column_names = attribute_column_names self._aggregation_names = aggregation_names self._curried_aggregation_names = curried_aggregation_names diff --git a/snuba/web/db_query.py b/snuba/web/db_query.py index 45a28af589..b183369cc9 100644 --- a/snuba/web/db_query.py +++ b/snuba/web/db_query.py @@ -176,6 +176,8 @@ def execute_query( # Apply clickhouse query setting overrides clickhouse_query_settings.update(query_settings.get_clickhouse_settings()) + print("formatted_queryyyy", formatted_query) + result = reader.execute( formatted_query, clickhouse_query_settings, @@ -183,6 +185,8 @@ def execute_query( robust=robust, ) + print("resulttttttt", result) + timer.mark("execute") stats.update( { diff --git a/snuba/web/rpc/v1/resolvers/R_eap_spans/common/aggregation.py b/snuba/web/rpc/v1/resolvers/R_eap_spans/common/aggregation.py index 67d530f725..f48b8e7518 100644 --- a/snuba/web/rpc/v1/resolvers/R_eap_spans/common/aggregation.py +++ b/snuba/web/rpc/v1/resolvers/R_eap_spans/common/aggregation.py @@ -628,7 +628,7 @@ def aggregation_to_expression(aggregation: AttributeAggregation) -> Expression: Function.FUNCTION_P90: cf.quantile(0.9)(field, **alias_dict), Function.FUNCTION_P95: cf.quantile(0.95)(field, **alias_dict), Function.FUNCTION_P99: cf.quantile(0.99)(field, **alias_dict), - Function.FUNCTION_AVG: f.avg(field, **alias_dict), + Function.FUNCTION_AVG: f.round(f.avg(field), 9, **alias_dict), Function.FUNCTION_MAX: f.max(field, **alias_dict), Function.FUNCTION_MIN: f.min(field, **alias_dict), Function.FUNCTION_UNIQ: f.uniq(field, **alias_dict), diff --git a/snuba/web/rpc/v1/resolvers/R_eap_spans/resolver_trace_item_table.py b/snuba/web/rpc/v1/resolvers/R_eap_spans/resolver_trace_item_table.py index 79c19c58f7..99b14d296f 100644 --- a/snuba/web/rpc/v1/resolvers/R_eap_spans/resolver_trace_item_table.py +++ b/snuba/web/rpc/v1/resolvers/R_eap_spans/resolver_trace_item_table.py @@ -148,8 +148,10 @@ def _build_query(request: TraceItemTableRequest) -> Query: ) elif column.HasField("aggregation"): function_expr = aggregation_to_expression(column.aggregation) + print("function_expr_1", function_expr) # aggregation label may not be set and the column label takes priority anyways. function_expr = replace(function_expr, alias=column.label) + print("function_expr_2", function_expr) selected_columns.append( SelectedExpression(name=column.label, expression=function_expr) ) @@ -186,7 +188,7 @@ def _build_query(request: TraceItemTableRequest) -> Query: raise BadSnubaRPCRequestException( "Column is neither an aggregate or an attribute" ) - + print("selected_columnsss", selected_columns) res = Query( from_clause=entity, selected_columns=selected_columns, diff --git a/tests/web/rpc/test_common.py b/tests/web/rpc/test_common.py index ab36e49d06..5bcc083fb2 100644 --- a/tests/web/rpc/test_common.py +++ b/tests/web/rpc/test_common.py @@ -43,6 +43,14 @@ def test_timestamp_columns(self) -> None: ) == f.CAST( column(col[len("sentry.") :]), "Float64", alias=col + "_TYPE_FLOAT" ) + assert attribute_key_to_expression( + AttributeKey( + type=AttributeKey.TYPE_DOUBLE, + name=col, + ), + ) == f.CAST( + column(col[len("sentry.") :]), "Float64", alias=col + "_TYPE_DOUBLE" + ) def test_normalized_col(self) -> None: for col in [ @@ -71,6 +79,12 @@ def test_attributes(self) -> None: alias="derp_TYPE_FLOAT", column=column("attr_num"), key=literal("derp") ) + assert attribute_key_to_expression( + AttributeKey(type=AttributeKey.TYPE_DOUBLE, name="derp"), + ) == SubscriptableReference( + alias="derp_TYPE_DOUBLE", column=column("attr_num"), key=literal("derp") + ) + assert attribute_key_to_expression( AttributeKey(type=AttributeKey.TYPE_INT, name="derp"), ) == f.CAST( diff --git a/tests/web/rpc/v1/test_endpoint_trace_item_attribute_names.py b/tests/web/rpc/v1/test_endpoint_trace_item_attribute_names.py index a824966a3b..6db0824772 100644 --- a/tests/web/rpc/v1/test_endpoint_trace_item_attribute_names.py +++ b/tests/web/rpc/v1/test_endpoint_trace_item_attribute_names.py @@ -114,7 +114,7 @@ def test_basic(self) -> None: ) assert res.attributes == expected - def test_simple_float(self) -> None: + def test_simple_float_backward_compat(self) -> None: req = TraceItemAttributeNamesRequest( meta=RequestMeta( project_ids=[1, 2, 3], @@ -143,6 +143,35 @@ def test_simple_float(self) -> None: ) assert res.attributes == expected + def test_simple_double(self) -> None: + req = TraceItemAttributeNamesRequest( + meta=RequestMeta( + project_ids=[1, 2, 3], + organization_id=1, + cogs_category="something", + referrer="something", + start_timestamp=Timestamp( + seconds=int((BASE_TIME - timedelta(days=1)).timestamp()) + ), + end_timestamp=Timestamp( + seconds=int((BASE_TIME + timedelta(days=1)).timestamp()) + ), + ), + limit=TOTAL_GENERATED_ATTR_PER_TYPE, + type=AttributeKey.Type.TYPE_DOUBLE, + value_substring_match="", + ) + res = EndpointTraceItemAttributeNames().execute(req) + expected = [] + for i in range(TOTAL_GENERATED_ATTR_PER_TYPE): + expected.append( + TraceItemAttributeNamesResponse.Attribute( + name=f"b_measurement_{str(i).zfill(3)}", + type=AttributeKey.Type.TYPE_DOUBLE, + ) + ) + assert res.attributes == expected + def test_with_filter(self) -> None: req = TraceItemAttributeNamesRequest( meta=RequestMeta( diff --git a/tests/web/rpc/v1/test_endpoint_trace_item_table/test_endpoint_trace_item_table.py b/tests/web/rpc/v1/test_endpoint_trace_item_table/test_endpoint_trace_item_table.py index b3a36fd241..19b97180f3 100644 --- a/tests/web/rpc/v1/test_endpoint_trace_item_table/test_endpoint_trace_item_table.py +++ b/tests/web/rpc/v1/test_endpoint_trace_item_table/test_endpoint_trace_item_table.py @@ -502,6 +502,7 @@ def test_order_by_virtual_columns(self, setup_teardown: Any) -> None: def test_table_with_aggregates(self, setup_teardown: Any) -> None: ts = Timestamp(seconds=int(BASE_TIME.timestamp())) hour_ago = int((BASE_TIME - timedelta(hours=1)).timestamp()) + # breakpoint() message = TraceItemTableRequest( meta=RequestMeta( project_ids=[1, 2, 3], diff --git a/tests/web/rpc/v1/test_endpoint_trace_item_table/test_endpoint_trace_item_table_extrapolation.py b/tests/web/rpc/v1/test_endpoint_trace_item_table/test_endpoint_trace_item_table_extrapolation.py index d423621b25..9a59a653a1 100644 --- a/tests/web/rpc/v1/test_endpoint_trace_item_table/test_endpoint_trace_item_table_extrapolation.py +++ b/tests/web/rpc/v1/test_endpoint_trace_item_table/test_endpoint_trace_item_table_extrapolation.py @@ -187,7 +187,7 @@ def test_aggregation_on_attribute_column(self) -> None: aggregation=AttributeAggregation( aggregate=Function.FUNCTION_COUNT, key=AttributeKey( - type=AttributeKey.TYPE_FLOAT, name="sentry.duration_ms" + type=AttributeKey.TYPE_DOUBLE, name="sentry.duration_ms" ), label="count(sentry.duration_ms)", extrapolation_mode=ExtrapolationMode.EXTRAPOLATION_MODE_SAMPLE_WEIGHTED, From 628c37184ad9893257d9d948303d6ef4606bef1b Mon Sep 17 00:00:00 2001 From: Rachel Chen Date: Fri, 17 Jan 2025 01:46:12 -0800 Subject: [PATCH 6/7] round --- .../logical/calculated_average_processor.py | 2 - .../logical/filter_in_select_optimizer.py | 3 - .../logical/optional_attribute_aggregation.py | 1 - snuba/web/db_query.py | 4 - snuba/web/rpc/common/common.py | 1 - .../R_eap_spans/common/aggregation.py | 6 +- .../R_eap_spans/resolver_trace_item_table.py | 9 --- .../test_endpoint_trace_item_table.py | 80 +++++++++---------- 8 files changed, 44 insertions(+), 62 deletions(-) diff --git a/snuba/query/processors/logical/calculated_average_processor.py b/snuba/query/processors/logical/calculated_average_processor.py index c1d658f89b..d696b9cc12 100644 --- a/snuba/query/processors/logical/calculated_average_processor.py +++ b/snuba/query/processors/logical/calculated_average_processor.py @@ -16,7 +16,6 @@ class CalculatedAverageProcessor(LogicalQueryProcessor): """ def process_query(self, query: Query, query_settings: QuerySettings) -> None: - print("huh_process_query") # use a matcher to find something like avg(value) matcher = FunctionCallMatch( Or([String("avg"), String("avgIf")]), @@ -25,7 +24,6 @@ def process_query(self, query: Query, query_settings: QuerySettings) -> None: ) def transform_expression(exp: Expression) -> Expression: - print("huh_transform_expression") match = matcher.match(exp) if isinstance(exp, FunctionCall) and match is not None: suffix = "If" if exp.function_name == "avgIf" else "" diff --git a/snuba/query/processors/logical/filter_in_select_optimizer.py b/snuba/query/processors/logical/filter_in_select_optimizer.py index 6391740365..e5766af27d 100644 --- a/snuba/query/processors/logical/filter_in_select_optimizer.py +++ b/snuba/query/processors/logical/filter_in_select_optimizer.py @@ -45,7 +45,6 @@ class FindConditionalAggregateFunctionsVisitor( """ def __init__(self) -> None: - print("huh_FindConditionalAggregateFunctionsVisitor") self._matches: list[FunctionCall | CurriedFunctionCall] = [] def visit_literal(self, exp: Literal) -> list[FunctionCall | CurriedFunctionCall]: @@ -101,7 +100,6 @@ class FilterInSelectOptimizer(LogicalQueryProcessor): """ def process_query(self, query: LogicalQuery, query_settings: QuerySettings) -> None: - print("huh_FilterInSelectOptimizer") try: new_condition = self.get_select_filter(query) except Exception: @@ -117,7 +115,6 @@ def get_select_filter( self, query: LogicalQuery | CompositeQuery[QueryEntity], ) -> FunctionCall | None: - print("huh_get_select_filter") """ Given a query, grabs all the conditions from conditional aggregates and lifts into one condition. diff --git a/snuba/query/processors/logical/optional_attribute_aggregation.py b/snuba/query/processors/logical/optional_attribute_aggregation.py index 67636f33a1..d69c5a03c9 100644 --- a/snuba/query/processors/logical/optional_attribute_aggregation.py +++ b/snuba/query/processors/logical/optional_attribute_aggregation.py @@ -32,7 +32,6 @@ def __init__( aggregation_names: list[str], curried_aggregation_names: list[str], ): - print("huh_OptionalAttributeAggregationTransformer") self._attribute_column_names = attribute_column_names self._aggregation_names = aggregation_names self._curried_aggregation_names = curried_aggregation_names diff --git a/snuba/web/db_query.py b/snuba/web/db_query.py index b183369cc9..45a28af589 100644 --- a/snuba/web/db_query.py +++ b/snuba/web/db_query.py @@ -176,8 +176,6 @@ def execute_query( # Apply clickhouse query setting overrides clickhouse_query_settings.update(query_settings.get_clickhouse_settings()) - print("formatted_queryyyy", formatted_query) - result = reader.execute( formatted_query, clickhouse_query_settings, @@ -185,8 +183,6 @@ def execute_query( robust=robust, ) - print("resulttttttt", result) - timer.mark("execute") stats.update( { diff --git a/snuba/web/rpc/common/common.py b/snuba/web/rpc/common/common.py index 2495d37ad4..5479622140 100644 --- a/snuba/web/rpc/common/common.py +++ b/snuba/web/rpc/common/common.py @@ -137,7 +137,6 @@ def _build_label_mapping_key(attr_key: AttributeKey) -> str: if attr_key.name in NORMALIZED_COLUMNS: # the second if statement allows Sentry to send TYPE_FLOAT to Snuba when Snuba still has to be backward compatible with TYPE_FLOATS - print("attr_keyyy", attr_key.name, attr_key.type) if NORMALIZED_COLUMNS[attr_key.name] == attr_key.type or ( attr_key.type == AttributeKey.Type.TYPE_FLOAT and NORMALIZED_COLUMNS[attr_key.name] == AttributeKey.Type.TYPE_DOUBLE diff --git a/snuba/web/rpc/v1/resolvers/R_eap_spans/common/aggregation.py b/snuba/web/rpc/v1/resolvers/R_eap_spans/common/aggregation.py index f48b8e7518..41a235f9b1 100644 --- a/snuba/web/rpc/v1/resolvers/R_eap_spans/common/aggregation.py +++ b/snuba/web/rpc/v1/resolvers/R_eap_spans/common/aggregation.py @@ -612,7 +612,9 @@ def aggregation_to_expression(aggregation: AttributeAggregation) -> Expression: alias = aggregation.label if aggregation.label else None alias_dict = {"alias": alias} if alias else {} function_map: dict[Function.ValueType, CurriedFunctionCall | FunctionCall] = { - Function.FUNCTION_SUM: f.sum(f.multiply(field, sign_column), **alias_dict), + Function.FUNCTION_SUM: f.round( + f.sum(f.multiply(field, sign_column)), 8, **alias_dict + ), Function.FUNCTION_AVERAGE: f.divide( f.sum(f.multiply(field, sign_column)), f.sumIf(sign_column, get_field_existence_expression(aggregation)), @@ -628,7 +630,7 @@ def aggregation_to_expression(aggregation: AttributeAggregation) -> Expression: Function.FUNCTION_P90: cf.quantile(0.9)(field, **alias_dict), Function.FUNCTION_P95: cf.quantile(0.95)(field, **alias_dict), Function.FUNCTION_P99: cf.quantile(0.99)(field, **alias_dict), - Function.FUNCTION_AVG: f.round(f.avg(field), 9, **alias_dict), + Function.FUNCTION_AVG: f.round(f.avg(field), 8, **alias_dict), Function.FUNCTION_MAX: f.max(field, **alias_dict), Function.FUNCTION_MIN: f.min(field, **alias_dict), Function.FUNCTION_UNIQ: f.uniq(field, **alias_dict), diff --git a/snuba/web/rpc/v1/resolvers/R_eap_spans/resolver_trace_item_table.py b/snuba/web/rpc/v1/resolvers/R_eap_spans/resolver_trace_item_table.py index 99b14d296f..05da1f3a5c 100644 --- a/snuba/web/rpc/v1/resolvers/R_eap_spans/resolver_trace_item_table.py +++ b/snuba/web/rpc/v1/resolvers/R_eap_spans/resolver_trace_item_table.py @@ -148,10 +148,8 @@ def _build_query(request: TraceItemTableRequest) -> Query: ) elif column.HasField("aggregation"): function_expr = aggregation_to_expression(column.aggregation) - print("function_expr_1", function_expr) # aggregation label may not be set and the column label takes priority anyways. function_expr = replace(function_expr, alias=column.label) - print("function_expr_2", function_expr) selected_columns.append( SelectedExpression(name=column.label, expression=function_expr) ) @@ -188,7 +186,6 @@ def _build_query(request: TraceItemTableRequest) -> Query: raise BadSnubaRPCRequestException( "Column is neither an aggregate or an attribute" ) - print("selected_columnsss", selected_columns) res = Query( from_clause=entity, selected_columns=selected_columns, @@ -244,7 +241,6 @@ def _convert_results( for column in request.columns: if column.HasField("key"): - print("columnnnnkey", column, column.key, column.key.type) if column.key.type == AttributeKey.TYPE_BOOLEAN: converters[column.label] = lambda x: AttributeValue(val_bool=bool(x)) elif column.key.type == AttributeKey.TYPE_STRING: @@ -262,8 +258,6 @@ def _convert_results( "column is neither an attribute or aggregation" ) - print("covertersss", converters) - res: defaultdict[str, TraceItemColumnValues] = defaultdict(TraceItemColumnValues) for row in data: for column_name, value in row.items(): @@ -301,16 +295,13 @@ def trace_item_type(cls) -> TraceItemType.ValueType: return TraceItemType.TRACE_ITEM_TYPE_SPAN def resolve(self, in_msg: TraceItemTableRequest) -> TraceItemTableResponse: - print("in_msgggg", in_msg) snuba_request = _build_snuba_request(in_msg) res = run_query( dataset=PluggableDataset(name="eap", all_entities=[]), request=snuba_request, timer=self._timer, ) - print("ressss", res) column_values = _convert_results(in_msg, res.result.get("data", [])) - print("column_valuesss", column_values) response_meta = extract_response_meta( in_msg.meta.request_id, in_msg.meta.debug, diff --git a/tests/web/rpc/v1/test_endpoint_trace_item_table/test_endpoint_trace_item_table.py b/tests/web/rpc/v1/test_endpoint_trace_item_table/test_endpoint_trace_item_table.py index 19b97180f3..c25971ebe1 100644 --- a/tests/web/rpc/v1/test_endpoint_trace_item_table/test_endpoint_trace_item_table.py +++ b/tests/web/rpc/v1/test_endpoint_trace_item_table/test_endpoint_trace_item_table.py @@ -557,37 +557,37 @@ def test_table_with_aggregates(self, setup_teardown: Any) -> None: ) response = EndpointTraceItemTable().execute(message) - print("responseee", response.column_values) - - print( - "RESULTSSS", - [ - TraceItemColumnValues( - attribute_name="location", - results=[ - AttributeValue(val_str="backend"), - AttributeValue(val_str="frontend"), - AttributeValue(val_str="mobile"), - ], - ), - TraceItemColumnValues( - attribute_name="max(my.float.field)", - results=[ - AttributeValue(val_double=101.2), - AttributeValue(val_double=101.2), - AttributeValue(val_double=101.2), - ], - ), - TraceItemColumnValues( - attribute_name="avg(my.float.field)", - results=[ - AttributeValue(val_double=101.2), - AttributeValue(val_double=101.2), - AttributeValue(val_double=101.2), - ], - ), - ], - ) + # print("responseee", response.column_values) + # + # print( + # "RESULTSSS", + # [ + # TraceItemColumnValues( + # attribute_name="location", + # results=[ + # AttributeValue(val_str="backend"), + # AttributeValue(val_str="frontend"), + # AttributeValue(val_str="mobile"), + # ], + # ), + # TraceItemColumnValues( + # attribute_name="max(my.float.field)", + # results=[ + # AttributeValue(val_double=101.2), + # AttributeValue(val_double=101.2), + # AttributeValue(val_double=101.2), + # ], + # ), + # TraceItemColumnValues( + # attribute_name="avg(my.float.field)", + # results=[ + # AttributeValue(val_double=101.2), + # AttributeValue(val_double=101.2), + # AttributeValue(val_double=101.2), + # ], + # ), + # ], + # ) assert response.column_values == [ TraceItemColumnValues( @@ -1159,7 +1159,7 @@ def test_aggregation_filter_basic(self, setup_teardown: Any) -> None: aggregation=AttributeAggregation( aggregate=Function.FUNCTION_SUM, key=AttributeKey( - type=AttributeKey.TYPE_FLOAT, name="my.float.field" + type=AttributeKey.TYPE_DOUBLE, name="my.float.field" ), label="sum(my.float.field)", extrapolation_mode=ExtrapolationMode.EXTRAPOLATION_MODE_NONE, @@ -1179,7 +1179,7 @@ def test_aggregation_filter_basic(self, setup_teardown: Any) -> None: aggregation=AttributeAggregation( aggregate=Function.FUNCTION_SUM, key=AttributeKey( - type=AttributeKey.TYPE_FLOAT, name="my.float.field" + type=AttributeKey.TYPE_DOUBLE, name="my.float.field" ), label="this-doesnt-matter-and-can-be-left-out", ), @@ -1200,8 +1200,8 @@ def test_aggregation_filter_basic(self, setup_teardown: Any) -> None: TraceItemColumnValues( attribute_name="sum(my.float.field)", results=[ - AttributeValue(val_float=1214.4), - AttributeValue(val_float=3036), + AttributeValue(val_double=1214.4), + AttributeValue(val_double=3036), ], ), ] @@ -1311,8 +1311,8 @@ def test_aggregation_filter_and_or(self, setup_teardown: Any) -> None: TraceItemColumnValues( attribute_name="sum(my.float.field)", results=[ - AttributeValue(val_float=1214.4), - AttributeValue(val_float=3036), + AttributeValue(val_double=1214.4), + AttributeValue(val_double=3036), ], ), ] @@ -1365,9 +1365,9 @@ def test_aggregation_filter_and_or(self, setup_teardown: Any) -> None: TraceItemColumnValues( attribute_name="sum(my.float.field)", results=[ - AttributeValue(val_float=303.6), - AttributeValue(val_float=1214.4), - AttributeValue(val_float=3036), + AttributeValue(val_double=303.6), + AttributeValue(val_double=1214.4), + AttributeValue(val_double=3036), ], ), ] From 07b5ddba1a9e3da01df2d6cd470e45b5ecaea438 Mon Sep 17 00:00:00 2001 From: Rachel Chen Date: Fri, 17 Jan 2025 13:56:25 -0800 Subject: [PATCH 7/7] Trigger Build --- .../rpc/v1/resolvers/R_eap_spans/common/aggregation.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/snuba/web/rpc/v1/resolvers/R_eap_spans/common/aggregation.py b/snuba/web/rpc/v1/resolvers/R_eap_spans/common/aggregation.py index 41a235f9b1..d972c448cf 100644 --- a/snuba/web/rpc/v1/resolvers/R_eap_spans/common/aggregation.py +++ b/snuba/web/rpc/v1/resolvers/R_eap_spans/common/aggregation.py @@ -37,6 +37,8 @@ CUSTOM_COLUMN_PREFIX = "__snuba_custom_column__" +_FLOATING_POINT_PRECISION = 8 + @dataclass(frozen=True) class ExtrapolationContext(ABC): @@ -613,7 +615,9 @@ def aggregation_to_expression(aggregation: AttributeAggregation) -> Expression: alias_dict = {"alias": alias} if alias else {} function_map: dict[Function.ValueType, CurriedFunctionCall | FunctionCall] = { Function.FUNCTION_SUM: f.round( - f.sum(f.multiply(field, sign_column)), 8, **alias_dict + f.sum(f.multiply(field, sign_column)), + _FLOATING_POINT_PRECISION, + **alias_dict, ), Function.FUNCTION_AVERAGE: f.divide( f.sum(f.multiply(field, sign_column)), @@ -630,7 +634,9 @@ def aggregation_to_expression(aggregation: AttributeAggregation) -> Expression: Function.FUNCTION_P90: cf.quantile(0.9)(field, **alias_dict), Function.FUNCTION_P95: cf.quantile(0.95)(field, **alias_dict), Function.FUNCTION_P99: cf.quantile(0.99)(field, **alias_dict), - Function.FUNCTION_AVG: f.round(f.avg(field), 8, **alias_dict), + Function.FUNCTION_AVG: f.round( + f.avg(field), _FLOATING_POINT_PRECISION, **alias_dict + ), Function.FUNCTION_MAX: f.max(field, **alias_dict), Function.FUNCTION_MIN: f.min(field, **alias_dict), Function.FUNCTION_UNIQ: f.uniq(field, **alias_dict),