Route typed workflow polls through the package bridge #269
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| name: Polyglot Validation | |
| on: | |
| push: | |
| branches: [main] | |
| pull_request: | |
| branches: [main] | |
| permissions: | |
| contents: read | |
| jobs: | |
| polyglot: | |
| name: Test PHP ↔ Python cross-language execution | |
| runs-on: ubuntu-latest | |
| timeout-minutes: 20 | |
| steps: | |
| - name: Checkout server | |
| uses: actions/checkout@v6 | |
| - name: Set up Docker Buildx | |
| uses: docker/setup-buildx-action@v4 | |
| - name: Generate APP_KEY | |
| id: app-key | |
| run: | | |
| KEY=$(openssl rand -base64 32) | |
| echo "key=base64:$KEY" >> $GITHUB_OUTPUT | |
| - name: Start server stack | |
| env: | |
| APP_KEY: ${{ steps.app-key.outputs.key }} | |
| DW_AUTH_TOKEN: test-token-123 | |
| APP_VERSION: 2.0.0 | |
| run: | | |
| cat > docker-compose.override.yml << 'EOF' | |
| services: | |
| server: | |
| environment: | |
| DW_WORKER_POLL_TIMEOUT: "1" | |
| DW_WORKER_POLL_INTERVAL_MS: "100" | |
| EOF | |
| docker compose up -d --wait | |
| sleep 5 | |
| # Verify server is healthy | |
| curl -f http://localhost:8080/api/health || exit 1 | |
| echo "✓ Server is healthy" | |
| - name: Copy PHP worker script | |
| run: | | |
| mkdir -p /tmp/php-worker | |
| cat > /tmp/php-worker/worker.php << 'EOF' | |
| <?php | |
| declare(strict_types=1); | |
| use Workflow\Serializers\CodecRegistry; | |
| use Workflow\Serializers\Serializer; | |
| require '/app/vendor/autoload.php'; | |
| ob_implicit_flush(true); | |
| const BASE_URL = 'http://localhost:8080/api'; | |
| const TOKEN = 'test-token-123'; | |
| const NAMESPACE_NAME = 'default'; | |
| const PROTOCOL_VERSION = '1.0'; | |
| const TASK_QUEUE = 'polyglot-test'; | |
| const WORKER_ID = 'php-polyglot-worker'; | |
| /** | |
| * @param list<int> $allowedStatuses | |
| */ | |
| function request(string $method, string $path, ?array $body = null, int $timeout = 10, array $allowedStatuses = []): array | |
| { | |
| $headers = [ | |
| 'Accept: application/json', | |
| 'Content-Type: application/json', | |
| 'Authorization: Bearer '.TOKEN, | |
| 'X-Namespace: '.NAMESPACE_NAME, | |
| 'X-Durable-Workflow-Protocol-Version: '.PROTOCOL_VERSION, | |
| ]; | |
| $options = [ | |
| 'http' => [ | |
| 'method' => $method, | |
| 'header' => implode("\r\n", $headers), | |
| 'ignore_errors' => true, | |
| 'timeout' => $timeout, | |
| ], | |
| ]; | |
| if ($body !== null) { | |
| $options['http']['content'] = json_encode($body, JSON_THROW_ON_ERROR); | |
| } | |
| for ($attempt = 1; $attempt <= 3; $attempt++) { | |
| unset($http_response_header); | |
| $response = file_get_contents(BASE_URL.$path, false, stream_context_create($options)); | |
| $status = 0; | |
| foreach ($http_response_header ?? [] as $header) { | |
| if (preg_match('/^HTTP\/\S+\s+(\d+)/', $header, $matches) === 1) { | |
| $status = (int) $matches[1]; | |
| break; | |
| } | |
| } | |
| $decoded = $response === false || $response === '' | |
| ? [] | |
| : json_decode($response, true, flags: JSON_THROW_ON_ERROR); | |
| $payload = is_array($decoded) ? $decoded : []; | |
| $payload['_http_status'] = $status; | |
| if ($status >= 500 && $attempt < 3) { | |
| usleep(250000 * $attempt); | |
| continue; | |
| } | |
| if (($status >= 400 || $status === 0) && ! in_array($status, $allowedStatuses, true)) { | |
| throw new RuntimeException(sprintf('%s %s failed with HTTP %d: %s', $method, $path, $status, $response ?: '')); | |
| } | |
| return $payload; | |
| } | |
| throw new RuntimeException(sprintf('%s %s failed after retries', $method, $path)); | |
| } | |
| function assert_completion_response(array $response, string $kind): void | |
| { | |
| $status = (int) ($response['_http_status'] ?? 200); | |
| if ($status < 400) { | |
| return; | |
| } | |
| $reason = (string) ($response['reason'] ?? ''); | |
| if ($status === 409 && in_array($reason, ['stale_attempt', 'stale_task', 'task_already_completed'], true)) { | |
| return; | |
| } | |
| throw new RuntimeException(sprintf('%s completion failed with HTTP %d: %s', $kind, $status, json_encode($response))); | |
| } | |
| function task_codec(array $task): string | |
| { | |
| $codec = $task['payload_codec'] ?? null; | |
| if (! is_string($codec) || $codec === '') { | |
| $codec = is_array($task['arguments'] ?? null) ? ($task['arguments']['codec'] ?? null) : null; | |
| } | |
| return is_string($codec) && $codec !== '' ? $codec : CodecRegistry::defaultCodec(); | |
| } | |
| function envelope(mixed $value, ?string $codec = null): array | |
| { | |
| $codec = $codec !== null && $codec !== '' ? $codec : CodecRegistry::defaultCodec(); | |
| return [ | |
| 'codec' => $codec, | |
| 'blob' => Serializer::serializeWithCodec($codec, $value), | |
| ]; | |
| } | |
| function decode_payload(mixed $value, ?string $codec = null): mixed | |
| { | |
| if ($value === null) { | |
| return $value; | |
| } | |
| if (is_array($value) && array_key_exists('codec', $value) && array_key_exists('blob', $value)) { | |
| return Serializer::unserializeWithCodec((string) $value['codec'], (string) $value['blob']); | |
| } | |
| if (is_string($value)) { | |
| return Serializer::unserializeWithCodec($codec ?? CodecRegistry::defaultCodec(), $value); | |
| } | |
| return $value; | |
| } | |
| function activity_result_from_history(array $history, string $fallbackCodec): mixed | |
| { | |
| for ($i = count($history) - 1; $i >= 0; $i--) { | |
| $event = $history[$i]; | |
| if (($event['event_type'] ?? null) !== 'ActivityCompleted') { | |
| continue; | |
| } | |
| $payload = $event['payload'] ?? []; | |
| if (! is_array($payload)) { | |
| return null; | |
| } | |
| $codec = is_string($payload['payload_codec'] ?? null) && $payload['payload_codec'] !== '' | |
| ? $payload['payload_codec'] | |
| : $fallbackCodec; | |
| return decode_payload($payload['result'] ?? null, $codec); | |
| } | |
| return null; | |
| } | |
| function complete_workflow_task(array $task, array $commands): void | |
| { | |
| $response = request('POST', '/worker/workflow-tasks/'.$task['task_id'].'/complete', [ | |
| 'lease_owner' => $task['lease_owner'], | |
| 'workflow_task_attempt' => $task['workflow_task_attempt'] ?? 1, | |
| 'commands' => $commands, | |
| ], 10, [409]); | |
| assert_completion_response($response, 'workflow task'); | |
| } | |
| function complete_activity_task(array $task, mixed $result, string $codec): void | |
| { | |
| $response = request('POST', '/worker/activity-tasks/'.$task['task_id'].'/complete', [ | |
| 'activity_attempt_id' => $task['activity_attempt_id'] ?? $task['attempt_id'] ?? '', | |
| 'lease_owner' => $task['lease_owner'], | |
| 'result' => envelope($result, $codec), | |
| ], 10, [409]); | |
| assert_completion_response($response, 'activity task'); | |
| } | |
| function handle_workflow_task(array $task): void | |
| { | |
| $workflowType = $task['workflow_type'] ?? ''; | |
| $codec = task_codec($task); | |
| $input = decode_payload($task['arguments'] ?? null, $codec); | |
| $input = is_array($input) && array_is_list($input) ? ($input[0] ?? []) : $input; | |
| $history = $task['history_events'] ?? []; | |
| $activityResult = activity_result_from_history($history, $codec); | |
| if ($workflowType === 'PhpWorkflowCallsPythonActivity') { | |
| if ($activityResult === null) { | |
| complete_workflow_task($task, [[ | |
| 'type' => 'schedule_activity', | |
| 'activity_type' => 'python_activity_for_php', | |
| 'queue' => TASK_QUEUE, | |
| 'arguments' => envelope([$input], $codec), | |
| ]]); | |
| return; | |
| } | |
| complete_workflow_task($task, [[ | |
| 'type' => 'complete_workflow', | |
| 'result' => envelope([ | |
| 'php_workflow_result' => $activityResult, | |
| 'input_received' => $input, | |
| ], $codec), | |
| ]]); | |
| return; | |
| } | |
| if ($workflowType === 'PhpWorkflowCallsPhpActivity') { | |
| if ($activityResult === null) { | |
| complete_workflow_task($task, [[ | |
| 'type' => 'schedule_activity', | |
| 'activity_type' => 'php_activity_for_php', | |
| 'queue' => TASK_QUEUE, | |
| 'arguments' => envelope([$input], $codec), | |
| ]]); | |
| return; | |
| } | |
| complete_workflow_task($task, [[ | |
| 'type' => 'complete_workflow', | |
| 'result' => envelope(['php_to_php_result' => $activityResult], $codec), | |
| ]]); | |
| return; | |
| } | |
| complete_workflow_task($task, [[ | |
| 'type' => 'fail_workflow', | |
| 'message' => 'Unknown PHP workflow type: '.$workflowType, | |
| ]]); | |
| } | |
| function handle_activity_task(array $task): void | |
| { | |
| $codec = task_codec($task); | |
| $input = decode_payload($task['arguments'] ?? null, $codec); | |
| $input = is_array($input) && array_is_list($input) ? ($input[0] ?? []) : $input; | |
| $activityType = $task['activity_type'] ?? ''; | |
| if ($activityType === 'php_activity_for_python') { | |
| complete_activity_task($task, [ | |
| 'php_activity_executed' => true, | |
| 'input' => $input, | |
| 'type_checks' => [ | |
| 'string' => is_string($input['string'] ?? null), | |
| 'int' => is_int($input['int'] ?? null), | |
| 'float' => is_float($input['float'] ?? null), | |
| 'bool' => is_bool($input['bool'] ?? null), | |
| 'null' => ($input['null'] ?? 'not_null') === null, | |
| 'array' => is_array($input['array'] ?? null), | |
| 'map' => is_array($input['map'] ?? null), | |
| ], | |
| 'php_version' => PHP_VERSION, | |
| ], $codec); | |
| return; | |
| } | |
| if ($activityType === 'php_activity_for_php') { | |
| complete_activity_task($task, [ | |
| 'php_activity' => 'success', | |
| 'data' => $input, | |
| ], $codec); | |
| return; | |
| } | |
| throw new RuntimeException('Unknown PHP activity type: '.$activityType); | |
| } | |
| request('POST', '/worker/register', [ | |
| 'worker_id' => WORKER_ID, | |
| 'task_queue' => TASK_QUEUE, | |
| 'runtime' => 'php', | |
| 'sdk_version' => 'durable-workflow-php/protocol-ci', | |
| 'supported_workflow_types' => [ | |
| 'PhpWorkflowCallsPythonActivity', | |
| 'PhpWorkflowCallsPhpActivity', | |
| ], | |
| 'supported_activity_types' => [ | |
| 'php_activity_for_python', | |
| 'php_activity_for_php', | |
| ], | |
| ]); | |
| echo "PHP worker starting\n"; | |
| while (true) { | |
| $workflowPoll = request('POST', '/worker/workflow-tasks/poll', [ | |
| 'worker_id' => WORKER_ID, | |
| 'task_queue' => TASK_QUEUE, | |
| ], 6); | |
| if (is_array($workflowPoll['task'] ?? null)) { | |
| handle_workflow_task($workflowPoll['task']); | |
| } | |
| $activityPoll = request('POST', '/worker/activity-tasks/poll', [ | |
| 'worker_id' => WORKER_ID, | |
| 'task_queue' => TASK_QUEUE, | |
| ], 6); | |
| if (is_array($activityPoll['task'] ?? null)) { | |
| handle_activity_task($activityPoll['task']); | |
| } | |
| usleep(100000); | |
| } | |
| EOF | |
| - name: Start PHP worker in background | |
| run: | | |
| SERVER_IMAGE="$(docker compose images -q server)" | |
| if [ -z "$SERVER_IMAGE" ]; then | |
| echo "Unable to resolve built server image" | |
| exit 1 | |
| fi | |
| docker rm -f polyglot-php-worker >/dev/null 2>&1 || true | |
| docker run -d --name polyglot-php-worker --network host \ | |
| -v /tmp/php-worker/worker.php:/worker.php:ro \ | |
| "$SERVER_IMAGE" php /worker.php > /tmp/php-worker.cid | |
| echo "✓ PHP worker started (container: $(cat /tmp/php-worker.cid))" | |
| sleep 2 | |
| docker logs polyglot-php-worker | |
| - name: Set up Python 3.10 | |
| uses: actions/setup-python@v6 | |
| with: | |
| python-version: '3.10' | |
| - name: Install Python SDK | |
| run: | | |
| pip install durable-workflow httpx | |
| - name: Copy Python worker script | |
| run: | | |
| cat > /tmp/python-worker.py << 'EOF' | |
| #!/usr/bin/env python3 | |
| import asyncio | |
| import sys | |
| from durable_workflow import Client, Worker, workflow, activity | |
| @workflow.defn(name="PythonWorkflowCallsPhpActivity") | |
| class PythonWorkflowCallsPhpActivity: | |
| def run(self, ctx, *args): | |
| input_data = args[0] if args else {} | |
| result = yield ctx.schedule_activity("php_activity_for_python", [input_data]) | |
| return {"python_workflow_result": result, "input_received": input_data} | |
| @workflow.defn(name="PythonWorkflowCallsPythonActivity") | |
| class PythonWorkflowCallsPythonActivity: | |
| def run(self, ctx, *args): | |
| input_data = args[0] if args else {} | |
| result = yield ctx.schedule_activity("python_activity_for_python", [input_data]) | |
| return {"python_to_python_result": result} | |
| @activity.defn(name="python_activity_for_php") | |
| def python_activity_for_php(input_data: dict) -> dict: | |
| return { | |
| "python_activity_executed": True, | |
| "input": input_data, | |
| "type_checks": { | |
| "string": isinstance(input_data.get("string"), str), | |
| "int": isinstance(input_data.get("int"), int), | |
| "float": isinstance(input_data.get("float"), float), | |
| "bool": isinstance(input_data.get("bool"), bool), | |
| "null": input_data.get("null") is None, | |
| "array": isinstance(input_data.get("array"), list), | |
| "map": isinstance(input_data.get("map"), dict), | |
| }, | |
| "python_version": sys.version, | |
| } | |
| @activity.defn(name="python_activity_for_python") | |
| def python_activity_for_python(input_data: dict) -> dict: | |
| return {"python_activity": "success", "data": input_data} | |
| async def main(): | |
| client = Client("http://localhost:8080", token="test-token-123", namespace="default") | |
| worker = Worker( | |
| client, | |
| task_queue="polyglot-test", | |
| workflows=[ | |
| PythonWorkflowCallsPhpActivity, | |
| PythonWorkflowCallsPythonActivity, | |
| ], | |
| activities=[ | |
| python_activity_for_php, | |
| python_activity_for_python, | |
| ], | |
| worker_id="python-polyglot-worker", | |
| ) | |
| print("Python worker starting") | |
| await worker.run() | |
| if __name__ == "__main__": | |
| asyncio.run(main()) | |
| EOF | |
| chmod +x /tmp/python-worker.py | |
| - name: Start Python worker in background | |
| run: | | |
| python -u /tmp/python-worker.py > /tmp/python-worker.log 2>&1 & | |
| echo $! > /tmp/python-worker.pid | |
| echo "✓ Python worker started (PID: $(cat /tmp/python-worker.pid))" | |
| sleep 2 | |
| - name: Create test orchestrator | |
| run: | | |
| cat > /tmp/test-orchestrator.py << 'EOFTEST' | |
| #!/usr/bin/env python3 | |
| import asyncio | |
| import sys | |
| from durable_workflow import Client | |
| from durable_workflow import serializer | |
| async def start_workflow_and_wait(client, workflow_type, workflow_id, input_data, timeout=30): | |
| print(f"Starting {workflow_type}...") | |
| data = await client._request( | |
| "POST", | |
| "/workflows", | |
| json={ | |
| "workflow_type": workflow_type, | |
| "workflow_id": workflow_id, | |
| "task_queue": "polyglot-test", | |
| "input": [input_data], | |
| }, | |
| ) | |
| deadline = asyncio.get_running_loop().time() + timeout | |
| while True: | |
| desc = await client._request("GET", f"/workflows/{data['workflow_id']}") | |
| status = desc.get("status") | |
| if desc.get("is_terminal") or status in {"completed", "failed", "terminated", "canceled", "cancelled"}: | |
| if status != "completed": | |
| raise RuntimeError(f"workflow {workflow_id} ended with status={status}") | |
| envelope = desc.get("output_envelope") | |
| print(f" ✓ Completed") | |
| if envelope is not None: | |
| return serializer.decode_envelope(envelope) | |
| return desc.get("output") | |
| if asyncio.get_running_loop().time() > deadline: | |
| raise TimeoutError( | |
| f"workflow {workflow_id} not terminal after {timeout}s (status={status})" | |
| ) | |
| await asyncio.sleep(1) | |
| async def main(): | |
| client = Client("http://localhost:8080", token="test-token-123", namespace="default") | |
| # Wait for server | |
| for i in range(30): | |
| try: | |
| await client.get_cluster_info() | |
| break | |
| except Exception: | |
| await asyncio.sleep(1) | |
| # Wait for workers to register | |
| await asyncio.sleep(5) | |
| tests_passed = 0 | |
| tests_total = 0 | |
| # Test 1: PHP → Python | |
| print("\n=== Test 1: PHP → Python ===") | |
| tests_total += 1 | |
| try: | |
| result = await start_workflow_and_wait( | |
| client, "PhpWorkflowCallsPythonActivity", "php-to-python", | |
| {"string": "hello", "int": 42, "float": 3.14, "bool": True, "null": None, "array": [1,2,3], "map": {"k":"v"}} | |
| ) | |
| if result.get("php_workflow_result", {}).get("python_activity_executed"): | |
| print(" ✓ PASS") | |
| tests_passed += 1 | |
| else: | |
| print(" ✗ FAIL") | |
| except Exception as e: | |
| print(f" ✗ FAIL: {e}") | |
| # Test 2: Python → PHP | |
| print("\n=== Test 2: Python → PHP ===") | |
| tests_total += 1 | |
| try: | |
| result = await start_workflow_and_wait( | |
| client, "PythonWorkflowCallsPhpActivity", "python-to-php", | |
| {"string": "world", "int": 99, "float": 2.71, "bool": False, "null": None, "array": ["a","b"], "map": {"foo":"bar"}} | |
| ) | |
| if result.get("python_workflow_result", {}).get("php_activity_executed"): | |
| print(" ✓ PASS") | |
| tests_passed += 1 | |
| else: | |
| print(" ✗ FAIL") | |
| except Exception as e: | |
| print(f" ✗ FAIL: {e}") | |
| # Test 3: PHP → PHP | |
| print("\n=== Test 3: PHP → PHP ===") | |
| tests_total += 1 | |
| try: | |
| result = await start_workflow_and_wait( | |
| client, "PhpWorkflowCallsPhpActivity", "php-to-php", | |
| {"test": "php-to-php"} | |
| ) | |
| if result.get("php_to_php_result", {}).get("php_activity") == "success": | |
| print(" ✓ PASS") | |
| tests_passed += 1 | |
| else: | |
| print(" ✗ FAIL") | |
| except Exception as e: | |
| print(f" ✗ FAIL: {e}") | |
| # Test 4: Python → Python | |
| print("\n=== Test 4: Python → Python ===") | |
| tests_total += 1 | |
| try: | |
| result = await start_workflow_and_wait( | |
| client, "PythonWorkflowCallsPythonActivity", "python-to-python", | |
| {"test": "python-to-python"} | |
| ) | |
| if result.get("python_to_python_result", {}).get("python_activity") == "success": | |
| print(" ✓ PASS") | |
| tests_passed += 1 | |
| else: | |
| print(" ✗ FAIL") | |
| except Exception as e: | |
| print(f" ✗ FAIL: {e}") | |
| print(f"\n=== Summary: {tests_passed}/{tests_total} tests passed ===") | |
| await client.aclose() | |
| sys.exit(0 if tests_passed == tests_total else 1) | |
| if __name__ == "__main__": | |
| asyncio.run(main()) | |
| EOFTEST | |
| chmod +x /tmp/test-orchestrator.py | |
| - name: Run polyglot tests | |
| run: | | |
| python /tmp/test-orchestrator.py | |
| - name: Show PHP worker log on failure | |
| if: failure() | |
| run: | | |
| echo "=== PHP Worker Log ===" | |
| docker logs polyglot-php-worker || echo "No PHP worker log" | |
| - name: Show Python worker log on failure | |
| if: failure() | |
| run: | | |
| echo "=== Python Worker Log ===" | |
| cat /tmp/python-worker.log || echo "No Python worker log" | |
| - name: Show server logs on failure | |
| if: failure() | |
| run: | | |
| echo "=== Server Logs ===" | |
| docker compose logs server | |
| - name: Stop workers | |
| if: always() | |
| run: | | |
| docker rm -f polyglot-php-worker 2>/dev/null || true | |
| if [ -f /tmp/python-worker.pid ]; then | |
| kill $(cat /tmp/python-worker.pid) 2>/dev/null || true | |
| fi | |
| - name: Stop server | |
| if: always() | |
| run: docker compose down -v |