Skip to content
Draft
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1933,6 +1933,7 @@ To build the SDK from source for use as a dependency, the following prerequisite
* [uv](https://docs.astral.sh/uv/)
* [Rust](https://www.rust-lang.org/)
* [Protobuf Compiler](https://protobuf.dev/)
* [Node.js](https://nodejs.org/)

Use `uv` to install `poe`:

Expand Down Expand Up @@ -2074,6 +2075,12 @@ back from this downgrade, restore both of those files and run `uv sync --all-ext
run for protobuf version 3 by setting the `TEMPORAL_TEST_PROTO3` env var to `1` prior to running
tests.

The local build and lint flows also regenerate Temporal system Nexus models. By default this pulls
in `nexus-rpc-gen@0.1.0-alpha.4` via `npx`. To use an existing checkout instead, set
`TEMPORAL_NEXUS_RPC_GEN_DIR` to the `nexus-rpc-gen` repo root or its `src` directory before
running `poe build-develop`, `poe lint`, or `poe gen-protos`. The local checkout override path
also requires [`pnpm`](https://pnpm.io/) to be installed.

### Style

* Mostly [Google Style Guide](https://google.github.io/styleguide/pyguide.html). Notable exceptions:
Expand Down
8 changes: 7 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,14 @@ gen-protos = [
{ cmd = "uv run scripts/gen_protos.py" },
{ cmd = "uv run scripts/gen_payload_visitor.py" },
{ cmd = "uv run scripts/gen_bridge_client.py" },
{ ref = "gen-nexus-system-models" },
{ ref = "format" },
]
gen-protos-docker = [
{ cmd = "uv run scripts/gen_protos_docker.py" },
{ cmd = "uv run scripts/gen_payload_visitor.py" },
{ cmd = "uv run scripts/gen_bridge_client.py" },
{ ref = "gen-nexus-system-models" },
{ ref = "format" },
]
lint = [
Expand All @@ -102,6 +104,7 @@ lint-types = [
{ cmd = "uv run mypy --namespace-packages --check-untyped-defs ." },
{ cmd = "uv run basedpyright" },
]
gen-nexus-system-models = "uv run scripts/gen_nexus_system_models.py"
run-bench = "uv run python scripts/run_bench.py"
test = "uv run pytest"

Expand Down Expand Up @@ -139,14 +142,17 @@ environment = { PATH = "$PATH:$HOME/.cargo/bin", CARGO_NET_GIT_FETCH_WITH_CLI =
ignore_missing_imports = true
exclude = [
# Ignore generated code
'build',
'temporalio/api',
'temporalio/bridge/proto',
'temporalio/nexus/system/_workflow_service_generated.py',
]

[tool.pydocstyle]
convention = "google"
# https://github.com/PyCQA/pydocstyle/issues/363#issuecomment-625563088
match_dir = "^(?!(docs|scripts|tests|api|proto|\\.)).*"
match_dir = "^(?!(build|docs|scripts|tests|api|proto|\\.)).*"
match = "^(?!_workflow_service_generated\\.py$).*\\.py"
add_ignore = [
# We like to wrap at a certain number of chars, even long summary sentences.
# https://github.com/PyCQA/pydocstyle/issues/184
Expand Down
120 changes: 120 additions & 0 deletions scripts/gen_nexus_system_models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
from __future__ import annotations

import os
import subprocess
import sys
from pathlib import Path

NEXUS_RPC_GEN_ENV_VAR = "TEMPORAL_NEXUS_RPC_GEN_DIR"
NEXUS_RPC_GEN_VERSION = "0.1.0-alpha.4"


def main() -> None:
repo_root = Path(__file__).resolve().parent.parent
# TODO: Remove the local .nexusrpc.yaml shim once the upstream API repo
# checks in the Nexus definition we can consume directly.
override_root = normalize_nexus_rpc_gen_root(
Path.cwd(), env_value=NEXUS_RPC_GEN_ENV_VAR
)
input_schema = (
repo_root
/ "temporalio"
/ "nexus"
/ "system"
/ "_workflow_service.nexusrpc.yaml"
)
output_file = (
repo_root / "temporalio" / "nexus" / "system" / "_workflow_service_generated.py"
)

if not input_schema.is_file():
raise RuntimeError(f"Expected Nexus schema at {input_schema}")

run_nexus_rpc_gen(
override_root=override_root,
output_file=output_file,
input_schema=input_schema,
)
subprocess.run(
[
"uv",
"run",
"ruff",
"check",
"--select",
"I",
"--fix",
str(output_file),
],
cwd=repo_root,
check=True,
)
subprocess.run(
[
"uv",
"run",
"ruff",
"format",
str(output_file),
],
cwd=repo_root,
check=True,
)


def run_nexus_rpc_gen(
*, override_root: Path | None, output_file: Path, input_schema: Path
) -> None:
common_args = [
"--lang",
"py",
"--out-file",
str(output_file),
"--temporal-nexus-payload-codec-support",
str(input_schema),
]
if override_root is None:
subprocess.run(
["npx", "--yes", f"nexus-rpc-gen@{NEXUS_RPC_GEN_VERSION}", *common_args],
check=True,
)
return

subprocess.run(
[
"node",
"packages/nexus-rpc-gen/dist/index.js",
*common_args,
],
cwd=override_root,
check=True,
)


def normalize_nexus_rpc_gen_root(base_dir: Path, env_value: str) -> Path | None:
raw_root = env_get(env_value)
if raw_root is None:
return None
candidate = Path(raw_root)
if not candidate.is_absolute():
candidate = base_dir / candidate
candidate = candidate.resolve()
if (candidate / "package.json").is_file() and (candidate / "packages").is_dir():
return candidate
if (candidate / "src" / "package.json").is_file():
return candidate / "src"
raise RuntimeError(
f"{NEXUS_RPC_GEN_ENV_VAR} must point to the nexus-rpc-gen repo root or its src directory"
)


def env_get(name: str) -> str | None:
return os.environ.get(name)


if __name__ == "__main__":
try:
main()
except Exception as err:
print(f"Failed to generate Nexus system models: {err}", file=sys.stderr)
raise
51 changes: 49 additions & 2 deletions temporalio/bridge/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,17 @@
import temporalio.bridge.runtime
import temporalio.bridge.temporal_sdk_bridge
import temporalio.converter
import temporalio.nexus.system
from temporalio.api.common.v1.message_pb2 import Payload
from temporalio.api.enums.v1.command_type_pb2 import CommandType
from temporalio.bridge._visitor import VisitorFunctions
from temporalio.bridge.temporal_sdk_bridge import (
CustomSlotSupplier as BridgeCustomSlotSupplier,
)
from temporalio.bridge.temporal_sdk_bridge import (
PollShutdownError, # type: ignore # noqa: F401
)
from temporalio.worker import _command_aware_visitor
from temporalio.worker._command_aware_visitor import CommandAwarePayloadVisitor


Expand Down Expand Up @@ -279,7 +282,10 @@ async def finalize_shutdown(self) -> None:


class _Visitor(VisitorFunctions):
def __init__(self, f: Callable[[Sequence[Payload]], Awaitable[list[Payload]]]):
def __init__(
self,
f: Callable[[Sequence[Payload]], Awaitable[list[Payload]]],
):
self._f = f

async def visit_payload(self, payload: Payload) -> None:
Expand All @@ -297,6 +303,42 @@ async def visit_payloads(self, payloads: MutableSequence[Payload]) -> None:
payloads.extend(new_payloads)


async def _encode_completion_payloads(
data_converter: temporalio.converter.DataConverter,
payloads: Sequence[Payload],
) -> list[Payload]:
if len(payloads) != 1:
return await data_converter._encode_payload_sequence(payloads)

# A single payload may be the outer envelope for a system Nexus operation.
# In that case we leave the envelope itself unencoded so the server can read
# it, but still route any nested Temporal payloads through normal payload
# processing via the generated operation-specific rewriter.
payload = payloads[0]
command_info = _command_aware_visitor.current_command_info.get()
if (
command_info is None
or command_info.command_type
!= CommandType.COMMAND_TYPE_SCHEDULE_NEXUS_OPERATION
or not command_info.nexus_service
or not command_info.nexus_operation
):
return await data_converter._encode_payload_sequence(payloads)

rewrite = temporalio.nexus.system.get_payload_rewriter(
command_info.nexus_service, command_info.nexus_operation
)
if rewrite is None:
return await data_converter._encode_payload_sequence(payloads)

Comment thread
tconley1428 marked this conversation as resolved.
Outdated
new_payload = await rewrite(
payload,
data_converter._encode_payload_sequence,
False,
)
return [new_payload]


async def decode_activation(
activation: temporalio.bridge.proto.workflow_activation.WorkflowActivation,
data_converter: temporalio.converter.DataConverter,
Expand All @@ -316,4 +358,9 @@ async def encode_completion(
"""Encode all payloads in the completion."""
await CommandAwarePayloadVisitor(
skip_search_attributes=True, skip_headers=not encode_headers
).visit(_Visitor(data_converter._encode_payload_sequence), completion)
).visit(
_Visitor(
lambda payloads: _encode_completion_payloads(data_converter, payloads)
),
completion,
)
53 changes: 53 additions & 0 deletions temporalio/nexus/system/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
"""Generated system Nexus service models.

This package contains code generated from Temporal's system Nexus schemas.
Higher-level ergonomic APIs may wrap these generated types.
"""

from collections.abc import Awaitable, Callable, Sequence

import temporalio.api.common.v1
import temporalio.converter

from . import _workflow_service_generated as generated
from ._workflow_service_generated import __temporal_nexus_payload_rewriters__

TemporalNexusPayloadRewriter = Callable[
[
temporalio.api.common.v1.Payload,
Callable[
[Sequence[temporalio.api.common.v1.Payload]],
Awaitable[list[temporalio.api.common.v1.Payload]],
],
bool,
],
Awaitable[temporalio.api.common.v1.Payload],
]

_SYSTEM_NEXUS_PAYLOAD_CONVERTER = temporalio.converter.default().payload_converter


def get_payload_rewriter(
service: str,
operation: str,
) -> TemporalNexusPayloadRewriter | None:
"""Return the generated nested-payload rewriter for a system Nexus operation."""
return __temporal_nexus_payload_rewriters__.get((service, operation))


def is_system_operation(service: str, operation: str) -> bool:
"""Return whether a Nexus operation uses the generated system envelope."""
return get_payload_rewriter(service, operation) is not None


def get_payload_converter() -> temporalio.converter.PayloadConverter:
"""Return the fixed payload converter for system Nexus outer envelopes."""
return _SYSTEM_NEXUS_PAYLOAD_CONVERTER


__all__ = (
"generated",
"get_payload_converter",
"get_payload_rewriter",
"is_system_operation",
)
11 changes: 11 additions & 0 deletions temporalio/nexus/system/_workflow_service.nexusrpc.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# TODO: Remove this local shim once the upstream API repo checks in the Nexus
# definition and the generator can consume it directly.
nexusrpc: 1.0.0
services:
WorkflowService:
operations:
SignalWithStartWorkflowExecution:
input:
$ref: ../../bridge/sdk-core/crates/common/protos/api_upstream/openapi/openapiv3.yaml#/components/schemas/SignalWithStartWorkflowExecutionRequest
output:
$ref: ../../bridge/sdk-core/crates/common/protos/api_upstream/openapi/openapiv3.yaml#/components/schemas/SignalWithStartWorkflowExecutionResponse
Loading