support converting null TTLs to default TTL relative to write timestamp #66

Open · wants to merge 1 commit into master
4 changes: 4 additions & 0 deletions config.yaml.example
@@ -28,6 +28,10 @@ source:
# Preserve TTLs and WRITETIMEs of cells in the source database. Note that this
# option is *incompatible* when copying tables with collections (lists, maps, sets).
preserveTimestamps: true
# optional, default TTL in seconds to use if source.preserveTimestamps AND IF original TTL is null, final TTL to be set, if 0 or unset it will be ignored
# final write TTL set will be relative to writetimestamp, so: defaultTTL - now - writetimestamp , rounded to milliseconds
Collaborator

Suggested change
# final write TTL set will be relative to writetimestamp, so: defaultTTL - now - writetimestamp , rounded to milliseconds
# final write TTL set will be relative to writetimestamp, so: defaultTTL - (now - writetimestamp), rounded to milliseconds

# it assumes writetimestamps are in UTC
#defaultTTL: 0
# Number of splits to use - this should be at minimum the amount of cores
# available in the Spark cluster, and optimally more; higher splits will lead
# to more fine-grained resumes. Aim for 8 * (Spark cores).
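As a rough worked example of the rule described in the new comments (illustrative only, not part of this PR): with defaultTTL set to one day and a cell written one hour ago, the final TTL comes out to about 82 800 seconds. A minimal Scala sketch, assuming WRITETIME values are microseconds since the epoch and TTLs are in seconds:

object DefaultTtlSketch extends App {
  val defaultTTL = 86400L                                             // 1 day, from the config
  val writetime  = (System.currentTimeMillis - 3600L * 1000L) * 1000L // cell written 1 hour ago, in microseconds
  val ageSeconds = (System.currentTimeMillis - writetime / 1000L) / 1000L
  val finalTTL   = defaultTTL - ageSeconds                            // ≈ 82800 seconds remaining
  println(finalTTL)
}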
@@ -27,6 +27,7 @@ object SourceSettings {
connections: Option[Int],
fetchSize: Int,
preserveTimestamps: Boolean,
defaultTTL: Option[Long],
where: Option[String])
extends SourceSettings
case class DynamoDB(endpoint: Option[DynamoDBEndpoint],
28 changes: 22 additions & 6 deletions src/main/scala/com/scylladb/migrator/readers/Cassandra.scala
@@ -15,6 +15,7 @@ import org.apache.spark.sql.types.{ IntegerType, LongType, StructField, StructTy
import org.apache.spark.sql.{ DataFrame, Row, SparkSession }
import org.apache.spark.unsafe.types.UTF8String

import java.time.Instant
import scala.collection.mutable.ArrayBuffer

object Cassandra {
@@ -92,7 +93,8 @@ object Cassandra {
def explodeRow(row: Row,
schema: StructType,
primaryKeyOrdinals: Map[String, Int],
regularKeyOrdinals: Map[String, (Int, Int, Int)]) =
regularKeyOrdinals: Map[String, (Int, Int, Int)],
defaultTTL: Long) =
if (regularKeyOrdinals.isEmpty) List(row)
else {
val rowTimestampsToFields =
@@ -126,7 +128,17 @@

timestampsToFields
.map {
case ((ttl, writetime), fields) =>
case ((ttl, writetime), fields) => {
val writetimestamp = writetime.getOrElse(CassandraOption.Unset)
Collaborator

This looks suspicious because writetime has type Option[Long] and CassandraOption.Unset has a type that is disjoint from Long. See my comment below for a suggestion.
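A stand-alone sketch of the type issue being pointed out, using a mock of CassandraOption rather than the connector's actual class: because Long and CassandraOption[Nothing] only share the supertype Any, the getOrElse call below infers to Any, which is what later forces the asInstanceOf[Long] cast.

object UnsetTypeSketch extends App {
  sealed trait CassandraOption[+A]                   // mock, for illustration only
  case object Unset extends CassandraOption[Nothing]

  val writetime: Option[Long] = Some(1722384000000000L) // a WRITETIME in microseconds
  val writetimestamp = writetime.getOrElse(Unset)       // inferred type: Any
  println(writetimestamp.getClass)
}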

val baseTTL = if (defaultTTL > 0) defaultTTL else 0L
Collaborator

I wonder if it would be clearer to keep the Option in the defaultTTL parameter.

So, the parameter would have type defaultTTL: Option[Long], and here you would write val baseTTL = defaultTTL.getOrElse(0).

val deltaTTL = if (writetimestamp == CassandraOption.Unset) {
0
} else {
baseTTL - (System.currentTimeMillis - writetimestamp.asInstanceOf[Long] / 1000) / 1000
}
Comment on lines +134 to +138
Collaborator

I recommend avoiding asInstanceOf. I suggest the following changes, which I believe are more readable:

Suggested change
val deltaTTL = if (writetimestamp == CassandraOption.Unset) {
0
} else {
baseTTL - (System.currentTimeMillis - writetimestamp.asInstanceOf[Long] / 1000) / 1000
}
val deltaTTL = writetime match {
case None => 0
case Some(writetimestamp) =>
baseTTL - (System.currentTimeMillis - writetimestamp / 1000) / 1000
}

// expire the record in 1s in case delta is negative, use given TTL if write timestamp wasn't found
val finalttl =
if (deltaTTL > 0) deltaTTL else if (deltaTTL == 0) baseTTL else 1L
julienrf (Collaborator) · Jul 31, 2024

Here you assume that deltaTTL == 0 is always the case when the write timestamp wasn't found. But it seems it could also happen, e.g. if baseTTL == 5 and the write timestamp was 5 seconds ago.

Maybe use the type Option again to model the fact that the deltaTTL might be unset. So, you would have something like:

Suggested change
if (deltaTTL > 0) deltaTTL else if (deltaTTL == 0) baseTTL else 1L
deltaTTL.map(_.max(1L)).getOrElse(baseTTL)
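Putting the reviewer's two Option-based suggestions together (a defaultTTL: Option[Long] parameter plus the match on writetime), the whole TTL resolution could be expressed roughly as the helper below; a sketch of the combined idea, not code from this PR, assuming write timestamps in microseconds and TTLs in seconds:

object TtlResolution {
  def resolveTtl(defaultTTL: Option[Long], writetime: Option[Long]): Long = {
    val baseTTL = defaultTTL.getOrElse(0L)
    val deltaTTL: Option[Long] = writetime.map { writetimestamp =>
      baseTTL - (System.currentTimeMillis - writetimestamp / 1000L) / 1000L
    }
    // Clamp negative deltas to 1 second (expire almost immediately), and fall
    // back to baseTTL when no write timestamp is available.
    deltaTTL.map(_.max(1L)).getOrElse(baseTTL)
  }
}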

val newValues = schema.fields.map { field =>
primaryKeyOrdinals
.get(field.name)
@@ -135,9 +147,10 @@
else Some(row.get(ord))
}
.getOrElse(fields.getOrElse(field.name, CassandraOption.Unset))
} ++ Seq(ttl.getOrElse(0L), writetime.getOrElse(CassandraOption.Unset))
} ++ Seq(ttl.getOrElse(finalttl), writetimestamp)

Row(newValues: _*)
}
}
}

@@ -168,7 +181,8 @@ object Cassandra {
df: DataFrame,
timestampColumns: Option[TimestampColumns],
origSchema: StructType,
tableDef: TableDef): DataFrame =
tableDef: TableDef,
defaultTTL: Long): DataFrame =
timestampColumns match {
case None => df
case Some(TimestampColumns(ttl, writeTime)) =>
@@ -193,7 +207,8 @@
_,
broadcastSchema.value,
broadcastPrimaryKeyOrdinals.value,
broadcastRegularKeyOrdinals.value)
broadcastRegularKeyOrdinals.value,
defaultTTL)
}(RowEncoder(finalSchema))

}
@@ -269,7 +284,8 @@ object Cassandra {
spark.createDataFrame(rdd, selection.schema),
selection.timestampColumns,
origSchema,
tableDef
tableDef,
source.defaultTTL.getOrElse(0L)
)

SourceDataFrame(resultingDataframe, selection.timestampColumns, true)