Consume bounce #43

Draft · wants to merge 4 commits into main
95 changes: 66 additions & 29 deletions app/src/main/scala/hstream/server/KafkaBroker.scala
@@ -3,7 +3,7 @@ package kafka.server
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.metadata.BrokerState
import org.apache.kafka.common.utils.Time
import java.nio.file.{Files, Paths}
import java.nio.file.{Files, Path, Paths, StandardOpenOption}
import kafka.utils.Logging
import kafka.network.SocketServer

@@ -32,8 +32,11 @@ class KafkaBroker(
s"kafka-tests-${scala.util.Properties.userName}-$rand"
}

var logDir: Path = _
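// Set by startup() and read back in shutdown() to locate the directory for dumped container logs.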

// TODO: TMP_FOR_HSTREAM
def startup() = {
def startup(logPath: Path) = {
logDir = logPath
if (sys.env.getOrElse("CONFIG_FILE", "").trim.isEmpty) {
// TODO
throw new NotImplementedError("KafkaBroker.startup")
@@ -65,7 +68,10 @@ val dockerCmd
val dockerCmd =
s"docker run $rmArg -d --network host --name $containerName -v $storeDir:/data/store $image $command"
info(s"=> Start hserver by: $dockerCmd")
dockerCmd.run()
val code = dockerCmd.!
if (code != 0) {
throw new RuntimeException(s"Failed to start broker, exit code: $code")
}
} else {
throw new NotImplementedError("startup: spec is invalid!")
}
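The change above is the core of this hunk: scala.sys.process's `run()` launches the command and returns a `Process` handle immediately, silently discarding the exit status, whereas `!` blocks until completion and returns the exit code so a failed `docker run` can be surfaced. A minimal sketch of the two behaviors (the command is illustrative):

import scala.sys.process._

// Fire-and-forget: returns a Process handle at once; a failure is never observed here.
val handle: Process = "docker ps".run()
handle.exitValue() // blocks only if we explicitly ask for the status

// Blocking: waits for the command to finish and yields its exit code.
val code: Int = "docker ps".!
if (code != 0) throw new RuntimeException(s"command failed with exit code $code")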
@@ -93,41 +99,72 @@
.getOrElse("container_logs", throw new IllegalArgumentException("container_logs is required"))
.asInstanceOf[Boolean]
) {
val testFilename = config.testingConfig
.getOrElse("test.filename", throw new IllegalArgumentException("test.filename is required"))
.asInstanceOf[String]
val proj = sys.props.get("user.dir").getOrElse(".")
val containerLogsDir = s"$proj/build/reports/logs/$testFilename-${System.currentTimeMillis()}"
Files.createDirectories(Paths.get(containerLogsDir))
s"bash -c 'docker logs $containerName > $containerLogsDir/$containerName.log 2>&1'".!
Files.createDirectories(logDir)
val fileName = Paths.get(s"$logDir/$containerName.log")
if (!Files.exists(fileName)) {
Files.createFile(fileName)
}
// val code = s"bash -c 'docker logs $containerName >> $fileName 2>&1'".!
val cmd = Seq("docker", "logs", containerName)

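// ProcessLogger fires these callbacks once per line of the process's stdout/stderr, so each line is appended to the file individually.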
val processLogger = ProcessLogger(
stdout => Files.writeString(fileName, stdout + "\n", StandardOpenOption.APPEND),
stderr => Files.writeString(fileName, stderr + "\n", StandardOpenOption.APPEND)
)
info(s"get logs from $containerName in ${System.currentTimeMillis()}")
val code = Process(cmd).!(processLogger)

if (code != 0) {
error(s"Failed to dump logs to $fileName, exit code: $code")
// Run `docker ps -a` and log the output
val psCmd = Seq("docker", "ps", "-a")
val psProcessLogger = ProcessLogger(
stdout => info(stdout),
stderr => error(stderr)
)
Process(psCmd).!(psProcessLogger)
} else {
Files.writeString(fileName, "==============================================\n", StandardOpenOption.APPEND)
info(s"Dump logs to $fileName")
}
}
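One note on the exists/createFile dance above: `StandardOpenOption.APPEND` alone throws `NoSuchFileException` when the target is missing, which is why the file is pre-created. Passing `CREATE` alongside `APPEND` would fold both steps into the write itself; a minimal sketch, with an illustrative path:

import java.nio.file.{Files, Paths, StandardOpenOption}

val logFile = Paths.get("/tmp/container.log") // illustrative path
// CREATE makes the file if absent; APPEND adds to the end either way.
Files.writeString(logFile, "log line\n", StandardOpenOption.CREATE, StandardOpenOption.APPEND)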
// Remove broker container
if (
config.testingConfig
.getOrElse("container_remove", throw new IllegalArgumentException("container_remove is required"))
.asInstanceOf[Boolean]
) {
s"docker rm -f $containerName".!
val cmd = s"docker rm -f $containerName"
Process(cmd).!
info(s"Remove container $containerName in ${System.currentTimeMillis()}")
info(s"------- Current containers -------")
val psCmd = Seq("docker", "ps", "-a")
val psProcessLogger = ProcessLogger(
stdout => info(stdout),
stderr => error(stderr)
)
Process(psCmd).!(psProcessLogger)
info(s"------- End of show current containers -------")
}

// Delete all logs
val storeAdminPort = config.testingConfig
.getOrElse("store_admin_port", throw new IllegalArgumentException("store_admin_port is required"))
.asInstanceOf[Int]
val deleteLogProc =
s"docker run --rm --network host hstreamdb/hstream bash -c 'echo y | hadmin-store --port $storeAdminPort logs remove --path /hstream -r'"
.run()
val code = deleteLogProc.exitValue()
// TODO: remove a non-exist log should be OK
// if (code != 0) {
// throw new RuntimeException(s"Failed to delete logs, exit code: $code")
// }

// Delete all metastore(zk) nodes
val metastorePort = config.testingConfig
.getOrElse("metastore_port", throw new IllegalArgumentException("metastore_port is required"))
.asInstanceOf[Int]
s"docker run --rm --network host zookeeper:3.7 zkCli.sh -server 127.0.0.1:$metastorePort deleteall /hstream".!
// // Delete all logs
// val storeAdminPort = config.testingConfig
// .getOrElse("store_admin_port", throw new IllegalArgumentException("store_admin_port is required"))
// .asInstanceOf[Int]
// val deleteLogProc =
// s"docker run --rm --network host hstreamdb/hstream bash -c 'echo y | hadmin-store --port $storeAdminPort logs remove --path /hstream -r'"
// .run()
// val code = deleteLogProc.exitValue()
// // TODO: remove a non-exist log should be OK
// // if (code != 0) {
// // throw new RuntimeException(s"Failed to delete logs, exit code: $code")
// // }
//
// // Delete all metastore(zk) nodes
// val metastorePort = config.testingConfig
// .getOrElse("metastore_port", throw new IllegalArgumentException("metastore_port is required"))
// .asInstanceOf[Int]
// s"docker run --rm --network host zookeeper:3.7 zkCli.sh -server 127.0.0.1:$metastorePort deleteall /hstream".!
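If this cleanup is ever restored, the TODO above ("remove a non-exist log should be OK") points at tolerating a non-zero exit from the removal instead of failing the teardown. A hedged sketch of that shape, reusing the same hadmin-store invocation; the Seq form is deliberate, since feeding the quoted bash -c string straight to scala.sys.process would be split on whitespace rather than parsed like a shell command:

import scala.sys.process._

val storeAdminPort = 6440 // illustrative; the real value comes from testingConfig
val deleteCmd = Seq(
  "docker", "run", "--rm", "--network", "host", "hstreamdb/hstream",
  "bash", "-c", s"echo y | hadmin-store --port $storeAdminPort logs remove --path /hstream -r"
)
val exit = deleteCmd.!
// Assumption: a non-zero exit may only mean the log path was already gone, so log and continue.
if (exit != 0) println(s"log removal exited with $exit; treating the logs as already removed")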
} else {
throw new NotImplementedError("shutdown: spec is invalid!")
}
ListOffsetsIntegrationTest.scala (new file, 99 additions)
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafka.admin

import kafka.integration.KafkaServerTestHarness
import kafka.server.KafkaConfig
import kafka.utils.TestUtils
import org.apache.kafka.clients.admin._
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.utils.Utils
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, Test, TestInfo}

import scala.collection.{Map, Seq}
import scala.jdk.CollectionConverters._

class ListOffsetsIntegrationTest extends KafkaServerTestHarness {

val topicName = "foo"
var adminClient: Admin = _
private var testInfo: TestInfo = _

@BeforeEach
override def setUp(testInfo: TestInfo): Unit = {
this.testInfo = testInfo
super.setUp(testInfo)
createTopic(topicName, 1, 1.toShort)
produceMessages()
adminClient = Admin.create(Map[String, Object](
AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers()
).asJava)
}

@AfterEach
override def tearDown(): Unit = {
Utils.closeQuietly(adminClient, "ListOffsetsAdminClient")
super.tearDown()
}

@Test
def testEarliestOffset(): Unit = {
val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest())
assertEquals(0, earliestOffset.offset())
}

@Test
def testLatestOffset(): Unit = {
val latestOffset = runFetchOffsets(adminClient, OffsetSpec.latest())
assertEquals(3, latestOffset.offset())
}

@Test
@Disabled("HSTREAM_TESTS: disabled until hstream supports maxTimestamp() offset spec. (ListOffsets v7+)")
def testMaxTimestampOffset(): Unit = {
val maxTimestampOffset = runFetchOffsets(adminClient, OffsetSpec.maxTimestamp())
assertEquals(1, maxTimestampOffset.offset())
}

private def runFetchOffsets(adminClient: Admin,
offsetSpec: OffsetSpec): ListOffsetsResult.ListOffsetsResultInfo = {
val tp = new TopicPartition(topicName, 0)
adminClient.listOffsets(Map(
tp -> offsetSpec
).asJava, new ListOffsetsOptions()).all().get().get(tp)
}

def produceMessages(): Unit = {
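// Timestamps are deliberately out of order: the record at offset 1 carries the largest timestamp (999L),
// which is what testMaxTimestampOffset expects; three records also make the latest offset 3.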
val records = Seq(
new ProducerRecord[Array[Byte], Array[Byte]](topicName, 0, 100L,
null, new Array[Byte](10000)),
new ProducerRecord[Array[Byte], Array[Byte]](topicName, 0, 999L,
null, new Array[Byte](10000)),
new ProducerRecord[Array[Byte], Array[Byte]](topicName, 0, 200L,
null, new Array[Byte](10000)),
)
TestUtils.produceMessages(servers, records, -1)
}

def generateConfigs: Seq[KafkaConfig] =
// KAFKA
// TestUtils.createBrokerConfigs(1, zkConnect).map(KafkaConfig.fromProps)
TestUtils.createBrokerConfigs(1, metaStoreConnect, testInfo).map(KafkaConfig.fromProps)
}