diff --git a/app/src/main/scala/hstream/server/KafkaBroker.scala b/app/src/main/scala/hstream/server/KafkaBroker.scala
index 9a3f625..99db8dc 100644
--- a/app/src/main/scala/hstream/server/KafkaBroker.scala
+++ b/app/src/main/scala/hstream/server/KafkaBroker.scala
@@ -3,7 +3,7 @@ package kafka.server
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.metadata.BrokerState
 import org.apache.kafka.common.utils.Time
-import java.nio.file.{Files, Paths}
+import java.nio.file.{Files, Path, Paths, StandardOpenOption}
 import kafka.utils.Logging
 import kafka.network.SocketServer
@@ -32,8 +32,11 @@ class KafkaBroker(
     s"kafka-tests-${scala.util.Properties.userName}-$rand"
   }

+  var logDir: Path = _
+
   // TODO: TMP_FOR_HSTREAM
-  def startup() = {
+  def startup(logPath: Path) = {
+    logDir = logPath
     if (sys.env.getOrElse("CONFIG_FILE", "").trim.isEmpty) {
       // TODO
       throw new NotImplementedError("KafkaBroker.startup")
@@ -65,7 +68,10 @@ class KafkaBroker(
         val dockerCmd =
           s"docker run $rmArg -d --network host --name $containerName -v $storeDir:/data/store $image $command"
         info(s"=> Start hserver by: $dockerCmd")
-        dockerCmd.run()
+        val code = dockerCmd.!
+        if (code != 0) {
+          throw new RuntimeException(s"Failed to start broker, exit code: $code")
+        }
       } else {
         throw new NotImplementedError("startup: spec is invalid!")
       }
@@ -93,13 +99,34 @@ class KafkaBroker(
         .getOrElse("container_logs", throw new IllegalArgumentException("container_logs is required"))
         .asInstanceOf[Boolean]
     ) {
-      val testFilename = config.testingConfig
-        .getOrElse("test.filename", throw new IllegalArgumentException("test.filename is required"))
-        .asInstanceOf[String]
-      val proj = sys.props.get("user.dir").getOrElse(".")
-      val containerLogsDir = s"$proj/build/reports/logs/$testFilename-${System.currentTimeMillis()}"
-      Files.createDirectories(Paths.get(containerLogsDir))
-      s"bash -c 'docker logs $containerName > $containerLogsDir/$containerName.log 2>&1'".!
+      Files.createDirectories(logDir)
+      val fileName = Paths.get(s"$logDir/$containerName.log")
+      if (!Files.exists(fileName)) {
+        Files.createFile(fileName)
+      }
+//      val code = s"bash -c 'docker logs $containerName >> $fileName 2>&1'".!
+      val cmd = Seq("docker", "logs", containerName)
+
+      val processLogger = ProcessLogger(
+        stdout => Files.writeString(fileName, stdout + "\n", StandardOpenOption.APPEND),
+        stderr => Files.writeString(fileName, stderr + "\n", StandardOpenOption.APPEND)
+      )
+      info(s"get logs from $containerName in ${System.currentTimeMillis()}")
+      val code = Process(cmd).!(processLogger)
+
+      if (code != 0) {
+        error(s"Failed to dump logs to $fileName, exit code: $code")
+        // Run `docker ps -a` and print the result
+        val psCmd = Seq("docker", "ps", "-a")
+        val psProcessLogger = ProcessLogger(
+          stdout => info(stdout),
+          stderr => error(stderr)
+        )
+        Process(psCmd).!(psProcessLogger)
+      } else {
+        Files.writeString(fileName, "==============================================\n", StandardOpenOption.APPEND)
+        info(s"Dump logs to $fileName")
+      }
     }
     // Remove broker container
     if (
@@ -107,27 +134,37 @@ class KafkaBroker(
         .getOrElse("container_remove", throw new IllegalArgumentException("container_remove is required"))
         .asInstanceOf[Boolean]
     ) {
-      s"docker rm -f $containerName".!
+      val cmd = s"docker rm -f $containerName"
+      Process(cmd).!
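+      // Log when the container was removed and list the remaining containers to make cleanup failures easier to debug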
+ info(s"Remove container $containerName in ${System.currentTimeMillis()}") + info(s"------- Current containers -------") + val psCmd = Seq("docker", "ps", "-a") + val psProcessLogger = ProcessLogger( + stdout => info(stdout), + stderr => error(stderr) + ) + Process(psCmd).!(psProcessLogger) + info(s"------- End of show current containers -------") } - // Delete all logs - val storeAdminPort = config.testingConfig - .getOrElse("store_admin_port", throw new IllegalArgumentException("store_admin_port is required")) - .asInstanceOf[Int] - val deleteLogProc = - s"docker run --rm --network host hstreamdb/hstream bash -c 'echo y | hadmin-store --port $storeAdminPort logs remove --path /hstream -r'" - .run() - val code = deleteLogProc.exitValue() - // TODO: remove a non-exist log should be OK - // if (code != 0) { - // throw new RuntimeException(s"Failed to delete logs, exit code: $code") - // } - - // Delete all metastore(zk) nodes - val metastorePort = config.testingConfig - .getOrElse("metastore_port", throw new IllegalArgumentException("metastore_port is required")) - .asInstanceOf[Int] - s"docker run --rm --network host zookeeper:3.7 zkCli.sh -server 127.0.0.1:$metastorePort deleteall /hstream".! +// // Delete all logs +// val storeAdminPort = config.testingConfig +// .getOrElse("store_admin_port", throw new IllegalArgumentException("store_admin_port is required")) +// .asInstanceOf[Int] +// val deleteLogProc = +// s"docker run --rm --network host hstreamdb/hstream bash -c 'echo y | hadmin-store --port $storeAdminPort logs remove --path /hstream -r'" +// .run() +// val code = deleteLogProc.exitValue() +// // TODO: remove a non-exist log should be OK +// // if (code != 0) { +// // throw new RuntimeException(s"Failed to delete logs, exit code: $code") +// // } +// +// // Delete all metastore(zk) nodes +// val metastorePort = config.testingConfig +// .getOrElse("metastore_port", throw new IllegalArgumentException("metastore_port is required")) +// .asInstanceOf[Int] +// s"docker run --rm --network host zookeeper:3.7 zkCli.sh -server 127.0.0.1:$metastorePort deleteall /hstream".! } else { throw new NotImplementedError("shutdown: spec is invalid!") } diff --git a/app/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala b/app/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala new file mode 100644 index 0000000..17603ad --- /dev/null +++ b/app/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.admin + +import kafka.integration.KafkaServerTestHarness +import kafka.server.KafkaConfig +import kafka.utils.TestUtils +import org.apache.kafka.clients.admin._ +import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.utils.Utils +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, Test, TestInfo} + +import scala.collection.{Map, Seq} +import scala.jdk.CollectionConverters._ + +class ListOffsetsIntegrationTest extends KafkaServerTestHarness { + + val topicName = "foo" + var adminClient: Admin = _ + private var testInfo: TestInfo = _ + + @BeforeEach + override def setUp(testInfo: TestInfo): Unit = { + this.testInfo = testInfo + super.setUp(testInfo) + createTopic(topicName, 1, 1.toShort) + produceMessages() + adminClient = Admin.create(Map[String, Object]( + AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers() + ).asJava) + } + + @AfterEach + override def tearDown(): Unit = { + Utils.closeQuietly(adminClient, "ListOffsetsAdminClient") + super.tearDown() + } + + @Test + def testEarliestOffset(): Unit = { + val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest()) + assertEquals(0, earliestOffset.offset()) + } + + @Test + def testLatestOffset(): Unit = { + val latestOffset = runFetchOffsets(adminClient, OffsetSpec.latest()) + assertEquals(3, latestOffset.offset()) + } + + @Test + @Disabled("HSTREAM_TESTS: disabled until hstream supports maxTimestamp() offset spec. (ListOffsets v7+)") + def testMaxTimestampOffset(): Unit = { + val maxTimestampOffset = runFetchOffsets(adminClient, OffsetSpec.maxTimestamp()) + assertEquals(1, maxTimestampOffset.offset()) + } + + private def runFetchOffsets(adminClient: Admin, + offsetSpec: OffsetSpec): ListOffsetsResult.ListOffsetsResultInfo = { + val tp = new TopicPartition(topicName, 0) + adminClient.listOffsets(Map( + tp -> offsetSpec + ).asJava, new ListOffsetsOptions()).all().get().get(tp) + } + + def produceMessages(): Unit = { + val records = Seq( + new ProducerRecord[Array[Byte], Array[Byte]](topicName, 0, 100L, + null, new Array[Byte](10000)), + new ProducerRecord[Array[Byte], Array[Byte]](topicName, 0, 999L, + null, new Array[Byte](10000)), + new ProducerRecord[Array[Byte], Array[Byte]](topicName, 0, 200L, + null, new Array[Byte](10000)), + ) + TestUtils.produceMessages(servers, records, -1) + } + + def generateConfigs: Seq[KafkaConfig] = + // KAFKA + // TestUtils.createBrokerConfigs(1, zkConnect).map(KafkaConfig.fromProps) + TestUtils.createBrokerConfigs(1, metaStoreConnect, testInfo).map(KafkaConfig.fromProps) +} diff --git a/app/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/app/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala new file mode 100644 index 0000000..4f65878 --- /dev/null +++ b/app/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala @@ -0,0 +1,564 @@ + /** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + + package kafka.api + + import java.time + import java.util.concurrent._ + import java.util.{Collection, Collections, Properties} + import kafka.server.KafkaConfig + import kafka.utils.{FixedPortTestUtils, Logging, TestUtils} + import org.apache.kafka.clients.consumer._ + import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord} + import org.apache.kafka.common.TopicPartition + import org.apache.kafka.common.errors.GroupMaxSizeReachedException + import org.apache.kafka.common.message.FindCoordinatorRequestData + import org.apache.kafka.common.protocol.Errors + import org.apache.kafka.common.requests.{FindCoordinatorRequest, FindCoordinatorResponse} + import org.apache.kafka.server.util.ShutdownableThread + import org.junit.jupiter.api.Assertions._ + import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, Test, TestInfo, Timeout} + + import java.time.Duration + import scala.annotation.nowarn + import scala.jdk.CollectionConverters._ + import scala.collection.{Seq, mutable} + import scala.sys.process._ + + /** + * Integration tests for the consumer that cover basic usage as well as server failures + */ + class ConsumerBounceTest extends AbstractConsumerTest with Logging { + val maxGroupSize = 5 + + // Time to process commit and leave group requests in tests when brokers are available + val gracefulCloseTimeMs = Some(1000L) + val executor: ScheduledExecutorService = Executors.newScheduledThreadPool(2) + val consumerPollers: mutable.Buffer[ConsumerAssignmentPoller] = mutable.Buffer[ConsumerAssignmentPoller]() + + this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true") + this.consumerConfig.setProperty(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false") + + override def generateConfigs: Seq[KafkaConfig] = { + generateKafkaConfigs() + } + + private def generateKafkaConfigs(maxGroupSize: String = maxGroupSize.toString): Seq[KafkaConfig] = { + val properties = new Properties +// properties.put(KafkaConfig.OffsetsTopicReplicationFactorProp, "3") // don't want to lose offset +// properties.put(KafkaConfig.OffsetsTopicPartitionsProp, "1") +// properties.put(KafkaConfig.GroupMinSessionTimeoutMsProp, "10") // set small enough session timeout +// properties.put(KafkaConfig.GroupInitialRebalanceDelayMsProp, "0") +// properties.put(KafkaConfig.GroupMaxSizeProp, maxGroupSize) +// properties.put(KafkaConfig.UncleanLeaderElectionEnableProp, "true") + properties.put(KafkaConfig.AutoCreateTopicsEnableProp, "false") + + FixedPortTestUtils.createBrokerConfigs(brokerCount, metaStoreConnect, testInfo) + .map(KafkaConfig.fromProps(_, properties)) + } + + private var testInfo: TestInfo = _ + + @BeforeEach + override def setUp(testInfo: TestInfo): Unit = { + this.testInfo = testInfo + super.setUp(testInfo) + } + + @AfterEach + override def tearDown(): Unit = { + try { + consumerPollers.foreach(_.shutdown()) + executor.shutdownNow() + // Wait for any active tasks to terminate to ensure consumer is not closed while being used from another thread + assertTrue(executor.awaitTermination(5000, TimeUnit.MILLISECONDS), "Executor did not 
terminate") + } finally { + super.tearDown() + } + } + + @Test + @Timeout(value = 120) + def testConsumptionWithBrokerFailures(): Unit = consumeWithBrokerFailures(10) + + /* + * 1. Produce a bunch of messages + * 2. Then consume the messages while killing and restarting brokers at random + */ + def consumeWithBrokerFailures(numIters: Int): Unit = { + val numRecords = 1000 + val producer = createProducer() + producerSend(producer, numRecords) + + var consumed = 0L + val consumer = createConsumer() + + consumer.subscribe(Collections.singletonList(topic)) + Thread.sleep(2000) // sleep sometime to make sure consumer group is active + + // 通过 docker ps -a 打印所有容器 + val cmd = Seq("docker", "ps", "-a") + val processLogger = ProcessLogger( + stdout => info(stdout), + stderr => error(stderr) + ) + Process(cmd).!(processLogger) + val scheduler = new BounceBrokerScheduler(numIters) + scheduler.start() + + while (scheduler.isRunning) { + val records = consumer.poll(Duration.ofMillis(100)).asScala + + for (record <- records) { + assertEquals(consumed, record.offset()) + consumed += 1 + } + println(s"========= consumed = $consumed") + + if (records.nonEmpty) { + consumer.commitSync() + assertEquals(consumer.position(tp), consumer.committed(Set(tp).asJava).get(tp).offset) + + if (consumer.position(tp) == numRecords) { + consumer.seekToBeginning(Collections.emptyList()) + consumed = 0 + } + } + } + scheduler.shutdown() + } + + @Test + def testSeekAndCommitWithBrokerFailures(): Unit = seekAndCommitWithBrokerFailures(5) + + def seekAndCommitWithBrokerFailures(numIters: Int): Unit = { + val numRecords = 1000 + val producer = createProducer() + producerSend(producer, numRecords) + + val consumer = createConsumer() + consumer.assign(Collections.singletonList(tp)) + consumer.seek(tp, 0) + + Thread.sleep(2000) + + // wait until all the followers have synced the last HW with leader +// TestUtils.waitUntilTrue(() => servers.forall(server => +// server.replicaManager.localLog(tp).get.highWatermark == numRecords +// ), "Failed to update high watermark for followers after timeout") + + val scheduler = new BounceBrokerScheduler(numIters) + scheduler.start() + + while(scheduler.isRunning) { + val coin = TestUtils.random.nextInt(3) + if (coin == 0) { + info("Seeking to end of log") + consumer.seekToEnd(Collections.emptyList()) + assertEquals(numRecords.toLong, consumer.position(tp)) + } else if (coin == 1) { + val pos = TestUtils.random.nextInt(numRecords).toLong + info("Seeking to " + pos) + consumer.seek(tp, pos) + assertEquals(pos, consumer.position(tp)) + } else if (coin == 2) { + info("Committing offset.") + consumer.commitSync() + assertEquals(consumer.position(tp), consumer.committed(Set(tp).asJava).get(tp).offset) + } + } + } + + @Test + @Disabled("close broker auto create topic") + def testSubscribeWhenTopicUnavailable(): Unit = { + val numRecords = 1000 + val newtopic = "newtopic" + + val consumer = createConsumer() + consumer.subscribe(Collections.singleton(newtopic)) +// Thread.sleep(2000) + executor.schedule(new Runnable { + def run() = createTopic(newtopic, numPartitions = brokerCount, replicationFactor = brokerCount) + }, 2, TimeUnit.SECONDS) + consumer.poll(time.Duration.ZERO) + + val producer = createProducer() + + def sendRecords(numRecords: Int, topic: String): Unit = { + var remainingRecords = numRecords + val endTimeMs = System.currentTimeMillis + 20000 + while (remainingRecords > 0 && System.currentTimeMillis < endTimeMs) { + val futures = (0 until remainingRecords).map { i => + producer.send(new 
ProducerRecord(topic, part, i.toString.getBytes, i.toString.getBytes)) + } + futures.map { future => + try { + future.get + remainingRecords -= 1 + } catch { + case _: Exception => + } + } + } + assertEquals(0, remainingRecords) + } + + val poller = new ConsumerAssignmentPoller(consumer, List(newtopic)) + consumerPollers += poller + poller.start() + sendRecords(numRecords, newtopic) + receiveExactRecords(poller, numRecords, 10000) + poller.shutdown() + + servers.foreach(server => killBroker(server.config.brokerId)) + Thread.sleep(500) + restartDeadBrokers() + + val poller2 = new ConsumerAssignmentPoller(consumer, List(newtopic)) + consumerPollers += poller2 + poller2.start() + sendRecords(numRecords, newtopic) + receiveExactRecords(poller, numRecords, 10000L) + } + + @Test + @Disabled("support FindCoordinator Request V4") + def testClose(): Unit = { + val numRecords = 10 + val producer = createProducer() + producerSend(producer, numRecords) + + checkCloseGoodPath(numRecords, "group1") + checkCloseWithCoordinatorFailure(numRecords, "group2", "group3") + checkCloseWithClusterFailure(numRecords, "group4", "group5") + } + + /** + * Consumer is closed while cluster is healthy. Consumer should complete pending offset commits + * and leave group. New consumer instance should be able join group and start consuming from + * last committed offset. + */ + private def checkCloseGoodPath(numRecords: Int, groupId: String): Unit = { + val consumer = createConsumerAndReceive(groupId, false, numRecords) + val future = submitCloseAndValidate(consumer, Long.MaxValue, None, gracefulCloseTimeMs) + future.get + checkClosedState(groupId, numRecords) + } + + /** + * Consumer closed while coordinator is unavailable. Close of consumers using group + * management should complete after commit attempt even though commits fail due to rebalance. + * Close of consumers using manual assignment should complete with successful commits since a + * broker is available. + */ + private def checkCloseWithCoordinatorFailure(numRecords: Int, dynamicGroup: String, manualGroup: String): Unit = { + val consumer1 = createConsumerAndReceive(dynamicGroup, false, numRecords) + val consumer2 = createConsumerAndReceive(manualGroup, true, numRecords) + + killBroker(findCoordinator(dynamicGroup)) + killBroker(findCoordinator(manualGroup)) + + submitCloseAndValidate(consumer1, Long.MaxValue, None, gracefulCloseTimeMs).get + submitCloseAndValidate(consumer2, Long.MaxValue, None, gracefulCloseTimeMs).get + + restartDeadBrokers() + checkClosedState(dynamicGroup, 0) + checkClosedState(manualGroup, numRecords) + } + + private def findCoordinator(group: String): Int = { + val request = new FindCoordinatorRequest.Builder(new FindCoordinatorRequestData() + .setKeyType(FindCoordinatorRequest.CoordinatorType.GROUP.id) + .setCoordinatorKeys(Collections.singletonList(group))).build() + var nodeId = -1 + TestUtils.waitUntilTrue(() => { + val response = connectAndReceive[FindCoordinatorResponse](request) + nodeId = response.node.id + response.error == Errors.NONE + }, s"Failed to find coordinator for group $group") + nodeId + } + + /** + * Consumer is closed while all brokers are unavailable. Cannot rebalance or commit offsets since + * there is no coordinator, but close should timeout and return. If close is invoked with a very + * large timeout, close should timeout after request timeout. 
+ */ + private def checkCloseWithClusterFailure(numRecords: Int, group1: String, group2: String): Unit = { + val consumer1 = createConsumerAndReceive(group1, false, numRecords) + + val requestTimeout = 6000 + this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "5000") + this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000") + this.consumerConfig.setProperty(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeout.toString) + val consumer2 = createConsumerAndReceive(group2, true, numRecords) + + servers.foreach(server => killBroker(server.config.brokerId)) + val closeTimeout = 2000 + val future1 = submitCloseAndValidate(consumer1, closeTimeout, None, Some(closeTimeout)) + val future2 = submitCloseAndValidate(consumer2, Long.MaxValue, None, Some(requestTimeout)) + future1.get + future2.get + } + + /** + * If we have a running consumer group of size N, configure consumer.group.max.size = N-1 and restart all brokers, + * the group should be forced to rebalance when it becomes hosted on a Coordinator with the new config. + * Then, 1 consumer should be left out of the group. + */ +// @Test +// @Disabled // TODO: To be re-enabled once we can make it less flaky (KAFKA-13421) +// def testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup(): Unit = { +// val group = "group-max-size-test" +// val topic = "group-max-size-test" +// val maxGroupSize = 2 +// val consumerCount = maxGroupSize + 1 +// val partitionCount = consumerCount * 2 +// +// this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "60000") +// this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000") +// this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") +// val partitions = createTopicPartitions(topic, numPartitions = partitionCount, replicationFactor = brokerCount) +// +// addConsumersToGroupAndWaitForGroupAssignment(consumerCount, mutable.Buffer[Consumer[Array[Byte], Array[Byte]]](), +// consumerPollers, List[String](topic), partitions, group) +// +// // roll all brokers with a lesser max group size to make sure coordinator has the new config +// val newConfigs = generateKafkaConfigs(maxGroupSize.toString) +// for (serverIdx <- servers.indices) { +// killBroker(serverIdx) +// val config = newConfigs(serverIdx) +// servers(serverIdx) = TestUtils.createServer(config, time = brokerTime(config.brokerId)) +// restartDeadBrokers() +// } +// +// def raisedExceptions: Seq[Throwable] = { +// consumerPollers.flatten(_.thrownException) +// } +// +// // we are waiting for the group to rebalance and one member to get kicked +// TestUtils.waitUntilTrue(() => raisedExceptions.nonEmpty, +// msg = "The remaining consumers in the group could not fetch the expected records", 10000L) +// +// assertEquals(1, raisedExceptions.size) +// assertTrue(raisedExceptions.head.isInstanceOf[GroupMaxSizeReachedException]) +// } + + /** + * When we have the consumer group max size configured to X, the X+1th consumer trying to join should receive a fatal exception + */ + @Test + @Disabled("support set GroupMaxSizeProp for broker") + def testConsumerReceivesFatalExceptionWhenGroupPassesMaxSize(): Unit = { + val group = "fatal-exception-test" + val topic = "fatal-exception-test" + this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "60000") + this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000") + this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
"false") + + val partitions = createTopicPartitions(topic, numPartitions = maxGroupSize, replicationFactor = brokerCount) + + // Create N+1 consumers in the same consumer group and assert that the N+1th consumer receives a fatal error when it tries to join the group + addConsumersToGroupAndWaitForGroupAssignment(maxGroupSize, mutable.Buffer[Consumer[Array[Byte], Array[Byte]]](), + consumerPollers, List[String](topic), partitions, group) + val (_, rejectedConsumerPollers) = addConsumersToGroup(1, + mutable.Buffer[Consumer[Array[Byte], Array[Byte]]](), mutable.Buffer[ConsumerAssignmentPoller](), List[String](topic), partitions, group) + val rejectedConsumer = rejectedConsumerPollers.head + TestUtils.waitUntilTrue(() => { + rejectedConsumer.thrownException.isDefined + }, "Extra consumer did not throw an exception") + assertTrue(rejectedConsumer.thrownException.get.isInstanceOf[GroupMaxSizeReachedException]) + + // assert group continues to live + producerSend(createProducer(), maxGroupSize * 100, topic, numPartitions = Some(partitions.size)) + TestUtils.waitUntilTrue(() => { + consumerPollers.forall(p => p.receivedMessages >= 100) + }, "The consumers in the group could not fetch the expected records", 10000L) + } + + /** + * Consumer is closed during rebalance. Close should leave group and close + * immediately if rebalance is in progress. If brokers are not available, + * close should terminate immediately without sending leave group. + */ + @Test + @Disabled("slow rebalance") + def testCloseDuringRebalance(): Unit = { + val topic = "closetest" + createTopic(topic, 10, brokerCount) + this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "60000") + this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000") + this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") + checkCloseDuringRebalance("group1", topic, executor, true) + } + + @nowarn("cat=deprecation") + private def checkCloseDuringRebalance(groupId: String, topic: String, executor: ExecutorService, brokersAvailableDuringClose: Boolean): Unit = { + + def subscribeAndPoll(consumer: Consumer[Array[Byte], Array[Byte]], revokeSemaphore: Option[Semaphore] = None): Future[Any] = { + executor.submit(() => { + consumer.subscribe(Collections.singletonList(topic)) + revokeSemaphore.foreach(s => s.release()) + // requires to used deprecated `poll(long)` to trigger metadata update + consumer.poll(0L) + }, 0) + } + + def waitForRebalance(timeoutMs: Long, future: Future[Any], otherConsumers: Consumer[Array[Byte], Array[Byte]]*): Unit = { + val startMs = System.currentTimeMillis + while (System.currentTimeMillis < startMs + timeoutMs && !future.isDone) + otherConsumers.foreach(consumer => consumer.poll(time.Duration.ofMillis(100L))) + assertTrue(future.isDone, "Rebalance did not complete in time") + } + + def createConsumerToRebalance(): Future[Any] = { + val consumer = createConsumerWithGroupId(groupId) + val rebalanceSemaphore = new Semaphore(0) + val future = subscribeAndPoll(consumer, Some(rebalanceSemaphore)) + // Wait for consumer to poll and trigger rebalance + assertTrue(rebalanceSemaphore.tryAcquire(2000, TimeUnit.MILLISECONDS), "Rebalance not triggered") + // Rebalance is blocked by other consumers not polling + assertFalse(future.isDone, "Rebalance completed too early") + future + } + val consumer1 = createConsumerWithGroupId(groupId) + waitForRebalance(2000, subscribeAndPoll(consumer1)) + val consumer2 = createConsumerWithGroupId(groupId) + waitForRebalance(2000, 
subscribeAndPoll(consumer2), consumer1) + val rebalanceFuture = createConsumerToRebalance() + + // consumer1 should leave group and close immediately even though rebalance is in progress + val closeFuture1 = submitCloseAndValidate(consumer1, Long.MaxValue, None, gracefulCloseTimeMs) + + // Rebalance should complete without waiting for consumer1 to timeout since consumer1 has left the group + waitForRebalance(2000, rebalanceFuture, consumer2) + + // Trigger another rebalance and shutdown all brokers + // This consumer poll() doesn't complete and `tearDown` shuts down the executor and closes the consumer + createConsumerToRebalance() + servers.foreach(server => killBroker(server.config.brokerId)) + + // consumer2 should close immediately without LeaveGroup request since there are no brokers available + val closeFuture2 = submitCloseAndValidate(consumer2, Long.MaxValue, None, Some(0)) + + // Ensure futures complete to avoid concurrent shutdown attempt during test cleanup + closeFuture1.get(2000, TimeUnit.MILLISECONDS) + closeFuture2.get(2000, TimeUnit.MILLISECONDS) + } + + private def createConsumerAndReceive(groupId: String, manualAssign: Boolean, numRecords: Int): Consumer[Array[Byte], Array[Byte]] = { + val consumer = createConsumerWithGroupId(groupId) + val consumerPoller = if (manualAssign) + subscribeConsumerAndStartPolling(consumer, List(), Set(tp)) + else + subscribeConsumerAndStartPolling(consumer, List(topic)) + + receiveExactRecords(consumerPoller, numRecords) + consumerPoller.shutdown() + consumer + } + + private def receiveExactRecords(consumer: ConsumerAssignmentPoller, numRecords: Int, timeoutMs: Long = 60000): Unit = { + TestUtils.waitUntilTrue(() => { + consumer.receivedMessages == numRecords + }, s"Consumer did not receive expected $numRecords. It received ${consumer.receivedMessages}", timeoutMs) + } + + private def submitCloseAndValidate(consumer: Consumer[Array[Byte], Array[Byte]], + closeTimeoutMs: Long, minCloseTimeMs: Option[Long], maxCloseTimeMs: Option[Long]): Future[Any] = { + executor.submit(() => { + val closeGraceTimeMs = 2000 + val startMs = System.currentTimeMillis() + info("Closing consumer with timeout " + closeTimeoutMs + " ms.") + consumer.close(time.Duration.ofMillis(closeTimeoutMs)) + val timeTakenMs = System.currentTimeMillis() - startMs + maxCloseTimeMs.foreach { ms => + assertTrue(timeTakenMs < ms + closeGraceTimeMs, "Close took too long " + timeTakenMs) + } + minCloseTimeMs.foreach { ms => + assertTrue(timeTakenMs >= ms, "Close finished too quickly " + timeTakenMs) + } + info("consumer.close() completed in " + timeTakenMs + " ms.") + }, 0) + } + + private def checkClosedState(groupId: String, committedRecords: Int): Unit = { + // Check that close was graceful with offsets committed and leave group sent. + // New instance of consumer should be assigned partitions immediately and should see committed offsets. 
+ val assignSemaphore = new Semaphore(0) + val consumer = createConsumerWithGroupId(groupId) + consumer.subscribe(Collections.singletonList(topic), new ConsumerRebalanceListener { + def onPartitionsAssigned(partitions: Collection[TopicPartition]): Unit = { + assignSemaphore.release() + } + def onPartitionsRevoked(partitions: Collection[TopicPartition]): Unit = { + }}) + + TestUtils.waitUntilTrue(() => { + consumer.poll(time.Duration.ofMillis(100L)) + assignSemaphore.tryAcquire() + }, "Assignment did not complete on time") + + if (committedRecords > 0) + assertEquals(committedRecords, consumer.committed(Set(tp).asJava).get(tp).offset) + consumer.close() + } + + private class BounceBrokerScheduler(val numIters: Int) extends ShutdownableThread("daemon-bounce-broker", false) { + var iter: Int = 0 + + override def doWork(): Unit = { + info(s"!!!!!iter = $iter") + killRandomBroker() + Thread.sleep(500) + restartDeadBrokers() + Thread.sleep(500) + info(s"!!!!!iter = $iter done!!!!!!") + + iter += 1 + if (iter == numIters) + initiateShutdown() + else + Thread.sleep(500) + } + } + + private def createTopicPartitions(topic: String, numPartitions: Int, replicationFactor: Int, + topicConfig: Properties = new Properties): Set[TopicPartition] = { + createTopic(topic, numPartitions = numPartitions, replicationFactor = replicationFactor, topicConfig = topicConfig) + Range(0, numPartitions).map(part => new TopicPartition(topic, part)).toSet + } + + private def producerSend(producer: KafkaProducer[Array[Byte], Array[Byte]], + numRecords: Int, + topic: String = this.topic, + numPartitions: Option[Int] = None): Unit = { + var partitionIndex = 0 + def getPartition: Int = { + numPartitions match { + case Some(partitions) => + val nextPart = partitionIndex % partitions + partitionIndex += 1 + nextPart + case None => part + } + } + + val futures = (0 until numRecords).map { i => + producer.send(new ProducerRecord(topic, getPartition, i.toString.getBytes, i.toString.getBytes)) + } + futures.map(_.get) + } + + } diff --git a/app/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala b/app/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala index d0a2c77..185e9c2 100755 --- a/app/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala +++ b/app/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala @@ -41,7 +41,8 @@ class ProducerCompressionTest extends QuorumTestHarness { // TMP_FOR_HSTREAM // val props = TestUtils.createBrokerConfig(brokerId, zkConnectOrNull) val props = TestUtils.createBrokerConfig(brokerId, metaStoreConnect, testInfo) - broker = createBroker(new KafkaConfig(props)) + val logDir = TestUtils.generateLogDir(testInfo) + broker = createBroker(new KafkaConfig(props), logDir = logDir) } @AfterEach diff --git a/app/src/test/scala/utils/kafka/KafkaServerTestHarness.scala b/app/src/test/scala/utils/kafka/KafkaServerTestHarness.scala index fd217cc..1fbaeb8 100644 --- a/app/src/test/scala/utils/kafka/KafkaServerTestHarness.scala +++ b/app/src/test/scala/utils/kafka/KafkaServerTestHarness.scala @@ -37,7 +37,9 @@ import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.resource.ResourcePattern import org.apache.kafka.common.security.scram.ScramCredential import org.apache.kafka.common.utils.Time +import java.nio.file.{Files, Path, Paths} // import org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEXT +import scala.sys.process._ /** * A test harness that brings up some number of broker nodes @@ -111,6 +113,30 @@ 
abstract class KafkaServerTestHarness extends QuorumTestHarness { protected def clientSaslProperties: Option[Properties] = None protected def brokerTime(brokerId: Int): Time = Time.SYSTEM + var testLogDir: Path = _ + + def logDir: Path = { + if (testLogDir != null) { + return testLogDir + } + + val config = configs.head + if ( + config.testingConfig + .getOrElse("container_logs", throw new IllegalArgumentException("container_logs is required")) + .asInstanceOf[Boolean] + ) { + val testFilename = config.testingConfig + .getOrElse("test.filename", throw new IllegalArgumentException("test.filename is required")) + .asInstanceOf[String] + val proj = sys.props.get("user.dir").getOrElse(".") + val containerLogsDir = s"$proj/build/reports/logs/$testFilename-${System.currentTimeMillis()}" + val dirs = Paths.get(containerLogsDir) + testLogDir = dirs + } + testLogDir + } + @BeforeEach override def setUp(testInfo: TestInfo): Unit = { super.setUp(testInfo) @@ -121,8 +147,7 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness { // default implementation is a no-op, it is overridden by subclasses if required configureSecurityBeforeServersStart(testInfo) - createBrokers(startup = true) - + createBrokers(startup = true, logDir) // default implementation is a no-op, it is overridden by subclasses if required configureSecurityAfterServersStart() @@ -130,6 +155,26 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness { @AfterEach override def tearDown(): Unit = { + val config = configs.head + // Delete all logs + val storeAdminPort = config.testingConfig + .getOrElse("store_admin_port", throw new IllegalArgumentException("store_admin_port is required")) + .asInstanceOf[Int] + val deleteLogProc = + s"docker run --rm --network host hstreamdb/hstream bash -c 'echo y | hadmin-store --port $storeAdminPort logs remove --path /hstream -r'" + .run() + val code = deleteLogProc.exitValue() + // TODO: remove a non-exist log should be OK + // if (code != 0) { + // throw new RuntimeException(s"Failed to delete logs, exit code: $code") + // } + + // Delete all metastore(zk) nodes + val metastorePort = config.testingConfig + .getOrElse("metastore_port", throw new IllegalArgumentException("metastore_port is required")) + .asInstanceOf[Int] + s"docker run --rm --network host zookeeper:3.7 zkCli.sh -server 127.0.0.1:$metastorePort deleteall /hstream".! 
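+    // Store data and metastore nodes have been cleaned up above; now shut down the brokers themselves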
+    TestUtils.shutdownServers(_brokers)
     super.tearDown()
   }
@@ -281,9 +326,17 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {

   def killBroker(index: Int): Unit = {
     if(alive(index)) {
+      // Record the start time
+      val start = System.currentTimeMillis()
       _brokers(index).shutdown()
       _brokers(index).awaitShutdown()
       alive(index) = false
+      // Record the end time
+      val end = System.currentTimeMillis()
+      // Print the elapsed time
+      info("!!!!!!!!!!Kill broker %d, time: %ds".format(index, (end - start) / 1000))
+    } else {
+      info("!!!!!!!!!!Broker %d is already dead, skip killBroker".format(index))
     }
   }
@@ -300,8 +353,9 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
       if (reconfigure) {
         _brokers(i) = createBrokerFromConfig(configs(i))
       }
-      _brokers(i).startup()
+      _brokers(i).startup(logDir)
       alive(i) = true
+      info("!!!!!!!!!!Restart broker %d".format(i))
     }
   }
@@ -357,7 +411,7 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
//  }
//  }

-  private def createBrokers(startup: Boolean): Unit = {
+  private def createBrokers(startup: Boolean, logDir: Path): Unit = {
     // Add each broker to `brokers` buffer as soon as it is created to ensure that brokers
     // are shutdown cleanly in tearDown even if a subsequent broker fails to start
     val potentiallyRegeneratedConfigs = configs
@@ -366,8 +420,9 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
     for (config <- potentiallyRegeneratedConfigs) {
       val broker = createBrokerFromConfig(config)
       _brokers += broker
+      info("Created broker %d at %s".format(broker.config.brokerId, broker.boundPort(listenerName)))
       if (startup) {
-        broker.startup()
+        broker.startup(logDir)
         alive(_brokers.length - 1) = true
       }
     }
@@ -385,7 +440,7 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
//      enableZkApiForwarding = isZkMigrationTest() || (config.migrationEnabled && config.interBrokerProtocolVersion.isApiForwardingEnabled)
//    )
//  }
-    createBroker(config, brokerTime(config.brokerId), startup = false)
+    createBroker(config, brokerTime(config.brokerId), startup = false, logDir)
   }

   def aliveBrokers: Seq[KafkaBroker] = {
diff --git a/app/src/test/scala/utils/kafka/QuorumTestHarness.scala b/app/src/test/scala/utils/kafka/QuorumTestHarness.scala
index c82807e..8fafddb 100755
--- a/app/src/test/scala/utils/kafka/QuorumTestHarness.scala
+++ b/app/src/test/scala/utils/kafka/QuorumTestHarness.scala
@@ -16,6 +16,7 @@ package kafka.server

 import java.io.{ByteArrayOutputStream, File, PrintStream}
 import java.net.InetSocketAddress
+import java.nio.file.Path
 import java.util
 import java.util.{Collections, Properties}
 import java.util.concurrent.{CompletableFuture, TimeUnit}
@@ -62,6 +63,7 @@ trait QuorumImplementation {
     config: KafkaConfig,
     time: Time = Time.SYSTEM,
     startup: Boolean = true,
+    logDir: Path,
     threadNamePrefix: Option[String] = None
   ): KafkaBroker
@@ -79,11 +81,12 @@ class ZooKeeperQuorumImplementation(
     config: KafkaConfig,
     time: Time,
     startup: Boolean,
+    logDir: Path,
     threadNamePrefix: Option[String]
   ): KafkaBroker = {
//    val server = new KafkaServer(config, time, threadNamePrefix, false)
     val server = new KafkaBroker(config, time, threadNamePrefix)
-    if (startup) server.startup()
+    if (startup) server.startup(logDir)
     server
   }
@@ -279,9 +282,10 @@ abstract class QuorumTestHarness extends Logging {
     config: KafkaConfig,
     time: Time = Time.SYSTEM,
     startup: Boolean = true,
+    logDir: Path,
     threadNamePrefix: Option[String] = None
   ): KafkaBroker = {
-    implementation.createBroker(config, time, startup, threadNamePrefix)
+    implementation.createBroker(config, time,
startup, logDir, threadNamePrefix) } // def shutdownZooKeeper(): Unit = asZk().shutdown() diff --git a/app/src/test/scala/utils/kafka/utils/FixedPortTestUtils.scala b/app/src/test/scala/utils/kafka/utils/FixedPortTestUtils.scala new file mode 100644 index 0000000..d0632e4 --- /dev/null +++ b/app/src/test/scala/utils/kafka/utils/FixedPortTestUtils.scala @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package kafka.utils + +import kafka.utils.TestUtils +import org.junit.jupiter.api.TestInfo + +import java.io.IOException +import java.net.ServerSocket +import java.util.Properties + +/** + * DO NOT USE THESE UTILITIES UNLESS YOU ABSOLUTELY MUST + * + * These are utilities for selecting fixed (preselected), ephemeral ports to use with tests. This is not a reliable way + * of testing on most machines because you can easily run into port conflicts. If you're using this class, you're almost + * certainly doing something wrong unless you can prove that your test **cannot function** properly without it. + */ +object FixedPortTestUtils { + def choosePorts(count: Int): Seq[Int] = { + try { + val sockets = (0 until count).map(_ => new ServerSocket(0)) + val ports = sockets.map(_.getLocalPort()) + sockets.foreach(_.close()) + ports + } catch { + case e: IOException => throw new RuntimeException(e) + } + } + + def createBrokerConfigs(numConfigs: Int, + zkConnect: String, + testInfo: TestInfo, + enableControlledShutdown: Boolean = true, + enableDeleteTopic: Boolean = false): Seq[Properties] = { + val ports = FixedPortTestUtils.choosePorts(numConfigs) + (0 until numConfigs).map { node => +// TestUtils.createBrokerConfig(node, zkConnect, testInfo, port = ports(node)) + TestUtils.createBrokerConfig(node, zkConnect, testInfo, port = ports(node)) + } + } + +} diff --git a/app/src/test/scala/utils/kafka/utils/TestUtils.scala b/app/src/test/scala/utils/kafka/utils/TestUtils.scala index e897061..8cd9990 100644 --- a/app/src/test/scala/utils/kafka/utils/TestUtils.scala +++ b/app/src/test/scala/utils/kafka/utils/TestUtils.scala @@ -19,7 +19,7 @@ import java.net.{InetAddress, ServerSocket} import java.nio._ import java.nio.channels._ import java.nio.charset.{Charset, StandardCharsets} -import java.nio.file.{Files, StandardOpenOption} +import java.nio.file.{Files, Path, Paths, StandardOpenOption} import java.security.cert.X509Certificate import java.time.Duration import java.util @@ -2775,8 +2775,8 @@ object TestUtils extends Logging { // generate val props = (startingIdNumber to endingIdNumber).zipWithIndex.map { case (nodeId, idx) => val prop = new Properties - val port = basePort + idx * 2 - val gossipPort = basePort + idx * 2 + 1 + val port = basePort + nodeId * 2 + val gossipPort = basePort + nodeId * 2 + 1 // broker config prop.put("broker.id", 
nodeId.toString) prop.put("port", port.toString) @@ -2820,4 +2820,11 @@ object TestUtils extends Logging { testName.replaceAll("""\(|\)|\s""", "_").replaceAll("_*$", "") } + def generateLogDir(testInfo: TestInfo): Path = { + val testFilename = formatTestNameAsFile(testInfo.getDisplayName) + val proj = sys.props.get("user.dir").getOrElse(".") + val containerLogsDir = s"$proj/build/reports/logs/$testFilename-${System.currentTimeMillis()}" + Paths.get(containerLogsDir) + } + }