@@ -55,61 +55,61 @@ object TransactionalKafkaProducer {
5555 implicit F : ConcurrentEffect [F ],
5656 context : ContextShift [F ]
5757 ): Resource [F , TransactionalKafkaProducer [F , K , V ]] =
58- Resource .liftF(settings.producerSettings.keySerializer).flatMap { keySerializer =>
59- Resource .liftF(settings.producerSettings.valueSerializer).flatMap { valueSerializer =>
60- WithProducer (settings).map { withProducer =>
61- new TransactionalKafkaProducer [F , K , V ] {
62- override def produce [P ](
63- records : TransactionalProducerRecords [F , P , K , V ]
64- ): F [ProducerResult [P , K , V ]] =
65- produceTransaction(records)
66- .map(ProducerResult (_, records.passthrough))
58+ (
59+ Resource .liftF(settings.producerSettings.keySerializer),
60+ Resource .liftF(settings.producerSettings.valueSerializer),
61+ WithProducer (settings)
62+ ).mapN { (keySerializer, valueSerializer, withProducer) =>
63+ new TransactionalKafkaProducer [F , K , V ] {
64+ override def produce [P ](
65+ records : TransactionalProducerRecords [F , P , K , V ]
66+ ): F [ProducerResult [P , K , V ]] =
67+ produceTransaction(records)
68+ .map(ProducerResult (_, records.passthrough))
6769
68- private [this ] def produceTransaction [P ](
69- records : TransactionalProducerRecords [F , P , K , V ]
70- ): F [Chunk [(ProducerRecord [K , V ], RecordMetadata )]] =
71- if (records.records.isEmpty) F .pure(Chunk .empty)
72- else {
73- val batch =
74- CommittableOffsetBatch .fromFoldableMap(records.records)(_.offset)
70+ private [this ] def produceTransaction [P ](
71+ records : TransactionalProducerRecords [F , P , K , V ]
72+ ): F [Chunk [(ProducerRecord [K , V ], RecordMetadata )]] =
73+ if (records.records.isEmpty) F .pure(Chunk .empty)
74+ else {
75+ val batch =
76+ CommittableOffsetBatch .fromFoldableMap(records.records)(_.offset)
7577
76- val consumerGroupId =
77- if (batch.consumerGroupIdsMissing || batch.consumerGroupIds.size != 1 )
78- F .raiseError(ConsumerGroupException (batch.consumerGroupIds))
79- else F .pure(batch.consumerGroupIds.head)
78+ val consumerGroupId =
79+ if (batch.consumerGroupIdsMissing || batch.consumerGroupIds.size != 1 )
80+ F .raiseError(ConsumerGroupException (batch.consumerGroupIds))
81+ else F .pure(batch.consumerGroupIds.head)
8082
81- consumerGroupId.flatMap { groupId =>
82- withProducer { (producer, blocking) =>
83- blocking(producer.beginTransaction())
84- .bracketCase { _ =>
85- records.records
86- .flatMap(_.records)
87- .traverse(
88- KafkaProducer .produceRecord(keySerializer, valueSerializer, producer)
83+ consumerGroupId.flatMap { groupId =>
84+ withProducer { (producer, blocking) =>
85+ blocking(producer.beginTransaction())
86+ .bracketCase { _ =>
87+ records.records
88+ .flatMap(_.records)
89+ .traverse(
90+ KafkaProducer .produceRecord(keySerializer, valueSerializer, producer)
91+ )
92+ .map(_.sequence)
93+ .flatTap { _ =>
94+ blocking {
95+ producer.sendOffsetsToTransaction(
96+ batch.offsets.asJava,
97+ groupId
8998 )
90- .map(_.sequence)
91- .flatTap { _ =>
92- blocking {
93- producer.sendOffsetsToTransaction(
94- batch.offsets.asJava,
95- groupId
96- )
97- }
98- }
99- } {
100- case (_, ExitCase .Completed ) =>
101- blocking(producer.commitTransaction())
102- case (_, ExitCase .Canceled | ExitCase .Error (_)) =>
103- blocking(producer.abortTransaction())
99+ }
104100 }
105- }.flatten
106- }
107- }
108-
109- override def toString : String =
110- " TransactionalKafkaProducer$" + System .identityHashCode(this )
101+ } {
102+ case (_, ExitCase .Completed ) =>
103+ blocking(producer.commitTransaction())
104+ case (_, ExitCase .Canceled | ExitCase .Error (_)) =>
105+ blocking(producer.abortTransaction())
106+ }
107+ }.flatten
108+ }
111109 }
112- }
110+
111+ override def toString : String =
112+ " TransactionalKafkaProducer$" + System .identityHashCode(this )
113113 }
114114 }
115115
0 commit comments