Skip to content

Commit 870137d

Browse files
committed
fix(triggers): scheduler-owned resubmission of triggers on worker loss
The liveness coordinator re-emitted the persisted WorkerTrigger of a dead worker, replaying a stale definition and resurrecting disabled or deleted triggers. It now sends a TriggerWorkerLost event instead; the scheduler releases the lock (guarded by the holding workerId) and resubmits from the current flow definition. As a safety net, TriggerReceived for a disabled or deleted trigger re-emits the kill.
1 parent aab3c60 commit 870137d

8 files changed

Lines changed: 212 additions & 72 deletions

File tree

core/src/main/java/io/kestra/core/scheduler/events/TriggerEvent.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
@JsonSubTypes.Type(value = ResetTrigger.class, name = "RESET_TRIGGER"),
3737
@JsonSubTypes.Type(value = SetDisableTrigger.class, name = "SET_DISABLE_TRIGGER"),
3838
@JsonSubTypes.Type(value = TriggerReceived.class, name = "TRIGGER_RECEIVED"),
39+
@JsonSubTypes.Type(value = TriggerWorkerLost.class, name = "TRIGGER_WORKER_LOST"),
3940
@JsonSubTypes.Type(value = TriggerEvent.Invalid.class, name = "INVALID"),
4041
}
4142
)

core/src/main/java/io/kestra/core/scheduler/events/TriggerEventType.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ public enum TriggerEventType {
1616
TRIGGER_EVALUATED,
1717
TRIGGER_EXECUTION_TERMINATED,
1818
TRIGGER_RECEIVED,
19+
TRIGGER_WORKER_LOST,
1920
// COMMANDS,
2021
CREATE_BACKFILL_TRIGGER,
2122
DELETE_BACKFILL_TRIGGER,
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package io.kestra.core.scheduler.events;
2+
3+
import java.time.Instant;
4+
5+
import io.kestra.core.events.EventId;
6+
import io.kestra.core.models.triggers.TriggerId;
7+
8+
/**
9+
* The worker holding a trigger is gone (crashed or terminated) without reporting a result.
10+
* <p>
11+
* The scheduler reacts by releasing the trigger's lock so it can be resubmitted from the
12+
* current flow definition — or left alone if it is disabled or deleted.
*
* @param id        the identifier of the trigger whose worker was lost.
* @param workerUid the uid of the lost worker instance.
* @param timestamp the instant attached to the event; the two-argument constructor passes {@code Instant.now()}.
* @param eventId   the identifier of this event; the two-argument constructor passes {@code EventId.create()}.
13+
*/
14+
public record TriggerWorkerLost(
15+
TriggerId id,
16+
String workerUid,
17+
Instant timestamp,
18+
EventId eventId) implements TriggerEvent {
19+
/**
* Creates the event for the given trigger and worker, delegating to the canonical constructor
* with {@code Instant.now()} as timestamp and {@code EventId.create()} as event id.
*
* @param id        the identifier of the trigger whose worker was lost.
* @param workerUid the uid of the lost worker instance.
*/
20+
public TriggerWorkerLost(TriggerId id, String workerUid) {
21+
this(id, workerUid, Instant.now(), EventId.create());
22+
}
23+
}

executor/src/main/java/io/kestra/executor/DefaultServiceLivenessCoordinator.java

Lines changed: 33 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,13 @@
1515
import io.kestra.core.killswitch.KillSwitchService;
1616
import io.kestra.core.lock.LockService;
1717
import io.kestra.core.metrics.MetricRegistry;
18-
import io.kestra.core.models.triggers.RealtimeTriggerInterface;
1918
import io.kestra.core.models.triggers.TriggerId;
2019
import io.kestra.core.queues.KeyedDispatchQueueInterface;
2120
import io.kestra.core.queues.QueueException;
2221
import io.kestra.core.repositories.ServiceInstanceRepositoryInterface;
2322
import io.kestra.core.runners.*;
23+
import io.kestra.core.scheduler.events.TriggerWorkerLost;
24+
import io.kestra.core.scheduler.queue.TriggerEventQueue;
2425
import io.kestra.core.scheduler.vnodes.VNodeController;
2526
import io.kestra.core.server.*;
2627
import io.kestra.core.utils.IdUtils;
@@ -66,6 +67,7 @@ public class DefaultServiceLivenessCoordinator extends AbstractServiceLivenessTa
6667
private final KillSwitchService killSwitchService;
6768
private final KeyedDispatchQueueInterface<WorkerJobEvent> workerJobEventQueue;
6869
private final WorkerJobRunningStateStore workerJobRunningStateStore;
70+
private final TriggerEventQueue triggerEventQueue;
6971
private final MetricRegistry metricRegistry;
7072
private final VNodeController vNodeController;
7173
private Counter workerJobResubmitCounter;
@@ -92,6 +94,7 @@ public DefaultServiceLivenessCoordinator(final ServiceLivenessStore store,
9294
final KillSwitchService killSwitchService,
9395
final KeyedDispatchQueueInterface<WorkerJobEvent> workerJobEventQueue,
9496
final WorkerJobRunningStateStore workerJobRunningStateStore,
97+
final TriggerEventQueue triggerEventQueue,
9598
final ServerConfig serverConfig,
9699
final MetricRegistry metricRegistry,
97100
final VNodeController vNodeController) {
@@ -103,6 +106,7 @@ public DefaultServiceLivenessCoordinator(final ServiceLivenessStore store,
103106
this.killSwitchService = killSwitchService;
104107
this.workerJobEventQueue = workerJobEventQueue;
105108
this.workerJobRunningStateStore = workerJobRunningStateStore;
109+
this.triggerEventQueue = triggerEventQueue;
106110
this.lockService = lockService;
107111
this.metricRegistry = metricRegistry;
108112
this.purgeRetention = serverConfig.service() != null && serverConfig.service().purge() != null
@@ -176,10 +180,10 @@ protected void handleAllWorkersForUncleanShutdown(Instant now) {
176180
reEmitWorkerJobsForWorker(txContext, serviceInstance.uid());
177181
}
178182
} else if (serviceInstance.is(Service.ServiceState.TERMINATED_GRACEFULLY)) {
179-
// realtime triggers need to be resubmitted even when terminated gracefully
183+
// triggers whose terminal result was not flushed need to be released even when terminated gracefully
180184
if (serviceInstance.config().workerTaskRestartStrategy().isRestartable()) {
181-
log.info("Trigger realtime trigger restart for terminated gracefully worker: {}.", serviceInstance.uid());
182-
reEmitRealtimeTriggerForWorker(txContext, serviceInstance.uid());
185+
log.info("Trigger restart for terminated gracefully worker: {}.", serviceInstance.uid());
186+
notifyTriggersLostForWorker(txContext, serviceInstance.uid());
183187
}
184188
}
185189

@@ -257,10 +261,12 @@ private void reEmitWorkerJobsForWorker(final TransactionContext txContext, final
257261
});
258262
}
259263

