Skip to content

Commit cd14691

Browse files
Align server repair passes with matching-role scopes
Align server repair passes with matching-role scopes
1 parent debe72b commit cd14691

7 files changed

Lines changed: 208 additions & 7 deletions

File tree

README.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -506,6 +506,11 @@ workflow-task command payload.
506506
- `GET /api/system/activity-timeouts` — Expired activity execution diagnostics
507507
- `POST /api/system/activity-timeouts/pass` — Enforce activity timeouts
508508

509+
`POST /api/system/repair/pass` accepts optional `connection`, `queue`,
510+
`run_ids`, and `instance_id` filters. Set `respect_throttle=true` when a
511+
dedicated matching-role loop should skip a pass rather than duplicate work
512+
already covered by another matching-role process holding the repair throttle.
513+
509514
### Namespaces
510515
- `GET /api/namespaces` — List namespaces
511516
- `POST /api/namespaces` — Create namespace

app/Http/Controllers/Api/SystemController.php

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,17 +9,18 @@
99
use App\Support\WorkflowTaskFailureMetrics;
1010
use Illuminate\Http\JsonResponse;
1111
use Illuminate\Http\Request;
12+
use Workflow\V2\Contracts\MatchingRole;
1213
use Workflow\V2\Support\ActivityTimeoutEnforcer;
1314
use Workflow\V2\Support\HealthCheck;
1415
use Workflow\V2\Support\OperatorMetrics;
1516
use Workflow\V2\Support\TaskRepairCandidates;
1617
use Workflow\V2\Support\TaskRepairPolicy;
17-
use Workflow\V2\TaskWatchdog;
1818

1919
class SystemController
2020
{
2121
/**
 * Receive the controller's collaborators as promoted readonly properties.
 *
 * $matchingRole is the role repairPass() calls runPass() on. How
 * $buildIdRollouts is used is not visible in this excerpt.
 */
public function __construct(
2222
private readonly TaskQueueBuildIdRolloutSnapshot $buildIdRollouts,
23+
private readonly MatchingRole $matchingRole,
2324
) {}
2425

2526
public function repairPass(Request $request): JsonResponse
@@ -32,22 +33,34 @@ public function repairPass(Request $request): JsonResponse
3233
'run_ids' => ['nullable', 'array', 'max:100'],
3334
'run_ids.*' => ['string', 'min:1', 'max:128'],
3435
'instance_id' => ['nullable', 'string', 'min:1', 'max:128'],
36+
'connection' => ['nullable', 'string', 'max:128'],
37+
'queue' => ['nullable', 'string', 'max:128'],
38+
'respect_throttle' => ['nullable', 'boolean'],
3539
]);
3640

3741
$runIds = array_values(array_map(
3842
static fn (string $v): string => trim($v),
3943
$validated['run_ids'] ?? [],
4044
));
4145

42-
$instanceId = isset($validated['instance_id']) && is_string($validated['instance_id'])
43-
? trim($validated['instance_id'])
44-
: null;
46+
$instanceId = $this->trimmedString($validated['instance_id'] ?? null);
47+
$connection = $this->trimmedString($validated['connection'] ?? null);
48+
$queue = $this->trimmedString($validated['queue'] ?? null);
49+
$respectThrottle = (bool) ($validated['respect_throttle'] ?? false);
4550

46-
$report = TaskWatchdog::runPass(
51+
$report = $this->matchingRole->runPass(
52+
connection: $connection,
53+
queue: $queue,
54+
respectThrottle: $respectThrottle,
4755
runIds: $runIds,
4856
instanceId: $instanceId,
4957
);
5058
$report = array_replace([
59+
'connection' => $connection,
60+
'queue' => $queue,
61+
'run_ids' => $runIds,
62+
'instance_id' => $instanceId,
63+
'respect_throttle' => $respectThrottle,
5164
'selected_command_contract_candidates' => 0,
5265
'backfilled_command_contracts' => 0,
5366
'command_contract_backfill_unavailable' => 0,
@@ -281,4 +294,15 @@ public function retentionEnforcePass(Request $request): JsonResponse
281294

282295
return ControlPlaneProtocol::json($report, $hasFailures ? 207 : 200);
283296
}
297+
298+
/**
 * Normalise an optional request value to a trimmed, non-empty string.
 *
 * @param mixed $value Raw value; anything that is not a string is treated as absent.
 *
 * @return string|null The trimmed string, or null when the value is not a
 *                     string or is empty once trimmed.
 */
private function trimmedString(mixed $value): ?string
{
    $trimmed = is_string($value) ? trim($value) : '';

    if ($trimmed === '') {
        return null;
    }

    return $trimmed;
}
284308
}

app/Support/WorkflowPackageApiFloor.php

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,10 @@
77
use ReflectionMethod;
88
use RuntimeException;
99
use Workflow\Serializers\CodecRegistry;
10+
use Workflow\V2\Contracts\MatchingRole;
1011
use Workflow\V2\Support\BackendCapabilities;
1112
use Workflow\V2\Support\ChildWorkflowNamespaceProjection;
13+
use Workflow\V2\Support\DefaultMatchingRole;
1214
use Workflow\V2\Support\MatchingRoleSnapshot;
1315

1416
/**
@@ -55,6 +57,19 @@ final class WorkflowPackageApiFloor
5557
// local WorkflowLink / WorkflowRunLineageEntry observer glue.
5658
[ChildWorkflowNamespaceProjection::class, 'projectLink'],
5759
[ChildWorkflowNamespaceProjection::class, 'projectLineageEntry'],
60+
// Server control-plane repair passes resolve through the package's
61+
// dedicated matching-role implementation instead of hard-coding the
62+
// in-process watchdog.
63+
[DefaultMatchingRole::class, 'wake'],
64+
[DefaultMatchingRole::class, 'runPass'],
65+
];
66+
67+
/**
68+
* Interface methods the server type-hints directly.
*
* Each entry is an [interface, method] pair. assert() checks every pair
* with hasInterfaceMethod() and reports a failing pair as
* "Interface::method()" in the list of missing APIs.
69+
*/
70+
private const REQUIRED_INTERFACE_APIS = [
71+
[MatchingRole::class, 'wake'],
72+
[MatchingRole::class, 'runPass'],
5873
];
5974

6075
/**
@@ -96,6 +111,12 @@ public static function assert(): void
96111
}
97112
}
98113

114+
foreach (self::REQUIRED_INTERFACE_APIS as [$class, $method]) {
115+
if (! self::hasInterfaceMethod($class, $method)) {
116+
$missing[] = sprintf('%s::%s()', $class, $method);
117+
}
118+
}
119+
99120
if (! class_exists(self::POLL_MODE_DEMOTION_CLASS)) {
100121
$missing[] = self::POLL_MODE_DEMOTION_CLASS;
101122
} elseif (! self::confirmsPollModeDemotion(self::POLL_MODE_DEMOTION_CLASS, self::POLL_MODE_DEMOTION_METHOD)) {
@@ -114,8 +135,9 @@ public static function assert(): void
114135
"Installed durable-workflow/workflow package is older than the server's API floor. "
115136
.'Missing: %s. Re-run `composer update durable-workflow/workflow` against a v2 snapshot that '
116137
.'includes CodecRegistry::universal(), CodecRegistry::engineSpecific(), MatchingRoleSnapshot::current(), '
117-
.'and the poll-mode queue capability demotion, plus ChildWorkflowNamespaceProjection for package-owned '
118-
.'child namespace propagation (see repos/workflow commits 8e132d0, cfd8e95, and f666b25, or newer).',
138+
.'the poll-mode queue capability demotion, and the matching-role repair-pass contract, plus '
139+
.'ChildWorkflowNamespaceProjection for package-owned child namespace propagation '
140+
.'(see repos/workflow commits 8e132d0, cfd8e95, and f666b25, or newer).',
119141
implode(', ', $missing),
120142
));
121143
}
@@ -152,6 +174,22 @@ private static function hasInstanceMethod(string $class, string $method): bool
152174
return $methodReflection->isPublic() && ! $methodReflection->isStatic();
153175
}
154176

177+
/**
 * Check that an interface declares a public, non-static method.
 *
 * @param string $class  Fully-qualified interface name.
 * @param string $method Method name to look up on that interface.
 *
 * @return bool False when $class is not an existing interface, the method
 *              cannot be reflected, or the method is static or non-public.
 */
private static function hasInterfaceMethod(string $class, string $method): bool
{
    if (interface_exists($class)) {
        try {
            $candidate = (new ReflectionClass($class))->getMethod($method);

            return $candidate->isPublic() && ! $candidate->isStatic();
        } catch (ReflectionException) {
            // Fall through: a method that cannot be reflected counts as missing.
        }
    }

    return false;
}
192+
155193
/**
156194
* Prove the installed BackendCapabilities::queue() contains the
157195
* poll-mode demotion logic from workflow@f666b25.

tests/Feature/ControlPlaneOperationalSuccessContractTest.php

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,11 @@ public static function operationalSuccessProvider(): array
237237
'path' => '/api/system/repair/pass',
238238
'body' => [],
239239
'structure' => [
240+
'connection',
241+
'queue',
242+
'run_ids',
243+
'instance_id',
244+
'respect_throttle',
240245
'throttled',
241246
'selected_existing_task_candidates',
242247
'selected_missing_task_candidates',

tests/Feature/ControlPlaneValidationContractTest.php

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,13 @@ public static function validationEndpointProvider(): array
8484
'errorField' => 'run_ids',
8585
'controlOperation' => null,
8686
],
87+
'system.repair_pass_respect_throttle' => [
88+
'method' => 'post',
89+
'path' => '/api/system/repair/pass',
90+
'body' => ['respect_throttle' => 'not-a-bool'],
91+
'errorField' => 'respect_throttle',
92+
'controlOperation' => null,
93+
],
8794
'system.activity_timeout_status' => [
8895
'method' => 'get',
8996
'path' => '/api/system/activity-timeouts?limit=0',

tests/Feature/TransportRepairTest.php

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
use Tests\Feature\Concerns\ServerTestHelpers;
99
use Tests\Fixtures\ExternalGreetingWorkflow;
1010
use Tests\TestCase;
11+
use Workflow\V2\Contracts\MatchingRole;
1112
use Workflow\V2\Enums\TaskStatus;
1213
use Workflow\V2\Models\WorkflowTask;
1314

@@ -113,6 +114,11 @@ public function test_system_repair_pass_returns_report(): void
113114
->postJson('/api/system/repair/pass')
114115
->assertOk()
115116
->assertJsonStructure([
117+
'connection',
118+
'queue',
119+
'run_ids',
120+
'instance_id',
121+
'respect_throttle',
116122
'throttled',
117123
'selected_existing_task_candidates',
118124
'selected_missing_task_candidates',
@@ -135,6 +141,10 @@ public function test_system_repair_pass_with_empty_system_returns_zero_repairs()
135141
->postJson('/api/system/repair/pass');
136142

137143
$response->assertOk()
144+
->assertJsonPath('connection', null)
145+
->assertJsonPath('queue', null)
146+
->assertJsonPath('instance_id', null)
147+
->assertJsonPath('respect_throttle', false)
138148
->assertJsonPath('throttled', false)
139149
->assertJsonPath('selected_total_candidates', 0)
140150
->assertJsonPath('repaired_existing_tasks', 0)
@@ -144,6 +154,7 @@ public function test_system_repair_pass_with_empty_system_returns_zero_repairs()
144154
->assertJsonPath('backfilled_command_contracts', 0)
145155
->assertJsonPath('command_contract_backfill_unavailable', 0);
146156

157+
$this->assertSame([], $response->json('run_ids'));
147158
$this->assertSame([], $response->json('existing_task_failures'));
148159
$this->assertSame([], $response->json('missing_run_failures'));
149160
$this->assertSame([], $response->json('command_contract_failures'));
@@ -156,6 +167,7 @@ public function test_system_repair_pass_accepts_run_id_filter(): void
156167
'run_ids' => ['non-existent-run-id'],
157168
])
158169
->assertOk()
170+
->assertJsonPath('run_ids.0', 'non-existent-run-id')
159171
->assertJsonPath('selected_total_candidates', 0);
160172
}
161173

@@ -166,9 +178,93 @@ public function test_system_repair_pass_accepts_instance_id_filter(): void
166178
'instance_id' => 'non-existent-instance',
167179
])
168180
->assertOk()
181+
->assertJsonPath('instance_id', 'non-existent-instance')
169182
->assertJsonPath('selected_total_candidates', 0);
170183
}
171184

185+
/**
 * The repair-pass endpoint must resolve MatchingRole from the container,
 * hand it the trimmed request filters, and return the role's report.
 */
public function test_system_repair_pass_uses_matching_role_binding_and_forwards_scope_options(): void
{
    // Recording stand-in: captures the runPass() arguments and returns a canned report.
    $recorder = new class implements MatchingRole
    {
        /**
         * @var array{connection: string|null, queue: string|null, respectThrottle: bool, runIds: list<string>, instanceId: string|null}|null
         */
        public ?array $lastRunPassArguments = null;

        public function wake(?string $connection = null, ?string $queue = null): void {}

        public function runPass(
            ?string $connection = null,
            ?string $queue = null,
            bool $respectThrottle = false,
            array $runIds = [],
            ?string $instanceId = null,
        ): array {
            $this->lastRunPassArguments = compact(
                'connection',
                'queue',
                'respectThrottle',
                'runIds',
                'instanceId',
            );

            // Scope echo first, then the fixed counters; the key sets are disjoint.
            $scope = [
                'connection' => $connection,
                'queue' => $queue,
                'run_ids' => $runIds,
                'instance_id' => $instanceId,
                'respect_throttle' => $respectThrottle,
            ];

            $counters = [
                'throttled' => false,
                'selected_existing_task_candidates' => 2,
                'selected_missing_task_candidates' => 1,
                'selected_total_candidates' => 3,
                'repaired_existing_tasks' => 2,
                'repaired_missing_tasks' => 1,
                'dispatched_tasks' => 3,
                'existing_task_failures' => [],
                'missing_run_failures' => [],
                'deadline_expired_candidates' => 0,
                'deadline_expired_tasks_created' => 0,
                'deadline_expired_failures' => [],
                'activity_timeout_candidates' => 0,
                'activity_timeouts_enforced' => 0,
                'activity_timeout_failures' => [],
            ];

            return $scope + $counters;
        }
    };

    $this->app->instance(MatchingRole::class, $recorder);

    // Padded values prove the controller trims before forwarding.
    $payload = [
        'connection' => ' redis ',
        'queue' => ' critical ',
        'run_ids' => [' run-a ', 'run-b'],
        'instance_id' => ' instance-42 ',
        'respect_throttle' => true,
    ];

    $response = $this->withHeaders($this->apiHeaders())
        ->postJson('/api/system/repair/pass', $payload);

    $response->assertOk();

    $expectedPaths = [
        'connection' => 'redis',
        'queue' => 'critical',
        'run_ids.0' => 'run-a',
        'run_ids.1' => 'run-b',
        'instance_id' => 'instance-42',
        'respect_throttle' => true,
        'selected_total_candidates' => 3,
        'repaired_existing_tasks' => 2,
        'repaired_missing_tasks' => 1,
        'dispatched_tasks' => 3,
    ];

    foreach ($expectedPaths as $path => $expected) {
        $response->assertJsonPath($path, $expected);
    }

    $this->assertSame([
        'connection' => 'redis',
        'queue' => 'critical',
        'respectThrottle' => true,
        'runIds' => ['run-a', 'run-b'],
        'instanceId' => 'instance-42',
    ], $recorder->lastRunPassArguments);
}
267+
172268
public function test_system_repair_pass_recovers_expired_poll_mode_workflow_task_leases(): void
173269
{
174270
config(['server.polling.timeout' => 0]);

tests/Unit/Support/WorkflowPackageApiFloorTest.php

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,10 @@
88
use ReflectionMethod;
99
use Tests\Fixtures\StaleBackendCapabilities;
1010
use Workflow\Serializers\CodecRegistry;
11+
use Workflow\V2\Contracts\MatchingRole;
1112
use Workflow\V2\Support\BackendCapabilities;
1213
use Workflow\V2\Support\ChildWorkflowNamespaceProjection;
14+
use Workflow\V2\Support\DefaultMatchingRole;
1315
use Workflow\V2\Support\MatchingRoleSnapshot;
1416

1517
/**
@@ -71,6 +73,30 @@ public function test_child_workflow_namespace_projection_is_public_instance_api(
7173
}
7274
}
7375

76+
/**
 * The MatchingRole contract must be an interface that exposes wake() and
 * runPass() as public instance methods.
 */
public function test_matching_role_contract_is_public_instance_api(): void
{
    $reflection = new ReflectionClass(MatchingRole::class);

    $this->assertTrue(
        $reflection->isInterface(),
        sprintf('%s must be an interface.', MatchingRole::class),
    );

    foreach (['wake', 'runPass'] as $methodName) {
        // Assert presence first so a stale package produces a named failure
        // instead of an uncaught ReflectionException from getMethod().
        $this->assertTrue(
            $reflection->hasMethod($methodName),
            sprintf('%s::%s() is missing.', MatchingRole::class, $methodName),
        );

        $method = $reflection->getMethod($methodName);

        $this->assertTrue(
            $method->isPublic(),
            sprintf('%s::%s() must be public.', MatchingRole::class, $methodName),
        );
        $this->assertFalse(
            $method->isStatic(),
            sprintf('%s::%s() must not be static.', MatchingRole::class, $methodName),
        );
    }
}
87+
88+
/**
 * DefaultMatchingRole must expose wake() and runPass() as public instance
 * methods.
 */
public function test_default_matching_role_exposes_public_instance_repair_methods(): void
{
    $reflection = new ReflectionClass(DefaultMatchingRole::class);

    foreach (['wake', 'runPass'] as $methodName) {
        // Assert presence first so a stale package produces a named failure
        // instead of an uncaught ReflectionException from getMethod().
        $this->assertTrue(
            $reflection->hasMethod($methodName),
            sprintf('%s::%s() is missing.', DefaultMatchingRole::class, $methodName),
        );

        $method = $reflection->getMethod($methodName);

        $this->assertTrue(
            $method->isPublic(),
            sprintf('%s::%s() must be public.', DefaultMatchingRole::class, $methodName),
        );
        $this->assertFalse(
            $method->isStatic(),
            sprintf('%s::%s() must not be static.', DefaultMatchingRole::class, $methodName),
        );
    }
}
99+
74100
public function test_poll_mode_demotion_check_accepts_current_workflow_package(): void
75101
{
76102
$confirms = $this->invokeConfirmsPollModeDemotion(BackendCapabilities::class, 'queue');

0 commit comments

Comments
 (0)