diff --git a/docs/api-reference/supervisor-api.md b/docs/api-reference/supervisor-api.md index d321af143020..73a365e2ff91 100644 --- a/docs/api-reference/supervisor-api.md +++ b/docs/api-reference/supervisor-api.md @@ -3539,6 +3539,101 @@ when the supervisor's tasks restart, they resume reading from `{"0": 100, "1": 1 ``` +### Reset offsets and start a backfill supervisor + +Resets the supervisor to the latest available stream offsets and starts a new bounded backfill supervisor to ingest the data in the skipped range. + +This endpoint is useful when a supervisor has fallen behind and you want to catch it up to the latest offsets without losing the skipped data. The main supervisor resumes ingesting from the latest offsets, while the backfill supervisor processes the range from the previously checkpointed offsets up to the latest offsets at the time of the reset. + +The following requirements must be met before calling this endpoint: + +- The supervisor must be a `SeekableStreamSupervisor`. +- The supervisor's `useEarliestSequenceNumber` property must be `false`. +- The supervisor context must have `useConcurrentLocks` set to `true` to allow the backfill supervisor's tasks to write concurrently with the main supervisor's tasks. +- The supervisor must be in a `RUNNING` state so that it can query the latest offsets from the stream. + +The backfill supervisor has the same configuration as the source supervisor except for its ID, which takes the form `{supervisorId}_backfill_{randomSuffix}`, and its `boundedStreamConfig`, which is set to the skipped offset range. If `backfillTaskCount` is specified, it overrides the `taskCount` for the backfill supervisor only. + +#### URL + +`POST` `/druid/indexer/v1/supervisor/{supervisorId}/resetOffsetsAndBackfill` + +#### Query parameters + +| Parameter | Type | Description | Default | +|---------|---------|---------|---------| +| `backfillTaskCount` | Integer | Number of parallel tasks for the backfill supervisor. If not specified, inherits `taskCount` from the source supervisor. | None | + +#### Responses + + + + + + +*Successfully reset and started backfill supervisor* + + + + + +*Supervisor does not meet requirements (wrong type, `useEarliestSequenceNumber` is true, `useConcurrentLocks` not enabled, or supervisor not RUNNING)* + + + + + +*Invalid supervisor ID* + + + + + +*Failed to retrieve stream offsets or serialize the backfill spec* + + + + +--- + +#### Sample request + +The following example resets a supervisor named `social_media` and starts a backfill supervisor with 2 tasks. + + + + + + +```shell +curl --request POST "http://ROUTER_IP:ROUTER_PORT/druid/indexer/v1/supervisor/social_media/resetOffsetsAndBackfill?backfillTaskCount=2" +``` + + + + + +```HTTP +POST /druid/indexer/v1/supervisor/social_media/resetOffsetsAndBackfill?backfillTaskCount=2 HTTP/1.1 +Host: http://ROUTER_IP:ROUTER_PORT +``` + + + + +#### Sample response + +
+ View the response + + ```json +{ + "id": "social_media", + "backfillSupervisorId": "social_media_backfill_abcdefgh" +} + ``` +
+ ### Terminate a supervisor Terminates a supervisor and its associated indexing tasks, triggering the publishing of their segments. When you terminate a supervisor, Druid places a tombstone marker in the metadata store to prevent reloading on restart. diff --git a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisor.java b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisor.java index 6099105b3374..04973a5272fd 100644 --- a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisor.java +++ b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisor.java @@ -322,7 +322,7 @@ protected Map getTimeLagPerPartition(Map currentOffs } @Override - protected RabbitStreamDataSourceMetadata createDataSourceMetaDataForReset(String topic, Map map) + public RabbitStreamDataSourceMetadata createDataSourceMetaDataForReset(String topic, Map map) { return new RabbitStreamDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>(topic, map)); } @@ -408,7 +408,7 @@ public LagStats computeLagStats() } @Override - protected void updatePartitionLagFromStream() + public void updatePartitionLagFromStream() { getRecordSupplierLock().lock(); @@ -435,7 +435,7 @@ protected void updatePartitionLagFromStream() } @Override - protected Map getLatestSequencesFromStream() + public Map getLatestSequencesFromStream() { return latestSequenceFromStream != null ? latestSequenceFromStream : new HashMap<>(); } diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java index 727eb52db272..5863284cc2d9 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java @@ -356,7 +356,7 @@ protected Map getTimeLagPerPartition(Map map) + public KafkaDataSourceMetadata createDataSourceMetaDataForReset(String topic, Map map) { return new KafkaDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>(topic, map)); } @@ -548,7 +548,7 @@ private Map getTimestampPerPartitionAtCurrentOffset(S *

