Skip to content

feat(worker): add WORKER_DISCONNECT_REQUESTED event and dispatcher eviction#16883

Open
fhussonnois wants to merge 1 commit into
developfrom
feat/worker-token-revocation-eviction
Open

feat(worker): add WORKER_DISCONNECT_REQUESTED event and dispatcher eviction#16883
fhussonnois wants to merge 1 commit into
developfrom
feat/worker-token-revocation-eviction

Conversation

@fhussonnois

Copy link
Copy Markdown
Member

Add a WORKER_DISCONNECT_REQUESTED cluster event and WorkerJobDispatcher#evictWorker(workerId), which unregisters a worker and closes its stream. The dispatcher handles the event by evicting the named worker (no-op if it is not connected here); the worker must reconnect and re-authenticate.

@github-project-automation github-project-automation Bot moved this to To review in Pull Requests Jun 17, 2026
@fhussonnois fhussonnois force-pushed the feat/worker-token-revocation-eviction branch from 71756d1 to 0a773ee Compare June 17, 2026 16:36
@fhussonnois fhussonnois requested a review from ammeek June 17, 2026 16:37
…iction

Add a WORKER_DISCONNECT_REQUESTED cluster event and
WorkerJobDispatcher#evictWorker(workerId), which unregisters a worker and closes
its stream. The dispatcher handles the event by evicting the named worker (no-op
if it is not connected here); the worker must reconnect and re-authenticate.
@fhussonnois fhussonnois force-pushed the feat/worker-token-revocation-eviction branch from 0a773ee to 427a1d0 Compare June 17, 2026 16:38
@github-actions

github-actions Bot commented Jun 17, 2026

Copy link
Copy Markdown
Contributor

📄 OpenAPI Spec Changes

Spec generated with EE branch feat/worker-token-revocation-eviction. Diff vs client-sdk:

