do not save _dynamo_op_type during dynamo stream replication #239
Conversation
```scala
item
  .entrySet()
  .stream()
  .filter(e => e.getKey != operationTypeColumn)
```
filtered out
```scala
val log = LogManager.getLogger("com.scylladb.migrator.writers.DynamoDB")

def deleteRDD(target: TargetSettings.DynamoDB,
```
Should there be a function for delete operations?
this is exactly the big question
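For reference, a dedicated delete pass could look roughly like the sketch below. This is only a sketch, not the PR's implementation: the DeleteItemRequest shape is taken from the excerpt quoted further down, while buildDynamoClient and extractKey are hypothetical helpers standing in for client construction and key projection.

```scala
// Sketch only. Deletes each item's key from the target table, one client per partition.
def deleteRDD(target: TargetSettings.DynamoDB,
              rdd: RDD[util.Map[String, AttributeValueV1]]): Unit =
  rdd.foreachPartition { items =>
    val dynamoDB = buildDynamoClient(target) // hypothetical: client for the target endpoint
    items.foreach { item =>
      // hypothetical: keep only the table's key attributes, converted to V2 values
      val keyToDelete = extractKey(item)
      dynamoDB.deleteItem(
        DeleteItemRequest
          .builder()
          .tableName(target.table)
          .key(keyToDelete)
          .build()
      )
    }
  }
```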
I was told that the current version removes rows that should be deleted, but keeps the row key together with the "_dynamo_op_type" column, while all other cells are gone.
That is what confuses me, and I haven't tried it myself to see whether it really behaves that way (the person who reported it might not have the full picture).
Is there any stage where we can test it end-to-end?
Within the integration test in this PR I can observe that the row is not removed without this additional delete operation.
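As a sketch of the kind of end-to-end check involved, reusing the scanAll/targetAlternator() helpers and the "toDelete" test item that appear later in this thread:

```scala
// Sketch: after replication, the deleted row should be absent from the target table.
val remainingIds = scanAll(targetAlternator(), tableName).map(m => m("id").s)
assert(!remainingIds.contains("toDelete"), s"expected 'toDelete' to be removed, got $remainingIds")
```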
It is possible, since the record event type is not used later and the writable DynamoDB item doesn't seem to carry an op type.
I am curious whether the RDD we already have could be reused for both operations, to avoid creating a new client connection (see the sketch below).
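A minimal sketch of what that reuse could look like, assuming the items still carry the operation type column at this point; putOperation, deleteOperation, and toWritable are hypothetical placeholders:

```scala
// Sketch only: split one cached RDD by operation type instead of building a second pipeline.
val cached = rdd.cache()

val puts    = cached.filter(_.get(operationTypeColumn) == putOperation)    // hypothetical marker value
val deletes = cached.filter(_.get(operationTypeColumn) == deleteOperation) // hypothetical marker value

DynamoDB.writeRDD(target, renamesMap, toWritable(puts), targetTableDesc)(spark) // toWritable: hypothetical
DynamoDB.deleteRDD(target, deletes) // signature assumed from this PR's DynamoDB.scala excerpt

cached.unpersist()
```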
Hi @tarzanek
Yes please; my expectation is that either the Docker container with Scylla Alternator doesn't start (version upgrade?) or the DynamoDB mock service needs a version upgrade.
Pull Request Overview
This PR implements a fix to prevent saving the _dynamo_op_type column during DynamoDB stream replication, addressing issue #237. The change ensures that internal operation metadata is filtered out when writing to the target database.
- Refactored stream processing logic to separate PUT and DELETE operations
- Added filtering to exclude `_dynamo_op_type` from saved data
- Implemented dedicated DELETE operation handling
Reviewed Changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| DynamoStreamReplicationIntegrationTest.scala | New integration test verifying operation type column exclusion |
| DynamoStreamReplication.scala | Refactored to filter operation type and handle DELETE operations separately |
| DynamoDB.scala | Added new deleteRDD method for handling DELETE operations |
| .gitignore files | Updated to ignore Docker data directories |
| tests.yml | Added permissions fix for Docker directories |
| ToRemove.yml | Temporary test workflow file |
Resolved (outdated) review threads:
migrator/src/main/scala/com/scylladb/migrator/writers/DynamoStreamReplication.scala
tests/src/test/scala/com/scylladb/migrator/writers/DynamoStreamReplicationIntegrationTest.scala
Pull Request Overview
Copilot reviewed 4 out of 4 changed files in this pull request and generated 3 comments.
```scala
.collect { case Some(item) => item: util.Map[String, AttributeValueV1] }
.repartition(Runtime.getRuntime.availableProcessors() * 2)

val allOperationsSorted = rdd.collect().sortBy(_.get(sequenceNumberColumn).getS)
```
Copilot AI · Sep 16, 2025
Collecting all operations into the driver memory could cause OOM errors for large datasets. Consider using distributed sorting with sortBy() on the RDD instead of collecting first.
```diff
- val allOperationsSorted = rdd.collect().sortBy(_.get(sequenceNumberColumn).getS)
+ val allOperationsSorted = rdd.sortBy(_.get(sequenceNumberColumn).getS).collect()
```
```scala
      (lastType, lastBatch :+ item) :: rest
    } else {
      (currentType, List(item)) :: (lastType, lastBatch) :: rest
    }
  }
  .reverse
```
Copilot AI · Sep 16, 2025
Using :+ for list append is an O(n) operation. Consider using a mutable buffer or reversing the accumulation strategy to use :: (prepend), which is O(1).
```diff
-       (lastType, lastBatch :+ item) :: rest
-     } else {
-       (currentType, List(item)) :: (lastType, lastBatch) :: rest
-     }
-   }
-   .reverse
+       (lastType, item :: lastBatch) :: rest
+     } else {
+       (currentType, List(item)) :: (lastType, lastBatch) :: rest
+     }
+   }
+   .reverse
+   .map { case (opType, batch) => (opType, batch.reverse) }
```
```scala
if (!keyToDelete.isEmpty) {
  try {
    dynamoDB.deleteItem(
      DeleteItemRequest
        .builder()
        .tableName(target.table)
        .key(keyToDelete)
        .build()
    )
  } catch {
    case e: Exception =>
      log.error(
        s"Failed to delete item with key ${keyToDelete} from table ${target.table}",
        e)
  }
```
Copilot AI · Sep 16, 2025
[nitpick] The isEmpty check is redundant since the forEach loop will naturally skip if keySchema is empty, and the delete operation will fail gracefully if keyToDelete is empty.
```diff
- if (!keyToDelete.isEmpty) {
-   try {
-     dynamoDB.deleteItem(
-       DeleteItemRequest
-         .builder()
-         .tableName(target.table)
-         .key(keyToDelete)
-         .build()
-     )
-   } catch {
-     case e: Exception =>
-       log.error(
-         s"Failed to delete item with key ${keyToDelete} from table ${target.table}",
-         e)
-   }
+ try {
+   dynamoDB.deleteItem(
+     DeleteItemRequest
+       .builder()
+       .tableName(target.table)
+       .key(keyToDelete)
+       .build()
+   )
+ } catch {
+   case e: Exception =>
+     log.error(
+       s"Failed to delete item with key ${keyToDelete} from table ${target.table}",
+       e)
```
@pizzaeueu won't the batching change the order if we parallelize by "batch"?
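For illustration: order is preserved only if the grouped batches are applied sequentially in sequence-number order; distributing the batches themselves would lose the cross-batch ordering guarantee. A sketch, with applyBatch as a hypothetical stand-in for the PUT/DELETE writers:

```scala
// Order-preserving: apply batches one after another on the driver.
batches.foreach { case (opType, batch) => applyBatch(opType, batch) } // applyBatch: hypothetical

// NOT order-preserving: executors may run batches concurrently.
spark.sparkContext.parallelize(batches).foreach { case (opType, batch) => applyBatch(opType, batch) }
```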
Pull Request Overview
Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.
```scala
val itemWithoutOp = item
  .entrySet()
  .stream()
  .filter(e => e.getKey != operationTypeColumn)
  .collect(
    Collectors.toMap(
      (e: util.Map.Entry[String, AttributeValueV1]) => e.getKey,
      (e: util.Map.Entry[String, AttributeValueV1]) =>
        AttributeValueUtils.fromV1(e.getValue)
    )
  )
```
Copilot AI · Sep 19, 2025
[nitpick] This Java-style stream processing is unnecessarily complex for Scala. Consider using Scala collections for better readability: item.asScala.filter(_._1 != operationTypeColumn).map { case (k, v) => k -> AttributeValueUtils.fromV1(v) }.asJava
```diff
- val itemWithoutOp = item
-   .entrySet()
-   .stream()
-   .filter(e => e.getKey != operationTypeColumn)
-   .collect(
-     Collectors.toMap(
-       (e: util.Map.Entry[String, AttributeValueV1]) => e.getKey,
-       (e: util.Map.Entry[String, AttributeValueV1]) =>
-         AttributeValueUtils.fromV1(e.getValue)
-     )
-   )
+ val itemWithoutOp = item.asScala
+   .filter { case (k, _) => k != operationTypeColumn }
+   .map { case (k, v) => k -> AttributeValueUtils.fromV1(v) }
+   .asJava
```
```scala
DynamoDB.writeRDD(target, renamesMap, writableRdd, targetTableDesc)(spark)
run(
  msgs.asInstanceOf[RDD[Option[util.Map[String, AttributeValueV1]]]],
```
Copilot AI · Sep 19, 2025
Using asInstanceOf for type casting is unsafe and can lead to runtime ClassCastException. The type should already be correct based on the DStream definition - consider removing the cast or adding proper type validation.
```diff
- msgs.asInstanceOf[RDD[Option[util.Map[String, AttributeValueV1]]]],
+ msgs,
```
Pull Request Overview
Copilot reviewed 3 out of 3 changed files in this pull request and generated 1 comment.
```scala
  tableDesc
)

val finalItems = scanAll(sourceDDb(), tableName).sortBy(m => m("id").s)
```
Copilot AI · Sep 19, 2025
The test is scanning the source DynamoDB table instead of the target table. This should scan targetAlternator() to verify that the operations were correctly applied to the target database.
```diff
- val finalItems = scanAll(sourceDDb(), tableName).sortBy(m => m("id").s)
+ val finalItems = scanAll(targetAlternator(), tableName).sortBy(m => m("id").s)
```
This is actually a valid comment; why is the source validated? @pizzaeueu
Pull Request Overview
Copilot reviewed 3 out of 3 changed files in this pull request and generated 4 comments.
```scala
val itemWithoutOp = item.asScala.collect {
  case (k, v) if k != operationTypeColumn => k -> AttributeValueUtils.fromV1(v)
}.asJava
```
Copilot AI · Sep 24, 2025
This creates a new collection for each item processing. Consider pre-filtering the operation type column or using a more efficient approach to avoid repeated collection transformations.
```diff
- val itemWithoutOp = item.asScala.collect {
-   case (k, v) if k != operationTypeColumn => k -> AttributeValueUtils.fromV1(v)
- }.asJava
+ val itemWithoutOp = {
+   val m = new java.util.HashMap[String, AttributeValueV2]()
+   val it = item.entrySet().iterator()
+   while (it.hasNext) {
+     val entry = it.next()
+     if (entry.getKey != operationTypeColumn) {
+       m.put(entry.getKey, AttributeValueUtils.fromV1(entry.getValue))
+     }
+   }
+   m
+ }
```
```scala
.tableName(tableName)
.item(
  Map(
    "id" -> AttributeValue.builder.s("toDelete").build,
```
Copilot AI · Sep 24, 2025
Missing parentheses after builder. It should be AttributeValue.builder().s("toDelete").build() to properly chain the method calls.
```scala
.item(
  Map(
    "id" -> AttributeValue.builder.s("toDelete").build,
    "value" -> AttributeValue.builder.s("value1").build
```
Copilot AI · Sep 24, 2025
Missing parentheses after builder. It should be AttributeValue.builder().s("value1").build() to properly chain the method calls.
| "id" -> AttributeValue.builder.s("toUpdate").build, | ||
| "value" -> AttributeValue.builder.s("value2").build |
Copilot AI · Sep 24, 2025
Missing parentheses after builder. It should be AttributeValue.builder().s("toUpdate").build() and AttributeValue.builder().s("value2").build() to properly chain the method calls.
Relates to #237