Commit 4babb71

Add support for setting a default TTL relative to the existing writetimestamp and now (in UTC)
1 parent b2caaeb commit 4babb71

3 files changed: +26 -6 lines changed

Diff for: config.yaml.example (+4)

```diff
@@ -28,6 +28,10 @@ source:
   # Preserve TTLs and WRITETIMEs of cells in the source database. Note that this
   # option is *incompatible* when copying tables with collections (lists, maps, sets).
   preserveTimestamps: true
+  # Optional. Default TTL in seconds, applied when source.preserveTimestamps is enabled
+  # AND the original cell's TTL is null; if 0 or unset it is ignored. The final TTL is
+  # set relative to the writetimestamp (assumed UTC): defaultTTL - (now - writetimestamp), in seconds.
+  #defaultTTL: 0
   # Number of splits to use - this should be at minimum the amount of cores
   # available in the Spark cluster, and optimally more; higher splits will lead
   # to more fine-grained resumes. Aim for 8 * (Spark cores).
```
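For concreteness, here is a minimal Scala sketch of the arithmetic the new comment describes, under the stated assumptions (WRITETIME in microseconds UTC, `System.currentTimeMillis` in milliseconds, TTL in seconds); the names and sample values are illustrative, not part of the commit:

```scala
// A cell written 2 days ago, with a 7-day defaultTTL (values illustrative).
val defaultTTL = 7L * 24 * 3600                               // seconds
val now        = System.currentTimeMillis                     // ms since epoch, UTC
val writetime  = (now - 2L * 24 * 3600 * 1000) * 1000         // µs, as WRITETIME reports
val ttl        = defaultTTL - (now - writetime / 1000) / 1000 // remaining window, seconds
// ttl ≈ 5 days in seconds: the cell expires at the same absolute instant it
// would have if the TTL had been applied when the cell was originally written.
```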

Diff for: src/main/scala/com/scylladb/migrator/config/SourceSettings.scala (+1)

```diff
@@ -27,6 +27,7 @@ object SourceSettings {
                        connections: Option[Int],
                        fetchSize: Int,
                        preserveTimestamps: Boolean,
+                       defaultTTL: Option[Long],
                        where: Option[String])
       extends SourceSettings
   case class DynamoDB(endpoint: Option[DynamoDBEndpoint],
```

Diff for: src/main/scala/com/scylladb/migrator/readers/Cassandra.scala (+21 -6)

```diff
@@ -15,6 +15,7 @@ import org.apache.spark.sql.types.{ IntegerType, LongType, StructField, StructType }
 import org.apache.spark.sql.{ DataFrame, Row, SparkSession }
 import org.apache.spark.unsafe.types.UTF8String
 
+import java.time.Instant
 import scala.collection.mutable.ArrayBuffer
 
 object Cassandra {
```
```diff
@@ -92,7 +93,8 @@ object Cassandra {
   def explodeRow(row: Row,
                  schema: StructType,
                  primaryKeyOrdinals: Map[String, Int],
-                 regularKeyOrdinals: Map[String, (Int, Int, Int)]) =
+                 regularKeyOrdinals: Map[String, (Int, Int, Int)],
+                 defaultTTL: Long) =
     if (regularKeyOrdinals.isEmpty) List(row)
     else {
       val rowTimestampsToFields =
@@ -126,7 +128,16 @@ object Cassandra {
 
       timestampsToFields
         .map {
-          case ((ttl, writetime), fields) =>
+          case ((ttl, writetime), fields) => {
+            val writetimestamp = writetime.getOrElse(CassandraOption.Unset)
+            val baseTTL = if (defaultTTL > 0) defaultTTL else 0L
+            val deltaTTL =
+              if (writetimestamp == CassandraOption.Unset) 0L
+              else
+                baseTTL - (System.currentTimeMillis - writetimestamp.asInstanceOf[Long] / 1000) / 1000
+            val finalttl =
+              if (deltaTTL > 0) deltaTTL else if (deltaTTL == 0) baseTTL else 1L
             val newValues = schema.fields.map { field =>
               primaryKeyOrdinals
                 .get(field.name)
@@ -135,9 +146,10 @@ object Cassandra {
                   else Some(row.get(ord))
                 }
                 .getOrElse(fields.getOrElse(field.name, CassandraOption.Unset))
-            } ++ Seq(ttl.getOrElse(0L), writetime.getOrElse(CassandraOption.Unset))
+            } ++ Seq(ttl.getOrElse(finalttl), writetimestamp)
 
             Row(newValues: _*)
+          }
         }
       }
 
```
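The clamping into `finalttl` is the heart of the change. Below is a self-contained sketch of the same rule, with `finalTtl` as a hypothetical stand-in name, plus how it interacts with `ttl.getOrElse(finalttl)`:

```scala
// Same rule as `finalttl` above, as a standalone function (hypothetical name).
def finalTtl(baseTTL: Long, deltaTTL: Long): Long =
  if (deltaTTL > 0) deltaTTL      // part of the default window remains
  else if (deltaTTL == 0) baseTTL // no writetime available: use the raw default
  else 1L                         // window already elapsed: expire in ~1 second
                                  // (a TTL of 0 would mean "never expire" in CQL)

// `ttl.getOrElse(finalttl)` means the default never overrides a real TTL:
val existingTtl: Option[Long] = Some(600L)
assert(existingTtl.getOrElse(finalTtl(3600L, 1200L)) == 600L)            // source TTL wins
assert((None: Option[Long]).getOrElse(finalTtl(3600L, 1200L)) == 1200L)  // default applies
```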

```diff
@@ -168,7 +180,8 @@ object Cassandra {
       df: DataFrame,
       timestampColumns: Option[TimestampColumns],
       origSchema: StructType,
-      tableDef: TableDef): DataFrame =
+      tableDef: TableDef,
+      defaultTTL: Long): DataFrame =
     timestampColumns match {
       case None => df
       case Some(TimestampColumns(ttl, writeTime)) =>
@@ -193,7 +206,8 @@ object Cassandra {
             _,
             broadcastSchema.value,
             broadcastPrimaryKeyOrdinals.value,
-            broadcastRegularKeyOrdinals.value)
+            broadcastRegularKeyOrdinals.value,
+            defaultTTL)
         }(RowEncoder(finalSchema))
 
     }
@@ -269,7 +283,8 @@ object Cassandra {
       spark.createDataFrame(rdd, selection.schema),
       selection.timestampColumns,
       origSchema,
-      tableDef
+      tableDef,
+      source.defaultTTL.getOrElse(0L)
     )
 
     SourceDataFrame(resultingDataframe, selection.timestampColumns, true)
```
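The last hunk collapses the optional setting exactly once, at the call site. A minimal sketch of that contract, with `SourceLike` as a hypothetical stand-in for the settings class:

```scala
// Stand-in for the Cassandra source settings; only the new field matters here.
case class SourceLike(defaultTTL: Option[Long])

// Mirrors `source.defaultTTL.getOrElse(0L)`: an absent key means 0, and
// explodeRow treats 0 as "feature off" via `if (defaultTTL > 0) ... else 0L`.
def effectiveDefault(source: SourceLike): Long =
  source.defaultTTL.getOrElse(0L)

assert(effectiveDefault(SourceLike(None)) == 0L)             // `#defaultTTL: 0` left commented out
assert(effectiveDefault(SourceLike(Some(86400L))) == 86400L) // one day
```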
