diff --git a/src/isolate/backends/remote.py b/src/isolate/backends/remote.py index 0d62893..a78964a 100644 --- a/src/isolate/backends/remote.py +++ b/src/isolate/backends/remote.py @@ -115,6 +115,7 @@ def run( was_it_raised=False, ), environments=self.definitions, + stream_logs=True, # Default to streaming logs ) return_value = [] diff --git a/src/isolate/connections/grpc/definitions/agent_pb2.py b/src/isolate/connections/grpc/definitions/agent_pb2.py index 92d5792..8d58f00 100644 --- a/src/isolate/connections/grpc/definitions/agent_pb2.py +++ b/src/isolate/connections/grpc/definitions/agent_pb2.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- # Generated by the protocol buffer compiler. DO NOT EDIT! # source: agent.proto -# Protobuf Python Version: 5.26.1 +# Protobuf Python Version: 4.25.1 """Generated protocol buffer code.""" from google.protobuf import descriptor as _descriptor from google.protobuf import descriptor_pool as _descriptor_pool @@ -20,8 +20,8 @@ _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) _builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'agent_pb2', _globals) -if not _descriptor._USE_C_DESCRIPTORS: - DESCRIPTOR._loaded_options = None +if _descriptor._USE_C_DESCRIPTORS == False: + DESCRIPTOR._options = None _globals['_FUNCTIONCALL']._serialized_start=29 _globals['_FUNCTIONCALL']._serialized_end=139 _globals['_AGENT']._serialized_start=141 diff --git a/src/isolate/connections/grpc/definitions/agent_pb2_grpc.py b/src/isolate/connections/grpc/definitions/agent_pb2_grpc.py index f06adb0..d1eaae8 100644 --- a/src/isolate/connections/grpc/definitions/agent_pb2_grpc.py +++ b/src/isolate/connections/grpc/definitions/agent_pb2_grpc.py @@ -1,35 +1,10 @@ # Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! """Client and server classes corresponding to protobuf-defined services.""" import grpc -import warnings from isolate.connections.grpc.definitions import agent_pb2 as agent__pb2 from isolate.connections.grpc.definitions import common_pb2 as common__pb2 -GRPC_GENERATED_VERSION = '1.64.0' -GRPC_VERSION = grpc.__version__ -EXPECTED_ERROR_RELEASE = '1.65.0' -SCHEDULED_RELEASE_DATE = 'June 25, 2024' -_version_not_supported = False - -try: - from grpc._utilities import first_version_is_lower - _version_not_supported = first_version_is_lower(GRPC_VERSION, GRPC_GENERATED_VERSION) -except ImportError: - _version_not_supported = True - -if _version_not_supported: - warnings.warn( - f'The grpc package installed is at version {GRPC_VERSION},' - + f' but the generated code in agent_pb2_grpc.py depends on' - + f' grpcio>={GRPC_GENERATED_VERSION}.' - + f' Please upgrade your grpc module to grpcio>={GRPC_GENERATED_VERSION}' - + f' or downgrade your generated code using grpcio-tools<={GRPC_VERSION}.' - + f' This warning will become an error in {EXPECTED_ERROR_RELEASE},' - + f' scheduled for release on {SCHEDULED_RELEASE_DATE}.', - RuntimeWarning - ) - class AgentStub(object): """Missing associated documentation comment in .proto file.""" @@ -44,7 +19,7 @@ def __init__(self, channel): '/Agent/Run', request_serializer=agent__pb2.FunctionCall.SerializeToString, response_deserializer=common__pb2.PartialRunResult.FromString, - _registered_method=True) + ) class AgentServicer(object): @@ -69,7 +44,6 @@ def add_AgentServicer_to_server(servicer, server): generic_handler = grpc.method_handlers_generic_handler( 'Agent', rpc_method_handlers) server.add_generic_rpc_handlers((generic_handler,)) - server.add_registered_method_handlers('Agent', rpc_method_handlers) # This class is part of an EXPERIMENTAL API. @@ -87,18 +61,8 @@ def Run(request, wait_for_ready=None, timeout=None, metadata=None): - return grpc.experimental.unary_stream( - request, - target, - '/Agent/Run', + return grpc.experimental.unary_stream(request, target, '/Agent/Run', agent__pb2.FunctionCall.SerializeToString, common__pb2.PartialRunResult.FromString, - options, - channel_credentials, - insecure, - call_credentials, - compression, - wait_for_ready, - timeout, - metadata, - _registered_method=True) + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) diff --git a/src/isolate/connections/grpc/definitions/common_pb2.py b/src/isolate/connections/grpc/definitions/common_pb2.py index 0b93077..83a70f2 100644 --- a/src/isolate/connections/grpc/definitions/common_pb2.py +++ b/src/isolate/connections/grpc/definitions/common_pb2.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- # Generated by the protocol buffer compiler. DO NOT EDIT! # source: common.proto -# Protobuf Python Version: 5.26.1 +# Protobuf Python Version: 4.25.1 """Generated protocol buffer code.""" from google.protobuf import descriptor as _descriptor from google.protobuf import descriptor_pool as _descriptor_pool @@ -20,8 +20,8 @@ _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) _builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'common_pb2', _globals) -if not _descriptor._USE_C_DESCRIPTORS: - DESCRIPTOR._loaded_options = None +if _descriptor._USE_C_DESCRIPTORS == False: + DESCRIPTOR._options = None _globals['_LOGSOURCE']._serialized_start=426 _globals['_LOGSOURCE']._serialized_end=472 _globals['_LOGLEVEL']._serialized_start=474 diff --git a/src/isolate/connections/grpc/definitions/common_pb2_grpc.py b/src/isolate/connections/grpc/definitions/common_pb2_grpc.py index 6dae6e8..2daafff 100644 --- a/src/isolate/connections/grpc/definitions/common_pb2_grpc.py +++ b/src/isolate/connections/grpc/definitions/common_pb2_grpc.py @@ -1,29 +1,4 @@ # Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! """Client and server classes corresponding to protobuf-defined services.""" import grpc -import warnings - -GRPC_GENERATED_VERSION = '1.64.0' -GRPC_VERSION = grpc.__version__ -EXPECTED_ERROR_RELEASE = '1.65.0' -SCHEDULED_RELEASE_DATE = 'June 25, 2024' -_version_not_supported = False - -try: - from grpc._utilities import first_version_is_lower - _version_not_supported = first_version_is_lower(GRPC_VERSION, GRPC_GENERATED_VERSION) -except ImportError: - _version_not_supported = True - -if _version_not_supported: - warnings.warn( - f'The grpc package installed is at version {GRPC_VERSION},' - + f' but the generated code in common_pb2_grpc.py depends on' - + f' grpcio>={GRPC_GENERATED_VERSION}.' - + f' Please upgrade your grpc module to grpcio>={GRPC_GENERATED_VERSION}' - + f' or downgrade your generated code using grpcio-tools<={GRPC_VERSION}.' - + f' This warning will become an error in {EXPECTED_ERROR_RELEASE},' - + f' scheduled for release on {SCHEDULED_RELEASE_DATE}.', - RuntimeWarning - ) diff --git a/src/isolate/logger.py b/src/isolate/logger.py index 23e4f5e..9727b0b 100644 --- a/src/isolate/logger.py +++ b/src/isolate/logger.py @@ -51,6 +51,3 @@ def from_env(cls) -> "IsolateLogger": print("Failed to parse ISOLATE_LOG_LABELS") return cls.with_env_expanded(labels=_labels) - - -ENV_LOGGER = IsolateLogger.from_env() diff --git a/src/isolate/server/definitions/server.proto b/src/isolate/server/definitions/server.proto index b8d3b96..1df105a 100644 --- a/src/isolate/server/definitions/server.proto +++ b/src/isolate/server/definitions/server.proto @@ -25,6 +25,7 @@ message BoundFunction { repeated EnvironmentDefinition environments = 1; SerializedObject function = 2; optional SerializedObject setup_func = 3; + bool stream_logs = 4; } message EnvironmentDefinition { diff --git a/src/isolate/server/definitions/server_pb2.py b/src/isolate/server/definitions/server_pb2.py index ef61c2f..dbab29d 100644 --- a/src/isolate/server/definitions/server_pb2.py +++ b/src/isolate/server/definitions/server_pb2.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- # Generated by the protocol buffer compiler. DO NOT EDIT! # source: server.proto -# Protobuf Python Version: 5.26.1 +# Protobuf Python Version: 4.25.1 """Generated protocol buffer code.""" from google.protobuf import descriptor as _descriptor from google.protobuf import descriptor_pool as _descriptor_pool @@ -16,41 +16,41 @@ from google.protobuf import struct_pb2 as google_dot_protobuf_dot_struct__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0cserver.proto\x1a\x0c\x63ommon.proto\x1a\x1cgoogle/protobuf/struct.proto\"\x9d\x01\n\rBoundFunction\x12,\n\x0c\x65nvironments\x18\x01 \x03(\x0b\x32\x16.EnvironmentDefinition\x12#\n\x08\x66unction\x18\x02 \x01(\x0b\x32\x11.SerializedObject\x12*\n\nsetup_func\x18\x03 \x01(\x0b\x32\x11.SerializedObjectH\x00\x88\x01\x01\x42\r\n\x0b_setup_func\"d\n\x15\x45nvironmentDefinition\x12\x0c\n\x04kind\x18\x01 \x01(\t\x12.\n\rconfiguration\x18\x02 \x01(\x0b\x32\x17.google.protobuf.Struct\x12\r\n\x05\x66orce\x18\x03 \x01(\x08\"R\n\rSubmitRequest\x12 \n\x08\x66unction\x18\x01 \x01(\x0b\x32\x0e.BoundFunction\x12\x1f\n\x08metadata\x18\x02 \x01(\x0b\x32\r.TaskMetadata\"{\n\x0cTaskMetadata\x12\x36\n\rlogger_labels\x18\x01 \x03(\x0b\x32\x1f.TaskMetadata.LoggerLabelsEntry\x1a\x33\n\x11LoggerLabelsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"!\n\x0eSubmitResponse\x12\x0f\n\x07task_id\x18\x01 \x01(\t\"F\n\x12SetMetadataRequest\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12\x1f\n\x08metadata\x18\x02 \x01(\x0b\x32\r.TaskMetadata\"\x15\n\x13SetMetadataResponse\"\r\n\x0bListRequest\"\x1b\n\x08TaskInfo\x12\x0f\n\x07task_id\x18\x01 \x01(\t\"(\n\x0cListResponse\x12\x18\n\x05tasks\x18\x01 \x03(\x0b\x32\t.TaskInfo\" \n\rCancelRequest\x12\x0f\n\x07task_id\x18\x01 \x01(\t\"\x10\n\x0e\x43\x61ncelResponse2\xf4\x01\n\x07Isolate\x12,\n\x03Run\x12\x0e.BoundFunction\x1a\x11.PartialRunResult\"\x00\x30\x01\x12+\n\x06Submit\x12\x0e.SubmitRequest\x1a\x0f.SubmitResponse\"\x00\x12:\n\x0bSetMetadata\x12\x13.SetMetadataRequest\x1a\x14.SetMetadataResponse\"\x00\x12%\n\x04List\x12\x0c.ListRequest\x1a\r.ListResponse\"\x00\x12+\n\x06\x43\x61ncel\x12\x0e.CancelRequest\x1a\x0f.CancelResponse\"\x00\x62\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0cserver.proto\x1a\x0c\x63ommon.proto\x1a\x1cgoogle/protobuf/struct.proto\"\xb2\x01\n\rBoundFunction\x12,\n\x0c\x65nvironments\x18\x01 \x03(\x0b\x32\x16.EnvironmentDefinition\x12#\n\x08\x66unction\x18\x02 \x01(\x0b\x32\x11.SerializedObject\x12*\n\nsetup_func\x18\x03 \x01(\x0b\x32\x11.SerializedObjectH\x00\x88\x01\x01\x12\x13\n\x0bstream_logs\x18\x04 \x01(\x08\x42\r\n\x0b_setup_func\"d\n\x15\x45nvironmentDefinition\x12\x0c\n\x04kind\x18\x01 \x01(\t\x12.\n\rconfiguration\x18\x02 \x01(\x0b\x32\x17.google.protobuf.Struct\x12\r\n\x05\x66orce\x18\x03 \x01(\x08\"R\n\rSubmitRequest\x12 \n\x08\x66unction\x18\x01 \x01(\x0b\x32\x0e.BoundFunction\x12\x1f\n\x08metadata\x18\x02 \x01(\x0b\x32\r.TaskMetadata\"{\n\x0cTaskMetadata\x12\x36\n\rlogger_labels\x18\x01 \x03(\x0b\x32\x1f.TaskMetadata.LoggerLabelsEntry\x1a\x33\n\x11LoggerLabelsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"!\n\x0eSubmitResponse\x12\x0f\n\x07task_id\x18\x01 \x01(\t\"F\n\x12SetMetadataRequest\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12\x1f\n\x08metadata\x18\x02 \x01(\x0b\x32\r.TaskMetadata\"\x15\n\x13SetMetadataResponse\"\r\n\x0bListRequest\"\x1b\n\x08TaskInfo\x12\x0f\n\x07task_id\x18\x01 \x01(\t\"(\n\x0cListResponse\x12\x18\n\x05tasks\x18\x01 \x03(\x0b\x32\t.TaskInfo\" \n\rCancelRequest\x12\x0f\n\x07task_id\x18\x01 \x01(\t\"\x10\n\x0e\x43\x61ncelResponse2\xf4\x01\n\x07Isolate\x12,\n\x03Run\x12\x0e.BoundFunction\x1a\x11.PartialRunResult\"\x00\x30\x01\x12+\n\x06Submit\x12\x0e.SubmitRequest\x1a\x0f.SubmitResponse\"\x00\x12:\n\x0bSetMetadata\x12\x13.SetMetadataRequest\x1a\x14.SetMetadataResponse\"\x00\x12%\n\x04List\x12\x0c.ListRequest\x1a\r.ListResponse\"\x00\x12+\n\x06\x43\x61ncel\x12\x0e.CancelRequest\x1a\x0f.CancelResponse\"\x00\x62\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) _builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'server_pb2', _globals) -if not _descriptor._USE_C_DESCRIPTORS: - DESCRIPTOR._loaded_options = None - _globals['_TASKMETADATA_LOGGERLABELSENTRY']._loaded_options = None +if _descriptor._USE_C_DESCRIPTORS == False: + DESCRIPTOR._options = None + _globals['_TASKMETADATA_LOGGERLABELSENTRY']._options = None _globals['_TASKMETADATA_LOGGERLABELSENTRY']._serialized_options = b'8\001' _globals['_BOUNDFUNCTION']._serialized_start=61 - _globals['_BOUNDFUNCTION']._serialized_end=218 - _globals['_ENVIRONMENTDEFINITION']._serialized_start=220 - _globals['_ENVIRONMENTDEFINITION']._serialized_end=320 - _globals['_SUBMITREQUEST']._serialized_start=322 - _globals['_SUBMITREQUEST']._serialized_end=404 - _globals['_TASKMETADATA']._serialized_start=406 - _globals['_TASKMETADATA']._serialized_end=529 - _globals['_TASKMETADATA_LOGGERLABELSENTRY']._serialized_start=478 - _globals['_TASKMETADATA_LOGGERLABELSENTRY']._serialized_end=529 - _globals['_SUBMITRESPONSE']._serialized_start=531 - _globals['_SUBMITRESPONSE']._serialized_end=564 - _globals['_SETMETADATAREQUEST']._serialized_start=566 - _globals['_SETMETADATAREQUEST']._serialized_end=636 - _globals['_SETMETADATARESPONSE']._serialized_start=638 - _globals['_SETMETADATARESPONSE']._serialized_end=659 - _globals['_LISTREQUEST']._serialized_start=661 - _globals['_LISTREQUEST']._serialized_end=674 - _globals['_TASKINFO']._serialized_start=676 - _globals['_TASKINFO']._serialized_end=703 - _globals['_LISTRESPONSE']._serialized_start=705 - _globals['_LISTRESPONSE']._serialized_end=745 - _globals['_CANCELREQUEST']._serialized_start=747 - _globals['_CANCELREQUEST']._serialized_end=779 - _globals['_CANCELRESPONSE']._serialized_start=781 - _globals['_CANCELRESPONSE']._serialized_end=797 - _globals['_ISOLATE']._serialized_start=800 - _globals['_ISOLATE']._serialized_end=1044 + _globals['_BOUNDFUNCTION']._serialized_end=239 + _globals['_ENVIRONMENTDEFINITION']._serialized_start=241 + _globals['_ENVIRONMENTDEFINITION']._serialized_end=341 + _globals['_SUBMITREQUEST']._serialized_start=343 + _globals['_SUBMITREQUEST']._serialized_end=425 + _globals['_TASKMETADATA']._serialized_start=427 + _globals['_TASKMETADATA']._serialized_end=550 + _globals['_TASKMETADATA_LOGGERLABELSENTRY']._serialized_start=499 + _globals['_TASKMETADATA_LOGGERLABELSENTRY']._serialized_end=550 + _globals['_SUBMITRESPONSE']._serialized_start=552 + _globals['_SUBMITRESPONSE']._serialized_end=585 + _globals['_SETMETADATAREQUEST']._serialized_start=587 + _globals['_SETMETADATAREQUEST']._serialized_end=657 + _globals['_SETMETADATARESPONSE']._serialized_start=659 + _globals['_SETMETADATARESPONSE']._serialized_end=680 + _globals['_LISTREQUEST']._serialized_start=682 + _globals['_LISTREQUEST']._serialized_end=695 + _globals['_TASKINFO']._serialized_start=697 + _globals['_TASKINFO']._serialized_end=724 + _globals['_LISTRESPONSE']._serialized_start=726 + _globals['_LISTRESPONSE']._serialized_end=766 + _globals['_CANCELREQUEST']._serialized_start=768 + _globals['_CANCELREQUEST']._serialized_end=800 + _globals['_CANCELRESPONSE']._serialized_start=802 + _globals['_CANCELRESPONSE']._serialized_end=818 + _globals['_ISOLATE']._serialized_start=821 + _globals['_ISOLATE']._serialized_end=1065 # @@protoc_insertion_point(module_scope) diff --git a/src/isolate/server/definitions/server_pb2.pyi b/src/isolate/server/definitions/server_pb2.pyi index 7aa5986..30504e7 100644 --- a/src/isolate/server/definitions/server_pb2.pyi +++ b/src/isolate/server/definitions/server_pb2.pyi @@ -25,21 +25,24 @@ class BoundFunction(google.protobuf.message.Message): ENVIRONMENTS_FIELD_NUMBER: builtins.int FUNCTION_FIELD_NUMBER: builtins.int SETUP_FUNC_FIELD_NUMBER: builtins.int + STREAM_LOGS_FIELD_NUMBER: builtins.int @property def environments(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___EnvironmentDefinition]: ... @property def function(self) -> common_pb2.SerializedObject: ... @property def setup_func(self) -> common_pb2.SerializedObject: ... + stream_logs: builtins.bool def __init__( self, *, environments: collections.abc.Iterable[global___EnvironmentDefinition] | None = ..., function: common_pb2.SerializedObject | None = ..., setup_func: common_pb2.SerializedObject | None = ..., + stream_logs: builtins.bool = ..., ) -> None: ... def HasField(self, field_name: typing_extensions.Literal["_setup_func", b"_setup_func", "function", b"function", "setup_func", b"setup_func"]) -> builtins.bool: ... - def ClearField(self, field_name: typing_extensions.Literal["_setup_func", b"_setup_func", "environments", b"environments", "function", b"function", "setup_func", b"setup_func"]) -> None: ... + def ClearField(self, field_name: typing_extensions.Literal["_setup_func", b"_setup_func", "environments", b"environments", "function", b"function", "setup_func", b"setup_func", "stream_logs", b"stream_logs"]) -> None: ... def WhichOneof(self, oneof_group: typing_extensions.Literal["_setup_func", b"_setup_func"]) -> typing_extensions.Literal["setup_func"] | None: ... global___BoundFunction = BoundFunction diff --git a/src/isolate/server/definitions/server_pb2_grpc.py b/src/isolate/server/definitions/server_pb2_grpc.py index 86740be..429034e 100644 --- a/src/isolate/server/definitions/server_pb2_grpc.py +++ b/src/isolate/server/definitions/server_pb2_grpc.py @@ -1,35 +1,10 @@ # Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! """Client and server classes corresponding to protobuf-defined services.""" import grpc -import warnings from isolate.connections.grpc.definitions import common_pb2 as common__pb2 from isolate.server.definitions import server_pb2 as server__pb2 -GRPC_GENERATED_VERSION = '1.64.0' -GRPC_VERSION = grpc.__version__ -EXPECTED_ERROR_RELEASE = '1.65.0' -SCHEDULED_RELEASE_DATE = 'June 25, 2024' -_version_not_supported = False - -try: - from grpc._utilities import first_version_is_lower - _version_not_supported = first_version_is_lower(GRPC_VERSION, GRPC_GENERATED_VERSION) -except ImportError: - _version_not_supported = True - -if _version_not_supported: - warnings.warn( - f'The grpc package installed is at version {GRPC_VERSION},' - + f' but the generated code in server_pb2_grpc.py depends on' - + f' grpcio>={GRPC_GENERATED_VERSION}.' - + f' Please upgrade your grpc module to grpcio>={GRPC_GENERATED_VERSION}' - + f' or downgrade your generated code using grpcio-tools<={GRPC_VERSION}.' - + f' This warning will become an error in {EXPECTED_ERROR_RELEASE},' - + f' scheduled for release on {SCHEDULED_RELEASE_DATE}.', - RuntimeWarning - ) - class IsolateStub(object): """Missing associated documentation comment in .proto file.""" @@ -44,27 +19,27 @@ def __init__(self, channel): '/Isolate/Run', request_serializer=server__pb2.BoundFunction.SerializeToString, response_deserializer=common__pb2.PartialRunResult.FromString, - _registered_method=True) + ) self.Submit = channel.unary_unary( '/Isolate/Submit', request_serializer=server__pb2.SubmitRequest.SerializeToString, response_deserializer=server__pb2.SubmitResponse.FromString, - _registered_method=True) + ) self.SetMetadata = channel.unary_unary( '/Isolate/SetMetadata', request_serializer=server__pb2.SetMetadataRequest.SerializeToString, response_deserializer=server__pb2.SetMetadataResponse.FromString, - _registered_method=True) + ) self.List = channel.unary_unary( '/Isolate/List', request_serializer=server__pb2.ListRequest.SerializeToString, response_deserializer=server__pb2.ListResponse.FromString, - _registered_method=True) + ) self.Cancel = channel.unary_unary( '/Isolate/Cancel', request_serializer=server__pb2.CancelRequest.SerializeToString, response_deserializer=server__pb2.CancelResponse.FromString, - _registered_method=True) + ) class IsolateServicer(object): @@ -138,7 +113,6 @@ def add_IsolateServicer_to_server(servicer, server): generic_handler = grpc.method_handlers_generic_handler( 'Isolate', rpc_method_handlers) server.add_generic_rpc_handlers((generic_handler,)) - server.add_registered_method_handlers('Isolate', rpc_method_handlers) # This class is part of an EXPERIMENTAL API. @@ -156,21 +130,11 @@ def Run(request, wait_for_ready=None, timeout=None, metadata=None): - return grpc.experimental.unary_stream( - request, - target, - '/Isolate/Run', + return grpc.experimental.unary_stream(request, target, '/Isolate/Run', server__pb2.BoundFunction.SerializeToString, common__pb2.PartialRunResult.FromString, - options, - channel_credentials, - insecure, - call_credentials, - compression, - wait_for_ready, - timeout, - metadata, - _registered_method=True) + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) @staticmethod def Submit(request, @@ -183,21 +147,11 @@ def Submit(request, wait_for_ready=None, timeout=None, metadata=None): - return grpc.experimental.unary_unary( - request, - target, - '/Isolate/Submit', + return grpc.experimental.unary_unary(request, target, '/Isolate/Submit', server__pb2.SubmitRequest.SerializeToString, server__pb2.SubmitResponse.FromString, - options, - channel_credentials, - insecure, - call_credentials, - compression, - wait_for_ready, - timeout, - metadata, - _registered_method=True) + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) @staticmethod def SetMetadata(request, @@ -210,21 +164,11 @@ def SetMetadata(request, wait_for_ready=None, timeout=None, metadata=None): - return grpc.experimental.unary_unary( - request, - target, - '/Isolate/SetMetadata', + return grpc.experimental.unary_unary(request, target, '/Isolate/SetMetadata', server__pb2.SetMetadataRequest.SerializeToString, server__pb2.SetMetadataResponse.FromString, - options, - channel_credentials, - insecure, - call_credentials, - compression, - wait_for_ready, - timeout, - metadata, - _registered_method=True) + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) @staticmethod def List(request, @@ -237,21 +181,11 @@ def List(request, wait_for_ready=None, timeout=None, metadata=None): - return grpc.experimental.unary_unary( - request, - target, - '/Isolate/List', + return grpc.experimental.unary_unary(request, target, '/Isolate/List', server__pb2.ListRequest.SerializeToString, server__pb2.ListResponse.FromString, - options, - channel_credentials, - insecure, - call_credentials, - compression, - wait_for_ready, - timeout, - metadata, - _registered_method=True) + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) @staticmethod def Cancel(request, @@ -264,18 +198,8 @@ def Cancel(request, wait_for_ready=None, timeout=None, metadata=None): - return grpc.experimental.unary_unary( - request, - target, - '/Isolate/Cancel', + return grpc.experimental.unary_unary(request, target, '/Isolate/Cancel', server__pb2.CancelRequest.SerializeToString, server__pb2.CancelResponse.FromString, - options, - channel_credentials, - insecure, - call_credentials, - compression, - wait_for_ready, - timeout, - metadata, - _registered_method=True) + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) diff --git a/src/isolate/server/health/health_pb2.py b/src/isolate/server/health/health_pb2.py index eb5e42d..361dc4b 100644 --- a/src/isolate/server/health/health_pb2.py +++ b/src/isolate/server/health/health_pb2.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- # Generated by the protocol buffer compiler. DO NOT EDIT! # source: health.proto -# Protobuf Python Version: 5.26.1 +# Protobuf Python Version: 4.25.1 """Generated protocol buffer code.""" from google.protobuf import descriptor as _descriptor from google.protobuf import descriptor_pool as _descriptor_pool @@ -19,8 +19,8 @@ _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) _builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'health_pb2', _globals) -if not _descriptor._USE_C_DESCRIPTORS: - DESCRIPTOR._loaded_options = None +if _descriptor._USE_C_DESCRIPTORS == False: + DESCRIPTOR._options = None _globals['_HEALTHCHECKREQUEST']._serialized_start=32 _globals['_HEALTHCHECKREQUEST']._serialized_end=69 _globals['_HEALTHCHECKRESPONSE']._serialized_start=72 diff --git a/src/isolate/server/health/health_pb2_grpc.py b/src/isolate/server/health/health_pb2_grpc.py index 70500f3..ee795a7 100644 --- a/src/isolate/server/health/health_pb2_grpc.py +++ b/src/isolate/server/health/health_pb2_grpc.py @@ -1,34 +1,9 @@ # Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! """Client and server classes corresponding to protobuf-defined services.""" import grpc -import warnings from isolate.server.health import health_pb2 as health__pb2 -GRPC_GENERATED_VERSION = '1.64.0' -GRPC_VERSION = grpc.__version__ -EXPECTED_ERROR_RELEASE = '1.65.0' -SCHEDULED_RELEASE_DATE = 'June 25, 2024' -_version_not_supported = False - -try: - from grpc._utilities import first_version_is_lower - _version_not_supported = first_version_is_lower(GRPC_VERSION, GRPC_GENERATED_VERSION) -except ImportError: - _version_not_supported = True - -if _version_not_supported: - warnings.warn( - f'The grpc package installed is at version {GRPC_VERSION},' - + f' but the generated code in health_pb2_grpc.py depends on' - + f' grpcio>={GRPC_GENERATED_VERSION}.' - + f' Please upgrade your grpc module to grpcio>={GRPC_GENERATED_VERSION}' - + f' or downgrade your generated code using grpcio-tools<={GRPC_VERSION}.' - + f' This warning will become an error in {EXPECTED_ERROR_RELEASE},' - + f' scheduled for release on {SCHEDULED_RELEASE_DATE}.', - RuntimeWarning - ) - class HealthStub(object): """Missing associated documentation comment in .proto file.""" @@ -43,12 +18,12 @@ def __init__(self, channel): '/grpc.health.v1.Health/Check', request_serializer=health__pb2.HealthCheckRequest.SerializeToString, response_deserializer=health__pb2.HealthCheckResponse.FromString, - _registered_method=True) + ) self.Watch = channel.unary_stream( '/grpc.health.v1.Health/Watch', request_serializer=health__pb2.HealthCheckRequest.SerializeToString, response_deserializer=health__pb2.HealthCheckResponse.FromString, - _registered_method=True) + ) class HealthServicer(object): @@ -83,7 +58,6 @@ def add_HealthServicer_to_server(servicer, server): generic_handler = grpc.method_handlers_generic_handler( 'grpc.health.v1.Health', rpc_method_handlers) server.add_generic_rpc_handlers((generic_handler,)) - server.add_registered_method_handlers('grpc.health.v1.Health', rpc_method_handlers) # This class is part of an EXPERIMENTAL API. @@ -101,21 +75,11 @@ def Check(request, wait_for_ready=None, timeout=None, metadata=None): - return grpc.experimental.unary_unary( - request, - target, - '/grpc.health.v1.Health/Check', + return grpc.experimental.unary_unary(request, target, '/grpc.health.v1.Health/Check', health__pb2.HealthCheckRequest.SerializeToString, health__pb2.HealthCheckResponse.FromString, - options, - channel_credentials, - insecure, - call_credentials, - compression, - wait_for_ready, - timeout, - metadata, - _registered_method=True) + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) @staticmethod def Watch(request, @@ -128,18 +92,8 @@ def Watch(request, wait_for_ready=None, timeout=None, metadata=None): - return grpc.experimental.unary_stream( - request, - target, - '/grpc.health.v1.Health/Watch', + return grpc.experimental.unary_stream(request, target, '/grpc.health.v1.Health/Watch', health__pb2.HealthCheckRequest.SerializeToString, health__pb2.HealthCheckResponse.FromString, - options, - channel_credentials, - insecure, - call_credentials, - compression, - wait_for_ready, - timeout, - metadata, - _registered_method=True) + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) diff --git a/src/isolate/server/server.py b/src/isolate/server/server.py index 903fed2..7d2eb2c 100644 --- a/src/isolate/server/server.py +++ b/src/isolate/server/server.py @@ -29,7 +29,7 @@ from isolate.backends.virtualenv import VirtualPythonEnvironment from isolate.connections.grpc import AgentError, LocalPythonGRPC from isolate.connections.grpc.configuration import get_default_options -from isolate.logger import ENV_LOGGER, IsolateLogger +from isolate.logger import IsolateLogger from isolate.logs import Log, LogLevel, LogSource from isolate.server import definitions, health from isolate.server.health_server import HealthServicer @@ -72,7 +72,7 @@ def __str__(self) -> str: @dataclass class RunnerAgent: stub: definitions.AgentStub - message_queue: Queue + message_queue: Queue[definitions.PartialRunResult] _bound_context: ExitStack _channel_state_history: list[grpc.ChannelConnectivity] = field(default_factory=list) @@ -175,7 +175,7 @@ class RunTask: request: definitions.BoundFunction future: futures.Future | None = None agent: RunnerAgent | None = None - logger: IsolateLogger = ENV_LOGGER + logger: IsolateLogger = field(default_factory=IsolateLogger.from_env) def cancel(self): while True: @@ -188,6 +188,10 @@ def cancel(self): except futures.TimeoutError: pass + @property + def stream_logs(self) -> bool: + return self.request.stream_logs + @dataclass class IsolateServicer(definitions.IsolateServicer): @@ -277,6 +281,7 @@ def _run_task(self, task: RunTask) -> Iterator[definitions.PartialRunResult]: future = local_pool.submit( _proxy_to_queue, + # The agent may have been cached, so use the agent's message queue queue=agent.message_queue, bridge=agent.stub, input=function_call, @@ -324,16 +329,9 @@ def Submit( request: definitions.SubmitRequest, context: ServicerContext, ) -> definitions.SubmitResponse: - logger = ENV_LOGGER - if request.metadata.logger_labels: - logger_labels_dict = dict(request.metadata.logger_labels) - try: - logger = IsolateLogger.with_env_expanded(logger_labels_dict) - except BaseException: - # Ignore the error if the logger couldn't be created. - pass + task = RunTask(request=request.function) + self.set_metadata(task, request.metadata) - task = RunTask(request=request.function, logger=logger) task.future = RUNNER_THREAD_POOL.submit(self._run_task_in_background, task) task_id = str(uuid.uuid4()) @@ -365,21 +363,23 @@ def SetMetadata( StatusCode.NOT_FOUND, ) - task = self.background_tasks[request.task_id] - - task.logger.extra_labels = dict(request.metadata.logger_labels) + self.set_metadata(self.background_tasks[request.task_id], request.metadata) return definitions.SetMetadataResponse() + def set_metadata(self, task: RunTask, metadata: definitions.TaskMetadata) -> None: + task.logger.extra_labels = dict(metadata.logger_labels) + def Run( self, request: definitions.BoundFunction, context: ServicerContext, ) -> Iterator[definitions.PartialRunResult]: try: - # HACK: we can support only one task at a time for Run - # TODO: move away from this when we use submit for env-aware tasks task = RunTask(request=request) + + # HACK: we can support only one task at a time + # TODO: move away from this when we use submit for env-aware tasks self.background_tasks["RUN"] = task yield from self._run_task(task) except GRPCException as exc: @@ -495,6 +495,11 @@ def handle(self, log: Log) -> None: self._add_log_to_queue(log) def _add_log_to_queue(self, log: Log) -> None: + if not self.task.stream_logs: + # We do not queue the logs if the stream_logs is disabled + # but still log them to the logger. + return + grpc_log = cast(definitions.Log, to_grpc(log)) grpc_result = definitions.PartialRunResult( is_complete=False, diff --git a/tests/test_server.py b/tests/test_server.py index e937c94..d41b55e 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -120,6 +120,7 @@ def run_request( stub: definitions.IsolateStub, request: definitions.BoundFunction, *, + stream_logs: bool = True, build_logs: Optional[List[Log]] = None, bridge_logs: Optional[List[Log]] = None, user_logs: Optional[List[Log]] = None, @@ -130,6 +131,8 @@ def run_request( LogSource.USER: user_logs if user_logs is not None else [], } + request.stream_logs = stream_logs + return_value = _NOT_SET for result in stub.Run(request): for _log in result.logs: @@ -272,6 +275,44 @@ def test_user_logs_immediate(stub: definitions.IsolateStub, monkeypatch: Any) -> assert by_stream[LogLevel.DEBUG] == "[debug] error!" +def test_no_stream_logs(stub: definitions.IsolateStub, monkeypatch: Any) -> None: + inherit_from_local(monkeypatch) + + env_definition = define_environment("virtualenv", requirements=["pyjokes==0.6.0"]) + request = definitions.BoundFunction( + function=to_serialized_object( + partial( + exec, + textwrap.dedent( + """ + import sys, pyjokes + print(pyjokes.__version__) + print("error error!", file=sys.stderr) + """ + ), + ), + method="dill", + ), + environments=[env_definition], + ) + + user_logs: List[Log] = [] + build_logs: List[Log] = [] + bridge_logs: List[Log] = [] + run_request( + stub, + request, + user_logs=user_logs, + build_logs=build_logs, + bridge_logs=bridge_logs, + stream_logs=False, + ) + + assert len(user_logs) == 0 + assert len(build_logs) == 0 + assert len(bridge_logs) == 0 + + def test_unknown_environment(stub: definitions.IsolateStub, monkeypatch: Any) -> None: inherit_from_local(monkeypatch)