1212 */
1313package com .snowplowanalytics .snowplow .enrich .kinesis
1414
15- import java .nio .ByteBuffer
15+ import java .nio .charset . StandardCharsets
1616import java .util .UUID
1717
18- import scala .collection . JavaConverters ._
18+ import scala .jdk . CollectionConverters ._
1919
2020import cats .implicits ._
2121import cats .{Monoid , Parallel }
@@ -29,10 +29,10 @@ import org.typelevel.log4cats.slf4j.Slf4jLogger
2929import retry .syntax .all ._
3030import retry .RetryPolicy
3131
32- import com . amazonaws . client . builder . AwsClientBuilder . EndpointConfiguration
33-
34- import com . amazonaws . services .kinesis .model . _
35- import com . amazonaws . services .kinesis .{ AmazonKinesis , AmazonKinesisClientBuilder }
32+ import software . amazon . awssdk . core . SdkBytes
33+ import software . amazon . awssdk . regions . Region
34+ import software . amazon . awssdk . services .kinesis .KinesisClient
35+ import software . amazon . awssdk . services .kinesis .model . _
3636
3737import com .snowplowanalytics .snowplow .enrich .common .fs2 .{AttributedByteSink , AttributedData , ByteSink }
3838import com .snowplowanalytics .snowplow .enrich .common .fs2 .config .io .Output
@@ -58,8 +58,8 @@ object Sink {
5858 o.region.orElse(getRuntimeRegion) match {
5959 case Some (region) =>
6060 for {
61- producer <- Resource .eval[ F , AmazonKinesis ]( mkProducer(o, region) )
62- } yield records => writeToKinesis(o, producer, toKinesisRecords(records))
61+ producer <- mkProducer(o, region)
62+ } yield ( records : List [ AttributedData [ Array [ Byte ]]]) => writeToKinesis(o, producer, toKinesisRecords(records))
6363 case None =>
6464 Resource .eval(Sync [F ].raiseError(new RuntimeException (s " Region not found in the config and in the runtime " )))
6565 }
@@ -70,23 +70,23 @@ object Sink {
7070 private def mkProducer [F [_]: Sync ](
7171 config : Output .Kinesis ,
7272 region : String
73- ): F [ AmazonKinesis ] =
74- for {
75- builder <- Sync [F ].delay( AmazonKinesisClientBuilder .standard)
76- withEndpoint <- config.customEndpoint match {
77- case Some (endpoint) =>
78- Sync [ F ].delay(builder.withEndpointConfiguration( new EndpointConfiguration (endpoint.toString, region) ))
79- case None =>
80- Sync [ F ].delay (builder.withRegion(region) )
81- }
82- kinesis <- Sync [ F ].delay(withEndpoint. build() )
83- _ <- streamExists(kinesis, config.streamName )
84- } yield kinesis
73+ ): Resource [ F , KinesisClient ] =
74+ Resource
75+ .fromAutoCloseable( Sync [F ].delay {
76+ val builder = KinesisClient
77+ .builder()
78+ .region( Region .of( region))
79+ config.customEndpoint
80+ .map (builder.endpointOverride )
81+ .getOrElse(builder)
82+ . build()
83+ } )
84+ .evalTap( kinesis => streamExists(kinesis, config.streamName))
8585
86- private def streamExists [F [_]: Sync ](kinesis : AmazonKinesis , stream : String ): F [Unit ] =
86+ private def streamExists [F [_]: Sync ](kinesis : KinesisClient , stream : String ): F [Unit ] =
8787 for {
88- described <- Sync [F ].delay(kinesis.describeStream(stream))
89- status = described.getStreamDescription.getStreamStatus
88+ described <- Sync [F ].delay(kinesis.describeStream(DescribeStreamRequest .builder().streamName( stream).build() ))
89+ status = described.streamDescription.streamStatus.toString
9090 exists <- status match {
9191 case " ACTIVE" | " UPDATING" =>
9292 Sync [F ].unit
@@ -97,7 +97,7 @@ object Sink {
9797
9898 private def writeToKinesis [F [_]: Async : Parallel ](
9999 config : Output .Kinesis ,
100- kinesis : AmazonKinesis ,
100+ kinesis : KinesisClient ,
101101 records : List [PutRecordsRequestEntry ]
102102 ): F [Unit ] = {
103103 val policyForErrors = Retries .fullJitter[F ](config.backoffPolicy)
@@ -162,7 +162,7 @@ object Sink {
162162 }
163163
164164 private def getRecordSize (record : PutRecordsRequestEntry ) =
165- record.getData.array.size + record.getPartitionKey .getBytes.size
165+ record.data().asByteArray().length + record.partitionKey() .getBytes( StandardCharsets . UTF_8 ).length
166166
167167 /**
168168 * Try writing a batch, and returns a list of the failures to be retried:
@@ -173,7 +173,7 @@ object Sink {
173173 */
174174 private def tryWriteToKinesis [F [_]: Async ](
175175 config : Output .Kinesis ,
176- kinesis : AmazonKinesis ,
176+ kinesis : KinesisClient ,
177177 records : List [PutRecordsRequestEntry ],
178178 retryPolicy : RetryPolicy [F ]
179179 ): F [Vector [PutRecordsRequestEntry ]] =
@@ -203,12 +203,11 @@ object Sink {
203203
204204 private def toKinesisRecords (records : List [AttributedData [Array [Byte ]]]): List [PutRecordsRequestEntry ] =
205205 records.map { r =>
206- val binaryData = r.data
207- val data = ByteBuffer .wrap(binaryData)
208- val prre = new PutRecordsRequestEntry ()
209- prre.setPartitionKey(r.partitionKey)
210- prre.setData(data)
211- prre
206+ PutRecordsRequestEntry
207+ .builder()
208+ .partitionKey(r.partitionKey)
209+ .data(SdkBytes .fromByteArray(r.data))
210+ .build()
212211 }
213212
214213 /**
@@ -244,35 +243,34 @@ object Sink {
244243 )
245244 }
246245
247- def build (records : List [PutRecordsRequestEntry ], prr : PutRecordsResult ): TryBatchResult =
248- if (prr.getFailedRecordCount .toInt =!= 0 )
246+ def build (records : List [PutRecordsRequestEntry ], prr : PutRecordsResponse ): TryBatchResult =
247+ if (prr.failedRecordCount() .toInt =!= 0 )
249248 records
250- .zip(prr.getRecords .asScala)
249+ .zip(prr.records() .asScala)
251250 .foldMap { case (orig, recordResult) =>
252- Option (recordResult.getErrorCode ) match {
251+ Option (recordResult.errorCode() ) match {
253252 case None =>
254253 TryBatchResult (Vector .empty, true , false , None )
255254 case Some (" ProvisionedThroughputExceededException" ) =>
256255 TryBatchResult (Vector (orig), false , true , None )
257256 case Some (_) =>
258- TryBatchResult (Vector (orig), false , false , Option (recordResult.getErrorMessage ))
257+ TryBatchResult (Vector (orig), false , false , Option (recordResult.errorMessage() ))
259258 }
260259 }
261260 else
262261 TryBatchResult (Vector .empty, true , false , None )
263262 }
264263
265264 private def putRecords (
266- kinesis : AmazonKinesis ,
265+ kinesis : KinesisClient ,
267266 streamName : String ,
268267 records : List [PutRecordsRequestEntry ]
269- ): PutRecordsResult = {
270- val putRecordsRequest = {
271- val prr = new PutRecordsRequest ()
272- prr.setStreamName(streamName)
273- prr.setRecords(records.asJava)
274- prr
275- }
268+ ): PutRecordsResponse = {
269+ val putRecordsRequest = PutRecordsRequest
270+ .builder()
271+ .streamName(streamName)
272+ .records(records.asJava)
273+ .build()
276274 kinesis.putRecords(putRecordsRequest)
277275 }
278276
0 commit comments