Skip to content

Commit c6aaecd

Browse files
authored
Feature/client (#9)
* add client support * add client support * add client support * add client support
1 parent aa32932 commit c6aaecd

File tree

9 files changed

+389
-3
lines changed

9 files changed

+389
-3
lines changed

README.md

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,5 +41,3 @@ Note: the `grpc_*` metrics will just show commented out (with their descriptions
4141
## TODO
4242

4343
- add stream support
44-
- add client metrics
45-
- add example
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
from __future__ import print_function
2+
import logging
3+
import time
4+
5+
import grpc
6+
7+
import helloworld_pb2
8+
import helloworld_pb2_grpc
9+
10+
import prometheus_client
11+
from python_grpc_prometheus.prometheus_client_interceptor import PromClientInterceptor
12+
13+
_ONE_DAY_IN_SECONDS = 60 * 60 * 24
14+
15+
16+
def run():
17+
interceptor = PromClientInterceptor()
18+
with grpc.insecure_channel('localhost:50051') as channel:
19+
intercept_channel = grpc.intercept_channel(channel,
20+
interceptor)
21+
stub = helloworld_pb2_grpc.GreeterStub(intercept_channel)
22+
response = stub.SayHello.future(helloworld_pb2.HelloRequest(name='you'))
23+
response=response.result()
24+
print("Greeter client received: " + response.message)
25+
26+
27+
if __name__ == '__main__':
28+
logging.basicConfig()
29+
try:
30+
run()
31+
except Exception as e:
32+
print(e)
33+
prometheus_client.start_http_server(8001)
34+
try:
35+
while True:
36+
time.sleep(_ONE_DAY_IN_SECONDS)
37+
except KeyboardInterrupt:
38+
pass
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
from concurrent import futures
2+
import time
3+
import logging
4+
5+
import grpc
6+
7+
import helloworld_pb2
8+
import helloworld_pb2_grpc
9+
10+
import prometheus_client
11+
from python_grpc_prometheus.prometheus_server_interceptor import PromServerInterceptor
12+
13+
_ONE_DAY_IN_SECONDS = 60 * 60 * 24
14+
15+
16+
class Greeter(helloworld_pb2_grpc.GreeterServicer):
17+
18+
def SayHello(self, request, context):
19+
return helloworld_pb2.HelloReply(message='Hello, %s!' % request.name)
20+
21+
22+
def serve():
23+
# Add the required interceptor(s) where you create your grpc server, e.g.
24+
psi = PromServerInterceptor()
25+
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10), interceptors=(psi,))
26+
helloworld_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
27+
server.add_insecure_port('[::]:50051')
28+
server.start()
29+
prometheus_client.start_http_server(8000)
30+
try:
31+
while True:
32+
time.sleep(_ONE_DAY_IN_SECONDS)
33+
except KeyboardInterrupt:
34+
server.stop(0)
35+
36+
37+
if __name__ == '__main__':
38+
logging.basicConfig()
39+
serve()

examples/helloworld/helloworld_pb2.py

