Skip to content

Commit 760e7e4

Browse files
authored
Add minor fixes to follow up #19091 (#19119)
* Add minor fixes to follow up #19091 * Reset state if scaling fails * Add anotehr test * Fixes * Remove extra subtype
1 parent b7ac0bf commit 760e7e4

9 files changed

Lines changed: 236 additions & 29 deletions

File tree

indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -433,33 +433,34 @@ public boolean registerUpgradedPendingSegmentOnSupervisor(
433433
/**
434434
* Checks if there is a Task distinct from the given {@code taskId} or its replicas
435435
* that is currently waiting to publish offsets for the given partitions.
436+
*
437+
* @return true only if the given {@param supervisorId} represents a
438+
* {@link SeekableStreamSupervisor} and the supervisor has other tasks that
439+
* are currently publishing offsets to an overlapping set of partitions.
436440
*/
437441
public boolean isAnotherTaskGroupPublishingToPartitions(
438442
String supervisorId,
439443
String taskId,
440444
DataSourceMetadata startMetadata
441445
)
442446
{
443-
try {
444-
InvalidInput.conditionalException(supervisorId != null, "'supervisorId' cannot be null");
445-
if (!(startMetadata instanceof SeekableStreamDataSourceMetadata<?, ?>)) {
446-
throw InvalidInput.exception(
447-
"Start metadata[%s] of type[%s] is not valid streaming metadata",
448-
supervisorId, startMetadata.getClass()
449-
);
450-
}
447+
InvalidInput.conditionalException(supervisorId != null, "'supervisorId' cannot be null");
448+
Pair<Supervisor, SupervisorSpec> supervisor = supervisors.get(supervisorId);
449+
if (supervisor == null || supervisor.rhs == null) {
450+
throw NotFound.exception("Could not find supervisor[%s]", supervisorId);
451+
}
452+
if (!(supervisor.lhs instanceof SeekableStreamSupervisor<?, ?, ?>)) {
453+
return false;
454+
}
451455

452-
Pair<Supervisor, SupervisorSpec> supervisor = supervisors.get(supervisorId);
453-
if (supervisor == null || supervisor.rhs == null) {
454-
throw NotFound.exception("Could not find supervisor[%s]", supervisorId);
455-
}
456-
if (!(supervisor.lhs instanceof SeekableStreamSupervisor<?, ?, ?>)) {
457-
throw InvalidInput.exception(
458-
"Supervisor[%s] of type[%s] is not a streaming supervisor",
459-
supervisorId, supervisor.rhs.getType()
460-
);
461-
}
456+
if (!(startMetadata instanceof SeekableStreamDataSourceMetadata<?, ?>)) {
457+
throw InvalidInput.exception(
458+
"Start metadata[%s] of type[%s] is not valid streaming metadata",
459+
startMetadata, startMetadata == null ? null : startMetadata.getClass()
460+
);
461+
}
462462

463+
try {
463464
final Set<Object> partitionIds = Set.copyOf(
464465
((SeekableStreamDataSourceMetadata<?, ?>) startMetadata)
465466
.getSeekableStreamSequenceNumbers()

indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3580,7 +3580,7 @@ private void checkTaskDuration() throws ExecutionException, InterruptedException
35803580
* during a task rollover based on the recommendations from the task auto-scaler.
35813581
*/
35823582
@VisibleForTesting
3583-
void maybeScaleDuringTaskRollover() throws ExecutionException, InterruptedException
3583+
void maybeScaleDuringTaskRollover()
35843584
{
35853585
if (taskAutoScaler == null) {
35863586
// Do nothing
@@ -3594,11 +3594,16 @@ void maybeScaleDuringTaskRollover() throws ExecutionException, InterruptedExcept
35943594
supervisorId, currentTaskCount, rolloverTaskCount
35953595
);
35963596
isScalingTasksOnRollover.set(true);
3597-
new DynamicAllocationTasksNotice(
3598-
() -> rolloverTaskCount,
3599-
() -> isScalingTasksOnRollover.set(false),
3600-
emitter
3601-
).handle();
3597+
try {
3598+
new DynamicAllocationTasksNotice(
3599+
() -> rolloverTaskCount,
3600+
() -> {},
3601+
emitter
3602+
).handle();
3603+
}
3604+
finally {
3605+
isScalingTasksOnRollover.set(false);
3606+
}
36023607
}
36033608
}
36043609
}

indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.apache.druid.indexing.overlord.SegmentPublishResult;
3333
import org.apache.druid.indexing.overlord.Segments;
3434
import org.apache.druid.indexing.overlord.TimeChunkLockRequest;
35+
import org.apache.druid.indexing.overlord.supervisor.NoopSupervisorSpec;
3536
import org.apache.druid.java.util.common.Intervals;
3637
import org.apache.druid.timeline.DataSegment;
3738
import org.apache.druid.timeline.partition.LinearShardSpec;
@@ -42,6 +43,8 @@
4243
import org.junit.Rule;
4344
import org.junit.Test;
4445

46+
import java.util.List;
47+
4548
public class SegmentTransactionalInsertActionTest
4649
{
4750
@Rule
@@ -190,6 +193,9 @@ public void test_fail_transactionalUpdateDataSourceMetadata() throws Exception
190193
actionTestKit.getTaskLockbox().add(task);
191194
acquireTimeChunkLock(TaskLockType.EXCLUSIVE, task, INTERVAL, 5000);
192195

196+
final NoopSupervisorSpec supervisorSpec = new NoopSupervisorSpec(SUPERVISOR_ID, List.of(DATA_SOURCE));
197+
actionTestKit.getSupervisorManager().createOrUpdateAndStartSupervisor(supervisorSpec);
198+
193199
SegmentPublishResult result = SegmentTransactionalInsertAction.appendAction(
194200
ImmutableSet.of(SEGMENT1),
195201
SUPERVISOR_ID,

indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator;
3737
import org.apache.druid.metadata.MetadataStorageConnectorConfig;
3838
import org.apache.druid.metadata.MetadataStorageTablesConfig;
39+
import org.apache.druid.metadata.SQLMetadataSupervisorManager;
3940
import org.apache.druid.metadata.SegmentsMetadataManagerConfig;
4041
import org.apache.druid.metadata.TestDerbyConnector;
4142
import org.apache.druid.metadata.segment.SqlSegmentMetadataTransactionFactory;
@@ -64,6 +65,7 @@ public class TaskActionTestKit extends ExternalResource
6465
private TestDerbyConnector testDerbyConnector;
6566
private IndexerMetadataStorageCoordinator metadataStorageCoordinator;
6667
private TaskActionToolbox taskActionToolbox;
68+
private SupervisorManager supervisorManager;
6769
private SegmentMetadataCache segmentMetadataCache;
6870
private BlockingExecutorService metadataCachePollExec;
6971

@@ -144,6 +146,11 @@ public TaskActionToolbox getTaskActionToolbox()
144146
return taskActionToolbox;
145147
}
146148

149+
public SupervisorManager getSupervisorManager()
150+
{
151+
return supervisorManager;
152+
}
153+
147154
public void syncSegmentMetadataCache()
148155
{
149156
metadataCachePollExec.finishNextPendingTasks(4);
@@ -200,7 +207,14 @@ public boolean isBatchAllocationReduceMetadataIO()
200207
}
201208
};
202209

203-
SupervisorManager supervisorManager = new SupervisorManager(objectMapper, null);
210+
this.supervisorManager = new SupervisorManager(
211+
objectMapper,
212+
new SQLMetadataSupervisorManager(
213+
objectMapper,
214+
testDerbyConnector,
215+
() -> testDerbyConnector.getMetadataTablesConfig()
216+
)
217+
);
204218
SegmentAllocationQueue segmentAllocationQueue = new SegmentAllocationQueue(
205219
taskLockbox,
206220
taskLockConfig,
@@ -230,7 +244,9 @@ public boolean isBatchAllocationReduceMetadataIO()
230244
testDerbyConnector.createTaskTables();
231245
testDerbyConnector.createAuditTable();
232246
testDerbyConnector.createIndexingStatesTable();
247+
testDerbyConnector.createSupervisorsTable();
233248

249+
supervisorManager.start();
234250
segmentMetadataCache.start();
235251
segmentMetadataCache.becomeLeader();
236252
syncSegmentMetadataCache();
@@ -289,6 +305,7 @@ public void after()
289305
taskActionToolbox = null;
290306
segmentMetadataCache.stopBeingLeader();
291307
segmentMetadataCache.stop();
308+
supervisorManager.stop();
292309
useSegmentMetadataCache = false;
293310
}
294311
}

indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,14 @@
2525
import com.google.common.collect.ImmutableMap;
2626
import com.google.common.collect.ImmutableSet;
2727
import com.google.common.util.concurrent.SettableFuture;
28+
import org.apache.druid.data.input.impl.ByteEntity;
2829
import org.apache.druid.error.DruidException;
2930
import org.apache.druid.error.DruidExceptionMatcher;
3031
import org.apache.druid.error.InvalidInput;
3132
import org.apache.druid.indexing.common.TaskLockType;
3233
import org.apache.druid.indexing.common.task.Tasks;
3334
import org.apache.druid.indexing.overlord.DataSourceMetadata;
35+
import org.apache.druid.indexing.overlord.ObjectMetadata;
3436
import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
3537
import org.apache.druid.indexing.seekablestream.TestSeekableStreamDataSourceMetadata;
3638
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
@@ -796,6 +798,154 @@ public void testRegisterUpgradedPendingSegmentOnSupervisor()
796798
verifyAll();
797799
}
798800

801+
@Test
802+
public void test_isAnotherTaskGroupPublishingToPartitions_throwsException_ifSupervisorIdIsNull()
803+
{
804+
MatcherAssert.assertThat(
805+
Assert.assertThrows(
806+
DruidException.class,
807+
() -> manager.isAnotherTaskGroupPublishingToPartitions(null, "task1", null)
808+
),
809+
DruidExceptionMatcher.invalidInput().expectMessageIs("'supervisorId' cannot be null")
810+
);
811+
}
812+
813+
@Test
814+
public void test_isAnotherTaskGroupPublishingToPartitions_throwsException_ifSupervisorNotFound()
815+
{
816+
MatcherAssert.assertThat(
817+
Assert.assertThrows(
818+
DruidException.class,
819+
() -> manager.isAnotherTaskGroupPublishingToPartitions("supervisor1", "task1", null)
820+
),
821+
DruidExceptionMatcher.notFound().expectMessageIs("Could not find supervisor[supervisor1]")
822+
);
823+
}
824+
825+
@Test
826+
public void test_isAnotherTaskGroupPublishingToPartitions_returnsFalse_forNonStreamingSupervisor()
827+
{
828+
final String supervisorId = "supervisor1";
829+
EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(
830+
Map.of(supervisorId, new TestSupervisorSpec(supervisorId, supervisor1))
831+
);
832+
833+
supervisor1.start();
834+
EasyMock.expect(supervisor1.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
835+
replayAll();
836+
837+
manager.start();
838+
839+
Assert.assertFalse(
840+
manager.isAnotherTaskGroupPublishingToPartitions(supervisorId, "task1", null)
841+
);
842+
}
843+
844+
@Test
845+
public void test_isAnotherTaskGroupPublishingToPartitions_throwsException_ifMetadataIsNull()
846+
{
847+
final String supervisorId = "supervisor1";
848+
final SeekableStreamSupervisor<Integer, String, ByteEntity> seekableStreamSupervisor =
849+
EasyMock.createMock(SeekableStreamSupervisor.class);
850+
EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(
851+
Map.of(supervisorId, new TestSupervisorSpec(supervisorId, seekableStreamSupervisor))
852+
);
853+
854+
seekableStreamSupervisor.start();
855+
EasyMock.expect(seekableStreamSupervisor.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
856+
replayAll();
857+
EasyMock.replay(seekableStreamSupervisor);
858+
859+
manager.start();
860+
861+
MatcherAssert.assertThat(
862+
Assert.assertThrows(
863+
DruidException.class,
864+
() -> manager.isAnotherTaskGroupPublishingToPartitions(supervisorId, "task1", null)
865+
),
866+
DruidExceptionMatcher.invalidInput().expectMessageIs(
867+
"Start metadata[null] of type[null] is not valid streaming metadata"
868+
)
869+
);
870+
}
871+
872+
@Test
873+
public void test_isAnotherTaskGroupPublishingToPartitions_throwsException_ifMetadataIsInvalid()
874+
{
875+
final String supervisorId = "supervisor1";
876+
final SeekableStreamSupervisor<Integer, String, ByteEntity> seekableStreamSupervisor =
877+
EasyMock.createMock(SeekableStreamSupervisor.class);
878+
EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(
879+
Map.of(supervisorId, new TestSupervisorSpec(supervisorId, seekableStreamSupervisor))
880+
);
881+
882+
seekableStreamSupervisor.start();
883+
EasyMock.expect(seekableStreamSupervisor.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
884+
replayAll();
885+
EasyMock.replay(seekableStreamSupervisor);
886+
887+
manager.start();
888+
889+
MatcherAssert.assertThat(
890+
Assert.assertThrows(
891+
DruidException.class,
892+
() -> manager.isAnotherTaskGroupPublishingToPartitions(supervisorId, "task1", new ObjectMetadata("abc"))
893+
),
894+
DruidExceptionMatcher.invalidInput().expectMessageIs(
895+
"Start metadata[ObjectMetadata{theObject=abc}] of"
896+
+ " type[class org.apache.druid.indexing.overlord.ObjectMetadata]"
897+
+ " is not valid streaming metadata"
898+
)
899+
);
900+
}
901+
902+
@Test
903+
public void test_isAnotherTaskGroupPublishingToPartitions()
904+
{
905+
final String supervisorId = "supervisor1";
906+
final SeekableStreamSupervisor<Integer, String, ByteEntity> seekableStreamSupervisor =
907+
EasyMock.createMock(SeekableStreamSupervisor.class);
908+
EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(
909+
Map.of(supervisorId, new TestSupervisorSpec(supervisorId, seekableStreamSupervisor))
910+
);
911+
912+
seekableStreamSupervisor.start();
913+
EasyMock.expect(seekableStreamSupervisor.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
914+
915+
// Expect a readyTaskId for which no other group is currently publishing
916+
final String readyTaskId = "task1";
917+
EasyMock.expect(
918+
seekableStreamSupervisor.isAnotherTaskGroupPublishingToPartitions(
919+
EasyMock.eq(readyTaskId),
920+
EasyMock.anyObject()
921+
)
922+
).andReturn(false).atLeastOnce();
923+
924+
// Expect a conflictingTaskId for which another group is currently publishing
925+
final String conflictingTaskId = "task2";
926+
EasyMock.expect(
927+
seekableStreamSupervisor.isAnotherTaskGroupPublishingToPartitions(
928+
EasyMock.eq(conflictingTaskId),
929+
EasyMock.anyObject()
930+
)
931+
).andReturn(true).atLeastOnce();
932+
933+
replayAll();
934+
EasyMock.replay(seekableStreamSupervisor);
935+
manager.start();
936+
937+
final DataSourceMetadata startMetadata = TestSeekableStreamDataSourceMetadata.start(
938+
"topic",
939+
Map.of("0", "10")
940+
);
941+
Assert.assertTrue(
942+
manager.isAnotherTaskGroupPublishingToPartitions(supervisorId, conflictingTaskId, startMetadata)
943+
);
944+
Assert.assertFalse(
945+
manager.isAnotherTaskGroupPublishingToPartitions(supervisorId, readyTaskId, startMetadata)
946+
);
947+
}
948+
799949
private static class TestSupervisorSpec implements SupervisorSpec
800950
{
801951
private final String id;

indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/TestSeekableStreamDataSourceMetadata.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,24 @@
2323
import com.fasterxml.jackson.annotation.JsonProperty;
2424
import org.apache.druid.indexing.overlord.DataSourceMetadata;
2525

26+
import java.util.Map;
27+
2628
public class TestSeekableStreamDataSourceMetadata extends SeekableStreamDataSourceMetadata<String, String>
2729
{
30+
public static TestSeekableStreamDataSourceMetadata start(String topic, Map<String, String> partitionOffsets)
31+
{
32+
return new TestSeekableStreamDataSourceMetadata(
33+
new SeekableStreamStartSequenceNumbers<>(topic, partitionOffsets, null)
34+
);
35+
}
36+
37+
public static TestSeekableStreamDataSourceMetadata end(String topic, Map<String, String> partitionOffsets)
38+
{
39+
return new TestSeekableStreamDataSourceMetadata(
40+
new SeekableStreamEndSequenceNumbers<>(topic, partitionOffsets)
41+
);
42+
}
43+
2844
@JsonCreator
2945
public TestSeekableStreamDataSourceMetadata(
3046
@JsonProperty("partitions") SeekableStreamSequenceNumbers<String, String> seekableStreamSequenceNumbers)

0 commit comments

Comments
 (0)