
fix(eap-api): Snuba receives and returns doubles, while still supporting floats #6782

Merged: 11 commits, Jan 22, 2025

2 changes: 1 addition & 1 deletion requirements.txt
@@ -29,7 +29,7 @@ python-rapidjson==1.8
redis==4.5.4
sentry-arroyo==2.19.4
sentry-kafka-schemas==0.1.129
sentry-protos==0.1.49
sentry-protos==0.1.51
sentry-redis-tools==0.3.0
sentry-relay==0.9.4
sentry-sdk==2.18.0
30 changes: 23 additions & 7 deletions snuba/web/rpc/common/common.py
@@ -82,12 +82,12 @@ def transform(exp: Expression) -> Expression:
"sentry.segment_id": AttributeKey.Type.TYPE_STRING, # this is converted by a processor on the storage
"sentry.segment_name": AttributeKey.Type.TYPE_STRING,
"sentry.is_segment": AttributeKey.Type.TYPE_BOOLEAN,
"sentry.duration_ms": AttributeKey.Type.TYPE_FLOAT,
"sentry.exclusive_time_ms": AttributeKey.Type.TYPE_FLOAT,
"sentry.duration_ms": AttributeKey.Type.TYPE_DOUBLE,
"sentry.exclusive_time_ms": AttributeKey.Type.TYPE_DOUBLE,
"sentry.retention_days": AttributeKey.Type.TYPE_INT,
"sentry.name": AttributeKey.Type.TYPE_STRING,
"sentry.sampling_weight": AttributeKey.Type.TYPE_FLOAT,
"sentry.sampling_factor": AttributeKey.Type.TYPE_FLOAT,
"sentry.sampling_weight": AttributeKey.Type.TYPE_DOUBLE,
"sentry.sampling_factor": AttributeKey.Type.TYPE_DOUBLE,
"sentry.timestamp": AttributeKey.Type.TYPE_UNSPECIFIED,
"sentry.start_timestamp": AttributeKey.Type.TYPE_UNSPECIFIED,
"sentry.end_timestamp": AttributeKey.Type.TYPE_UNSPECIFIED,
@@ -124,7 +124,10 @@ def _build_label_mapping_key(attr_key: AttributeKey) -> str:
)
if attr_key.type == AttributeKey.Type.TYPE_INT:
return f.CAST(column(attr_key.name[len("sentry.") :]), "Int64", alias=alias)
if attr_key.type == AttributeKey.Type.TYPE_FLOAT:
if (
attr_key.type == AttributeKey.Type.TYPE_FLOAT
or attr_key.type == AttributeKey.Type.TYPE_DOUBLE
):
return f.CAST(
column(attr_key.name[len("sentry.") :]), "Float64", alias=alias
)
@@ -133,7 +136,11 @@ def _build_label_mapping_key(attr_key: AttributeKey) -> str:
)

if attr_key.name in NORMALIZED_COLUMNS:
if NORMALIZED_COLUMNS[attr_key.name] == attr_key.type:
# the second condition lets Sentry keep sending TYPE_FLOAT for columns whose canonical type is now TYPE_DOUBLE, keeping Snuba backward compatible during the float-to-double migration
if NORMALIZED_COLUMNS[attr_key.name] == attr_key.type or (
attr_key.type == AttributeKey.Type.TYPE_FLOAT
and NORMALIZED_COLUMNS[attr_key.name] == AttributeKey.Type.TYPE_DOUBLE
):
return column(attr_key.name[len("sentry.") :], alias=attr_key.name)
raise BadSnubaRPCRequestException(
f"Attribute {attr_key.name} must be requested as {NORMALIZED_COLUMNS[attr_key.name]}, got {attr_key.type}"
@@ -144,7 +151,10 @@ def _build_label_mapping_key(attr_key: AttributeKey) -> str:
return SubscriptableReference(
alias=alias, column=column("attr_str"), key=literal(attr_key.name)
)
if attr_key.type == AttributeKey.Type.TYPE_FLOAT:
if (
attr_key.type == AttributeKey.Type.TYPE_FLOAT
or attr_key.type == AttributeKey.Type.TYPE_DOUBLE
):
return SubscriptableReference(
alias=alias, column=column("attr_num"), key=literal(attr_key.name)
)
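
These hunks encode the migration rule: a normalized column whose canonical type is now TYPE_DOUBLE may still be requested as TYPE_FLOAT, and either request resolves to the same Float64 cast (or the same attr_num lookup for free-form attributes). A minimal, self-contained sketch of the acceptance check; the enum and column map below are stand-ins, not the real sentry_protos or Snuba definitions:

```python
# Stand-in enum and column map to illustrate the backward-compatibility check;
# the real code uses AttributeKey.Type and NORMALIZED_COLUMNS.
from enum import Enum


class AttrType(Enum):
    TYPE_FLOAT = 1
    TYPE_DOUBLE = 2
    TYPE_INT = 3


NORMALIZED_COLUMNS = {"sentry.duration_ms": AttrType.TYPE_DOUBLE}  # now a double


def is_acceptable_type(name: str, requested: AttrType) -> bool:
    expected = NORMALIZED_COLUMNS[name]
    # Exact match, or the legacy float type where the canonical type is double.
    return requested == expected or (
        requested == AttrType.TYPE_FLOAT and expected == AttrType.TYPE_DOUBLE
    )


assert is_acceptable_type("sentry.duration_ms", AttrType.TYPE_DOUBLE)
assert is_acceptable_type("sentry.duration_ms", AttrType.TYPE_FLOAT)  # legacy clients
assert not is_acceptable_type("sentry.duration_ms", AttrType.TYPE_INT)
```
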
@@ -284,6 +294,8 @@ def trace_item_filters_to_expression(item_filter: TraceItemFilter) -> Expression
v_expression = literal(v.val_str)
case "val_float":
v_expression = literal(v.val_float)
case "val_double":
v_expression = literal(v.val_double)
case "val_int":
v_expression = literal(v.val_int)
case "val_null":
Expand All @@ -300,6 +312,10 @@ def trace_item_filters_to_expression(item_filter: TraceItemFilter) -> Expression
v_expression = literals_array(
None, list(map(lambda x: literal(x), v.val_float_array.values))
)
case "val_double_array":
v_expression = literals_array(
None, list(map(lambda x: literal(x), v.val_double_array.values))
)
case default:
raise NotImplementedError(
f"translation of AttributeValue type {default} is not implemented"
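
The filter translation dispatches on whichever oneof field the client set, so val_double and val_double_array are handled exactly like their legacy float counterparts. A rough sketch of that dispatch, assuming a plain string in place of value.WhichOneof(...) and plain Python values in place of ClickHouse literal expressions:

```python
# Rough sketch of the oneof dispatch in trace_item_filters_to_expression.
# The real code builds literal()/literals_array() ClickHouse expressions; here
# plain Python values are returned just to show which branch fires.
def filter_value_to_python(which: str, payload):
    match which:
        case "val_str" | "val_float" | "val_double" | "val_int":
            return payload                  # scalar literal
        case "val_float_array" | "val_double_array":
            return list(payload)            # array of literals
        case other:
            raise NotImplementedError(
                f"translation of AttributeValue type {other} is not implemented"
            )


assert filter_value_to_python("val_double", 2.75) == 2.75
assert filter_value_to_python("val_double_array", (1.0, 2.0)) == [1.0, 2.0]
```
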
9 changes: 8 additions & 1 deletion snuba/web/rpc/v1/endpoint_get_traces.py
@@ -44,6 +44,7 @@
_DEFAULT_ROW_LIMIT = 10_000
_BUFFER_WINDOW = 2 * 3600 # 2 hours


_ATTRIBUTES: dict[
TraceAttribute.Key.ValueType,
tuple[str, AttributeKey.Type.ValueType],
@@ -58,7 +59,7 @@
),
TraceAttribute.Key.KEY_START_TIMESTAMP: (
"trace_start_timestamp",
AttributeKey.Type.TYPE_FLOAT,
AttributeKey.Type.TYPE_DOUBLE,
),
TraceAttribute.Key.KEY_TOTAL_ITEM_COUNT: (
"total_item_count",
Expand All @@ -69,6 +70,8 @@
AttributeKey.Type.TYPE_STRING,
),
}


_TYPES_TO_CLICKHOUSE: dict[
AttributeKey.Type.ValueType,
tuple[str, Callable[[Any], AttributeValue]],
@@ -85,6 +88,10 @@
"Float64",
lambda x: AttributeValue(val_float=float(x)),
),
AttributeKey.Type.TYPE_DOUBLE: (
"Float64",
lambda x: AttributeValue(val_double=float(x)),
),
}


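
In _TYPES_TO_CLICKHOUSE both key types read the column as ClickHouse Float64; the requested type only decides whether the response populates val_float or val_double. A small sketch of that lookup, with a stand-in dataclass instead of the protobuf AttributeValue:

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class FakeAttributeValue:  # stand-in for sentry_protos' AttributeValue
    val_float: Optional[float] = None
    val_double: Optional[float] = None


# Both key types store as Float64; only the response field differs.
TYPES_TO_CLICKHOUSE: dict[str, tuple[str, Callable[[Any], FakeAttributeValue]]] = {
    "TYPE_FLOAT": ("Float64", lambda x: FakeAttributeValue(val_float=float(x))),
    "TYPE_DOUBLE": ("Float64", lambda x: FakeAttributeValue(val_double=float(x))),
}

clickhouse_type, wrap = TYPES_TO_CLICKHOUSE["TYPE_DOUBLE"]
assert clickhouse_type == "Float64"
assert wrap("1729.5").val_double == 1729.5
```
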
1 change: 1 addition & 0 deletions snuba/web/rpc/v1/endpoint_trace_item_attribute_names.py
@@ -55,6 +55,7 @@ def convert_to_snuba_request(req: TraceItemAttributeNamesRequest) -> SnubaReques
)
elif req.type in (
AttributeKey.Type.TYPE_FLOAT,
AttributeKey.Type.TYPE_DOUBLE,
AttributeKey.Type.TYPE_INT,
AttributeKey.Type.TYPE_BOOLEAN,
):
39 changes: 30 additions & 9 deletions snuba/web/rpc/v1/resolvers/R_eap_spans/common/aggregation.py
@@ -37,6 +37,8 @@

CUSTOM_COLUMN_PREFIX = "__snuba_custom_column__"

_FLOATING_POINT_PRECISION = 9


@dataclass(frozen=True)
class ExtrapolationContext(ABC):
@@ -612,23 +614,42 @@ def aggregation_to_expression(aggregation: AttributeAggregation) -> Expression:
alias = aggregation.label if aggregation.label else None
alias_dict = {"alias": alias} if alias else {}
function_map: dict[Function.ValueType, CurriedFunctionCall | FunctionCall] = {
Function.FUNCTION_SUM: f.sum(f.multiply(field, sign_column), **alias_dict),
Function.FUNCTION_AVERAGE: f.divide(
Function.FUNCTION_SUM: f.round(
f.sum(f.multiply(field, sign_column)),
f.sumIf(sign_column, get_field_existence_expression(aggregation)),
_FLOATING_POINT_PRECISION,
**alias_dict,
),
Function.FUNCTION_AVERAGE: f.round(
f.divide(
f.sum(f.multiply(field, sign_column)),
f.sumIf(sign_column, get_field_existence_expression(aggregation)),
),
_FLOATING_POINT_PRECISION,
**alias_dict,
),
Function.FUNCTION_COUNT: f.sumIf(
sign_column,
get_field_existence_expression(aggregation),
**alias_dict,
),
Function.FUNCTION_P50: cf.quantile(0.5)(field, **alias_dict),
Function.FUNCTION_P75: cf.quantile(0.75)(field, **alias_dict),
Function.FUNCTION_P90: cf.quantile(0.9)(field, **alias_dict),
Function.FUNCTION_P95: cf.quantile(0.95)(field, **alias_dict),
Function.FUNCTION_P99: cf.quantile(0.99)(field, **alias_dict),
Function.FUNCTION_AVG: f.avg(field, **alias_dict),
Function.FUNCTION_P50: f.round(
cf.quantile(0.5)(field), _FLOATING_POINT_PRECISION, **alias_dict

Review comment (Member): why do you need to add this _FLOATING_POINT_PRECISION argument?
Reply (Member, Author): We still have floating point precision errors :(

),
Function.FUNCTION_P75: f.round(
cf.quantile(0.75)(field), _FLOATING_POINT_PRECISION, **alias_dict
),
Function.FUNCTION_P90: f.round(
cf.quantile(0.9)(field), _FLOATING_POINT_PRECISION, **alias_dict
),
Function.FUNCTION_P95: f.round(
cf.quantile(0.95)(field), _FLOATING_POINT_PRECISION, **alias_dict
),
Function.FUNCTION_P99: f.round(
cf.quantile(0.99)(field), _FLOATING_POINT_PRECISION, **alias_dict
),
Function.FUNCTION_AVG: f.round(
f.avg(field), _FLOATING_POINT_PRECISION, **alias_dict
),
Function.FUNCTION_MAX: f.max(field, **alias_dict),
Function.FUNCTION_MIN: f.min(field, **alias_dict),
Function.FUNCTION_UNIQ: f.uniq(field, **alias_dict),
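
The review thread above explains the f.round(..., _FLOATING_POINT_PRECISION) wrappers: aggregating Float64 values accumulates representation error, and rounding to nine decimal places keeps the returned numbers stable. A quick plain-Python illustration of the effect the ClickHouse round() is assumed to counteract (made-up numbers):

```python
# Summing many binary-inexact doubles drifts away from the exact decimal result;
# rounding to 9 decimal places hides that drift in the API response.
values = [0.1] * 10

exact = 1.0
drifted = sum(values)        # 0.9999999999999999 on IEEE-754 doubles
assert drifted != exact

rounded = round(drifted, 9)  # analogous to f.round(expr, _FLOATING_POINT_PRECISION)
assert rounded == exact
```
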
@@ -186,7 +186,6 @@ def _build_query(request: TraceItemTableRequest) -> Query:
raise BadSnubaRPCRequestException(
"Column is neither an aggregate or an attribute"
)

res = Query(
from_clause=entity,
selected_columns=selected_columns,
@@ -250,8 +249,10 @@ def _convert_results(
converters[column.label] = lambda x: AttributeValue(val_int=int(x))
elif column.key.type == AttributeKey.TYPE_FLOAT:
converters[column.label] = lambda x: AttributeValue(val_float=float(x))
elif column.key.type == AttributeKey.TYPE_DOUBLE:
converters[column.label] = lambda x: AttributeValue(val_double=float(x))
elif column.HasField("aggregation"):
converters[column.label] = lambda x: AttributeValue(val_float=float(x))
converters[column.label] = lambda x: AttributeValue(val_double=float(x))
else:
raise BadSnubaRPCRequestException(
"column is neither an attribute or aggregation"
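
In _convert_results, attribute columns still answer in whichever field the client requested, while aggregation results now come back as val_double instead of val_float. A simplified sketch of that converter selection, using (field_name, value) tuples as stand-ins for building AttributeValue messages:

```python
from typing import Callable


def pick_converter(requested_type: str, is_aggregation: bool) -> Callable[[str], tuple[str, float]]:
    if is_aggregation:
        return lambda x: ("val_double", float(x))  # was val_float before this change
    if requested_type == "TYPE_FLOAT":
        return lambda x: ("val_float", float(x))   # legacy requests keep working
    if requested_type == "TYPE_DOUBLE":
        return lambda x: ("val_double", float(x))
    raise ValueError("column is neither an attribute nor an aggregation")


assert pick_converter("TYPE_DOUBLE", False)("3.14") == ("val_double", 3.14)
assert pick_converter("", True)("99")[0] == "val_double"
```
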
14 changes: 14 additions & 0 deletions tests/web/rpc/test_common.py
@@ -43,6 +43,14 @@ def test_timestamp_columns(self) -> None:
) == f.CAST(
column(col[len("sentry.") :]), "Float64", alias=col + "_TYPE_FLOAT"
)
assert attribute_key_to_expression(
AttributeKey(
type=AttributeKey.TYPE_DOUBLE,
name=col,
),
) == f.CAST(
column(col[len("sentry.") :]), "Float64", alias=col + "_TYPE_DOUBLE"
)

def test_normalized_col(self) -> None:
for col in [
@@ -71,6 +79,12 @@ def test_attributes(self) -> None:
alias="derp_TYPE_FLOAT", column=column("attr_num"), key=literal("derp")
)

assert attribute_key_to_expression(
AttributeKey(type=AttributeKey.TYPE_DOUBLE, name="derp"),
) == SubscriptableReference(
alias="derp_TYPE_DOUBLE", column=column("attr_num"), key=literal("derp")
)

assert attribute_key_to_expression(
AttributeKey(type=AttributeKey.TYPE_INT, name="derp"),
) == f.CAST(
80 changes: 77 additions & 3 deletions tests/web/rpc/v1/test_endpoint_get_traces.py
@@ -336,7 +336,9 @@ def test_with_data_and_filter(self, setup_teardown: Any) -> None:
)
assert MessageToDict(response) == MessageToDict(expected_response)

def test_with_data_and_aggregated_fields(self, setup_teardown: Any) -> None:
def test_with_data_and_aggregated_fields_backward_compat(
self, setup_teardown: Any
) -> None:
ts = Timestamp(seconds=int(_BASE_TIME.timestamp()))
three_hours_later = int((_BASE_TIME + timedelta(hours=3)).timestamp())
start_timestamp_per_trace_id: dict[str, float] = defaultdict(lambda: 2 * 1e10)
@@ -411,9 +413,9 @@ def test_with_data_and_aggregated_fields(self, setup_teardown: Any) -> None:
),
TraceAttribute(
key=TraceAttribute.Key.KEY_START_TIMESTAMP,
type=AttributeKey.TYPE_FLOAT,
type=AttributeKey.TYPE_DOUBLE,
value=AttributeValue(
val_float=start_timestamp_per_trace_id[
val_double=start_timestamp_per_trace_id[
trace_id_per_start_timestamp[start_timestamp]
],
),
@@ -451,3 +453,75 @@ def test_with_data_and_aggregated_fields(self, setup_teardown: Any) -> None:
for start_timestamp in reversed(sorted(trace_id_per_start_timestamp.keys())):
print(start_timestamp, trace_id_per_start_timestamp[start_timestamp])
assert MessageToDict(response) == MessageToDict(expected_response)

def test_with_data_and_aggregated_fields(self, setup_teardown: Any) -> None:
ts = Timestamp(seconds=int(_BASE_TIME.timestamp()))
three_hours_later = int((_BASE_TIME + timedelta(hours=3)).timestamp())
start_timestamp_per_trace_id: dict[str, float] = defaultdict(lambda: 2 * 1e10)
for s in _SPANS:
start_timestamp_per_trace_id[s["trace_id"]] = min(
start_timestamp_per_trace_id[s["trace_id"]],
s["start_timestamp_precise"],
)
trace_id_per_start_timestamp: dict[float, str] = {
timestamp: trace_id
for trace_id, timestamp in start_timestamp_per_trace_id.items()
}
message = GetTracesRequest(
meta=RequestMeta(
project_ids=[1, 2, 3],
organization_id=1,
cogs_category="something",
referrer="something",
start_timestamp=ts,
end_timestamp=Timestamp(seconds=three_hours_later),
request_id=_REQUEST_ID,
),
attributes=[
TraceAttribute(
key=TraceAttribute.Key.KEY_START_TIMESTAMP,
type=AttributeKey.TYPE_DOUBLE,
),
],
filters=[
GetTracesRequest.TraceFilter(
item_type=TraceItemType.TRACE_ITEM_TYPE_SPAN,
filter=TraceItemFilter(
comparison_filter=ComparisonFilter(
key=AttributeKey(
name="sentry.op",
type=AttributeKey.TYPE_STRING,
),
op=ComparisonFilter.OP_EQUALS,
value=AttributeValue(val_str="db"),
),
),
),
],
)
response = EndpointGetTraces().execute(message)
expected_response = GetTracesResponse(
traces=[
GetTracesResponse.Trace(
attributes=[
TraceAttribute(
key=TraceAttribute.Key.KEY_START_TIMESTAMP,
type=AttributeKey.TYPE_DOUBLE,
value=AttributeValue(
val_double=start_timestamp_per_trace_id[
trace_id_per_start_timestamp[start_timestamp]
],
),
),
],
)
for start_timestamp in reversed(
sorted(trace_id_per_start_timestamp.keys())
)
],
page_token=PageToken(offset=len(_TRACE_IDS)),
meta=ResponseMeta(request_id=_REQUEST_ID),
)
for start_timestamp in reversed(sorted(trace_id_per_start_timestamp.keys())):
print(start_timestamp, trace_id_per_start_timestamp[start_timestamp])
assert MessageToDict(response) == MessageToDict(expected_response)
31 changes: 30 additions & 1 deletion tests/web/rpc/v1/test_endpoint_trace_item_attribute_names.py
@@ -114,7 +114,7 @@ def test_basic(self) -> None:
)
assert res.attributes == expected

def test_simple_float(self) -> None:
def test_simple_float_backward_compat(self) -> None:
req = TraceItemAttributeNamesRequest(
meta=RequestMeta(
project_ids=[1, 2, 3],
@@ -143,6 +143,35 @@ def test_simple_float(self) -> None:
)
assert res.attributes == expected

def test_simple_double(self) -> None:
req = TraceItemAttributeNamesRequest(
meta=RequestMeta(
project_ids=[1, 2, 3],
organization_id=1,
cogs_category="something",
referrer="something",
start_timestamp=Timestamp(
seconds=int((BASE_TIME - timedelta(days=1)).timestamp())
),
end_timestamp=Timestamp(
seconds=int((BASE_TIME + timedelta(days=1)).timestamp())
),
),
limit=TOTAL_GENERATED_ATTR_PER_TYPE,
type=AttributeKey.Type.TYPE_DOUBLE,
value_substring_match="",
)
res = EndpointTraceItemAttributeNames().execute(req)
expected = []
for i in range(TOTAL_GENERATED_ATTR_PER_TYPE):
expected.append(
TraceItemAttributeNamesResponse.Attribute(
name=f"b_measurement_{str(i).zfill(3)}",
type=AttributeKey.Type.TYPE_DOUBLE,
)
)
assert res.attributes == expected

def test_with_filter(self) -> None:
req = TraceItemAttributeNamesRequest(
meta=RequestMeta(