diff --git a/.github/workflows/unit-and-integration-tests-unified.yml b/.github/workflows/unit-and-integration-tests-unified.yml
index b5860b494557..e960493dba31 100644
--- a/.github/workflows/unit-and-integration-tests-unified.yml
+++ b/.github/workflows/unit-and-integration-tests-unified.yml
@@ -64,4 +64,4 @@ jobs:
runs-on: ubuntu-latest
if: ${{ !cancelled() }}
steps:
- - uses: Kesin11/actions-timeline@54d513e0b5ff1158f1cf8321108d666a5a6c1fca
+ - uses: Kesin11/actions-timeline@44c9c178ffb2fb1d9859614a3ffa79ccfb77565e
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/SinkQuerySegmentWalkerBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/SinkQuerySegmentWalkerBenchmark.java
index 6b36d72c672e..777dd297ecb9 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/SinkQuerySegmentWalkerBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/SinkQuerySegmentWalkerBenchmark.java
@@ -19,29 +19,87 @@
package org.apache.druid.benchmark;
+import com.fasterxml.jackson.databind.InjectableValues;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import org.apache.druid.client.cache.CacheConfig;
+import org.apache.druid.client.cache.CachePopulatorStats;
+import org.apache.druid.client.cache.MapCache;
import org.apache.druid.data.input.MapBasedInputRow;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.guice.BuiltInTypesModule;
+import org.apache.druid.indexer.granularity.UniformGranularitySpec;
+import org.apache.druid.jackson.AggregatorsModule;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.core.LoggingEmitter;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.math.expr.ExprMacroTable;
+import org.apache.druid.query.DefaultGenericQueryMetricsFactory;
+import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate;
import org.apache.druid.query.Druids;
+import org.apache.druid.query.ForwardingQueryProcessingPool;
+import org.apache.druid.query.Query;
import org.apache.druid.query.QueryPlus;
-import org.apache.druid.query.Result;
+import org.apache.druid.query.QueryRunnerFactory;
+import org.apache.druid.query.QueryRunnerFactoryConglomerate;
+import org.apache.druid.query.QueryRunnerTestHelper;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.context.ResponseContext;
+import org.apache.druid.query.expression.TestExprMacroTable;
+import org.apache.druid.query.groupby.GroupByQuery;
+import org.apache.druid.query.groupby.GroupByQueryConfig;
+import org.apache.druid.query.groupby.GroupByQueryRunnerTest;
+import org.apache.druid.query.groupby.TestGroupByBuffers;
+import org.apache.druid.query.metadata.SegmentMetadataQueryConfig;
+import org.apache.druid.query.metadata.SegmentMetadataQueryQueryToolChest;
+import org.apache.druid.query.metadata.SegmentMetadataQueryRunnerFactory;
+import org.apache.druid.query.metadata.metadata.ListColumnIncluderator;
+import org.apache.druid.query.metadata.metadata.SegmentMetadataQuery;
+import org.apache.druid.query.policy.NoopPolicyEnforcer;
+import org.apache.druid.query.scan.ScanQuery;
+import org.apache.druid.query.scan.ScanQueryConfig;
+import org.apache.druid.query.scan.ScanQueryEngine;
+import org.apache.druid.query.scan.ScanQueryQueryToolChest;
+import org.apache.druid.query.scan.ScanQueryRunnerFactory;
+import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.query.timeseries.TimeseriesQuery;
-import org.apache.druid.query.timeseries.TimeseriesResultValue;
+import org.apache.druid.query.timeseries.TimeseriesQueryEngine;
+import org.apache.druid.query.timeseries.TimeseriesQueryQueryToolChest;
+import org.apache.druid.query.timeseries.TimeseriesQueryRunnerFactory;
+import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.IndexMerger;
+import org.apache.druid.segment.IndexMergerV9;
+import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.column.ColumnConfig;
+import org.apache.druid.segment.incremental.ParseExceptionHandler;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
+import org.apache.druid.segment.incremental.SimpleRowIngestionMeters;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.indexing.TuningConfig;
+import org.apache.druid.segment.loading.DataSegmentPusher;
+import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
+import org.apache.druid.segment.realtime.SegmentGenerationMetrics;
import org.apache.druid.segment.realtime.appenderator.Appenderator;
+import org.apache.druid.segment.realtime.appenderator.AppenderatorConfig;
+import org.apache.druid.segment.realtime.appenderator.Appenderators;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
-import org.apache.druid.segment.realtime.appenderator.StreamAppenderatorTester;
+import org.apache.druid.segment.realtime.appenderator.TestAppenderatorConfig;
import org.apache.druid.segment.realtime.sink.Committers;
+import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
+import org.apache.druid.server.coordination.NoopDataSegmentAnnouncer;
+import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.LinearShardSpec;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
@@ -59,8 +117,11 @@
import org.openjdk.jmh.infra.Blackhole;
import java.io.File;
+import java.net.URI;
import java.util.Arrays;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
@State(Scope.Benchmark)
@@ -71,6 +132,18 @@
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public class SinkQuerySegmentWalkerBenchmark
{
+ private static final String DATASOURCE = "foo";
+ private static final List QUERY_COLUMNS = ImmutableList.of("__time", "dim", "count", "met");
+ private static final MultipleIntervalSegmentSpec QUERY_INTERVALS =
+ new MultipleIntervalSegmentSpec(ImmutableList.of(Intervals.of("2000/2001")));
+ private static final String SET_PROCESSING_THREAD_NAMES = "setProcessingThreadNames";
+
+ @Param({"timeseries", "scan", "segmentMetadata", "groupBy"})
+ private String queryType;
+
+ @Param({"false", "true"})
+ private boolean setProcessingThreadNames;
+
@Param({"10", "50", "100", "200"})
private int numFireHydrants;
@@ -78,24 +151,66 @@ public class SinkQuerySegmentWalkerBenchmark
private final ServiceEmitter serviceEmitter = new ServiceEmitter("test", "test", loggingEmitter);
private File cacheDir;
+ private ExecutorService queryExecutor;
private Appenderator appenderator;
+ private TestGroupByBuffers groupByBuffers;
@Setup(Level.Trial)
public void setup() throws Exception
{
final String userConfiguredCacheDir = System.getProperty("druid.benchmark.cacheDir", System.getenv("DRUID_BENCHMARK_CACHE_DIR"));
cacheDir = new File(userConfiguredCacheDir);
- final StreamAppenderatorTester tester =
- new StreamAppenderatorTester.Builder().maxRowsInMemory(1)
- .basePersistDirectory(cacheDir)
- .withServiceEmitter(serviceEmitter)
- .build();
+ FileUtils.deleteDirectory(cacheDir);
+ final ObjectMapper objectMapper = makeObjectMapper();
+ final IndexIO indexIO = new IndexIO(
+ objectMapper,
+ new ColumnConfig()
+ {
+ }
+ );
+ final IndexMergerV9 indexMerger = new IndexMergerV9(
+ objectMapper,
+ indexIO,
+ OffHeapMemorySegmentWriteOutMediumFactory.instance()
+ );
+ final DataSchema schema = makeDataSchema();
+ final RowIngestionMeters rowIngestionMeters = new SimpleRowIngestionMeters();
+ final AppenderatorConfig tuningConfig = makeTuningConfig();
+
+ queryExecutor = Execs.singleThreaded("queryExecutor(%d)");
+ groupByBuffers = TestGroupByBuffers.createDefault();
- appenderator = tester.getAppenderator();
+ serviceEmitter.start();
+ EmittingLogger.registerEmitter(serviceEmitter);
+
+ final QueryRunnerFactoryConglomerate conglomerate = makeQueryRunnerFactoryConglomerate();
+ appenderator = Appenderators.createRealtime(
+ null,
+ schema.getDataSource(),
+ schema,
+ tuningConfig,
+ new SegmentGenerationMetrics(),
+ makeDataSegmentPusher(),
+ objectMapper,
+ indexIO,
+ indexMerger,
+ conglomerate,
+ new NoopDataSegmentAnnouncer(),
+ serviceEmitter,
+ new ForwardingQueryProcessingPool(queryExecutor),
+ MapCache.create(2048),
+ new CacheConfig(),
+ new CachePopulatorStats(),
+ NoopPolicyEnforcer.instance(),
+ rowIngestionMeters,
+ new ParseExceptionHandler(rowIngestionMeters, false, Integer.MAX_VALUE, 0),
+ CentralizedDatasourceSchemaConfig.create(),
+ interval -> {}
+ );
appenderator.startJob();
final SegmentIdWithShardSpec segmentIdWithShardSpec = new SegmentIdWithShardSpec(
- StreamAppenderatorTester.DATASOURCE,
+ DATASOURCE,
Intervals.of("2000/2001"),
"A",
new LinearShardSpec(0)
@@ -119,33 +234,214 @@ public void setup() throws Exception
@TearDown(Level.Trial)
public void tearDown() throws Exception
{
- appenderator.close();
- FileUtils.deleteDirectory(cacheDir);
+ try {
+ if (appenderator != null) {
+ appenderator.close();
+ }
+ }
+ finally {
+ if (queryExecutor != null) {
+ queryExecutor.shutdownNow();
+ }
+ try {
+ if (groupByBuffers != null) {
+ groupByBuffers.close();
+ }
+ }
+ finally {
+ FileUtils.deleteDirectory(cacheDir);
+ }
+ }
}
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
- public void emitSinkMetrics(Blackhole blackhole) throws Exception
+ public void runSinkQuery(Blackhole blackhole) throws Exception
{
- {
- final TimeseriesQuery query1 = Druids.newTimeseriesQueryBuilder()
- .dataSource(StreamAppenderatorTester.DATASOURCE)
- .intervals(ImmutableList.of(Intervals.of("2000/2001")))
- .aggregators(
- Arrays.asList(
- new LongSumAggregatorFactory("count", "count"),
- new LongSumAggregatorFactory("met", "met")
- )
- )
- .granularity(Granularities.DAY)
- .build();
-
- final List> results =
- QueryPlus.wrap(query1).run(appenderator, ResponseContext.createEmpty()).toList();
- blackhole.consume(results);
-
- serviceEmitter.flush();
+ final Query> query = makeQuery();
+ final List> results = QueryPlus.wrap(query).run(appenderator, ResponseContext.createEmpty()).toList();
+ blackhole.consume(results);
+
+ serviceEmitter.flush();
+ }
+
+ private Query> makeQuery()
+ {
+ switch (queryType) {
+ case "timeseries":
+ return makeTimeseriesQuery();
+ case "scan":
+ return makeScanQuery();
+ case "segmentMetadata":
+ return makeSegmentMetadataQuery();
+ case "groupBy":
+ return makeGroupByQuery();
+ default:
+ throw new IllegalStateException("Unsupported query type[" + queryType + "]");
}
}
+
+ private QueryRunnerFactoryConglomerate makeQueryRunnerFactoryConglomerate()
+ {
+ return DefaultQueryRunnerFactoryConglomerate.buildFromQueryRunnerFactories(
+ ImmutableMap., QueryRunnerFactory>builder()
+ .put(
+ TimeseriesQuery.class,
+ new TimeseriesQueryRunnerFactory(
+ new TimeseriesQueryQueryToolChest(),
+ new TimeseriesQueryEngine(),
+ QueryRunnerTestHelper.NOOP_QUERYWATCHER
+ )
+ )
+ .put(
+ ScanQuery.class,
+ new ScanQueryRunnerFactory(
+ new ScanQueryQueryToolChest(DefaultGenericQueryMetricsFactory.instance()),
+ new ScanQueryEngine(),
+ new ScanQueryConfig()
+ )
+ )
+ .put(
+ SegmentMetadataQuery.class,
+ new SegmentMetadataQueryRunnerFactory(
+ new SegmentMetadataQueryQueryToolChest(new SegmentMetadataQueryConfig()),
+ QueryRunnerTestHelper.NOOP_QUERYWATCHER
+ )
+ )
+ .put(
+ GroupByQuery.class,
+ GroupByQueryRunnerTest.makeQueryRunnerFactory(new GroupByQueryConfig(), groupByBuffers)
+ )
+ .build()
+ );
+ }
+
+ private TimeseriesQuery makeTimeseriesQuery()
+ {
+ return Druids.newTimeseriesQueryBuilder()
+ .dataSource(DATASOURCE)
+ .intervals(QUERY_INTERVALS)
+ .aggregators(makeAggregators())
+ .granularity(Granularities.DAY)
+ .context(makeQueryContext())
+ .build();
+ }
+
+ private ScanQuery makeScanQuery()
+ {
+ return Druids.newScanQueryBuilder()
+ .dataSource(DATASOURCE)
+ .intervals(QUERY_INTERVALS)
+ .columns(QUERY_COLUMNS)
+ .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
+ .context(makeQueryContext())
+ .build();
+ }
+
+ private SegmentMetadataQuery makeSegmentMetadataQuery()
+ {
+ return Druids.newSegmentMetadataQueryBuilder()
+ .dataSource(DATASOURCE)
+ .intervals(QUERY_INTERVALS)
+ .toInclude(new ListColumnIncluderator(QUERY_COLUMNS))
+ .analysisTypes(
+ SegmentMetadataQuery.AnalysisType.CARDINALITY,
+ SegmentMetadataQuery.AnalysisType.SIZE,
+ SegmentMetadataQuery.AnalysisType.INTERVAL,
+ SegmentMetadataQuery.AnalysisType.MINMAX,
+ SegmentMetadataQuery.AnalysisType.AGGREGATORS
+ )
+ .merge(true)
+ .context(makeQueryContext())
+ .build();
+ }
+
+ private GroupByQuery makeGroupByQuery()
+ {
+ return GroupByQuery.builder()
+ .setDataSource(DATASOURCE)
+ .setInterval("2000/2001")
+ .setGranularity(Granularities.ALL)
+ .setAggregatorSpecs(makeAggregators())
+ .setContext(makeQueryContext())
+ .build();
+ }
+
+ private List makeAggregators()
+ {
+ return Arrays.asList(
+ new LongSumAggregatorFactory("count", "count"),
+ new LongSumAggregatorFactory("met", "met")
+ );
+ }
+
+ private Map makeQueryContext()
+ {
+ return ImmutableMap.of(SET_PROCESSING_THREAD_NAMES, setProcessingThreadNames);
+ }
+
+ private static ObjectMapper makeObjectMapper()
+ {
+ final ObjectMapper objectMapper = new DefaultObjectMapper();
+ objectMapper.registerSubtypes(LinearShardSpec.class);
+ objectMapper.registerModules(new AggregatorsModule());
+ objectMapper.registerModules(new BuiltInTypesModule().getJacksonModules());
+ objectMapper.setInjectableValues(
+ new InjectableValues.Std()
+ .addValue(ExprMacroTable.class.getName(), TestExprMacroTable.INSTANCE)
+ .addValue(ObjectMapper.class.getName(), objectMapper)
+ );
+ return objectMapper;
+ }
+
+ private static DataSchema makeDataSchema()
+ {
+ return DataSchema.builder()
+ .withDataSource(DATASOURCE)
+ .withTimestamp(new TimestampSpec("ts", "auto", null))
+ .withDimensions(DimensionsSpec.EMPTY)
+ .withAggregators(
+ new CountAggregatorFactory("count"),
+ new LongSumAggregatorFactory("met", "met")
+ )
+ .withGranularity(new UniformGranularitySpec(Granularities.MINUTE, Granularities.NONE, null))
+ .build();
+ }
+
+ private AppenderatorConfig makeTuningConfig()
+ {
+ return new TestAppenderatorConfig(
+ TuningConfig.DEFAULT_APPENDABLE_INDEX,
+ 1,
+ Runtime.getRuntime().totalMemory() / 3,
+ false,
+ IndexSpec.getDefault(),
+ 0,
+ false,
+ 0L,
+ OffHeapMemorySegmentWriteOutMediumFactory.instance(),
+ IndexMerger.UNLIMITED_MAX_COLUMNS_TO_MERGE,
+ cacheDir,
+ false
+ );
+ }
+
+ private static DataSegmentPusher makeDataSegmentPusher()
+ {
+ return new DataSegmentPusher()
+ {
+ @Override
+ public DataSegment push(File file, DataSegment segment, boolean useUniquePath)
+ {
+ return segment;
+ }
+
+ @Override
+ public Map makeLoadSpec(URI uri)
+ {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
}
diff --git a/docs/api-reference/service-status-api.md b/docs/api-reference/service-status-api.md
index 1ba8b55c4d78..1c192162d5d0 100644
--- a/docs/api-reference/service-status-api.md
+++ b/docs/api-reference/service-status-api.md
@@ -154,36 +154,6 @@ Host: http://ROUTER_IP:ROUTER_PORT
"name": "org.apache.druid.query.aggregation.datasketches.kll.KllSketchModule",
"artifact": "druid-datasketches",
"version": "26.0.0"
- },
- {
- "name": "org.apache.druid.msq.guice.MSQExternalDataSourceModule",
- "artifact": "druid-multi-stage-query",
- "version": "26.0.0"
- },
- {
- "name": "org.apache.druid.msq.guice.MSQIndexingModule",
- "artifact": "druid-multi-stage-query",
- "version": "26.0.0"
- },
- {
- "name": "org.apache.druid.msq.guice.MSQDurableStorageModule",
- "artifact": "druid-multi-stage-query",
- "version": "26.0.0"
- },
- {
- "name": "org.apache.druid.msq.guice.MSQServiceClientModule",
- "artifact": "druid-multi-stage-query",
- "version": "26.0.0"
- },
- {
- "name": "org.apache.druid.msq.guice.MSQSqlModule",
- "artifact": "druid-multi-stage-query",
- "version": "26.0.0"
- },
- {
- "name": "org.apache.druid.msq.guice.SqlTaskModule",
- "artifact": "druid-multi-stage-query",
- "version": "26.0.0"
}
],
"memory": {
@@ -326,7 +296,7 @@ Host: http://ROUTER_IP:ROUTER_PORT
"log4j.shutdownHookEnabled": "true",
"java.vm.vendor": "Homebrew",
"sun.arch.data.model": "64",
- "druid.extensions.loadList": "[\"druid-hdfs-storage\", \"druid-kafka-indexing-service\", \"druid-datasketches\", \"druid-multi-stage-query\"]",
+ "druid.extensions.loadList": "[\"druid-hdfs-storage\", \"druid-kafka-indexing-service\", \"druid-datasketches\"]",
"java.vendor.url": "https://github.com/Homebrew/homebrew-core/issues",
"druid.router.coordinatorServiceName": "druid/coordinator",
"user.timezone": "UTC",
diff --git a/docs/api-reference/sql-ingestion-api.md b/docs/api-reference/sql-ingestion-api.md
index 59942aff8e0c..9348291581e8 100644
--- a/docs/api-reference/sql-ingestion-api.md
+++ b/docs/api-reference/sql-ingestion-api.md
@@ -26,9 +26,8 @@ import TabItem from '@theme/TabItem';
-->
:::info
- This page describes SQL-based batch ingestion using the [`druid-multi-stage-query`](../multi-stage-query/index.md)
- extension, new in Druid 24.0. Refer to the [ingestion methods](../ingestion/index.md#batch) table to determine which
- ingestion method is right for you.
+ This page describes SQL-based batch ingestion using the [multi-stage query (MSQ) task engine](../multi-stage-query/index.md).
+ Refer to the [ingestion methods](../ingestion/index.md#batch) table to determine which ingestion method is right for you.
:::
The **Query** view in the web console provides a friendly experience for the multi-stage query task engine (MSQ task engine) and multi-stage query architecture. We recommend using the web console if you don't need a programmatic interface.
@@ -847,4 +846,4 @@ The response shows the ID of the task that was canceled.
{
"task": "query-655efe33-781a-4c50-ae84-c2911b42d63c"
}
-```
\ No newline at end of file
+```
diff --git a/docs/api-reference/supervisor-api.md b/docs/api-reference/supervisor-api.md
index d321af143020..8f9c5c36dc5c 100644
--- a/docs/api-reference/supervisor-api.md
+++ b/docs/api-reference/supervisor-api.md
@@ -3539,6 +3539,109 @@ when the supervisor's tasks restart, they resume reading from `{"0": 100, "1": 1
```
+### Reset offsets to latest and start a backfill supervisor
+
+This endpoint is supported for Apache Kafka and RabbitMQ Stream supervisors. Amazon Kinesis is not supported yet.
+
+Resets the supervisor to the latest available stream offsets and starts a new bounded backfill supervisor to ingest the data in the skipped range.
+
+This endpoint is useful when a supervisor has fallen behind and you want to catch it up to the latest offsets without losing the skipped data. The main supervisor resumes ingesting from the latest offsets, while the backfill supervisor processes the range from the previously checkpointed offsets up to the latest offsets at the time of the reset.
+
+**Duplicate ingestion notice:** The main supervisor is not quiesced before the reset. This means duplicate data can occur in two ways:
+- **Backfill overlap:** Any tasks that were in-flight at the time of the reset may publish segments covering part of the backfill range before being shut down.
+- **Reset race:** If a task checkpoint is written to the metadata store between when this endpoint captures the current offsets and when it applies the reset, that checkpoint can be overwritten, causing the main supervisor to re-ingest already-processed data.
+
+Both windows are narrow in practice, but cannot be fully eliminated without manually suspending the main supervisor before calling this endpoint and waiting for all pending tasks to complete.
+
+The following requirements must be met before calling this endpoint:
+
+- The supervisor must be a [streaming supervisor](../ingestion/supervisor.md).
+- The supervisor's `useEarliestSequenceNumber` property must be `false`.
+- The supervisor context must have `useConcurrentLocks` set to `true` to allow the backfill supervisor's tasks to write concurrently with the main supervisor's tasks.
+- The supervisor must be in a `RUNNING` state.
+
+The backfill supervisor has the same configuration as the source supervisor except for its ID, which takes the form `{supervisorId}_backfill_{randomSuffix}`, and its `boundedStreamConfig`, which is set to the skipped offset range. If `backfillTaskCount` is specified, it overrides the `taskCount` for the backfill supervisor only.
+
+#### URL
+
+`POST` `/druid/indexer/v1/supervisor/{supervisorId}/resetToLatestAndBackfill`
+
+#### Query parameters
+
+| Parameter | Type | Description | Default |
+|---------|---------|---------|---------|
+| `backfillTaskCount` | Integer | Number of parallel tasks for the backfill supervisor. | Defaults to `taskCount` from the source supervisor if not specified |
+
+#### Responses
+
+
+
+
+
+
+*Successfully reset and started backfill supervisor*
+
+
+
+
+
+*Supervisor does not meet requirements (wrong type, `useEarliestSequenceNumber` is true, `useConcurrentLocks` not enabled, or supervisor not RUNNING)*
+
+
+
+
+
+*Invalid supervisor ID*
+
+
+
+
+
+*Failed to retrieve stream offsets or serialize the backfill spec*
+
+
+
+
+---
+
+#### Sample request
+
+The following example resets a supervisor named `social_media` and starts a backfill supervisor with 2 tasks.
+
+
+
+
+
+
+```shell
+curl --request POST "http://ROUTER_IP:ROUTER_PORT/druid/indexer/v1/supervisor/social_media/resetToLatestAndBackfill?backfillTaskCount=2"
+```
+
+
+
+
+
+```HTTP
+POST /druid/indexer/v1/supervisor/social_media/resetToLatestAndBackfill?backfillTaskCount=2 HTTP/1.1
+Host: http://ROUTER_IP:ROUTER_PORT
+```
+
+
+
+
+#### Sample response
+
+
+ View the response
+
+ ```json
+{
+ "id": "social_media",
+ "backfillSupervisorId": "social_media_backfill_abcdefgh"
+}
+ ```
+
+
### Terminate a supervisor
Terminates a supervisor and its associated indexing tasks, triggering the publishing of their segments. When you terminate a supervisor, Druid places a tombstone marker in the metadata store to prevent reloading on restart.
diff --git a/docs/configuration/extensions.md b/docs/configuration/extensions.md
index 6c802739fc4b..31f1a5b62b29 100644
--- a/docs/configuration/extensions.md
+++ b/docs/configuration/extensions.md
@@ -50,7 +50,6 @@ Core extensions are maintained by Druid committers.
|druid-kerberos|Kerberos authentication for druid processes.|[link](../development/extensions-core/druid-kerberos.md)|
|druid-lookups-cached-global|A module for [lookups](../querying/lookups.md) providing a jvm-global eager caching for lookups. It provides JDBC and URI implementations for fetching lookup data.|[link](../querying/lookups-cached-global.md)|
|druid-lookups-cached-single| Per lookup caching module to support the use cases where a lookup need to be isolated from the global pool of lookups |[link](../development/extensions-core/druid-lookups.md)|
-|druid-multi-stage-query| Support for the multi-stage query architecture for Apache Druid and the multi-stage query task engine.|[link](../multi-stage-query/index.md)|
|druid-orc-extensions|Support for data in Apache ORC data format.|[link](../development/extensions-core/orc.md)|
|druid-parquet-extensions|Support for data in Apache Parquet data format. Requires druid-avro-extensions to be loaded.|[link](../development/extensions-core/parquet.md)|
|druid-protobuf-extensions| Support for data in Protobuf data format.|[link](../development/extensions-core/protobuf.md)|
diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index f0b80523c401..12a7cd387dcd 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -156,18 +156,6 @@ Druid interacts with ZooKeeper through a set of standard path configurations. We
|`druid.zk.paths.base`|Base ZooKeeper path.|`/druid`|
|`druid.zk.paths.coordinatorPath`|Used by the Coordinator for leader election.|`${druid.zk.paths.base}/coordinator`|
-The indexing service also uses its own set of paths. These configs can be included in the common configuration.
-
-|Property|Description|Default|
-|--------|-----------|-------|
-|`druid.zk.paths.indexer.base`|Base ZooKeeper path for |`${druid.zk.paths.base}/indexer`|
-|`druid.zk.paths.indexer.announcementsPath`|Middle Managers announce themselves here.|`${druid.zk.paths.indexer.base}/announcements`|
-|`druid.zk.paths.indexer.tasksPath`|Used to assign tasks to Middle Managers.|`${druid.zk.paths.indexer.base}/tasks`|
-|`druid.zk.paths.indexer.statusPath`|Parent path for announcement of task statuses.|`${druid.zk.paths.indexer.base}/status`|
-
-If `druid.zk.paths.base` and `druid.zk.paths.indexer.base` are both set, and none of the other `druid.zk.paths.*` or `druid.zk.paths.indexer.*` values are set, then the other properties will be evaluated relative to their respective `base`.
-For example, if `druid.zk.paths.base` is set to `/druid1` and `druid.zk.paths.indexer.base` is set to `/druid2` then `druid.zk.paths.coordinatorPath` will default to `/druid1/coordinator` while `druid.zk.paths.indexer.announcementsPath` will default to `/druid2/announcements`.
-
The following path is used for service discovery. It is **not** affected by `druid.zk.paths.base` and **must** be specified separately.
|Property|Description|Default|
@@ -966,7 +954,7 @@ These Overlord static configurations can be defined in the `overlord/runtime.pro
|Property|Description|Default|
|--------|-----------|-------|
-|`druid.indexer.runner.type`|Indicates whether tasks should be run locally using `local` or in a distributed environment using `remote`. The recommended option is `httpRemote`, which is similar to `remote` but uses HTTP to interact with Middle Managers instead of ZooKeeper.|`httpRemote`|
+|`druid.indexer.runner.type`|Indicates whether tasks should be run locally using `local` or in a distributed environment using `httpRemote`. `httpRemote` is recommended for distributed deployments and uses HTTP to interact with Middle Managers.|`httpRemote`|
|`druid.indexer.server.maxConcurrentActions`|Maximum number of concurrent action requests (such as getting locks, creating segments, fetching segments etc) that the Overlord will process simultaneously. This prevents thread exhaustion while preserving access to health check endpoints. Set to `0` to disable quality of service filtering entirely. If not specified, defaults to `max(1, max(serverHttpNumThreads - 4, serverHttpNumThreads * 0.8))`.|`max(1, max(serverHttpNumThreads - 4, serverHttpNumThreads * 0.8))`|
|`druid.indexer.storage.type`|Indicates whether incoming tasks should be stored locally (in heap) or in metadata storage. One of `local` or `metadata`. `local` is mainly for internal testing while `metadata` is recommended in production because storing incoming tasks in metadata storage allows for tasks to be resumed if the Overlord should fail.|`local`|
|`druid.indexer.storage.recentlyFinishedThreshold`|Duration of time to store task results. Default is 24 hours. If you have hundreds of tasks running in a day, consider increasing this threshold.|`PT24H`|
@@ -981,17 +969,14 @@ These Overlord static configurations can be defined in the `overlord/runtime.pro
|`druid.indexer.queue.storageSyncRate`|Sync Overlord state this often with an underlying task persistence mechanism.|`PT1M`|
|`druid.indexer.queue.maxTaskPayloadSize`|Maximum allowed size in bytes of a single task payload accepted by the Overlord.|none (allow all task payload sizes)|
-The following configs only apply if the Overlord is running in remote mode. For a description of local vs. remote mode, see [Overlord service](../design/overlord.md).
+The following configs apply when the Overlord is running with the `httpRemote` runner. For a description of local vs. distributed mode, see [Overlord service](../design/overlord.md).
|Property|Description|Default|
|--------|-----------|-------|
|`druid.indexer.runner.taskAssignmentTimeout`|How long to wait after a task has been assigned to a Middle Manager before throwing an error.|`PT5M`|
|`druid.indexer.runner.minWorkerVersion`|The minimum Middle Manager version to send tasks to. The version number is a string. This affects the expected behavior during certain operations like comparison against `druid.worker.version`. Specifically, the version comparison follows dictionary order. Use ISO8601 date format for the version to accommodate date comparisons. |"0"|
|`druid.indexer.runner.parallelIndexTaskSlotRatio`| The ratio of task slots available for parallel indexing supervisor tasks per worker. The specified value must be in the range `[0, 1]`. |1|
-|`druid.indexer.runner.compressZnodes`|Indicates whether or not the Overlord should expect Middle Managers to compress Znodes.|true|
-|`druid.indexer.runner.maxZnodeBytes`|The maximum size Znode in bytes that can be created in ZooKeeper, should be in the range of `[10KiB, 2GiB)`. [Human-readable format](human-readable-byte.md) is supported.| 512 KiB |
-|`druid.indexer.runner.taskCleanupTimeout`|How long to wait before failing a task after a Middle Manager is disconnected from ZooKeeper.|`PT15M`|
-|`druid.indexer.runner.taskShutdownLinkTimeout`|How long to wait on a shutdown request to a Middle Manager before timing out|`PT1M`|
+|`druid.indexer.runner.taskCleanupTimeout`|How long to wait before failing a task after a Middle Manager is disconnected.|`PT15M`|
|`druid.indexer.runner.pendingTasksRunnerNumThreads`|Number of threads to allocate pending-tasks to workers, must be at least 1.|1|
|`druid.indexer.runner.maxRetriesBeforeBlacklist`|Number of consecutive times the Middle Manager can fail tasks, before the worker is blacklisted, must be at least 1|5|
|`druid.indexer.runner.workerBlackListBackoffTime`|How long to wait before a task is whitelisted again. This value should be greater that the value set for taskBlackListCleanupPeriod.|`PT15M`|
@@ -1322,12 +1307,10 @@ Middle Managers pass their configurations down to their child peons. The Middle
|Property|Description|Default|
|--------|-----------|-------|
|`druid.indexer.runner.allowedPrefixes`|Whitelist of prefixes for configs that can be passed down to child peons.|`com.metamx`, `druid`, `org.apache.druid`, `user.timezone`, `file.encoding`, `java.io.tmpdir`, `hadoop`|
-|`druid.indexer.runner.compressZnodes`|Indicates whether or not the Middle Managers should compress Znodes.|true|
|`druid.indexer.runner.classpath`|Java classpath for the peon.|`System.getProperty("java.class.path")`|
|`druid.indexer.runner.javaCommand`|Command required to execute java.|java|
|`druid.indexer.runner.javaOpts`|_DEPRECATED_ A string of -X Java options to pass to the peon's JVM. Quotable parameters or parameters with spaces are encouraged to use javaOptsArray|`''`|
|`druid.indexer.runner.javaOptsArray`|A JSON array of strings to be passed in as options to the peon's JVM. This is additive to `druid.indexer.runner.javaOpts` and is recommended for properly handling arguments which contain quotes or spaces like `["-XX:OnOutOfMemoryError=kill -9 %p"]`|`[]`|
-|`druid.indexer.runner.maxZnodeBytes`|The maximum size Znode in bytes that can be created in ZooKeeper, should be in the range of [10KiB, 2GiB). [Human-readable format](human-readable-byte.md) is supported.|512KiB|
|`druid.indexer.runner.startPort`|Starting port used for Peon services, should be greater than 1023 and less than 65536.|8100|
|`druid.indexer.runner.endPort`|Ending port used for Peon services, should be greater than or equal to `druid.indexer.runner.startPort` and less than 65536.|65535|
|`druid.indexer.runner.ports`|A JSON array of integers to specify ports that used for Peon services. If provided and non-empty, ports for Peon services will be chosen from these ports. And `druid.indexer.runner.startPort/druid.indexer.runner.endPort` will be completely ignored.|`[]`|
diff --git a/docs/design/zookeeper.md b/docs/design/zookeeper.md
index ca64e1a0d5bc..d69ba92f0a1c 100644
--- a/docs/design/zookeeper.md
+++ b/docs/design/zookeeper.md
@@ -36,9 +36,8 @@ The operations that happen over ZK are:
1. [Coordinator](../design/coordinator.md) leader election
2. [Overlord](../design/overlord.md) leader election
3. Service (node) announcement and discovery — services announce their presence so other services can find them
-4. [Overlord](../design/overlord.md) and [Middle Manager](../design/middlemanager.md) task management
-Segment loading, dropping, and discovery no longer use ZooKeeper — they are served over HTTP.
+Segment loading, segment discovery, and Overlord ↔ Middle Manager task management no longer use ZooKeeper — they are served over HTTP.
## Coordinator leader election
diff --git a/docs/development/extensions-core/k8s-jobs.md b/docs/development/extensions-core/k8s-jobs.md
index 67be33522ef1..b65a7bb496bd 100644
--- a/docs/development/extensions-core/k8s-jobs.md
+++ b/docs/development/extensions-core/k8s-jobs.md
@@ -1019,7 +1019,7 @@ To do this, set the following property.
|Property| Possible Values |Description|Default|required|
|--------|-----------------|-----------|-------|--------|
|`druid.indexer.runner.k8sAndWorker.runnerStrategy.type`| `String` (e.g., `k8s`, `worker`, `taskType`)| Defines the strategy for task runner selection. |`k8s`|No|
-|`druid.indexer.runner.k8sAndWorker.runnerStrategy.workerType`| `String` (e.g., `httpRemote`, `remote`)| Specifies the variant of the worker task runner to be utilized.|`httpRemote`|No|
+|`druid.indexer.runner.k8sAndWorker.runnerStrategy.workerType`| `String` (e.g., `httpRemote`)| Specifies the variant of the worker task runner to be utilized.|`httpRemote`|No|
| **For `taskType` runner strategy:**|||||
|`druid.indexer.runner.k8sAndWorker.runnerStrategy.taskType.default`| `String` (e.g., `k8s`, `worker`) | Specifies the default runner to use if no overrides apply. This setting ensures there is always a fallback runner available.|None|No|
|`druid.indexer.runner.k8sAndWorker.runnerStrategy.taskType.overrides`| `JsonObject`(e.g., `{"index_kafka": "worker"}`)| Defines task-specific overrides for runner types. Each entry sets a task type to a specific runner, allowing fine control. |`{}`|No|
diff --git a/docs/ingestion/native-batch.md b/docs/ingestion/native-batch.md
index 50eaf43366dc..986d7e977975 100644
--- a/docs/ingestion/native-batch.md
+++ b/docs/ingestion/native-batch.md
@@ -24,7 +24,7 @@ sidebar_label: JSON-based batch
-->
:::info
- This page describes JSON-based batch ingestion using [ingestion specs](ingestion-spec.md). For SQL-based batch ingestion using the [`druid-multi-stage-query`](../multi-stage-query/index.md) engine, see [SQL-based ingestion](../multi-stage-query/index.md). Refer to the [ingestion methods](../ingestion/index.md#batch) table to determine which ingestion method is right for you.
+ This page describes JSON-based batch ingestion using [ingestion specs](ingestion-spec.md). For SQL-based batch ingestion using the [multi-stage query (MSQ) task engine](../multi-stage-query/index.md), see [SQL-based ingestion](../multi-stage-query/index.md). Refer to the [ingestion methods](../ingestion/index.md#batch) table to determine which ingestion method is right for you.
:::
Apache Druid supports the following types of JSON-based batch indexing tasks:
diff --git a/docs/multi-stage-query/security.md b/docs/multi-stage-query/security.md
index 77acafc29f51..0a50b68d4d6f 100644
--- a/docs/multi-stage-query/security.md
+++ b/docs/multi-stage-query/security.md
@@ -23,9 +23,9 @@ sidebar_label: Security
~ under the License.
-->
-All authenticated users can use the multi-stage query task engine (MSQ task engine) through the UI and API if the
-extension is loaded. However, without additional permissions, users are not able to issue queries that read or write
-Druid datasources or external data. The permission needed depends on what the user is trying to do.
+All authenticated users can use the multi-stage query task engine (MSQ task engine) through the UI and API. However,
+without additional permissions, users are not able to issue queries that read or write Druid datasources or external
+data. The permission needed depends on what the user is trying to do.
To submit a query:
@@ -77,4 +77,3 @@ The MSQ task engine needs the following permissions for pushing, fetching, and r
- `Microsoft.Storage/storageAccounts/blobServices/containers/blobs/delete` to delete files when they're no longer needed.
-
diff --git a/docs/operations/web-console.md b/docs/operations/web-console.md
index ef1118ebc4ce..5d935106c3d5 100644
--- a/docs/operations/web-console.md
+++ b/docs/operations/web-console.md
@@ -65,7 +65,7 @@ You can access the [data loader](#data-loader) and [lookups view](#lookups) from
## Query
-SQL-based ingestion and the multi-stage query task engine use the **Query** view, which provides you with a UI to edit and use SQL queries. You should see this UI automatically in Druid 24.0 and later since the multi-stage query extension is loaded by default.
+SQL-based ingestion and the multi-stage query task engine use the **Query** view, which provides you with a UI to edit and use SQL queries.
The following screenshot shows a populated enhanced **Query** view along with a description of its parts:
diff --git a/docs/querying/aggregations.md b/docs/querying/aggregations.md
index c7b7d4e4efc2..3add7863c46a 100644
--- a/docs/querying/aggregations.md
+++ b/docs/querying/aggregations.md
@@ -471,7 +471,11 @@ For these reasons, we have deprecated this aggregator and recommend using the Da
### Expression aggregator
-Aggregator applicable only at query time. Aggregates results using [Druid expressions](./math-expr.md) functions to facilitate building custom functions.
+Aggregates results using [Druid expressions](./math-expr.md) functions to facilitate building custom functions.
+
+The expression aggregator can be used at query time with any intermediate type. It can also be used at ingest time, but
+only when the type of `initialValue` is a primitive numeric type (`LONG` or `DOUBLE`) and matches the type of
+`initialCombineValue`. Other intermediate types, such as strings, arrays, and complex types, are query-time only.
| Property | Description | Required |
| --- | --- | --- |
diff --git a/docs/querying/query-context-reference.md b/docs/querying/query-context-reference.md
index c485c0231c06..41bd206199e7 100644
--- a/docs/querying/query-context-reference.md
+++ b/docs/querying/query-context-reference.md
@@ -68,7 +68,7 @@ Unless otherwise noted, the following parameters apply to all query types, and t
|`useFilterCNF`|`false`| If true, Druid will attempt to convert the query filter to Conjunctive Normal Form (CNF). During query processing, columns can be pre-filtered by intersecting the bitmap indexes of all values that match the eligible filters, often greatly reducing the raw number of rows which need to be scanned. But this effect only happens for the top level filter, or individual clauses of a top level 'and' filter. As such, filters in CNF potentially have a higher chance to utilize a large amount of bitmap indexes on string columns during pre-filtering. However, this setting should be used with great caution, as it can sometimes have a negative effect on performance, and in some cases, the act of computing CNF of a filter can be expensive. We recommend hand tuning your filters to produce an optimal form if possible, or at least verifying through experimentation that using this parameter actually improves your query performance with no ill-effects.|
|`secondaryPartitionPruning`|`true`|Enable secondary partition pruning on the Broker. The Broker will always prune unnecessary segments from the input scan based on a filter on time intervals, but if the data is further partitioned with hash or range partitioning, this option will enable additional pruning based on a filter on secondary partition dimensions.|
|`debug`| `false` | Flag indicating whether to enable debugging outputs for the query. When set to false, no additional logs will be produced (logs produced will be entirely dependent on your logging level). When set to true, the following addition logs will be produced: - Log the stack trace of the exception (if any) produced by the query |
-|`setProcessingThreadNames`|`true`| Whether processing thread names will be set to `queryType_dataSource_intervals` while processing a query. This aids in interpreting thread dumps, and is on by default. Query overhead can be reduced slightly by setting this to `false`. This has a tiny effect in most scenarios, but can be meaningful in high-QPS, low-per-segment-processing-time scenarios. |
+|`setProcessingThreadNames`|`false`| Flag indicating whether processing thread names will be set to `processing_` while processing a query. Thread renaming aids in interpreting thread dumps, but has measurable thread renaming overhead when segment scans are very quick. |
|`sqlPlannerBloat`|`1000`|Calcite parameter which controls whether to merge two Project operators when inlining expressions causes complexity to increase. Implemented as a workaround to exception `There are not enough rules to produce a node with desired properties: convention=DRUID, sort=[]` thrown after rejecting the merge of two projects.|
|`cloneQueryMode`|`excludeClones`| Indicates whether clone Historicals should be queried by brokers. Clone servers are created by the `cloneServers` Coordinator dynamic configuration. Possible values are `excludeClones`, `includeClones` and `preferClones`. `excludeClones` means that clone Historicals are not queried by the broker. `preferClones` indicates that when given a choice between the clone Historical and the original Historical which is being cloned, the broker chooses the clones. Historicals which are not involved in the cloning process will still be queried. `includeClones` means that broker queries any Historical without regarding clone status. This parameter only affects native queries. MSQ does not query Historicals directly.|
|`realtimeSegmentsMode` |`include`| Controls whether realtime segments are queried. `include` queries all segments, including realtime. `exclude` skips realtime segments. `exclusive` queries only realtime segments. |
@@ -140,4 +140,3 @@ For more information, see the following topics:
- [Set query context](./query-context.md) to learn how to configure query context parameters.
- [SQL query context](sql-query-context.md) for query context parameters specific to Druid SQL.
- [SQL-based ingestion reference](../multi-stage-query/reference/#context-parameters) for context parameters used in SQL-based ingestion (MSQ).
-
diff --git a/docs/tutorials/index.md b/docs/tutorials/index.md
index f270c1b74353..730fef78d074 100644
--- a/docs/tutorials/index.md
+++ b/docs/tutorials/index.md
@@ -67,7 +67,7 @@ The distribution directory contains `LICENSE` and `NOTICE` files and subdirector
## Start up Druid services
Start up Druid services using the automatic single-machine configuration.
-This configuration includes default settings that are appropriate for this tutorial, such as loading the `druid-multi-stage-query` extension by default so that you can use the MSQ task engine.
+This configuration includes default settings that are appropriate for this tutorial.
You can view the default settings in the configuration files located in `conf/druid/auto`.
diff --git a/docs/tutorials/tutorial-msq-convert-spec.md b/docs/tutorials/tutorial-msq-convert-spec.md
index 0d386bc06293..a8501284ca9d 100644
--- a/docs/tutorials/tutorial-msq-convert-spec.md
+++ b/docs/tutorials/tutorial-msq-convert-spec.md
@@ -25,9 +25,8 @@ description: How to convert an ingestion spec to a query for SQL-based ingestion
-->
:::info
- This page describes SQL-based batch ingestion using the [`druid-multi-stage-query`](../multi-stage-query/index.md)
- extension, new in Druid 24.0. Refer to the [ingestion methods](../ingestion/index.md#batch) table to determine which
- ingestion method is right for you.
+ This page describes SQL-based batch ingestion using the [multi-stage query (MSQ) task engine](../multi-stage-query/index.md).
+ Refer to the [ingestion methods](../ingestion/index.md#batch) table to determine which ingestion method is right for you.
:::
If you're already ingesting data with [native batch ingestion](../ingestion/native-batch.md), you can use the [web console](../operations/web-console.md) to convert the ingestion spec to a SQL query that the multi-stage query task engine can use to ingest data.
diff --git a/docs/tutorials/tutorial-msq-extern.md b/docs/tutorials/tutorial-msq-extern.md
index dcd0d5095980..1cb7aac89092 100644
--- a/docs/tutorials/tutorial-msq-extern.md
+++ b/docs/tutorials/tutorial-msq-extern.md
@@ -25,9 +25,8 @@ description: How to generate a query that references externally hosted data
-->
:::info
- This page describes SQL-based batch ingestion using the [`druid-multi-stage-query`](../multi-stage-query/index.md)
- extension, new in Druid 24.0. Refer to the [ingestion methods](../ingestion/index.md#batch) table to determine which
- ingestion method is right for you.
+ This page describes SQL-based batch ingestion using the [multi-stage query (MSQ) task engine](../multi-stage-query/index.md).
+ Refer to the [ingestion methods](../ingestion/index.md#batch) table to determine which ingestion method is right for you.
:::
This tutorial demonstrates how to generate a query that references externally hosted data using the **Connect external data** wizard.
diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionTaskTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionTaskTest.java
index 84ee947c8467..4692ec0715f5 100644
--- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionTaskTest.java
+++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionTaskTest.java
@@ -33,9 +33,12 @@
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.query.Druids;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.query.aggregation.ExpressionLambdaAggregatorFactory;
import org.apache.druid.query.aggregation.datasketches.hll.HllSketchModule;
import org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchModule;
import org.apache.druid.query.aggregation.datasketches.theta.SketchModule;
+import org.apache.druid.query.expression.TestExprMacroTable;
import org.apache.druid.query.metadata.metadata.SegmentMetadataQuery;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.testing.embedded.EmbeddedClusterApis;
@@ -55,6 +58,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@@ -107,6 +111,65 @@ public class CompactionTaskTest extends CompactionTestBase
"namespace", "continent", "country", "region", "city", "timestamp"
);
+ /**
+ * Index task identical in shape to {@link MoreResources.Task#INDEX_TASK_WITH_AGGREGATORS} but with a pair of
+ * {@link ExpressionLambdaAggregatorFactory} metrics over the {@code added} long field. Used by
+ * {@link #testCompactionWithExpressionLambdaAggregator} to verify that an expression aggregator works correctly.
+ */
+ private static final Supplier INDEX_TASK_WITH_EXPR_AGG = () ->
+ TaskBuilder
+ .ofTypeIndex()
+ .jsonInputFormat()
+ .localInputSourceWithFiles(
+ Resources.DataFile.tinyWiki1Json(),
+ Resources.DataFile.tinyWiki2Json(),
+ Resources.DataFile.tinyWiki3Json()
+ )
+ .timestampColumn("timestamp")
+ .dimensions(
+ "page",
+ "language", "tags", "user", "unpatrolled", "newPage", "robot",
+ "anonymous", "namespace", "continent", "country", "region", "city"
+ )
+ .metricAggregates(
+ new CountAggregatorFactory("ingested_events"),
+ new ExpressionLambdaAggregatorFactory(
+ "added_sum_expr",
+ Set.of("added"),
+ null,
+ "0",
+ null,
+ null,
+ false,
+ false,
+ "__acc + added",
+ null,
+ null,
+ null,
+ null,
+ TestExprMacroTable.INSTANCE
+ ),
+ new ExpressionLambdaAggregatorFactory(
+ "added_or_expr",
+ Set.of("added"),
+ null,
+ "0",
+ null,
+ null,
+ false,
+ false,
+ "bitwiseOr(\"__acc\", \"added\")",
+ null,
+ null,
+ null,
+ null,
+ TestExprMacroTable.INSTANCE
+ )
+ )
+ .dynamicPartitionWithMaxRows(3)
+ .granularitySpec("DAY", "SECOND", true)
+ .appendToExisting(false);
+
private String fullDatasourceName;
@BeforeEach
@@ -259,6 +322,33 @@ public void testCompactionWithTimestampDimension() throws Exception
loadDataAndCompact(INDEX_TASK_WITH_TIMESTAMP.get(), COMPACTION_TASK.get(), null);
}
+ @Test
+ public void testCompactionWithExpressionLambdaAggregator() throws Exception
+ {
+ try (final Closeable ignored = unloader(fullDatasourceName)) {
+ runTask(INDEX_TASK_WITH_EXPR_AGG.get());
+ verifySegmentsCount(4);
+
+ // Snapshot metric values prior to compaction.
+ final String preCompact = cluster.runSql(
+ "SELECT SUM(added_sum_expr), SUM(added_or_expr) FROM %s",
+ fullDatasourceName
+ );
+
+ // Compact 4 segments -> 2; this performs cross-segment rollup which drives RowCombiningTimeAndDimsIterator
+ // into ExpressionLambdaAggregatorFactory.makeAggregateCombiner().
+ compactData(COMPACTION_TASK.get(), null, null);
+ verifySegmentsCount(2);
+
+ // Metric values must round-trip through compaction unchanged.
+ final String postCompact = cluster.runSql(
+ "SELECT SUM(added_sum_expr), SUM(added_or_expr) FROM %s",
+ fullDatasourceName
+ );
+ Assertions.assertEquals(preCompact, postCompact);
+ }
+ }
+
private void loadDataAndCompact(
TaskBuilder.Index indexTask,
TaskBuilder.Compact compactionResource,
diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaBoundedSupervisorTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaBoundedSupervisorTest.java
index 7e22d85d9cab..fa184418df52 100644
--- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaBoundedSupervisorTest.java
+++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaBoundedSupervisorTest.java
@@ -292,6 +292,48 @@ public void test_boundedSupervisor_doesNotSilentlyCompleteWhenStaleOffsetExceeds
Assertions.assertEquals("UNHEALTHY_SUPERVISOR", status2.getState(), "Supervisor state should be UNHEALTHY_SUPERVISOR");
}
+ @Test
+ public void test_resetToLatestAndBackfill()
+ {
+ final String topic = IdUtils.getRandomId();
+ kafkaServer.createTopicWithPartitions(topic, 2);
+
+ // Create a streaming supervisor with concurrent locks and withUseEarliestSequenceNumber=false
+ final KafkaSupervisorSpec supervisor = createKafkaSupervisor(kafkaServer)
+ .withContext(Map.of("useConcurrentLocks", true))
+ .withIoConfig(io -> io
+ .withKafkaInputFormat(new JsonInputFormat(null, null, null, null, null))
+ .withUseEarliestSequenceNumber(false)
+ )
+ .build(dataSource, topic);
+
+ cluster.callApi().postSupervisor(supervisor);
+
+ waitForSupervisorDetailedState(supervisor.getId(), "RUNNING");
+
+ final int totalRecords = publish1kRecords(topic, false);
+ waitUntilPublishedRecordsAreIngested(totalRecords);
+
+ // Reset the main supervisor and spin up a backfill supervisor.
+ // Since all records are already ingested before the call, the backfill
+ // supervisor will complete immediately without ingesting anything.
+ final Map result = cluster.callApi().resetToLatestAndBackfill(supervisor.getId());
+ Assertions.assertEquals(supervisor.getId(), result.get("id"));
+ final String backfillSupervisorId = (String) result.get("backfillSupervisorId");
+
+ // Wait for the backfill to finish
+ waitForSupervisorToComplete(backfillSupervisorId);
+
+ // Main supervisor should still be running
+ final SupervisorStatus mainStatus = cluster.callApi().getSupervisorStatus(supervisor.getId());
+ Assertions.assertEquals("RUNNING", mainStatus.getState());
+ Assertions.assertTrue(mainStatus.isHealthy());
+
+ final SupervisorStatus backfillStatus = cluster.callApi().getSupervisorStatus(backfillSupervisorId);
+ Assertions.assertEquals("COMPLETED", backfillStatus.getState());
+ Assertions.assertTrue(backfillStatus.isHealthy());
+ }
+
private void waitForSupervisorToComplete(String supervisorId)
{
overlord.latchableEmitter().waitForEvent(
@@ -301,6 +343,15 @@ private void waitForSupervisorToComplete(String supervisorId)
);
}
+ private void waitForSupervisorDetailedState(String supervisorId, String detailedState)
+ {
+ overlord.latchableEmitter().waitForEvent(
+ event -> event.hasMetricName("supervisor/count")
+ .hasDimension(DruidMetrics.SUPERVISOR_ID, supervisorId)
+ .hasDimension("detailedState", detailedState)
+ );
+ }
+
private void waitForSupervisorToBeUnhealthy(String supervisorId)
{
overlord.latchableEmitter().waitForEvent(
diff --git a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisor.java b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisor.java
index 6099105b3374..04973a5272fd 100644
--- a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisor.java
+++ b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisor.java
@@ -322,7 +322,7 @@ protected Map getTimeLagPerPartition(Map currentOffs
}
@Override
- protected RabbitStreamDataSourceMetadata createDataSourceMetaDataForReset(String topic, Map map)
+ public RabbitStreamDataSourceMetadata createDataSourceMetaDataForReset(String topic, Map map)
{
return new RabbitStreamDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>(topic, map));
}
@@ -408,7 +408,7 @@ public LagStats computeLagStats()
}
@Override
- protected void updatePartitionLagFromStream()
+ public void updatePartitionLagFromStream()
{
getRecordSupplierLock().lock();
@@ -435,7 +435,7 @@ protected void updatePartitionLagFromStream()
}
@Override
- protected Map getLatestSequencesFromStream()
+ public Map getLatestSequencesFromStream()
{
return latestSequenceFromStream != null ? latestSequenceFromStream : new HashMap<>();
}
diff --git a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorSpec.java b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorSpec.java
index 4a445f6f1c11..4763a949a615 100644
--- a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorSpec.java
+++ b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorSpec.java
@@ -30,6 +30,7 @@
import org.apache.druid.indexing.overlord.supervisor.Supervisor;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
import org.apache.druid.indexing.rabbitstream.RabbitStreamIndexTaskClientFactory;
+import org.apache.druid.indexing.seekablestream.supervisor.BoundedStreamConfig;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
@@ -155,6 +156,55 @@ protected RabbitStreamSupervisorSpec toggleSuspend(boolean suspend)
supervisorStateManagerConfig);
}
+ @Override
+ public RabbitStreamSupervisorSpec createBackfillSpec(
+ String backfillId,
+ BoundedStreamConfig boundedStreamConfig,
+ @Nullable Integer taskCount
+ )
+ {
+ RabbitStreamSupervisorIOConfig ioConfig = getSpec().getIOConfig();
+ RabbitStreamSupervisorIOConfig backfillIoConfig = new RabbitStreamSupervisorIOConfig(
+ ioConfig.getStream(),
+ ioConfig.getUri(),
+ ioConfig.getInputFormat(),
+ ioConfig.getReplicas(),
+ taskCount != null ? taskCount : ioConfig.getTaskCount(),
+ ioConfig.getTaskDuration().toPeriod(),
+ ioConfig.getConsumerProperties(),
+ ioConfig.getAutoScalerConfig(),
+ ioConfig.getPollTimeout(),
+ ioConfig.getStartDelay().toPeriod(),
+ ioConfig.getPeriod().toPeriod(),
+ ioConfig.getCompletionTimeout().toPeriod(),
+ ioConfig.isUseEarliestSequenceNumber(),
+ ioConfig.getLateMessageRejectionPeriod().isPresent() ? ioConfig.getLateMessageRejectionPeriod().get().toPeriod() : null,
+ ioConfig.getEarlyMessageRejectionPeriod().isPresent() ? ioConfig.getEarlyMessageRejectionPeriod().get().toPeriod() : null,
+ ioConfig.getLateMessageRejectionStartDateTime().isPresent() ? ioConfig.getLateMessageRejectionStartDateTime().get() : null,
+ ioConfig.getStopTaskCount(),
+ ioConfig.getServerPriorityToReplicas(),
+ boundedStreamConfig
+ );
+ return new RabbitStreamSupervisorSpec(
+ backfillId,
+ null,
+ getSpec().getDataSchema(),
+ getSpec().getTuningConfig(),
+ backfillIoConfig,
+ getContext(),
+ isSuspended(),
+ taskStorage,
+ taskMaster,
+ indexerMetadataStorageCoordinator,
+ (RabbitStreamIndexTaskClientFactory) indexTaskClientFactory,
+ mapper,
+ emitter,
+ monitorSchedulerConfig,
+ rowIngestionMetersFactory,
+ supervisorStateManagerConfig
+ );
+ }
+
@Override
public String toString()
{
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index 727eb52db272..5863284cc2d9 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -356,7 +356,7 @@ protected Map getTimeLagPerPartition(Map map)
+ public KafkaDataSourceMetadata createDataSourceMetaDataForReset(String topic, Map map)
{
return new KafkaDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>(topic, map));
}
@@ -548,7 +548,7 @@ private Map getTimestampPerPartitionAtCurrentOffset(S
*
*/
@Override
- protected void updatePartitionLagFromStream()
+ public void updatePartitionLagFromStream()
{
if (getIoConfig().isEmitTimeLagMetrics()) {
updatePartitionTimeAndRecordLagFromStream();
@@ -597,7 +597,7 @@ private void updateOffsetSnapshot(
}
@Override
- protected Map getLatestSequencesFromStream()
+ public Map getLatestSequencesFromStream()
{
return offsetSnapshotRef.get().getLatestOffsetsFromStream();
}
@@ -630,7 +630,7 @@ protected boolean isMultiTopic()
* Gets the offsets as stored in the metadata store. The map returned will only contain
* offsets from topic partitions that match the current supervisor config stream. This
* override is needed because in the case of multi-topic, a user could have updated the supervisor
- * config from single topic to mult-topic, where the new multi-topic pattern regex matches the
+ * config from single topic to multi-topic, where the new multi-topic pattern regex matches the
* old config single topic. Without this override, the previously stored metadata for the single
* topic would be deemed as different from the currently configure stream, and not be included in
* the offset map returned. This implementation handles these cases appropriately.
@@ -640,7 +640,7 @@ protected boolean isMultiTopic()
* updated to single topic or multi-topic depending on the supervisor config, as needed.
*/
@Override
- protected Map getOffsetsFromMetadataStorage()
+ public Map getOffsetsFromMetadataStorage()
{
final DataSourceMetadata dataSourceMetadata = retrieveDataSourceMetadata();
if (checkSourceMetadataMatch(dataSourceMetadata)) {
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java
index b607ade1acfe..31d3e8fad691 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java
@@ -36,6 +36,7 @@
import org.apache.druid.indexing.overlord.supervisor.Supervisor;
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
+import org.apache.druid.indexing.seekablestream.supervisor.BoundedStreamConfig;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
@@ -173,6 +174,59 @@ protected KafkaSupervisorSpec toggleSuspend(boolean suspend)
);
}
+ @Override
+ public KafkaSupervisorSpec createBackfillSpec(
+ String backfillId,
+ BoundedStreamConfig boundedStreamConfig,
+ @Nullable Integer taskCount
+ )
+ {
+ KafkaSupervisorIOConfig ioConfig = getSpec().getIOConfig();
+ KafkaSupervisorIOConfig backfillIoConfig = new KafkaSupervisorIOConfig(
+ ioConfig.getTopic(),
+ ioConfig.getTopicPattern(),
+ ioConfig.getInputFormat(),
+ ioConfig.getReplicas(),
+ taskCount != null ? taskCount : ioConfig.getTaskCount(),
+ ioConfig.getTaskDuration().toPeriod(),
+ ioConfig.getConsumerProperties(),
+ ioConfig.getAutoScalerConfig(),
+ ioConfig.getLagAggregator(),
+ ioConfig.getPollTimeout(),
+ ioConfig.getStartDelay().toPeriod(),
+ ioConfig.getPeriod().toPeriod(),
+ ioConfig.isUseEarliestSequenceNumber(),
+ ioConfig.getCompletionTimeout().toPeriod(),
+ ioConfig.getLateMessageRejectionPeriod().isPresent() ? ioConfig.getLateMessageRejectionPeriod().get().toPeriod() : null,
+ ioConfig.getEarlyMessageRejectionPeriod().isPresent() ? ioConfig.getEarlyMessageRejectionPeriod().get().toPeriod() : null,
+ ioConfig.getLateMessageRejectionStartDateTime().isPresent() ? ioConfig.getLateMessageRejectionStartDateTime().get() : null,
+ ioConfig.getConfigOverrides(),
+ ioConfig.getIdleConfig(),
+ ioConfig.getStopTaskCount(),
+ ioConfig.isEmitTimeLagMetrics(),
+ ioConfig.getServerPriorityToReplicas(),
+ boundedStreamConfig
+ );
+ return new KafkaSupervisorSpec(
+ backfillId,
+ null,
+ getSpec().getDataSchema(),
+ getSpec().getTuningConfig(),
+ backfillIoConfig,
+ getContext(),
+ isSuspended(),
+ taskStorage,
+ taskMaster,
+ indexerMetadataStorageCoordinator,
+ (KafkaIndexTaskClientFactory) indexTaskClientFactory,
+ mapper,
+ emitter,
+ monitorSchedulerConfig,
+ rowIngestionMetersFactory,
+ supervisorStateManagerConfig
+ );
+ }
+
/**
* Extends {@link SeekableStreamSupervisorSpec#validateSpecUpdateTo} to ensure that the proposed spec and current spec are either both multi-topic or both single-topic.
*
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java
index 8879ff6d9753..06ca9b64ced5 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java
@@ -32,6 +32,7 @@
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
+import org.apache.druid.indexing.seekablestream.supervisor.BoundedStreamConfig;
import org.apache.druid.indexing.seekablestream.supervisor.LagAggregator;
import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScalerConfig;
import org.apache.druid.jackson.DefaultObjectMapper;
@@ -564,6 +565,38 @@ public void test_validateSpecUpdateTo()
sourceSpec.validateSpecUpdateTo(validDestSpec);
}
+ @Test
+ public void testCreateBackfillSpec()
+ {
+ KafkaSupervisorSpec spec = new KafkaSupervisorSpecBuilder()
+ .withDataSchema(
+ schema -> schema
+ .withTimestamp(TimestampSpec.DEFAULT)
+ .withAggregators(new CountAggregatorFactory("rows"))
+ .withGranularity(new UniformGranularitySpec(Granularities.HOUR, Granularities.NONE, null))
+ )
+ .withIoConfig(
+ ioConfig -> ioConfig
+ .withJsonInputFormat()
+ .withConsumerProperties(Map.of("bootstrap.servers", "localhost:9092"))
+ .withTaskCount(3)
+ )
+ .build("testDs", "metrics");
+
+ BoundedStreamConfig boundedStreamConfig = new BoundedStreamConfig(
+ Map.of("0", 100L, "1", 200L),
+ Map.of("0", 500L, "1", 600L)
+ );
+
+ KafkaSupervisorSpec backfill = (KafkaSupervisorSpec) spec.createBackfillSpec("backfill-id", boundedStreamConfig, 2);
+
+ Assert.assertEquals("backfill-id", backfill.getId());
+ Assert.assertEquals("testDs", backfill.getSpec().getDataSchema().getDataSource());
+ Assert.assertEquals("metrics", backfill.getSpec().getIOConfig().getTopic());
+ Assert.assertEquals(2, backfill.getSpec().getIOConfig().getTaskCount());
+ Assert.assertEquals(boundedStreamConfig, backfill.getSpec().getIOConfig().getBoundedStreamConfig());
+ }
+
private KafkaSupervisorSpec getSpec(String topic, String topicPattern)
{
KafkaSupervisorSpecBuilder builder = new KafkaSupervisorSpecBuilder()
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
index 0f91fc0965db..3f1f4034f3ce 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
@@ -321,7 +321,7 @@ protected Map getTimeLagPerPartition(Map currentOf
}
@Override
- protected SeekableStreamDataSourceMetadata createDataSourceMetaDataForReset(
+ public SeekableStreamDataSourceMetadata createDataSourceMetaDataForReset(
String stream,
Map map
)
@@ -336,7 +336,7 @@ protected OrderedSequenceNumber makeSequenceNumber(String seq, boolean i
}
@Override
- protected void updatePartitionLagFromStream()
+ public void updatePartitionLagFromStream()
{
KinesisRecordSupplier supplier = (KinesisRecordSupplier) recordSupplier;
// this recordSupplier method is thread safe, so does not need to acquire the recordSupplierLock
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java
index 8e6615716809..4899337797bf 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java
@@ -35,6 +35,7 @@
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.supervisor.Supervisor;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
+import org.apache.druid.indexing.seekablestream.supervisor.BoundedStreamConfig;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
@@ -193,4 +194,57 @@ protected KinesisSupervisorSpec toggleSuspend(boolean suspend)
supervisorStateManagerConfig
);
}
+
+ @Override
+ public KinesisSupervisorSpec createBackfillSpec(
+ String backfillId,
+ BoundedStreamConfig boundedStreamConfig,
+ @Nullable Integer taskCount
+ )
+ {
+ KinesisSupervisorIOConfig ioConfig = getSpec().getIOConfig();
+ KinesisSupervisorIOConfig backfillIoConfig = new KinesisSupervisorIOConfig(
+ ioConfig.getStream(),
+ ioConfig.getInputFormat(),
+ ioConfig.getEndpoint(),
+ null,
+ ioConfig.getReplicas(),
+ taskCount != null ? taskCount : ioConfig.getTaskCount(),
+ ioConfig.getTaskDuration().toPeriod(),
+ ioConfig.getStartDelay().toPeriod(),
+ ioConfig.getPeriod().toPeriod(),
+ ioConfig.isUseEarliestSequenceNumber(),
+ ioConfig.getCompletionTimeout().toPeriod(),
+ ioConfig.getLateMessageRejectionPeriod().isPresent() ? ioConfig.getLateMessageRejectionPeriod().get().toPeriod() : null,
+ ioConfig.getEarlyMessageRejectionPeriod().isPresent() ? ioConfig.getEarlyMessageRejectionPeriod().get().toPeriod() : null,
+ ioConfig.getLateMessageRejectionStartDateTime().isPresent() ? ioConfig.getLateMessageRejectionStartDateTime().get() : null,
+ ioConfig.getRecordsPerFetch(),
+ ioConfig.getFetchDelayMillis(),
+ ioConfig.getAwsAssumedRoleArn(),
+ ioConfig.getAwsExternalId(),
+ ioConfig.getAutoScalerConfig(),
+ ioConfig.isDeaggregate(),
+ ioConfig.getServerPriorityToReplicas(),
+ boundedStreamConfig
+ );
+ return new KinesisSupervisorSpec(
+ backfillId,
+ null,
+ getSpec().getDataSchema(),
+ getSpec().getTuningConfig(),
+ backfillIoConfig,
+ getContext(),
+ isSuspended(),
+ taskStorage,
+ taskMaster,
+ indexerMetadataStorageCoordinator,
+ (KinesisIndexTaskClientFactory) indexTaskClientFactory,
+ mapper,
+ emitter,
+ monitorSchedulerConfig,
+ rowIngestionMetersFactory,
+ awsCredentialsConfig,
+ supervisorStateManagerConfig
+ );
+ }
}
diff --git a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerConfig.java b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerConfig.java
index 89311981b0e2..9ec20045361c 100644
--- a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerConfig.java
+++ b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerConfig.java
@@ -23,7 +23,6 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.commons.lang3.ObjectUtils;
-import org.apache.druid.indexing.overlord.RemoteTaskRunnerFactory;
import org.apache.druid.indexing.overlord.hrtr.HttpRemoteTaskRunnerFactory;
import javax.annotation.Nullable;
@@ -51,11 +50,9 @@ public KubernetesAndWorkerTaskRunnerConfig(
this.runnerStrategy = ObjectUtils.getIfNull(runnerStrategy, KubernetesTaskRunnerFactory.TYPE_NAME);
this.workerType = ObjectUtils.getIfNull(workerType, HttpRemoteTaskRunnerFactory.TYPE_NAME);
Preconditions.checkArgument(
- this.workerType.equals(HttpRemoteTaskRunnerFactory.TYPE_NAME) ||
- this.workerType.equals(RemoteTaskRunnerFactory.TYPE_NAME),
- "workerType must be set to one of (%s, %s)",
- HttpRemoteTaskRunnerFactory.TYPE_NAME,
- RemoteTaskRunnerFactory.TYPE_NAME
+ this.workerType.equals(HttpRemoteTaskRunnerFactory.TYPE_NAME),
+ "workerType must be set to [%s]; the ZooKeeper-based 'remote' worker type has been removed.",
+ HttpRemoteTaskRunnerFactory.TYPE_NAME
);
}
diff --git a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesOverlordModule.java b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesOverlordModule.java
index 6e82bb8766ff..b45aa6fb846d 100644
--- a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesOverlordModule.java
+++ b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesOverlordModule.java
@@ -44,7 +44,6 @@
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.guice.annotations.Smile;
import org.apache.druid.indexing.common.config.TaskConfig;
-import org.apache.druid.indexing.overlord.RemoteTaskRunnerFactory;
import org.apache.druid.indexing.overlord.TaskRunnerFactory;
import org.apache.druid.indexing.overlord.WorkerTaskRunner;
import org.apache.druid.indexing.overlord.config.TaskQueueConfig;
@@ -264,10 +263,10 @@ public void stop()
}
/**
- * Provides a TaskRunnerFactory instance suitable for environments without Zookeeper.
- * In such environments, the standard RemoteTaskRunnerFactory may not be operational.
- * Depending on the workerType defined in KubernetesAndWorkerTaskRunnerConfig,
- * this method selects and returns an appropriate TaskRunnerFactory implementation.
+ * Provides the worker-side {@link TaskRunnerFactory} that the {@code k8sAndWorker} runner pairs
+ * with {@link KubernetesTaskRunnerFactory}. Only {@link HttpRemoteTaskRunnerFactory} is
+ * supported; the ZooKeeper-based 'remote' worker type was removed, and
+ * {@link KubernetesAndWorkerTaskRunnerConfig} enforces this at config-validation time.
*/
@Provides
@LazySingleton
@@ -277,10 +276,8 @@ TaskRunnerFactory extends WorkerTaskRunner> provideWorkerTaskRunner(
Injector injector
)
{
- String workerType = runnerConfig.getWorkerType();
- return HttpRemoteTaskRunnerFactory.TYPE_NAME.equals(workerType)
- ? injector.getInstance(HttpRemoteTaskRunnerFactory.class)
- : injector.getInstance(RemoteTaskRunnerFactory.class);
+ // workerType is validated to be HttpRemoteTaskRunnerFactory.TYPE_NAME by the config.
+ return injector.getInstance(HttpRemoteTaskRunnerFactory.class);
}
/**
diff --git a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerConfigTest.java b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerConfigTest.java
index 329a1ea52bce..5338ad2ebb9b 100644
--- a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerConfigTest.java
+++ b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerConfigTest.java
@@ -38,7 +38,7 @@ public void test_deserializable() throws IOException
);
Assertions.assertEquals("worker", config.getRunnerStrategy());
- Assertions.assertEquals("remote", config.getWorkerType());
+ Assertions.assertEquals("httpRemote", config.getWorkerType());
}
@Test
diff --git a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesOverlordModuleTest.java b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesOverlordModuleTest.java
index e37313ebb0fb..55e5103567b6 100644
--- a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesOverlordModuleTest.java
+++ b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesOverlordModuleTest.java
@@ -35,7 +35,6 @@
import org.apache.druid.guice.annotations.EscalatedGlobal;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.indexing.common.config.TaskConfig;
-import org.apache.druid.indexing.overlord.RemoteTaskRunnerFactory;
import org.apache.druid.indexing.overlord.TaskRunnerFactory;
import org.apache.druid.indexing.overlord.hrtr.HttpRemoteTaskRunnerFactory;
import org.apache.druid.jackson.JacksonModule;
@@ -77,8 +76,6 @@ public class KubernetesOverlordModuleTest
@Mock
private HttpClient httpClient;
@Mock
- private RemoteTaskRunnerFactory remoteTaskRunnerFactory;
- @Mock
private HttpRemoteTaskRunnerFactory httpRemoteTaskRunnerFactory;
@Mock
private ConfigManagerConfig configManagerConfig;
@@ -111,7 +108,7 @@ public void setUpConfigManagerMock()
@Test
public void testDefaultHttpRemoteTaskRunnerFactoryBindSuccessfully()
{
- injector = makeInjectorWithProperties(initializePropertes(false), false, true);
+ injector = makeInjectorWithProperties(initializePropertes(), true);
KubernetesAndWorkerTaskRunnerFactory taskRunnerFactory = injector.getInstance(
KubernetesAndWorkerTaskRunnerFactory.class);
Assertions.assertNotNull(taskRunnerFactory);
@@ -122,32 +119,21 @@ public void testDefaultHttpRemoteTaskRunnerFactoryBindSuccessfully()
@Test
public void testMultipleKubernetesTaskRunnerFactoryBindSuccessfully()
{
- final Properties props = initializePropertes(false);
+ final Properties props = initializePropertes();
props.setProperty("druid.indexer.runner.type", MultipleKubernetesTaskRunnerFactory.TYPE_NAME);
props.setProperty("druid.indexer.runner.clusters[0].taskNamespace", "NAMESPACE");
- injector = makeInjectorWithProperties(props, false, true);
+ injector = makeInjectorWithProperties(props, true);
final TaskRunnerFactory> taskRunnerFactory = injector.getInstance(TaskRunnerFactory.class);
Assertions.assertInstanceOf(MultipleKubernetesTaskRunnerFactory.class, taskRunnerFactory);
}
- @Test
- public void testRemoteTaskRunnerFactoryBindSuccessfully()
- {
- injector = makeInjectorWithProperties(initializePropertes(true), true, false);
- KubernetesAndWorkerTaskRunnerFactory taskRunnerFactory = injector.getInstance(
- KubernetesAndWorkerTaskRunnerFactory.class);
- Assertions.assertNotNull(taskRunnerFactory);
-
- Assertions.assertNotNull(taskRunnerFactory.build());
- }
-
@Test
public void testExceptionThrownIfNoTaskRunnerFactoryBind()
{
Assertions.assertThrows(ProvisionException.class, () -> {
- injector = makeInjectorWithProperties(initializePropertes(false), false, false);
+ injector = makeInjectorWithProperties(initializePropertes(), false);
injector.getInstance(KubernetesAndWorkerTaskRunnerFactory.class);
});
}
@@ -159,7 +145,7 @@ public void test_build_withMultiContainerAdapterType_returnsWithMultiContainerTa
props.setProperty("druid.indexer.runner.k8s.adapter.type", "overlordMultiContainer");
props.setProperty("druid.indexer.runner.namespace", "NAMESPACE");
- injector = makeInjectorWithProperties(props, false, true);
+ injector = makeInjectorWithProperties(props, true);
TaskAdapter taskAdapter = injector.getInstance(
TaskAdapter.class);
@@ -173,7 +159,7 @@ public void test_build_withSingleContainerAdapterType_returnsKubernetesTaskRunne
Properties props = new Properties();
props.setProperty("druid.indexer.runner.k8s.adapter.type", "overlordSingleContainer");
props.setProperty("druid.indexer.runner.namespace", "NAMESPACE");
- injector = makeInjectorWithProperties(props, false, true);
+ injector = makeInjectorWithProperties(props, true);
TaskAdapter taskAdapter = injector.getInstance(
TaskAdapter.class);
@@ -188,7 +174,7 @@ public void test_build_withSingleContainerAdapterTypeAndSidecarSupport_throwsPro
props.setProperty("druid.indexer.runner.k8s.adapter.type", "overlordSingleContainer");
props.setProperty("druid.indexer.runner.sidecarSupport", "true");
props.setProperty("druid.indexer.runner.namespace", "NAMESPACE");
- injector = makeInjectorWithProperties(props, false, true);
+ injector = makeInjectorWithProperties(props, true);
Assertions.assertThrows(
ProvisionException.class,
@@ -203,7 +189,7 @@ public void test_build_withSidecarSupport_returnsKubernetesTaskRunnerWithMultiCo
Properties props = new Properties();
props.setProperty("druid.indexer.runner.sidecarSupport", "true");
props.setProperty("druid.indexer.runner.namespace", "NAMESPACE");
- injector = makeInjectorWithProperties(props, false, true);
+ injector = makeInjectorWithProperties(props, true);
TaskAdapter adapter = injector.getInstance(TaskAdapter.class);
@@ -218,7 +204,7 @@ public void test_build_withoutSidecarSupport_returnsKubernetesTaskRunnerWithSing
Properties props = new Properties();
props.setProperty("druid.indexer.runner.sidecarSupport", "false");
props.setProperty("druid.indexer.runner.namespace", "NAMESPACE");
- injector = makeInjectorWithProperties(props, false, true);
+ injector = makeInjectorWithProperties(props, true);
TaskAdapter adapter = injector.getInstance(TaskAdapter.class);
@@ -235,7 +221,7 @@ public void test_build_withPodTemplateAdapterType_returnsKubernetesTaskRunnerWit
props.setProperty("druid.indexer.runner.k8s.adapter.type", "customTemplateAdapter");
props.setProperty("druid.indexer.runner.k8s.podTemplate.base", url.getPath());
props.setProperty("druid.indexer.runner.namespace", "NAMESPACE");
- injector = makeInjectorWithProperties(props, false, true);
+ injector = makeInjectorWithProperties(props, true);
TaskAdapter adapter = injector.getInstance(TaskAdapter.class);
@@ -251,7 +237,7 @@ public void test_httpClientFactory_defaultsToVertx()
props.setProperty("druid.indexer.runner.namespace", "NAMESPACE");
// Don't set httpClientType - should default to vertx
- injector = makeInjectorWithProperties(props, false, true);
+ injector = makeInjectorWithProperties(props, true);
DruidKubernetesHttpClientFactory factory = injector.getInstance(DruidKubernetesHttpClientFactory.class);
Assertions.assertNotNull(factory);
@@ -266,7 +252,7 @@ public void test_httpClientFactory_okhttpSelection()
props.setProperty("druid.indexer.runner.namespace", "NAMESPACE");
props.setProperty("druid.indexer.runner.k8sAndWorker.http.httpClientType", "okhttp");
- injector = makeInjectorWithProperties(props, false, true);
+ injector = makeInjectorWithProperties(props, true);
DruidKubernetesHttpClientFactory factory = injector.getInstance(DruidKubernetesHttpClientFactory.class);
Assertions.assertNotNull(factory);
@@ -281,7 +267,7 @@ public void test_httpClientFactory_vertxExplicitSelection()
props.setProperty("druid.indexer.runner.namespace", "NAMESPACE");
props.setProperty("druid.indexer.runner.k8sAndWorker.http.httpClientType", "vertx");
- injector = makeInjectorWithProperties(props, false, true);
+ injector = makeInjectorWithProperties(props, true);
DruidKubernetesHttpClientFactory factory = injector.getInstance(DruidKubernetesHttpClientFactory.class);
Assertions.assertNotNull(factory);
@@ -296,7 +282,7 @@ public void test_httpClientFactory_jdkSelection()
props.setProperty("druid.indexer.runner.namespace", "NAMESPACE");
props.setProperty("druid.indexer.runner.k8sAndWorker.http.httpClientType", "javaStandardHttp");
- injector = makeInjectorWithProperties(props, false, true);
+ injector = makeInjectorWithProperties(props, true);
DruidKubernetesHttpClientFactory factory = injector.getInstance(DruidKubernetesHttpClientFactory.class);
Assertions.assertNotNull(factory);
@@ -312,7 +298,7 @@ public void test_httpClientFactory_invalidTypeThrowsException()
props.setProperty("druid.indexer.runner.namespace", "NAMESPACE");
props.setProperty("druid.indexer.runner.k8sAndWorker.http.httpClientType", "invalid");
- injector = makeInjectorWithProperties(props, false, true);
+ injector = makeInjectorWithProperties(props, true);
injector.getInstance(DruidKubernetesHttpClientFactory.class);
});
}
@@ -324,7 +310,7 @@ public void test_druidKubernetesClient_createdWithVertxClient()
props.setProperty("druid.indexer.runner.namespace", "NAMESPACE");
// Don't set httpClientType - should default to vertx
- injector = makeInjectorWithProperties(props, false, true);
+ injector = makeInjectorWithProperties(props, true);
DruidKubernetesClient client = injector.getInstance(DruidKubernetesClient.class);
Assertions.assertNotNull(client, "DruidKubernetesClient should be created successfully");
@@ -333,8 +319,7 @@ public void test_druidKubernetesClient_createdWithVertxClient()
private Injector makeInjectorWithProperties(
final Properties props,
- boolean isWorkerTypeRemote,
- boolean isWorkerTypeHttpRemote
+ boolean bindHttpRemoteTaskRunnerFactory
)
{
return Guice.createInjector(
@@ -350,10 +335,7 @@ private Injector makeInjectorWithProperties(
binder.bind(DruidNode.class)
.annotatedWith(Self.class)
.toInstance(new DruidNode("test-inject", null, false, null, null, true, false));
- if (isWorkerTypeRemote) {
- binder.bind(RemoteTaskRunnerFactory.class).toInstance(remoteTaskRunnerFactory);
- }
- if (isWorkerTypeHttpRemote) {
+ if (bindHttpRemoteTaskRunnerFactory) {
binder.bind(HttpRemoteTaskRunnerFactory.class).toInstance(httpRemoteTaskRunnerFactory);
}
binder.bind(
@@ -374,14 +356,11 @@ private Injector makeInjectorWithProperties(
));
}
- private static Properties initializePropertes(boolean isWorkerTypeRemote)
+ private static Properties initializePropertes()
{
final Properties props = new Properties();
props.put("druid.indexer.runner.namespace", "NAMESPACE");
props.put("druid.indexer.runner.k8sAndWorker.runnerStrategy.type", "k8s");
- if (isWorkerTypeRemote) {
- props.put("druid.indexer.runner.k8sAndWorker.runnerStrategy.workerType", "remote");
- }
return props;
}
}
diff --git a/extensions-core/kubernetes-overlord-extensions/src/test/resources/kubernetesAndWorkerTaskRunnerConfig.json b/extensions-core/kubernetes-overlord-extensions/src/test/resources/kubernetesAndWorkerTaskRunnerConfig.json
index 43e7414f11f8..de09ff0ee209 100644
--- a/extensions-core/kubernetes-overlord-extensions/src/test/resources/kubernetesAndWorkerTaskRunnerConfig.json
+++ b/extensions-core/kubernetes-overlord-extensions/src/test/resources/kubernetesAndWorkerTaskRunnerConfig.json
@@ -1,4 +1,4 @@
{
"runnerStrategy.type": "worker",
- "runnerStrategy.workerType": "remote"
-}
\ No newline at end of file
+ "runnerStrategy.workerType": "httpRemote"
+}
diff --git a/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcInputFormat.java b/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcInputFormat.java
index 7474a79a15eb..96bfc03214dc 100644
--- a/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcInputFormat.java
+++ b/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcInputFormat.java
@@ -36,12 +36,14 @@
import java.io.File;
import java.io.IOException;
import java.util.Objects;
+import java.util.concurrent.atomic.AtomicBoolean;
public class OrcInputFormat extends NestedInputFormat
{
static final long SCALE_FACTOR = 8L;
private final boolean binaryAsString;
private final Configuration conf;
+ private final AtomicBoolean fileSystemInitialized = new AtomicBoolean(false);
@JsonCreator
public OrcInputFormat(
@@ -55,19 +57,20 @@ public OrcInputFormat(
this.conf = conf;
}
- private void initialize(Configuration conf)
+ // Init FileSystem once under this class's classloader to avoid concurrent setContextClassLoader races.
+ private void ensureFileSystemInitialized()
{
- //Initializing seperately since during eager initialization, resolving
- //namenode hostname throws an error if nodes are ephemeral
-
- // Ensure that FileSystem class level initialization happens with correct CL
- // See https://github.com/apache/druid/issues/1714
- ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader();
+ if (!fileSystemInitialized.compareAndSet(false, true)) {
+ return;
+ }
+ final ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
FileSystem.get(conf);
}
catch (IOException ex) {
+ // Reset so a subsequent createReader can retry init instead of skipping it.
+ fileSystemInitialized.set(false);
throw new RuntimeException(ex);
}
finally {
@@ -91,7 +94,7 @@ public boolean getBinaryAsString()
@Override
public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory)
{
- initialize(conf);
+ ensureFileSystemInitialized();
return new OrcReader(conf, inputRowSchema, source, temporaryDirectory, getFlattenSpec(), binaryAsString);
}
diff --git a/extensions-core/orc-extensions/src/test/java/org/apache/druid/data/input/orc/OrcInputFormatTest.java b/extensions-core/orc-extensions/src/test/java/org/apache/druid/data/input/orc/OrcInputFormatTest.java
index a7f6e5131c3a..555d1de2c998 100644
--- a/extensions-core/orc-extensions/src/test/java/org/apache/druid/data/input/orc/OrcInputFormatTest.java
+++ b/extensions-core/orc-extensions/src/test/java/org/apache/druid/data/input/orc/OrcInputFormatTest.java
@@ -73,6 +73,7 @@ public void testEquals()
{
EqualsVerifier.forClass(OrcInputFormat.class)
.withPrefabValues(Configuration.class, new Configuration(), new Configuration())
+ .withIgnoredFields("fileSystemInitialized")
.usingGetClass()
.verify();
}
diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java
index 2cac95a200b1..82412fe412c9 100644
--- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java
+++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java
@@ -417,9 +417,9 @@ static void uploadFileIfPossible(
/**
* Determines whether to use HTTP or HTTPS protocol based on configuration.
*/
- public static boolean useHttps(AWSClientConfig clientConfig, AWSEndpointConfig endpointConfig)
+ public static boolean useHttps(@Nullable AWSClientConfig clientConfig, AWSEndpointConfig endpointConfig)
{
- String protocol = clientConfig.getProtocol();
+ final String protocol = clientConfig == null ? null : clientConfig.getProtocol();
final String endpointUrl = endpointConfig.getUrl();
if (org.apache.commons.lang3.StringUtils.isNotEmpty(endpointUrl)) {
diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java
index 168216affdca..e9291c739249 100644
--- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java
+++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java
@@ -404,6 +404,39 @@ public void testSerdeWithCloudConfigPropertiesWithSessionToken() throws Exceptio
EasyMock.verify(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER);
}
+ @Test
+ public void testSchemelessEndpointConfigUrlWithNullClientConfigResolvesSupplier() throws Exception
+ {
+ EasyMock.reset(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER);
+ EasyMock.expect(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER.getS3StorageConfig())
+ .andStubReturn(S3_STORAGE_CONFIG);
+ EasyMock.replay(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER);
+
+ final AWSEndpointConfig schemelessEndpoint = MAPPER.readValue(
+ "{\"url\":\"s3.example.com\",\"signingRegion\":\"us-east-1\"}",
+ AWSEndpointConfig.class
+ );
+
+ final S3InputSource inputSource = new S3InputSource(
+ SERVICE,
+ SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER,
+ INPUT_DATA_CONFIG,
+ null,
+ null,
+ EXPECTED_LOCATION,
+ null,
+ CLOUD_CONFIG_PROPERTIES,
+ null,
+ schemelessEndpoint,
+ null
+ );
+
+ // Forces s3ClientSupplier evaluation, which hits S3Utils.useHttps and confirms a null client config does not blow up.
+ inputSource.createEntity(new CloudObjectLocation("bucket", "path"));
+
+ EasyMock.verify(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER);
+ }
+
@Test
public void testGetSetSessionToken()
{
diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3UtilsTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3UtilsTest.java
index 6c46df7d993a..16b8c20d0f2e 100644
--- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3UtilsTest.java
+++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3UtilsTest.java
@@ -19,6 +19,9 @@
package org.apache.druid.storage.s3;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.common.aws.AWSClientConfig;
+import org.apache.druid.common.aws.AWSEndpointConfig;
import org.easymock.Capture;
import org.easymock.CaptureType;
import org.easymock.EasyMock;
@@ -382,4 +385,42 @@ public void testRetryWithS3MultiObjectDeleteException() throws Exception
);
Assert.assertEquals(maxRetries, count.get());
}
+
+ private static final ObjectMapper JSON = new ObjectMapper();
+
+ private static AWSEndpointConfig endpointWith(String json) throws IOException
+ {
+ return JSON.readValue(json, AWSEndpointConfig.class);
+ }
+
+ @Test
+ public void testUseHttpsNullClientConfigSchemelessEndpointReturnsTrue() throws IOException
+ {
+ Assert.assertTrue(S3Utils.useHttps(null, endpointWith("{\"url\":\"s3.example.com\"}")));
+ }
+
+ @Test
+ public void testUseHttpsNullClientConfigHttpEndpointReturnsFalse() throws IOException
+ {
+ Assert.assertFalse(S3Utils.useHttps(null, endpointWith("{\"url\":\"http://s3.example.com\"}")));
+ }
+
+ @Test
+ public void testUseHttpsNullClientConfigHttpsEndpointReturnsTrue() throws IOException
+ {
+ Assert.assertTrue(S3Utils.useHttps(null, endpointWith("{\"url\":\"https://s3.example.com\"}")));
+ }
+
+ @Test
+ public void testUseHttpsNullClientConfigNullEndpointUrlReturnsTrue() throws IOException
+ {
+ Assert.assertTrue(S3Utils.useHttps(null, new AWSEndpointConfig()));
+ }
+
+ @Test
+ public void testUseHttpsDefaultClientConfigSchemelessEndpointReturnsTrue() throws IOException
+ {
+ // Sanity check: default AWSClientConfig protocol is "https"; schemeless URL inherits "https".
+ Assert.assertTrue(S3Utils.useHttps(new AWSClientConfig(), endpointWith("{\"url\":\"s3.example.com\"}")));
+ }
}
diff --git a/indexing-service/pom.xml b/indexing-service/pom.xml
index 42e117203649..e6cc7d787c10 100644
--- a/indexing-service/pom.xml
+++ b/indexing-service/pom.xml
@@ -92,14 +92,6 @@
io.nettynetty
-
- org.apache.zookeeper
- zookeeper
-
-
- org.apache.zookeeper
- zookeeper-jute
- com.fasterxml.jackson.corejackson-core
@@ -112,10 +104,6 @@
com.google.guavaguava
-
- org.apache.curator
- curator-recipes
- jakarta.validationjakarta.validation-api
diff --git a/indexing-service/src/main/java/org/apache/druid/guice/IndexingServiceModuleHelper.java b/indexing-service/src/main/java/org/apache/druid/guice/IndexingServiceModuleHelper.java
index cc3732439d8a..da60043c6780 100644
--- a/indexing-service/src/main/java/org/apache/druid/guice/IndexingServiceModuleHelper.java
+++ b/indexing-service/src/main/java/org/apache/druid/guice/IndexingServiceModuleHelper.java
@@ -22,8 +22,6 @@
import com.google.inject.Binder;
import org.apache.druid.indexing.overlord.config.ForkingTaskRunnerConfig;
import org.apache.druid.indexing.overlord.config.HttpRemoteTaskRunnerConfig;
-import org.apache.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
-import org.apache.druid.server.initialization.IndexerZkConfig;
/**
*/
@@ -34,8 +32,6 @@ public class IndexingServiceModuleHelper
public static void configureTaskRunnerConfigs(Binder binder)
{
JsonConfigProvider.bind(binder, INDEXER_RUNNER_PROPERTY_PREFIX, ForkingTaskRunnerConfig.class);
- JsonConfigProvider.bind(binder, INDEXER_RUNNER_PROPERTY_PREFIX, RemoteTaskRunnerConfig.class);
JsonConfigProvider.bind(binder, INDEXER_RUNNER_PROPERTY_PREFIX, HttpRemoteTaskRunnerConfig.class);
- JsonConfigProvider.bind(binder, "druid.zk.paths.indexer", IndexerZkConfig.class);
}
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java
index cd911ed99811..76a8385d7095 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java
@@ -105,7 +105,8 @@ public ImmutableWorkerInfo(
}
/**
- * Helper used by {@link ZkWorker} and {@link org.apache.druid.indexing.overlord.hrtr.WorkerHolder}.
+ * Helper used by {@link org.apache.druid.indexing.overlord.hrtr.WorkerHolder} to build a worker view from a set of
+ * task announcements.
*/
public static ImmutableWorkerInfo fromWorkerAnnouncements(
final Worker worker,
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
deleted file mode 100644
index 4018701d447f..000000000000
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
+++ /dev/null
@@ -1,1673 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.indexing.overlord;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Joiner;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.base.Stopwatch;
-import com.google.common.base.Supplier;
-import com.google.common.base.Throwables;
-import com.google.common.collect.Collections2;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListenableScheduledFuture;
-import com.google.common.util.concurrent.ListeningScheduledExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.SettableFuture;
-import org.apache.commons.lang3.mutable.MutableInt;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.recipes.cache.PathChildrenCache;
-import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
-import org.apache.curator.utils.ZKPaths;
-import org.apache.druid.concurrent.LifecycleLock;
-import org.apache.druid.curator.CuratorUtils;
-import org.apache.druid.curator.cache.PathChildrenCacheFactory;
-import org.apache.druid.indexer.RunnerTaskState;
-import org.apache.druid.indexer.TaskLocation;
-import org.apache.druid.indexer.TaskState;
-import org.apache.druid.indexer.TaskStatus;
-import org.apache.druid.indexing.common.task.IndexTaskUtils;
-import org.apache.druid.indexing.common.task.Task;
-import org.apache.druid.indexing.overlord.autoscaling.ProvisioningService;
-import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy;
-import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
-import org.apache.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
-import org.apache.druid.indexing.overlord.setup.DefaultWorkerBehaviorConfig;
-import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig;
-import org.apache.druid.indexing.overlord.setup.WorkerSelectStrategy;
-import org.apache.druid.indexing.worker.TaskAnnouncement;
-import org.apache.druid.indexing.worker.Worker;
-import org.apache.druid.java.util.common.DateTimes;
-import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.Pair;
-import org.apache.druid.java.util.common.RE;
-import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.java.util.common.concurrent.Execs;
-import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
-import org.apache.druid.java.util.common.io.Closer;
-import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
-import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
-import org.apache.druid.java.util.emitter.EmittingLogger;
-import org.apache.druid.java.util.emitter.service.ServiceEmitter;
-import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
-import org.apache.druid.java.util.http.client.HttpClient;
-import org.apache.druid.java.util.http.client.Request;
-import org.apache.druid.java.util.http.client.response.InputStreamResponseHandler;
-import org.apache.druid.java.util.http.client.response.StatusResponseHandler;
-import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
-import org.apache.druid.server.initialization.IndexerZkConfig;
-import org.apache.druid.tasklogs.TaskLogStreamer;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.jboss.netty.handler.codec.http.HttpMethod;
-import org.jboss.netty.handler.codec.http.HttpResponseStatus;
-import org.joda.time.Duration;
-import org.joda.time.Period;
-
-import javax.annotation.Nullable;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URL;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-/**
- * The RemoteTaskRunner's primary responsibility is to assign tasks to worker nodes.
- * The RemoteTaskRunner uses Zookeeper to keep track of which workers are running which tasks. Tasks are assigned by
- * creating ephemeral nodes in ZK that workers must remove. Workers announce the statuses of the tasks they are running.
- * Once a task completes, it is up to the RTR to remove the task status and run any necessary cleanup.
- * The RemoteTaskRunner is event driven and updates state according to ephemeral node changes in ZK.
- *
- * The RemoteTaskRunner will assign tasks to a node until the node hits capacity. At that point, task assignment will
- * fail. The RemoteTaskRunner depends on another component to create additional worker resources.
- *
- * If a worker node becomes inexplicably disconnected from Zk, the RemoteTaskRunner will fail any tasks associated with the
- * worker after waiting for RemoteTaskRunnerConfig.taskCleanupTimeout for the worker to show up.
- *
- * The RemoteTaskRunner uses ZK for job management and assignment and http for IPC messages.
- */
-public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
-{
- private static final EmittingLogger log = new EmittingLogger(RemoteTaskRunner.class);
- private static final Joiner JOINER = Joiner.on("/");
-
- private final ObjectMapper jsonMapper;
- private final RemoteTaskRunnerConfig config;
- private final Duration shutdownTimeout;
- private final IndexerZkConfig indexerZkConfig;
- private final CuratorFramework cf;
- private final PathChildrenCacheFactory workerStatusPathChildrenCacheFactory;
- private final ExecutorService workerStatusPathChildrenCacheExecutor;
- private final PathChildrenCache workerPathCache;
- private final HttpClient httpClient;
- private final Supplier workerConfigRef;
-
- // all workers that exist in ZK
- private final ConcurrentMap zkWorkers = new ConcurrentHashMap<>();
- // payloads of pending tasks, which we remember just long enough to assign to workers
- private final ConcurrentMap pendingTaskPayloads = new ConcurrentHashMap<>();
- // tasks that have not yet been assigned to a worker
- private final RemoteTaskRunnerWorkQueue pendingTasks = new RemoteTaskRunnerWorkQueue();
- // all tasks that have been assigned to a worker
- private final RemoteTaskRunnerWorkQueue runningTasks = new RemoteTaskRunnerWorkQueue();
- // tasks that are complete but not cleaned up yet
- private final RemoteTaskRunnerWorkQueue completeTasks = new RemoteTaskRunnerWorkQueue();
-
- private final ExecutorService runPendingTasksExec;
-
- // Workers that have been marked as lazy. these workers are not running any tasks and can be terminated safely by the scaling policy.
- private final ConcurrentMap lazyWorkers = new ConcurrentHashMap<>();
-
- // Workers that have been blacklisted.
- private final Set blackListedWorkers = Collections.synchronizedSet(new HashSet<>());
-
- // task runner listeners
- private final CopyOnWriteArrayList> listeners = new CopyOnWriteArrayList<>();
-
- // workers which were assigned a task and are yet to acknowledge same.
- // Map: workerId -> taskId
- private final ConcurrentMap workersWithUnacknowledgedTask = new ConcurrentHashMap<>();
- // Map: taskId -> taskId .tasks which are being tried to be assigned to a worker
- private final ConcurrentMap tryAssignTasks = new ConcurrentHashMap<>();
-
- private final Object statusLock = new Object();
-
- private final LifecycleLock lifecycleLock = new LifecycleLock();
-
- private final ListeningScheduledExecutorService cleanupExec;
-
- private final ConcurrentMap removedWorkerCleanups = new ConcurrentHashMap<>();
- private final ProvisioningStrategy provisioningStrategy;
- private final ServiceEmitter emitter;
- private ProvisioningService provisioningService;
-
- public RemoteTaskRunner(
- ObjectMapper jsonMapper,
- RemoteTaskRunnerConfig config,
- IndexerZkConfig indexerZkConfig,
- CuratorFramework cf,
- PathChildrenCacheFactory.Builder pathChildrenCacheFactory,
- HttpClient httpClient,
- Supplier workerConfigRef,
- ProvisioningStrategy provisioningStrategy,
- ServiceEmitter emitter
- )
- {
- this.jsonMapper = jsonMapper;
- this.config = config;
- this.shutdownTimeout = config.getTaskShutdownLinkTimeout().toStandardDuration(); // Fail fast
- this.indexerZkConfig = indexerZkConfig;
- this.cf = cf;
- this.workerPathCache = pathChildrenCacheFactory.build().make(cf, indexerZkConfig.getAnnouncementsPath());
- this.workerStatusPathChildrenCacheExecutor = PathChildrenCacheFactory.Builder.createDefaultExecutor();
- this.workerStatusPathChildrenCacheFactory = pathChildrenCacheFactory
- .withExecutorService(workerStatusPathChildrenCacheExecutor)
- .withShutdownExecutorOnClose(false)
- .build();
- this.httpClient = httpClient;
- this.workerConfigRef = workerConfigRef;
- this.cleanupExec = MoreExecutors.listeningDecorator(
- ScheduledExecutors.fixed(1, "RemoteTaskRunner-Scheduled-Cleanup--%d")
- );
- this.provisioningStrategy = provisioningStrategy;
- this.runPendingTasksExec = Execs.multiThreaded(
- config.getPendingTasksRunnerNumThreads(),
- "rtr-pending-tasks-runner-%d"
- );
- this.emitter = emitter;
- }
-
- @Override
- @LifecycleStart
- public void start()
- {
- if (!lifecycleLock.canStart()) {
- return;
- }
- try {
- log.info("Starting RemoteTaskRunner...");
- final MutableInt waitingFor = new MutableInt(1);
- final Object waitingForMonitor = new Object();
-
- // Add listener for creation/deletion of workers
- workerPathCache.getListenable().addListener(
- (client, event) -> {
- final Worker worker;
- switch (event.getType()) {
- case CHILD_ADDED:
- worker = jsonMapper.readValue(
- event.getData().getData(),
- Worker.class
- );
- synchronized (waitingForMonitor) {
- waitingFor.increment();
- }
- Futures.addCallback(
- addWorker(worker),
- new FutureCallback<>()
- {
- @Override
- public void onSuccess(ZkWorker zkWorker)
- {
- synchronized (waitingForMonitor) {
- waitingFor.decrement();
- waitingForMonitor.notifyAll();
- }
- }
-
- @Override
- public void onFailure(Throwable throwable)
- {
- synchronized (waitingForMonitor) {
- waitingFor.decrement();
- waitingForMonitor.notifyAll();
- }
- }
- },
- MoreExecutors.directExecutor()
- );
- break;
- case CHILD_UPDATED:
- worker = jsonMapper.readValue(
- event.getData().getData(),
- Worker.class
- );
- updateWorker(worker);
- break;
-
- case CHILD_REMOVED:
- worker = jsonMapper.readValue(
- event.getData().getData(),
- Worker.class
- );
- removeWorker(worker);
- break;
- case INITIALIZED:
- // Schedule cleanup for task status of the workers that might have disconnected while overlord was not running
- List workers;
- try {
- workers = cf.getChildren().forPath(indexerZkConfig.getStatusPath());
- }
- catch (KeeperException.NoNodeException e) {
- // statusPath doesn't exist yet; can occur if no middleManagers have started.
- workers = ImmutableList.of();
- }
- for (String workerId : workers) {
- final String workerAnnouncePath = JOINER.join(indexerZkConfig.getAnnouncementsPath(), workerId);
- final String workerStatusPath = JOINER.join(indexerZkConfig.getStatusPath(), workerId);
- if (!zkWorkers.containsKey(workerId) && cf.checkExists().forPath(workerAnnouncePath) == null) {
- try {
- scheduleTasksCleanupForWorker(workerId, cf.getChildren().forPath(workerStatusPath));
- }
- catch (Exception e) {
- log.warn(
- e,
- "Could not schedule cleanup for worker[%s] during startup (maybe someone removed the status znode[%s]?). Skipping.",
- workerId,
- workerStatusPath
- );
- }
- }
- }
- synchronized (waitingForMonitor) {
- waitingFor.decrement();
- waitingForMonitor.notifyAll();
- }
- break;
- case CONNECTION_SUSPENDED:
- case CONNECTION_RECONNECTED:
- case CONNECTION_LOST:
- // do nothing
- }
- }
- );
- workerPathCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
- synchronized (waitingForMonitor) {
- while (waitingFor.intValue() > 0) {
- waitingForMonitor.wait();
- }
- }
-
- ScheduledExecutors.scheduleAtFixedRate(
- cleanupExec,
- Period.ZERO.toStandardDuration(),
- config.getWorkerBlackListCleanupPeriod().toStandardDuration(),
- this::checkBlackListedNodes
- );
-
- provisioningService = provisioningStrategy.makeProvisioningService(this);
- lifecycleLock.started();
- }
- catch (Exception e) {
- throw new RuntimeException(e);
- }
- finally {
- lifecycleLock.exitStart();
- }
- }
-
- @Override
- @LifecycleStop
- public void stop()
- {
- if (!lifecycleLock.canStop()) {
- return;
- }
- try {
- log.info("Stopping RemoteTaskRunner...");
- provisioningService.close();
-
- Closer closer = Closer.create();
- for (ZkWorker zkWorker : zkWorkers.values()) {
- closer.register(zkWorker);
- }
- closer.register(workerPathCache);
- try {
- closer.close();
- }
- finally {
- workerStatusPathChildrenCacheExecutor.shutdown();
- }
-
- if (runPendingTasksExec != null) {
- runPendingTasksExec.shutdown();
- }
-
- if (cleanupExec != null) {
- cleanupExec.shutdown();
- }
- }
- catch (Exception e) {
- throw new RuntimeException(e);
- }
- finally {
- lifecycleLock.exitStop();
- }
- }
-
- @Override
- public List>> restore()
- {
- return ImmutableList.of();
- }
-
- @Override
- public void registerListener(TaskRunnerListener listener, Executor executor)
- {
- for (Pair pair : listeners) {
- if (pair.lhs.getListenerId().equals(listener.getListenerId())) {
- throw new ISE("Listener [%s] already registered", listener.getListenerId());
- }
- }
-
- final Pair listenerPair = Pair.of(listener, executor);
-
- synchronized (statusLock) {
- for (Map.Entry entry : runningTasks.entrySet()) {
- TaskRunnerUtils.notifyLocationChanged(
- ImmutableList.of(listenerPair),
- entry.getKey(),
- entry.getValue().getLocation()
- );
- }
-
- log.info("Registered listener [%s]", listener.getListenerId());
- listeners.add(listenerPair);
- }
- }
-
- @Override
- public void unregisterListener(String listenerId)
- {
- for (Pair pair : listeners) {
- if (pair.lhs.getListenerId().equals(listenerId)) {
- listeners.remove(pair);
- log.info("Unregistered listener [%s]", listenerId);
- return;
- }
- }
- }
-
- @Override
- public Collection getWorkers()
- {
- return getImmutableWorkerFromZK(zkWorkers.values());
- }
-
- @Override
- public Collection getRunningTasks()
- {
- return ImmutableList.copyOf(runningTasks.values());
- }
-
- @Override
- public Collection getPendingTasks()
- {
- return ImmutableList.copyOf(pendingTasks.values());
- }
-
- @Override
- public Collection getPendingTaskPayloads()
- {
- // return a snapshot of current pending task payloads.
- return ImmutableList.copyOf(pendingTaskPayloads.values());
- }
-
- @Override
- public RemoteTaskRunnerConfig getConfig()
- {
- return config;
- }
-
- @Override
- public Collection getKnownTasks()
- {
- // Use a map to dedupe tasks, since they may transition from one state to another while this method is iterating
- // through the various collections.
- final Map items = new LinkedHashMap<>();
-
- // Racey, since there is a period of time during assignment when a task is neither pending nor running.
- for (RemoteTaskRunnerWorkItem item : pendingTasks.values()) {
- items.put(item.getTaskId(), item);
- }
-
- for (RemoteTaskRunnerWorkItem item : runningTasks.values()) {
- items.put(item.getTaskId(), item);
- }
-
- for (RemoteTaskRunnerWorkItem item : completeTasks.values()) {
- items.put(item.getTaskId(), item);
- }
-
- return ImmutableList.copyOf(items.values());
- }
-
- @Nullable
- @Override
- public RunnerTaskState getRunnerTaskState(String taskId)
- {
- if (pendingTasks.containsKey(taskId)) {
- return RunnerTaskState.PENDING;
- }
- if (runningTasks.containsKey(taskId)) {
- return RunnerTaskState.RUNNING;
- }
- if (completeTasks.containsKey(taskId)) {
- return RunnerTaskState.NONE;
- }
-
- return null;
- }
-
- @Override
- public TaskLocation getTaskLocation(String taskId)
- {
- if (pendingTasks.containsKey(taskId)) {
- return pendingTasks.get(taskId).getLocation();
- }
- if (runningTasks.containsKey(taskId)) {
- return runningTasks.get(taskId).getLocation();
- }
- if (completeTasks.containsKey(taskId)) {
- return completeTasks.get(taskId).getLocation();
- }
-
- return TaskLocation.unknown();
- }
-
- @Override
- public Optional getScalingStats()
- {
- return Optional.fromNullable(provisioningService.getStats());
- }
-
- @Nullable
- public ZkWorker findWorkerRunningTask(String taskId)
- {
- for (ZkWorker zkWorker : zkWorkers.values()) {
- if (zkWorker.isRunningTask(taskId)) {
- return zkWorker;
- }
- }
- return null;
- }
-
- /**
- * Retrieve {@link ZkWorker} based on an ID (host), or null if the ID doesn't exist.
- */
- @Nullable
- ZkWorker findWorkerId(String workerId)
- {
- return zkWorkers.get(workerId);
- }
-
- public boolean isWorkerRunningTask(ZkWorker worker, String taskId)
- {
- return Preconditions.checkNotNull(worker, "worker").isRunningTask(taskId);
- }
-
- /**
- * A task will be run only if there is no current knowledge in the RemoteTaskRunner of the task.
- *
- * @param task task to run
- */
- @Override
- public ListenableFuture run(final Task task)
- {
- final RemoteTaskRunnerWorkItem completeTask, runningTask, pendingTask;
- if ((pendingTask = pendingTasks.get(task.getId())) != null) {
- log.info("Assigned a task[%s] that is already pending!", task.getId());
- runPendingTasks();
- return pendingTask.getResult();
- } else if ((runningTask = runningTasks.get(task.getId())) != null) {
- ZkWorker zkWorker = findWorkerRunningTask(task.getId());
- if (zkWorker == null) {
- log.warn("Told to run task[%s], but no worker has started running it yet.", task.getId());
- } else {
- log.info("Task[%s] already running on %s.", task.getId(), zkWorker.getWorker().getHost());
- TaskAnnouncement announcement = zkWorker.getRunningTasks().get(task.getId());
- if (announcement.getTaskStatus().isComplete()) {
- taskComplete(runningTask, zkWorker, announcement.getTaskStatus());
- }
- }
- return runningTask.getResult();
- } else if ((completeTask = completeTasks.get(task.getId())) != null) {
- return completeTask.getResult();
- } else {
- RemoteTaskRunnerWorkItem workItem = addPendingTask(task);
- runPendingTasks();
- return workItem.getResult();
- }
- }
-
- /**
- * Finds the worker running the task and forwards the shutdown signal to the worker.
- *
- * @param taskId - task id to shutdown
- */
- @Override
- public void shutdown(final String taskId, String reason)
- {
- log.info("Shutdown [%s] because: [%s]", taskId, reason);
- if (!lifecycleLock.awaitStarted(1, TimeUnit.SECONDS)) {
- log.info("This TaskRunner is stopped or not yet started. Ignoring shutdown command for task: %s", taskId);
- } else if (pendingTasks.remove(taskId) != null) {
- pendingTaskPayloads.remove(taskId);
- log.info("Removed task from pending queue: %s", taskId);
- } else if (completeTasks.containsKey(taskId)) {
- cleanup(taskId);
- } else {
- final ZkWorker zkWorker = findWorkerRunningTask(taskId);
-
- if (zkWorker == null) {
- log.info("Can't shutdown! No worker running task %s", taskId);
- return;
- }
- URL url = null;
- try {
- url = TaskRunnerUtils.makeWorkerURL(zkWorker.getWorker(), "/druid/worker/v1/task/%s/shutdown", taskId);
- final StatusResponseHolder response = httpClient.go(
- new Request(HttpMethod.POST, url),
- StatusResponseHandler.getInstance(),
- shutdownTimeout
- ).get();
-
- log.info(
- "Sent shutdown message to worker: %s, status %s, response: %s",
- zkWorker.getWorker().getHost(),
- response.getStatus(),
- response.getContent()
- );
-
- if (!HttpResponseStatus.OK.equals(response.getStatus())) {
- log.error("Shutdown failed for %s! Are you sure the task was running?", taskId);
- }
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RE(e, "Interrupted posting shutdown to [%s] for task [%s]", url, taskId);
- }
- catch (Exception e) {
- throw new RE(e, "Error in handling post to [%s] for task [%s]", zkWorker.getWorker().getHost(), taskId);
- }
- }
- }
-
- @Override
- public Optional streamTaskLog(final String taskId, final long offset) throws IOException
- {
- final ZkWorker zkWorker = findWorkerRunningTask(taskId);
-
- if (zkWorker == null) {
- // Worker is not running this task, it might be available in deep storage
- return Optional.absent();
- } else {
- // Worker is still running this task
- final URL url = TaskRunnerUtils.makeWorkerURL(
- zkWorker.getWorker(),
- "/druid/worker/v1/task/%s/log?offset=%s",
- taskId,
- Long.toString(offset)
- );
- try {
- return Optional.of(httpClient.go(
- new Request(HttpMethod.GET, url),
- new InputStreamResponseHandler()
- ).get());
- }
- catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- catch (ExecutionException e) {
- // Unwrap if possible
- Throwables.propagateIfPossible(e.getCause(), IOException.class);
- throw new RuntimeException(e);
- }
- }
- }
-
-
- @Override
- public Optional streamTaskReports(final String taskId) throws IOException
- {
- final ZkWorker zkWorker = findWorkerRunningTask(taskId);
-
- if (zkWorker == null) {
- // Worker is not running this task, it might be available in deep storage
- return Optional.absent();
- }
-
- final RemoteTaskRunnerWorkItem runningWorkItem = runningTasks.get(taskId);
-
- if (runningWorkItem == null) {
- // Worker very recently exited.
- return Optional.absent();
- }
-
- final TaskLocation taskLocation = runningWorkItem.getLocation();
-
- if (TaskLocation.unknown().equals(taskLocation)) {
- // No location known for this task. It may have not been assigned one yet.
- return Optional.absent();
- }
-
- final URL url = TaskRunnerUtils.makeTaskLocationURL(
- taskLocation,
- "/druid/worker/v1/chat/%s/liveReports",
- taskId
- );
-
- return TaskRunnerUtils.streamTaskReportsFromTaskLocation(httpClient, url);
- }
-
-
- /**
- * Adds a task to the pending queue.
- * {@link #runPendingTasks()} should be called to run the pending task.
- */
- @VisibleForTesting
- RemoteTaskRunnerWorkItem addPendingTask(final Task task)
- {
- log.info("Added pending task %s", task.getId());
- final RemoteTaskRunnerWorkItem taskRunnerWorkItem = new RemoteTaskRunnerWorkItem(
- task.getId(),
- task.getType(),
- null,
- null,
- task.getDataSource()
- );
- pendingTaskPayloads.put(task.getId(), task);
- pendingTasks.put(task.getId(), taskRunnerWorkItem);
- return taskRunnerWorkItem;
- }
-
- /**
- * This method uses a multi-threaded executor to extract all pending tasks and attempt to run them. Any tasks that
- * are successfully assigned to a worker will be moved from pendingTasks to runningTasks. This method is thread-safe.
- * This method should be run each time there is new worker capacity or if new tasks are assigned.
- */
- @VisibleForTesting
- void runPendingTasks()
- {
- runPendingTasksExec.submit(
- (Callable) () -> {
- try {
- // make a copy of the pending tasks because tryAssignTask may delete tasks from pending and move them
- // into running status
- List copy = Lists.newArrayList(pendingTasks.values());
- sortByInsertionTime(copy);
-
- for (RemoteTaskRunnerWorkItem taskRunnerWorkItem : copy) {
- runPendingTask(taskRunnerWorkItem);
- }
- }
- catch (Exception e) {
- log.makeAlert(e, "Exception in running pending tasks").emit();
- }
-
- return null;
- }
- );
- }
-
- /**
- * Run one pending task. This method must be called in the same class except for unit tests.
- */
- @VisibleForTesting
- void runPendingTask(RemoteTaskRunnerWorkItem taskRunnerWorkItem)
- {
- String taskId = taskRunnerWorkItem.getTaskId();
- if (tryAssignTasks.putIfAbsent(taskId, taskId) == null) {
- try {
- //this can still be null due to race from explicit task shutdown request
- //or if another thread steals and completes this task right after this thread makes copy
- //of pending tasks. See https://github.com/apache/druid/issues/2842 .
- Task task = pendingTaskPayloads.get(taskId);
- if (task != null && tryAssignTask(task, taskRunnerWorkItem)) {
- pendingTaskPayloads.remove(taskId);
- }
- }
- catch (Exception e) {
- log.makeAlert(e, "Exception while trying to assign task")
- .addData("taskId", taskRunnerWorkItem.getTaskId())
- .emit();
- RemoteTaskRunnerWorkItem workItem = pendingTasks.remove(taskId);
- if (workItem != null) {
- taskComplete(
- workItem,
- null,
- TaskStatus.failure(
- taskId,
- StringUtils.format("Failed to assign this task. See overlord logs for more details.")
- )
- );
- }
- }
- finally {
- tryAssignTasks.remove(taskId);
- }
- }
- }
-
- @VisibleForTesting
- static void sortByInsertionTime(List tasks)
- {
- Collections.sort(tasks, Comparator.comparing(RemoteTaskRunnerWorkItem::getQueueInsertionTime));
- }
-
- /**
- * Removes a task from the complete queue and clears out the ZK status path of the task.
- *
- * @param taskId - the task to cleanup
- */
- private void cleanup(final String taskId)
- {
- if (!lifecycleLock.awaitStarted(1, TimeUnit.SECONDS)) {
- return;
- }
- final RemoteTaskRunnerWorkItem removed = completeTasks.remove(taskId);
- final Worker worker;
- if (removed == null || (worker = removed.getWorker()) == null) {
- log.makeAlert("Asked to cleanup nonexistent task")
- .addData("taskId", taskId)
- .emit();
- } else {
- final String workerId = worker.getHost();
- log.info("Cleaning up task[%s] on worker[%s]", taskId, workerId);
- final String statusPath = JOINER.join(indexerZkConfig.getStatusPath(), workerId, taskId);
- try {
- cf.delete().guaranteed().forPath(statusPath);
- }
- catch (KeeperException.NoNodeException e) {
- log.info("Tried to delete status path[%s] that didn't exist! Must've gone away already?", statusPath);
- }
- catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- }
-
- /**
- * Ensures no workers are already running a task before assigning the task to a worker.
- * It is possible that a worker is running a task that the RTR has no knowledge of. This occurs when the RTR
- * needs to bootstrap after a restart.
- *
- * @param taskRunnerWorkItem - the task to assign
- * @return true iff the task is now assigned
- */
- private boolean tryAssignTask(final Task task, final RemoteTaskRunnerWorkItem taskRunnerWorkItem) throws Exception
- {
- Preconditions.checkNotNull(task, "task");
- Preconditions.checkNotNull(taskRunnerWorkItem, "taskRunnerWorkItem");
- Preconditions.checkArgument(task.getId().equals(taskRunnerWorkItem.getTaskId()), "task id != workItem id");
-
- if (runningTasks.containsKey(task.getId()) || findWorkerRunningTask(task.getId()) != null) {
- log.info("Task[%s] already running.", task.getId());
- return true;
- } else {
- // Nothing running this task, announce it in ZK for a worker to run it
- WorkerBehaviorConfig workerConfig = workerConfigRef.get();
- WorkerSelectStrategy strategy;
- if (workerConfig == null || workerConfig.getSelectStrategy() == null) {
- strategy = WorkerBehaviorConfig.DEFAULT_STRATEGY;
- log.debug("No worker selection strategy set. Using default of [%s]", strategy.getClass().getSimpleName());
- } else {
- strategy = workerConfig.getSelectStrategy();
- }
-
- ZkWorker assignedWorker = null;
- final ImmutableWorkerInfo immutableZkWorker;
- try {
- synchronized (workersWithUnacknowledgedTask) {
- immutableZkWorker = strategy.findWorkerForTask(
- config,
- ImmutableMap.copyOf(getWorkersEligibleToRunTasks()),
- task
- );
-
- if (immutableZkWorker != null &&
- workersWithUnacknowledgedTask.putIfAbsent(immutableZkWorker.getWorker().getHost(), task.getId())
- == null) {
- assignedWorker = zkWorkers.get(immutableZkWorker.getWorker().getHost());
- }
- }
-
- if (assignedWorker != null) {
- return announceTask(task, assignedWorker, taskRunnerWorkItem);
- } else {
- log.debug(
- "Unsuccessful task-assign attempt for task [%s] on workers [%s]. Workers to ack tasks are [%s].",
- task.getId(),
- zkWorkers.values(),
- workersWithUnacknowledgedTask
- );
- }
-
- return false;
- }
- finally {
- if (assignedWorker != null) {
- workersWithUnacknowledgedTask.remove(assignedWorker.getWorker().getHost());
- //if this attempt won the race to run the task then other task might be able to use this worker now after task ack.
- runPendingTasks();
- }
- }
- }
- }
-
- Map getWorkersEligibleToRunTasks()
- {
- return Maps.transformEntries(
- Maps.filterEntries(
- zkWorkers,
- input -> !lazyWorkers.containsKey(input.getKey()) &&
- !workersWithUnacknowledgedTask.containsKey(input.getKey()) &&
- !blackListedWorkers.contains(input.getValue())
- ),
- (String key, ZkWorker value) -> value.toImmutable()
- );
- }
-
- /**
- * Creates a ZK entry under a specific path associated with a worker. The worker is responsible for
- * removing the task ZK entry and creating a task status ZK entry.
- *
- * @param theZkWorker The worker the task is assigned to
- * @param taskRunnerWorkItem The task to be assigned
- * @return boolean indicating whether the task was successfully assigned or not
- */
- private boolean announceTask(
- final Task task,
- final ZkWorker theZkWorker,
- final RemoteTaskRunnerWorkItem taskRunnerWorkItem
- ) throws Exception
- {
- Preconditions.checkArgument(task.getId().equals(taskRunnerWorkItem.getTaskId()), "task id != workItem id");
- final String worker = theZkWorker.getWorker().getHost();
- synchronized (statusLock) {
- if (!zkWorkers.containsKey(worker) || lazyWorkers.containsKey(worker)) {
- // the worker might have been killed or marked as lazy
- log.debug("Not assigning task to already removed worker[%s]", worker);
- return false;
- }
- log.info("Assigning task [%s] to worker [%s]", task.getId(), worker);
-
- CuratorUtils.createIfNotExists(
- cf,
- JOINER.join(indexerZkConfig.getTasksPath(), worker, task.getId()),
- CreateMode.EPHEMERAL,
- jsonMapper.writeValueAsBytes(task),
- config.getMaxZnodeBytes()
- );
-
- RemoteTaskRunnerWorkItem workItem = pendingTasks.remove(task.getId());
- if (workItem == null) {
- log.makeAlert("Ignoring null work item from pending task queue")
- .addData("taskId", task.getId())
- .emit();
- return false;
- }
-
- final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder();
- IndexTaskUtils.setTaskDimensions(metricBuilder, task);
- emitter.emit(metricBuilder.setMetric(
- "task/pending/time",
- new Duration(workItem.getQueueInsertionTime(), DateTimes.nowUtc()).getMillis())
- );
-
- RemoteTaskRunnerWorkItem newWorkItem = workItem.withWorker(theZkWorker.getWorker(), null);
- runningTasks.put(task.getId(), newWorkItem);
- log.info("Task [%s] started running on worker [%s]", task.getId(), newWorkItem.getWorker().getHost());
- TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), TaskStatus.running(task.getId()));
-
- // Syncing state with Zookeeper - don't assign new tasks until the task we just assigned is actually running
- // on a worker - this avoids overflowing a worker with tasks
- Stopwatch timeoutStopwatch = Stopwatch.createStarted();
- while (!isWorkerRunningTask(theZkWorker, task.getId())) {
- final long waitMs = config.getTaskAssignmentTimeout().toStandardDuration().getMillis();
- statusLock.wait(waitMs);
- long elapsed = timeoutStopwatch.elapsed(TimeUnit.MILLISECONDS);
- if (elapsed >= waitMs) {
- log.makeAlert(
- "Task assignment timed out on worker [%s], never ran task [%s]! Timeout: (%s >= %s)!",
- worker,
- task.getId(),
- elapsed,
- config.getTaskAssignmentTimeout()
- ).emit();
- taskComplete(
- taskRunnerWorkItem,
- theZkWorker,
- TaskStatus.failure(
- task.getId(),
- StringUtils.format(
- "The worker that this task is assigned did not start it in timeout[%s]. "
- + "See overlord logs for more details.",
- config.getTaskAssignmentTimeout()
- )
- )
- );
- break;
- }
- }
- return true;
- }
- }
-
- private boolean cancelWorkerCleanup(String workerHost)
- {
- ScheduledFuture previousCleanup = removedWorkerCleanups.remove(workerHost);
- if (previousCleanup != null) {
- log.info("Cancelling Worker[%s] scheduled task cleanup", workerHost);
- previousCleanup.cancel(false);
- }
- return previousCleanup != null;
- }
-
- /**
- * When a new worker appears, listeners are registered for status changes associated with tasks assigned to
- * the worker. Status changes indicate the creation or completion of a task.
- * The RemoteTaskRunner updates state according to these changes.
- *
- * @param worker contains metadata for a worker that has appeared in ZK
- * @return future that will contain a fully initialized worker
- */
- private ListenableFuture addWorker(final Worker worker)
- {
- log.info("Worker[%s] reportin' for duty!", worker.getHost());
-
- try {
- cancelWorkerCleanup(worker.getHost());
-
- final String workerStatusPath = JOINER.join(indexerZkConfig.getStatusPath(), worker.getHost());
- final PathChildrenCache statusCache = workerStatusPathChildrenCacheFactory.make(cf, workerStatusPath);
- final SettableFuture retVal = SettableFuture.create();
- final ZkWorker zkWorker = new ZkWorker(
- worker,
- statusCache,
- jsonMapper
- );
-
- // Add status listener to the watcher for status changes
- zkWorker.addListener(getStatusListener(worker, zkWorker, retVal));
- zkWorker.start();
- return retVal;
- }
- catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- @VisibleForTesting
- PathChildrenCacheListener getStatusListener(final Worker worker, final ZkWorker zkWorker, final SettableFuture retVal)
- {
- return (client, event) -> {
- final String taskId;
- final RemoteTaskRunnerWorkItem taskRunnerWorkItem;
- synchronized (statusLock) {
- try {
- switch (event.getType()) {
- case CHILD_ADDED:
- case CHILD_UPDATED:
- if (event.getData() == null) {
- log.error("Unexpected null for event.getData() in handle new worker status for [%s]", event.getType().toString());
- log.makeAlert("Unexpected null for event.getData() in handle new worker status")
- .addData("worker", zkWorker.getWorker().getHost())
- .addData("eventType", event.getType().toString())
- .emit();
- return;
- }
- taskId = ZKPaths.getNodeFromPath(event.getData().getPath());
- final TaskAnnouncement announcement = jsonMapper.readValue(
- event.getData().getData(), TaskAnnouncement.class
- );
-
- log.info(
- "Worker[%s] wrote %s status for task [%s] on [%s]",
- zkWorker.getWorker().getHost(),
- announcement.getTaskStatus().getStatusCode(),
- taskId,
- announcement.getTaskLocation()
- );
-
- // Synchronizing state with ZK
- statusLock.notifyAll();
-
- final RemoteTaskRunnerWorkItem tmp;
- if ((tmp = runningTasks.get(taskId)) != null) {
- taskRunnerWorkItem = tmp;
- } else {
- final RemoteTaskRunnerWorkItem newTaskRunnerWorkItem = new RemoteTaskRunnerWorkItem(
- taskId,
- announcement.getTaskType(),
- zkWorker.getWorker(),
- TaskLocation.unknown(),
- announcement.getTaskDataSource()
- );
- final RemoteTaskRunnerWorkItem existingItem = runningTasks.putIfAbsent(
- taskId,
- newTaskRunnerWorkItem
- );
- if (existingItem == null) {
- log.warn(
- "Worker[%s] announced a status for a task I didn't know about, adding to runningTasks: %s",
- zkWorker.getWorker().getHost(),
- taskId
- );
- taskRunnerWorkItem = newTaskRunnerWorkItem;
- } else {
- taskRunnerWorkItem = existingItem;
- }
- }
-
- if (!announcement.getTaskLocation().equals(taskRunnerWorkItem.getLocation())) {
- taskRunnerWorkItem.setLocation(announcement.getTaskLocation());
- TaskRunnerUtils.notifyLocationChanged(listeners, taskId, announcement.getTaskLocation());
- }
-
- if (announcement.getTaskStatus().isComplete()) {
- taskComplete(taskRunnerWorkItem, zkWorker, announcement.getTaskStatus());
- runPendingTasks();
- }
- break;
- case CHILD_REMOVED:
- if (event.getData() == null) {
- log.error("Unexpected null for event.getData() in handle new worker status for [%s]", event.getType().toString());
- log.makeAlert("Unexpected null for event.getData() in handle new worker status")
- .addData("worker", zkWorker.getWorker().getHost())
- .addData("eventType", event.getType().toString())
- .emit();
- return;
- }
- taskId = ZKPaths.getNodeFromPath(event.getData().getPath());
- taskRunnerWorkItem = runningTasks.remove(taskId);
- if (taskRunnerWorkItem != null) {
- log.warn("Task[%s] just disappeared!", taskId);
- final TaskStatus taskStatus = TaskStatus.failure(
- taskId,
- "The worker that this task was assigned disappeared. See overlord logs for more details."
- );
- taskRunnerWorkItem.setResult(taskStatus);
- TaskRunnerUtils.notifyStatusChanged(listeners, taskId, taskStatus);
- } else {
- log.info("Task[%s] went bye bye.", taskId);
- }
- break;
- case INITIALIZED:
- if (zkWorkers.putIfAbsent(worker.getHost(), zkWorker) == null) {
- retVal.set(zkWorker);
- } else {
- final String message = StringUtils.format(
- "This should not happen...tried to add already-existing worker[%s]",
- worker.getHost()
- );
- log.makeAlert(message)
- .addData("workerHost", worker.getHost())
- .addData("workerIp", worker.getIp())
- .emit();
- retVal.setException(new IllegalStateException(message));
- }
- runPendingTasks();
- break;
- case CONNECTION_SUSPENDED:
- case CONNECTION_RECONNECTED:
- case CONNECTION_LOST:
- // do nothing
- }
- }
- catch (Exception e) {
- String znode = null;
- if (event.getData() != null) {
- znode = event.getData().getPath();
- }
- log.makeAlert(e, "Failed to handle new worker status")
- .addData("worker", zkWorker.getWorker().getHost())
- .addData("znode", znode)
- .addData("eventType", event.getType().toString())
- .emit();
- }
- }
- };
- }
-
- /**
- * We allow workers to change their own capacities and versions. They cannot change their own hosts or ips without
- * dropping themselves and re-announcing.
- */
- private void updateWorker(final Worker worker)
- {
- final ZkWorker zkWorker = zkWorkers.get(worker.getHost());
- if (zkWorker != null) {
- log.info("Worker[%s] updated its announcement from[%s] to[%s].", worker.getHost(), zkWorker.getWorker(), worker);
- zkWorker.setWorker(worker);
- } else {
- log.warn(
- "Worker[%s] updated its announcement but we didn't have a ZkWorker for it. Ignoring.",
- worker.getHost()
- );
- }
- }
-
- /**
- * When a ephemeral worker node disappears from ZK, incomplete running tasks will be retried by
- * the logic in the status listener. We still have to make sure there are no tasks assigned
- * to the worker but not yet running.
- *
- * @param worker - the removed worker
- */
- private void removeWorker(final Worker worker)
- {
- log.info("Kaboom! Worker[%s] removed!", worker.getHost());
-
- final ZkWorker zkWorker = zkWorkers.get(worker.getHost());
- if (zkWorker != null) {
- try {
- scheduleTasksCleanupForWorker(worker.getHost(), getAssignedTasks(worker));
- }
- catch (Exception e) {
- throw new RuntimeException(e);
- }
- finally {
- try {
- zkWorker.close();
- }
- catch (Exception e) {
- log.error(e, "Exception closing worker[%s]!", worker.getHost());
- }
- zkWorkers.remove(worker.getHost());
- checkBlackListedNodes();
- }
- }
- lazyWorkers.remove(worker.getHost());
- }
-
- /**
- * Schedule a task that will, at some point in the future, clean up znodes and issue failures for "tasksToFail"
- * if they are being run by "worker".
- */
- private void scheduleTasksCleanupForWorker(final String worker, final List tasksToFail)
- {
- // This method is only called from the PathChildrenCache event handler, so this may look like a race,
- // but is actually not.
- cancelWorkerCleanup(worker);
-
- final ListenableScheduledFuture> cleanupTask = cleanupExec.schedule(
- () -> {
- log.info("Running scheduled cleanup for Worker[%s]", worker);
- try {
- for (String assignedTask : tasksToFail) {
- String taskPath = JOINER.join(indexerZkConfig.getTasksPath(), worker, assignedTask);
- String statusPath = JOINER.join(indexerZkConfig.getStatusPath(), worker, assignedTask);
- if (cf.checkExists().forPath(taskPath) != null) {
- cf.delete().guaranteed().forPath(taskPath);
- }
-
- if (cf.checkExists().forPath(statusPath) != null) {
- cf.delete().guaranteed().forPath(statusPath);
- }
-
- log.info("Failing task[%s]", assignedTask);
- RemoteTaskRunnerWorkItem taskRunnerWorkItem = runningTasks.remove(assignedTask);
- if (taskRunnerWorkItem != null) {
- final TaskStatus taskStatus = TaskStatus.failure(
- assignedTask,
- StringUtils.format("Canceled for worker cleanup. See overlord logs for more details.")
- );
- taskRunnerWorkItem.setResult(taskStatus);
- TaskRunnerUtils.notifyStatusChanged(listeners, assignedTask, taskStatus);
- } else {
- log.warn("RemoteTaskRunner has no knowledge of task[%s]", assignedTask);
- }
- }
-
- // worker is gone, remove worker task status announcements path.
- String workerStatusPath = JOINER.join(indexerZkConfig.getStatusPath(), worker);
- if (cf.checkExists().forPath(workerStatusPath) != null) {
- cf.delete().guaranteed().forPath(JOINER.join(indexerZkConfig.getStatusPath(), worker));
- }
- }
- catch (Exception e) {
- log.makeAlert("Exception while cleaning up worker[%s]", worker).emit();
- throw new RuntimeException(e);
- }
- },
- config.getTaskCleanupTimeout().toStandardDuration().getMillis(),
- TimeUnit.MILLISECONDS
- );
-
- removedWorkerCleanups.put(worker, cleanupTask);
-
- // Remove this entry from removedWorkerCleanups when done, if it's actually the one in there.
- Futures.addCallback(
- cleanupTask,
- new FutureCallback