66 */
77package com .demandware .carbonj .service .engine ;
88
9- import com .amazonaws .services .dynamodbv2 .AmazonDynamoDB ;
10- import com .amazonaws .services .dynamodbv2 .document .DynamoDB ;
11- import com .amazonaws .services .dynamodbv2 .document .Table ;
12- import com .amazonaws .services .dynamodbv2 .model .AttributeDefinition ;
13- import com .amazonaws .services .dynamodbv2 .model .AttributeValue ;
14- import com .amazonaws .services .dynamodbv2 .model .CreateTableRequest ;
15- import com .amazonaws .services .dynamodbv2 .model .GetItemRequest ;
16- import com .amazonaws .services .dynamodbv2 .model .KeySchemaElement ;
17- import com .amazonaws .services .dynamodbv2 .model .KeyType ;
18- import com .amazonaws .services .dynamodbv2 .model .ProvisionedThroughput ;
19- import com .amazonaws .services .dynamodbv2 .model .ScalarAttributeType ;
209import org .slf4j .Logger ;
2110import org .slf4j .LoggerFactory ;
11+ import software .amazon .awssdk .services .dynamodb .DynamoDbAsyncClient ;
12+ import software .amazon .awssdk .services .dynamodb .model .AttributeDefinition ;
13+ import software .amazon .awssdk .services .dynamodb .model .AttributeValue ;
14+ import software .amazon .awssdk .services .dynamodb .model .CreateTableRequest ;
15+ import software .amazon .awssdk .services .dynamodb .model .CreateTableResponse ;
16+ import software .amazon .awssdk .services .dynamodb .model .GetItemRequest ;
17+ import software .amazon .awssdk .services .dynamodb .model .GetItemResponse ;
18+ import software .amazon .awssdk .services .dynamodb .model .KeySchemaElement ;
19+ import software .amazon .awssdk .services .dynamodb .model .KeyType ;
20+ import software .amazon .awssdk .services .dynamodb .model .ProvisionedThroughput ;
21+ import software .amazon .awssdk .services .dynamodb .model .ScalarAttributeType ;
22+ import software .amazon .awssdk .services .dynamodb .model .UpdateItemRequest ;
2223
2324import java .util .Date ;
2425import java .util .HashMap ;
2526import java .util .Map ;
27+ import java .util .concurrent .CompletableFuture ;
2628import java .util .concurrent .TimeUnit ;
2729
2830public class DynamoDbCheckPointMgr implements CheckPointMgr <Date > {
@@ -31,71 +33,74 @@ public class DynamoDbCheckPointMgr implements CheckPointMgr<Date> {
3133 private final String tableName ;
3234 private final int defaultOffsetMins ;
3335
34- private final AmazonDynamoDB client ;
35- private final DynamoDB dynamoDB ;
36+ private final DynamoDbAsyncClient client ;
37+ private final int checkPointDynamodbTimout ;
3638
37- public DynamoDbCheckPointMgr (AmazonDynamoDB client , String kinesisApplicationName , int defaultOffsetMins ,
38- int provisionedThroughput ) throws Exception {
39+ public DynamoDbCheckPointMgr (DynamoDbAsyncClient client , String kinesisApplicationName , int defaultOffsetMins ,
40+ int provisionedThroughput , int checkPointDynamodbTimout ) throws Exception {
3941 this .client = client ;
40- this .dynamoDB = new DynamoDB (client );
4142 this .defaultOffsetMins = defaultOffsetMins ;
4243 this .tableName = "checkpoints-" + kinesisApplicationName ;
43- if (!DynamoDbUtils .isTablePresent (dynamoDB , tableName )) {
44+ this .checkPointDynamodbTimout = checkPointDynamodbTimout ;
45+ if (!DynamoDbUtils .isTablePresent (client , tableName , checkPointDynamodbTimout )) {
4446 createTable (tableName , provisionedThroughput );
4547 }
4648 }
4749
4850 private void createTable (String tableName , int provisionedThroughput ) throws Exception {
49- CreateTableRequest request = new CreateTableRequest ()
50- .withAttributeDefinitions (
51- new AttributeDefinition ("checkPointType" , ScalarAttributeType .S ))
52- .withKeySchema (
53- new KeySchemaElement ("checkPointType" , KeyType .HASH ))
54- .withProvisionedThroughput (
55- new ProvisionedThroughput ((long )provisionedThroughput , (long )provisionedThroughput ))
56- .withTableName (tableName );
51+ CreateTableRequest request = CreateTableRequest .builder ()
52+ .tableName (tableName )
53+ .attributeDefinitions (AttributeDefinition .builder ().attributeName ("checkPointType" ).attributeType (ScalarAttributeType .S ).build ())
54+ .keySchema (KeySchemaElement .builder ().attributeName ("checkPointType" ).keyType (KeyType .HASH ).build ())
55+ .provisionedThroughput (ProvisionedThroughput .builder ()
56+ .readCapacityUnits ((long )provisionedThroughput )
57+ .writeCapacityUnits ((long )provisionedThroughput )
58+ .build ())
59+ .build ();
5760 log .info ("Issuing CreateTable request for " + tableName );
58- Table newlyCreatedTable = dynamoDB .createTable (request );
61+ CompletableFuture < CreateTableResponse > createTableResponse = this . client .createTable (request );
5962 log .info ("Waiting for " + tableName + " to be created...this may take a while..." );
60- newlyCreatedTable . waitForActive ( );
63+ createTableResponse . get ( checkPointDynamodbTimout , TimeUnit . SECONDS );
6164 }
6265
6366 @ Override
6467 public void checkPoint (Date checkPoint ) throws Exception {
65- Table table = dynamoDB .getTable (tableName );
6668
67- HashMap <String , String > expressionAttributeNames = new HashMap <>();
69+ Map <String , String > expressionAttributeNames = new HashMap <>();
6870 expressionAttributeNames .put ("#V" , "checkPointValue" );
6971
70- HashMap <String , Object > expressionAttributeValues = new HashMap <>();
71- expressionAttributeValues .put (":val1" , checkPoint .getTime ());
72+ Map <String , AttributeValue > expressionAttributeValues = new HashMap <>();
73+ expressionAttributeValues .put (":val1" , AttributeValue . builder (). n ( String . valueOf ( checkPoint .getTime ())). build ());
7274
73- table .updateItem (
74- "checkPointType" , // key attribute name
75- "timestamp" , // key attribute value
76- "set #V = :val1" , // UpdateExpression
77- expressionAttributeNames ,
78- expressionAttributeValues );
75+ client .updateItem (UpdateItemRequest . builder ()
76+ . tableName ( tableName )
77+ . key ( Map . of ( "checkPointType" , AttributeValue . builder (). s ( "timestamp" ). build ()))
78+ . updateExpression ( "set #V = :val1" )
79+ . expressionAttributeNames ( expressionAttributeNames )
80+ . expressionAttributeValues ( expressionAttributeValues ). build () );
7981 }
8082
8183 @ Override
8284 public Date lastCheckPoint () throws Exception {
83- HashMap <String , AttributeValue > keyToGet = new HashMap <String , AttributeValue >();
84- keyToGet .put ( "checkPointType" , new AttributeValue ( "timestamp" ) );
85- GetItemRequest request = new GetItemRequest ()
86- .withKey ( keyToGet )
87- .withTableName ( tableName );
88-
89- Map <String , AttributeValue > item = client .getItem ( request ).getItem ();
90- if ( item == null ) {
85+
86+ GetItemRequest request = GetItemRequest .builder ()
87+ .tableName (tableName )
88+ .key (Map .of ("checkPointType" , AttributeValue .builder ().s ("timestamp" ).build ()))
89+ .build ();
90+
91+ GetItemResponse getItemResponse = this .client .getItem (request ).get (checkPointDynamodbTimout , TimeUnit .SECONDS );
92+
93+ if (!getItemResponse .hasItem ()) {
9194 return getDefaultCheckPoint ();
9295 }
93- String value = item .get ( "checkPointValue" ).getN ();
96+
97+ Map <String , AttributeValue > item = getItemResponse .item ();
98+ String value = item .get ("checkPointValue" ).n ();
9499 if ( value == null ) {
95100 return getDefaultCheckPoint ();
96101 }
97102
98- return new Date ( Long .parseLong ( value ) );
103+ return new Date (Long .parseLong (value ) );
99104 }
100105
101106 private Date getDefaultCheckPoint () {
@@ -104,4 +109,3 @@ private Date getDefaultCheckPoint() {
104109 return checkPoint ;
105110 }
106111}
107-
0 commit comments