Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.utils.Time;
Expand Down Expand Up @@ -64,6 +65,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;

import static org.apache.kafka.streams.StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG;
Expand Down Expand Up @@ -123,14 +125,16 @@ private static class StateStoreMetadata {
private DeserializationExceptionHandler deserializationExceptionHandler;
private ProcessingExceptionHandler processingExceptionHandler;
private Sensor droppedRecordsSensor;
private BooleanSupplier inErrorStateSupplier;

public GlobalStateManagerImpl(final LogContext logContext,
final Time time,
final ProcessorTopology topology,
final Consumer<byte[], byte[]> globalConsumer,
final StateDirectory stateDirectory,
final StateRestoreListener stateRestoreListener,
final StreamsConfig config) {
final StreamsConfig config,
final BooleanSupplier inErrorStateSupplier) {
this.time = time;
this.topology = topology;
this.stateDirectory = stateDirectory;
Expand All @@ -147,6 +151,7 @@ public GlobalStateManagerImpl(final LogContext logContext,
logPrefix = logContext.logPrefix();
this.globalConsumer = globalConsumer;
this.stateRestoreListener = stateRestoreListener;
this.inErrorStateSupplier = inErrorStateSupplier;

final Map<String, Object> consumerProps = config.getGlobalConsumerConfigs("dummy");
// need to add mandatory configs; otherwise `QuietConsumerConfig` throws
Expand Down Expand Up @@ -209,6 +214,10 @@ public Set<String> initialize() {
LegacyCheckpointingStateStore.migrateLegacyOffsets(logPrefix, stateDirectory, null, wrappedStores);

for (final StateStoreMetadata metadata : storeMetadata.values()) {
if (inErrorStateSupplier.getAsBoolean()) {
log.info("Global store bootstrap interrupted by shutdown before starting {}", metadata.stateStore.name());
break;
}
// load the committed offsets from the store
final StateStore store = metadata.stateStore;
if (store.persistent()) {
Expand Down Expand Up @@ -348,7 +357,21 @@ private void reprocessState(final StateStoreMetadata storeMetadata) {
// TODO with https://issues.apache.org/jira/browse/KAFKA-10315 we can just call
// `poll(pollMS)` without adding the request timeout and do a more precise
// timeout handling
final ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(pollMsPlusRequestTimeout);
if (inErrorStateSupplier.getAsBoolean()) {
logBootstrapInterrupted(storeMetadata);
return;
}

final ConsumerRecords<byte[], byte[]> records;
try {
records = globalConsumer.poll(pollMsPlusRequestTimeout);
} catch (final WakeupException e) {
if (inErrorStateSupplier.getAsBoolean()) {
logBootstrapInterrupted(storeMetadata);
return;
}
throw e;
}
if (records.isEmpty()) {
currentDeadline = maybeUpdateDeadlineOrThrow(currentDeadline);
} else {
Expand Down Expand Up @@ -493,7 +516,20 @@ private void restoreState(final StateStoreMetadata storeMetadata) {
// TODO with https://issues.apache.org/jira/browse/KAFKA-10315 we can just call
// `poll(pollMS)` without adding the request timeout and do a more precise
// timeout handling
final ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(pollMsPlusRequestTimeout);
if (inErrorStateSupplier.getAsBoolean()) {
logBootstrapInterrupted(storeMetadata);
return;
Comment on lines +519 to +521
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we make the shutdown-interrupted bootstrap path explicit instead of returning normally from GlobalStateManagerImpl?

Currently, when inErrorStateSupplier.getAsBoolean() is true, restoreState() / reprocessState() just return, so GlobalStateManagerImpl#initialize() can also return as if bootstrap completed successfully. As a result, GlobalStateUpdateTask#initialize() may continue into initTopology(), processorContext.initialize(), and flushState() even though shutdown has already been requested.

Since initTopology() can invoke user-provided Processor#init(), this could unnecessarily open external resources during shutdown. Maybe this should use an explicit internal signal, such as a dedicated bootstrap-interrupted exception caught only on the clean shutdown path, or return an initialize status like completed/interrupted so the follow-up initialization can be skipped.

What do you think?

}
final ConsumerRecords<byte[], byte[]> records;
try {
records = globalConsumer.poll(pollMsPlusRequestTimeout);
} catch (final WakeupException e) {
if (inErrorStateSupplier.getAsBoolean()) {
logBootstrapInterrupted(storeMetadata);
return;
}
throw e;
}
if (records.isEmpty()) {
currentDeadline = maybeUpdateDeadlineOrThrow(currentDeadline);
} else {
Expand All @@ -518,6 +554,10 @@ private void restoreState(final StateStoreMetadata storeMetadata) {
}
}

private void logBootstrapInterrupted(final StateStoreMetadata storeMetadata) {
log.info("Bootstrap interrupted by shutdown for {}", storeMetadata.stateStore.name());
}

private long getGlobalConsumerOffset(final TopicPartition topicPartition) {
return retryUntilSuccessOrThrowOnTaskTimeout(
() -> globalConsumer.position(topicPartition),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.internals.LogContext;
Expand Down Expand Up @@ -299,7 +300,13 @@ public void run() {
if (size != -1L) {
cache.resize(size);
}
stateConsumer.pollAndUpdate();
try {
stateConsumer.pollAndUpdate();
} catch (final WakeupException e) {
if (!inErrorState()) {
throw e;
}
}

if (fetchDeadlineClientInstanceId != -1) {
if (fetchDeadlineClientInstanceId >= time.milliseconds()) {
Expand Down Expand Up @@ -382,7 +389,8 @@ private StateConsumer initialize() {
globalConsumer,
stateDirectory,
stateRestoreListener,
config
config,
this::inErrorState
);

final GlobalProcessorContextImpl globalProcessorContext = new GlobalProcessorContextImpl(
Expand Down Expand Up @@ -429,8 +437,21 @@ private StateConsumer initialize() {
);
}

if (inErrorState()) {
closeStateConsumer(stateConsumer, false);
return null;
}

setState(RUNNING);
return stateConsumer;
} catch (final WakeupException e) {
closeStateConsumer(stateConsumer, false);
if (inErrorState()) {
log.info("Global thread initialization interrupted by shutdown");
} else {
startupException = new StreamsException(
"Unexpected wakeup during initialization of GlobalStreamThread", e);
}
} catch (final StreamsException fatalException) {
closeStateConsumer(stateConsumer, false);
startupException = fatalException;
Expand Down Expand Up @@ -477,6 +498,7 @@ public void shutdown() {
// if already shutting down or dead
setState(PENDING_SHUTDOWN);
initializationLatch.countDown();
globalConsumer.wakeup();
}

public Map<MetricName, Metric> consumerMetrics() {
Expand Down
Loading