*/ @Override - protected void updatePartitionLagFromStream() + public void updatePartitionLagFromStream() { if (getIoConfig().isEmitTimeLagMetrics()) { updatePartitionTimeAndRecordLagFromStream(); @@ -597,7 +597,7 @@ private void updateOffsetSnapshot( } @Override - protected Map getLatestSequencesFromStream() + public Map getLatestSequencesFromStream() { return offsetSnapshotRef.get().getLatestOffsetsFromStream(); } @@ -630,7 +630,7 @@ protected boolean isMultiTopic() * Gets the offsets as stored in the metadata store. The map returned will only contain * offsets from topic partitions that match the current supervisor config stream. This * override is needed because in the case of multi-topic, a user could have updated the supervisor - * config from single topic to mult-topic, where the new multi-topic pattern regex matches the + * config from single topic to multi-topic, where the new multi-topic pattern regex matches the * old config single topic. Without this override, the previously stored metadata for the single * topic would be deemed as different from the currently configure stream, and not be included in * the offset map returned. This implementation handles these cases appropriately. @@ -640,7 +640,7 @@ protected boolean isMultiTopic() * updated to single topic or multi-topic depending on the supervisor config, as needed. */ @Override - protected Map getOffsetsFromMetadataStorage() + public Map getOffsetsFromMetadataStorage() { final DataSourceMetadata dataSourceMetadata = retrieveDataSourceMetadata(); if (checkSourceMetadataMatch(dataSourceMetadata)) { diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java index 0f91fc0965db..3f1f4034f3ce 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java @@ -321,7 +321,7 @@ protected Map getTimeLagPerPartition(Map currentOf } @Override - protected SeekableStreamDataSourceMetadata createDataSourceMetaDataForReset( + public SeekableStreamDataSourceMetadata createDataSourceMetaDataForReset( String stream, Map map ) @@ -336,7 +336,7 @@ protected OrderedSequenceNumber makeSequenceNumber(String seq, boolean i } @Override - protected void updatePartitionLagFromStream() + public void updatePartitionLagFromStream() { KinesisRecordSupplier supplier = (KinesisRecordSupplier) recordSupplier; // this recordSupplier method is thread safe, so does not need to acquire the recordSupplierLock diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java index 52f3cba7fc11..4ca373fa2c42 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java @@ -21,11 +21,14 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.base.Optional; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.ListenableFuture; import com.google.inject.Inject; import org.apache.druid.common.guava.FutureUtils; +import org.apache.druid.common.utils.IdUtils; import org.apache.druid.error.DruidException; import org.apache.druid.error.InvalidInput; import org.apache.druid.error.NotFound; @@ -35,8 +38,11 @@ import org.apache.druid.indexing.overlord.DataSourceMetadata; import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler; import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata; +import org.apache.druid.indexing.seekablestream.supervisor.BoundedStreamConfig; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.lifecycle.LifecycleStart; import org.apache.druid.java.util.common.lifecycle.LifecycleStop; @@ -129,33 +135,8 @@ public Optional getActiveSupervisorIdForDatasourceWithAppendLock(String final Supervisor supervisor = entry.getValue().lhs; final SupervisorSpec supervisorSpec = entry.getValue().rhs; - boolean hasAppendLock = Tasks.DEFAULT_USE_CONCURRENT_LOCKS; - if (supervisorSpec instanceof SeekableStreamSupervisorSpec) { - SeekableStreamSupervisorSpec seekableStreamSupervisorSpec = (SeekableStreamSupervisorSpec) supervisorSpec; - Map context = seekableStreamSupervisorSpec.getContext(); - if (context != null) { - Boolean useConcurrentLocks = QueryContexts.getAsBoolean( - Tasks.USE_CONCURRENT_LOCKS, - context.get(Tasks.USE_CONCURRENT_LOCKS) - ); - if (useConcurrentLocks == null) { - TaskLockType taskLockType = QueryContexts.getAsEnum( - Tasks.TASK_LOCK_TYPE, - context.get(Tasks.TASK_LOCK_TYPE), - TaskLockType.class - ); - if (taskLockType == null) { - hasAppendLock = Tasks.DEFAULT_USE_CONCURRENT_LOCKS; - } else if (taskLockType == TaskLockType.APPEND) { - hasAppendLock = true; - } else { - hasAppendLock = false; - } - } else { - hasAppendLock = useConcurrentLocks; - } - } - } + boolean hasAppendLock = supervisorSpec instanceof SeekableStreamSupervisorSpec + && specHasConcurrentLocks((SeekableStreamSupervisorSpec) supervisorSpec); if (supervisor instanceof SeekableStreamSupervisor && !supervisorSpec.isSuspended() @@ -393,6 +374,115 @@ public boolean resetSupervisor(String id, @Nullable DataSourceMetadata resetData return true; } + /** + * Resets a supervisor to the latest stream offsets and starts a bounded backfill supervisor to + * process the skipped range from the previously checkpointed offsets up to the latest offsets. + * + * @param id supervisor ID + * @param backfillTaskCount number of tasks for the backfill supervisor, or null to inherit from the source spec + * @return map with {@code "id"} (the original supervisor ID) and {@code "backfillSupervisorId"} + * @throws IllegalArgumentException if the supervisor is not a {@link SeekableStreamSupervisor}, + * if {@code useEarliestSequenceNumber} is true, + * if {@code useConcurrentLocks} is not set to true in the supervisor context, + * or if the supervisor is not in a RUNNING state + * @throws IllegalStateException if the latest or checkpointed offsets cannot be retrieved, + * or if the backfill spec cannot be serialized + */ + public Map resetSupervisorAndBackfill(String id, @Nullable Integer backfillTaskCount) + { + Preconditions.checkState(started, "SupervisorManager not started"); + Preconditions.checkNotNull(id, "id"); + + Pair supervisorPair = supervisors.get(id); + if (!(supervisorPair.lhs instanceof SeekableStreamSupervisor)) { + throw new IAE("Supervisor[%s] is not a SeekableStreamSupervisor", id); + } + SeekableStreamSupervisor streamSupervisor = (SeekableStreamSupervisor) supervisorPair.lhs; + SeekableStreamSupervisorSpec streamSpec = (SeekableStreamSupervisorSpec) supervisorPair.rhs; + + // Verify useEarliestOffset is false + if (streamSupervisor.getIoConfig().isUseEarliestSequenceNumber()) { + throw new IAE("Reset with skipped offsets is not supported when useEarliestOffset is true."); + } + + if (!specHasConcurrentLocks(streamSpec)) { + throw new IAE( + "Backfill tasks require 'useConcurrentLocks' to be set to true in the supervisor context to allow concurrent writes with the main supervisor tasks" + ); + } + + // We need an active recordSupplier to query the latest offsets from the stream + if (supervisorPair.lhs.getState() != SupervisorStateManager.BasicState.RUNNING) { + throw new IAE("Supervisor[%s] must be in a RUNNING state to perform a reset and backfill", id); + } + + log.info("Capturing latest offsets from stream for supervisor[%s]", id); + streamSupervisor.updatePartitionLagFromStream(); + Map endOffsets = streamSupervisor.getLatestSequencesFromStream(); + + log.info("Capturing checkpointed offsets for supervisor[%s]", id); + Map startOffsets = streamSupervisor.getOffsetsFromMetadataStorage(); + + // Validate that we successfully retrieved offsets + if (endOffsets == null || endOffsets.isEmpty()) { + throw new ISE("Skipping reset: Failed to get latest offsets from stream for supervisor[%s]", id); + } + if (startOffsets == null || startOffsets.isEmpty()) { + throw new ISE("Skipping reset: Failed to get checkpointed offsets for supervisor[%s]", id); + } + + String backfillSupervisorId = IdUtils.getRandomIdWithPrefix(id + "_backfill"); + + try { + Map normalizedStartOffsets = jsonMapper.readValue(jsonMapper.writeValueAsString(startOffsets), Map.class); + Map normalizedEndOffsets = jsonMapper.readValue(jsonMapper.writeValueAsString(endOffsets), Map.class); + BoundedStreamConfig boundedStreamConfig = new BoundedStreamConfig(normalizedStartOffsets, normalizedEndOffsets); + SupervisorSpec backfillSpec = createBackfillSpec(streamSpec, backfillSupervisorId, boundedStreamConfig, backfillTaskCount); + createOrUpdateAndStartSupervisor(backfillSpec); + } + catch (JsonProcessingException e) { + throw new ISE(e, "Failed to create backfill supervisor spec for supervisor[%s]", id); + } + + log.info("Started backfill supervisor[%s] for supervisor[%s]", backfillSupervisorId, id); + + log.info("Resetting supervisor[%s] metadata to latest offsets", id); + DataSourceMetadata resetMetadata = streamSupervisor.createDataSourceMetaDataForReset( + streamSupervisor.getIoConfig().getStream(), + endOffsets + ); + + streamSupervisor.resetOffsets(resetMetadata); + + // Reset autoscaler if present + SupervisorTaskAutoScaler autoscaler = autoscalers.get(id); + if (autoscaler != null) { + autoscaler.reset(); + } + + return ImmutableMap.of( + "id", id, + "backfillSupervisorId", backfillSupervisorId + ); + } + + SupervisorSpec createBackfillSpec( + SeekableStreamSupervisorSpec sourceSpec, + String backfillSupervisorId, + BoundedStreamConfig boundedStreamConfig, + @Nullable Integer backfillTaskCount + ) throws JsonProcessingException + { + ObjectNode specNode = jsonMapper.valueToTree(sourceSpec); + specNode.put("id", backfillSupervisorId); + ObjectNode ioConfigNode = (ObjectNode) specNode.path("spec").path("ioConfig"); + ioConfigNode.set("boundedStreamConfig", jsonMapper.valueToTree(boundedStreamConfig)); + if (backfillTaskCount != null) { + ioConfigNode.put("taskCount", backfillTaskCount); + } + return jsonMapper.treeToValue(specNode, SupervisorSpec.class); + } + public boolean checkPointDataSourceMetadata( String supervisorId, int taskGroupId, @@ -631,4 +721,29 @@ private SupervisorSpec getSpec(String id) return supervisor == null ? null : supervisor.rhs; } } + + /** + * Returns true if the spec's context enables concurrent (append) locks, accepting both + * {@code useConcurrentLocks: true} (or any truthy string) and {@code taskLockType: APPEND}. + */ + private static boolean specHasConcurrentLocks(SeekableStreamSupervisorSpec spec) + { + Map context = spec.getContext(); + if (context == null) { + return false; + } + Boolean useConcurrentLocks = QueryContexts.getAsBoolean( + Tasks.USE_CONCURRENT_LOCKS, + context.get(Tasks.USE_CONCURRENT_LOCKS) + ); + if (useConcurrentLocks != null) { + return useConcurrentLocks; + } + TaskLockType taskLockType = QueryContexts.getAsEnum( + Tasks.TASK_LOCK_TYPE, + context.get(Tasks.TASK_LOCK_TYPE), + TaskLockType.class + ); + return taskLockType == TaskLockType.APPEND; + } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java index aff9edf19af9..af8c7adc7664 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java @@ -640,6 +640,50 @@ private Response handleResetRequest( ); } + @POST + @Path("/{id}/resetOffsetsAndBackfill") + @Produces(MediaType.APPLICATION_JSON) + @ResourceFilters(SupervisorResourceFilter.class) + public Response resetOffsetsAndBackfill( + @PathParam("id") final String id, + @QueryParam("backfillTaskCount") @Nullable final Integer backfillTaskCount + ) + { + return handleResetAndBackfill(id, backfillTaskCount); + } + + private Response handleResetAndBackfill(final String id, @Nullable final Integer backfillTaskCount) + { + if (backfillTaskCount != null && backfillTaskCount < 1) { + return Response.status(Response.Status.BAD_REQUEST) + .entity(ImmutableMap.of("error", "backfillTaskCount must be a positive integer")) + .build(); + } + return asLeaderWithSupervisorManager( + manager -> { + if (!manager.getSupervisorIds().contains(id)) { + return Response.status(Response.Status.NOT_FOUND) + .entity(ImmutableMap.of("error", StringUtils.format("[%s] does not exist", id))) + .build(); + } + try { + Map result = manager.resetSupervisorAndBackfill(id, backfillTaskCount); + return Response.ok(result).build(); + } + catch (IllegalArgumentException e) { + return Response.status(Response.Status.BAD_REQUEST) + .entity(ImmutableMap.of("error", e.getMessage())) + .build(); + } + catch (Exception e) { + return Response.serverError() + .entity(ImmutableMap.of("error", e.getMessage())) + .build(); + } + } + ); + } + private Response asLeaderWithSupervisorManager(Function f) { Optional supervisorManager = taskMaster.getSupervisorManager(); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 91b4244c0bf3..fdf7563873d4 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -3274,7 +3274,7 @@ private boolean updatePartitionDataFromStream() /** * gets mapping of partitions in stream to their latest offsets. */ - protected Map getLatestSequencesFromStream() + public Map getLatestSequencesFromStream() { return new HashMap<>(); } @@ -4552,7 +4552,7 @@ private OrderedSequenceNumber getOffsetFromStorageForPartiti } } - protected Map getOffsetsFromMetadataStorage() + public Map getOffsetsFromMetadataStorage() { final DataSourceMetadata dataSourceMetadata = retrieveDataSourceMetadata(); if (dataSourceMetadata instanceof SeekableStreamDataSourceMetadata @@ -4939,7 +4939,7 @@ private void updateCurrentOffsets() throws InterruptedException, ExecutionExcept coalesceAndAwait(futures); } - protected abstract void updatePartitionLagFromStream(); + public abstract void updatePartitionLagFromStream(); /** * Gets 'lag' of currently processed offset behind latest offset as a measure of difference between offsets. @@ -5196,7 +5196,7 @@ protected abstract List sequence * @return specific instance of datasource metadata */ - protected abstract SeekableStreamDataSourceMetadata createDataSourceMetaDataForReset( + public abstract SeekableStreamDataSourceMetadata createDataSourceMetaDataForReset( String stream, Map map ); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java index 525444e23dea..5815952ea626 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java @@ -19,13 +19,19 @@ package org.apache.druid.indexing.overlord.supervisor; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.jsontype.NamedType; import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.util.concurrent.SettableFuture; import org.apache.druid.data.input.impl.ByteEntity; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.error.DruidException; import org.apache.druid.error.DruidExceptionMatcher; import org.apache.druid.error.InvalidInput; @@ -35,7 +41,11 @@ import org.apache.druid.indexing.overlord.ObjectMetadata; import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers; import org.apache.druid.indexing.seekablestream.TestSeekableStreamDataSourceMetadata; +import org.apache.druid.indexing.seekablestream.supervisor.BoundedStreamConfig; +import org.apache.druid.indexing.seekablestream.supervisor.LagAggregator; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor; +import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig; +import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIngestionSpec; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.DateTimes; @@ -43,6 +53,7 @@ import org.apache.druid.java.util.common.Pair; import org.apache.druid.metadata.MetadataSupervisorManager; import org.apache.druid.metadata.PendingSegmentRecord; +import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; import org.apache.druid.server.metrics.SupervisorStatsProvider; import org.apache.druid.timeline.partition.NumberedShardSpec; @@ -1068,6 +1079,205 @@ public void test_isAnotherTaskGroupPublishingToPartitions() ); } + @Test + public void testResetSupervisorAndBackfill() throws Exception + { + EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(ImmutableMap.of()); + replayAll(); + manager.start(); + + final ConcurrentHashMap> supervisorsMap = getSupervisorsMap(); + final SeekableStreamSupervisorSpec streamSpec = EasyMock.createNiceMock(SeekableStreamSupervisorSpec.class); + final SeekableStreamSupervisor streamSupervisor = EasyMock.createNiceMock(SeekableStreamSupervisor.class); + final SeekableStreamSupervisorIOConfig ioConfig = EasyMock.createNiceMock(SeekableStreamSupervisorIOConfig.class); + + // non-SeekableStream supervisor → IAE + // Use a concrete anonymous Supervisor (not a mock) to reliably fail instanceof SeekableStreamSupervisor + final Supervisor nonStreamSupervisor = new Supervisor() + { + @Override + public void start() + { + } + + @Override + public void stop(boolean stopGracefully) + { + } + + @Override + public SupervisorReport getStatus() + { + return null; + } + + @Override + public SupervisorStateManager.State getState() + { + return null; + } + + @Override + public void reset(DataSourceMetadata dataSourceMetadata) + { + } + }; + supervisorsMap.put("id1", Pair.of(nonStreamSupervisor, streamSpec)); + Assert.assertThrows( + IllegalArgumentException.class, + () -> manager.resetSupervisorAndBackfill("id1", null) + ); + + // useEarliestSequenceNumber=true → IAE + supervisorsMap.put("id1", Pair.of(streamSupervisor, streamSpec)); + EasyMock.expect(streamSupervisor.getIoConfig()).andReturn(ioConfig).anyTimes(); + EasyMock.expect(ioConfig.isUseEarliestSequenceNumber()).andReturn(true).once(); + EasyMock.replay(streamSupervisor, streamSpec, ioConfig); + Assert.assertThrows( + IllegalArgumentException.class, + () -> manager.resetSupervisorAndBackfill("id1", null) + ); + EasyMock.reset(streamSupervisor, streamSpec, ioConfig); + + // useConcurrentLocks not set (null context) → IAE + EasyMock.expect(streamSupervisor.getIoConfig()).andReturn(ioConfig).anyTimes(); + EasyMock.expect(ioConfig.isUseEarliestSequenceNumber()).andReturn(false).once(); + EasyMock.expect(streamSpec.getContext()).andReturn(null).once(); + EasyMock.replay(streamSupervisor, streamSpec, ioConfig); + Assert.assertThrows( + IllegalArgumentException.class, + () -> manager.resetSupervisorAndBackfill("id1", null) + ); + EasyMock.reset(streamSupervisor, streamSpec, ioConfig); + + // useConcurrentLocks=false → IAE + EasyMock.expect(streamSupervisor.getIoConfig()).andReturn(ioConfig).anyTimes(); + EasyMock.expect(ioConfig.isUseEarliestSequenceNumber()).andReturn(false).once(); + EasyMock.expect(streamSpec.getContext()).andReturn(ImmutableMap.of("useConcurrentLocks", false)).once(); + EasyMock.replay(streamSupervisor, streamSpec, ioConfig); + Assert.assertThrows( + IllegalArgumentException.class, + () -> manager.resetSupervisorAndBackfill("id1", null) + ); + EasyMock.reset(streamSupervisor, streamSpec, ioConfig); + + // useConcurrentLocks="true" (string) → accepted, fails at next guard (not RUNNING) + EasyMock.expect(streamSupervisor.getIoConfig()).andReturn(ioConfig).anyTimes(); + EasyMock.expect(ioConfig.isUseEarliestSequenceNumber()).andReturn(false).once(); + EasyMock.expect(streamSpec.getContext()).andReturn(ImmutableMap.of("useConcurrentLocks", "true")).once(); + EasyMock.expect(streamSupervisor.getState()).andReturn(SupervisorStateManager.BasicState.SUSPENDED).once(); + EasyMock.replay(streamSupervisor, streamSpec, ioConfig); + Assert.assertThrows( + IllegalArgumentException.class, + () -> manager.resetSupervisorAndBackfill("id1", null) + ); + EasyMock.reset(streamSupervisor, streamSpec, ioConfig); + + // taskLockType=APPEND → accepted, fails at next guard (not RUNNING) + EasyMock.expect(streamSupervisor.getIoConfig()).andReturn(ioConfig).anyTimes(); + EasyMock.expect(ioConfig.isUseEarliestSequenceNumber()).andReturn(false).once(); + EasyMock.expect(streamSpec.getContext()).andReturn(ImmutableMap.of("taskLockType", "APPEND")).once(); + EasyMock.expect(streamSupervisor.getState()).andReturn(SupervisorStateManager.BasicState.SUSPENDED).once(); + EasyMock.replay(streamSupervisor, streamSpec, ioConfig); + Assert.assertThrows( + IllegalArgumentException.class, + () -> manager.resetSupervisorAndBackfill("id1", null) + ); + EasyMock.reset(streamSupervisor, streamSpec, ioConfig); + + // supervisor not RUNNING → IAE + EasyMock.expect(streamSupervisor.getIoConfig()).andReturn(ioConfig).anyTimes(); + EasyMock.expect(ioConfig.isUseEarliestSequenceNumber()).andReturn(false).once(); + EasyMock.expect(streamSpec.getContext()).andReturn(ImmutableMap.of("useConcurrentLocks", true)).once(); + EasyMock.expect(streamSupervisor.getState()).andReturn(SupervisorStateManager.BasicState.SUSPENDED).once(); + EasyMock.replay(streamSupervisor, streamSpec, ioConfig); + Assert.assertThrows( + IllegalArgumentException.class, + () -> manager.resetSupervisorAndBackfill("id1", null) + ); + EasyMock.reset(streamSupervisor, streamSpec, ioConfig); + + // empty latest offsets → ISE + EasyMock.expect(streamSupervisor.getIoConfig()).andReturn(ioConfig).anyTimes(); + EasyMock.expect(ioConfig.isUseEarliestSequenceNumber()).andReturn(false).once(); + EasyMock.expect(streamSpec.getContext()).andReturn(ImmutableMap.of("useConcurrentLocks", true)).once(); + EasyMock.expect(streamSupervisor.getState()).andReturn(SupervisorStateManager.BasicState.RUNNING).once(); + streamSupervisor.updatePartitionLagFromStream(); + EasyMock.expectLastCall().once(); + EasyMock.expect(streamSupervisor.getLatestSequencesFromStream()).andReturn(ImmutableMap.of()).once(); + EasyMock.replay(streamSupervisor, streamSpec, ioConfig); + Assert.assertThrows( + IllegalStateException.class, + () -> manager.resetSupervisorAndBackfill("id1", null) + ); + EasyMock.reset(streamSupervisor, streamSpec, ioConfig); + + // empty start offsets from metadata → ISE + EasyMock.expect(streamSupervisor.getIoConfig()).andReturn(ioConfig).anyTimes(); + EasyMock.expect(ioConfig.isUseEarliestSequenceNumber()).andReturn(false).once(); + EasyMock.expect(streamSpec.getContext()).andReturn(ImmutableMap.of("useConcurrentLocks", true)).once(); + EasyMock.expect(streamSupervisor.getState()).andReturn(SupervisorStateManager.BasicState.RUNNING).once(); + streamSupervisor.updatePartitionLagFromStream(); + EasyMock.expectLastCall().once(); + EasyMock.expect(streamSupervisor.getLatestSequencesFromStream()).andReturn(ImmutableMap.of("0", 100L)).once(); + EasyMock.expect(streamSupervisor.getOffsetsFromMetadataStorage()).andReturn(ImmutableMap.of()).once(); + EasyMock.replay(streamSupervisor, streamSpec, ioConfig); + Assert.assertThrows( + IllegalStateException.class, + () -> manager.resetSupervisorAndBackfill("id1", null) + ); + + verifyAll(); + } + + @Test + public void testCreateBackfillSpec() throws Exception + { + final ObjectMapper localMapper = new DefaultObjectMapper(); + localMapper.registerSubtypes( + new NamedType(TestBackfillSupervisorSpec.class, "testBackfill"), + new NamedType(TestBackfillSupervisorSpec.IngestionSpec.class, "testBackfillIngestionSpec"), + new NamedType(TestBackfillSupervisorSpec.IOConfig.class, "testBackfillIOConfig") + ); + + final SupervisorManager localManager = new SupervisorManager(localMapper, metadataSupervisorManager); + + final TestBackfillSupervisorSpec.IOConfig ioConfig = new TestBackfillSupervisorSpec.IOConfig("test-stream", null, null); + final TestBackfillSupervisorSpec.IngestionSpec ingestionSpec = new TestBackfillSupervisorSpec.IngestionSpec(ioConfig); + final SeekableStreamSupervisorSpec sourceSpec = new TestBackfillSupervisorSpec("original-id", ingestionSpec); + + final BoundedStreamConfig boundedStreamConfig = new BoundedStreamConfig( + ImmutableMap.of("0", 100L), + ImmutableMap.of("0", 200L) + ); + + // Without overriding taskCount + final SupervisorSpec backfillSpec = localManager.createBackfillSpec( + sourceSpec, + "backfill-id", + boundedStreamConfig, + null + ); + Assert.assertEquals("backfill-id", backfillSpec.getId()); + final TestBackfillSupervisorSpec backfillCast = (TestBackfillSupervisorSpec) backfillSpec; + final BoundedStreamConfig actualConfig = backfillCast.getIoConfig().getBoundedStreamConfig(); + Assert.assertNotNull(actualConfig); + Assert.assertEquals(ImmutableMap.of("0", 100L), actualConfig.getStartSequenceNumbers()); + Assert.assertEquals(ImmutableMap.of("0", 200L), actualConfig.getEndSequenceNumbers()); + Assert.assertEquals(1, backfillCast.getIoConfig().getTaskCount()); + + // With overriding taskCount + final SupervisorSpec backfillSpecWithCount = localManager.createBackfillSpec( + sourceSpec, + "backfill-id-2", + boundedStreamConfig, + 5 + ); + Assert.assertEquals("backfill-id-2", backfillSpecWithCount.getId()); + final TestBackfillSupervisorSpec backfillWithCount = (TestBackfillSupervisorSpec) backfillSpecWithCount; + Assert.assertEquals(5, backfillWithCount.getIoConfig().getTaskCount()); + } + private static class TestSupervisorSpec implements SupervisorSpec { private final String id; @@ -1137,4 +1347,90 @@ public List getDataSources() return Collections.singletonList(id); } } + + @JsonTypeName("testBackfill") + private static class TestBackfillSupervisorSpec extends SeekableStreamSupervisorSpec + { + @JsonCreator + TestBackfillSupervisorSpec( + @JsonProperty("id") String id, + @JsonProperty("spec") IngestionSpec ingestionSpec + ) + { + super( + id, + ingestionSpec, + ImmutableMap.of("useConcurrentLocks", true), + false, + null, null, null, null, + MAPPER, + null, null, null, null + ); + } + + @Override + public Supervisor createSupervisor() + { + return null; + } + + @Override + public String getType() + { + return "testBackfill"; + } + + @Override + public String getSource() + { + return "test-stream"; + } + + @Override + protected SeekableStreamSupervisorSpec toggleSuspend(boolean suspend) + { + return this; + } + + @Override + public SeekableStreamSupervisorIOConfig getIoConfig() + { + return getSpec().getIOConfig(); + } + + @JsonTypeName("testBackfillIngestionSpec") + static class IngestionSpec extends SeekableStreamSupervisorIngestionSpec + { + @JsonCreator + IngestionSpec( + @JsonProperty("ioConfig") IOConfig ioConfig + ) + { + super( + new DataSchema( + "testDS", + new TimestampSpec("time", "auto", null), + new DimensionsSpec(Collections.emptyList()), + null, null, null, null, null + ), + ioConfig, + null + ); + } + } + + @JsonTypeName("testBackfillIOConfig") + static class IOConfig extends SeekableStreamSupervisorIOConfig + { + @JsonCreator + IOConfig( + @JsonProperty("stream") String stream, + @JsonProperty("taskCount") Integer taskCount, + @JsonProperty("boundedStreamConfig") BoundedStreamConfig boundedStreamConfig + ) + { + super(stream, null, 1, taskCount, null, null, null, false, null, null, null, null, LagAggregator.DEFAULT, null, null, null, null, boundedStreamConfig); + } + } + } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java index 4ccf4659994f..a4251fccf3be 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java @@ -1379,6 +1379,100 @@ public void testResetOffsets() verifyAll(); } + @Test + public void testResetOffsetsAndBackfill() + { + // 200 - success + EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)); + EasyMock.expect(supervisorManager.getSupervisorIds()).andReturn(ImmutableSet.of("my-id")); + EasyMock.expect(supervisorManager.resetSupervisorAndBackfill("my-id", null)) + .andReturn(ImmutableMap.of("id", "my-id", "backfillSupervisorId", "my-id_backfill_abcdefgh")); + replayAll(); + + Response response = supervisorResource.resetOffsetsAndBackfill("my-id", null); + Assert.assertEquals(200, response.getStatus()); + Assert.assertEquals( + ImmutableMap.of("id", "my-id", "backfillSupervisorId", "my-id_backfill_abcdefgh"), + response.getEntity() + ); + verifyAll(); + resetAll(); + + // 404 - supervisor does not exist + EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)); + EasyMock.expect(supervisorManager.getSupervisorIds()).andReturn(ImmutableSet.of()); + replayAll(); + + response = supervisorResource.resetOffsetsAndBackfill("my-id", null); + Assert.assertEquals(404, response.getStatus()); + verifyAll(); + resetAll(); + + // 400 - IAE (e.g. supervisor not running) + EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)); + EasyMock.expect(supervisorManager.getSupervisorIds()).andReturn(ImmutableSet.of("my-id")); + EasyMock.expect(supervisorManager.resetSupervisorAndBackfill("my-id", null)) + .andThrow(new IllegalArgumentException("Supervisor[my-id] must be in a RUNNING state")); + replayAll(); + + response = supervisorResource.resetOffsetsAndBackfill("my-id", null); + Assert.assertEquals(400, response.getStatus()); + Assert.assertEquals( + ImmutableMap.of("error", "Supervisor[my-id] must be in a RUNNING state"), + response.getEntity() + ); + verifyAll(); + resetAll(); + + // 500 - ISE (e.g. failed to retrieve offsets) + EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)); + EasyMock.expect(supervisorManager.getSupervisorIds()).andReturn(ImmutableSet.of("my-id")); + EasyMock.expect(supervisorManager.resetSupervisorAndBackfill("my-id", null)) + .andThrow(new IllegalStateException("Failed to get latest offsets from stream")); + replayAll(); + + response = supervisorResource.resetOffsetsAndBackfill("my-id", null); + Assert.assertEquals(500, response.getStatus()); + Assert.assertEquals( + ImmutableMap.of("error", "Failed to get latest offsets from stream"), + response.getEntity() + ); + verifyAll(); + resetAll(); + + // 400 - invalid backfillTaskCount (zero) + replayAll(); + + response = supervisorResource.resetOffsetsAndBackfill("my-id", 0); + Assert.assertEquals(400, response.getStatus()); + Assert.assertEquals( + ImmutableMap.of("error", "backfillTaskCount must be a positive integer"), + response.getEntity() + ); + verifyAll(); + resetAll(); + + // 400 - invalid backfillTaskCount (negative) + replayAll(); + + response = supervisorResource.resetOffsetsAndBackfill("my-id", -1); + Assert.assertEquals(400, response.getStatus()); + Assert.assertEquals( + ImmutableMap.of("error", "backfillTaskCount must be a positive integer"), + response.getEntity() + ); + verifyAll(); + resetAll(); + + // 503 - no supervisor manager (not leader) + EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent()); + replayAll(); + + response = supervisorResource.resetOffsetsAndBackfill("my-id", null); + Assert.assertEquals(503, response.getStatus()); + verifyAll(); + } + @Test public void testNoopSupervisorSpecSerde() throws Exception { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java index eff5d1acd980..e19d68cb2b3f 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java @@ -3059,7 +3059,7 @@ public String toString() final TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor() { @Override - protected SeekableStreamDataSourceMetadata createDataSourceMetaDataForReset( + public SeekableStreamDataSourceMetadata createDataSourceMetaDataForReset( String stream, Map map ) @@ -3284,7 +3284,7 @@ protected String baseTaskName() } @Override - protected void updatePartitionLagFromStream() + public void updatePartitionLagFromStream() { // do nothing } @@ -3381,7 +3381,7 @@ protected boolean doesTaskMatchSupervisor(Task task) } @Override - protected SeekableStreamDataSourceMetadata createDataSourceMetaDataForReset( + public SeekableStreamDataSourceMetadata createDataSourceMetaDataForReset( String stream, Map map ) @@ -3521,7 +3521,7 @@ public LagStats computeLagStats() } @Override - protected Map getLatestSequencesFromStream() + public Map getLatestSequencesFromStream() { return streamOffsets; } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorTestBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorTestBase.java index 4eefaed9bd99..0488670e1e48 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorTestBase.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorTestBase.java @@ -124,7 +124,7 @@ protected String baseTaskName() } @Override - protected void updatePartitionLagFromStream() + public void updatePartitionLagFromStream() { // do nothing } @@ -205,7 +205,7 @@ protected boolean doesTaskMatchSupervisor(Task task) } @Override - protected SeekableStreamDataSourceMetadata createDataSourceMetaDataForReset( + public SeekableStreamDataSourceMetadata createDataSourceMetaDataForReset( String stream, Map map )