diff --git a/snuba/web/rpc/v1/resolvers/R_eap_spans/common/sampling_in_storage_util.py b/snuba/web/rpc/v1/resolvers/R_eap_spans/common/sampling_in_storage_util.py index 7732b1a5fc3..aa466d61e10 100644 --- a/snuba/web/rpc/v1/resolvers/R_eap_spans/common/sampling_in_storage_util.py +++ b/snuba/web/rpc/v1/resolvers/R_eap_spans/common/sampling_in_storage_util.py @@ -67,6 +67,26 @@ def _get_bytes_scanned_limit() -> int: ) # 150 gigabytes is default +def _get_bypass_seconds_threshold() -> int: + return cast( + int, + state.get_int_config( + _SAMPLING_IN_STORAGE_PREFIX + "bypass_seconds_threshold", + default=1 * 24 * 60 * 60, # a day + ), + ) + + +def _get_bypass_bytes_threshold() -> int: + return cast( + int, + state.get_int_config( + _SAMPLING_IN_STORAGE_PREFIX + "bypass_bytes_threshold", + default=int(5e9), # 5 gigabytes + ), + ) + + def _get_query_bytes_scanned(res: QueryResult) -> int: return cast(int, res.result.get("profile", {}).get("progress_bytes", 0)) # type: ignore @@ -95,18 +115,25 @@ def _get_target_tier( most_downsampled_res: QueryResult, metrics_backend: MetricsBackend, referrer: str, - timer: Timer, -) -> Tuple[Tier, int]: + request_time_range_secs: int, +) -> Tuple[Tier, int, bool]: with sentry_sdk.start_span(op="_get_target_tier") as span: + bypass = False most_downsampled_query_bytes_scanned = _get_query_bytes_scanned( most_downsampled_res ) - span.set_data( _SAMPLING_IN_STORAGE_PREFIX + "most_downsampled_query_bytes_scanned", most_downsampled_query_bytes_scanned, ) + if ( + request_time_range_secs <= _get_bypass_seconds_threshold() + or _get_query_bytes_scanned(most_downsampled_res) + <= _get_bypass_bytes_threshold() + ): + bypass = True + target_tier = _get_most_downsampled_tier() estimated_target_tier_bytes_scanned = ( most_downsampled_query_bytes_scanned @@ -122,7 +149,7 @@ def _get_target_tier( ) _record_value_in_span_and_DD( - span, + tier_specific_span, metrics_backend.distribution, "estimated_query_bytes_scanned", estimated_query_bytes_scanned_to_this_tier, @@ -155,7 +182,7 @@ def _get_target_tier( _SAMPLING_IN_STORAGE_PREFIX + "estimated_target_tier_bytes_scanned", estimated_target_tier_bytes_scanned, ) - return target_tier, estimated_target_tier_bytes_scanned + return target_tier, estimated_target_tier_bytes_scanned, bypass def _is_best_effort_mode(in_msg: T) -> bool: @@ -226,6 +253,40 @@ def _run_query_on_most_downsampled_tier( return res +def _emit_estimation_error_info( + span: Span, + metrics_backend: MetricsBackend, + estimated_target_tier_query_bytes_scanned: int, + res: QueryResult, + tags: Dict[str, str], +) -> None: + estimation_error = ( + estimated_target_tier_query_bytes_scanned - _get_query_bytes_scanned(res) + ) + _record_value_in_span_and_DD( + span, + metrics_backend.distribution, + "estimation_error_percentage", + abs(estimation_error) / _get_query_bytes_scanned(res), + tags, + ) + + estimation_error_metric_name = ( + "over_estimation_error" if estimation_error > 0 else "under_estimation_error" + ) + _record_value_in_span_and_DD( + span, + metrics_backend.distribution, + estimation_error_metric_name, + abs(estimation_error), + tags, + ) + + +def _get_time_range_secs(in_msg: T) -> int: + return in_msg.meta.end_timestamp.seconds - in_msg.meta.start_timestamp.seconds + + @with_span(op="function") def run_query_to_correct_tier( in_msg: T, @@ -245,6 +306,8 @@ def run_query_to_correct_tier( timer=timer, ) + print(in_msg) + with sentry_sdk.start_span(op="query_most_downsampled_tier"): timer.mark(_START_ESTIMATION_MARK) @@ -266,8 +329,12 @@ def run_query_to_correct_tier( _get_time_budget() / 1000, ) query_settings.push_clickhouse_setting("timeout_overflow_mode", "break") - target_tier, estimated_target_tier_query_bytes_scanned = _get_target_tier( - res, metrics_backend, referrer, timer + ( + target_tier, + estimated_target_tier_query_bytes_scanned, + bypassed, + ) = _get_target_tier( + res, metrics_backend, referrer, _get_time_range_secs(in_msg) ) timer.mark(_END_ESTIMATION_MARK) _record_value_in_span_and_DD( @@ -298,28 +365,11 @@ def run_query_to_correct_tier( timer=timer, ) - estimation_error = ( - estimated_target_tier_query_bytes_scanned - - _get_query_bytes_scanned(res) - ) - _record_value_in_span_and_DD( - span, - metrics_backend.distribution, - "estimation_error_percentage", - abs(estimation_error) / _get_query_bytes_scanned(res), - {"referrer": referrer, "tier": str(target_tier)}, - ) - - estimation_error_metric_name = ( - "over_estimation_error" - if estimation_error > 0 - else "under_estimation_error" - ) - _record_value_in_span_and_DD( + _emit_estimation_error_info( span, - metrics_backend.distribution, - estimation_error_metric_name, - abs(estimation_error), + metrics_backend, + estimated_target_tier_query_bytes_scanned, + res, {"referrer": referrer, "tier": str(target_tier)}, ) diff --git a/tests/web/rpc/v1/test_endpoint_trace_item_table/test_endpoint_trace_item_table.py b/tests/web/rpc/v1/test_endpoint_trace_item_table/test_endpoint_trace_item_table.py index fb7b2d91ee1..858ceed3e62 100644 --- a/tests/web/rpc/v1/test_endpoint_trace_item_table/test_endpoint_trace_item_table.py +++ b/tests/web/rpc/v1/test_endpoint_trace_item_table/test_endpoint_trace_item_table.py @@ -3383,6 +3383,8 @@ def test_best_effort_route_to_tier_64(self) -> None: ) ) + assert False + def test_best_effort_end_to_end(self) -> None: items_storage = get_storage(StorageKey("eap_items")) msg_timestamp = BASE_TIME - timedelta(minutes=1)