Skip to content

Commit acfeb0e

Browse files
feat(executors): add Fetch endpoint to gRPC server
1 parent 10ee80a commit acfeb0e

File tree

7 files changed

+160
-19
lines changed

7 files changed

+160
-19
lines changed

libs/executors/garf/executors/entrypoints/grpc_server.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,22 @@ def Execute(self, request, context):
4141
)
4242
return garf_pb2.ExecuteResponse(results=[result])
4343

44+
def Fetch(self, request, context):
45+
query_executor = garf.executors.setup_executor(
46+
request.source, request.context.fetcher_parameters
47+
)
48+
execution_context = garf.executors.execution_context.ExecutionContext(
49+
**MessageToDict(request.context, preserving_proto_field_name=True)
50+
)
51+
result = query_executor.fetcher.fetch(
52+
query_specification=request.query,
53+
title=request.title,
54+
args=execution_context.query_parameters,
55+
)
56+
return garf_pb2.FetchResponse(
57+
columns=result.column_names, rows=result.to_list(row_type='dict')
58+
)
59+
4460

4561
if __name__ == '__main__':
4662
parser = argparse.ArgumentParser()

libs/executors/garf/executors/garf_pb2.py

Lines changed: 17 additions & 11 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

libs/executors/garf/executors/garf_pb2_grpc.py

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
from . import garf_pb2 as garf__pb2
77

8-
GRPC_GENERATED_VERSION = '1.75.0'
8+
GRPC_GENERATED_VERSION = '1.76.0'
99
GRPC_VERSION = grpc.__version__
1010
_version_not_supported = False
1111

@@ -18,7 +18,7 @@
1818
if _version_not_supported:
1919
raise RuntimeError(
2020
f'The grpc package installed is at version {GRPC_VERSION},'
21-
+ f' but the generated code in garf_pb2_grpc.py depends on'
21+
+ ' but the generated code in garf_pb2_grpc.py depends on'
2222
+ f' grpcio>={GRPC_GENERATED_VERSION}.'
2323
+ f' Please upgrade your grpc module to grpcio>={GRPC_GENERATED_VERSION}'
2424
+ f' or downgrade your generated code using grpcio-tools<={GRPC_VERSION}.'
@@ -39,6 +39,11 @@ def __init__(self, channel):
3939
request_serializer=garf__pb2.ExecuteRequest.SerializeToString,
4040
response_deserializer=garf__pb2.ExecuteResponse.FromString,
4141
_registered_method=True)
42+
self.Fetch = channel.unary_unary(
43+
'/garf.GarfService/Fetch',
44+
request_serializer=garf__pb2.FetchRequest.SerializeToString,
45+
response_deserializer=garf__pb2.FetchResponse.FromString,
46+
_registered_method=True)
4247

4348

4449
class GarfServiceServicer(object):
@@ -50,6 +55,12 @@ def Execute(self, request, context):
5055
context.set_details('Method not implemented!')
5156
raise NotImplementedError('Method not implemented!')
5257

58+
def Fetch(self, request, context):
59+
"""Missing associated documentation comment in .proto file."""
60+
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
61+
context.set_details('Method not implemented!')
62+
raise NotImplementedError('Method not implemented!')
63+
5364

5465
def add_GarfServiceServicer_to_server(servicer, server):
5566
rpc_method_handlers = {
@@ -58,6 +69,11 @@ def add_GarfServiceServicer_to_server(servicer, server):
5869
request_deserializer=garf__pb2.ExecuteRequest.FromString,
5970
response_serializer=garf__pb2.ExecuteResponse.SerializeToString,
6071
),
72+
'Fetch': grpc.unary_unary_rpc_method_handler(
73+
servicer.Fetch,
74+
request_deserializer=garf__pb2.FetchRequest.FromString,
75+
response_serializer=garf__pb2.FetchResponse.SerializeToString,
76+
),
6177
}
6278
generic_handler = grpc.method_handlers_generic_handler(
6379
'garf.GarfService', rpc_method_handlers)
@@ -95,3 +111,30 @@ def Execute(request,
95111
timeout,
96112
metadata,
97113
_registered_method=True)
114+
115+
@staticmethod
116+
def Fetch(request,
117+
target,
118+
options=(),
119+
channel_credentials=None,
120+
call_credentials=None,
121+
insecure=False,
122+
compression=None,
123+
wait_for_ready=None,
124+
timeout=None,
125+
metadata=None):
126+
return grpc.experimental.unary_unary(
127+
request,
128+
target,
129+
'/garf.GarfService/Fetch',
130+
garf__pb2.FetchRequest.SerializeToString,
131+
garf__pb2.FetchResponse.FromString,
132+
options,
133+
channel_credentials,
134+
insecure,
135+
call_credentials,
136+
compression,
137+
wait_for_ready,
138+
timeout,
139+
metadata,
140+
_registered_method=True)