3951a3952,3989
>   /api/v1/{tenant}/apps/preview/dispatch/{dispatch}:
>     post:
>       tags:
>       - Apps
>       summary: Dispatch for the given app source (preview).
>       operationId: previewDispatchApp
>       parameters:
>       - name: dispatch
>         in: path
>         description: The ID to dispatch
>         required: true
>         schema:
>           type: string
>       - name: tenant
>         in: path
>         required: true
>         schema:
>           type: string
>       requestBody:
>         description: The app source (part '__kestra_app_source__') and the dispatch
>           data
>         content:
>           multipart/form-data:
>             schema:
>               type: array
>               items:
>                 type: string
>                 format: binary
>       responses:
>         "200":
>           description: previewDispatchApp 200 response
>           content:
>             application/json:
>               schema:
>                 $ref: "#/components/schemas/AppResponse"
>       security:
>       - bearerAuth: []
>       - basicAuth: []
6883c6921
<           description: deleteExecutionsByQuery 200 response
---
>           description: On success
6887c6925,6931
<                 type: object
---
>                 $ref: "#/components/schemas/BulkResponse"
>         "422":
>           description: Deleted with errors
>           content:
>             application/json:
>               schema:
>                 $ref: "#/components/schemas/BulkErrorResponse"
6937c6981
<                 type: object
---
>                 $ref: "#/components/schemas/ApiAsyncOperationResponse"
6977a7022,7027
>         "400":
>           description: Validation errors
>           content:
>             application/json:
>               schema:
>                 $ref: "#/components/schemas/BulkErrorResponse"
6983c7033
<                 type: object
---
>                 $ref: "#/components/schemas/ApiAsyncOperationResponse"
7153c7203
<                 type: object
---
>                 $ref: "#/components/schemas/ApiAsyncOperationResponse"
7186a7237,7242
>         "400":
>           description: Validation errors
>           content:
>             application/json:
>               schema:
>                 $ref: "#/components/schemas/BulkErrorResponse"
7192c7248
<                 type: object
---
>                 $ref: "#/components/schemas/ApiAsyncOperationResponse"
7235c7291
<                 type: object
---
>                 $ref: "#/components/schemas/ApiAsyncOperationResponse"
7262a7319,7330
>         "202":
>           description: Accepted
>           content:
>             application/json:
>               schema:
>                 $ref: "#/components/schemas/ApiAsyncOperationResponse"
>         "400":
>           description: Validation errors
>           content:
>             application/json:
>               schema:
>                 $ref: "#/components/schemas/BulkErrorResponse"
7268c7336
<                 type: object
---
>                 $ref: "#/components/schemas/ApiAsyncOperationResponse"
7309c7377
<                 type: object
---
>                 $ref: "#/components/schemas/ApiAsyncOperationResponse"
7351a7420,7425
>         "400":
>           description: Validation errors
>           content:
>             application/json:
>               schema:
>                 $ref: "#/components/schemas/BulkErrorResponse"
7357c7431
<                 type: object
---
>                 $ref: "#/components/schemas/ApiAsyncOperationResponse"
7488c7562
<                 type: object
---
>                 $ref: "#/components/schemas/ApiAsyncOperationResponse"
7521a7596,7601
>         "400":
>           description: Validation errors
>           content:
>             application/json:
>               schema:
>                 $ref: "#/components/schemas/BulkErrorResponse"
7527c7607
<                 type: object
---
>                 $ref: "#/components/schemas/ApiAsyncOperationResponse"
7578c7658
<                 type: object
---
>                 $ref: "#/components/schemas/ApiAsyncOperationResponse"
7620a7701,7706
>         "400":
>           description: Validation errors
>           content:
>             application/json:
>               schema:
>                 $ref: "#/components/schemas/BulkErrorResponse"
7626c7712
<                 type: object
---
>                 $ref: "#/components/schemas/ApiAsyncOperationResponse"
7669c7755
<                 type: object
---
>                 $ref: "#/components/schemas/ApiAsyncOperationResponse"
7702a7789,7794
>         "400":
>           description: Validation errors
>           content:
>             application/json:
>               schema:
>                 $ref: "#/components/schemas/BulkErrorResponse"
7708c7800
<                 type: object
---
>                 $ref: "#/components/schemas/ApiAsyncOperationResponse"
7751c7843
<                 type: object
---
>                 $ref: "#/components/schemas/ApiAsyncOperationResponse"
7784a7877,7882
>         "400":
>           description: Validation errors
>           content:
>             application/json:
>               schema:
>                 $ref: "#/components/schemas/BulkErrorResponse"
7790c7888
<                 type: object
---
>                 $ref: "#/components/schemas/ApiAsyncOperationResponse"
7913c8011
<                 type: object
---
>                 $ref: "#/components/schemas/ApiAsyncOperationResponse"
7954a8053,8058
>         "400":
>           description: Validation errors
>           content:
>             application/json:
>               schema:
>                 $ref: "#/components/schemas/BulkErrorResponse"
7960c8064
<                 type: object
---
>                 $ref: "#/components/schemas/ApiAsyncOperationResponse"
9444c9548
<           description: deleteFlowsByIds 200 response
---
>           description: On success
9476c9580
<           description: deleteFlowsByQuery 200 response
---
>           description: On success
9538c9642
<           description: disableFlowsByIds 200 response
---
>           description: On success
9570c9674
<           description: disableFlowsByQuery 200 response
---
>           description: On success
9632c9736
<           description: enableFlowsByIds 200 response
---
>           description: On success
9664c9768
<           description: enableFlowsByQuery 200 response
---
>           description: On success
14245c14349
<           description: getTaskRunOutputs 200 response
---
>           description: The task run outputs as a map of output names to their values
14250,14251c14354
<                 additionalProperties:
<                   type: object
---
>                 additionalProperties: true
16712c16815
<           description: If the trigger is already unlocked
---
>           description: If the trigger is already unlocked or is a realtime trigger
17113a17217,17218
>           description: The operation identifier used to correlate logs and progress
>             indicators
17115a17221
>           description: The number of domain events submitted for asynchronous processing
17837a17944,17945
>         kind:
>           $ref: "#/components/schemas/TriggerType"
18733a18842
>           description: The error message
18735a18845
>           description: The list of items that failed validation
18740a18851
>           description: The number of items successfully processed
24211a24323,24328
>     TriggerType:
>       type: string
>       enum:
>       - SCHEDULE
>       - POLLING
>       - REALTIME

