Skip to content

Commit 28ec585

Browse files
authored
KAFKA-20723: Add fail fast check for share group dlq(...). [1/N] (apache#22635)
* Add check whether the DLQ state manager is started, before enqueuing the request in the `Sender`.
* Change sequence of validations in `DefaultShareGroupDLQManager`.
* Add tests to verify the `dlq()` method changes.

Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>, Andrew Schofield <aschofield@confluent.io>
1 parent 7c2b51e commit 28ec585

4 files changed

Lines changed: 55 additions & 11 deletions

File tree

core/src/main/scala/kafka/server/BrokerServer.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -929,9 +929,16 @@ class BrokerServer(
929929
Utils.closeQuietly(brokerTopicStats, "broker topic stats")
930930
Utils.closeQuietly(sharePartitionManager, "share partition manager")
931931

932+
// The order of closing sharePartitionManager, groupCoordinator and persister matters.
933+
// groupCoordinator and sharePartitionManager must be closed before the persister so that
934+
// new requests from sharePartitionManager or groupCoordinator do not encounter a stopped
935+
// persister.
932936
if (persister != null)
933937
Utils.swallow(this.logger.underlying, () => persister.stop())
934938

939+
// The order of closing sharePartitionManager and shareGroupDLQManager matters.
940+
// sharePartitionManager must be closed before the shareGroupDLQManager so that any new
941+
// requests from sharePartitionManager do not encounter a stopped shareGroupDLQManager.
935942
if (shareGroupDLQManager != null)
936943
Utils.swallow(this.logger.underlying, () => shareGroupDLQManager.stop())
937944

server/src/main/java/org/apache/kafka/server/share/dlq/DefaultShareGroupDLQManager.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -62,22 +62,22 @@ private DefaultShareGroupDLQManager(
6262
ShareGroupMetrics shareGroupMetrics,
6363
LogReader logReader
6464
) {
65-
this.stateManager = new ShareGroupDLQStateManager(client, cacheHelper, time, timer, shareGroupMetrics, logReader);
65+
stateManager = new ShareGroupDLQStateManager(client, cacheHelper, time, timer, shareGroupMetrics, logReader);
6666
}
6767

6868
private void start() {
69-
this.stateManager.start();
69+
stateManager.start();
7070
}
7171

7272
@Override
7373
public CompletableFuture<Void> enqueue(ShareGroupDLQRecordParameter param) {
7474
try {
7575
validate(param);
76+
return stateManager.dlq(param);
7677
} catch (Exception e) {
77-
log.error("Unable to validate dlq record parameters", e);
78+
log.error("Unable to enqueue DLQ request", e);
7879
return CompletableFuture.failedFuture(e);
7980
}
80-
return stateManager.dlq(param);
8181
}
8282

8383
@Override
@@ -111,16 +111,16 @@ private static void validate(ShareGroupDLQRecordParameter param) {
111111
throw new IllegalArgumentException(prefix + " partition cannot be negative.");
112112
}
113113

114-
if (param.lastOffset() < param.firstOffset()) {
115-
throw new IllegalArgumentException(prefix + " last offset cannot be less than first offset.");
116-
}
117-
118114
if (param.firstOffset() < 0) {
119115
throw new IllegalArgumentException(prefix + " first offset cannot be negative.");
120116
}
121117

122118
if (param.lastOffset() < 0) {
123119
throw new IllegalArgumentException(prefix + " last offset cannot be negative.");
124120
}
121+
122+
if (param.lastOffset() < param.firstOffset()) {
123+
throw new IllegalArgumentException(prefix + " last offset cannot be less than first offset.");
124+
}
125125
}
126126
}

