src/isolate/logger.py (3 changes: 0 additions & 3 deletions)
@@ -51,6 +51,3 @@ def from_env(cls) -> "IsolateLogger":
print("Failed to parse ISOLATE_LOG_LABELS")

return cls.with_env_expanded(labels=_labels)


ENV_LOGGER = IsolateLogger.from_env()
src/isolate/server/definitions/server.proto (8 changes: 8 additions & 0 deletions)
@@ -8,6 +8,8 @@ service Isolate {
// and the result originating from that function.
rpc Run (BoundFunction) returns (stream PartialRunResult) {}

rpc RunFunction (RunRequest) returns (stream PartialRunResult) {}

// Submit a function to be run without waiting for results.
rpc Submit (SubmitRequest) returns (SubmitResponse) {}

@@ -27,6 +29,11 @@ message BoundFunction {
optional SerializedObject setup_func = 3;
}

message RunRequest {
BoundFunction function = 1;
TaskMetadata metadata = 2;
}

message EnvironmentDefinition {
// Kind of the isolate environment.
string kind = 1;
@@ -46,6 +53,7 @@ message SubmitRequest {
message TaskMetadata {
// Labels to attach to the logs.
map<string, string> logger_labels = 1;
bool stream_logs = 2;
}

message SubmitResponse {
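For context, a minimal client-side sketch of the new RunFunction RPC follows. It assumes the generated IsolateStub and message classes are re-exported from isolate.server.definitions, that `bound_function` is a BoundFunction assembled elsewhere (environment definition plus serialized callable), and that the server address is a placeholder; only the message and RPC names come from the .proto above.

```python
# Hedged sketch, not part of this PR. Assumptions: IsolateStub is importable
# from isolate.server.definitions, and "localhost:50051" stands in for the
# real server address.
import grpc

from isolate.server import definitions


def run_function(bound_function: definitions.BoundFunction) -> None:
    with grpc.insecure_channel("localhost:50051") as channel:
        stub = definitions.IsolateStub(channel)
        request = definitions.RunRequest(
            function=bound_function,
            metadata=definitions.TaskMetadata(
                logger_labels={"job": "example"},  # attached to emitted logs
                stream_logs=True,  # also stream logs back with the partial results
            ),
        )
        # RunFunction is a server-streaming RPC: iterate the PartialRunResult stream.
        for partial_result in stub.RunFunction(request):
            print(partial_result)
```

RunFunction keeps the same stream shape as Run (a stream of PartialRunResult), so existing result handling should carry over.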
src/isolate/server/definitions/server_pb2.py (56 changes: 29 additions & 27 deletions)

Some generated files are not rendered by default.

src/isolate/server/definitions/server_pb2.pyi (26 changes: 25 additions & 1 deletion)
@@ -44,6 +44,27 @@ class BoundFunction(google.protobuf.message.Message):

global___BoundFunction = BoundFunction

@typing_extensions.final
class RunRequest(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor

FUNCTION_FIELD_NUMBER: builtins.int
METADATA_FIELD_NUMBER: builtins.int
@property
def function(self) -> global___BoundFunction: ...
@property
def metadata(self) -> global___TaskMetadata: ...
def __init__(
self,
*,
function: global___BoundFunction | None = ...,
metadata: global___TaskMetadata | None = ...,
) -> None: ...
def HasField(self, field_name: typing_extensions.Literal["function", b"function", "metadata", b"metadata"]) -> builtins.bool: ...
def ClearField(self, field_name: typing_extensions.Literal["function", b"function", "metadata", b"metadata"]) -> None: ...

global___RunRequest = RunRequest

@typing_extensions.final
class EnvironmentDefinition(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
@@ -114,15 +135,18 @@ class TaskMetadata(google.protobuf.message.Message):
def ClearField(self, field_name: typing_extensions.Literal["key", b"key", "value", b"value"]) -> None: ...

LOGGER_LABELS_FIELD_NUMBER: builtins.int
STREAM_LOGS_FIELD_NUMBER: builtins.int
@property
def logger_labels(self) -> google.protobuf.internal.containers.ScalarMap[builtins.str, builtins.str]:
"""Labels to attach to the logs."""
stream_logs: builtins.bool
def __init__(
self,
*,
logger_labels: collections.abc.Mapping[builtins.str, builtins.str] | None = ...,
stream_logs: builtins.bool = ...,
) -> None: ...
def ClearField(self, field_name: typing_extensions.Literal["logger_labels", b"logger_labels"]) -> None: ...
def ClearField(self, field_name: typing_extensions.Literal["logger_labels", b"logger_labels", "stream_logs", b"stream_logs"]) -> None: ...

global___TaskMetadata = TaskMetadata

src/isolate/server/definitions/server_pb2_grpc.py (43 changes: 43 additions & 0 deletions)
@@ -45,6 +45,11 @@ def __init__(self, channel):
request_serializer=server__pb2.BoundFunction.SerializeToString,
response_deserializer=common__pb2.PartialRunResult.FromString,
_registered_method=True)
self.RunFunction = channel.unary_stream(
'/Isolate/RunFunction',
request_serializer=server__pb2.RunRequest.SerializeToString,
response_deserializer=common__pb2.PartialRunResult.FromString,
_registered_method=True)
self.Submit = channel.unary_unary(
'/Isolate/Submit',
request_serializer=server__pb2.SubmitRequest.SerializeToString,
@@ -78,6 +83,12 @@ def Run(self, request, context):
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')

def RunFunction(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')

def Submit(self, request, context):
"""Submit a function to be run without waiting for results.
"""
@@ -114,6 +125,11 @@ def add_IsolateServicer_to_server(servicer, server):
request_deserializer=server__pb2.BoundFunction.FromString,
response_serializer=common__pb2.PartialRunResult.SerializeToString,
),
'RunFunction': grpc.unary_stream_rpc_method_handler(
servicer.RunFunction,
request_deserializer=server__pb2.RunRequest.FromString,
response_serializer=common__pb2.PartialRunResult.SerializeToString,
),
'Submit': grpc.unary_unary_rpc_method_handler(
servicer.Submit,
request_deserializer=server__pb2.SubmitRequest.FromString,
@@ -172,6 +188,33 @@ def Run(request,
metadata,
_registered_method=True)

@staticmethod
def RunFunction(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_stream(
request,
target,
'/Isolate/RunFunction',
server__pb2.RunRequest.SerializeToString,
common__pb2.PartialRunResult.FromString,
options,
channel_credentials,
insecure,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
_registered_method=True)

@staticmethod
def Submit(request,
target,
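The generated module also gains the static convenience wrapper shown above (gRPC's "experimental" API). A hedged sketch of using it, assuming the wrapper class is named Isolate after the service and that `request` is a RunRequest built as in the earlier sketch; the target address is a placeholder:

```python
# Hedged sketch, not part of this PR. Assumes server_pb2_grpc exposes the
# generated `Isolate` wrapper class and uses a placeholder target address.
from isolate.server.definitions import server_pb2, server_pb2_grpc


def run_via_wrapper(request: server_pb2.RunRequest):
    # Streams PartialRunResult messages without building a channel/stub by hand.
    yield from server_pb2_grpc.Isolate.RunFunction(
        request,
        target="localhost:50051",
        insecure=True,
    )
```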
src/isolate/server/server.py (60 changes: 43 additions & 17 deletions)
@@ -29,7 +29,7 @@
from isolate.backends.virtualenv import VirtualPythonEnvironment
from isolate.connections.grpc import AgentError, LocalPythonGRPC
from isolate.connections.grpc.configuration import get_default_options
from isolate.logger import ENV_LOGGER, IsolateLogger
from isolate.logger import IsolateLogger
from isolate.logs import Log, LogLevel, LogSource
from isolate.server import definitions, health
from isolate.server.health_server import HealthServicer
@@ -72,7 +72,7 @@ def __str__(self) -> str:
@dataclass
class RunnerAgent:
stub: definitions.AgentStub
message_queue: Queue
message_queue: Queue[definitions.PartialRunResult]
_bound_context: ExitStack
_channel_state_history: list[grpc.ChannelConnectivity] = field(default_factory=list)

@@ -175,7 +175,8 @@ class RunTask:
request: definitions.BoundFunction
future: futures.Future | None = None
agent: RunnerAgent | None = None
logger: IsolateLogger = ENV_LOGGER
logger: IsolateLogger = field(default_factory=IsolateLogger.from_env)
stream_logs: bool = True

def cancel(self):
while True:
@@ -277,6 +278,7 @@ def _run_task(self, task: RunTask) -> Iterator[definitions.PartialRunResult]:

future = local_pool.submit(
_proxy_to_queue,
# The agent may have been cached, so use the agent's message queue
queue=agent.message_queue,
bridge=agent.stub,
input=function_call,
@@ -324,16 +326,9 @@ def Submit(
request: definitions.SubmitRequest,
context: ServicerContext,
) -> definitions.SubmitResponse:
logger = ENV_LOGGER
if request.metadata.logger_labels:
logger_labels_dict = dict(request.metadata.logger_labels)
try:
logger = IsolateLogger.with_env_expanded(logger_labels_dict)
except BaseException:
# Ignore the error if the logger couldn't be created.
pass
task = RunTask(request=request.function, stream_logs=False)
self.set_metadata(task, request.metadata)

task = RunTask(request=request.function, logger=logger)
task.future = RUNNER_THREAD_POOL.submit(self._run_task_in_background, task)
task_id = str(uuid.uuid4())

@@ -365,21 +360,47 @@ def SetMetadata(
StatusCode.NOT_FOUND,
)

task = self.background_tasks[request.task_id]

task.logger.extra_labels = dict(request.metadata.logger_labels)
self.set_metadata(self.background_tasks[request.task_id], request.metadata)

return definitions.SetMetadataResponse()

def set_metadata(self, task: RunTask, metadata: definitions.TaskMetadata) -> None:
task.logger.extra_labels = dict(metadata.logger_labels)
# stream_logs defaults to False when the field is unset in the request metadata.
task.stream_logs = metadata.stream_logs

def RunFunction(
self,
request: definitions.RunRequest,
context: ServicerContext,
) -> Iterator[definitions.PartialRunResult]:
try:
task = RunTask(request=request.function)
self.set_metadata(task, request.metadata)

# HACK: we can support only one task at a time
# TODO: move away from this when we use submit for env-aware tasks
self.background_tasks["RUN"] = task
yield from self._run_task(task)
except GRPCException as exc:
return self.abort_with_msg(
exc.message,
context,
code=exc.code,
)
finally:
self.background_tasks.pop("RUN", None)

def Run(
self,
request: definitions.BoundFunction,
context: ServicerContext,
) -> Iterator[definitions.PartialRunResult]:
try:
# HACK: we can support only one task at a time for Run
# TODO: move away from this when we use submit for env-aware tasks
task = RunTask(request=request)

# HACK: we can support only one task at a time
# TODO: move away from this when we use submit for env-aware tasks
self.background_tasks["RUN"] = task
yield from self._run_task(task)
except GRPCException as exc:
@@ -495,6 +516,11 @@ def handle(self, log: Log) -> None:
self._add_log_to_queue(log)

def _add_log_to_queue(self, log: Log) -> None:
if not self.task.stream_logs:
# Do not queue logs for streaming when stream_logs is disabled;
# they are still forwarded to the task's logger.
return

grpc_log = cast(definitions.Log, to_grpc(log))
grpc_result = definitions.PartialRunResult(
is_complete=False,
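To illustrate the revised Submit path, here is a hedged client-side sketch. It assumes SubmitRequest carries the function and metadata fields referenced in server.py, that SubmitResponse exposes task_id, and that IsolateStub is importable from isolate.server.definitions; the address is a placeholder. Leaving stream_logs unset keeps it False, so logs are only forwarded to the task's IsolateLogger rather than queued back to the caller.

```python
# Hedged sketch, not part of this PR. Assumes IsolateStub and SubmitResponse.task_id
# exist as described above; "localhost:50051" is a placeholder address.
import grpc

from isolate.server import definitions


def submit_in_background(bound_function: definitions.BoundFunction) -> str:
    with grpc.insecure_channel("localhost:50051") as channel:
        stub = definitions.IsolateStub(channel)
        response = stub.Submit(
            definitions.SubmitRequest(
                function=bound_function,
                # stream_logs left unset (False): logs go to the IsolateLogger only.
                metadata=definitions.TaskMetadata(logger_labels={"team": "example"}),
            )
        )
        # The returned task_id can later be passed to SetMetadata to update labels.
        return response.task_id
```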