Skip to content

Commit ded0623

Browse files
[executors] feat: add support for gRPC server
1 parent 687c426 commit ded0623

File tree

7 files changed

+281
-0
lines changed

7 files changed

+281
-0
lines changed
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# https://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""gRPC endpoint for garf."""
16+
17+
import argparse
18+
import logging
19+
from concurrent import futures
20+
21+
import grpc
22+
from google.protobuf.json_format import MessageToDict
23+
from grpc_reflection.v1alpha import reflection
24+
25+
import garf_executors
26+
from garf_executors import garf_pb2, garf_pb2_grpc
27+
from garf_executors.entrypoints.tracer import initialize_tracer
28+
29+
30+
class GarfService(garf_pb2_grpc.GarfService):
31+
def Execute(self, request, context):
32+
query_executor = garf_executors.setup_executor(
33+
request.source, request.context.fetcher_parameters
34+
)
35+
execution_context = garf_executors.execution_context.ExecutionContext(
36+
**MessageToDict(request.context, preserving_proto_field_name=True)
37+
)
38+
result = query_executor.execute(
39+
query=request.query,
40+
title=request.title,
41+
context=execution_context,
42+
)
43+
return garf_pb2.ExecuteResponse(results=[result])
44+
45+
46+
if __name__ == '__main__':
47+
parser = argparse.ArgumentParser()
48+
parser.add_argument('--port', dest='port', default=50051, type=int)
49+
parser.add_argument(
50+
'--parallel-threshold', dest='parallel_threshold', default=10, type=int
51+
)
52+
args, _ = parser.parse_known_args()
53+
initialize_tracer()
54+
server = grpc.server(
55+
futures.ThreadPoolExecutor(max_workers=args.parallel_threshold)
56+
)
57+
58+
service = GarfService()
59+
garf_pb2_grpc.add_GarfServiceServicer_to_server(service, server)
60+
SERVICE_NAMES = (
61+
garf_pb2.DESCRIPTOR.services_by_name['GarfService'].full_name,
62+
reflection.SERVICE_NAME,
63+
)
64+
reflection.enable_server_reflection(SERVICE_NAMES, server)
65+
server.add_insecure_port(f'[::]:{args.port}')
66+
server.start()
67+
logging.info('Garf service started, listening on port %d', 50051)
68+
server.wait_for_termination()

libs/executors/garf_executors/garf_pb2.py

