Skip to content

Expose system rollout health snapshots #258

Expose system rollout health snapshots

Expose system rollout health snapshots #258

name: Polyglot Validation
on:
push:
branches: [main]
pull_request:
branches: [main]
permissions:
contents: read
jobs:
polyglot:
name: Test PHP ↔ Python cross-language execution
runs-on: ubuntu-latest
timeout-minutes: 20
steps:
- name: Checkout server
uses: actions/checkout@v6
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v4
- name: Generate APP_KEY
id: app-key
run: |
KEY=$(openssl rand -base64 32)
echo "key=base64:$KEY" >> $GITHUB_OUTPUT
- name: Start server stack
env:
APP_KEY: ${{ steps.app-key.outputs.key }}
DW_AUTH_TOKEN: test-token-123
APP_VERSION: 2.0.0
run: |
cat > docker-compose.override.yml << 'EOF'
services:
server:
environment:
DW_WORKER_POLL_TIMEOUT: "1"
DW_WORKER_POLL_INTERVAL_MS: "100"
EOF
docker compose up -d --wait
sleep 5
# Verify server is healthy
curl -f http://localhost:8080/api/health || exit 1
echo "✓ Server is healthy"
- name: Copy PHP worker script
run: |
mkdir -p /tmp/php-worker
cat > /tmp/php-worker/worker.php << 'EOF'
<?php
declare(strict_types=1);
use Workflow\Serializers\CodecRegistry;
use Workflow\Serializers\Serializer;
require '/app/vendor/autoload.php';
ob_implicit_flush(true);
const BASE_URL = 'http://localhost:8080/api';
const TOKEN = 'test-token-123';
const NAMESPACE_NAME = 'default';
const PROTOCOL_VERSION = '1.0';
const TASK_QUEUE = 'polyglot-test';
const WORKER_ID = 'php-polyglot-worker';
/**
* @param list<int> $allowedStatuses
*/
function request(string $method, string $path, ?array $body = null, int $timeout = 10, array $allowedStatuses = []): array
{
$headers = [
'Accept: application/json',
'Content-Type: application/json',
'Authorization: Bearer '.TOKEN,
'X-Namespace: '.NAMESPACE_NAME,
'X-Durable-Workflow-Protocol-Version: '.PROTOCOL_VERSION,
];
$options = [
'http' => [
'method' => $method,
'header' => implode("\r\n", $headers),
'ignore_errors' => true,
'timeout' => $timeout,
],
];
if ($body !== null) {
$options['http']['content'] = json_encode($body, JSON_THROW_ON_ERROR);
}
for ($attempt = 1; $attempt <= 3; $attempt++) {
unset($http_response_header);
$response = file_get_contents(BASE_URL.$path, false, stream_context_create($options));
$status = 0;
foreach ($http_response_header ?? [] as $header) {
if (preg_match('/^HTTP\/\S+\s+(\d+)/', $header, $matches) === 1) {
$status = (int) $matches[1];
break;
}
}
$decoded = $response === false || $response === ''
? []
: json_decode($response, true, flags: JSON_THROW_ON_ERROR);
$payload = is_array($decoded) ? $decoded : [];
$payload['_http_status'] = $status;
if ($status >= 500 && $attempt < 3) {
usleep(250000 * $attempt);
continue;
}
if (($status >= 400 || $status === 0) && ! in_array($status, $allowedStatuses, true)) {
throw new RuntimeException(sprintf('%s %s failed with HTTP %d: %s', $method, $path, $status, $response ?: ''));
}
return $payload;
}
throw new RuntimeException(sprintf('%s %s failed after retries', $method, $path));
}
function assert_completion_response(array $response, string $kind): void
{
$status = (int) ($response['_http_status'] ?? 200);
if ($status < 400) {
return;
}
$reason = (string) ($response['reason'] ?? '');
if ($status === 409 && in_array($reason, ['stale_attempt', 'stale_task', 'task_already_completed'], true)) {
return;
}
throw new RuntimeException(sprintf('%s completion failed with HTTP %d: %s', $kind, $status, json_encode($response)));
}
function task_codec(array $task): string
{
$codec = $task['payload_codec'] ?? null;
if (! is_string($codec) || $codec === '') {
$codec = is_array($task['arguments'] ?? null) ? ($task['arguments']['codec'] ?? null) : null;
}
return is_string($codec) && $codec !== '' ? $codec : CodecRegistry::defaultCodec();
}
function envelope(mixed $value, ?string $codec = null): array
{
$codec = $codec !== null && $codec !== '' ? $codec : CodecRegistry::defaultCodec();
return [
'codec' => $codec,
'blob' => Serializer::serializeWithCodec($codec, $value),
];
}
function decode_payload(mixed $value, ?string $codec = null): mixed
{
if ($value === null) {
return $value;
}
if (is_array($value) && array_key_exists('codec', $value) && array_key_exists('blob', $value)) {
return Serializer::unserializeWithCodec((string) $value['codec'], (string) $value['blob']);
}
if (is_string($value)) {
return Serializer::unserializeWithCodec($codec ?? CodecRegistry::defaultCodec(), $value);
}
return $value;
}
function activity_result_from_history(array $history, string $fallbackCodec): mixed
{
for ($i = count($history) - 1; $i >= 0; $i--) {
$event = $history[$i];
if (($event['event_type'] ?? null) !== 'ActivityCompleted') {
continue;
}
$payload = $event['payload'] ?? [];
if (! is_array($payload)) {
return null;
}
$codec = is_string($payload['payload_codec'] ?? null) && $payload['payload_codec'] !== ''
? $payload['payload_codec']
: $fallbackCodec;
return decode_payload($payload['result'] ?? null, $codec);
}
return null;
}
function complete_workflow_task(array $task, array $commands): void
{
$response = request('POST', '/worker/workflow-tasks/'.$task['task_id'].'/complete', [
'lease_owner' => $task['lease_owner'],
'workflow_task_attempt' => $task['workflow_task_attempt'] ?? 1,
'commands' => $commands,
], 10, [409]);
assert_completion_response($response, 'workflow task');
}
function complete_activity_task(array $task, mixed $result, string $codec): void
{
$response = request('POST', '/worker/activity-tasks/'.$task['task_id'].'/complete', [
'activity_attempt_id' => $task['activity_attempt_id'] ?? $task['attempt_id'] ?? '',
'lease_owner' => $task['lease_owner'],
'result' => envelope($result, $codec),
], 10, [409]);
assert_completion_response($response, 'activity task');
}
function handle_workflow_task(array $task): void
{
$workflowType = $task['workflow_type'] ?? '';
$codec = task_codec($task);
$input = decode_payload($task['arguments'] ?? null, $codec);
$input = is_array($input) && array_is_list($input) ? ($input[0] ?? []) : $input;
$history = $task['history_events'] ?? [];
$activityResult = activity_result_from_history($history, $codec);
if ($workflowType === 'PhpWorkflowCallsPythonActivity') {
if ($activityResult === null) {
complete_workflow_task($task, [[
'type' => 'schedule_activity',
'activity_type' => 'python_activity_for_php',
'queue' => TASK_QUEUE,
'arguments' => envelope([$input], $codec),
]]);
return;
}
complete_workflow_task($task, [[
'type' => 'complete_workflow',
'result' => envelope([
'php_workflow_result' => $activityResult,
'input_received' => $input,
], $codec),
]]);
return;
}
if ($workflowType === 'PhpWorkflowCallsPhpActivity') {
if ($activityResult === null) {
complete_workflow_task($task, [[
'type' => 'schedule_activity',
'activity_type' => 'php_activity_for_php',
'queue' => TASK_QUEUE,
'arguments' => envelope([$input], $codec),
]]);
return;
}
complete_workflow_task($task, [[
'type' => 'complete_workflow',
'result' => envelope(['php_to_php_result' => $activityResult], $codec),
]]);
return;
}
complete_workflow_task($task, [[
'type' => 'fail_workflow',
'message' => 'Unknown PHP workflow type: '.$workflowType,
]]);
}
function handle_activity_task(array $task): void
{
$codec = task_codec($task);
$input = decode_payload($task['arguments'] ?? null, $codec);
$input = is_array($input) && array_is_list($input) ? ($input[0] ?? []) : $input;
$activityType = $task['activity_type'] ?? '';
if ($activityType === 'php_activity_for_python') {
complete_activity_task($task, [
'php_activity_executed' => true,
'input' => $input,
'type_checks' => [
'string' => is_string($input['string'] ?? null),
'int' => is_int($input['int'] ?? null),
'float' => is_float($input['float'] ?? null),
'bool' => is_bool($input['bool'] ?? null),
'null' => ($input['null'] ?? 'not_null') === null,
'array' => is_array($input['array'] ?? null),
'map' => is_array($input['map'] ?? null),
],
'php_version' => PHP_VERSION,
], $codec);
return;
}
if ($activityType === 'php_activity_for_php') {
complete_activity_task($task, [
'php_activity' => 'success',
'data' => $input,
], $codec);
return;
}
throw new RuntimeException('Unknown PHP activity type: '.$activityType);
}
request('POST', '/worker/register', [
'worker_id' => WORKER_ID,
'task_queue' => TASK_QUEUE,
'runtime' => 'php',
'sdk_version' => 'durable-workflow-php/protocol-ci',
'supported_workflow_types' => [
'PhpWorkflowCallsPythonActivity',
'PhpWorkflowCallsPhpActivity',
],
'supported_activity_types' => [
'php_activity_for_python',
'php_activity_for_php',
],
]);
echo "PHP worker starting\n";
while (true) {
$workflowPoll = request('POST', '/worker/workflow-tasks/poll', [
'worker_id' => WORKER_ID,
'task_queue' => TASK_QUEUE,
], 6);
if (is_array($workflowPoll['task'] ?? null)) {
handle_workflow_task($workflowPoll['task']);
}
$activityPoll = request('POST', '/worker/activity-tasks/poll', [
'worker_id' => WORKER_ID,
'task_queue' => TASK_QUEUE,
], 6);
if (is_array($activityPoll['task'] ?? null)) {
handle_activity_task($activityPoll['task']);
}
usleep(100000);
}
EOF
- name: Start PHP worker in background
run: |
SERVER_IMAGE="$(docker compose images -q server)"
if [ -z "$SERVER_IMAGE" ]; then
echo "Unable to resolve built server image"
exit 1
fi
docker rm -f polyglot-php-worker >/dev/null 2>&1 || true
docker run -d --name polyglot-php-worker --network host \
-v /tmp/php-worker/worker.php:/worker.php:ro \
"$SERVER_IMAGE" php /worker.php > /tmp/php-worker.cid
echo "✓ PHP worker started (container: $(cat /tmp/php-worker.cid))"
sleep 2
docker logs polyglot-php-worker
- name: Set up Python 3.10
uses: actions/setup-python@v6
with:
python-version: '3.10'
- name: Install Python SDK
run: |
pip install durable-workflow httpx
- name: Copy Python worker script
run: |
cat > /tmp/python-worker.py << 'EOF'
#!/usr/bin/env python3
import asyncio
import sys
from durable_workflow import Client, Worker, workflow, activity
@workflow.defn(name="PythonWorkflowCallsPhpActivity")
class PythonWorkflowCallsPhpActivity:
def run(self, ctx, *args):
input_data = args[0] if args else {}
result = yield ctx.schedule_activity("php_activity_for_python", [input_data])
return {"python_workflow_result": result, "input_received": input_data}
@workflow.defn(name="PythonWorkflowCallsPythonActivity")
class PythonWorkflowCallsPythonActivity:
def run(self, ctx, *args):
input_data = args[0] if args else {}
result = yield ctx.schedule_activity("python_activity_for_python", [input_data])
return {"python_to_python_result": result}
@activity.defn(name="python_activity_for_php")
def python_activity_for_php(input_data: dict) -> dict:
return {
"python_activity_executed": True,
"input": input_data,
"type_checks": {
"string": isinstance(input_data.get("string"), str),
"int": isinstance(input_data.get("int"), int),
"float": isinstance(input_data.get("float"), float),
"bool": isinstance(input_data.get("bool"), bool),
"null": input_data.get("null") is None,
"array": isinstance(input_data.get("array"), list),
"map": isinstance(input_data.get("map"), dict),
},
"python_version": sys.version,
}
@activity.defn(name="python_activity_for_python")
def python_activity_for_python(input_data: dict) -> dict:
return {"python_activity": "success", "data": input_data}
async def main():
client = Client("http://localhost:8080", token="test-token-123", namespace="default")
worker = Worker(
client,
task_queue="polyglot-test",
workflows=[
PythonWorkflowCallsPhpActivity,
PythonWorkflowCallsPythonActivity,
],
activities=[
python_activity_for_php,
python_activity_for_python,
],
worker_id="python-polyglot-worker",
)
print("Python worker starting")
await worker.run()
if __name__ == "__main__":
asyncio.run(main())
EOF
chmod +x /tmp/python-worker.py
- name: Start Python worker in background
run: |
python -u /tmp/python-worker.py > /tmp/python-worker.log 2>&1 &
echo $! > /tmp/python-worker.pid
echo "✓ Python worker started (PID: $(cat /tmp/python-worker.pid))"
sleep 2
- name: Create test orchestrator
run: |
cat > /tmp/test-orchestrator.py << 'EOFTEST'
#!/usr/bin/env python3
import asyncio
import sys
from durable_workflow import Client
from durable_workflow import serializer
async def start_workflow_and_wait(client, workflow_type, workflow_id, input_data, timeout=30):
print(f"Starting {workflow_type}...")
data = await client._request(
"POST",
"/workflows",
json={
"workflow_type": workflow_type,
"workflow_id": workflow_id,
"task_queue": "polyglot-test",
"input": [input_data],
},
)
deadline = asyncio.get_running_loop().time() + timeout
while True:
desc = await client._request("GET", f"/workflows/{data['workflow_id']}")
status = desc.get("status")
if desc.get("is_terminal") or status in {"completed", "failed", "terminated", "canceled", "cancelled"}:
if status != "completed":
raise RuntimeError(f"workflow {workflow_id} ended with status={status}")
envelope = desc.get("output_envelope")
print(f" ✓ Completed")
if envelope is not None:
return serializer.decode_envelope(envelope)
return desc.get("output")
if asyncio.get_running_loop().time() > deadline:
raise TimeoutError(
f"workflow {workflow_id} not terminal after {timeout}s (status={status})"
)
await asyncio.sleep(1)
async def main():
client = Client("http://localhost:8080", token="test-token-123", namespace="default")
# Wait for server
for i in range(30):
try:
await client.get_cluster_info()
break
except Exception:
await asyncio.sleep(1)
# Wait for workers to register
await asyncio.sleep(5)
tests_passed = 0
tests_total = 0
# Test 1: PHP → Python
print("\n=== Test 1: PHP → Python ===")
tests_total += 1
try:
result = await start_workflow_and_wait(
client, "PhpWorkflowCallsPythonActivity", "php-to-python",
{"string": "hello", "int": 42, "float": 3.14, "bool": True, "null": None, "array": [1,2,3], "map": {"k":"v"}}
)
if result.get("php_workflow_result", {}).get("python_activity_executed"):
print(" ✓ PASS")
tests_passed += 1
else:
print(" ✗ FAIL")
except Exception as e:
print(f" ✗ FAIL: {e}")
# Test 2: Python → PHP
print("\n=== Test 2: Python → PHP ===")
tests_total += 1
try:
result = await start_workflow_and_wait(
client, "PythonWorkflowCallsPhpActivity", "python-to-php",
{"string": "world", "int": 99, "float": 2.71, "bool": False, "null": None, "array": ["a","b"], "map": {"foo":"bar"}}
)
if result.get("python_workflow_result", {}).get("php_activity_executed"):
print(" ✓ PASS")
tests_passed += 1
else:
print(" ✗ FAIL")
except Exception as e:
print(f" ✗ FAIL: {e}")
# Test 3: PHP → PHP
print("\n=== Test 3: PHP → PHP ===")
tests_total += 1
try:
result = await start_workflow_and_wait(
client, "PhpWorkflowCallsPhpActivity", "php-to-php",
{"test": "php-to-php"}
)
if result.get("php_to_php_result", {}).get("php_activity") == "success":
print(" ✓ PASS")
tests_passed += 1
else:
print(" ✗ FAIL")
except Exception as e:
print(f" ✗ FAIL: {e}")
# Test 4: Python → Python
print("\n=== Test 4: Python → Python ===")
tests_total += 1
try:
result = await start_workflow_and_wait(
client, "PythonWorkflowCallsPythonActivity", "python-to-python",
{"test": "python-to-python"}
)
if result.get("python_to_python_result", {}).get("python_activity") == "success":
print(" ✓ PASS")
tests_passed += 1
else:
print(" ✗ FAIL")
except Exception as e:
print(f" ✗ FAIL: {e}")
print(f"\n=== Summary: {tests_passed}/{tests_total} tests passed ===")
await client.aclose()
sys.exit(0 if tests_passed == tests_total else 1)
if __name__ == "__main__":
asyncio.run(main())
EOFTEST
chmod +x /tmp/test-orchestrator.py
- name: Run polyglot tests
run: |
python /tmp/test-orchestrator.py
- name: Show PHP worker log on failure
if: failure()
run: |
echo "=== PHP Worker Log ==="
docker logs polyglot-php-worker || echo "No PHP worker log"
- name: Show Python worker log on failure
if: failure()
run: |
echo "=== Python Worker Log ==="
cat /tmp/python-worker.log || echo "No Python worker log"
- name: Show server logs on failure
if: failure()
run: |
echo "=== Server Logs ==="
docker compose logs server
- name: Stop workers
if: always()
run: |
docker rm -f polyglot-php-worker 2>/dev/null || true
if [ -f /tmp/python-worker.pid ]; then
kill $(cat /tmp/python-worker.pid) 2>/dev/null || true
fi
- name: Stop server
if: always()
run: docker compose down -v