fix(core): close emit-before-subscribe race in ExecutionStreamingService #16651
Open
AcevedoR wants to merge 2 commits into
When the queue was already running (other subscribers present), a FollowExecutionEvent for execution X could be consumed by the queue consumer before registerSubscriber() had registered X's subscriber. The event was silently dropped and Flux.last() in TestSuiteService (and any other caller) would hang forever, causing the HTTP response to never be sent and the test to time out.

The existing synchronized(subscriberLock) + pause/resume pattern only protects the case where the queue is paused (no subscribers at all). It does not protect when the queue is already running.

Fix: after registering the subscriber, check the execution repository. If the execution is already in a terminal state, deliver the "end" event directly. FluxSink is thread-safe and complete() is idempotent, so a concurrent delivery from the queue consumer is harmless.

Affects MysqlTestSuiteControllerTest and any JDBC-backed test where the return-flow executes before the reactive subscriber is registered.
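The race and the recovery check described above can be illustrated with a small, dependency-free sketch. It is not the code from this PR: `Sink`, `StreamingService`, `onQueueEvent`, the `checkRepository` flag, and the `terminatedExecutions` set are hypothetical stand-ins for `FluxSink`, `ExecutionStreamingService`, the queue consumer, and the execution repository. The sketch assumes only what the description states: a queue event for an execution with no registered subscriber is dropped, and delivering "end" twice is harmless.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class EmitBeforeSubscribeSketch {

    /** Hypothetical stand-in for a FluxSink: thread-safe, with idempotent completion. */
    static final class Sink {
        private final List<String> events = new ArrayList<>();
        private boolean completed;

        /** Emits "end" and completes; any later call is a no-op. */
        synchronized void deliverEnd() {
            if (completed) {
                return;
            }
            events.add("end");
            completed = true;
        }

        synchronized boolean isCompleted() {
            return completed;
        }

        synchronized List<String> events() {
            return List.copyOf(events);
        }
    }

    /** Hypothetical, heavily simplified model of the streaming service. */
    static final class StreamingService {
        private final Map<String, Sink> subscribers = new ConcurrentHashMap<>();
        // Assumption: stands in for "execution repository says X is terminal".
        private final Set<String> terminatedExecutions;

        StreamingService(Set<String> terminatedExecutions) {
            this.terminatedExecutions = terminatedExecutions;
        }

        /** Queue consumer path: an event with no registered subscriber is dropped. */
        void onQueueEvent(String executionId) {
            Sink sink = subscribers.get(executionId);
            if (sink != null) {
                sink.deliverEnd();
            }
        }

        /** Registers first, then optionally recovers a terminal event that was missed. */
        void registerSubscriber(String executionId, Sink sink, boolean checkRepository) {
            subscribers.put(executionId, sink);
            if (checkRepository && terminatedExecutions.contains(executionId)) {
                sink.deliverEnd();
            }
        }
    }

    public static void main(String[] args) {
        Set<String> terminated = ConcurrentHashMap.newKeySet();
        StreamingService service = new StreamingService(terminated);

        // Execution X terminates and its event is consumed before anyone subscribes.
        terminated.add("X");
        service.onQueueEvent("X");

        Sink withoutCheck = new Sink();
        service.registerSubscriber("X", withoutCheck, false);
        System.out.println("without check, completed: " + withoutCheck.isCompleted());

        Sink withCheck = new Sink();
        service.registerSubscriber("X", withCheck, true);
        System.out.println("with check, completed: " + withCheck.isCompleted());

        // A late duplicate from the queue consumer does not emit a second "end".
        service.onQueueEvent("X");
        System.out.println("events after duplicate: " + withCheck.events());
    }
}
```

The check runs after the subscriber is put in the map on purpose: if the terminal event arrives between the two steps, the queue consumer finds the subscriber and delivers it, and the repository check then becomes the harmless duplicate rather than the only chance to complete.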
🐋 Docker image
docker run --pull=always --rm -it -p 8080:8080 --user=root -v /var/run/docker.sock:/var/run/docker.sock -v /tmp:/tmp ghcr.io/kestra-io/kestra-pr:16651 server local

🧪 Java Unit Tests
Tests report quick summary: success ✅
tests: 6859, success: 6839, skipped: 20, failed: 0 (🔄 6859 executed, 📦 0 from cache)
Develocity build scan: https://develocity.kestra.io/s/t5bjx5r3tjog4

Flaky tests report quick summary: failed ❌
tests: 14, success: 13, skipped: 0, failed: 1
Failed tests: webserver > io.kestra.webserver.controllers.api.MiscControllerTest > canTriggerAWebhookWithoutBasicAuth() failed ❌ in 30.680
…or signature
WorkerJobFetcher gained a GrpcConfiguration parameter, but WorkerJobFetcherBackoffTest was not updated, causing a compile error that blocked CI. Add a mock for the new parameter so the test compiles and passes.
Summary
- When the `FollowExecutionEvent` queue was already running (other subscribers present), a terminal event for execution X could be consumed before `registerSubscriber()` was called for X, silently dropping the event and leaving `Flux.last()` hanging forever
- The existing `synchronized(subscriberLock)` + pause/resume pattern only handles the case where the queue is paused (no subscribers at all); it does not protect when the queue is already running
- Fix: after registering the subscriber, check the execution repository; if the execution is already in a terminal state, deliver the `"end"` event immediately to recover from the missed event
- `FluxSink` is thread-safe and `complete()` is idempotent, so concurrent delivery from the queue consumer is harmless

Test plan
- `MysqlTestSuiteControllerTest.run_shouldBePersisted`: previously intermittently got HTTP 404 because the result was never persisted (reactive chain hung); should now complete reliably
- `MysqlTestSuiteControllerTest.run`: previously intermittently returned `total=1` instead of `2`; both rows should now be present
- `WebhookService`, `ExecutionController`, `McpToolService` callers: all use the same `registerSubscriber` signature, so no change is needed