Lines changed: 45 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
2+
"""Client and server classes corresponding to protobuf-defined services."""
3+
import grpc
4+
import warnings
5+
6+
from . import garf_pb2 as garf__pb2
7+
8+
GRPC_GENERATED_VERSION = '1.75.0'
9+
GRPC_VERSION = grpc.__version__
10+
_version_not_supported = False
11+
12+
try:
13+
from grpc._utilities import first_version_is_lower
14+
_version_not_supported = first_version_is_lower(GRPC_VERSION, GRPC_GENERATED_VERSION)
15+
except ImportError:
16+
_version_not_supported = True
17+
18+
if _version_not_supported:
19+
raise RuntimeError(
20+
f'The grpc package installed is at version {GRPC_VERSION},'
21+
+ f' but the generated code in garf_pb2_grpc.py depends on'
22+
+ f' grpcio>={GRPC_GENERATED_VERSION}.'
23+
+ f' Please upgrade your grpc module to grpcio>={GRPC_GENERATED_VERSION}'
24+
+ f' or downgrade your generated code using grpcio-tools<={GRPC_VERSION}.'
25+
)
26+
27+
28+
class GarfServiceStub(object):
29+
"""Missing associated documentation comment in .proto file."""
30+
31+
def __init__(self, channel):
32+
"""Constructor.
33+
34+
Args:
35+
channel: A grpc.Channel.
36+
"""
37+
self.Execute = channel.unary_unary(
38+
'/garf.GarfService/Execute',
39+
request_serializer=garf__pb2.ExecuteRequest.SerializeToString,
40+
response_deserializer=garf__pb2.ExecuteResponse.FromString,
41+
_registered_method=True)
42+
43+
44+
class GarfServiceServicer(object):
45+
"""Missing associated documentation comment in .proto file."""
46+
47+
def Execute(self, request, context):
48+
"""Missing associated documentation comment in .proto file."""
49+
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
50+
context.set_details('Method not implemented!')
51+
raise NotImplementedError('Method not implemented!')
52+
53+
54+
def add_GarfServiceServicer_to_server(servicer, server):
55+
rpc_method_handlers = {
56+
'Execute': grpc.unary_unary_rpc_method_handler(
57+
servicer.Execute,
58+
request_deserializer=garf__pb2.ExecuteRequest.FromString,
59+
response_serializer=garf__pb2.ExecuteResponse.SerializeToString,
60+
),
61+
}
62+
generic_handler = grpc.method_handlers_generic_handler(
63+
'garf.GarfService', rpc_method_handlers)
64+
server.add_generic_rpc_handlers((generic_handler,))
65+
server.add_registered_method_handlers('garf.GarfService', rpc_method_handlers)
66+
67+
68+
# This class is part of an EXPERIMENTAL API.
69+
class GarfService(object):
70+
"""Missing associated documentation comment in .proto file."""
71+
72+
@staticmethod
73+
def Execute(request,
74+
target,
75+
options=(),
76+
channel_credentials=None,
77+
call_credentials=None,
78+
insecure=False,
79+
compression=None,
80+
wait_for_ready=None,
81+
timeout=None,
82+
metadata=None):
83+
return grpc.experimental.unary_unary(
84+
request,
85+
target,
86+
'/garf.GarfService/Execute',
87+
garf__pb2.ExecuteRequest.SerializeToString,
88+
garf__pb2.ExecuteResponse.FromString,
89+
options,
90+
channel_credentials,
91+
insecure,
92+
call_credentials,
93+
compression,
94+
wait_for_ready,
95+
timeout,
96+
metadata,
97+
_registered_method=True)

libs/executors/generate_protos.sh

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
python -m grpc_tools.protoc -I=../../protos/ \
2+
--python_out=./garf_executors --grpc_python_out=./garf_executors \
3+
../../protos/garf.proto
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# https://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import grpc
16+
from garf_executors import garf_pb2 as pb
17+
from garf_executors import garf_pb2_grpc
18+
19+
20+
def test_grpc_call():
21+
with grpc.insecure_channel('localhost:50051') as channel:
22+
stub = garf_pb2_grpc.GarfServiceStub(channel)
23+
request = pb.ExecuteRequest(
24+
source='rest',
25+
title='example',
26+
query='SELECT id, name AS model, data.color AS color FROM objects',
27+
context=pb.ExecutionContext(
28+
fetcher_parameters={'endpoint': 'https://api.restful-api.dev'},
29+
writer='csv',
30+
),
31+
)
32+
result = stub.Execute(request)
33+
assert 'CSV' in result.results[0]

libs/ruff.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ exclude = [
2626
"node_modules",
2727
"site-packages",
2828
"venv",
29+
"*pb2.py",
30+
"*pb2_grpc.py",
2931
]
3032

3133
line-length = 80

protos/garf.proto

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
syntax = "proto3";
2+
package garf;
3+
import "google/protobuf/struct.proto";
4+
5+
6+
service GarfService {
7+
rpc Execute(ExecuteRequest) returns (ExecuteResponse) {}
8+
}
9+
10+
message ExecuteRequest {
11+
string source = 1;
12+
string title = 2;
13+
string query = 3;
14+
ExecutionContext context = 4;
15+
}
16+
17+
18+
message ExecutionContext {
19+
QueryParameters query_parameters = 1;
20+
google.protobuf.Struct fetcher_parameters = 2;
21+
string writer = 3;
22+
google.protobuf.Struct writer_parameters = 4;
23+
}
24+
25+
26+
message QueryParameters {
27+
google.protobuf.Struct macro = 1;
28+
google.protobuf.Struct template = 2;
29+
}
30+
31+
message ExecuteResponse {
32+
repeated string results = 1;
33+
}

0 commit comments

Comments
 (0)