Skip to content

Commit 653eb3e

Browse files
authored
Merge pull request #529 from fd4s/nested-resource-refactor
Refactor TransactionalKafkaProducer
2 parents b1ddc60 + eb6e94e commit 653eb3e

1 file changed

Lines changed: 49 additions & 49 deletions

File tree

modules/core/src/main/scala/fs2/kafka/TransactionalKafkaProducer.scala

Lines changed: 49 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -55,61 +55,61 @@ object TransactionalKafkaProducer {
5555
implicit F: ConcurrentEffect[F],
5656
context: ContextShift[F]
5757
): Resource[F, TransactionalKafkaProducer[F, K, V]] =
58-
Resource.liftF(settings.producerSettings.keySerializer).flatMap { keySerializer =>
59-
Resource.liftF(settings.producerSettings.valueSerializer).flatMap { valueSerializer =>
60-
WithProducer(settings).map { withProducer =>
61-
new TransactionalKafkaProducer[F, K, V] {
62-
override def produce[P](
63-
records: TransactionalProducerRecords[F, K, V, P]
64-
): F[ProducerResult[K, V, P]] =
65-
produceTransaction(records)
66-
.map(ProducerResult(_, records.passthrough))
58+
(
59+
Resource.liftF(settings.producerSettings.keySerializer),
60+
Resource.liftF(settings.producerSettings.valueSerializer),
61+
WithProducer(settings)
62+
).mapN { (keySerializer, valueSerializer, withProducer) =>
63+
new TransactionalKafkaProducer[F, K, V] {
64+
override def produce[P](
65+
records: TransactionalProducerRecords[F, K, V, P]
66+
): F[ProducerResult[K, V, P]] =
67+
produceTransaction(records)
68+
.map(ProducerResult(_, records.passthrough))
6769

68-
private[this] def produceTransaction[P](
69-
records: TransactionalProducerRecords[F, K, V, P]
70-
): F[Chunk[(ProducerRecord[K, V], RecordMetadata)]] =
71-
if (records.records.isEmpty) F.pure(Chunk.empty)
72-
else {
73-
val batch =
74-
CommittableOffsetBatch.fromFoldableMap(records.records)(_.offset)
70+
private[this] def produceTransaction[P](
71+
records: TransactionalProducerRecords[F, K, V, P]
72+
): F[Chunk[(ProducerRecord[K, V], RecordMetadata)]] =
73+
if (records.records.isEmpty) F.pure(Chunk.empty)
74+
else {
75+
val batch =
76+
CommittableOffsetBatch.fromFoldableMap(records.records)(_.offset)
7577

76-
val consumerGroupId =
77-
if (batch.consumerGroupIdsMissing || batch.consumerGroupIds.size != 1)
78-
F.raiseError(ConsumerGroupException(batch.consumerGroupIds))
79-
else F.pure(batch.consumerGroupIds.head)
78+
val consumerGroupId =
79+
if (batch.consumerGroupIdsMissing || batch.consumerGroupIds.size != 1)
80+
F.raiseError(ConsumerGroupException(batch.consumerGroupIds))
81+
else F.pure(batch.consumerGroupIds.head)
8082

81-
consumerGroupId.flatMap { groupId =>
82-
withProducer { (producer, blocking) =>
83-
blocking(producer.beginTransaction())
84-
.bracketCase { _ =>
85-
records.records
86-
.flatMap(_.records)
87-
.traverse(
88-
KafkaProducer.produceRecord(keySerializer, valueSerializer, producer)
83+
consumerGroupId.flatMap { groupId =>
84+
withProducer { (producer, blocking) =>
85+
blocking(producer.beginTransaction())
86+
.bracketCase { _ =>
87+
records.records
88+
.flatMap(_.records)
89+
.traverse(
90+
KafkaProducer.produceRecord(keySerializer, valueSerializer, producer)
91+
)
92+
.map(_.sequence)
93+
.flatTap { _ =>
94+
blocking {
95+
producer.sendOffsetsToTransaction(
96+
batch.offsets.asJava,
97+
groupId
8998
)
90-
.map(_.sequence)
91-
.flatTap { _ =>
92-
blocking {
93-
producer.sendOffsetsToTransaction(
94-
batch.offsets.asJava,
95-
groupId
96-
)
97-
}
98-
}
99-
} {
100-
case (_, ExitCase.Completed) =>
101-
blocking(producer.commitTransaction())
102-
case (_, ExitCase.Canceled | ExitCase.Error(_)) =>
103-
blocking(producer.abortTransaction())
99+
}
104100
}
105-
}.flatten
106-
}
107-
}
108-
109-
override def toString: String =
110-
"TransactionalKafkaProducer$" + System.identityHashCode(this)
101+
} {
102+
case (_, ExitCase.Completed) =>
103+
blocking(producer.commitTransaction())
104+
case (_, ExitCase.Canceled | ExitCase.Error(_)) =>
105+
blocking(producer.abortTransaction())
106+
}
107+
}.flatten
108+
}
111109
}
112-
}
110+
111+
override def toString: String =
112+
"TransactionalKafkaProducer$" + System.identityHashCode(this)
113113
}
114114
}
115115

0 commit comments

Comments
 (0)