260-
private void reEmitRealtimeTriggerForWorker(final TransactionContext txContext, final String id) {
264+
private void notifyTriggersLostForWorker(final TransactionContext txContext, final String id) {
261265
workerJobRunningStateStore.processWorkerJobsForDeadWorker(txContext, id, (txContext2, workerJobRunning) ->
262266
{
263-
resubmitRealtimeTrigger(txContext2, workerJobRunning);
267+
if (workerJobRunning instanceof WorkerTriggerRunning workerTriggerRunning) {
268+
notifyTriggerWorkerLost(txContext2, workerTriggerRunning);
269+
}
264270
});
265271
}
266272

@@ -393,14 +399,7 @@ private void resubmitWorkerJobRunning(TransactionContext txContext, WorkerJobRun
393399

394400
// WorkerTriggerRunning
395401
if (workerJobRunning instanceof WorkerTriggerRunning workerTriggerRunning) {
396-
resubmitWorkerTrigger(workerTriggerRunning);
397-
}
398-
}
399-
400-
private void resubmitRealtimeTrigger(TransactionContext txContext, WorkerJobRunning workerJobRunning) {
401-
// we only resubmit realtime triggers
402-
if (workerJobRunning instanceof WorkerTriggerRunning workerTriggerRunning && workerTriggerRunning.getTrigger() instanceof RealtimeTriggerInterface) {
403-
resubmitWorkerTrigger(workerTriggerRunning);
402+
notifyTriggerWorkerLost(txContext, workerTriggerRunning);
404403
}
405404
}
406405

@@ -435,30 +434,25 @@ private void resubmitWorkerTask(TransactionContext txContext, WorkerTaskRunning
435434
}
436435
}
437436

