|
| 1 | +import grpc |
| 2 | +import time |
| 3 | + |
| 4 | +from timeit import default_timer |
| 5 | + |
| 6 | +from .server_metrics import SERVER_HANDLED_LATENCY_SECONDS |
| 7 | +from .server_metrics import SERVER_HANDLED_COUNTER |
| 8 | +from .server_metrics import SERVER_STARTED_COUNTER |
| 9 | + |
| 10 | +from .util import type_from_method |
| 11 | +from .util import code_to_string |
| 12 | + |
| 13 | + |
| 14 | +def _wrap_rpc_behavior(handler, fn): |
| 15 | + if handler is None: |
| 16 | + return None |
| 17 | + |
| 18 | + if handler.request_streaming and handler.response_streaming: |
| 19 | + behavior_fn = handler.stream_stream |
| 20 | + handler_factory = grpc.stream_stream_rpc_method_handler |
| 21 | + elif handler.request_streaming and not handler.response_streaming: |
| 22 | + behavior_fn = handler.stream_unary |
| 23 | + handler_factory = grpc.stream_unary_rpc_method_handler |
| 24 | + elif not handler.request_streaming and handler.response_streaming: |
| 25 | + behavior_fn = handler.unary_stream |
| 26 | + handler_factory = grpc.unary_stream_rpc_method_handler |
| 27 | + else: |
| 28 | + behavior_fn = handler.unary_unary |
| 29 | + handler_factory = grpc.unary_unary_rpc_method_handler |
| 30 | + |
| 31 | + return handler_factory(fn(behavior_fn, |
| 32 | + handler.request_streaming, |
| 33 | + handler.response_streaming), |
| 34 | + request_deserializer=handler.request_deserializer, |
| 35 | + response_serializer=handler.response_serializer) |
| 36 | + |
| 37 | + |
| 38 | +class PromServerInterceptor(grpc.ServerInterceptor): |
| 39 | + |
| 40 | + def intercept_service(self, continuation, handler_call_details): |
| 41 | + |
| 42 | + handler = continuation(handler_call_details) |
| 43 | + |
| 44 | + # only support unary |
| 45 | + if handler.request_streaming or handler.response_streaming: |
| 46 | + return handler |
| 47 | + |
| 48 | + client_call_method = handler_call_details.method |
| 49 | + ss = client_call_method.split("/") |
| 50 | + if len(ss) < 3: |
| 51 | + return continuation(handler_call_details) |
| 52 | + grpc_service = ss[1] |
| 53 | + grpc_method = ss[2] |
| 54 | + grpc_type = type_from_method(handler.request_streaming, handler.response_streaming) |
| 55 | + |
| 56 | + SERVER_STARTED_COUNTER.labels( |
| 57 | + grpc_type=grpc_type, |
| 58 | + grpc_service=grpc_service, |
| 59 | + grpc_method=grpc_method).inc() |
| 60 | + |
| 61 | + def latency_wrapper(behavior, request_streaming, response_streaming): |
| 62 | + def new_behavior(request_or_iterator, service_context): |
| 63 | + start = default_timer() |
| 64 | + try: |
| 65 | + rsp = behavior(request_or_iterator, service_context) |
| 66 | + SERVER_HANDLED_COUNTER.labels( |
| 67 | + grpc_type=grpc_type, |
| 68 | + grpc_service=grpc_service, |
| 69 | + grpc_method=grpc_method, |
| 70 | + grpc_code=code_to_string(service_context._state.code) |
| 71 | + ).inc() |
| 72 | + return rsp |
| 73 | + except grpc.RpcError as e: |
| 74 | + if isinstance(e, grpc.Call): |
| 75 | + SERVER_HANDLED_COUNTER.labels( |
| 76 | + grpc_type=grpc_type, |
| 77 | + grpc_service=grpc_service, |
| 78 | + grpc_method=grpc_method, |
| 79 | + grpc_code=code_to_string(e.code()) |
| 80 | + ).inc() |
| 81 | + raise e |
| 82 | + finally: |
| 83 | + SERVER_HANDLED_LATENCY_SECONDS.labels( |
| 84 | + grpc_type=grpc_type, |
| 85 | + grpc_service=grpc_service, |
| 86 | + grpc_method=grpc_method).observe(max(default_timer() - start, 0)) |
| 87 | + |
| 88 | + return new_behavior |
| 89 | + |
| 90 | + return _wrap_rpc_behavior(continuation(handler_call_details), latency_wrapper) |
| 91 | + |
| 92 | + |
| 93 | +class ServiceLatencyInterceptor(grpc.ServerInterceptor): |
| 94 | + |
| 95 | + def __init__(self): |
| 96 | + pass |
| 97 | + |
| 98 | + def intercept_service(self, continuation, handler_call_details): |
| 99 | + client_call_method = handler_call_details.method |
| 100 | + parts = client_call_method.split("/") |
| 101 | + grpc_service = parts[1] |
| 102 | + grpc_method = parts[2] |
| 103 | + |
| 104 | + def latency_wrapper(behavior, request_streaming, response_streaming): |
| 105 | + def new_behavior(request_or_iterator, service_context): |
| 106 | + start = time.time() |
| 107 | + try: |
| 108 | + return behavior(request_or_iterator, service_context) |
| 109 | + finally: |
| 110 | + SERVER_HANDLED_LATENCY_SECONDS.labels( |
| 111 | + grpc_type='UNARY', |
| 112 | + grpc_service=grpc_service, |
| 113 | + grpc_method=grpc_method).observe(max(time.time() - start, 0)) |
| 114 | + |
| 115 | + return new_behavior |
| 116 | + |
| 117 | + return _wrap_rpc_behavior(continuation(handler_call_details), latency_wrapper) |
0 commit comments