Skip to content

Commit 3013d6a

Browse files
Artemtarzanek
authored andcommitted
Ensured test validates actual stream replication results
1 parent 0089e68 commit 3013d6a

File tree

1 file changed

+6
-6
lines changed

1 file changed

+6
-6
lines changed

tests/src/test/scala/com/scylladb/migrator/writers/DynamoStreamReplicationIntegrationTest.scala

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ class DynamoStreamReplicationIntegrationTest extends MigratorSuiteWithDynamoDBLo
4848
.waiter()
4949
.waitUntilTableExists(DescribeTableRequest.builder().tableName(tableName).build())
5050

51-
sourceDDb().putItem(
51+
targetAlternator().putItem(
5252
PutItemRequest
5353
.builder()
5454
.tableName(tableName)
@@ -60,7 +60,7 @@ class DynamoStreamReplicationIntegrationTest extends MigratorSuiteWithDynamoDBLo
6060
)
6161
.build()
6262
)
63-
sourceDDb().putItem(
63+
targetAlternator().putItem(
6464
PutItemRequest
6565
.builder()
6666
.tableName(tableName)
@@ -108,21 +108,21 @@ class DynamoStreamReplicationIntegrationTest extends MigratorSuiteWithDynamoDBLo
108108
).asJava)
109109
)
110110

111-
val rdd = spark.sparkContext.parallelize(streamEvents)
111+
val rdd = spark.sparkContext.parallelize(streamEvents, 1)
112112
.asInstanceOf[RDD[Option[DynamoStreamReplication.DynamoItem]]]
113113

114114
val targetSettings = TargetSettings.DynamoDB(
115115
table = tableName,
116116
region = Some("eu-central-1"),
117-
endpoint = Some(DynamoDBEndpoint("http://localhost", 8001)),
117+
endpoint = Some(DynamoDBEndpoint("http://localhost", 8000)),
118118
credentials = Some(AWSCredentials("dummy", "dummy", None)),
119119
streamChanges = false,
120120
skipInitialSnapshotTransfer = Some(true),
121121
writeThroughput = None,
122122
throughputWritePercent = None
123123
)
124124

125-
val tableDesc = sourceDDb().describeTable(
125+
val tableDesc = targetAlternator().describeTable(
126126
software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest
127127
.builder()
128128
.tableName(tableName)
@@ -136,7 +136,7 @@ class DynamoStreamReplicationIntegrationTest extends MigratorSuiteWithDynamoDBLo
136136
tableDesc
137137
)
138138

139-
val finalItems = scanAll(sourceDDb(), tableName).sortBy(m => m("id").s)
139+
val finalItems = scanAll(targetAlternator(), tableName).sortBy(m => m("id").s)
140140

141141
assertEquals(finalItems.size, 3)
142142

0 commit comments

Comments
 (0)