Skip to content
Merged
Show file tree
Hide file tree
Changes from 45 commits
Commits
Show all changes
54 commits
Select commit Hold shift + click to select a range
dfda88c
Initial implementation for BoundedStreamConfig
aho135 Apr 23, 2026
4180ace
Implement isOffsetAtOrBeyond for Rabbit and Kinesis
aho135 Apr 23, 2026
8cb75f6
Unit test coverage
aho135 Apr 24, 2026
300ebe3
Fix BoundedStreamConfigTest
aho135 Apr 24, 2026
9af9729
Remove unused import
aho135 Apr 24, 2026
e0ffef6
Remove unneeded tests
aho135 Apr 24, 2026
162e1f3
Unit test fix
aho135 Apr 24, 2026
3ea2b0b
Fix import and add coverage for RabbitStreamSupervisor
aho135 Apr 24, 2026
8e3e81c
Test coverage for validateBoundedStreamConfig
aho135 Apr 24, 2026
4bed658
Re-initialize partition group and reset state after reset
aho135 Apr 30, 2026
c9181f0
Handle edge case where startOffset equals endOffset
aho135 Apr 30, 2026
9e85331
Compare Kinesis sequence numbers using BigInteger
aho135 Apr 30, 2026
9a32ce0
Remove stale test case
aho135 Apr 30, 2026
b04e907
Remove redundant validation of boundedStreamConfig
aho135 May 1, 2026
8e6dfb8
Throw DruidException with ADMIN persona for BoundedStreamConfig
aho135 May 1, 2026
f03abb6
Clean up unused Logger
aho135 May 1, 2026
ae9083f
javadoc and comment cleanup for isBoundedWorkComplete
aho135 May 1, 2026
f8a313b
Add embedded test for bounded ingestion
aho135 May 1, 2026
5965ac4
Add boundedStreamConfig to SeekableStreamDataSourceMetadata for metad…
aho135 May 2, 2026
9e66948
Revert pendingCompletionGroups check
aho135 May 2, 2026
902e118
Unit test fix
aho135 May 2, 2026
670749c
embedded-test for metadata mismatch
aho135 May 3, 2026
2457caf
Remove unused var
aho135 May 3, 2026
3943ad0
Unit test fix
aho135 May 3, 2026
395fa9a
Add boundedStreamConfig documentation
aho135 May 4, 2026
4cde39f
Fix spellcheck
aho135 May 4, 2026
7e86ec6
Increase code coverage
aho135 May 4, 2026
d985dc4
Increase coverage for BoundedStreamConfig
aho135 May 4, 2026
021e721
Remove unnecessary test
aho135 May 4, 2026
d23d9c4
Simplify completion check in createNewTasks
aho135 May 4, 2026
42ada89
Remove unused function
aho135 May 5, 2026
ed589c2
Unit test bounded supervisor completion
aho135 May 5, 2026
234bc82
Improve coverage on RabbitStreamSupervisor
aho135 May 5, 2026
1cd928d
Unit test coverage
aho135 May 5, 2026
094427d
Unit test for IllegalArgumentException for KafkaSupervisor
aho135 May 5, 2026
3ad278d
Check if end offsets are exclusive for bounded work completion
aho135 May 5, 2026
cf623b8
Increase branch coverage
aho135 May 5, 2026
a037ccb
Increase branch coverage
aho135 May 5, 2026
0e6466c
Unit test coverage
aho135 May 6, 2026
b1b1179
Fix import
aho135 May 6, 2026
126638f
Remove use of deprecated function
aho135 May 6, 2026
2ff9cfd
Revert to deprecated function since not initialized in mock object
aho135 May 6, 2026
2d42ab4
Merge branch 'master' into bounded-stream-supervisor
aho135 May 6, 2026
31c870e
Fix merge conflict
aho135 May 6, 2026
b68705a
Detect metadata mismatch when committed offset > bounded config end
aho135 May 7, 2026
6c33c8f
Clean up redundant tests in BoundedStreamConfigTest and use EqualsVer…
aho135 May 12, 2026
41a31a1
Compare Kinesis Sequence numbers using built in comparison
aho135 May 12, 2026
dd9bdd2
Clean up docs based on review comments
aho135 May 13, 2026
fd7981f
Early return before convert for hasTaskGroupReachedBoundedEnd
aho135 May 13, 2026
5bc87fb
Merge branch 'master' into bounded-stream-supervisor
aho135 May 13, 2026
cc13428
Resolve merge conflicts
aho135 May 13, 2026
c62b936
Fix KinesisSupervisorTest
aho135 May 13, 2026
35c4574
Update KinesisSupervisorTest.java
aho135 May 13, 2026
e0b5ef9
Cover case where start > end
aho135 May 13, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions docs/ingestion/supervisor.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ For configuration properties specific to Kafka and Kinesis, see [Kafka I/O confi
|`lateMessageRejectionPeriod`|ISO 8601 period|Configures tasks to reject messages with timestamps earlier than this period before the task was created. For example, if this property is set to `PT1H` and the supervisor creates a task at `2016-01-01T12:00Z`, Druid drops messages with timestamps earlier than `2016-01-01T11:00Z`. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments, such as a streaming and a nightly batch ingestion pipeline. You can specify only one of the late message rejection properties.|No||
|`earlyMessageRejectionPeriod`|ISO 8601 period|Configures tasks to reject messages with timestamps later than this period after the task reached its task duration. For example, if this property is set to `PT1H`, the task duration is set to `PT1H` and the supervisor creates a task at `2016-01-01T12:00Z`, Druid drops messages with timestamps later than `2016-01-01T14:00Z`. Tasks sometimes run past their task duration, such as in cases of supervisor failover.|No||
|`stopTaskCount`|Integer|Limits the number of ingestion tasks Druid can cycle at any given time. If not set, Druid can cycle all tasks at the same time. If set to a value less than `taskCount`, your cluster needs fewer available slots to run the supervisor. You can save costs by scaling down your ingestion tier, but this can lead to slower cycle times and lag. See [`stopTaskCount`](#stoptaskcount) for more information.|No|`taskCount` value|
|`boundedStreamConfig`|Object|Configures the supervisor for bounded (one-time) ingestion with explicit start and end offsets. When set, the supervisor creates tasks that read from `startSequenceNumbers` to `endSequenceNumbers`, then automatically terminates when all data is ingested. The bounded configuration is stored with datasource metadata; if a supervisor is restarted or a new supervisor is created with different offsets for the same datasource, it will fail. To retry with different offsets, use the supervisor reset API to clear metadata or use a different datasource. Useful for backfills and historical reprocessing. See [Bounded stream configuration](#bounded-stream-configuration) for details.|No|null|
Comment thread
aho135 marked this conversation as resolved.
Outdated
|`serverPriorityToReplicas`|Object (`Map<Integer, Integer>`)|Map of server priorities to the number of replicas per priority. When set, each task replica is assigned a server priority that corresponds to `druid.server.priority` on the Peon process to enable query isolation for mixed workloads using [query routing strategies](../configuration/index.md#query-routing). If not configured, the `replicas` setting applies and all task replicas are assigned a default priority of 0.<br/><br/>For example, setting `serverPriorityToReplicas` to `{"1": 2, "0": 1}` creates 2 task replicas with `druid.server.priority=1` and 1 task replica with `druid.server.priority=0` per task group. This configuration scales proportionally with `taskCount`. For example, if `taskCount` is set to 5, this results in 15 total tasks - 10 tasks with priority 1 and 5 tasks with priority 0. If both `replicas` and `serverPriorityToReplicas` are set, the sum of replicas in `serverPriorityToReplicas` must equal `replicas`.|No|null|

#### Task autoscaler
Expand Down Expand Up @@ -251,6 +252,57 @@ Before you set `stopTaskCount`, note the following:
- The [task autoscaler](#task-autoscaler) ignores `stopTaskCount` when shutting down tasks in response to a task count change. The task autoscaler needs to redistribute partitions across tasks, which requires all tasks to be shut down.
- If you set `stopTaskCount` to a value less than `taskCount`, Druid cycles the longest running tasks first, then other tasks up to the value set.

#### Bounded stream configuration

Use `boundedStreamConfig` to configure one-time ingestion from a specific range of offsets. This is useful for backfilling historical data or reprocessing data with different configurations.

The `boundedStreamConfig` object contains the following properties:

|Property|Type|Description|Required|
|--------|----|-----------|--------|
|`startSequenceNumbers`|Object|Map of partition IDs to start offsets (inclusive for Kafka, inclusive for Kinesis).|Yes|
|`endSequenceNumbers`|Object|Map of partition IDs to end offsets (exclusive for Kafka, inclusive for Kinesis).|Yes|

When configured, the supervisor:
1. Creates tasks that start reading from `startSequenceNumbers`
2. Tasks automatically stop when they reach `endSequenceNumbers`
3. Supervisor does not create replacement tasks after tasks complete
4. Supervisor transitions to `COMPLETED` state and terminates when all tasks finish

**Metadata consistency:** The bounded configuration is stored in datasource metadata along with checkpointed offsets. If you restart the supervisor or create a new supervisor with a different `boundedStreamConfig` for the same datasource, the supervisor will fail with an error. To start a new bounded ingestion with different offsets, either:
- Use the [supervisor reset API](../api-reference/supervisor-api.md#reset-a-supervisor) to clear existing metadata
- Use a different datasource name
Comment thread
aho135 marked this conversation as resolved.
Outdated

**Example (Kafka):**

```json
{
"type": "kafka",
"spec": {
"ioConfig": {
"topic": "my-topic",
"inputFormat": {
"type": "json"
},
"boundedStreamConfig": {
"startSequenceNumbers": {
"0": 1000,
"1": 2000,
"2": 1500
},
"endSequenceNumbers": {
"0": 5000,
"1": 6000,
"2": 5500
}
}
}
}
}
```

This configuration ingests data from partition 0 offsets 1000-4999, partition 1 offsets 2000-5999, and partition 2 offsets 1500-5499.

### Tuning configuration

The `tuningConfig` object is optional. If you don't specify the `tuningConfig` object, Druid uses the default configuration settings.
Expand Down
Loading
Loading