DynamoDB.scala
@@ -9,14 +9,70 @@ import org.apache.hadoop.mapred.JobConf
import org.apache.log4j.LogManager
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import software.amazon.awssdk.services.dynamodb.model.{ AttributeValue, TableDescription }
import software.amazon.awssdk.services.dynamodb.model.{
AttributeValue,
DeleteItemRequest,
TableDescription
}

import java.util
import java.util.stream.Collectors

object DynamoDB {

val log = LogManager.getLogger("com.scylladb.migrator.writers.DynamoDB")

def deleteRDD(target: TargetSettings.DynamoDB,
Collaborator Author

Should there be a function for delete operations?

Contributor

This is exactly the big question.

Contributor (@tarzanek), Jul 31, 2025

I was told that the current version removes the rows that should be deleted, but keeps the row key together with the "_dynamo_op_type" column while all other cells are gone. That is what confuses me, and I haven't verified it myself (the person who reported it might not have the full picture).

Collaborator Author (@pizzaeueu), Jul 31, 2025

Is there any stage where this can be tested end-to-end?
Within the integration test in this PR I can observe that the row is not removed without this additional delete operation.

Contributor

It is possible, since the record event type is not used later and the DynamoDBItemWritable doesn't seem to carry an operation type.

I am curious whether the RDD we already have could be reused for both operations, to avoid creating a new client connection.
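
A minimal sketch of that idea (an assumption, not code from this PR): a single client per partition could serve both operations by dispatching on the operation-type marker that DynamoStreamReplication attaches to each item, which is roughly what the new DynamoStreamReplication.run below ends up doing.

rdd.foreachPartition { partition =>
  val client = DynamoUtils.buildDynamoClient(
    target.endpoint,
    target.finalCredentials.map(_.toProvider),
    target.region,
    Seq.empty)
  try {
    partition.foreach { item =>
      // `operationTypeColumn` / `putOperation` are the markers defined in DynamoStreamReplication.
      if (item.get(operationTypeColumn) == putOperation) {
        // client.putItem(...) with the renamed attributes
      } else {
        // client.deleteItem(...) with only the key attributes
      }
    }
  } finally client.close()
}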

targetTableDesc: TableDescription,
rdd: RDD[util.Map[String, AttributeValue]])(implicit spark: SparkSession): Unit = {

val keySchema = targetTableDesc.keySchema()

rdd.foreachPartition { partition =>
if (partition.nonEmpty) {
val dynamoDB = DynamoUtils.buildDynamoClient(
target.endpoint,
target.finalCredentials.map(_.toProvider),
target.region,
Seq.empty
)

try {
partition.foreach { item =>
val keyToDelete =
new util.HashMap[String, AttributeValue]()

keySchema.forEach { keyElement =>
val keyName = keyElement.attributeName()
if (item.containsKey(keyName)) {
keyToDelete.put(keyName, item.get(keyName))
}
}

if (!keyToDelete.isEmpty) {
try {
dynamoDB.deleteItem(
DeleteItemRequest
.builder()
.tableName(target.table)
.key(keyToDelete)
.build()
)
} catch {
case e: Exception =>
log.error(
s"Failed to delete item with key ${keyToDelete} from table ${target.table}",
e)
}
Comment on lines +52 to +66

Copilot AI, Sep 16, 2025

[nitpick] The isEmpty check is redundant since the forEach loop will naturally skip if keySchema is empty, and the delete operation will fail gracefully if keyToDelete is empty.

Suggested change (drop the surrounding !keyToDelete.isEmpty guard and keep only the try/catch):

try {
  dynamoDB.deleteItem(
    DeleteItemRequest
      .builder()
      .tableName(target.table)
      .key(keyToDelete)
      .build()
  )
} catch {
  case e: Exception =>
    log.error(
      s"Failed to delete item with key ${keyToDelete} from table ${target.table}",
      e)
}
}
}
} finally {
dynamoDB.close()
}
}
}
}
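
// Hypothetical usage sketch (an assumption — the call sites are not part of this diff):
// once the replicated changes are split into upserts (RDD[(Text, DynamoDBItemWritable)])
// and deletions (RDD[util.Map[String, AttributeValue]]), the two writers could be invoked
// side by side against the same target table:
//
//   DynamoDB.writeRDD(target, renamesMap, upsertRdd, targetTableDesc)(spark)
//   DynamoDB.deleteRDD(target, targetTableDesc, deleteRdd)(spark)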

def writeRDD(target: TargetSettings.DynamoDB,
renamesMap: Map[String, String],
rdd: RDD[(Text, DynamoDBItemWritable)],
DynamoStreamReplication.scala
@@ -4,24 +4,32 @@ import com.amazonaws.services.dynamodbv2.streamsadapter.model.RecordAdapter
import com.amazonaws.services.dynamodbv2.model.{ AttributeValue => AttributeValueV1 }
import com.scylladb.migrator.AttributeValueUtils
import com.scylladb.migrator.config.{ AWSCredentials, SourceSettings, TargetSettings }
import org.apache.hadoop.dynamodb.DynamoDBItemWritable
import org.apache.hadoop.io.Text
import org.apache.log4j.LogManager
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kinesis.{
KinesisDynamoDBInputDStream,
KinesisInitialPositions,
SparkAWSCredentials
}
import software.amazon.awssdk.services.dynamodb.model.TableDescription
import com.scylladb.migrator.DynamoUtils
import software.amazon.awssdk.services.dynamodb.model.{
AttributeValue => AttributeValueV2,
DeleteItemRequest,
PutItemRequest,
TableDescription
}

import java.util
import java.util.stream.Collectors
import scala.jdk.CollectionConverters._

object DynamoStreamReplication {
val log = LogManager.getLogger("com.scylladb.migrator.writers.DynamoStreamReplication")

type DynamoItem = util.Map[String, AttributeValueV1]

// We enrich the table items with a column `operationTypeColumn` describing the type of change
// applied to the item.
// We have to deal with multiple representations of the data because `spark-kinesis-dynamodb`
@@ -30,6 +38,78 @@ object DynamoStreamReplication {
private val putOperation = new AttributeValueV1().withBOOL(true)
private val deleteOperation = new AttributeValueV1().withBOOL(false)

private[writers] def run(
msgs: RDD[Option[DynamoItem]],
target: TargetSettings.DynamoDB,
renamesMap: Map[String, String],
targetTableDesc: TableDescription)(implicit spark: SparkSession): Unit = {
val rdd = msgs.flatMap(_.toSeq)

val putCount = spark.sparkContext.longAccumulator("putCount")
val deleteCount = spark.sparkContext.longAccumulator("deleteCount")
val keyAttributeNames = targetTableDesc.keySchema.asScala.map(_.attributeName).toSet

rdd.foreachPartition { partition =>
if (partition.nonEmpty) {
val client =
DynamoUtils.buildDynamoClient(
target.endpoint,
target.finalCredentials.map(_.toProvider),
target.region,
Seq.empty
)
try {
partition.foreach { item =>
val isPut = item.get(operationTypeColumn) == putOperation

val itemWithoutOp = item.asScala.collect {
case (k, v) if k != operationTypeColumn => k -> AttributeValueUtils.fromV1(v)
}.asJava
Comment on lines +65 to +67

Copilot AI, Sep 24, 2025

This creates a new collection for each item processed. Consider pre-filtering the operation type column or using a more efficient approach to avoid repeated collection transformations.

Suggested change (build the filtered map imperatively instead of going through asScala.collect):

val itemWithoutOp = {
  val m = new java.util.HashMap[String, AttributeValueV2]()
  val it = item.entrySet().iterator()
  while (it.hasNext) {
    val entry = it.next()
    if (entry.getKey != operationTypeColumn) {
      m.put(entry.getKey, AttributeValueUtils.fromV1(entry.getValue))
    }
  }
  m
}

if (isPut) {
putCount.add(1)
val finalItem = itemWithoutOp.asScala.map {
case (key, value) => renamesMap.getOrElse(key, key) -> value
}.asJava
try {
client.putItem(
PutItemRequest.builder().tableName(target.table).item(finalItem).build())
} catch {
case e: Exception =>
log.error(s"Failed to put item into ${target.table}", e)
}
} else {
deleteCount.add(1)
val keyToDelete = itemWithoutOp.asScala
.filter { case (key, _) => keyAttributeNames.contains(key) }
.map { case (key, value) => renamesMap.getOrElse(key, key) -> value }
.asJava
try {
client.deleteItem(
DeleteItemRequest.builder().tableName(target.table).key(keyToDelete).build())
} catch {
case e: Exception =>
log.error(s"Failed to delete item from ${target.table}", e)
}
}
}
} finally {
client.close()
}
}
}

if (putCount.value > 0 || deleteCount.value > 0) {
log.info(s"""
|Changes to be applied:
| - ${putCount.value} items to UPSERT
| - ${deleteCount.value} items to DELETE
|""".stripMargin)
} else {
log.info("No changes to apply")
}
}

def createDStream(spark: SparkSession,
streamingContext: StreamingContext,
src: SourceSettings.DynamoDB,
@@ -45,7 +125,7 @@ object DynamoStreamReplication {
messageHandler = {
case recAdapter: RecordAdapter =>
val rec = recAdapter.getInternalObject
val newMap = new util.HashMap[String, AttributeValueV1]()
val newMap: DynamoItem = new util.HashMap[String, AttributeValueV1]()

if (rec.getDynamodb.getNewImage ne null) {
newMap.putAll(rec.getDynamodb.getNewImage)
@@ -76,50 +156,7 @@ }
}
.getOrElse(SparkAWSCredentials.builder.build())
).foreachRDD { msgs =>
val rdd = msgs
.collect { case Some(item) => item: util.Map[String, AttributeValueV1] }
.repartition(Runtime.getRuntime.availableProcessors() * 2)

val changes =
rdd
.groupBy { item =>
item.get(operationTypeColumn) match {
case `putOperation` => "UPSERT"
case `deleteOperation` => "DELETE"
case _ => "UNKNOWN"
}
}
.mapValues(_.size)
.collect()
if (changes.nonEmpty) {
log.info("Changes to be applied:")
for ((operation, count) <- changes) {
log.info(s"${operation}: ${count}")
}
} else {
log.info("No changes to apply")
}

val writableRdd =
rdd.map { item =>
(
new Text,
new DynamoDBItemWritable(
item
.entrySet()
.stream()
.collect(
Collectors.toMap(
(e: util.Map.Entry[String, AttributeValueV1]) => e.getKey,
(e: util.Map.Entry[String, AttributeValueV1]) =>
AttributeValueUtils.fromV1(e.getValue)
)
)
)
)
}

DynamoDB.writeRDD(target, renamesMap, writableRdd, targetTableDesc)(spark)
run(msgs, target, renamesMap, targetTableDesc)(spark)
}

}