Skip to content

Commit b62e388

Browse files
authored
Merge pull request #1426 from guardian/ld/java-sdk-upgrade-kinesis
Java sdk upgrade kinesis: Update path (7)
2 parents a9513f6 + 1fe6f8a commit b62e388

File tree

4 files changed

+40
-28
lines changed

4 files changed

+40
-28
lines changed

app/data/DataStores.scala

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,15 +31,15 @@ class DataStores(aws: AWSConfig with SNSAccess, capi: CapiAccess) {
3131
new NotifyingAtomPublisher(
3232
isLive = true,
3333
topicArn = capiContentEventsTopicName,
34-
underlying = new LiveKinesisAtomPublisher(
34+
underlying = new LiveKinesisAtomPublisherV2(
3535
aws.liveKinesisStreamName,
3636
aws.crossAccountKinesisClient
3737
),
3838
sns = aws.snsClient
3939
)
4040

4141
case None =>
42-
new LiveKinesisAtomPublisher(
42+
new LiveKinesisAtomPublisherV2(
4343
aws.liveKinesisStreamName,
4444
aws.crossAccountKinesisClient
4545
)
@@ -50,28 +50,28 @@ class DataStores(aws: AWSConfig with SNSAccess, capi: CapiAccess) {
5050
new NotifyingAtomPublisher(
5151
isLive = true,
5252
topicArn = capiContentEventsTopicName,
53-
underlying = new PreviewKinesisAtomPublisher(
53+
underlying = new PreviewKinesisAtomPublisherV2(
5454
aws.previewKinesisStreamName,
5555
aws.crossAccountKinesisClient
5656
),
5757
sns = aws.snsClient
5858
)
5959

6060
case None =>
61-
new PreviewKinesisAtomPublisher(
61+
new PreviewKinesisAtomPublisherV2(
6262
aws.previewKinesisStreamName,
6363
aws.crossAccountKinesisClient
6464
)
6565
}
6666

6767
val reindexPreview: PreviewAtomReindexer =
68-
new PreviewKinesisAtomReindexer(
68+
new PreviewKinesisAtomReindexerV2(
6969
aws.previewKinesisReindexStreamName,
7070
aws.crossAccountKinesisClient
7171
)
7272

73-
val reindexPublished: PublishedKinesisAtomReindexer =
74-
new PublishedKinesisAtomReindexer(
73+
val reindexPublished: PublishedKinesisAtomReindexerV2 =
74+
new PublishedKinesisAtomReindexerV2(
7575
aws.publishedKinesisReindexStreamName,
7676
aws.crossAccountKinesisClient
7777
)

build.sbt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ val scroogeVersion = "4.12.0"
88
val awsVersion = "1.11.1034"
99
val awsV2Version = "2.32.26"
1010
val pandaVersion = "10.0.0"
11-
val atomMakerVersion = "6.0.0"
11+
val atomMakerVersion = "8.0.0"
1212
val typesafeConfigVersion =
1313
"1.4.0" // to match what we get from Play transitively
1414
val scanamoVersion = "1.0.0-M28"
@@ -104,7 +104,7 @@ lazy val common = (project in file("common"))
104104
"com.typesafe" % "config" % typesafeConfigVersion,
105105
"com.amazonaws" % "aws-lambda-java-core" % awsLambdaCoreVersion,
106106
"software.amazon.awssdk" % "dynamodb" % awsV2Version,
107-
"com.amazonaws" % "aws-java-sdk-kinesis" % awsVersion,
107+
"software.amazon.awssdk" % "kinesis" % awsV2Version,
108108
"com.gu" %% "play-json-extensions" % playJsonExtensionsVersion,
109109
"ch.qos.logback" % "logback-classic" % logbackClassicVersion,
110110
"com.amazonaws" % "aws-java-sdk-sts" % awsVersion,

common/src/main/scala/com/gu/media/aws/KinesisAccess.scala

Lines changed: 26 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
package com.gu.media.aws
22

3-
import java.nio.ByteBuffer
4-
import java.nio.charset.StandardCharsets
5-
6-
import com.amazonaws.AmazonClientException
7-
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder
3+
import software.amazon.awssdk.core.exception.SdkClientException
4+
import software.amazon.awssdk.services.kinesis.model.{
5+
PutRecordRequest,
6+
DescribeStreamRequest
7+
}
8+
import software.amazon.awssdk.services.kinesis.KinesisClient
89
import com.gu.media.Settings
910
import com.gu.media.logging.Logging
1011
import play.api.libs.json.{Json, Writes}
12+
import software.amazon.awssdk.core.SdkBytes
1113

1214
trait KinesisAccess { this: Settings with AwsAccess with Logging =>
1315
val liveKinesisStreamName: String = getMandatoryString(
@@ -30,16 +32,16 @@ trait KinesisAccess { this: Settings with AwsAccess with Logging =>
3032

3133
val syncWithPluto: Boolean = getBoolean("pluto.sync").getOrElse(false)
3234

33-
lazy val crossAccountKinesisClient = AmazonKinesisClientBuilder
34-
.standard()
35-
.withCredentials(credentials.crossAccount.awsV1Creds)
36-
.withRegion(region.getName)
35+
lazy val crossAccountKinesisClient = KinesisClient
36+
.builder()
37+
.credentialsProvider(credentials.crossAccount.awsV2Creds)
38+
.region(awsV2Region)
3739
.build()
3840

39-
lazy val kinesisClient = AmazonKinesisClientBuilder
40-
.standard()
41-
.withCredentials(credentials.instance.awsV1Creds)
42-
.withRegion(region.getName)
41+
lazy val kinesisClient = KinesisClient
42+
.builder()
43+
.credentialsProvider(credentials.instance.awsV2Creds)
44+
.region(awsV2Region)
4345
.build()
4446

4547
def sendOnKinesis[T: Writes](
@@ -50,16 +52,23 @@ trait KinesisAccess { this: Settings with AwsAccess with Logging =>
5052
val json = Json.stringify(Json.toJson(value))
5153
log.info(s"Sending JSON on Kinesis [$streamName]: $json")
5254

53-
val bytes = ByteBuffer.wrap(json.getBytes(StandardCharsets.UTF_8))
55+
val putRecordRequest = PutRecordRequest
56+
.builder()
57+
.streamName(streamName)
58+
.data(SdkBytes.fromUtf8String(json))
59+
.partitionKey(partitionKey)
60+
.build()
5461

55-
kinesisClient.putRecord(streamName, bytes, partitionKey)
62+
kinesisClient.putRecord(putRecordRequest)
5663
}
5764

5865
def testKinesisAccess(streamName: String): Boolean = try {
59-
crossAccountKinesisClient.describeStream(streamName)
66+
val describeStream =
67+
DescribeStreamRequest.builder().streamName(streamName).build()
68+
crossAccountKinesisClient.describeStream(describeStream)
6069
true
6170
} catch {
62-
case e: AmazonClientException =>
71+
case e: SdkClientException =>
6372
false
6473
}
6574
}

uploader/src/main/scala/com/gu/media/upload/AddAssetToAtom.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,10 @@ package com.gu.media.upload
22

33
import java.util.Date
44
import com.gu.atom.data.PreviewDynamoDataStore
5-
import com.gu.atom.publish.PreviewKinesisAtomPublisher
5+
import com.gu.atom.publish.{
6+
PreviewKinesisAtomPublisher,
7+
PreviewKinesisAtomPublisherV2
8+
}
69
import com.gu.contentatom.thrift.{Atom, ContentAtomEvent, EventType}
710
import com.gu.media.aws.{DynamoAccess, KinesisAccess, UploadAccess}
811
import com.gu.media.lambda.LambdaWithParams
@@ -30,7 +33,7 @@ class AddAssetToAtom
3033
)
3134

3235
private val store = new PreviewDynamoDataStore(dynamoDB, dynamoTableName)
33-
private val publisher = new PreviewKinesisAtomPublisher(
36+
private val publisher = new PreviewKinesisAtomPublisherV2(
3437
previewKinesisStreamName,
3538
crossAccountKinesisClient
3639
)

0 commit comments

Comments
 (0)