Changes from 2 commits
95 changes: 95 additions & 0 deletions docs/api-reference/supervisor-api.md
@@ -3539,6 +3539,101 @@ when the supervisor's tasks restart, they resume reading from `{"0": 100, "1": 1
```
</details>

### Reset offsets and start a backfill supervisor

Resets the supervisor to the latest available stream offsets and starts a new bounded backfill supervisor to ingest the data in the skipped range.

This endpoint is useful when a supervisor has fallen behind and you want to catch it up to the latest offsets without losing the skipped data. The main supervisor resumes ingesting from the latest offsets, while the backfill supervisor processes the range from the previously checkpointed offsets up to the latest offsets at the time of the reset.

The following requirements must be met before calling this endpoint:

- The supervisor must be a `SeekableStreamSupervisor`.
- The supervisor's `useEarliestSequenceNumber` property must be `false`.
- The supervisor context must have `useConcurrentLocks` set to `true` to allow the backfill supervisor's tasks to write concurrently with the main supervisor's tasks.
- The supervisor must be in a `RUNNING` state so that it can query the latest offsets from the stream.

The backfill supervisor has the same configuration as the source supervisor except for its ID, which takes the form `{supervisorId}_backfill_{randomSuffix}`, and its `boundedStreamConfig`, which is set to the skipped offset range. If `backfillTaskCount` is specified, it overrides the `taskCount` for the backfill supervisor only.

#### URL

`POST` `/druid/indexer/v1/supervisor/{supervisorId}/resetOffsetsAndBackfill`

#### Query parameters

| Parameter | Type | Description | Default |
|---------|---------|---------|---------|
| `backfillTaskCount` | Integer | Number of parallel tasks for the backfill supervisor. If not specified, inherits `taskCount` from the source supervisor. | None |

#### Responses

<Tabs>

<TabItem value="5" label="200 SUCCESS">


*Successfully reset and started backfill supervisor*

</TabItem>
<TabItem value="6" label="400 BAD REQUEST">


*Supervisor does not meet requirements (wrong type, `useEarliestSequenceNumber` is true, `useConcurrentLocks` not enabled, or supervisor not RUNNING)*

</TabItem>
<TabItem value="7" label="404 NOT FOUND">


*Invalid supervisor ID*

</TabItem>
<TabItem value="8" label="500 SERVER ERROR">


*Failed to retrieve stream offsets or serialize the backfill spec*

</TabItem>
</Tabs>

---

#### Sample request

The following example resets a supervisor named `social_media` and starts a backfill supervisor with 2 tasks.

<Tabs>

<TabItem value="9" label="cURL">


```shell
curl --request POST "http://ROUTER_IP:ROUTER_PORT/druid/indexer/v1/supervisor/social_media/resetOffsetsAndBackfill?backfillTaskCount=2"
```

</TabItem>
<TabItem value="10" label="HTTP">


```HTTP
POST /druid/indexer/v1/supervisor/social_media/resetOffsetsAndBackfill?backfillTaskCount=2 HTTP/1.1
Host: http://ROUTER_IP:ROUTER_PORT
```

</TabItem>
</Tabs>
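
For callers on the JVM, the same request could be built with the JDK's `java.net.http` classes (Java 11 or later). This is a minimal sketch and not part of the documented API: the class and method names are hypothetical, the Router address is a placeholder, and it assumes the supervisor ID contains no characters that need URL encoding.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper class for illustration; not part of Druid.
final class ResetAndBackfillRequestSketch
{
  // Builds the POST request for the resetOffsetsAndBackfill endpoint.
  // Assumes supervisorId needs no URL encoding. Pass null for
  // backfillTaskCount to omit the query parameter.
  static HttpRequest buildRequest(String routerBaseUrl, String supervisorId, Integer backfillTaskCount)
  {
    String url = routerBaseUrl
                 + "/druid/indexer/v1/supervisor/" + supervisorId
                 + "/resetOffsetsAndBackfill";
    if (backfillTaskCount != null) {
      url += "?backfillTaskCount=" + backfillTaskCount;
    }
    return HttpRequest.newBuilder(URI.create(url))
                      .POST(HttpRequest.BodyPublishers.noBody())
                      .build();
  }

  public static void main(String[] args)
  {
    // Placeholder address; replace with your Router host and port.
    HttpRequest request = buildRequest("http://localhost:8888", "social_media", 2);
    // To send it, pass the request to
    // HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()).
    System.out.println(request.method() + " " + request.uri());
  }
}
```

The request carries no body because the endpoint takes its only option, `backfillTaskCount`, as a query parameter.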

#### Sample response

<details>
<summary>View the response</summary>

```json
{
"id": "social_media",
"backfillSupervisorId": "social_media_backfill_abcdefgh"
}
```
</details>

### Terminate a supervisor

Terminates a supervisor and its associated indexing tasks, triggering the publishing of their segments. When you terminate a supervisor, Druid places a tombstone marker in the metadata store to prevent reloading on restart.
@@ -322,7 +322,7 @@ protected Map<String, Long> getTimeLagPerPartition(Map<String, Long> currentOffs
}

@Override
protected RabbitStreamDataSourceMetadata createDataSourceMetaDataForReset(String topic, Map<String, Long> map)
public RabbitStreamDataSourceMetadata createDataSourceMetaDataForReset(String topic, Map<String, Long> map)
{
return new RabbitStreamDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>(topic, map));
}
@@ -408,7 +408,7 @@ public LagStats computeLagStats()
}

@Override
protected void updatePartitionLagFromStream()
public void updatePartitionLagFromStream()
{
getRecordSupplierLock().lock();

@@ -435,7 +435,7 @@ protected void updatePartitionLagFromStream()
}

@Override
protected Map<String, Long> getLatestSequencesFromStream()
public Map<String, Long> getLatestSequencesFromStream()
{
return latestSequenceFromStream != null ? latestSequenceFromStream : new HashMap<>();
}
@@ -356,7 +356,7 @@ protected Map<KafkaTopicPartition, Long> getTimeLagPerPartition(Map<KafkaTopicPa
}

@Override
protected KafkaDataSourceMetadata createDataSourceMetaDataForReset(String topic, Map<KafkaTopicPartition, Long> map)
public KafkaDataSourceMetadata createDataSourceMetaDataForReset(String topic, Map<KafkaTopicPartition, Long> map)
{
return new KafkaDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>(topic, map));
}
@@ -548,7 +548,7 @@ private Map<KafkaTopicPartition, Long> getTimestampPerPartitionAtCurrentOffset(S
* </p>
*/
@Override
protected void updatePartitionLagFromStream()
public void updatePartitionLagFromStream()
{
if (getIoConfig().isEmitTimeLagMetrics()) {
updatePartitionTimeAndRecordLagFromStream();
@@ -597,7 +597,7 @@ private void updateOffsetSnapshot(
}

@Override
protected Map<KafkaTopicPartition, Long> getLatestSequencesFromStream()
public Map<KafkaTopicPartition, Long> getLatestSequencesFromStream()
{
return offsetSnapshotRef.get().getLatestOffsetsFromStream();
}
@@ -630,7 +630,7 @@ protected boolean isMultiTopic()
* Gets the offsets as stored in the metadata store. The map returned will only contain
* offsets from topic partitions that match the current supervisor config stream. This
* override is needed because in the case of multi-topic, a user could have updated the supervisor
* config from single topic to mult-topic, where the new multi-topic pattern regex matches the
* config from single topic to multi-topic, where the new multi-topic pattern regex matches the
* old config single topic. Without this override, the previously stored metadata for the single
* topic would be deemed as different from the currently configure stream, and not be included in
* the offset map returned. This implementation handles these cases appropriately.
@@ -640,7 +640,7 @@ protected boolean isMultiTopic()
* updated to single topic or multi-topic depending on the supervisor config, as needed.
*/
@Override
protected Map<KafkaTopicPartition, Long> getOffsetsFromMetadataStorage()
public Map<KafkaTopicPartition, Long> getOffsetsFromMetadataStorage()
{
final DataSourceMetadata dataSourceMetadata = retrieveDataSourceMetadata();
if (checkSourceMetadataMatch(dataSourceMetadata)) {
@@ -321,7 +321,7 @@ protected Map<String, Long> getTimeLagPerPartition(Map<String, String> currentOf
}

@Override
protected SeekableStreamDataSourceMetadata<String, String> createDataSourceMetaDataForReset(
public SeekableStreamDataSourceMetadata<String, String> createDataSourceMetaDataForReset(
String stream,
Map<String, String> map
)
@@ -336,7 +336,7 @@ protected OrderedSequenceNumber<String> makeSequenceNumber(String seq, boolean i
}

@Override
protected void updatePartitionLagFromStream()
public void updatePartitionLagFromStream()
Member

[P2] Kinesis cannot provide backfill end offsets

The new manager path calls updatePartitionLagFromStream() and then getLatestSequencesFromStream(), but Kinesis only updates time lag here and does not override getLatestSequencesFromStream(), so it inherits the base empty map. Any Kinesis supervisor that passes the earlier checks will fail with empty latest offsets instead of starting a backfill.

Contributor Author

Good callout. @jaykanakiya will be tackling Kinesis support in a separate PR.

{
KinesisRecordSupplier supplier = (KinesisRecordSupplier) recordSupplier;
// this recordSupplier method is thread safe, so does not need to acquire the recordSupplierLock
@@ -21,11 +21,14 @@

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.inject.Inject;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.error.NotFound;
@@ -35,8 +38,11 @@
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata;
import org.apache.druid.indexing.seekablestream.supervisor.BoundedStreamConfig;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
@@ -393,6 +399,117 @@ public boolean resetSupervisor(String id, @Nullable DataSourceMetadata resetData
return true;
}

/**
* Resets a supervisor to the latest stream offsets and starts a bounded backfill supervisor to
* process the skipped range from the previously checkpointed offsets up to the latest offsets.
*
* @param id supervisor ID
* @param backfillTaskCount number of tasks for the backfill supervisor, or null to inherit from the source spec
* @return map with {@code "id"} (the original supervisor ID) and {@code "backfillSupervisorId"}
* @throws IllegalArgumentException if the supervisor is not a {@link SeekableStreamSupervisor},
* if {@code useEarliestSequenceNumber} is true,
* if {@code useConcurrentLocks} is not set to true in the supervisor context,
* or if the supervisor is not in a RUNNING state
* @throws IllegalStateException if the latest or checkpointed offsets cannot be retrieved,
* or if the backfill spec cannot be serialized
*/
public Map<String, Object> resetSupervisorAndBackfill(String id, @Nullable Integer backfillTaskCount)
{
Preconditions.checkState(started, "SupervisorManager not started");
Preconditions.checkNotNull(id, "id");

Pair<Supervisor, SupervisorSpec> supervisorPair = supervisors.get(id);
if (!(supervisorPair.lhs instanceof SeekableStreamSupervisor)) {
throw new IAE("Supervisor[%s] is not a SeekableStreamSupervisor", id);
}
SeekableStreamSupervisor streamSupervisor = (SeekableStreamSupervisor) supervisorPair.lhs;
SeekableStreamSupervisorSpec streamSpec = (SeekableStreamSupervisorSpec) supervisorPair.rhs;

// Verify useEarliestOffset is false
if (streamSupervisor.getIoConfig().isUseEarliestSequenceNumber()) {
throw new IAE("Reset with skipped offsets is not supported when useEarliestOffset is true.");
}

// Verify useConcurrentLocks is enabled
final Map<String, Object> context = streamSpec.getContext();
if (context == null || !Boolean.TRUE.equals(context.get("useConcurrentLocks"))) {
Member

[P2] Concurrent-lock check rejects valid true contexts

This check only accepts a literal Boolean `true` under the hard-coded key. Other Druid paths in this class parse `Tasks.USE_CONCURRENT_LOCKS` with `QueryContexts.getAsBoolean`, which also accepts values such as the string `"true"`. Supervisors whose tasks actually use concurrent locks can therefore be rejected by this endpoint.
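
To illustrate the difference, here is a minimal sketch of a lenient check that accepts both a Boolean and a string value. The helper is hypothetical and only stands in for the Druid utility mentioned above; its exact semantics are an assumption, not a copy of that utility.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for illustration; not Druid code.
final class ConcurrentLockFlagSketch
{
  // Returns true when the context value is Boolean.TRUE or a string that
  // parses as "true" (case-insensitive). Anything else, including a missing
  // key or a null context, is treated as false.
  static boolean isFlagEnabled(Map<String, Object> context, String key)
  {
    if (context == null) {
      return false;
    }
    Object value = context.get(key);
    if (value instanceof Boolean) {
      return (Boolean) value;
    }
    if (value instanceof String) {
      return Boolean.parseBoolean((String) value);
    }
    return false;
  }

  public static void main(String[] args)
  {
    Map<String, Object> context = new HashMap<>();
    context.put("useConcurrentLocks", "true");

    // The strict check from the diff rejects the string form.
    boolean strict = Boolean.TRUE.equals(context.get("useConcurrentLocks"));
    boolean lenient = isFlagEnabled(context, "useConcurrentLocks");
    System.out.println("strict=" + strict + ", lenient=" + lenient);
  }
}
```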

throw new IAE(
"Backfill tasks require 'useConcurrentLocks' to be set to true in the supervisor context to allow concurrent writes with the main supervisor tasks"
);
}

// We need an active recordSupplier to query the latest offsets from the stream
if (supervisorPair.lhs.getState() != SupervisorStateManager.BasicState.RUNNING) {
throw new IAE("Supervisor[%s] must be in a RUNNING state to perform a reset and backfill", id);
}

log.info("Capturing latest offsets from stream for supervisor[%s]", id);
streamSupervisor.updatePartitionLagFromStream();
Map<?, ?> endOffsets = streamSupervisor.getLatestSequencesFromStream();

log.info("Capturing checkpointed offsets for supervisor[%s]", id);
Map<?, ?> startOffsets = streamSupervisor.getOffsetsFromMetadataStorage();

// Validate that we successfully retrieved offsets
if (endOffsets == null || endOffsets.isEmpty()) {
throw new ISE("Skipping reset: Failed to get latest offsets from stream for supervisor[%s]", id);
}
if (startOffsets == null || startOffsets.isEmpty()) {
throw new ISE("Skipping reset: Failed to get checkpointed offsets for supervisor[%s]", id);
}

log.info("Resetting supervisor[%s] metadata to latest offsets", id);
DataSourceMetadata resetMetadata = streamSupervisor.createDataSourceMetaDataForReset(
streamSupervisor.getIoConfig().getStream(),
endOffsets
);

streamSupervisor.resetOffsets(resetMetadata);
Member

[P1] Reset can skip data before backfill is guaranteed

resetOffsets only enqueues a ResetOffsetsNotice; it does not synchronously update metadata. This code queues the main supervisor reset before the bounded config is fully built and before createOrUpdateAndStartSupervisor succeeds. If any later step fails, the queued reset can still advance the live supervisor to latest offsets with no backfill supervisor, losing the skipped range this endpoint is meant to preserve.
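
One possible reordering, sketched under assumptions: run every step that can fail (building the bounded config, creating and starting the backfill supervisor) first, and enqueue the reset only once those have succeeded. The `Steps` interface and method names below are hypothetical stand-ins, not Druid APIs. This ordering has its own trade-off: if the reset is never applied after the backfill starts, both supervisors could ingest the same range.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the ordering only; not Druid code.
final class ResetOrderingSketch
{
  // Stand-in for the two operations the manager performs.
  interface Steps
  {
    void startBackfill();  // build the bounded spec and start the supervisor; may throw
    void enqueueReset();   // queue the reset of the main supervisor
  }

  // Starts the backfill first. If that throws, the exception propagates
  // and the reset is never enqueued, so no range is skipped.
  static void backfillThenReset(Steps steps)
  {
    steps.startBackfill();
    steps.enqueueReset();
  }

  public static void main(String[] args)
  {
    List<String> calls = new ArrayList<>();
    Steps failingBackfill = new Steps()
    {
      @Override
      public void startBackfill()
      {
        throw new IllegalStateException("simulated failure while creating the backfill spec");
      }

      @Override
      public void enqueueReset()
      {
        calls.add("reset");
      }
    };

    try {
      backfillThenReset(failingBackfill);
    }
    catch (IllegalStateException e) {
      System.out.println("backfill failed, reset calls recorded: " + calls.size());
    }
  }
}
```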


// Reset autoscaler if present
SupervisorTaskAutoScaler autoscaler = autoscalers.get(id);
if (autoscaler != null) {
autoscaler.reset();
}

String backfillSupervisorId = IdUtils.getRandomIdWithPrefix(id + "_backfill");

try {
Map<String, Object> normalizedStartOffsets = jsonMapper.readValue(jsonMapper.writeValueAsString(startOffsets), Map.class);
Map<String, Object> normalizedEndOffsets = jsonMapper.readValue(jsonMapper.writeValueAsString(endOffsets), Map.class);
BoundedStreamConfig boundedStreamConfig = new BoundedStreamConfig(normalizedStartOffsets, normalizedEndOffsets);
SupervisorSpec backfillSpec = createBackfillSpec(streamSpec, backfillSupervisorId, boundedStreamConfig, backfillTaskCount);
createOrUpdateAndStartSupervisor(backfillSpec);
}
catch (JsonProcessingException e) {
throw new ISE(e, "Failed to create backfill supervisor spec for supervisor[%s]", id);
}

log.info("Started backfill supervisor[%s] for supervisor[%s]", backfillSupervisorId, id);

return ImmutableMap.of(
"id", id,
"backfillSupervisorId", backfillSupervisorId
);
}

SupervisorSpec createBackfillSpec(
SeekableStreamSupervisorSpec sourceSpec,
String backfillSupervisorId,
BoundedStreamConfig boundedStreamConfig,
@Nullable Integer backfillTaskCount
) throws JsonProcessingException
{
ObjectNode specNode = jsonMapper.valueToTree(sourceSpec);
specNode.put("id", backfillSupervisorId);
ObjectNode ioConfigNode = (ObjectNode) specNode.path("spec").path("ioConfig");
ioConfigNode.set("boundedStreamConfig", jsonMapper.valueToTree(boundedStreamConfig));
if (backfillTaskCount != null) {
ioConfigNode.put("taskCount", backfillTaskCount);
}
return jsonMapper.treeToValue(specNode, SupervisorSpec.class);
}

public boolean checkPointDataSourceMetadata(
String supervisorId,
int taskGroupId,
@@ -640,6 +640,50 @@ private Response handleResetRequest(
);
}

@POST
@Path("/{id}/resetOffsetsAndBackfill")
@Produces(MediaType.APPLICATION_JSON)
@ResourceFilters(SupervisorResourceFilter.class)
public Response resetOffsetsAndBackfill(
@PathParam("id") final String id,
@QueryParam("backfillTaskCount") @Nullable final Integer backfillTaskCount
)
{
return handleResetAndBackfill(id, backfillTaskCount);
}

private Response handleResetAndBackfill(final String id, @Nullable final Integer backfillTaskCount)
{
if (backfillTaskCount != null && backfillTaskCount < 1) {
return Response.status(Response.Status.BAD_REQUEST)
.entity(ImmutableMap.of("error", "backfillTaskCount must be a positive integer"))
.build();
}
return asLeaderWithSupervisorManager(
manager -> {
if (!manager.getSupervisorIds().contains(id)) {
return Response.status(Response.Status.NOT_FOUND)
.entity(ImmutableMap.of("error", StringUtils.format("[%s] does not exist", id)))
.build();
}
try {
Map<String, Object> result = manager.resetSupervisorAndBackfill(id, backfillTaskCount);
return Response.ok(result).build();
}
catch (IllegalArgumentException e) {
return Response.status(Response.Status.BAD_REQUEST)
.entity(ImmutableMap.of("error", e.getMessage()))
.build();
}
catch (Exception e) {
return Response.serverError()
.entity(ImmutableMap.of("error", e.getMessage()))
.build();
}
}
);
}

private Response asLeaderWithSupervisorManager(Function<SupervisorManager, Response> f)
{
Optional<SupervisorManager> supervisorManager = taskMaster.getSupervisorManager();