438-
private void resubmitWorkerTrigger(WorkerTriggerRunning workerTriggerRunning) {
439-
try {
440-
String raw = workerTriggerRunning.getWorkerInstance().workerQueueId();
441-
String workerQueueId = (raw == null || raw.isEmpty()) ? null : raw;
442-
WorkerTrigger workerTrigger = WorkerTrigger.builder()
443-
.trigger(workerTriggerRunning.getTrigger())
444-
.data(workerTriggerRunning.getData())
445-
.build();
446-
workerJobEventQueue.emit(workerQueueId, WorkerJobEvent.of(workerTrigger, workerQueueId));
447-
Logs.logTrigger(
448-
workerTrigger.triggerId(),
449-
Level.WARN,
450-
"Re-emitting WorkerTrigger."
451-
);
452-
} catch (QueueException e) {
453-
Logs.logTrigger(
454-
TriggerId.of(
455-
workerTriggerRunning.getData().tenantId(), workerTriggerRunning.getData().namespace(), workerTriggerRunning.getData().flowId(),
456-
workerTriggerRunning.getTrigger().getId()
457-
),
458-
Level.ERROR,
459-
"Unable to re-emit WorkerTrigger.",
460-
e
461-
);
462-
}
437+
/**
438+
* The worker holding the trigger is gone: delete its running entry and notify the scheduler,
439+
* which owns resubmission. Re-emitting the persisted job from here would replay a stale
440+
* trigger definition and ignore a disabled or deleted state.
*
* @param txContext            the transaction context handed to the running-state store for the delete.
* @param workerTriggerRunning the running trigger job recorded for the lost worker.
441+
*/
442+
private void notifyTriggerWorkerLost(TransactionContext txContext, WorkerTriggerRunning workerTriggerRunning) {
// Rebuild the trigger identifier from the persisted job: tenant, namespace and flow come
// from its data, the trigger id from its trigger.
443+
TriggerId triggerId = TriggerId.of(
444+
workerTriggerRunning.getData().tenantId(),
445+
workerTriggerRunning.getData().namespace(),
446+
workerTriggerRunning.getData().flowId(),
447+
workerTriggerRunning.getTrigger().getId()
448+
);
// Remove the running entry first, then tell the scheduler which worker was holding the trigger.
449+
workerJobRunningStateStore.deleteByKey(txContext, workerTriggerRunning.uid());
// Only the trigger identifier and the worker uid are sent; the persisted trigger definition is not.
450+
triggerEventQueue.send(new TriggerWorkerLost(triggerId, workerTriggerRunning.getWorkerInstance().uid()));
451+
Logs.logTrigger(
452+
triggerId,
453+
Level.WARN,
454+
"Worker '{}' lost, notifying the scheduler.",
455+
workerTriggerRunning.getWorkerInstance().uid()
456+
);
463457
}
464458
}

