diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index b9a88e2..2c0f2a9 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -37,7 +37,7 @@ jobs: run: | uv pip install -e libs/core/.[all] uv pip install -e libs/io/.[test,all] - uv pip install -e libs/executors/.[all] + uv pip install -e libs/executors/.[tests,all] cd libs/${{ matrix.library }} pytest tests/unit - name: Generate Report for ${{ matrix.library }} @@ -71,6 +71,6 @@ jobs: run: | uv pip install -e libs/core/.[all] uv pip install -e libs/io/.[all] - uv pip install -e libs/executors/.[all] + uv pip install -e libs/executors/.[tests,all] cd libs/executors/ pytest tests/end-to-end/ diff --git a/libs/executors/garf/executors/entrypoints/grpc_server.py b/libs/executors/garf/executors/entrypoints/grpc_server.py index b7a7c47..f4aad2d 100644 --- a/libs/executors/garf/executors/entrypoints/grpc_server.py +++ b/libs/executors/garf/executors/entrypoints/grpc_server.py @@ -41,6 +41,22 @@ def Execute(self, request, context): ) return garf_pb2.ExecuteResponse(results=[result]) + def Fetch(self, request, context): + query_executor = garf.executors.setup_executor( + request.source, request.context.fetcher_parameters + ) + execution_context = garf.executors.execution_context.ExecutionContext( + **MessageToDict(request.context, preserving_proto_field_name=True) + ) + result = query_executor.fetcher.fetch( + query_specification=request.query, + title=request.title, + args=execution_context.query_parameters, + ) + return garf_pb2.FetchResponse( + columns=result.column_names, rows=result.to_list(row_type='dict') + ) + if __name__ == '__main__': parser = argparse.ArgumentParser() diff --git a/libs/executors/garf/executors/garf_pb2.py b/libs/executors/garf/executors/garf_pb2.py index c6d579f..bb69855 100644 --- a/libs/executors/garf/executors/garf_pb2.py +++ b/libs/executors/garf/executors/garf_pb2.py @@ -25,21 +25,27 @@ from google.protobuf import struct_pb2 as google_dot_protobuf_dot_struct__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\ngarf.proto\x12\x04garf\x1a\x1cgoogle/protobuf/struct.proto\"g\n\x0e\x45xecuteRequest\x12\x0e\n\x06source\x18\x01 \x01(\t\x12\r\n\x05title\x18\x02 \x01(\t\x12\r\n\x05query\x18\x03 \x01(\t\x12\'\n\x07\x63ontext\x18\x04 \x01(\x0b\x32\x16.garf.ExecutionContext\"\xbc\x01\n\x10\x45xecutionContext\x12/\n\x10query_parameters\x18\x01 \x01(\x0b\x32\x15.garf.QueryParameters\x12\x33\n\x12\x66\x65tcher_parameters\x18\x02 \x01(\x0b\x32\x17.google.protobuf.Struct\x12\x0e\n\x06writer\x18\x03 \x01(\t\x12\x32\n\x11writer_parameters\x18\x04 \x01(\x0b\x32\x17.google.protobuf.Struct\"d\n\x0fQueryParameters\x12&\n\x05macro\x18\x01 \x01(\x0b\x32\x17.google.protobuf.Struct\x12)\n\x08template\x18\x02 \x01(\x0b\x32\x17.google.protobuf.Struct\"\"\n\x0f\x45xecuteResponse\x12\x0f\n\x07results\x18\x01 \x03(\t2G\n\x0bGarfService\x12\x38\n\x07\x45xecute\x12\x14.garf.ExecuteRequest\x1a\x15.garf.ExecuteResponse\"\x00\x62\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\ngarf.proto\x12\x04garf\x1a\x1cgoogle/protobuf/struct.proto\"a\n\x0c\x46\x65tchRequest\x12\x0e\n\x06source\x18\x01 \x01(\t\x12\r\n\x05title\x18\x02 \x01(\t\x12\r\n\x05query\x18\x03 \x01(\t\x12#\n\x07\x63ontext\x18\x04 \x01(\x0b\x32\x12.garf.FetchContext\"G\n\rFetchResponse\x12\x0f\n\x07\x63olumns\x18\x01 \x03(\t\x12%\n\x04rows\x18\x02 \x03(\x0b\x32\x17.google.protobuf.Struct\"t\n\x0c\x46\x65tchContext\x12/\n\x10query_parameters\x18\x01 \x01(\x0b\x32\x15.garf.QueryParameters\x12\x33\n\x12\x66\x65tcher_parameters\x18\x02 \x01(\x0b\x32\x17.google.protobuf.Struct\"g\n\x0e\x45xecuteRequest\x12\x0e\n\x06source\x18\x01 \x01(\t\x12\r\n\x05title\x18\x02 \x01(\t\x12\r\n\x05query\x18\x03 \x01(\t\x12\'\n\x07\x63ontext\x18\x04 \x01(\x0b\x32\x16.garf.ExecutionContext\"\xbc\x01\n\x10\x45xecutionContext\x12/\n\x10query_parameters\x18\x01 \x01(\x0b\x32\x15.garf.QueryParameters\x12\x33\n\x12\x66\x65tcher_parameters\x18\x02 \x01(\x0b\x32\x17.google.protobuf.Struct\x12\x0e\n\x06writer\x18\x03 \x01(\t\x12\x32\n\x11writer_parameters\x18\x04 \x01(\x0b\x32\x17.google.protobuf.Struct\"d\n\x0fQueryParameters\x12&\n\x05macro\x18\x01 \x01(\x0b\x32\x17.google.protobuf.Struct\x12)\n\x08template\x18\x02 \x01(\x0b\x32\x17.google.protobuf.Struct\"\"\n\x0f\x45xecuteResponse\x12\x0f\n\x07results\x18\x01 \x03(\t2{\n\x0bGarfService\x12\x38\n\x07\x45xecute\x12\x14.garf.ExecuteRequest\x1a\x15.garf.ExecuteResponse\"\x00\x12\x32\n\x05\x46\x65tch\x12\x12.garf.FetchRequest\x1a\x13.garf.FetchResponse\"\x00\x62\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) _builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'garf_pb2', _globals) if not _descriptor._USE_C_DESCRIPTORS: DESCRIPTOR._loaded_options = None - _globals['_EXECUTEREQUEST']._serialized_start=50 - _globals['_EXECUTEREQUEST']._serialized_end=153 - _globals['_EXECUTIONCONTEXT']._serialized_start=156 - _globals['_EXECUTIONCONTEXT']._serialized_end=344 - _globals['_QUERYPARAMETERS']._serialized_start=346 - _globals['_QUERYPARAMETERS']._serialized_end=446 - _globals['_EXECUTERESPONSE']._serialized_start=448 - _globals['_EXECUTERESPONSE']._serialized_end=482 - _globals['_GARFSERVICE']._serialized_start=484 - _globals['_GARFSERVICE']._serialized_end=555 + _globals['_FETCHREQUEST']._serialized_start=50 + _globals['_FETCHREQUEST']._serialized_end=147 + _globals['_FETCHRESPONSE']._serialized_start=149 + _globals['_FETCHRESPONSE']._serialized_end=220 + _globals['_FETCHCONTEXT']._serialized_start=222 + _globals['_FETCHCONTEXT']._serialized_end=338 + _globals['_EXECUTEREQUEST']._serialized_start=340 + _globals['_EXECUTEREQUEST']._serialized_end=443 + _globals['_EXECUTIONCONTEXT']._serialized_start=446 + _globals['_EXECUTIONCONTEXT']._serialized_end=634 + _globals['_QUERYPARAMETERS']._serialized_start=636 + _globals['_QUERYPARAMETERS']._serialized_end=736 + _globals['_EXECUTERESPONSE']._serialized_start=738 + _globals['_EXECUTERESPONSE']._serialized_end=772 + _globals['_GARFSERVICE']._serialized_start=774 + _globals['_GARFSERVICE']._serialized_end=897 # @@protoc_insertion_point(module_scope) diff --git a/libs/executors/garf/executors/garf_pb2_grpc.py b/libs/executors/garf/executors/garf_pb2_grpc.py index bead4d6..164c8fc 100644 --- a/libs/executors/garf/executors/garf_pb2_grpc.py +++ b/libs/executors/garf/executors/garf_pb2_grpc.py @@ -5,7 +5,7 @@ from . import garf_pb2 as garf__pb2 -GRPC_GENERATED_VERSION = '1.75.0' +GRPC_GENERATED_VERSION = '1.76.0' GRPC_VERSION = grpc.__version__ _version_not_supported = False @@ -18,7 +18,7 @@ if _version_not_supported: raise RuntimeError( f'The grpc package installed is at version {GRPC_VERSION},' - + f' but the generated code in garf_pb2_grpc.py depends on' + + ' but the generated code in garf_pb2_grpc.py depends on' + f' grpcio>={GRPC_GENERATED_VERSION}.' + f' Please upgrade your grpc module to grpcio>={GRPC_GENERATED_VERSION}' + f' or downgrade your generated code using grpcio-tools<={GRPC_VERSION}.' @@ -39,6 +39,11 @@ def __init__(self, channel): request_serializer=garf__pb2.ExecuteRequest.SerializeToString, response_deserializer=garf__pb2.ExecuteResponse.FromString, _registered_method=True) + self.Fetch = channel.unary_unary( + '/garf.GarfService/Fetch', + request_serializer=garf__pb2.FetchRequest.SerializeToString, + response_deserializer=garf__pb2.FetchResponse.FromString, + _registered_method=True) class GarfServiceServicer(object): @@ -50,6 +55,12 @@ def Execute(self, request, context): context.set_details('Method not implemented!') raise NotImplementedError('Method not implemented!') + def Fetch(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + def add_GarfServiceServicer_to_server(servicer, server): rpc_method_handlers = { @@ -58,6 +69,11 @@ def add_GarfServiceServicer_to_server(servicer, server): request_deserializer=garf__pb2.ExecuteRequest.FromString, response_serializer=garf__pb2.ExecuteResponse.SerializeToString, ), + 'Fetch': grpc.unary_unary_rpc_method_handler( + servicer.Fetch, + request_deserializer=garf__pb2.FetchRequest.FromString, + response_serializer=garf__pb2.FetchResponse.SerializeToString, + ), } generic_handler = grpc.method_handlers_generic_handler( 'garf.GarfService', rpc_method_handlers) @@ -95,3 +111,30 @@ def Execute(request, timeout, metadata, _registered_method=True) + + @staticmethod + def Fetch(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary( + request, + target, + '/garf.GarfService/Fetch', + garf__pb2.FetchRequest.SerializeToString, + garf__pb2.FetchResponse.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + _registered_method=True) diff --git a/libs/executors/generate_protos.sh b/libs/executors/generate_protos.sh index bff497c..bf314a7 100644 --- a/libs/executors/generate_protos.sh +++ b/libs/executors/generate_protos.sh @@ -1,3 +1,3 @@ python -m grpc_tools.protoc -I=../../protos/ \ - --python_out=./garf_executors --grpc_python_out=./garf_executors \ + --python_out=./garf/executors --grpc_python_out=./garf/executors \ ../../protos/garf.proto diff --git a/libs/executors/pyproject.toml b/libs/executors/pyproject.toml index 0f63aeb..ea6eba3 100644 --- a/libs/executors/pyproject.toml +++ b/libs/executors/pyproject.toml @@ -57,6 +57,13 @@ server=[ "fastapi[standard]", "opentelemetry-instrumentation-fastapi", "typer", + "grpcio-reflection", +] +tests = [ + "pytest", + "pytest-mock", + "pytest-xdist", + "pytest-grpc", ] all = [ "garf-executors[bq,sql,server,gcp]" diff --git a/libs/executors/tests/end-to-end/test_grpc_server.py b/libs/executors/tests/end-to-end/test_grpc_server.py index 775aa93..cb1b656 100644 --- a/libs/executors/tests/end-to-end/test_grpc_server.py +++ b/libs/executors/tests/end-to-end/test_grpc_server.py @@ -11,24 +11,92 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import pathlib + import grpc import pytest from garf.executors import garf_pb2 as pb from garf.executors import garf_pb2_grpc +from garf.executors.entrypoints import grpc_server +from google.protobuf.json_format import MessageToDict + +_SCRIPT_PATH = pathlib.Path(__file__).parent +_QUERY = """ + SELECT + resource, + dimensions.name AS name, + metrics.clicks AS clicks + FROM resource +""" +expected_output = [ + { + 'resource': 'Campaign A', + 'name': 'Ad Group 1', + 'clicks': 1500, + }, + { + 'resource': 'Campaign B', + 'name': 'Ad Group 2', + 'clicks': 2300, + }, + { + 'resource': 'Campaign C', + 'name': 'Ad Group 3', + 'clicks': 800, + }, + { + 'resource': 'Campaign A', + 'name': 'Ad Group 4', + 'clicks': 3200, + }, +] + + +@pytest.fixture(scope='module') +def grpc_add_to_server(): + return garf_pb2_grpc.add_GarfServiceServicer_to_server + + +@pytest.fixture(scope='module') +def grpc_servicer(): + return grpc_server.GarfService() + + +@pytest.fixture(scope='module') +def grpc_stub_cls(grpc_channel): + return garf_pb2_grpc.GarfServiceStub + + +def test_execute(grpc_stub): + fake_data = _SCRIPT_PATH / 'test.json' + request = pb.ExecuteRequest( + source='fake', + title='example', + query=_QUERY, + context=pb.ExecutionContext( + fetcher_parameters={ + 'data_location': str(fake_data), + }, + writer='csv', + ), + ) + result = grpc_stub.Execute(request) + assert 'CSV' in result.results[0] -@pytest.mark.skip -def test_grpc_call(): - with grpc.insecure_channel('localhost:50051') as channel: - stub = garf_pb2_grpc.GarfServiceStub(channel) - request = pb.ExecuteRequest( - source='rest', - title='example', - query='SELECT id, name AS model, data.color AS color FROM objects', - context=pb.ExecutionContext( - fetcher_parameters={'endpoint': 'https://api.restful-api.dev'}, - writer='csv', - ), - ) - result = stub.Execute(request) - assert 'CSV' in result.results[0] +def test_fetch(grpc_stub): + fake_data = _SCRIPT_PATH / 'test.json' + request = pb.FetchRequest( + source='fake', + title='example', + query=_QUERY, + context=pb.FetchContext( + fetcher_parameters={ + 'data_location': str(fake_data), + } + ), + ) + result = grpc_stub.Fetch(request) + data = [MessageToDict(r) for r in result.rows] + assert result.columns == ['resource', 'name', 'clicks'] + assert data == expected_output diff --git a/protos/garf.proto b/protos/garf.proto index c6cf60c..4e42507 100644 --- a/protos/garf.proto +++ b/protos/garf.proto @@ -5,6 +5,25 @@ import "google/protobuf/struct.proto"; service GarfService { rpc Execute(ExecuteRequest) returns (ExecuteResponse) {} + rpc Fetch(FetchRequest) returns (FetchResponse) {} + +} + +message FetchRequest { + string source = 1; + string title = 2; + string query = 3; + FetchContext context = 4; +} + +message FetchResponse { + repeated string columns = 1; + repeated google.protobuf.Struct rows = 2; +} + +message FetchContext { + QueryParameters query_parameters = 1; + google.protobuf.Struct fetcher_parameters = 2; } message ExecuteRequest {