server/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManager.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,9 @@ public CompletableFuture<Void> dlq(ShareGroupDLQRecordParameter param) {
185185

186186
// Visibility for tests
187187
CompletableFuture<Void> dlq(ShareGroupDLQRecordParameter param, long requestBackoffMs, long requestBackoffMaxMs, int maxRequestAttempts) {
188+
if (!this.isStarted.get()) {
189+
return CompletableFuture.failedFuture(new IllegalStateException("ShareGroupDLQStateManager is not started."));
190+
}
188191
CompletableFuture<Void> future = new CompletableFuture<>();
189192
ProduceRequestHandler requestHandler = new ProduceRequestHandler(param, future, requestBackoffMs, requestBackoffMaxMs, maxRequestAttempts);
190193
enqueue(requestHandler);

server/src/test/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManagerTest.java

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -382,7 +382,33 @@ public void testStopWithoutStartIsNoOp() {
382382
verifyNoInteractions(mockMetrics);
383383
}
384384

385-
// ---- DLQ topic validation tests (no thread start required) ----
385+
@Test
386+
public void testDlqBeforeStartFailsWithIllegalState() {
387+
stateManager = builder().build();
388+
// dlq() is invoked without a prior start(); the lifecycle guard must reject it with a
389+
// failed future rather than enqueueing onto a sender thread that is not running.
390+
CompletableFuture<Void> result = stateManager.dlq(param());
391+
assertTrue(result.isDone());
392+
assertTrue(result.isCompletedExceptionally());
393+
assertInstanceOf(IllegalStateException.class, getCause(result));
394+
verifyNoInteractions(mockMetrics);
395+
}
396+
397+
@Test
398+
public void testDlqAfterStopFailsWithIllegalState() throws Exception {
399+
stateManager = builder().build();
400+
stateManager.start();
401+
stateManager.stop();
402+
// Once stopped, dlq() must fail fast rather than enqueueing onto a shut-down sender thread
403+
// where the future would never complete.
404+
CompletableFuture<Void> result = stateManager.dlq(param());
405+
assertTrue(result.isDone());
406+
assertTrue(result.isCompletedExceptionally());
407+
assertInstanceOf(IllegalStateException.class, getCause(result));
408+
verifyNoInteractions(mockMetrics);
409+
}
410+
411+
// ---- DLQ topic validation tests ----
386412

387413
@Test
388414
public void testDlqEmptyTopicNameFailsValidation() throws Exception {
@@ -391,6 +417,7 @@ public void testDlqEmptyTopicNameFailsValidation() throws Exception {
391417
when(cacheHelper.shareGroupDlqTopicPrefix()).thenReturn(Optional.empty());
392418

393419
stateManager = builder().withCacheHelper(cacheHelper).build();
420+
stateManager.start();
394421
Throwable cause = getCause(stateManager.dlq(param()));
395422
assertInstanceOf(ConfigException.class, cause);
396423
assertTrue(cause.getMessage().contains("empty"));
@@ -404,6 +431,7 @@ public void testDlqTopicStartingWithUnderscoreFailsValidation() throws Exception
404431
when(cacheHelper.shareGroupDlqTopicPrefix()).thenReturn(Optional.empty());
405432

406433
stateManager = builder().withCacheHelper(cacheHelper).build();
434+
stateManager.start();
407435
Throwable cause = getCause(stateManager.dlq(param()));
408436
assertInstanceOf(ConfigException.class, cause);
409437
assertTrue(cause.getMessage().contains("__"));
@@ -419,6 +447,7 @@ public void testDlqExistingTopicWithoutDlqConfigFailsValidation() throws Excepti
419447
when(cacheHelper.isDlqEnabledOnTopic(DLQ_TOPIC)).thenReturn(false);
420448

421449
stateManager = builder().withCacheHelper(cacheHelper).build();
450+
stateManager.start();
422451
Throwable cause = getCause(stateManager.dlq(param()));
423452
assertInstanceOf(ConfigException.class, cause);
424453
assertTrue(cause.getMessage().contains("DLQ is not enabled"));
@@ -434,6 +463,7 @@ public void testDlqTopicMissingAndAutoCreateDisabledFailsValidation() throws Exc
434463
when(cacheHelper.isDlqAutoTopicCreateEnabled()).thenReturn(false);
435464

436465
stateManager = builder().withCacheHelper(cacheHelper).build();
466+
stateManager.start();
437467
Throwable cause = getCause(stateManager.dlq(param()));
438468
assertInstanceOf(ConfigException.class, cause);
439469
assertTrue(cause.getMessage().contains("auto create is disabled"));
@@ -449,24 +479,28 @@ public void testDlqTopicPrefixMismatchFailsValidation() throws Exception {
449479
when(cacheHelper.isDlqEnabledOnTopic(DLQ_TOPIC)).thenReturn(true);
450480

451481
stateManager = builder().withCacheHelper(cacheHelper).build();
482+
stateManager.start();
452483
Throwable cause = getCause(stateManager.dlq(param()));
453484
assertInstanceOf(ConfigException.class, cause);
454485
assertTrue(cause.getMessage().contains("does not comply with the DLQ topic prefix"));
455486
verifyNoInteractions(mockMetrics);
456487
}
457488

458489
@Test
459-
public void testDlqValidationFailureCompletesFutureBeforeStart() throws Exception {
490+
public void testDlqValidationFailureCompletesFutureSynchronously() throws Exception {
460491
ShareGroupDLQMetadataCacheHelper cacheHelper = mock(ShareGroupDLQMetadataCacheHelper.class);
461492
when(cacheHelper.shareGroupDlqTopic(GROUP_ID)).thenReturn(Optional.empty());
462493
when(cacheHelper.shareGroupDlqTopicPrefix()).thenReturn(Optional.empty());
463494

464-
// validateDlqTopic runs synchronously inside dlq(), so it should fail without the sender thread.
495+
// validateDlqTopic runs synchronously inside dlq() on the calling thread, so a validation
496+
// failure completes the returned future before dlq() returns - no sender-thread round trip.
465497
stateManager = builder().withCacheHelper(cacheHelper).build();
498+
stateManager.start();
466499
CompletableFuture<Void> result = stateManager.dlq(param());
467500
assertTrue(result.isDone());
468501
assertTrue(result.isCompletedExceptionally());
469502
assertFalse(result.isCancelled());
503+
assertInstanceOf(ConfigException.class, getCause(result));
470504
verifyNoInteractions(mockMetrics);
471505
}
472506

0 commit comments

Comments
 (0)