Skip to content

Commit b633409

Browse files
committed
fix(worker): handle oversized gRPC payloads cleanly instead of hanging
An oversized worker<->controller message left the execution stuck in RUNNING, in both directions. Now the task fails cleanly: - worker->controller: TaskRun.fail() no longer throws on an immutable attempts list, so the RESOURCE_EXHAUSTED fallback can resend the failed result. - controller->worker: the worker advertises its maxInboundMessageSize and the controller fails an oversized job before dispatch instead of sending a frame the worker can never receive.
1 parent 9069e14 commit b633409

7 files changed

Lines changed: 77 additions & 14 deletions

File tree

core/src/main/java/io/kestra/core/models/executions/TaskRun.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,9 @@ public TaskRun withStateAndAttempt(State.Type state) {
142142

143143
public TaskRun fail() {
144144
var attempt = TaskRunAttempt.builder().state(new State(State.Type.FAILED)).build();
145-
List<TaskRunAttempt> newAttempts = this.attempts == null ? new ArrayList<>(1) : this.attempts;
145+
// Copy defensively: this.attempts may be immutable (e.g. after deserialization), and fail()
146+
// must return a new TaskRun without mutating the caller's list in place.
147+
List<TaskRunAttempt> newAttempts = this.attempts == null ? new ArrayList<>(1) : new ArrayList<>(this.attempts);
146148
newAttempts.add(attempt);
147149

148150
return new TaskRun(

worker-controller/src/main/java/io/kestra/controller/grpc/services/GrpcWorkerControllerService.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,7 @@ public void onNext(WorkerJobRequest request) {
129129
WorkerStreamContext<WorkerJobResponse> context = new WorkerStreamContext<>(
130130
workerId, workerGroupId, subscriptions, maxConcurrency, responseObserver, capacityPolicy
131131
);
132+
context.setMaxInboundMessageSize(connInfo.getMaxInboundMessageSize());
132133
contextRef.set(context);
133134

134135
// Register with dispatcher

worker-controller/src/main/java/io/kestra/controller/grpc/services/WorkerJobDispatcher.java

Lines changed: 51 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -866,6 +866,25 @@ private void dispatchJobToWorker(WorkerStreamContext<WorkerJobResponse> context,
866866

867867
// The permit and bucket slot were already atomically reserved in findAndReserveWorker.
868868

869+
WorkerJobResponse response = WorkerJobResponse.newBuilder()
870+
.setHeader(RequestOrResponseHeaderFactory.create(context.getWorkerId()))
871+
.addJobs(
872+
WorkerJobPayload.newBuilder()
873+
.setJobId(jobId)
874+
.setJobData(MessageFormats.JSON.toByteString(job))
875+
.build()
876+
)
877+
.build();
878+
879+
// Reject a payload the worker's gRPC channel could never receive: dispatching it would
880+
// trigger RESOURCE_EXHAUSTED on the worker stream, hot-loop reconnects, and hang the
881+
// execution in RUNNING forever. Fail the job cleanly instead.
882+
int workerLimit = context.getMaxInboundMessageSize();
883+
if (workerLimit > 0 && response.getSerializedSize() > workerLimit) {
884+
rejectOversizedJob(context, job, dispatchWorkerQueueId, bucket, response.getSerializedSize(), workerLimit);
885+
return;
886+
}
887+
869888
// 1. PERSIST before sending (critical for recovery)
870889
persistJobToStateStore(context, job, dispatchWorkerQueueId);
871890

@@ -874,16 +893,6 @@ private void dispatchJobToWorker(WorkerStreamContext<WorkerJobResponse> context,
874893

875894
// 3. Send to worker
876895
try {
877-
WorkerJobResponse response = WorkerJobResponse.newBuilder()
878-
.setHeader(RequestOrResponseHeaderFactory.create(context.getWorkerId()))
879-
.addJobs(
880-
WorkerJobPayload.newBuilder()
881-
.setJobId(jobId)
882-
.setJobData(MessageFormats.JSON.toByteString(job))
883-
.build()
884-
)
885-
.build();
886-
887896
context.sendResponse(response);
888897
log.debug("Dispatched job {} to worker {}", jobId, context.getWorkerId());
889898
metricRegistry.counter(
@@ -941,6 +950,38 @@ private void handleDispatchFailure(WorkerStreamContext<WorkerJobResponse> contex
941950
requeue(originalEvent);
942951
}
943952

953+
/**
954+
* Rejects a job whose serialized payload exceeds the worker's advertised gRPC inbound
955+
* limit. Releases the reserved permit and bucket and fails the job cleanly so the
956+
* execution terminates instead of hanging while the worker hot-loops reconnecting.
957+
*/
958+
private void rejectOversizedJob(WorkerStreamContext<WorkerJobResponse> context, WorkerJob job,
959+
String dispatchWorkerQueueId, String bucket, int payloadSize, int workerLimit) {
960+
log.error("Job {} payload ({} bytes) exceeds worker {} max inbound gRPC message size ({} bytes); failing the job instead of dispatching",
961+
job.uid(), payloadSize, context.getWorkerId(), workerLimit);
962+
963+
metricRegistry.counter(
964+
MetricRegistry.METRIC_CONTROLLER_JOB_DISPATCH_FAILED_TOTAL,
965+
MetricRegistry.METRIC_CONTROLLER_JOB_DISPATCH_FAILED_TOTAL_DESCRIPTION,
966+
metricRegistry.workerGroupAndQueueTags(context.getWorkerGroupId(), dispatchWorkerQueueId)
967+
).increment();
968+
969+
// Release the permit + bucket reserved in findAndReserveWorker (the job is not dispatched).
970+
context.addPermits(1);
971+
context.releaseBucket(bucket);
972+
973+
// Fail the job cleanly so the execution reaches a terminal state.
974+
if (job instanceof WorkerTask workerTask) {
975+
try {
976+
workerTaskResultQueue.emit(new WorkerTaskResult(workerTask.getTaskRun().fail()));
977+
} catch (QueueException e) {
978+
log.error("Failed to emit FAILED result for oversized job {}: {}", job.uid(), e.getMessage(), e);
979+
}
980+
} else if (job instanceof WorkerTrigger workerTrigger) {
981+
triggerEventQueue.send(new TriggerEvaluated(workerTrigger.triggerId(), null));
982+
}
983+
}
984+
944985
/**
945986
* Re-queues a job event back to the queue.
946987
*/

worker-controller/src/main/java/io/kestra/controller/grpc/services/WorkerStreamContext.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@ public class WorkerStreamContext<T> {
4545
private final StreamObserver<T> responseObserver;
4646
private final WorkerCapacityPolicy capacityPolicy;
4747

48+
/** Worker's advertised gRPC max inbound message size (bytes); 0 = not advertised. */
49+
private volatile int maxInboundMessageSize;
50+
4851
/**
4952
* Number of jobs the worker has capacity to receive.
5053
* Decremented when a job is sent, incremented when worker sends permits.
@@ -308,6 +311,11 @@ public int getInFlightCount() {
308311
return inFlightJobs.size();
309312
}
310313

314+
/** Records the worker's advertised gRPC max inbound message size (bytes). */
315+
public void setMaxInboundMessageSize(int maxInboundMessageSize) {
316+
this.maxInboundMessageSize = maxInboundMessageSize;
317+
}
318+
311319
/**
312320
* Lock to serialize onNext() calls on the response stream observer.
313321
* gRPC's StreamObserver is not thread-safe for concurrent onNext() calls.

worker-controller/src/main/proto/worker_controller.proto

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,9 @@ message WorkerConnectionInfo {
4848

4949
// Maximum concurrent jobs this worker can handle (based on thread count)
5050
int32 maxConcurrency = 3;
51+
52+
// Worker's gRPC max inbound message size (bytes).
53+
int32 max_inbound_message_size = 4;
5154
}
5255

5356
// Controller -> Worker: Jobs to be executed

worker/src/main/java/io/kestra/worker/fetchers/WorkerJobFetcher.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import com.google.protobuf.ByteString;
1414

1515
import io.kestra.controller.GrpcChannelManager;
16+
import io.kestra.controller.config.GrpcConfiguration;
1617
import io.kestra.controller.grpc.WorkerConnectionInfo;
1718
import io.kestra.controller.grpc.WorkerControllerServiceGrpc.WorkerControllerServiceStub;
1819
import io.kestra.controller.grpc.WorkerJobPayload;
@@ -92,6 +93,7 @@ public class WorkerJobFetcher extends WorkerLoop implements JobFetcher {
9293
private final GrpcChannelManager channelManager;
9394
private final WorkerQueueRegistry workerQueueRegistry;
9495
private final ExecutionKilledManager executionKilledManager;
96+
private final GrpcConfiguration grpcConfiguration;
9597
private final BroadcastQueueInterface<ClusterEvent> clusterEventQueue;
9698
private final List<WorkerMetadataChangeHandler> metadataChangeHandlers;
9799

@@ -172,12 +174,14 @@ public WorkerJobFetcher(final WorkerControllerServiceStub workerControllerServic
172174
final WorkerQueueRegistry workerQueueRegistry,
173175
final ExecutionKilledManager executionKilledManager,
174176
@Nullable @Named(WORKER_LOCAL_CLUSTER_EVENTS) final BroadcastQueueInterface<ClusterEvent> clusterEventQueue,
175-
final List<WorkerMetadataChangeHandler> metadataChangeHandlers) {
177+
final List<WorkerMetadataChangeHandler> metadataChangeHandlers,
178+
final GrpcConfiguration grpcConfiguration) {
176179
super(WorkerJobFetcher.class.getSimpleName());
177180
this.workerQueueRegistry = workerQueueRegistry;
178181
this.workerControllerServiceStub = workerControllerServiceStub;
179182
this.channelManager = channelManager;
180183
this.executionKilledManager = executionKilledManager;
184+
this.grpcConfiguration = grpcConfiguration;
181185
this.clusterEventQueue = clusterEventQueue;
182186
this.metadataChangeHandlers = metadataChangeHandlers;
183187
}
@@ -308,6 +312,7 @@ private void sendInitialRequest(ClientCallStreamObserver<WorkerJobRequest> reque
308312
.setWorkerId(workerContext.workerId())
309313
.setWorkerGroupId(workerGroupId)
310314
.setMaxConcurrency(maxConcurrency)
315+
.setMaxInboundMessageSize(grpcConfiguration.maxInboundMessageSize())
311316
.build()
312317
);
313318
addPendingCompletions(requestBuilder);

worker/src/test/java/io/kestra/worker/fetchers/WorkerJobFetcherMetadataChangeTest.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import io.kestra.worker.queues.WorkerQueueRegistry;
99
import io.kestra.worker.services.ExecutionKilledManager;
1010
import io.kestra.controller.GrpcChannelManager;
11+
import io.kestra.controller.config.GrpcConfiguration;
1112
import io.kestra.controller.grpc.WorkerControllerServiceGrpc.WorkerControllerServiceStub;
1213
import org.junit.jupiter.api.Test;
1314

@@ -29,7 +30,8 @@ private WorkerJobFetcher newFetcher(List<WorkerMetadataChangeHandler> handlers)
2930
mock(WorkerQueueRegistry.class),
3031
mock(ExecutionKilledManager.class),
3132
(BroadcastQueueInterface<ClusterEvent>) mock(BroadcastQueueInterface.class),
32-
handlers
33+
handlers,
34+
new GrpcConfiguration(false, 10485760)
3335
);
3436
}
3537

@@ -93,7 +95,8 @@ void shouldStillRouteKillEvents() {
9395
mock(WorkerQueueRegistry.class),
9496
killed,
9597
null,
96-
List.of(handler)
98+
List.of(handler),
99+
new GrpcConfiguration(false, 10485760)
97100
);
98101

99102
io.kestra.core.models.executions.ExecutionKilled k = io.kestra.core.models.executions.ExecutionKilledExecution.builder()

0 commit comments

Comments
 (0)