libs/executors/generate_protos.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
python -m grpc_tools.protoc -I=../../protos/ \
2-
--python_out=./garf_executors --grpc_python_out=./garf_executors \
2+
--python_out=./garf/executors --grpc_python_out=./garf/executors \
33
../../protos/garf.proto

libs/executors/pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ server=[
5757
"fastapi[standard]",
5858
"opentelemetry-instrumentation-fastapi",
5959
"typer",
60+
"grpcio-reflection",
6061
]
6162
all = [
6263
"garf-executors[bq,sql,server,gcp]"

libs/executors/tests/end-to-end/test_grpc_server.py

Lines changed: 61 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,24 +11,80 @@
1111
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
14+
import pathlib
15+
1416
import grpc
1517
import pytest
1618
from garf.executors import garf_pb2 as pb
1719
from garf.executors import garf_pb2_grpc
20+
from google.protobuf.json_format import MessageToDict
21+
22+
_SCRIPT_PATH = pathlib.Path(__file__).parent
23+
_QUERY = """
24+
SELECT
25+
resource,
26+
dimensions.name AS name,
27+
metrics.clicks AS clicks
28+
FROM resource
29+
"""
30+
expected_output = [
31+
{
32+
'resource': 'Campaign A',
33+
'name': 'Ad Group 1',
34+
'clicks': 1500,
35+
},
36+
{
37+
'resource': 'Campaign B',
38+
'name': 'Ad Group 2',
39+
'clicks': 2300,
40+
},
41+
{
42+
'resource': 'Campaign C',
43+
'name': 'Ad Group 3',
44+
'clicks': 800,
45+
},
46+
{
47+
'resource': 'Campaign A',
48+
'name': 'Ad Group 4',
49+
'clicks': 3200,
50+
},
51+
]
1852

1953

20-
@pytest.mark.skip
21-
def test_grpc_call():
54+
def test_grpc_execute():
55+
fake_data = _SCRIPT_PATH / 'test.json'
2256
with grpc.insecure_channel('localhost:50051') as channel:
2357
stub = garf_pb2_grpc.GarfServiceStub(channel)
2458
request = pb.ExecuteRequest(
25-
source='rest',
59+
source='fake',
2660
title='example',
27-
query='SELECT id, name AS model, data.color AS color FROM objects',
61+
query=_QUERY,
2862
context=pb.ExecutionContext(
29-
fetcher_parameters={'endpoint': 'https://api.restful-api.dev'},
63+
fetcher_parameters={
64+
'data_location': str(fake_data),
65+
},
3066
writer='csv',
3167
),
3268
)
3369
result = stub.Execute(request)
3470
assert 'CSV' in result.results[0]
71+
72+
73+
def test_grpc_fetch(tmp_path):
74+
fake_data = _SCRIPT_PATH / 'test.json'
75+
with grpc.insecure_channel('localhost:50051') as channel:
76+
stub = garf_pb2_grpc.GarfServiceStub(channel)
77+
request = pb.FetchRequest(
78+
source='fake',
79+
title='example',
80+
query=_QUERY,
81+
context=pb.FetchContext(
82+
fetcher_parameters={
83+
'data_location': str(fake_data),
84+
}
85+
),
86+
)
87+
result = stub.Fetch(request)
88+
data = [MessageToDict(r) for r in result.rows]
89+
assert result.columns == ['resource', 'name', 'clicks']
90+
assert data == expected_output

protos/garf.proto

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,25 @@ import "google/protobuf/struct.proto";
55

66
service GarfService {
77
rpc Execute(ExecuteRequest) returns (ExecuteResponse) {}
8+
rpc Fetch(FetchRequest) returns (FetchResponse) {}
9+
10+
}
11+
12+
message FetchRequest {
13+
string source = 1;
14+
string title = 2;
15+
string query = 3;
16+
FetchContext context = 4;
17+
}
18+
19+
message FetchResponse {
20+
repeated string columns = 1;
21+
repeated google.protobuf.Struct rows = 2;
22+
}
23+
24+
message FetchContext {
25+
QueryParameters query_parameters = 1;
26+
google.protobuf.Struct fetcher_parameters = 2;
827
}
928

1029
message ExecuteRequest {

0 commit comments

Comments
 (0)