Skip to content

Commit 6b83463

Browse files
authored
Merge pull request #173 from julienrf/replicate-indexes
Replicate indexes when replicating DynamoDB tables
2 parents 169479c + 2d4c2f6 commit 6b83463

File tree

5 files changed

+187
-7
lines changed

5 files changed

+187
-7
lines changed

migrator/src/main/scala/com/scylladb/migrator/DynamoUtils.scala

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ import software.amazon.awssdk.services.dynamodb.model.{
1515
CreateTableRequest,
1616
DescribeStreamRequest,
1717
DescribeTableRequest,
18+
GlobalSecondaryIndex,
19+
LocalSecondaryIndex,
1820
ProvisionedThroughput,
1921
ProvisionedThroughputDescription,
2022
ResourceNotFoundException,
@@ -26,6 +28,7 @@ import software.amazon.awssdk.services.dynamodb.model.{
2628
}
2729
import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient
2830

31+
import java.util.stream.Collectors
2932
import scala.util.{ Failure, Success, Try }
3033
import scala.jdk.OptionConverters._
3134

@@ -65,6 +68,41 @@ object DynamoUtils {
6568
} else {
6669
request.billingMode(BillingMode.PAY_PER_REQUEST)
6770
}
71+
if (sourceDescription.hasLocalSecondaryIndexes) {
72+
request.localSecondaryIndexes(
73+
sourceDescription.localSecondaryIndexes.stream
74+
.map(
75+
index =>
76+
LocalSecondaryIndex
77+
.builder()
78+
.indexName(index.indexName())
79+
.keySchema(index.keySchema())
80+
.projection(index.projection())
81+
.build())
82+
.collect(Collectors.toList[LocalSecondaryIndex])
83+
)
84+
}
85+
if (sourceDescription.hasGlobalSecondaryIndexes) {
86+
request.globalSecondaryIndexes(
87+
sourceDescription.globalSecondaryIndexes.stream
88+
.map(
89+
index =>
90+
GlobalSecondaryIndex
91+
.builder()
92+
.indexName(index.indexName())
93+
.keySchema(index.keySchema())
94+
.projection(index.projection())
95+
.provisionedThroughput(
96+
ProvisionedThroughput
97+
.builder()
98+
.readCapacityUnits(index.provisionedThroughput.readCapacityUnits)
99+
.writeCapacityUnits(index.provisionedThroughput.writeCapacityUnits)
100+
.build()
101+
)
102+
.build())
103+
.collect(Collectors.toList[GlobalSecondaryIndex])
104+
)
105+
}
68106

69107
log.info(
70108
s"Table ${target.table} does not exist at destination - creating it according to definition:")
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
source:
2+
type: dynamodb
3+
table: TableWithSecondaryIndexes
4+
region: dummy
5+
endpoint:
6+
host: http://dynamodb
7+
port: 8000
8+
credentials:
9+
accessKey: dummy
10+
secretKey: dummy
11+
12+
target:
13+
type: dynamodb
14+
table: TableWithSecondaryIndexes
15+
region: dummy
16+
endpoint:
17+
host: http://scylla
18+
port: 8000
19+
credentials:
20+
accessKey: dummy
21+
secretKey: dummy
22+
streamChanges: false
23+
24+
renames: []
25+
26+
# Below are unused but mandatory settings
27+
savepoints:
28+
path: /app/savepoints
29+
intervalSeconds: 300
30+
skipTokenRanges: []
31+
validation:
32+
compareTimestamps: true
33+
ttlToleranceMillis: 60000
34+
writetimeToleranceMillis: 1000
35+
failuresToFetch: 100
36+
floatingPointTolerance: 0.001
37+
timestampMsTolerance: 0

tests/src/test/scala/com/scylladb/migrator/alternator/DynamoDBS3ExportMigrationTest.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,9 @@ class DynamoDBS3ExportMigrationTest extends MigratorSuite {
4747
checkSchemaWasMigrated(
4848
tableName,
4949
Seq(KeySchemaElement.builder().attributeName("id").keyType(KeyType.HASH).build()).asJava,
50-
Seq(AttributeDefinition.builder().attributeName("id").attributeType(ScalarAttributeType.S).build()).asJava
50+
Seq(AttributeDefinition.builder().attributeName("id").attributeType(ScalarAttributeType.S).build()).asJava,
51+
Nil.asJava,
52+
Nil.asJava
5153
)
5254

5355
// Check that the items have been migrated to the target table

tests/src/test/scala/com/scylladb/migrator/alternator/MigratorSuite.scala

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ package com.scylladb.migrator.alternator
33
import software.amazon.awssdk.auth.credentials.{AwsBasicCredentials, StaticCredentialsProvider}
44
import software.amazon.awssdk.regions.Region
55
import software.amazon.awssdk.services.dynamodb.DynamoDbClient
6-
import software.amazon.awssdk.services.dynamodb.model.{AttributeDefinition, AttributeValue, CreateTableRequest, DeleteTableRequest, DescribeTableRequest, GetItemRequest, KeySchemaElement, KeyType, ProvisionedThroughput, ResourceNotFoundException, ScalarAttributeType}
6+
import software.amazon.awssdk.services.dynamodb.model.{AttributeDefinition, AttributeValue, CreateTableRequest, DeleteTableRequest, DescribeTableRequest, GetItemRequest, GlobalSecondaryIndexDescription, KeySchemaElement, KeyType, LocalSecondaryIndex, LocalSecondaryIndexDescription, ProvisionedThroughput, ResourceNotFoundException, ScalarAttributeType}
77

88
import java.net.URI
99
import scala.util.chaining._
@@ -57,7 +57,7 @@ trait MigratorSuite extends munit.FunSuite {
5757
.tableName(name)
5858
.keySchema(KeySchemaElement.builder().attributeName("id").keyType(KeyType.HASH).build())
5959
.attributeDefinitions(AttributeDefinition.builder().attributeName("id").attributeType(ScalarAttributeType.S).build())
60-
.provisionedThroughput(ProvisionedThroughput.builder.readCapacityUnits(25L).writeCapacityUnits(25L).build())
60+
.provisionedThroughput(ProvisionedThroughput.builder().readCapacityUnits(25L).writeCapacityUnits(25L).build())
6161
.build()
6262
sourceDDb.createTable(createTableRequest)
6363
val waiterResponse =
@@ -67,7 +67,7 @@ trait MigratorSuite extends munit.FunSuite {
6767
assert(waiterResponse.matched().response().isPresent, s"Failed to create table ${name}: ${waiterResponse.matched().exception().get()}")
6868
} catch {
6969
case any: Throwable =>
70-
fail(s"Failed to created table ${name} in database ${sourceDDb}", any)
70+
fail(s"Failed to create table ${name} in database ${sourceDDb}", any)
7171
}
7272
name
7373
},
@@ -112,18 +112,44 @@ trait MigratorSuite extends munit.FunSuite {
112112
checkSchemaWasMigrated(
113113
tableName,
114114
sourceTableDesc.keySchema,
115-
sourceTableDesc.attributeDefinitions
115+
sourceTableDesc.attributeDefinitions,
116+
sourceTableDesc.localSecondaryIndexes,
117+
sourceTableDesc.globalSecondaryIndexes
116118
)
117119
}
118120

119121
/** Check that the table schema in the target database is equal to the provided schema */
120-
def checkSchemaWasMigrated(tableName: String, keySchema: java.util.List[KeySchemaElement], attributeDefinitions: java.util.List[AttributeDefinition]): Unit = {
122+
def checkSchemaWasMigrated(
123+
tableName: String,
124+
keySchema: java.util.List[KeySchemaElement],
125+
attributeDefinitions: java.util.List[AttributeDefinition],
126+
localSecondaryIndexes: java.util.List[LocalSecondaryIndexDescription],
127+
globalSecondaryIndexes: java.util.List[GlobalSecondaryIndexDescription]): Unit = {
121128
targetAlternator
122129
.describeTable(describeTableRequest(tableName))
123130
.table
124131
.tap { targetTableDesc =>
132+
// Partition key
125133
assertEquals(targetTableDesc.keySchema, keySchema)
126-
assertEquals(targetTableDesc.attributeDefinitions, attributeDefinitions)
134+
135+
// Attribute definitions
136+
assertEquals(targetTableDesc.attributeDefinitions.asScala.toSet, attributeDefinitions.asScala.toSet)
137+
138+
// Local secondary indexes: do not compare their ARN, which always unique
139+
def localIndexRelevantProperties(index: LocalSecondaryIndexDescription) =
140+
(index.indexName, index.keySchema, index.projection)
141+
assertEquals(
142+
targetTableDesc.localSecondaryIndexes.asScala.map(localIndexRelevantProperties),
143+
localSecondaryIndexes.asScala.map(localIndexRelevantProperties)
144+
)
145+
146+
// Global secondary indexes: do not compare ARN and provisioned throughput (see https://github.com/scylladb/scylladb/issues/19718)
147+
def globalIndexRelevantProperties(index: GlobalSecondaryIndexDescription) =
148+
(index.indexName, index.keySchema, index.projection/*, index.provisionedThroughput*/)
149+
assertEquals(
150+
targetTableDesc.globalSecondaryIndexes.asScala.map(globalIndexRelevantProperties),
151+
globalSecondaryIndexes.asScala.map(globalIndexRelevantProperties)
152+
)
127153
}
128154
}
129155

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
package com.scylladb.migrator.alternator
2+
3+
import com.scylladb.migrator.SparkUtils.successfullyPerformMigration
4+
import software.amazon.awssdk.services.dynamodb.model.{AttributeDefinition, CreateTableRequest, DeleteTableRequest, GlobalSecondaryIndex, KeySchemaElement, KeyType, LocalSecondaryIndex, Projection, ProjectionType, ProvisionedThroughput, ScalarAttributeType}
5+
6+
class SecondaryIndexesTest extends MigratorSuite {
7+
8+
val tableName = "TableWithSecondaryIndexes"
9+
val withResources: FunFixture[Unit] = FunFixture(
10+
setup = _ => {
11+
deleteTableIfExists(sourceDDb, tableName)
12+
deleteTableIfExists(targetAlternator, tableName)
13+
try {
14+
val createTableRequest =
15+
CreateTableRequest
16+
.builder()
17+
.tableName(tableName)
18+
.keySchema(
19+
KeySchemaElement.builder().attributeName("id").keyType(KeyType.HASH).build(),
20+
KeySchemaElement.builder().attributeName("z").keyType(KeyType.RANGE).build()
21+
)
22+
.attributeDefinitions(
23+
AttributeDefinition.builder().attributeName("id").attributeType(ScalarAttributeType.S).build(),
24+
AttributeDefinition.builder().attributeName("x").attributeType(ScalarAttributeType.N).build(),
25+
AttributeDefinition.builder().attributeName("y").attributeType(ScalarAttributeType.N).build(),
26+
AttributeDefinition.builder().attributeName("z").attributeType(ScalarAttributeType.N).build()
27+
)
28+
.provisionedThroughput(ProvisionedThroughput.builder().readCapacityUnits(25L).writeCapacityUnits(25L).build())
29+
.localSecondaryIndexes(
30+
LocalSecondaryIndex
31+
.builder()
32+
.indexName("local")
33+
.keySchema(
34+
KeySchemaElement.builder().attributeName("id").keyType(KeyType.HASH).build(),
35+
KeySchemaElement.builder().attributeName("y").keyType(KeyType.RANGE).build(),
36+
)
37+
.projection(Projection.builder().projectionType(ProjectionType.ALL).build())
38+
.build()
39+
)
40+
.globalSecondaryIndexes(
41+
GlobalSecondaryIndex
42+
.builder()
43+
.indexName("global")
44+
.provisionedThroughput(ProvisionedThroughput.builder().readCapacityUnits(1L).writeCapacityUnits(1L).build())
45+
.keySchema(
46+
KeySchemaElement.builder().attributeName("x").keyType(KeyType.HASH).build(),
47+
KeySchemaElement.builder().attributeName("z").keyType(KeyType.RANGE).build()
48+
)
49+
.projection(Projection.builder().projectionType(ProjectionType.ALL).build())
50+
.build()
51+
)
52+
.build()
53+
sourceDDb.createTable(createTableRequest)
54+
val waiterResponse =
55+
sourceDDb
56+
.waiter()
57+
.waitUntilTableExists(describeTableRequest(tableName))
58+
assert(waiterResponse.matched().response().isPresent, s"Failed to create table ${tableName}: ${waiterResponse.matched().exception().get()}")
59+
} catch {
60+
case any: Throwable =>
61+
fail(s"Failed to create table ${tableName} in database ${sourceDDb}", any)
62+
}
63+
()
64+
},
65+
teardown = _ => {
66+
targetAlternator.deleteTable(DeleteTableRequest.builder().tableName(tableName).build())
67+
sourceDDb.deleteTable(DeleteTableRequest.builder().tableName(tableName).build())
68+
()
69+
}
70+
)
71+
72+
withResources.test("The secondary indexes of a table are correctly replicated") { _ =>
73+
successfullyPerformMigration("dynamodb-to-alternator-secondary-indexes.yaml")
74+
checkSchemaWasMigrated(tableName)
75+
}
76+
77+
}

0 commit comments

Comments
 (0)