🐋 Docker image

ghcr.io/kestra-io/kestra-pr:16883
docker run --pull=always --rm -it -p 8080:8080 --user=root -v /var/run/docker.sock:/var/run/docker.sock -v /tmp:/tmp ghcr.io/kestra-io/kestra-pr:16883 server local

🧪 Java Unit Tests

TestsPassed ☑️Skipped ⚠️Failed ❌️Time ⏱
Java Tests Report6977 ran6956 ✅20 ⚠️1 ❌51m 12s 875ms
TestResultTime ⏱
Java Tests Report
DefaultSchedulerTest.shouldStopAndRestartSchedulingLoopWhenEnteringAndExitingMaintenanceMode()❌ failure6s 214ms

@github-actions

github-actions Bot commented Jun 17, 2026

Copy link
Copy Markdown
Contributor

Tests report quick summary:

failed ❌ > tests: 6977, success: 6956, skipped: 20, failed: 1 (🔄 6977 executed, 📦 0 from cache)

Project Status Success Skipped Failed
cli success ✅ 🔄 55 1 0
core success ✅ 🔄 2703 6 0
executor success ✅ 🔄 70 3 0
jdbc success ✅ 🔄 30 6 0
jdbc-h2 success ✅ 🔄 914 1 0
jdbc-mysql success ✅ 🔄 905 2 0
jdbc-postgres success ✅ 🔄 919 0 0
processor success ✅ 🔄 7 0 0
queue success ✅ 🔄 39 0 0
runner-memory success ✅ 🔄 1 0 0
scheduler failed ❌ 🔄 98 0 1
script success ✅ 🔄 37 0 0
storage-local success ✅ 🔄 66 0 0
tests success ✅ 🔄 2 0 0
webserver success ✅ 🔄 907 1 0
worker success ✅ 🔄 65 0 0
worker-controller success ✅ 🔄 138 0 0

Failed tests:

scheduler > io.kestra.scheduler.DefaultSchedulerTest > shouldStopAndRestartSchedulingLoopWhenEnteringAndExitingMaintenanceMode() failed ❌ in 6.214
org.awaitility.core.ConditionTimeoutException: Assertion condition defined as a Lambda expression in io.kestra.scheduler.DefaultSchedulerTest &#10;Expecting all elements of:&#10;  [io.kestra.scheduler.TriggerSchedulingLoop@7e3b43f5,&#10;    io.kestra.scheduler.TriggerSchedulingLoop@7b7007e7]&#10;to match given predicate but these elements did not:&#10;  [io.kestra.scheduler.TriggerSchedulingLoop@7e3b43f5,&#10;    io.kestra.scheduler.TriggerSchedulingLoop@7b7007e7] within 5 seconds.

org.awaitility.core.ConditionTimeoutException: Assertion condition defined as a Lambda expression in io.kestra.scheduler.DefaultSchedulerTest 
Expecting all elements of:
  [io.kestra.scheduler.TriggerSchedulingLoop@7e3b43f5,
    io.kestra.scheduler.TriggerSchedulingLoop@7b7007e7]