executor/src/test/java/io/kestra/executor/AbstractServiceLivenessCoordinatorTest.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,9 @@
4141
import io.kestra.core.runners.WorkerTrigger;
4242
import io.kestra.core.runners.WorkerTriggerData;
4343
import io.kestra.core.worker.models.WorkerTriggerResult;
44-
import io.kestra.core.scheduler.events.TriggerEvaluated;
4544
import io.kestra.core.scheduler.events.TriggerEvent;
4645
import io.kestra.core.scheduler.events.TriggerReceived;
46+
import io.kestra.core.scheduler.events.TriggerWorkerLost;
4747
import io.kestra.core.scheduler.model.TriggerState;
4848
import io.kestra.core.server.ServerConfig;
4949
import io.kestra.core.server.ServiceStateChangeEvent;
@@ -218,15 +218,15 @@ void shouldNotResubmitTaskForIgnoredExecution() throws Exception {
218218

219219
@ParameterizedTest
220220
@ValueSource(strings = { WORKER_QUEUE_UID, "<null>" })
221-
public void shouldResubmitTriggerWhenWorkerIsStopped(String workerQueueId) throws Exception {
221+
public void shouldNotifyTriggerWorkerLostWhenWorkerIsStopped(String workerQueueId) throws Exception {
222222
workerQueueId = "<null>".equals(workerQueueId) ? null : workerQueueId;
223223
// Given - create first worker.
224224
WorkerAgent worker = (WorkerAgent) newWorker();
225225
worker.start(1);
226226

227227
WorkerTrigger workerTrigger = workerTrigger(Duration.ofSeconds(5), workerQueueId);
228228

229-
CountDownLatch evaluatedLatch = new CountDownLatch(1);
229+
CountDownLatch lostLatch = new CountDownLatch(1);
230230
CountDownLatch receivedLatch = new CountDownLatch(1);
231231
triggerEventQueue.addListener(event ->
232232
{
@@ -236,8 +236,8 @@ public void shouldResubmitTriggerWhenWorkerIsStopped(String workerQueueId) throw
236236
if (event instanceof TriggerReceived) {
237237
receivedLatch.countDown();
238238
}
239-
if (event instanceof TriggerEvaluated) {
240-
evaluatedLatch.countDown();
239+
if (event instanceof TriggerWorkerLost) {
240+
lostLatch.countDown();
241241
}
242242
});
243243

@@ -250,8 +250,8 @@ public void shouldResubmitTriggerWhenWorkerIsStopped(String workerQueueId) throw
250250
WorkerAgent newWorker = (WorkerAgent) newWorker();
251251
newWorker.start(1);
252252

253-
// THEN
254-
assertThat(evaluatedLatch.await(30, TimeUnit.SECONDS)).isTrue();
253+
// THEN - the scheduler is notified instead of the job being re-emitted to a worker.
254+
assertThat(lostLatch.await(30, TimeUnit.SECONDS)).isTrue();
255255
newWorker.close();
256256
}
257257

scheduler/src/main/java/io/kestra/scheduler/TriggerEventHandler.java

Lines changed: 66 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import io.kestra.core.models.triggers.Backfill;
2626
import io.kestra.core.models.triggers.PollingTriggerInterface;
2727
import io.kestra.core.models.triggers.TriggerContext;
28+
import io.kestra.core.models.triggers.TriggerId;
2829
import io.kestra.core.queues.BroadcastQueueInterface;
2930
import io.kestra.core.queues.QueueException;
3031
import io.kestra.core.runners.RunContext;
@@ -42,6 +43,7 @@
4243
import io.kestra.core.scheduler.events.TriggerFlowRevisionUpdated;
4344
import io.kestra.core.scheduler.events.TriggerReceived;
4445
import io.kestra.core.scheduler.events.TriggerUpdated;
46+
import io.kestra.core.scheduler.events.TriggerWorkerLost;
4547
import io.kestra.core.scheduler.model.TriggerState;
4648
import io.kestra.core.scheduler.model.TriggerType;
4749
import io.kestra.core.scheduler.service.TriggerExecutionPublisher;
@@ -121,6 +123,7 @@ private void doHandle(Clock clock, Integer vNode, TriggerEvent event) {
121123
case TriggerExecutionTerminated evt -> onTriggerExecutionTerminated(clock, evt);
122124
case TriggerEvaluated evt -> onTriggerEvaluated(clock, evt);
123125
case TriggerReceived evt -> onTriggerReceived(clock, evt);
126+
case TriggerWorkerLost evt -> onTriggerWorkerLost(clock, evt);
124127
// Commands
125128
case CreateBackfillTrigger evt -> onCreateBackfill(clock, evt);
126129
case SetPauseBackfillTrigger evt -> onSetPauseBackfillTrigger(clock, evt);
@@ -317,17 +320,55 @@ void onTriggerEvaluated(Clock clock, TriggerEvaluated event) {
317320
}
318321

319322
/**
320-
* Handler method for {@link ResetTrigger}.
323+
* Handler method for {@link TriggerReceived}.
321324
*
322325
* @param event the event.
323326
*/
324327
void onTriggerReceived(Clock clock, TriggerReceived event) {
328+
Optional<TriggerState> maybeState = findTriggerState(event);
329+
if (maybeState.isEmpty()) {
330+
// The trigger was deleted while its worker job was in flight: kill the instance
331+
// the worker just started. Only do so when the state is truly missing, not when
332+
// the event was de-duplicated.
333+
if (triggerStateStore.findById(event.id()).isEmpty()) {
334+
sendExecutionKilled(event.id());
335+
}
336+
return;
337+
}
338+
TriggerState state = maybeState.get();
339+
// The trigger was disabled while its worker job was in flight: the kill broadcast
340+
// found no holder at that time, so kill the instance now that a worker reports it.
341+
if (state.isDisabled()) {
342+
maySendExecutionKilled(state);
343+
}
344+
triggerStateStore.save(
345+
state
346+
.lastEventId(clock, event.eventId())
347+
.workerId(clock, event.workerId())
348+
);
349+
}
350+
351+
/**
352+
* Handler method for {@link TriggerWorkerLost}.
353+
* <p>
354+
* The worker holding the trigger is gone without reporting a result: release the lock so an
355+
* eligible trigger is resubmitted from the current flow definition, while a disabled one stays off.
356+
*
357+
* @param event the event.
358+
*/
359+
void onTriggerWorkerLost(Clock clock, TriggerWorkerLost event) {
325360
findTriggerState(event).ifPresent(state ->
326361
{
327-
state = state
328-
.lastEventId(clock, event.eventId())
329-
.workerId(clock, event.workerId());
330-
triggerStateStore.save(state);
362+
if (state.getWorkerId() != null && !state.getWorkerId().equals(event.workerUid())) {
363+
// The trigger is already held by another worker.
364+
return;
365+
}
366+
triggerStateStore.save(
367+
state
368+
.lastEventId(clock, event.eventId())
369+
.locked(clock, false)
370+
.workerId(clock, null)
371+
);
331372
});
332373
}
333374

@@ -404,22 +445,26 @@ void onTriggerDeleted(TriggerDeleted event) {
404445
*/
405446
private void maySendExecutionKilled(TriggerState state) {
406447
if (TriggerType.REALTIME.equals(state.getType())) {
407-
try {
408-
this.executionKilledQueue.emit(
409-
ExecutionKilledTrigger
410-
.builder()
411-
// Trigger kills are not processed by the Executor: emit them directly in the
412-
// EXECUTED state, the only state forwarded to the workers.
413-
.state(ExecutionKilled.State.EXECUTED)
414-
.tenantId(state.getTenantId())
415-
.namespace(state.getNamespace())
416-
.flowId(state.getFlowId())
417-
.triggerId(state.getTriggerId())
418-
.build()
419-
);
420-
} catch (QueueException e) {
421-
Logs.logTrigger(state, Level.WARN, "Cannot kill a real-time trigger, it will continue processing until Kestra is restarted. Cause: {}", e.getMessage(), e);
422-
}
448+
sendExecutionKilled(state);
449+
}
450+
}
451+
/**
* Emits an {@link ExecutionKilledTrigger} in the {@code EXECUTED} state for the given trigger
* on the execution-killed queue.
* <p>
* A {@link QueueException} raised by the emit is logged at WARN level and not rethrown.
* <p>
* NOTE(review): the warning text refers to a real-time trigger, but this method performs no
* trigger-type check itself — confirm the wording still fits every caller.
*
* @param id the identifier of the trigger to kill.
*/
452+
private void sendExecutionKilled(TriggerId id) {
453+
try {
454+
this.executionKilledQueue.emit(
455+
ExecutionKilledTrigger
456+
.builder()
457+
// Trigger kills are not processed by the Executor: emit them directly in the
458+
// EXECUTED state, the only state forwarded to the workers.
459+
.state(ExecutionKilled.State.EXECUTED)
460+
.tenantId(id.getTenantId())
461+
.namespace(id.getNamespace())
462+
.flowId(id.getFlowId())
463+
.triggerId(id.getTriggerId())
464+
.build()
465+
);
466+
} catch (QueueException e) {
467+
Logs.logTrigger(id, Level.WARN, "Cannot kill a real-time trigger, it will continue processing until Kestra is restarted. Cause: {}", e.getMessage(), e);
423468
}
424469
}
425470

0 commit comments

Comments
 (0)