-
Notifications
You must be signed in to change notification settings - Fork 14.9k
KAFKA-19960: Close pending tasks on shutdown. #21365
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,212 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package org.apache.kafka.streams.integration; | ||
|
|
||
| import org.apache.kafka.clients.consumer.ConsumerConfig; | ||
| import org.apache.kafka.common.serialization.LongSerializer; | ||
| import org.apache.kafka.common.serialization.Serdes; | ||
| import org.apache.kafka.common.serialization.StringSerializer; | ||
| import org.apache.kafka.common.utils.Bytes; | ||
| import org.apache.kafka.common.utils.MockTime; | ||
| import org.apache.kafka.streams.CloseOptions; | ||
| import org.apache.kafka.streams.KafkaStreams; | ||
| import org.apache.kafka.streams.KeyValue; | ||
| import org.apache.kafka.streams.StreamsConfig; | ||
| import org.apache.kafka.streams.TopologyWrapper; | ||
| import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; | ||
| import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; | ||
| import org.apache.kafka.streams.processor.StateStore; | ||
| import org.apache.kafka.streams.processor.StateStoreContext; | ||
| import org.apache.kafka.streams.processor.internals.StreamThread; | ||
| import org.apache.kafka.streams.state.KeyValueStore; | ||
| import org.apache.kafka.streams.state.StoreBuilder; | ||
| import org.apache.kafka.streams.state.internals.AbstractStoreBuilder; | ||
| import org.apache.kafka.streams.state.internals.CacheFlushListener; | ||
| import org.apache.kafka.streams.state.internals.CachedStateStore; | ||
| import org.apache.kafka.streams.state.internals.RocksDBStore; | ||
| import org.apache.kafka.test.MockApiProcessorSupplier; | ||
| import org.apache.kafka.test.TestUtils; | ||
| import org.junit.jupiter.api.AfterEach; | ||
| import org.junit.jupiter.api.BeforeEach; | ||
| import org.junit.jupiter.api.Test; | ||
| import org.junit.jupiter.api.TestInfo; | ||
|
|
||
| import java.io.IOException; | ||
| import java.time.Duration; | ||
| import java.util.List; | ||
| import java.util.Properties; | ||
| import java.util.concurrent.CountDownLatch; | ||
| import java.util.concurrent.atomic.AtomicInteger; | ||
|
|
||
| import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName; | ||
| import static org.junit.jupiter.api.Assertions.assertEquals; | ||
|
|
||
| public class RebalanceTaskClosureIntegrationTest { | ||
|
|
||
| private static final int NUM_BROKERS = 1; | ||
| protected static final String INPUT_TOPIC_NAME = "input-topic"; | ||
| private static final int NUM_PARTITIONS = 3; | ||
|
|
||
| private final EmbeddedKafkaCluster cluster = new EmbeddedKafkaCluster(NUM_BROKERS); | ||
|
|
||
| private KafkaStreamsWrapper streams1; | ||
| private KafkaStreamsWrapper streams2; | ||
| private String safeTestName; | ||
|
|
||
| @BeforeEach | ||
| public void before(final TestInfo testInfo) throws InterruptedException, IOException { | ||
| cluster.start(); | ||
| cluster.createTopic(INPUT_TOPIC_NAME, NUM_PARTITIONS, 1); | ||
| safeTestName = safeUniqueTestName(testInfo); | ||
| } | ||
|
|
||
| @AfterEach | ||
| public void after() { | ||
| cluster.stop(); | ||
| if (streams1 != null) { | ||
| streams1.close(Duration.ofSeconds(30)); | ||
| } | ||
| if (streams2 != null) { | ||
| streams2.close(Duration.ofSeconds(30)); | ||
| } | ||
| } | ||
|
|
||
| @Test | ||
| public void shouldClosePendingTasksToInitAfterRebalance() throws Exception { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Might be good to add a test description at the top, explaining what we are exactly testing, and so forth. I am not sure I understand the test yet, and I am sure that if somebody comes back to it at some point in the future they might struggle, too. |
||
| final CountDownLatch recycleLatch = new CountDownLatch(1); | ||
| final CountDownLatch pendingShutdownLatch = new CountDownLatch(1); | ||
| // Count how many times we initialize and close stores | ||
| final AtomicInteger initCount = new AtomicInteger(); | ||
| final AtomicInteger closeCount = new AtomicInteger(); | ||
| final StoreBuilder<KeyValueStore<Bytes, byte[]>> storeBuilder = new AbstractStoreBuilder<>("testStateStore", Serdes.Integer(), Serdes.ByteArray(), new MockTime()) { | ||
|
|
||
| @Override | ||
| public KeyValueStore<Bytes, byte[]> build() { | ||
| return new TestRocksDBStore(name, recycleLatch, pendingShutdownLatch, initCount, closeCount); | ||
| } | ||
| }; | ||
|
|
||
| final TopologyWrapper topology = new TopologyWrapper(); | ||
| topology.addSource("ingest", INPUT_TOPIC_NAME); | ||
| topology.addProcessor("my-processor", new MockApiProcessorSupplier<>(), "ingest"); | ||
| topology.addStateStore(storeBuilder, "my-processor"); | ||
|
|
||
| streams1 = new KafkaStreamsWrapper(topology, props("1")); | ||
| streams1.setStreamThreadStateListener((t, newState, oldState) -> { | ||
| if (newState == StreamThread.State.PENDING_SHUTDOWN) { | ||
| pendingShutdownLatch.countDown(); | ||
| } | ||
| }); | ||
| streams1.start(); | ||
|
|
||
| TestUtils.waitForCondition(() -> streams1.state() == KafkaStreams.State.RUNNING, "Streams never reached RUNNING state"); | ||
|
|
||
| streams2 = new KafkaStreamsWrapper(topology, props("2")); | ||
| streams2.start(); | ||
|
|
||
| TestUtils.waitForCondition(() -> streams2.state() == KafkaStreams.State.RUNNING, "Streams never reached RUNNING state"); | ||
|
|
||
| // starting the second KS app triggered a rebalance. Which in turn will recycle active tasks that need to become standby. | ||
| // That's exactly what we are waiting for | ||
| recycleLatch.await(); | ||
|
|
||
| // sending a message to disable retries in the consumer client. if there are no messages, it retries the whole sequence of actions, | ||
| // including the rebalance data. which we don't want, because we just staged the right condition | ||
| IntegrationTestUtils.produceKeyValuesSynchronously(INPUT_TOPIC_NAME, List.of(new KeyValue<>(1L, "key")), | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Not sure if I understand. "disable retries on the consumer" -- what retries are you referring to, and what consumer exactly? "it retries the whole sequence of actions, including the rebalance data" -- what sequence are you referring to? What do you mean by "rebalance data"? "which we don't want, because we just staged the right condition" -- Why do we not want this? Not sure what the "right condition" is, and why we would "lose it" w/o sending an input record.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I was talking about this code: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java#L639-L671 so updateAssignmentMetadataIfNeeded calls onAssignment and so on, which in turn at some point will call clearCache. where we set up a synchronization point in this test. I will update the comment to make it more clear what I mean |
||
| TestUtils.producerConfig(cluster.bootstrapServers(), LongSerializer.class, StringSerializer.class, new Properties()), cluster.time); | ||
| // Now we can close both apps. The StreamThreadStateListener will unblock the clearCache call, letting the rebalance finish. | ||
| // We don't want it to happen any sooner, because we want the stream thread to stop before it gets to moving messages from task registry to state updater. | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. "before it gets to moving messages from task registry to state updater" -- do you mean "tasks" instead of "messages"? |
||
| streams1.close(CloseOptions.groupMembershipOperation(CloseOptions.GroupMembershipOperation.LEAVE_GROUP)); | ||
| streams2.close(CloseOptions.groupMembershipOperation(CloseOptions.GroupMembershipOperation.LEAVE_GROUP)); | ||
|
|
||
| assertEquals(initCount.get(), closeCount.get()); | ||
| } | ||
|
|
||
| private Properties props(final String storePathSuffix) { | ||
| final Properties streamsConfiguration = new Properties(); | ||
|
|
||
| streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, safeTestName); | ||
| streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()); | ||
| streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is the default IIRC -- no need to set it |
||
| streamsConfiguration.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 1000); | ||
| streamsConfiguration.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 1000); | ||
| streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath() + "/" + storePathSuffix); | ||
| streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0); | ||
| streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L); | ||
| streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.LongSerde.class); | ||
| streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); | ||
| streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same. Already the default. |
||
| streamsConfiguration.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); | ||
|
|
||
| return streamsConfiguration; | ||
| } | ||
|
|
||
| private static class TestRocksDBStore extends RocksDBStore implements CachedStateStore<Bytes, byte[]> { | ||
|
|
||
| private final CountDownLatch recycleLatch; | ||
| private final CountDownLatch pendingShutdownLatch; | ||
| private final AtomicInteger initCount; | ||
| private final AtomicInteger closeCount; | ||
|
|
||
| public TestRocksDBStore(final String name, | ||
| final CountDownLatch recycleLatch, | ||
| final CountDownLatch pendingShutdownLatch, | ||
| final AtomicInteger initCount, | ||
| final AtomicInteger closeCount) { | ||
| super(name, "rocksdb"); | ||
| this.recycleLatch = recycleLatch; | ||
| this.pendingShutdownLatch = pendingShutdownLatch; | ||
| this.initCount = initCount; | ||
| this.closeCount = closeCount; | ||
| } | ||
|
|
||
| @Override | ||
| public void init(final StateStoreContext stateStoreContext, | ||
| final StateStore root) { | ||
| initCount.incrementAndGet(); | ||
| super.init(stateStoreContext, root); | ||
| } | ||
|
|
||
| @Override | ||
| public boolean setFlushListener(final CacheFlushListener<Bytes, byte[]> listener, | ||
| final boolean sendOldValues) { | ||
| return false; | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why not
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. as we just implement CachedStateStore, there is no super |
||
| } | ||
|
|
||
| @Override | ||
| public void flushCache() { | ||
| } | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we call
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. as we just implement CachedStateStore, there is no super |
||
|
|
||
| @Override | ||
| public void clearCache() { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we call
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. as we just implement CachedStateStore, there is no super |
||
| // Clear cache is called during recycle, so we use it as a hook | ||
| recycleLatch.countDown(); | ||
| try { | ||
| pendingShutdownLatch.await(); | ||
| } catch (InterruptedException e) { | ||
| throw new RuntimeException(e); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public synchronized void close() { | ||
| closeCount.incrementAndGet(); | ||
| super.close(); | ||
| } | ||
| } | ||
|
|
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1414,10 +1414,19 @@ void shutdown(final boolean clean) { | |
| // TODO: change type to `StreamTask` | ||
| final Set<Task> activeTasks = new TreeSet<>(Comparator.comparing(Task::id)); | ||
| activeTasks.addAll(tasks.activeTasks()); | ||
| final Set<Task> standbyTasks = new TreeSet<>(Comparator.comparing(Task::id)); | ||
| standbyTasks.addAll(tasks.standbyTasks()); | ||
| for (Task pendingTask : tasks.pendingTasksToInit()) { | ||
| if (pendingTask.isActive()) { | ||
| activeTasks.add(pendingTask); | ||
| } else { | ||
| standbyTasks.add(pendingTask); | ||
| } | ||
| } | ||
|
|
||
| executeAndMaybeSwallow( | ||
| clean, | ||
| () -> closeAndCleanUpTasks(activeTasks, standbyTaskIterable(), clean), | ||
| () -> closeAndCleanUpTasks(activeTasks, standbyTasks, clean), | ||
| e -> firstException.compareAndSet(null, e), | ||
| e -> log.warn("Ignoring an exception while unlocking remaining task directories.", e) | ||
| ); | ||
|
|
@@ -1523,7 +1532,7 @@ private Collection<Task> tryCloseCleanActiveTasks(final Collection<Task> activeT | |
| final boolean clean, | ||
| final AtomicReference<RuntimeException> firstException) { | ||
| if (!clean) { | ||
| return activeTaskIterable(); | ||
| return activeTasksToClose; | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For my own understanding. The original code here was not wrong in the sense that before this PR the
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the reason why I decided to change this was that if we follow the call:
even without my change it's already a bit incorrect, because we take the active tasks from the task registry and from the state updater: https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java#L1708-L1709, which isn't really a big deal, because we handle the state updater before this call, so there should be nothing there. on top of that, as you mentioned, this list doesn't contain the pending tasks. so if we keep it as is, when we shutdown dirtily, we will not close them. |
||
| } | ||
| final Comparator<Task> byId = Comparator.comparing(Task::id); | ||
| final Set<Task> tasksToCommit = new TreeSet<>(byId); | ||
|
|
@@ -1616,7 +1625,7 @@ private Collection<Task> tryCloseCleanStandbyTasks(final Collection<Task> standb | |
| final boolean clean, | ||
| final AtomicReference<RuntimeException> firstException) { | ||
| if (!clean) { | ||
| return standbyTaskIterable(); | ||
| return standbyTasksToClose; | ||
| } | ||
| final Set<Task> tasksToCloseDirty = new TreeSet<>(Comparator.comparing(Task::id)); | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -193,7 +193,9 @@ public synchronized void removeTask(final Task taskToRemove) { | |
| throw new IllegalStateException("Attempted to remove a task that is not closed or suspended: " + taskId); | ||
| } | ||
|
|
||
| if (taskToRemove.isActive()) { | ||
| if (pendingTasksToInit.contains(taskToRemove)) { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We throw above if a task is not CLOSED or SUSPENDED, but Are we testing this code path properly? If no test fails, it seems we never call
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it's supposed to be CLOSED too. |
||
| pendingTasksToInit.remove(taskToRemove); | ||
| } else if (taskToRemove.isActive()) { | ||
| if (activeTasksPerId.remove(taskId) == null) { | ||
| throw new IllegalArgumentException("Attempted to remove an active task that is not owned: " + taskId); | ||
| } | ||
|
|
@@ -203,7 +205,7 @@ public synchronized void removeTask(final Task taskToRemove) { | |
| throw new IllegalArgumentException("Attempted to remove a standby task that is not owned: " + taskId); | ||
| } | ||
| } | ||
| failedTaskIds.remove(taskToRemove.id()); | ||
| failedTaskIds.remove(taskId); | ||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -301,6 +303,11 @@ public synchronized Collection<Task> activeTasks() { | |
| return Collections.unmodifiableCollection(activeTasksPerId.values()); | ||
| } | ||
|
|
||
| @Override | ||
| public synchronized Collection<Task> standbyTasks() { | ||
| return Collections.unmodifiableCollection(standbyTasksPerId.values()); | ||
| } | ||
|
|
||
| /** | ||
| * All tasks returned by any of the getters are read-only and should NOT be modified; | ||
| * and the returned task could be modified by other threads concurrently | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need this change? It seems you are trying to allow registering a second state listener? In general, we allow registering only one -- why do we need two? (Also, if we need multiple, why limit to two?)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
because it overrides the default one, and KafkaStreams#state doesn't work.
with this change if we add a special state listener, we keep the old one as well.
if for some reason we would like to add two additional state listeners, this implementation will work too. we will just need to call it twice. then it will wrap it twice and all three listeners will be called instead of one.
the reason I changed the method was pretty much because I wanted to have a listener and to be able to call KafkaStreams#state and get the actual state as well