Skip to content

Fix inconsistent ES8 workflow state refresh handling#1012

Open
rajeshwar-nu wants to merge 7 commits into
conductor-oss:mainfrom
steeleye:fix/inconsistent-es8-wf-state
Open

Fix inconsistent ES8 workflow state refresh handling#1012
rajeshwar-nu wants to merge 7 commits into
conductor-oss:mainfrom
steeleye:fix/inconsistent-es8-wf-state

Conversation

@rajeshwar-nu

@rajeshwar-nu rajeshwar-nu commented Apr 14, 2026

Copy link
Copy Markdown
Contributor

Summary

This PR fixes ES8 indexing consistency in two separate layers:

  1. Routes ES8 workflow/task writes through the ILM-aware write path used by the ES8 backend.
  2. Adds an opt-in ES8 setting, conductor.elasticsearch.refreshOnWrite, for deployments that require immediate search visibility after every ES8 write/delete.

By default, ES8 remains async/bulk-oriented. Forced refresh is only used when refreshOnWrite=true.

Issue

ES8 workflow writes had diverged from other ES8 write paths. They did not consistently go through the helper that resolves the concrete ILM index/write index before delegating to the shared indexing path. This could leave workflow indexing behavior different from task, message, event, and log indexing behavior.

Separately, some deployments need read-after-write search visibility for workflow/task state. waitForIndexRefresh is not equivalent to that requirement because it maps to Elasticsearch refresh=wait_for, which waits for the next refresh cycle. It does not force an immediate refresh. Lowering index.refresh_interval can shorten the wait, but it still cannot mean "refresh after every write".

In practice, these gaps could leave Elasticsearch temporarily showing stale workflow/task state even after the execution store had already been updated.

Why a fix is needed

Conductor relies on indexed workflow/task state for search, archival, and operational visibility. When ES8 mutations are not handled consistently:

  • recently updated workflow/task state may not be searchable immediately when an operator requires synchronous visibility
  • older indexed state can linger after newer execution-state writes
  • ES8 write-path behavior differs across workflow/task/log/message/event documents

Fix

This change:

  • routes ES8 workflow and task writes through indexDocumentWithIlmFallback(...), preserving ILM alias/index resolution before delegating to the shared indexing path
  • keeps the default ES8 path async/bulk-oriented when no refresh policy is requested
  • adds conductor.elasticsearch.refreshOnWrite=false as an ES8-only opt-in
  • when refreshOnWrite=true, applies Elasticsearch refresh=true to ES8 index/update/delete/bulk writes so the affected shard is refreshed before the operation returns
  • preserves waitForIndexRefresh semantics as refresh=wait_for
  • prevents stale workflow/task writes from overwriting newer indexed state by comparing updateTime

Follow-up

Sweeper index/execution-store reconciliation was removed from this PR based on review feedback. That logic will be proposed separately as backend-specific reconciliation capability, with a no-op default and an ES8-specific implementation so non-ES8 backends do not pay additional read overhead.

Tests

  • added ES8 property tests for the new refreshOnWrite option
  • added ES8 DAO tests covering forced refresh behavior and stale-write protection for workflows, tasks, logs, messages, and event executions
  • retained focused core sweeper coverage after removing reconciliation logic from this PR

Verification

  • ./gradlew spotlessApply
  • ./gradlew :conductor-core:test --tests org.conductoross.conductor.core.execution.WorkflowSweeperTest

Previous verification from this branch:

  • ./gradlew :conductor-es8-persistence:test --tests org.conductoross.conductor.es8.config.ElasticSearchPropertiesTest --tests org.conductoross.conductor.es8.dao.index.TestElasticSearchRestDAOV8

Note: the ES8 integration suite is guarded by Testcontainers and was skipped in this environment rather than failing.

}

