|
30 | 30 | import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; |
31 | 31 | import org.apache.druid.indexing.overlord.TaskMaster; |
32 | 32 | import org.apache.druid.indexing.overlord.TaskStorage; |
| 33 | +import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec; |
33 | 34 | import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig; |
| 35 | +import org.apache.druid.indexing.seekablestream.supervisor.LagAggregator; |
| 36 | +import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScalerConfig; |
34 | 37 | import org.apache.druid.jackson.DefaultObjectMapper; |
35 | 38 | import org.apache.druid.java.util.common.StringUtils; |
36 | 39 | import org.apache.druid.java.util.common.granularity.Granularities; |
@@ -422,6 +425,46 @@ public void testSuspendResume() throws IOException |
422 | 425 | Assert.assertFalse(runningSpec.isSuspended()); |
423 | 426 | } |
424 | 427 |
|
| 428 | + @Test |
| 429 | + public void testTaskCountSerdeRoundTrip() throws IOException |
| 430 | + { |
| 431 | + // A persisted taskCount must survive a serialize/deserialize round-trip even when |
| 432 | + // autoScalerConfig.taskCountStart is set. |
| 433 | + final CostBasedAutoScalerConfig autoScalerConfig = |
| 434 | + CostBasedAutoScalerConfig.builder() |
| 435 | + .enableTaskAutoScaler(true) |
| 436 | + .taskCountMin(1) |
| 437 | + .taskCountMax(100) |
| 438 | + .taskCountStart(25) |
| 439 | + .build(); |
| 440 | + |
| 441 | + final KafkaSupervisorSpec spec = new KafkaSupervisorSpecBuilder() |
| 442 | + .withDataSchema( |
| 443 | + schema -> schema |
| 444 | + .withTimestamp(TimestampSpec.DEFAULT) |
| 445 | + .withAggregators(new CountAggregatorFactory("rows")) |
| 446 | + .withGranularity(new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null)) |
| 447 | + ) |
| 448 | + .withIoConfig( |
| 449 | + ioConfig -> ioConfig |
| 450 | + .withJsonInputFormat() |
| 451 | + .withConsumerProperties(Map.of("bootstrap.servers", "localhost:9092")) |
| 452 | + .withTaskCount(25) |
| 453 | + .withAutoScalerConfig(autoScalerConfig) |
| 454 | + .withLagAggregator(LagAggregator.DEFAULT) |
| 455 | + ) |
| 456 | + .build("testDs", "metrics"); |
| 457 | + |
| 458 | + // Mutate taskCount the same way SeekableStreamSupervisor.changeTaskCountInIOConfig does, |
| 459 | + // and verify that the mutation is picked up by serialization. |
| 460 | + spec.getIoConfig().setTaskCount(50); |
| 461 | + final byte[] payload = mapper.writeValueAsBytes(spec); |
| 462 | + final KafkaSupervisorSpec roundTripped = |
| 463 | + (KafkaSupervisorSpec) mapper.readValue(payload, SupervisorSpec.class); |
| 464 | + Assert.assertEquals(50, roundTripped.getIoConfig().getTaskCount()); |
| 465 | + Assert.assertTrue(roundTripped.getIoConfig().isTaskCountExplicit()); |
| 466 | + } |
| 467 | + |
425 | 468 | @Test |
426 | 469 | public void test_validateSpecUpdateTo() |
427 | 470 | { |
|
0 commit comments