95 changes: 95 additions & 0 deletions docs/api-reference/supervisor-api.md
@@ -3539,6 +3539,101 @@ when the supervisor's tasks restart, they resume reading from `{"0": 100, "1": 1
```
</details>

### Reset offsets and start a backfill supervisor

Resets the supervisor to the latest available stream offsets and starts a new bounded backfill supervisor to ingest the data in the skipped range.

This endpoint is useful when a supervisor has fallen behind and you want to catch it up to the latest offsets without losing the skipped data. The main supervisor resumes ingesting from the latest offsets, while the backfill supervisor processes the range from the previously checkpointed offsets up to the latest offsets at the time of the reset.

The following requirements must be met before calling this endpoint:

- The supervisor must be a `SeekableStreamSupervisor`.
- The supervisor's `useEarliestSequenceNumber` property must be `false`.
- The supervisor context must have `useConcurrentLocks` set to `true` to allow the backfill supervisor's tasks to write concurrently with the main supervisor's tasks.
- The supervisor must be in a `RUNNING` state so that it can query the latest offsets from the stream.

The backfill supervisor has the same configuration as the source supervisor except for its ID, which takes the form `{supervisorId}_backfill_{randomSuffix}`, and its `boundedStreamConfig`, which is set to the skipped offset range. If `backfillTaskCount` is specified, it overrides the `taskCount` for the backfill supervisor only.
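
As a rough illustration of the rules above, the following Java sketch derives the backfill supervisor ID, the effective task count, and a per-partition skipped range. The class and method names are hypothetical and are not part of Druid, the offset values are made up, and skipping partitions that have no new data is an assumption rather than documented behavior. Whether the end offset is inclusive or exclusive is not specified here.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not a Druid class: illustrates the naming and override rules only.
final class BackfillPlanSketch {
    // ID pattern from the description: {supervisorId}_backfill_{randomSuffix}.
    // How the suffix is generated is not covered here, so it is passed in.
    static String backfillSupervisorId(String supervisorId, String randomSuffix) {
        return supervisorId + "_backfill_" + randomSuffix;
    }

    // backfillTaskCount overrides taskCount for the backfill supervisor only;
    // null means "inherit taskCount from the source supervisor".
    static int effectiveTaskCount(int sourceTaskCount, Integer backfillTaskCount) {
        return backfillTaskCount != null ? backfillTaskCount : sourceTaskCount;
    }

    // Maps each partition to {start, end}: start is the checkpointed offset,
    // end is the latest offset at reset time.
    // Assumption: partitions with no new data are left out of the range.
    static Map<String, long[]> skippedRange(Map<String, Long> checkpointed, Map<String, Long> latest) {
        Map<String, long[]> ranges = new TreeMap<>();
        for (Map.Entry<String, Long> entry : checkpointed.entrySet()) {
            Long end = latest.get(entry.getKey());
            if (end != null && end > entry.getValue()) {
                ranges.put(entry.getKey(), new long[] {entry.getValue(), end});
            }
        }
        return ranges;
    }

    public static void main(String[] args) {
        // Illustrative offsets only.
        Map<String, long[]> ranges =
            skippedRange(Map.of("0", 100L, "1", 200L), Map.of("0", 250L, "1", 200L));
        System.out.println(backfillSupervisorId("social_media", "abcdefgh"));
        System.out.println(effectiveTaskCount(4, 2));
        ranges.forEach((partition, r) -> System.out.println(partition + ": " + r[0] + " -> " + r[1]));
    }
}
```

In this sketch only partition `0` ends up in the range, because partition `1` has the same checkpointed and latest offset.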

#### URL

`POST` `/druid/indexer/v1/supervisor/{supervisorId}/resetOffsetsAndBackfill`

#### Query parameters

| Parameter | Type | Description | Default |
|---------|---------|---------|---------|
| `backfillTaskCount` | Integer | Number of parallel tasks for the backfill supervisor. If not specified, inherits `taskCount` from the source supervisor. | None |

#### Responses

<Tabs>

<TabItem value="5" label="200 SUCCESS">


*Successfully reset and started backfill supervisor*

</TabItem>
<TabItem value="6" label="400 BAD REQUEST">


*Supervisor does not meet requirements (wrong type, `useEarliestSequenceNumber` is true, `useConcurrentLocks` not enabled, or supervisor not RUNNING)*

</TabItem>
<TabItem value="7" label="404 NOT FOUND">


*Invalid supervisor ID*

</TabItem>
<TabItem value="8" label="500 SERVER ERROR">


*Failed to retrieve stream offsets or serialize the backfill spec*

</TabItem>
</Tabs>

---

#### Sample request

The following example resets a supervisor named `social_media` and starts a backfill supervisor with 2 tasks.

<Tabs>

<TabItem value="9" label="cURL">


```shell
curl --request POST "http://ROUTER_IP:ROUTER_PORT/druid/indexer/v1/supervisor/social_media/resetOffsetsAndBackfill?backfillTaskCount=2"
```

</TabItem>
<TabItem value="10" label="HTTP">


```HTTP
POST /druid/indexer/v1/supervisor/social_media/resetOffsetsAndBackfill?backfillTaskCount=2 HTTP/1.1
Host: http://ROUTER_IP:ROUTER_PORT
```

</TabItem>
</Tabs>
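
For JVM callers, the same request could be built with the JDK's `java.net.http` API (Java 11 or later). This is a minimal sketch, not an official client: the class name is hypothetical, `http://localhost:8888` is an assumed Router address, and the supervisor ID is assumed to be URL-safe, so it is not encoded.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper, not part of Druid: builds the resetOffsetsAndBackfill request.
final class ResetAndBackfillRequestSketch {
    // backfillTaskCount may be null, in which case the query parameter is omitted
    // and the backfill supervisor inherits taskCount from the source supervisor.
    static HttpRequest build(String routerBase, String supervisorId, Integer backfillTaskCount) {
        String url = routerBase + "/druid/indexer/v1/supervisor/" + supervisorId
            + "/resetOffsetsAndBackfill";
        if (backfillTaskCount != null) {
            url += "?backfillTaskCount=" + backfillTaskCount;
        }
        // The endpoint takes no request body, so an empty body publisher is used.
        return HttpRequest.newBuilder(URI.create(url))
            .POST(HttpRequest.BodyPublishers.noBody())
            .build();
    }

    public static void main(String[] args) {
        // Assumed Router address; replace with ROUTER_IP:ROUTER_PORT for your cluster.
        HttpRequest request = build("http://localhost:8888", "social_media", 2);
        System.out.println(request.method() + " " + request.uri());
    }
}
```

To send the request, pass it to `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())` and inspect the status code against the responses listed above.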

#### Sample response

<details>
<summary>View the response</summary>

```json
{
  "id": "social_media",
  "backfillSupervisorId": "social_media_backfill_abcdefgh"
}
```
</details>

### Terminate a supervisor

Terminates a supervisor and its associated indexing tasks, triggering the publishing of their segments. When you terminate a supervisor, Druid places a tombstone marker in the metadata store to prevent reloading on restart.
@@ -322,7 +322,7 @@ protected Map<String, Long> getTimeLagPerPartition(Map<String, Long> currentOffs
}

@Override
- protected RabbitStreamDataSourceMetadata createDataSourceMetaDataForReset(String topic, Map<String, Long> map)
+ public RabbitStreamDataSourceMetadata createDataSourceMetaDataForReset(String topic, Map<String, Long> map)
{
return new RabbitStreamDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>(topic, map));
}
@@ -408,7 +408,7 @@ public LagStats computeLagStats()
}

@Override
- protected void updatePartitionLagFromStream()
+ public void updatePartitionLagFromStream()
{
getRecordSupplierLock().lock();

@@ -435,7 +435,7 @@ protected void updatePartitionLagFromStream()
}

@Override
- protected Map<String, Long> getLatestSequencesFromStream()
+ public Map<String, Long> getLatestSequencesFromStream()
{
return latestSequenceFromStream != null ? latestSequenceFromStream : new HashMap<>();
}
@@ -356,7 +356,7 @@ protected Map<KafkaTopicPartition, Long> getTimeLagPerPartition(Map<KafkaTopicPa
}

@Override
- protected KafkaDataSourceMetadata createDataSourceMetaDataForReset(String topic, Map<KafkaTopicPartition, Long> map)
+ public KafkaDataSourceMetadata createDataSourceMetaDataForReset(String topic, Map<KafkaTopicPartition, Long> map)
{
return new KafkaDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>(topic, map));
}
@@ -548,7 +548,7 @@ private Map<KafkaTopicPartition, Long> getTimestampPerPartitionAtCurrentOffset(S
* </p>
*/
@Override
- protected void updatePartitionLagFromStream()
+ public void updatePartitionLagFromStream()
{
if (getIoConfig().isEmitTimeLagMetrics()) {
updatePartitionTimeAndRecordLagFromStream();
@@ -597,7 +597,7 @@ private void updateOffsetSnapshot(
}

@Override
- protected Map<KafkaTopicPartition, Long> getLatestSequencesFromStream()
+ public Map<KafkaTopicPartition, Long> getLatestSequencesFromStream()
{
return offsetSnapshotRef.get().getLatestOffsetsFromStream();
}
@@ -630,7 +630,7 @@ protected boolean isMultiTopic()
* Gets the offsets as stored in the metadata store. The map returned will only contain
* offsets from topic partitions that match the current supervisor config stream. This
* override is needed because in the case of multi-topic, a user could have updated the supervisor
- * config from single topic to mult-topic, where the new multi-topic pattern regex matches the
+ * config from single topic to multi-topic, where the new multi-topic pattern regex matches the
* old config single topic. Without this override, the previously stored metadata for the single
* topic would be deemed as different from the currently configure stream, and not be included in
* the offset map returned. This implementation handles these cases appropriately.
@@ -640,7 +640,7 @@ protected boolean isMultiTopic()
* updated to single topic or multi-topic depending on the supervisor config, as needed.
*/
@Override
- protected Map<KafkaTopicPartition, Long> getOffsetsFromMetadataStorage()
+ public Map<KafkaTopicPartition, Long> getOffsetsFromMetadataStorage()
{
final DataSourceMetadata dataSourceMetadata = retrieveDataSourceMetadata();
if (checkSourceMetadataMatch(dataSourceMetadata)) {
@@ -321,7 +321,7 @@ protected Map<String, Long> getTimeLagPerPartition(Map<String, String> currentOf
}

@Override
- protected SeekableStreamDataSourceMetadata<String, String> createDataSourceMetaDataForReset(
+ public SeekableStreamDataSourceMetadata<String, String> createDataSourceMetaDataForReset(
String stream,
Map<String, String> map
)
@@ -336,7 +336,7 @@ protected OrderedSequenceNumber<String> makeSequenceNumber(String seq, boolean i
}

@Override
- protected void updatePartitionLagFromStream()
+ public void updatePartitionLagFromStream()

Review comment (Member):

[P2] Kinesis cannot provide backfill end offsets

The new manager path calls updatePartitionLagFromStream() and then getLatestSequencesFromStream(), but Kinesis only updates time lag here and does not override getLatestSequencesFromStream(), so it inherits the base empty map. Any Kinesis supervisor that passes the earlier checks will fail with empty latest offsets instead of starting a backfill.

Reply (Contributor Author):

Good callout. @jaykanakiya will be tackling Kinesis support in a separate PR.
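
The failure mode described in this thread can be sketched in isolation. The classes below are hypothetical stand-ins, not the real Druid supervisors, and the empty-map check is an assumed simplification of what the manager path does: a subclass that does not override `getLatestSequencesFromStream()` inherits the base implementation's empty map, so there are no end offsets to bound the backfill.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical stand-ins for illustration only; not the actual supervisor classes.
final class LatestOffsetsSketch {
    // Base behavior: no stream-specific knowledge, so it reports an empty map.
    static class BaseSupervisor {
        Map<String, Long> getLatestSequencesFromStream() {
            return Collections.emptyMap();
        }
    }

    // A stream type that overrides the method and reports real end offsets.
    static class OverridingSupervisor extends BaseSupervisor {
        @Override
        Map<String, Long> getLatestSequencesFromStream() {
            return Map.of("0", 250L, "1", 180L); // illustrative offsets
        }
    }

    // A stream type that does not override it and therefore inherits the empty map.
    static class NonOverridingSupervisor extends BaseSupervisor {
    }

    // Assumed shape of the check: without latest offsets the backfill cannot be bounded.
    static Map<String, Long> backfillEndOffsets(BaseSupervisor supervisor) {
        Map<String, Long> latest = supervisor.getLatestSequencesFromStream();
        if (latest.isEmpty()) {
            throw new IllegalStateException("No latest offsets available; cannot bound the backfill");
        }
        return latest;
    }

    public static void main(String[] args) {
        System.out.println(backfillEndOffsets(new OverridingSupervisor()).size());
        try {
            backfillEndOffsets(new NonOverridingSupervisor());
        } catch (IllegalStateException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```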

{
KinesisRecordSupplier supplier = (KinesisRecordSupplier) recordSupplier;
// this recordSupplier method is thread safe, so does not need to acquire the recordSupplierLock