Skip to content

Commit 9722a95

Browse files
Map invocable carrier results to activity reports
1 parent 1af21a2 commit 9722a95

2 files changed

Lines changed: 375 additions & 0 deletions

File tree

Lines changed: 205 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,205 @@
1+
<?php
2+
3+
namespace App\Support;
4+
5+
final class InvocableCarrierResultMapper
6+
{
7+
public const RESULT_SCHEMA = 'durable-workflow.v2.external-task-result';
8+
9+
/**
10+
* @return array{action: string, payload: array<string, mixed>, reason: string}
11+
*/
12+
public function map(array $envelope, string $taskId, string $attemptId, string $leaseOwner): array
13+
{
14+
if (! $this->validEnvelopeForTask($envelope, $taskId, $attemptId)) {
15+
return $this->malformed(
16+
$taskId,
17+
$attemptId,
18+
$leaseOwner,
19+
'Handler returned an invalid external task result envelope.',
20+
'invalid_envelope',
21+
);
22+
}
23+
24+
$status = $this->stringValue($envelope['outcome']['status'] ?? null);
25+
26+
if ($status === 'succeeded') {
27+
return [
28+
'action' => 'complete',
29+
'payload' => [
30+
'activity_attempt_id' => $attemptId,
31+
'lease_owner' => $leaseOwner,
32+
'result' => $this->payloadEnvelope($envelope['result']['payload'] ?? null),
33+
],
34+
'reason' => 'handler_succeeded',
35+
];
36+
}
37+
38+
if ($status === 'failed') {
39+
return [
40+
'action' => 'fail',
41+
'payload' => [
42+
'activity_attempt_id' => $attemptId,
43+
'lease_owner' => $leaseOwner,
44+
'failure' => $this->failurePayload($envelope),
45+
],
46+
'reason' => 'handler_failed',
47+
];
48+
}
49+
50+
return $this->malformed(
51+
$taskId,
52+
$attemptId,
53+
$leaseOwner,
54+
'Handler result envelope has an unsupported outcome status.',
55+
'unsupported_status',
56+
);
57+
}
58+
59+
/**
60+
* @param array<string, mixed> $rawOutput
61+
* @return array{action: string, payload: array<string, mixed>, reason: string}
62+
*/
63+
public function malformedTransportResult(
64+
string $taskId,
65+
string $attemptId,
66+
string $leaseOwner,
67+
string $message,
68+
array $rawOutput = [],
69+
bool $retryable = false,
70+
): array {
71+
return [
72+
'action' => 'fail',
73+
'payload' => [
74+
'activity_attempt_id' => $attemptId,
75+
'lease_owner' => $leaseOwner,
76+
'failure' => array_filter([
77+
'message' => $message,
78+
'type' => 'MalformedExternalTaskOutput',
79+
'kind' => 'malformed_output',
80+
'retryable' => $retryable,
81+
'non_retryable' => ! $retryable,
82+
'cancelled' => false,
83+
'malformed_output' => true,
84+
'details' => $rawOutput === [] ? null : [
85+
'codec' => 'json/plain',
86+
'blob' => (string) json_encode($rawOutput, JSON_UNESCAPED_SLASHES),
87+
],
88+
], static fn (mixed $value): bool => $value !== null),
89+
],
90+
'reason' => 'malformed_transport_output',
91+
];
92+
}
93+
94+
private function validEnvelopeForTask(array $envelope, string $taskId, string $attemptId): bool
95+
{
96+
return ($envelope['schema'] ?? null) === self::RESULT_SCHEMA
97+
&& ($envelope['version'] ?? null) === ExternalTaskResultContract::VERSION
98+
&& ($envelope['task']['kind'] ?? null) === 'activity_task'
99+
&& ($envelope['task']['id'] ?? null) === $taskId
100+
&& ($envelope['task']['idempotency_key'] ?? null) === $attemptId
101+
&& is_array($envelope['outcome'] ?? null);
102+
}
103+
104+
/**
105+
* @return array<string, mixed>
106+
*/
107+
private function failurePayload(array $envelope): array
108+
{
109+
$failure = is_array($envelope['failure'] ?? null) ? $envelope['failure'] : [];
110+
$retryable = (bool) ($envelope['outcome']['retryable'] ?? false);
111+
112+
return array_filter([
113+
'message' => $this->stringValue($failure['message'] ?? null) ?? 'External activity handler failed.',
114+
'type' => $this->stringValue($failure['type'] ?? null),
115+
'stack_trace' => $this->stringValue($failure['stack_trace'] ?? null),
116+
'kind' => $this->failureKind($failure['kind'] ?? null),
117+
'timeout_type' => $this->timeoutType($failure['timeout_type'] ?? null),
118+
'retryable' => $retryable,
119+
'non_retryable' => ! $retryable,
120+
'cancelled' => (bool) ($failure['cancelled'] ?? false),
121+
'malformed_output' => ($failure['kind'] ?? null) === 'malformed_output',
122+
'details' => $this->payloadEnvelope($failure['details'] ?? null),
123+
], static fn (mixed $value): bool => $value !== null);
124+
}
125+
126+
/**
127+
* @return array<string, mixed>|null
128+
*/
129+
private function payloadEnvelope(mixed $payload): ?array
130+
{
131+
if (! is_array($payload)) {
132+
return null;
133+
}
134+
135+
$codec = $this->stringValue($payload['codec'] ?? null);
136+
$blob = $payload['blob'] ?? null;
137+
138+
if ($codec === null || ! is_string($blob)) {
139+
return null;
140+
}
141+
142+
return [
143+
'codec' => $codec,
144+
'blob' => $blob,
145+
];
146+
}
147+
148+
private function failureKind(mixed $kind): string
149+
{
150+
$kind = $this->stringValue($kind);
151+
152+
return in_array($kind, [
153+
'application',
154+
'timeout',
155+
'cancellation',
156+
'malformed_output',
157+
'handler_crash',
158+
'decode_failure',
159+
'unsupported_payload',
160+
], true) ? $kind : 'application';
161+
}
162+
163+
private function timeoutType(mixed $timeoutType): ?string
164+
{
165+
$timeoutType = $this->stringValue($timeoutType);
166+
167+
return in_array($timeoutType, [
168+
'schedule_to_start',
169+
'start_to_close',
170+
'schedule_to_close',
171+
'heartbeat',
172+
'deadline_exceeded',
173+
], true) ? $timeoutType : null;
174+
}
175+
176+
/**
177+
* @return array{action: string, payload: array<string, mixed>, reason: string}
178+
*/
179+
private function malformed(
180+
string $taskId,
181+
string $attemptId,
182+
string $leaseOwner,
183+
string $message,
184+
string $reason,
185+
): array {
186+
return $this->malformedTransportResult(
187+
$taskId,
188+
$attemptId,
189+
$leaseOwner,
190+
$message,
191+
['reason' => $reason],
192+
);
193+
}
194+
195+
private function stringValue(mixed $value): ?string
196+
{
197+
if (! is_string($value)) {
198+
return null;
199+
}
200+
201+
$value = trim($value);
202+
203+
return $value === '' ? null : $value;
204+
}
205+
}
Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
<?php
2+
3+
namespace Tests\Unit;
4+
5+
use App\Support\InvocableCarrierResultMapper;
6+
use PHPUnit\Framework\TestCase;
7+
8+
class InvocableCarrierResultMapperTest extends TestCase
9+
{
10+
public function test_success_envelope_maps_to_activity_complete_payload(): void
11+
{
12+
$mapped = (new InvocableCarrierResultMapper)->map(
13+
$this->successEnvelope(),
14+
'acttask_123',
15+
'attempt_123',
16+
'invocable-carrier',
17+
);
18+
19+
$this->assertSame('complete', $mapped['action']);
20+
$this->assertSame('handler_succeeded', $mapped['reason']);
21+
$this->assertSame('attempt_123', $mapped['payload']['activity_attempt_id']);
22+
$this->assertSame('invocable-carrier', $mapped['payload']['lease_owner']);
23+
$this->assertSame([
24+
'codec' => 'json/plain',
25+
'blob' => '{"ok":true}',
26+
], $mapped['payload']['result']);
27+
}
28+
29+
public function test_failure_envelope_maps_to_activity_fail_payload(): void
30+
{
31+
$mapped = (new InvocableCarrierResultMapper)->map(
32+
$this->failureEnvelope(),
33+
'acttask_123',
34+
'attempt_123',
35+
'invocable-carrier',
36+
);
37+
38+
$this->assertSame('fail', $mapped['action']);
39+
$this->assertSame('handler_failed', $mapped['reason']);
40+
$this->assertSame('attempt_123', $mapped['payload']['activity_attempt_id']);
41+
$this->assertSame('invocable-carrier', $mapped['payload']['lease_owner']);
42+
$this->assertSame('ProviderTimeout', $mapped['payload']['failure']['type']);
43+
$this->assertSame('timeout', $mapped['payload']['failure']['kind']);
44+
$this->assertSame('deadline_exceeded', $mapped['payload']['failure']['timeout_type']);
45+
$this->assertTrue($mapped['payload']['failure']['retryable']);
46+
$this->assertFalse($mapped['payload']['failure']['non_retryable']);
47+
$this->assertSame([
48+
'codec' => 'json/plain',
49+
'blob' => '{"provider":"billing"}',
50+
], $mapped['payload']['failure']['details']);
51+
}
52+
53+
public function test_invalid_envelope_fails_closed_as_malformed_output(): void
54+
{
55+
$mapped = (new InvocableCarrierResultMapper)->map(
56+
[
57+
'schema' => 'durable-workflow.v2.external-task-result',
58+
'version' => 1,
59+
'outcome' => ['status' => 'succeeded'],
60+
'task' => [
61+
'id' => 'different-task',
62+
'kind' => 'activity_task',
63+
'attempt' => 1,
64+
'idempotency_key' => 'attempt_123',
65+
],
66+
],
67+
'acttask_123',
68+
'attempt_123',
69+
'invocable-carrier',
70+
);
71+
72+
$this->assertSame('fail', $mapped['action']);
73+
$this->assertSame('malformed_transport_output', $mapped['reason']);
74+
$this->assertSame('malformed_output', $mapped['payload']['failure']['kind']);
75+
$this->assertSame('MalformedExternalTaskOutput', $mapped['payload']['failure']['type']);
76+
$this->assertFalse($mapped['payload']['failure']['retryable']);
77+
$this->assertTrue($mapped['payload']['failure']['non_retryable']);
78+
$this->assertTrue($mapped['payload']['failure']['malformed_output']);
79+
}
80+
81+
public function test_malformed_transport_output_can_preserve_raw_diagnostics(): void
82+
{
83+
$mapped = (new InvocableCarrierResultMapper)->malformedTransportResult(
84+
'acttask_123',
85+
'attempt_123',
86+
'invocable-carrier',
87+
'Handler returned invalid JSON.',
88+
[
89+
'status_code' => 502,
90+
'stdout_preview' => '{not-json',
91+
],
92+
retryable: true,
93+
);
94+
95+
$this->assertSame('fail', $mapped['action']);
96+
$this->assertSame('Handler returned invalid JSON.', $mapped['payload']['failure']['message']);
97+
$this->assertTrue($mapped['payload']['failure']['retryable']);
98+
$this->assertFalse($mapped['payload']['failure']['non_retryable']);
99+
$this->assertSame('json/plain', $mapped['payload']['failure']['details']['codec']);
100+
$this->assertStringContainsString('status_code', $mapped['payload']['failure']['details']['blob']);
101+
}
102+
103+
/**
104+
* @return array<string, mixed>
105+
*/
106+
private function successEnvelope(): array
107+
{
108+
return [
109+
'schema' => 'durable-workflow.v2.external-task-result',
110+
'version' => 1,
111+
'outcome' => [
112+
'status' => 'succeeded',
113+
'recorded' => false,
114+
],
115+
'task' => $this->task(),
116+
'result' => [
117+
'payload' => [
118+
'codec' => 'json/plain',
119+
'blob' => '{"ok":true}',
120+
],
121+
'metadata' => null,
122+
],
123+
'metadata' => ['duration_ms' => 42],
124+
];
125+
}
126+
127+
/**
128+
* @return array<string, mixed>
129+
*/
130+
private function failureEnvelope(): array
131+
{
132+
return [
133+
'schema' => 'durable-workflow.v2.external-task-result',
134+
'version' => 1,
135+
'outcome' => [
136+
'status' => 'failed',
137+
'retryable' => true,
138+
'recorded' => false,
139+
],
140+
'task' => $this->task(),
141+
'failure' => [
142+
'kind' => 'timeout',
143+
'classification' => 'deadline_exceeded',
144+
'message' => 'Deadline exceeded while waiting for billing provider.',
145+
'type' => 'ProviderTimeout',
146+
'stack_trace' => null,
147+
'timeout_type' => 'deadline_exceeded',
148+
'cancelled' => false,
149+
'details' => [
150+
'codec' => 'json/plain',
151+
'blob' => '{"provider":"billing"}',
152+
],
153+
],
154+
'metadata' => ['duration_ms' => 5000],
155+
];
156+
}
157+
158+
/**
159+
* @return array<string, mixed>
160+
*/
161+
private function task(): array
162+
{
163+
return [
164+
'id' => 'acttask_123',
165+
'kind' => 'activity_task',
166+
'attempt' => 1,
167+
'idempotency_key' => 'attempt_123',
168+
];
169+
}
170+
}

0 commit comments

Comments
 (0)