try {
String indexedStatus = indexDAO.get(workflow.getWorkflowId(), "status");

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will add additional reads making the sweeper slower. This might be better suited inside ES implementation.

@rajeshwar-nu

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@v1r3n @nthmost-orkes agreed. I removed the sweeper reconciliation logic from this PR and pushed the branch update.

This PR is now scoped back to ES8 write-path correctness plus optional refreshOnWrite behavior.

For reconciliation, I will raise a separate PR that adds backend-specific reconciliation capability instead of putting the ES read in the core sweeper hot path. The shape I have in mind is a no-op default capability for non-ES8 backends, with the actual reconciliation moved into an ES8-specific implementation. That keeps non-ES8 backends from paying additional read overhead while still allowing ES8 deployments to opt into index/execution-store drift repair.

@nthmost-orkes

Copy link
Copy Markdown
Contributor

Thanks for the detailed work here — the analysis of the inconsistent refresh behavior is correct, and the problem is real. Closing this in favor of a different approach, but want to flag one piece worth preserving.

Why we're going a different direction

PR #823 is fixing the same root cause across ES7, OS v2, and OS v3 by routing indexWorkflow() through the existing async bulk path instead of per-document synchronous requests. That's the right fix — and it turns out ES8 has exactly the same gap and the same infrastructure to close it (Es8BulkIngestionSupport.indexObject() is already used by indexTask(), addMessage(), and addEventExecution(); indexWorkflow() is the only outlier).

Adding a refreshOnWrite forced-sync option moves ES8 in the opposite direction from where we want to go across all backends: async/bulk by default, with waitForIndexRefresh available for the rare case where you need synchronous visibility. Merging both would give ES7 and ES8 opposite semantics with no good reason.

The sweeper reconciliation logic is worth saving

The reconcileIndexedStateIfNeeded logic added to WorkflowSweeper is a genuinely useful defensive measure — detecting and recovering from index/execution-store drift is a good idea regardless of which refresh strategy is in play. The concern with it living in this PR is that it adds an ES read on the sweeper hot path for every in-flight workflow, which could create latency feedback under load.

If you're interested in contributing that as a standalone PR with the ES read gated behind a config flag (off by default), that would be a solid addition. Something like conductor.sweeper.indexReconciliationEnabled=false so operators can opt in only if they're seeing drift.

Closing this now — appreciate the effort you put into it.

@rajeshwar-nu

Copy link
Copy Markdown
Contributor Author

@nthmost-orkes thanks, agree with the concern that indexWorkflow() should use the shared indexing path instead of bypassing it.

One ES8-specific clarification: PR #823 addresses this for ES7 and OS v2/v3 by routing indexWorkflow() through indexObject(...). For ES8, the corresponding fix is different. indexWorkflow() should go through indexDocumentWithIlmFallback(...), because ES8 needs ILM-aware alias/concrete-index resolution before it delegates to indexObject(...).

This PR already makes that ES8-specific change for workflow/task writes. So I do not think #823 needs to address ES8 here unless it is expanded beyond its current ES7/OS v2/OS v3 scope.

Separate from that, refreshOnWrite is an opt-in ES8 behavior change. It is not meant to replace the default async/bulk direction from #823. Default ES8 behavior remains async/bulk-oriented when no refresh policy is requested.

The problem this flag is trying to solve is narrower than general indexing throughput. Some ES8 deployments require read-after-write search visibility for workflow/task state. Today waitForIndexRefresh maps to Elasticsearch refresh=wait_for, which waits for the next refresh cycle. It does not force immediate refresh. index.refresh_interval can shorten that wait, but it still cannot mean “refresh after every write”.

refreshOnWrite=true makes that requirement explicit: ES8 write/delete operations use Elasticsearch refresh=true, so the affected shard is refreshed before the operation returns. This gives operators a consistent, opt-in force-refresh mode without changing the default async/bulk behavior for everyone else.

I updated the PR description to make this separation clearer. Could you please reopen this PR for review with that scope in mind?

@nthmost-orkes nthmost-orkes reopened this May 18, 2026
@rajeshwar-nu rajeshwar-nu requested a review from v1r3n May 21, 2026 06:38
@rajeshwar-nu

Copy link
Copy Markdown
Contributor Author

Hi @v1r3n , @nthmost-orkes , can I please get some reviews on this? Much appreciated 🙏🏻

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants