Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ jobs:
run: |
uv pip install -e libs/core/.[all]
uv pip install -e libs/io/.[test,all]
uv pip install -e libs/executors/.[all]
uv pip install -e libs/executors/.[tests,all]
cd libs/${{ matrix.library }}
pytest tests/unit
- name: Generate Report for ${{ matrix.library }}
Expand Down Expand Up @@ -71,6 +71,6 @@ jobs:
run: |
uv pip install -e libs/core/.[all]
uv pip install -e libs/io/.[all]
uv pip install -e libs/executors/.[all]
uv pip install -e libs/executors/.[tests,all]
cd libs/executors/
pytest tests/end-to-end/
16 changes: 16 additions & 0 deletions libs/executors/garf/executors/entrypoints/grpc_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,22 @@ def Execute(self, request, context):
)
return garf_pb2.ExecuteResponse(results=[result])

def Fetch(self, request, context):
query_executor = garf.executors.setup_executor(
request.source, request.context.fetcher_parameters
)
execution_context = garf.executors.execution_context.ExecutionContext(
**MessageToDict(request.context, preserving_proto_field_name=True)
)
result = query_executor.fetcher.fetch(
query_specification=request.query,
title=request.title,
args=execution_context.query_parameters,
)
return garf_pb2.FetchResponse(
columns=result.column_names, rows=result.to_list(row_type='dict')
)


if __name__ == '__main__':
parser = argparse.ArgumentParser()
Expand Down
28 changes: 17 additions & 11 deletions libs/executors/garf/executors/garf_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

47 changes: 45 additions & 2 deletions libs/executors/garf/executors/garf_pb2_grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from . import garf_pb2 as garf__pb2

GRPC_GENERATED_VERSION = '1.75.0'
GRPC_GENERATED_VERSION = '1.76.0'
GRPC_VERSION = grpc.__version__
_version_not_supported = False

Expand All @@ -18,7 +18,7 @@
if _version_not_supported:
raise RuntimeError(
f'The grpc package installed is at version {GRPC_VERSION},'
+ f' but the generated code in garf_pb2_grpc.py depends on'
+ ' but the generated code in garf_pb2_grpc.py depends on'
+ f' grpcio>={GRPC_GENERATED_VERSION}.'
+ f' Please upgrade your grpc module to grpcio>={GRPC_GENERATED_VERSION}'
+ f' or downgrade your generated code using grpcio-tools<={GRPC_VERSION}.'
Expand All @@ -39,6 +39,11 @@ def __init__(self, channel):
request_serializer=garf__pb2.ExecuteRequest.SerializeToString,
response_deserializer=garf__pb2.ExecuteResponse.FromString,
_registered_method=True)
self.Fetch = channel.unary_unary(
'/garf.GarfService/Fetch',
request_serializer=garf__pb2.FetchRequest.SerializeToString,
response_deserializer=garf__pb2.FetchResponse.FromString,
_registered_method=True)


class GarfServiceServicer(object):
Expand All @@ -50,6 +55,12 @@ def Execute(self, request, context):
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')