to match given predicate but these elements did not:
  [io.kestra.scheduler.TriggerSchedulingLoop@7e3b43f5,
    io.kestra.scheduler.TriggerSchedulingLoop@7b7007e7] within 5 seconds.
	at org.awaitility.core.ConditionAwaiter.await(ConditionAwaiter.java:167)
	at org.awaitility.core.AssertionCondition.await(AssertionCondition.java:119)
	at org.awaitility.core.AssertionCondition.await(AssertionCondition.java:31)
	at org.awaitility.core.ConditionFactory.until(ConditionFactory.java:1160)
	at org.awaitility.core.ConditionFactory.untilAsserted(ConditionFactory.java:790)
	at io.kestra.scheduler.DefaultSchedulerTest.shouldStopAndRestartSchedulingLoopWhenEnteringAndExitingMaintenanceMode(DefaultSchedulerTest.java:232)
	at java.base/java.lang.reflect.Method.invoke(Method.java:565)
	at io.micronaut.test.extensions.junit5.MicronautJunit5Extension$2.proceed(MicronautJunit5Extension.java:154)
	at io.micronaut.test.extensions.AbstractMicronautExtension.interceptEach(AbstractMicronautExtension.java:171)
	at io.micronaut.test.extensions.AbstractMicronautExtension$3.proceed(AbstractMicronautExtension.java:183)
	at io.micronaut.test.context.TestMethodInterceptor.interceptTest(TestMethodInterceptor.java:46)
	at io.micronaut.transaction.test.DefaultTestTransactionExecutionListener.lambda$interceptTest$0(DefaultTestTransactionExecutionListener.java:93)
	at io.micronaut.transaction.support.AbstractPropagatedStatusTransactionOperations.lambda$execute$2(AbstractPropagatedStatusTransactionOperations.java:68)
	at io.micronaut.transaction.TransactionCallback.apply(TransactionCallback.java:37)
	at io.micronaut.transaction.support.AbstractTransactionOperations.executeTransactional(AbstractTransactionOperations.java:339)
	at io.micronaut.transaction.support.AbstractTransactionOperations.executeWithNewTransaction(AbstractTransactionOperations.java:321)
	at io.micronaut.transaction.support.AbstractTransactionOperations.executeNew(AbstractTransactionOperations.java:235)
	at io.micronaut.transaction.support.AbstractTransactionOperations.doExecute(AbstractTransactionOperations.java:137)
	at io.micronaut.transaction.support.AbstractTransactionOperations.lambda$doExecute$0(AbstractTransactionOperations.java:122)
	at io.micronaut.data.connection.support.AbstractConnectionOperations.executeWithNewConnection(AbstractConnectionOperations.java:174)
	at io.micronaut.data.connection.support.AbstractConnectionOperations.execute(AbstractConnectionOperations.java:114)
	at io.micronaut.transaction.support.AbstractTransactionOperations.doExecute(AbstractTransactionOperations.java:120)
	at io.micronaut.transaction.support.AbstractPropagatedStatusTransactionOperations.execute(AbstractPropagatedStatusTransactionOperations.java:65)
	at io.micronaut.transaction.test.DefaultTestTransactionExecutionListener.interceptTest(DefaultTestTransactionExecutionListener.java:91)
	at io.micronaut.test.extensions.AbstractMicronautExtension.interceptEach(AbstractMicronautExtension.java:175)
	at io.micronaut.test.extensions.AbstractMicronautExtension.interceptTest(AbstractMicronautExtension.java:128)
	at io.micronaut.test.extensions.junit5.MicronautJunit5Extension.interceptTestMethod(MicronautJunit5Extension.java:141)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1604)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1604)
Caused by: java.util.concurrent.TimeoutException
	at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:206)
	at org.awaitility.core.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:101)
	at org.awaitility.core.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:81)
	at org.awaitility.core.ConditionAwaiter.await(ConditionAwaiter.java:103)
	... 28 more


Develocity build scan: https://develocity.kestra.io/s/po6j6yhfxnn6y


Flaky tests report quick summary:

success ✅ > tests: 13, success: 13, skipped: 0, failed: 0

unfold for details
Project Status Success Skipped Failed
cli success ✅ 2 0 0
jdbc-h2 success ✅ 1 0 0
jdbc-mysql success ✅ 2 0 0
jdbc-postgres success ✅ 1 0 0
script success ✅ 2 0 0
webserver success ✅ 5 0 0

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

Status: To review

Development

Successfully merging this pull request may close these issues.

1 participant