
Commit f4fbb29

istreeter authored and AlexBenny committed
Kafka sink: use a dedicated thread for potentially blocking send
In #431 we improved performance of the Kafka sink by calling `producer.send()` on a compute thread rather than on a blocking thread. This is a great improvement as long as `producer.send()` never blocks. But `producer.send()` does in fact block whenever the producer needs to re-fetch topic metadata, which happens every 5 minutes by default and is configured by the Kafka setting `metadata.max.age.ms`. If `producer.send()` blocks on a compute thread, it can cause thread starvation and hurt the collector's responsiveness to requests.

This PR changes the sink to execute `send()` on a dedicated thread. From the collector's point of view, it is fine for `send()` to block on the dedicated thread. It is also better than running it inside `Sync[F].blocking` (which is what we did before), because that tended to create a huge number of threads under some circumstances. In theory this shouldn't hurt performance much, even though it is a single thread: most of the time `send()` does not block, and when it does block (i.e. once per 5 minutes) it is acceptable for the other events to queue up as a backlog of Callables on that thread.

Kafka sink: use a dedicated thread for potentially blocking send: Amendment 1
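The heart of the change is bridging Kafka's callback-based `send()` into cats-effect while running the potentially blocking call on a dedicated single-thread executor. The following standalone sketch illustrates that pattern with plain IO; `fakeSend` and all other names here are hypothetical stand-ins, not the collector's actual code:

import cats.effect.{Async, IO, IOApp, Resource}
import scala.concurrent.ExecutionContext
import java.util.concurrent.Executors

object DedicatedThreadSketch extends IOApp.Simple {

  // A single-thread executor wrapped in a Resource so it is always shut down.
  val dedicatedEc: Resource[IO, ExecutionContext] =
    Resource
      .make(IO.delay(Executors.newSingleThreadExecutor))(e => IO.blocking(e.shutdown()))
      .map(ExecutionContext.fromExecutorService(_))

  // Stands in for KafkaProducer.send: it may block briefly, then fires a callback.
  def fakeSend(msg: String)(onComplete: Either[Throwable, Unit] => Unit): Unit = {
    Thread.sleep(50) // simulate an occasional blocking metadata refresh
    onComplete(Right(()))
  }

  // Bridge the callback into IO, starting the blocking call on the dedicated EC.
  // If fakeSend blocks, only the dedicated thread waits; compute threads stay
  // free, and later sends simply queue up behind it on the executor.
  def send(ec: ExecutionContext, msg: String): IO[Unit] =
    IO.async[Unit] { cb =>
      Async[IO].startOn(IO.delay(fakeSend(msg)(cb)), ec).map(fiber => Some(fiber.cancel))
    }

  def run: IO[Unit] =
    dedicatedEc.use(ec => send(ec, "hello"))
}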
1 parent fd4e187 · commit f4fbb29

File tree

1 file changed (+27 −7 lines changed):

  • kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KafkaSink.scala

kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KafkaSink.scala (+27 −7)
@@ -12,6 +12,7 @@ package com.snowplowanalytics.snowplow.collectors.scalastream
 package sinks
 
 import cats.implicits._
+import cats.effect.implicits._
 import cats.effect._
 import org.typelevel.log4cats.Logger
 import org.typelevel.log4cats.slf4j.Slf4jLogger
@@ -20,6 +21,8 @@ import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecor
 import com.snowplowanalytics.snowplow.collector.core.{Config, Sink}
 
 import scala.jdk.CollectionConverters._
+import scala.concurrent.ExecutionContext
+import java.util.concurrent.Executors
 
 /**
  * Kafka Sink for the Scala Stream Collector
@@ -28,7 +31,8 @@ class KafkaSink[F[_]: Async: Logger](
   val maxBytes: Int,
   isHealthyState: Ref[F, Boolean],
   kafkaProducer: KafkaProducer[String, Array[Byte]],
-  topicName: String
+  topicName: String,
+  ec: ExecutionContext
 ) extends Sink[F] {
 
   override def isHealthy: F[Boolean] = isHealthyState.get
@@ -40,14 +44,20 @@ class KafkaSink[F[_]: Async: Logger](
    * @param key The partition key to use
    */
   override def storeRawEvents(events: List[Array[Byte]], key: String): F[Unit] =
+    storeRawEventsAndWait(events, key).start.void
+
+  private def storeRawEventsAndWait(events: List[Array[Byte]], key: String): F[Unit] =
     Logger[F].debug(s"Writing ${events.size} Thrift records to Kafka topic $topicName at key $key") *>
-      events.traverse_ { e =>
+      events.parTraverse_ { e =>
         def go: F[Unit] =
           Async[F]
-            .async_[Unit] { cb =>
-              val record = new ProducerRecord(topicName, key, e)
-              kafkaProducer.send(record, callback(cb))
-              ()
+            .async[Unit] { cb =>
+              val blockingSend = Sync[F].delay {
+                val record = new ProducerRecord(topicName, key, e)
+                kafkaProducer.send(record, callback(cb))
+                Option.empty[F[Unit]]
+              }
+              Async[F].startOn(blockingSend, ec).map(f => Some(f.cancel))
             }
             .handleErrorWith { e =>
               handlePublishError(e) >> go
@@ -80,11 +90,13 @@ object KafkaSink {
     for {
       isHealthyState <- Resource.eval(Ref.of[F, Boolean](false))
       kafkaProducer  <- createProducer(sinkConfig.config, sinkConfig.buffer, authCallbackClass)
+      ec             <- createExecutionContext
     } yield new KafkaSink(
       sinkConfig.config.maxBytes,
       isHealthyState,
       kafkaProducer,
-      sinkConfig.name
+      sinkConfig.name,
+      ec
     )
 
   /**
@@ -113,4 +125,12 @@ object KafkaSink {
     }
     Resource.make(make)(p => Sync[F].blocking(p.close))
   }
+
+  def createExecutionContext[F[_]: Sync]: Resource[F, ExecutionContext] = {
+    val make = Sync[F].delay {
+      Executors.newSingleThreadExecutor
+    }
+    Resource.make(make)(e => Sync[F].blocking(e.shutdown)).map(ExecutionContext.fromExecutorService(_))
+  }
+
 }
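For reference, the `metadata.max.age.ms` setting mentioned in the commit message is a standard Kafka producer config: it controls how often the producer refreshes topic metadata, and that refresh is when `send()` can block. A sketch of setting it explicitly when constructing a producer follows; the broker address and the choice to spell out the default are illustrative, not taken from the collector's configuration:

import java.util.Properties
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}

object ProducerConfigSketch {
  val props = new Properties
  props.put("bootstrap.servers", "localhost:9092") // illustrative broker address
  props.put("metadata.max.age.ms", "300000")       // 5 minutes: the Kafka default refresh interval
  props.put("key.serializer", classOf[StringSerializer].getName)
  props.put("value.serializer", classOf[ByteArraySerializer].getName)

  // Same key/value types as the sink's producer.
  val producer = new KafkaProducer[String, Array[Byte]](props)
}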
