Skip to content

feat: resetOffsetsAndBackfill using bounded stream supervisor#19477

Open
aho135 wants to merge 4 commits into
apache:masterfrom
aho135:resetOffsetsAndBackfill-bounded-stream
Open

feat: resetOffsetsAndBackfill using bounded stream supervisor#19477
aho135 wants to merge 4 commits into
apache:masterfrom
aho135:resetOffsetsAndBackfill-bounded-stream

Conversation

@aho135
Copy link
Copy Markdown
Contributor

@aho135 aho135 commented May 18, 2026

This change adds an endpoint called resetOffsetsAndBackfill to SupervisorResource. This is a useful feature for operating Druid clusters where the most recent data is the most important (such as alerting use cases).

Description

Adds an endpoint called resetOffsetsAndBackfill to automatically ingest skipped data in the case where the offset is reset to latest. This requires useEarliestOffset=false, useConcurrentLocks=true because there can be conflicting time intervals between the backfill task and the main supervisor tasks and the Supervisor needs to be in a running state in order to call updatePartitionLagFromStream() to get the latest offsets

Release note

Adds an endpoint called resetOffsetsAndBackfill for Stream Supervisors. This is a useful feature for operating Druid clusters where the most recent data is the most important


Key changed/added classes in this PR
  • SupervisorResource
  • SupervisorManager

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@aho135 aho135 mentioned this pull request May 18, 2026
9 tasks
Copy link
Copy Markdown
Member

@FrankChen021 FrankChen021 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Severity Findings
P0 0
P1 1
P2 2
P3 0
Total 3
Severity Findings
P0 0
P1 1
P2 2
P3 0
Total 3

Reviewed 11 of 11 changed files.


This is an automated review by Codex GPT-5.5

endOffsets
);

streamSupervisor.resetOffsets(resetMetadata);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P1] Reset can skip data before backfill is guaranteed

resetOffsets only enqueues a ResetOffsetsNotice; it does not synchronously update metadata. This code queues the main supervisor reset before the bounded config is fully built and before createOrUpdateAndStartSupervisor succeeds. If any later step fails, the queued reset can still advance the live supervisor to latest offsets with no backfill supervisor, losing the skipped range this endpoint is meant to preserve.


@Override
protected void updatePartitionLagFromStream()
public void updatePartitionLagFromStream()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P2] Kinesis cannot provide backfill end offsets

The new manager path calls updatePartitionLagFromStream() and then getLatestSequencesFromStream(), but Kinesis only updates time lag here and does not override getLatestSequencesFromStream(), so it inherits the base empty map. Any Kinesis supervisor that passes the earlier checks will fail with empty latest offsets instead of starting a backfill.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good callout, @jaykanakiya will be tackling Kinesis support in a separate PR


// Verify useConcurrentLocks is enabled
final Map<String, Object> context = streamSpec.getContext();
if (context == null || !Boolean.TRUE.equals(context.get("useConcurrentLocks"))) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P2] Concurrent-lock check rejects valid true contexts

This check only accepts a literal Boolean true under the hard-coded key. Other Druid paths in this class parse Tasks.USE_CONCURRENT_LOCKS with QueryContexts.getAsBoolean, which accepts values like string true. Supervisors whose tasks actually use concurrent locks can therefore be rejected by this endpoint.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants