
Commit 324c0e7

Artem committed
Add flag to support parquet migration with and without savepoints
1 parent 40fe4a0 commit 324c0e7

12 files changed: +630 -10 lines changed

build.sbt

Lines changed: 4 additions & 3 deletions
@@ -52,9 +52,10 @@ lazy val migrator = (project in file("migrator")).enablePlugins(BuildInfoPlugin)
       "com.github.jnr" % "jnr-posix" % "3.1.19", // Needed by the Spark ScyllaDB connector
       "com.scylladb.alternator" % "emr-dynamodb-hadoop" % "5.8.0",
       "com.scylladb.alternator" % "load-balancing" % "1.0.0",
-      "io.circe" %% "circe-generic" % "0.14.7",
-      "io.circe" %% "circe-parser" % "0.14.7",
-      "io.circe" %% "circe-yaml" % "0.15.1",
+      "io.circe" %% "circe-generic" % "0.14.7",
+      "io.circe" %% "circe-parser" % "0.14.7",
+      "io.circe" %% "circe-yaml" % "0.15.1",
+      "io.circe" %% "circe-generic-extras" % "0.14.4",
     ),
     assembly / assemblyShadeRules := Seq(
       ShadeRule.rename("org.yaml.snakeyaml.**" -> "com.scylladb.shaded.@1").inAll
migrator/src/main/scala/com/scylladb/migrator/config/Savepoints.scala

Lines changed: 6 additions & 5 deletions
@@ -1,11 +1,12 @@
 package com.scylladb.migrator.config

-import io.circe.{ Decoder, Encoder }
-import io.circe.generic.semiauto.{ deriveDecoder, deriveEncoder }
+import io.circe.Codec
+import io.circe.generic.extras.Configuration
+import io.circe.generic.extras.semiauto._

-case class Savepoints(intervalSeconds: Int, path: String)
+case class Savepoints(intervalSeconds: Int, path: String, enableParquetFileTracking: Boolean = true)

 object Savepoints {
-  implicit val encoder: Encoder[Savepoints] = deriveEncoder[Savepoints]
-  implicit val decoder: Decoder[Savepoints] = deriveDecoder[Savepoints]
+  implicit val config: Configuration = Configuration.default.withDefaults
+  implicit val codec: Codec[Savepoints] = deriveConfiguredCodec[Savepoints]
 }
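
With Configuration.default.withDefaults, configuration files written before this change stay valid: a YAML document that omits the new field still decodes, falling back to the case-class default of true. A minimal sketch of that behaviour (not part of the commit; it assumes the codec above is in implicit scope):

import com.scylladb.migrator.config.Savepoints
import io.circe.yaml.parser

val legacySavepointsYaml =
  """path: /app/savepoints
    |intervalSeconds: 300
    |""".stripMargin

// Right(Savepoints(300, "/app/savepoints", true)) -- the absent
// enableParquetFileTracking field falls back to its default.
val decoded = parser.parse(legacySavepointsYaml).flatMap(_.as[Savepoints])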

migrator/src/main/scala/com/scylladb/migrator/readers/Parquet.scala

Lines changed: 39 additions & 2 deletions
@@ -1,7 +1,7 @@
 package com.scylladb.migrator.readers

 import com.scylladb.migrator.config.{ MigratorConfig, SourceSettings, TargetSettings }
-import com.scylladb.migrator.scylla.{ ScyllaParquetMigrator, SourceDataFrame }
+import com.scylladb.migrator.scylla.{ ScyllaMigrator, ScyllaParquetMigrator, SourceDataFrame }
 import org.apache.log4j.LogManager
 import org.apache.spark.sql.{ AnalysisException, SparkSession }
 import scala.util.Using
@@ -12,8 +12,30 @@ object Parquet {
   def migrateToScylla(config: MigratorConfig,
                       source: SourceSettings.Parquet,
                       target: TargetSettings.Scylla)(implicit spark: SparkSession): Unit = {
-    log.info("Starting Parquet migration with parallel processing and file-level savepoints")

+    val useFileTracking = config.savepoints.enableParquetFileTracking
+
+    if (useFileTracking) {
+      log.info(
+        "Starting Parquet migration with file-level savepoint tracking")
+      migrateWithSavepoints(config, source, target)
+    } else {
+      log.info("Starting Parquet migration without savepoint tracking")
+      migrateWithoutSavepoints(config, source, target)
+    }
+  }
+
+  /**
+   * Parquet migration with file-level savepoint tracking.
+   *
+   * This mode tracks completion of individual Parquet files, enabling resume capability
+   * if the migration is interrupted. Uses SparkListener to detect when all partitions
+   * of a file have been processed.
+   */
+  private def migrateWithSavepoints(
+    config: MigratorConfig,
+    source: SourceSettings.Parquet,
+    target: TargetSettings.Scylla)(implicit spark: SparkSession): Unit = {
     configureHadoopCredentials(spark, source)

     val allFiles = listParquetFiles(spark, source.path)
@@ -70,6 +92,21 @@ object Parquet {
     }
   }

+  /**
+   * Parquet migration without savepoint tracking.
+   *
+   * This mode reads all Parquet files using Spark's native parallelism but does not
+   * track individual file completion. If migration is interrupted, it will restart
+   * from the beginning.
+   */
+  private def migrateWithoutSavepoints(
+    config: MigratorConfig,
+    source: SourceSettings.Parquet,
+    target: TargetSettings.Scylla)(implicit spark: SparkSession): Unit = {
+    val sourceDF = ParquetWithoutSavepoints.readDataFrame(spark, source)
+    ScyllaMigrator.migrate(config, target, sourceDF)
+  }
+
   def listParquetFiles(spark: SparkSession, path: String): Seq[String] = {
     log.info(s"Discovering Parquet files in $path")
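
The doc comment above refers to a SparkListener that detects when all partitions of a file have been processed; the listener itself is not part of this excerpt. A hypothetical sketch of that pattern (partitionsPerFile and fileForTask are assumed inputs, not the commit's actual code):

import org.apache.spark.scheduler.{ SparkListener, SparkListenerTaskEnd }
import scala.collection.mutable

class FileCompletionListener(partitionsPerFile: Map[String, Int],
                             fileForTask: SparkListenerTaskEnd => Option[String],
                             onFileComplete: String => Unit) extends SparkListener {

  // Spark delivers listener events sequentially on the listener-bus thread,
  // so a plain mutable map is safe here.
  private val finishedPartitions = mutable.Map.empty[String, Int].withDefaultValue(0)

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit =
    fileForTask(taskEnd).foreach { file =>
      finishedPartitions(file) += 1
      if (finishedPartitions(file) == partitionsPerFile.getOrElse(file, Int.MaxValue))
        onFileComplete(file) // e.g. record the file in the savepoint state
    }
}

Such a listener would be registered via spark.sparkContext.addSparkListener before the write starts.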

migrator/src/main/scala/com/scylladb/migrator/readers/ParquetWithoutSavepoints.scala

Lines changed: 27 additions & 0 deletions
@@ -0,0 +1,27 @@
+package com.scylladb.migrator.readers
+
+import com.scylladb.migrator.config.SourceSettings
+import com.scylladb.migrator.scylla.SourceDataFrame
+import org.apache.log4j.LogManager
+import org.apache.spark.sql.SparkSession
+
+/**
+ * Parquet reader implementation without savepoint tracking.
+ *
+ * This implementation provides simple Parquet file reading without file-level savepoint tracking.
+ * Enable via configuration: `savepoints.enableParquetFileTracking = false`
+ */
+object ParquetWithoutSavepoints {
+  val log = LogManager.getLogger("com.scylladb.migrator.readers.ParquetWithoutSavepoints")
+
+  def readDataFrame(spark: SparkSession, source: SourceSettings.Parquet): SourceDataFrame = {
+    log.info(s"Reading Parquet files from ${source.path} (without savepoint tracking)")
+
+    Parquet.configureHadoopCredentials(spark, source)
+
+    val df = spark.read.parquet(source.path)
+    log.info(s"Loaded Parquet DataFrame with ${df.rdd.getNumPartitions} partitions")
+
+    SourceDataFrame(df, None, savepointsSupported = false)
+  }
+}
Lines changed: 22 additions & 0 deletions
@@ -0,0 +1,22 @@
+source:
+  type: parquet
+  path: /app/parquet/comparison-data
+
+target:
+  type: scylla
+  host: scylla
+  port: 9042
+  localDC: datacenter1
+  credentials:
+    username: dummy
+    password: dummy
+  keyspace: test
+  table: comparison
+  consistencyLevel: LOCAL_QUORUM
+  connections: 16
+  stripTrailingZerosForDecimals: false
+
+savepoints:
+  path: /app/spark-master/comparison-savepoints
+  intervalSeconds: 300
+  enableParquetFileTracking: false
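
In a config like the one above, the explicit false decodes through the Savepoints codec and routes Parquet.migrateToScylla into the non-tracking branch. A quick sketch of that wiring (illustrative only):

import com.scylladb.migrator.config.Savepoints
import io.circe.yaml.parser

val savepointsYaml =
  """path: /app/spark-master/comparison-savepoints
    |intervalSeconds: 300
    |enableParquetFileTracking: false
    |""".stripMargin

// Right(Savepoints(300, "/app/spark-master/comparison-savepoints", false)):
// with this value, migrateToScylla calls migrateWithoutSavepoints.
val savepoints = parser.parse(savepointsYaml).flatMap(_.as[Savepoints])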
Lines changed: 22 additions & 0 deletions
@@ -0,0 +1,22 @@
+source:
+  type: parquet
+  path: /app/parquet/legacy
+
+target:
+  type: scylla
+  host: scylla
+  port: 9042
+  localDC: datacenter1
+  credentials:
+    username: dummy
+    password: dummy
+  keyspace: test
+  table: singlefile
+  consistencyLevel: LOCAL_QUORUM
+  connections: 16
+  stripTrailingZerosForDecimals: false
+
+savepoints:
+  path: /app/spark-master/legacy-savepoints
+  intervalSeconds: 300
+  enableParquetFileTracking: false
Lines changed: 22 additions & 0 deletions
@@ -0,0 +1,22 @@
+source:
+  type: parquet
+  path: /app/parquet/legacy
+
+target:
+  type: scylla
+  host: scylla
+  port: 9042
+  localDC: datacenter1
+  credentials:
+    username: dummy
+    password: dummy
+  keyspace: test
+  table: legacytest
+  consistencyLevel: LOCAL_QUORUM
+  connections: 16
+  stripTrailingZerosForDecimals: false
+
+savepoints:
+  path: /app/spark-master/legacy-savepoints
+  intervalSeconds: 300
+  enableParquetFileTracking: false
Lines changed: 22 additions & 0 deletions
@@ -0,0 +1,22 @@
+source:
+  type: parquet
+  path: /app/parquet/legacy2
+
+target:
+  type: scylla
+  host: scylla
+  port: 9042
+  localDC: datacenter1
+  credentials:
+    username: dummy
+    password: dummy
+  keyspace: test
+  table: legacytest2
+  consistencyLevel: LOCAL_QUORUM
+  connections: 16
+  stripTrailingZerosForDecimals: false
+
+savepoints:
+  path: /app/spark-master/legacy-savepoints
+  intervalSeconds: 300
+  enableParquetFileTracking: false
Lines changed: 22 additions & 0 deletions
@@ -0,0 +1,22 @@
+source:
+  type: parquet
+  path: /app/parquet/legacy3
+
+target:
+  type: scylla
+  host: scylla
+  port: 9042
+  localDC: datacenter1
+  credentials:
+    username: dummy
+    password: dummy
+  keyspace: test
+  table: legacytest3
+  consistencyLevel: LOCAL_QUORUM
+  connections: 16
+  stripTrailingZerosForDecimals: false
+
+savepoints:
+  path: /app/spark-master/legacy-noresume
+  intervalSeconds: 300
+  enableParquetFileTracking: false
Lines changed: 22 additions & 0 deletions
@@ -0,0 +1,22 @@
+source:
+  type: parquet
+  path: /app/parquet/comparison-data
+
+target:
+  type: scylla
+  host: scylla
+  port: 9042
+  localDC: datacenter1
+  credentials:
+    username: dummy
+    password: dummy
+  keyspace: test
+  table: comparison
+  consistencyLevel: LOCAL_QUORUM
+  connections: 16
+  stripTrailingZerosForDecimals: false
+
+savepoints:
+  path: /app/spark-master/comparison-savepoints
+  intervalSeconds: 300
+  enableParquetFileTracking: true
