Skip to content
This repository was archived by the owner on Mar 22, 2022. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Gatling-Kafka [![Build Status](https://travis-ci.org/mnogu/gatling-kafka.svg?branch=master)](https://travis-ci.org/mnogu/gatling-kafka)

An unofficial [Gatling](http://gatling.io/) 2.2 stress test plugin
for [Apache Kafka](http://kafka.apache.org/) 0.10 protocol.
for [Apache Kafka](http://kafka.apache.org/) 1.0.1 protocol.

This plugin supports the Kafka producer API only
and doesn't support the Kafka consumer API.
Expand Down
14 changes: 10 additions & 4 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,23 @@ name := "gatling-kafka"

organization := "com.github.mnogu"

version := "0.1.2-SNAPSHOT"
version := "0.1.3-SNAPSHOT"

scalaVersion := "2.11.8"
scalaVersion := "2.12.6"

lazy val gatingVersion = "2.3.1"
lazy val kafkaVersion = "1.0.1"
lazy val avro4sVersion = "1.9.0"

libraryDependencies ++= Seq(
"io.gatling" % "gatling-core" % "2.2.3" % "provided",
("org.apache.kafka" % "kafka-clients" % "0.10.1.1")
"com.sksamuel.avro4s" %% "avro4s-core" % avro4sVersion,
"io.gatling" % "gatling-core" % gatingVersion % "provided",
("org.apache.kafka" % "kafka-clients" % kafkaVersion)
// Gatling contains slf4j-api
.exclude("org.slf4j", "slf4j-api")
)

// Gatling contains scala-library
assemblyOption in assembly := (assemblyOption in assembly).value
.copy(includeScala = false)

3 changes: 2 additions & 1 deletion project/assembly.sbt
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.12.0")
//addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.12.0")
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.7")
1 change: 1 addition & 0 deletions project/build.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
sbt.version=1.1.2
6 changes: 3 additions & 3 deletions project/plugins.sbt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
logLevel := Level.Warn
//logLevel := Level.Warn

addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "1.0")
//addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "1.0")

addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.0.0")
//addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.0.0")
2 changes: 2 additions & 0 deletions sonatype.sbt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
/*
sonatypeProfileName := "com.github.mnogu"

pomExtra in Global := {
Expand All @@ -21,3 +22,4 @@ pomExtra in Global := {
</developer>
</developers>
}
*/
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package com.github.mnogu.gatling.kafka.action

import com.github.mnogu.gatling.kafka.protocol.{KafkaComponents, KafkaProtocol}
import com.github.mnogu.gatling.kafka.request.builder.{Avro4sAttributes, KafkaAttributes}
import io.gatling.core.action.Action
import io.gatling.core.action.builder.ActionBuilder
import io.gatling.core.structure.ScenarioContext
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.clients.producer.KafkaProducer

import scala.collection.JavaConverters._


class KafkaAvro4sActionBuilder[T](avro4sAttributes: Avro4sAttributes[T]) extends ActionBuilder {

override def build( ctx: ScenarioContext, next: Action ): Action = {
import ctx.{coreComponents, protocolComponentsRegistry, system, throttled}

val kafkaComponents: KafkaComponents = protocolComponentsRegistry.components(KafkaProtocol.KafkaProtocolKey)

val producer = new KafkaProducer[Nothing,GenericRecord]( kafkaComponents.kafkaProtocol.properties.asJava )

system.registerOnTermination(producer.close())

new KafkaAvro4sRequestAction(
producer,
avro4sAttributes,
coreComponents,
kafkaComponents.kafkaProtocol,
throttled,
next
)

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package com.github.mnogu.gatling.kafka.action

import java.util.Date

import com.github.mnogu.gatling.kafka.protocol.KafkaProtocol
import com.github.mnogu.gatling.kafka.request.builder.{Avro4sAttributes, KafkaAttributes}
import io.gatling.commons.stats.{KO, OK}
import io.gatling.commons.util.ClockSingleton._
import io.gatling.commons.validation.Validation
import io.gatling.core.CoreComponents
import io.gatling.core.action.{Action, ExitableAction}
import io.gatling.core.session._
import io.gatling.core.stats.message.ResponseTimings
import io.gatling.core.util.NameGen
import org.apache.kafka.clients.producer._
import com.sksamuel.avro4s._
import com.sun.xml.internal.messaging.saaj.util.ByteInputStream
import org.apache.avro.Schema
import org.apache.avro.Schema.Field
import org.apache.avro.generic.GenericRecord

import scala.util.{Failure, Success}

class KafkaAvro4sRequestAction[T](val producer: KafkaProducer[Nothing,GenericRecord],
val avro4sAttributes: Avro4sAttributes[T],
val coreComponents: CoreComponents,
val kafkaProtocol: KafkaProtocol,
val throttled: Boolean,
val next: Action )
extends ExitableAction with NameGen {

// DateToValue, DateFromValue and DateToSchema are required to teach avro4s how
// to serialize/deserialize a java.util.Date
implicit object DateToValue extends ToValue[Date] {
override def apply(value: Date): Long = {
value.getTime
}
}

implicit object DateFromValue extends FromValue[Date] {
override def apply(value: Any, field: Field): Date = {
new Date(value.asInstanceOf[Long])
}
}

implicit object DateToSchema extends ToSchema[Date] {
override val schema: Schema = Schema.create(Schema.Type.LONG)
}

implicit val schema = avro4sAttributes.schema
implicit val fromRecord = avro4sAttributes.fromRecord

val statsEngine = coreComponents.statsEngine
override val name = genName("kafkaRequest")

override def execute(session: Session): Unit = recover(session) {

avro4sAttributes requestName session flatMap { requestName =>

val outcome =
sendRequest(
requestName,
producer,
avro4sAttributes,
throttled,
session)

outcome.onFailure(
errorMessage =>
statsEngine.reportUnbuildableRequest(session, requestName, errorMessage)
)

outcome

}

}

private def sendRequest( requestName: String,
producer: Producer[Nothing,GenericRecord],
avro4sAttributes: Avro4sAttributes[T],
throttled: Boolean,
session: Session ): Validation[Unit] = {

avro4sAttributes payload session map { payload =>

val json = payload.asInstanceOf[String]

logger.debug(s"sendRequest received json: ${json}")

// payload should be a json string
val in = new ByteInputStream(json.getBytes("UTF-8"), json.size)
val input = AvroInputStream.json[T](in)
input.singleEntity match {
case Success(cassclass) => // should contain the case class of [T]
val record = new ProducerRecord(kafkaProtocol.topic, avro4sAttributes.avroFormat.to(cassclass))

val requestStartDate = nowMillis

val x = producer.send(record, new Callback() {

override def onCompletion(m: RecordMetadata, e: Exception): Unit = {

val requestEndDate = nowMillis
statsEngine.logResponse(
session,
requestName,
ResponseTimings(startTimestamp = requestStartDate, endTimestamp = requestEndDate),
if (e == null) OK else KO,
None,
if (e == null) None else Some(e.getMessage)
)

if (throttled) {
coreComponents.throttler.throttle(session.scenario, () => next ! session)
} else {
next ! session
}

}
})

case Failure(ex) =>
logger.error("Failure while converting JSON to case class:" + ex.getCause + ":" + ex.getMessage)
throw ex
}
}

}

}
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
package com.github.mnogu.gatling.kafka.request.builder

import com.github.mnogu.gatling.kafka.action.KafkaRequestActionBuilder
import com.github.mnogu.gatling.kafka.action.{KafkaAvro4sActionBuilder, KafkaRequestActionBuilder}
import com.sksamuel.avro4s.{FromRecord, RecordFormat, SchemaFor}
import io.gatling.core.session._

case class KafkaAttributes[K,V]( requestName: Expression[String],
key: Option[Expression[K]],
payload: Expression[V] )

case class Avro4sAttributes[T]( requestName: Expression[String],
schema: SchemaFor[T],
avroFormat: RecordFormat[T],
fromRecord: FromRecord[T],
payload: Expression[String]
)

case class KafkaRequestBuilder(requestName: Expression[String]) {

def send[V](payload: Expression[V]): KafkaRequestActionBuilder[_,V] = send(payload, None)
Expand All @@ -16,4 +24,6 @@ case class KafkaRequestBuilder(requestName: Expression[String]) {
private def send[K,V](payload: Expression[V], key: Option[Expression[K]]) =
new KafkaRequestActionBuilder(KafkaAttributes(requestName, key, payload))

def sendAvro[T](schema: SchemaFor[T], format: RecordFormat[T], fromRecord: FromRecord[T], payload: Expression[String]): KafkaAvro4sActionBuilder[T] = new KafkaAvro4sActionBuilder(Avro4sAttributes(requestName, schema, format, fromRecord, payload))

}
11 changes: 11 additions & 0 deletions src/test/resources/application.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
kafka {

brokers = "localhost:9092"

group = "test-streaming"

topics = "composer"

schema-registry-url = "http://localhost:8081"

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package com.github.mnogu.gatling.kafka.test

import com.github.mnogu.gatling.kafka.Predef._
import com.sksamuel.avro4s.{FromRecord, RecordFormat, SchemaFor, ToRecord}
import com.typesafe.config.ConfigFactory
import io.gatling.core.Predef._
import org.apache.kafka.clients.producer.ProducerConfig

import scala.concurrent.duration._

class Avtro4Simulation extends Simulation {
def loadConfigMap(kafkaBrokers: String, kafkaSchemaRegistryUrl: String): Map[String, String] = {
Map(
ProducerConfig.ACKS_CONFIG -> "1",
// list of Kafka broker hostname and port pairs
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> kafkaBrokers,

ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG ->
"io.confluent.kafka.serializers.KafkaAvroSerializer",
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG ->
"io.confluent.kafka.serializers.KafkaAvroSerializer",
"schema.registry.url" -> kafkaSchemaRegistryUrl
)
}

val config = ConfigFactory.load("application")
val kafkaTopics = config.getString("kafka.topics").split(",").map(_.trim)
val kafkaBrokers = config.getString("kafka.brokers")
val kafkaSchemaRegistryUrl = config.getString("kafka.schema-registry-url")

case class Composer(name: Option[String], birthplace: Option[String], compositions: Seq[String])

/*
Json is used as a data source so that we can consume session attributes

Note: Avro4s requires that all JSON fields are provided, otherwise you'll get an unsupported list exception
*/

val json = "{\"name\":\"ennio morricone\",\"birthplace\":\"rome\",\"compositions\":[\"legend of 1900\",\"ecstasy of gold\"]}"

val recordFormat = RecordFormat[Composer]
val schema = SchemaFor[Composer]
val fromRecord = FromRecord[Composer]
val toRecord = ToRecord[Composer]

val kafkaConf = kafka
// Kafka topic name
.topic("test")
// Kafka producer configs
.properties(loadConfigMap(kafkaBrokers, kafkaSchemaRegistryUrl))

val scn = scenario("Kafka Test")
.exec(
kafka("request")
.sendAvro[Composer](schema, recordFormat, fromRecord, json)
)

setUp(
scn
.inject(constantUsersPerSec(10) during(90 seconds)))
.protocols(kafkaConf)
}