def Fetch(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')


def add_GarfServiceServicer_to_server(servicer, server):
rpc_method_handlers = {
Expand All @@ -58,6 +69,11 @@ def add_GarfServiceServicer_to_server(servicer, server):
request_deserializer=garf__pb2.ExecuteRequest.FromString,
response_serializer=garf__pb2.ExecuteResponse.SerializeToString,
),
'Fetch': grpc.unary_unary_rpc_method_handler(
servicer.Fetch,
request_deserializer=garf__pb2.FetchRequest.FromString,
response_serializer=garf__pb2.FetchResponse.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler(
'garf.GarfService', rpc_method_handlers)
Expand Down Expand Up @@ -95,3 +111,30 @@ def Execute(request,
timeout,
metadata,
_registered_method=True)

@staticmethod
def Fetch(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(
request,
target,
'/garf.GarfService/Fetch',
garf__pb2.FetchRequest.SerializeToString,
garf__pb2.FetchResponse.FromString,
options,
channel_credentials,
insecure,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
_registered_method=True)
2 changes: 1 addition & 1 deletion libs/executors/generate_protos.sh
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
python -m grpc_tools.protoc -I=../../protos/ \
--python_out=./garf_executors --grpc_python_out=./garf_executors \
--python_out=./garf/executors --grpc_python_out=./garf/executors \
../../protos/garf.proto
7 changes: 7 additions & 0 deletions libs/executors/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,13 @@ server=[
"fastapi[standard]",
"opentelemetry-instrumentation-fastapi",
"typer",
"grpcio-reflection",
]
tests = [
"pytest",
"pytest-mock",
"pytest-xdist",
"pytest-grpc",
]
all = [
"garf-executors[bq,sql,server,gcp]"
Expand Down
98 changes: 83 additions & 15 deletions libs/executors/tests/end-to-end/test_grpc_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,92 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pathlib

import grpc
import pytest
from garf.executors import garf_pb2 as pb
from garf.executors import garf_pb2_grpc
from garf.executors.entrypoints import grpc_server
from google.protobuf.json_format import MessageToDict

_SCRIPT_PATH = pathlib.Path(__file__).parent
_QUERY = """
SELECT
resource,
dimensions.name AS name,
metrics.clicks AS clicks
FROM resource
"""
expected_output = [
{
'resource': 'Campaign A',
'name': 'Ad Group 1',
'clicks': 1500,
},
{
'resource': 'Campaign B',
'name': 'Ad Group 2',
'clicks': 2300,
},
{
'resource': 'Campaign C',
'name': 'Ad Group 3',
'clicks': 800,
},
{
'resource': 'Campaign A',
'name': 'Ad Group 4',
'clicks': 3200,
},
]


@pytest.fixture(scope='module')
def grpc_add_to_server():
return garf_pb2_grpc.add_GarfServiceServicer_to_server


@pytest.fixture(scope='module')
def grpc_servicer():
return grpc_server.GarfService()


@pytest.fixture(scope='module')
def grpc_stub_cls(grpc_channel):
return garf_pb2_grpc.GarfServiceStub


def test_execute(grpc_stub):
fake_data = _SCRIPT_PATH / 'test.json'
request = pb.ExecuteRequest(
source='fake',
title='example',
query=_QUERY,
context=pb.ExecutionContext(
fetcher_parameters={
'data_location': str(fake_data),
},
writer='csv',
),
)
result = grpc_stub.Execute(request)
assert 'CSV' in result.results[0]


@pytest.mark.skip
def test_grpc_call():
with grpc.insecure_channel('localhost:50051') as channel:
stub = garf_pb2_grpc.GarfServiceStub(channel)
request = pb.ExecuteRequest(
source='rest',
title='example',
query='SELECT id, name AS model, data.color AS color FROM objects',
context=pb.ExecutionContext(
fetcher_parameters={'endpoint': 'https://api.restful-api.dev'},
writer='csv',
),
)
result = stub.Execute(request)
assert 'CSV' in result.results[0]
def test_fetch(grpc_stub):
fake_data = _SCRIPT_PATH / 'test.json'
request = pb.FetchRequest(
source='fake',
title='example',
query=_QUERY,
context=pb.FetchContext(
fetcher_parameters={
'data_location': str(fake_data),
}
),
)
result = grpc_stub.Fetch(request)
data = [MessageToDict(r) for r in result.rows]
assert result.columns == ['resource', 'name', 'clicks']
assert data == expected_output
19 changes: 19 additions & 0 deletions protos/garf.proto
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,25 @@ import "google/protobuf/struct.proto";

service GarfService {
rpc Execute(ExecuteRequest) returns (ExecuteResponse) {}
rpc Fetch(FetchRequest) returns (FetchResponse) {}

}

message FetchRequest {
string source = 1;
string title = 2;
string query = 3;
FetchContext context = 4;
}

message FetchResponse {
repeated string columns = 1;
repeated google.protobuf.Struct rows = 2;
}

message FetchContext {
QueryParameters query_parameters = 1;
google.protobuf.Struct fetcher_parameters = 2;
}

message ExecuteRequest {
Expand Down