Lines changed: 134 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
2+
import grpc
3+
4+
import helloworld_pb2 as helloworld__pb2
5+
6+
7+
class GreeterStub(object):
8+
"""The greeting service definition.
9+
"""
10+
11+
def __init__(self, channel):
12+
"""Constructor.
13+
14+
Args:
15+
channel: A grpc.Channel.
16+
"""
17+
self.SayHello = channel.unary_unary(
18+
'/helloworld.Greeter/SayHello',
19+
request_serializer=helloworld__pb2.HelloRequest.SerializeToString,
20+
response_deserializer=helloworld__pb2.HelloReply.FromString,
21+
)
22+
23+
24+
class GreeterServicer(object):
25+
"""The greeting service definition.
26+
"""
27+
28+
def SayHello(self, request, context):
29+
"""Sends a greeting
30+
"""
31+
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
32+
context.set_details('Method not implemented!')
33+
raise NotImplementedError('Method not implemented!')
34+
35+
36+
def add_GreeterServicer_to_server(servicer, server):
37+
rpc_method_handlers = {
38+
'SayHello': grpc.unary_unary_rpc_method_handler(
39+
servicer.SayHello,
40+
request_deserializer=helloworld__pb2.HelloRequest.FromString,
41+
response_serializer=helloworld__pb2.HelloReply.SerializeToString,
42+
),
43+
}
44+
generic_handler = grpc.method_handlers_generic_handler(
45+
'helloworld.Greeter', rpc_method_handlers)
46+
server.add_generic_rpc_handlers((generic_handler,))
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
from prometheus_client import Counter
2+
from prometheus_client import Histogram
3+
4+
5+
CLIENT_STARTED_COUNTER = Counter(
6+
'grpc_client_started_total',
7+
'Total number of RPCs started on the client.',
8+
["grpc_type", "grpc_service", "grpc_method"])
9+
10+
CLIENT_HANDLED_COUNTER = Counter(
11+
'grpc_client_handled_total',
12+
'Total number of RPCs completed by the client, regardless of success or failure.',
13+
["grpc_type", "grpc_service", "grpc_method", "grpc_code"])
14+
15+
CLIENT_HANDLED_LATENCY_SECONDS = Histogram(
16+
'grpc_client_handling_seconds',
17+
'Histogram of response latency (seconds) of the gRPC until it is finished by the application.',
18+
["grpc_type", "grpc_service", "grpc_method"])
19+
20+
CLIENT_MSG_RECEIVED_TOTAL = Counter(
21+
'grpc_client_msg_received_total',
22+
'Total number of RPC stream messages received by the client.',
23+
["grpc_type", "grpc_service", "grpc_method"])
24+
25+
CLIENT_MSG_SENT_TOTAL = Counter(
26+
'grpc_client_msg_sent_total',
27+
'Total number of gRPC stream messages sent by the client.',
28+
["grpc_type", "grpc_service", "grpc_method"])
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
import grpc
2+
3+
import time
4+
5+
from python_grpc_prometheus.client_metrics import (CLIENT_HANDLED_LATENCY_SECONDS,
6+
CLIENT_HANDLED_COUNTER,
7+
CLIENT_STARTED_COUNTER,
8+
CLIENT_MSG_RECEIVED_TOTAL,
9+
CLIENT_MSG_SENT_TOTAL)
10+
11+
from python_grpc_prometheus import util
12+
from python_grpc_prometheus.util import split_call_details
13+
from python_grpc_prometheus.util import code_to_string
14+
15+
_ = CLIENT_MSG_RECEIVED_TOTAL
16+
17+
18+
class PromClientInterceptor(grpc.UnaryUnaryClientInterceptor, grpc.UnaryStreamClientInterceptor,
19+
grpc.StreamUnaryClientInterceptor,
20+
grpc.StreamStreamClientInterceptor):
21+
22+
@staticmethod
23+
def _callback(grpc_type, grpc_service, grpc_method, start):
24+
def callback(future_response):
25+
exception = future_response.exception()
26+
code = code_to_string(grpc.StatusCode.OK)
27+
if exception is not None:
28+
if isinstance(exception, grpc.Call):
29+
code = code_to_string(exception.code())
30+
else:
31+
code = code_to_string(grpc.StatusCode.UNKNOWN)
32+
33+
CLIENT_HANDLED_COUNTER.labels(
34+
grpc_type=grpc_type,
35+
grpc_service=grpc_service,
36+
grpc_method=grpc_method,
37+
grpc_code=code
38+
).inc()
39+
40+
CLIENT_HANDLED_LATENCY_SECONDS.labels(
41+
grpc_type=grpc_type,
42+
grpc_service=grpc_service,
43+
grpc_method=grpc_method).observe(max(time.time() - start, 0))
44+
45+
return callback
46+
47+
def intercept_unary_unary(self, continuation, client_call_details, request):
48+
grpc_service, grpc_method, ok = split_call_details(client_call_details, 3)
49+
if not ok:
50+
return continuation(client_call_details, request)
51+
52+
start = time.time()
53+
grpc_type = util.Unary
54+
CLIENT_STARTED_COUNTER.labels(
55+
grpc_type=grpc_type,
56+
grpc_service=grpc_service,
57+
grpc_method=grpc_method).inc()
58+
CLIENT_MSG_SENT_TOTAL.labels(
59+
grpc_type=grpc_type,
60+
grpc_service=grpc_service,
61+
grpc_method=grpc_method).inc()
62+
63+
try:
64+
response = continuation(client_call_details, request)
65+
response.add_done_callback(self._callback(util.Unary, grpc_service, grpc_method, start))
66+
except grpc.RpcError as e:
67+
code = code_to_string(grpc.StatusCode.UNKNOWN)
68+
if isinstance(e, grpc.Call):
69+
code = code_to_string(e.code())
70+
CLIENT_HANDLED_COUNTER.labels(
71+
grpc_type=grpc_type,
72+
grpc_service=grpc_service,
73+
grpc_method=grpc_method,
74+
grpc_code=code
75+
).inc()
76+
77+
CLIENT_HANDLED_LATENCY_SECONDS.labels(
78+
grpc_type=grpc_type,
79+
grpc_service=grpc_service,
80+
grpc_method=grpc_method).observe(max(time.time() - start, 0))
81+
raise
82+
return response
83+
84+
def intercept_unary_stream(self, continuation, client_call_details, request):
85+
response = continuation(client_call_details, request)
86+
return response
87+
88+
def intercept_stream_unary(self, continuation, client_call_details, request_iterator):
89+
response = continuation(client_call_details, request_iterator)
90+
return response
91+
92+
def intercept_stream_stream(self, continuation, client_call_details, request_iterator):
93+
response = continuation(client_call_details, request_iterator)
94+
return response

0 commit comments

Comments
 (0)