|
64 | 64 | import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec; |
65 | 65 | import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager; |
66 | 66 | import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats; |
| 67 | +import org.apache.druid.indexing.overlord.supervisor.autoscaler.ScaleActionSupplier; |
| 68 | +import org.apache.druid.indexing.overlord.supervisor.autoscaler.ScaleDirection; |
67 | 69 | import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler; |
68 | 70 | import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata; |
69 | 71 | import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers; |
|
122 | 124 | import java.util.SortedMap; |
123 | 125 | import java.util.TreeMap; |
124 | 126 | import java.util.TreeSet; |
125 | | -import java.util.concurrent.Callable; |
126 | 127 | import java.util.concurrent.ConcurrentHashMap; |
127 | 128 | import java.util.concurrent.CopyOnWriteArrayList; |
128 | 129 | import java.util.concurrent.ExecutionException; |
@@ -454,13 +455,13 @@ public boolean equals(Object obj) |
454 | 455 | // change taskCount without resubmitting. |
455 | 456 | private class DynamicAllocationTasksNotice implements Notice |
456 | 457 | { |
457 | | - Callable<Integer> computeDesiredTaskCount; |
| 458 | + ScaleActionSupplier computeDesiredTaskCount; |
458 | 459 | ServiceEmitter emitter; |
459 | 460 | Runnable onSuccessfulScale; |
460 | 461 | private static final String TYPE = "dynamic_allocation_tasks_notice"; |
461 | 462 |
|
462 | 463 | DynamicAllocationTasksNotice( |
463 | | - Callable<Integer> computeDesiredTaskCount, |
| 464 | + ScaleActionSupplier computeDesiredTaskCount, |
464 | 465 | Runnable onSuccessfulScale, |
465 | 466 | ServiceEmitter emitter |
466 | 467 | ) |
@@ -499,59 +500,92 @@ public void handle() |
499 | 500 | supervisorId, |
500 | 501 | dataSource |
501 | 502 | ); |
502 | | - final Integer desiredTaskCount = computeDesiredTaskCount.call(); |
503 | | - ServiceMetricEvent.Builder event = ServiceMetricEvent.builder() |
504 | | - .setDimension(DruidMetrics.SUPERVISOR_ID, supervisorId) |
505 | | - .setDimension(DruidMetrics.DATASOURCE, dataSource) |
506 | | - .setDimension(DruidMetrics.STREAM, getIoConfig().getStream()); |
507 | | - for (CopyOnWriteArrayList<TaskGroup> list : pendingCompletionTaskGroups.values()) { |
508 | | - // There are expected to be pending tasks if this scaling is happening on task rollover |
509 | | - if (!list.isEmpty() && !isScalingTasksOnRollover.get()) { |
510 | | - log.info( |
511 | | - "Skipping DynamicAllocationTasksNotice execution for supervisor[%s] for datasource[%s] because following tasks are pending [%s]", |
512 | | - supervisorId, |
513 | | - dataSource, |
514 | | - list |
515 | | - ); |
516 | | - if (desiredTaskCount > 0) { |
| 503 | + final int desiredTaskCount = computeDesiredTaskCount.computeTaskCount(); |
| 504 | + final int currentTaskCount = getCurrentTaskCount(); |
| 505 | + final boolean needToScale = desiredTaskCount > 0 && desiredTaskCount != currentTaskCount; |
| 506 | + |
| 507 | + if (needToScale) { |
| 508 | + ServiceMetricEvent.Builder event = ServiceMetricEvent.builder() |
| 509 | + .setDimension(DruidMetrics.SUPERVISOR_ID, supervisorId) |
| 510 | + .setDimension(DruidMetrics.DATASOURCE, dataSource) |
| 511 | + .setDimension(DruidMetrics.STREAM, getIoConfig().getStream()); |
| 512 | + |
| 513 | + // 1) Make sure we wait for any pending completion tasks to finish. |
| 514 | + // At this point there could be 2 generations of tasks: pending completion tasks (old generation), running tasks (current generation), and (after our scale) pending tasks (new generation). |
| 515 | + // We want to avoid killing any old generation tasks preemptively, as that might cause the current generation tasks' offsets to become invalid. |
| 516 | + for (CopyOnWriteArrayList<TaskGroup> list : pendingCompletionTaskGroups.values()) { |
| 517 | + // There are expected to be pending tasks if this scaling is happening on task rollover |
| 518 | + if (!list.isEmpty() && !isScalingTasksOnRollover.get()) { |
| 519 | + log.info( |
| 520 | + "Skipping DynamicAllocationTasksNotice execution for supervisor[%s] for datasource[%s] because following tasks are pending [%s]", |
| 521 | + supervisorId, |
| 522 | + dataSource, |
| 523 | + list |
| 524 | + ); |
517 | 525 | emitter.emit(event.setDimension( |
518 | 526 | AUTOSCALER_SKIP_REASON_DIMENSION, |
519 | 527 | "There are tasks pending completion" |
520 | 528 | ) |
521 | 529 | .setMetric(AUTOSCALER_REQUIRED_TASKS_METRIC, desiredTaskCount)); |
| 530 | + return; |
522 | 531 | } |
523 | | - return; |
524 | 532 | } |
525 | | - } |
526 | | - if (nowTime - dynamicTriggerLastRunTime < autoScalerConfig.getMinTriggerScaleActionFrequencyMillis()) { |
527 | | - log.info( |
528 | | - "DynamicAllocationTasksNotice submitted again in [%d] millis, minTriggerDynamicFrequency is [%s] for supervisor[%s] for dataSource[%s], skipping it! desired task count is [%s], active task count is [%s]", |
529 | | - nowTime - dynamicTriggerLastRunTime, |
530 | | - autoScalerConfig.getMinTriggerScaleActionFrequencyMillis(), |
531 | | - supervisorId, |
532 | | - dataSource, |
533 | | - desiredTaskCount, |
534 | | - getActiveTaskGroupsCount() |
535 | | - ); |
536 | 533 |
|
537 | | - if (desiredTaskCount > 0) { |
| 534 | + // 2) Make sure we are not breaching any scaling cooldown limits |
| 535 | + final long lastRunTime; |
| 536 | + final long effectiveFreq; |
| 537 | + final ScaleDirection scaleDirection; |
| 538 | + |
| 539 | + if (desiredTaskCount > currentTaskCount) { |
| 540 | + // Scale up: use the scale-up specific frequency, falling back to base. |
| 541 | + Long specificFreq = autoScalerConfig.getMinTriggerScaleUpActionFrequencyMillis(); |
| 542 | + effectiveFreq = specificFreq != null |
| 543 | + ? specificFreq |
| 544 | + : autoScalerConfig.getMinTriggerScaleActionFrequencyMillis(); |
| 545 | + lastRunTime = dynamicTriggerLastScaleUpRunTime; |
| 546 | + scaleDirection = ScaleDirection.SCALE_UP; |
| 547 | + } else { |
| 548 | + // Scale down: use the scale-down specific frequency, falling back to base. |
| 549 | + Long specificFreq = autoScalerConfig.getMinTriggerScaleDownActionFrequencyMillis(); |
| 550 | + effectiveFreq = specificFreq != null |
| 551 | + ? specificFreq |
| 552 | + : autoScalerConfig.getMinTriggerScaleActionFrequencyMillis(); |
| 553 | + lastRunTime = dynamicTriggerLastScaleDownRunTime; |
| 554 | + scaleDirection = ScaleDirection.SCALE_DOWN; |
| 555 | + } |
| 556 | + |
| 557 | + if (nowTime - lastRunTime < effectiveFreq) { |
| 558 | + log.info( |
| 559 | + "DynamicAllocationTasksNotice submitted again in [%d]ms, effective [%s] throttle is [%d]ms for supervisor[%s] for dataSource[%s], skipping it! desired task count is [%d], current task count is [%d]", |
| 560 | + nowTime - lastRunTime, |
| 561 | + scaleDirection, |
| 562 | + effectiveFreq, |
| 563 | + supervisorId, |
| 564 | + dataSource, |
| 565 | + desiredTaskCount, |
| 566 | + currentTaskCount |
| 567 | + ); |
| 568 | + |
538 | 569 | emitter.emit(event.setDimension( |
539 | 570 | AUTOSCALER_SKIP_REASON_DIMENSION, |
540 | 571 | "minTriggerScaleActionFrequencyMillis not elapsed yet" |
541 | 572 | ) |
542 | 573 | .setMetric(AUTOSCALER_REQUIRED_TASKS_METRIC, desiredTaskCount)); |
| 574 | + return; |
543 | 575 | } |
544 | | - return; |
545 | | - } |
546 | 576 |
|
547 | | - if (desiredTaskCount > 0) { |
| 577 | + // At this point, we can attempt a scaling action, so emit |
548 | 578 | emitter.emit(event.setMetric(AUTOSCALER_REQUIRED_TASKS_METRIC, desiredTaskCount)); |
549 | | - } |
550 | 579 |
|
551 | | - boolean allocationSuccess = changeTaskCount(desiredTaskCount); |
552 | | - if (allocationSuccess) { |
553 | | - onSuccessfulScale.run(); |
554 | | - dynamicTriggerLastRunTime = nowTime; |
| 580 | + boolean allocationSuccess = changeTaskCount(desiredTaskCount); |
| 581 | + if (allocationSuccess) { |
| 582 | + onSuccessfulScale.run(); |
| 583 | + if (desiredTaskCount > currentTaskCount) { |
| 584 | + dynamicTriggerLastScaleUpRunTime = nowTime; |
| 585 | + } else { |
| 586 | + dynamicTriggerLastScaleDownRunTime = nowTime; |
| 587 | + } |
| 588 | + } |
555 | 589 | } |
556 | 590 | } |
557 | 591 | catch (Exception ex) { |
@@ -586,7 +620,8 @@ public String getType() |
586 | 620 | * After the taskCount is changed in SeekableStreamSupervisorIOConfig, next RunNotice will create scaled number of ingest tasks without resubmitting the supervisor. |
587 | 621 | * |
588 | 622 | * @param desiredActiveTaskCount desired taskCount computed from AutoScaler |
589 | | - * @return Boolean flag indicating if scale action was executed or not. If true, it will wait at least 'minTriggerScaleActionFrequencyMillis' before next 'changeTaskCount'. |
| 623 | + * @return Boolean flag indicating if scale action was executed or not. If true, it will wait at least the configured |
| 624 | + * minTriggerScale(Up|Down)ActionFrequencyMillis (falling back to minTriggerScaleActionFrequencyMillis) before the next same-direction 'changeTaskCount'. |
590 | 625 | * If false, it will do 'changeTaskCount' again after 'scaleActionPeriodMillis' millis. |
591 | 626 | * @throws InterruptedException |
592 | 627 | * @throws ExecutionException |
@@ -958,7 +993,8 @@ public String getType() |
958 | 993 | private final boolean useExclusiveStartingSequence; |
959 | 994 | private boolean listenerRegistered = false; |
960 | 995 | private long lastRunTime; |
961 | | - private long dynamicTriggerLastRunTime; |
| 996 | + private long dynamicTriggerLastScaleUpRunTime; |
| 997 | + private long dynamicTriggerLastScaleDownRunTime; |
962 | 998 | private int initRetryCounter = 0; |
963 | 999 | private volatile DateTime firstRunTime; |
964 | 1000 | private volatile DateTime earlyStopTime = null; |
@@ -1416,7 +1452,7 @@ public void tryInit() |
1416 | 1452 | } |
1417 | 1453 |
|
1418 | 1454 | public Runnable buildDynamicAllocationTask( |
1419 | | - Callable<Integer> scaleAction, |
| 1455 | + ScaleActionSupplier scaleAction, |
1420 | 1456 | Runnable onSuccessfulScale, |
1421 | 1457 | ServiceEmitter emitter |
1422 | 1458 | ) |
|
0 commit comments