Add savepoints for parquet migration #261
base: master
Changes from 17 commits
New file: StringSetAccumulator (com.scylladb.migrator.alternator)
@@ -0,0 +1,28 @@
package com.scylladb.migrator.alternator

import org.apache.spark.util.AccumulatorV2
import java.util.concurrent.atomic.AtomicReference

class StringSetAccumulator(initialValue: Set[String] = Set.empty)
    extends AccumulatorV2[String, Set[String]] {

  private val ref = new AtomicReference(initialValue)

  // Note: isZero may be momentarily inconsistent in concurrent scenarios,
  // as it reads the current value of the set without synchronization.
  // This is eventually consistent and thread-safe, but may not reflect the most recent updates.
  def isZero: Boolean = ref.get.isEmpty
  def copy(): StringSetAccumulator = new StringSetAccumulator(ref.get)
  def reset(): Unit = ref.set(Set.empty)
  def add(v: String): Unit = ref.getAndUpdate(_ + v)

  def merge(other: AccumulatorV2[String, Set[String]]): Unit =
    ref.getAndUpdate(_ ++ other.value)

  def value: Set[String] = ref.get
}

object StringSetAccumulator {
  def apply(initialValue: Set[String] = Set.empty): StringSetAccumulator =
    new StringSetAccumulator(initialValue)
}
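For context, a hedged usage sketch of the accumulator (not part of the PR): it is registered with the SparkContext on the driver, values are added from executor-side tasks, and Spark merges the per-task copies back into the driver-side instance. The local SparkSession and file names below are illustrative only.

import com.scylladb.migrator.alternator.StringSetAccumulator
import org.apache.spark.sql.SparkSession

object StringSetAccumulatorSketch extends App {
  // Illustrative local session; the migrator would use its own SparkSession.
  val spark = SparkSession.builder().master("local[*]").appName("accumulator-sketch").getOrCreate()

  val processedFiles = StringSetAccumulator()
  spark.sparkContext.register(processedFiles, "processed-parquet-files")

  // Values added in executor tasks are merged into the driver-side value once the action finishes.
  spark.sparkContext
    .parallelize(Seq("part-00000.parquet", "part-00001.parquet"), numSlices = 2)
    .foreach(processedFiles.add)

  println(processedFiles.value) // Set(part-00000.parquet, part-00001.parquet)
  spark.stop()
}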
MigratorConfig (com.scylladb.migrator.config)
@@ -14,6 +14,7 @@ case class MigratorConfig(source: SourceSettings,
                           savepoints: Savepoints,
                           skipTokenRanges: Option[Set[(Token[_], Token[_])]],
                           skipSegments: Option[Set[Int]],
+                          skipParquetFiles: Option[Set[String]],
                           validation: Option[Validation]) {
   def render: String = this.asJson.asYaml.spaces2

@@ -25,6 +26,8 @@ case class MigratorConfig(source: SourceSettings,
   def getSkipTokenRangesOrEmptySet: Set[(Token[_], Token[_])] = skipTokenRanges.getOrElse(Set.empty)

+  def getSkipParquetFilesOrEmptySet: Set[String] = skipParquetFiles.getOrElse(Set.empty)
+
 }
 object MigratorConfig {
   implicit val tokenEncoder: Encoder[Token[_]] = Encoder.instance {
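The new accessor mirrors getSkipTokenRangesOrEmptySet: an absent field behaves as an empty skip set. A self-contained illustration of that pattern, using a stripped-down stand-in for MigratorConfig (the class name, file path, and values below are hypothetical):

final case class ConfigSketch(skipParquetFiles: Option[Set[String]]) {
  def getSkipParquetFilesOrEmptySet: Set[String] = skipParquetFiles.getOrElse(Set.empty)
}

object ConfigSketchExample extends App {
  val freshRun   = ConfigSketch(skipParquetFiles = None)
  val resumedRun = ConfigSketch(skipParquetFiles = Some(Set("s3a://bucket/data/part-00000.parquet")))

  println(freshRun.getSkipParquetFilesOrEmptySet)   // Set(): nothing is skipped on a fresh run
  println(resumedRun.getSkipParquetFilesOrEmptySet) // files recorded in a savepoint are skipped
}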
Savepoints configuration
@@ -4,6 +4,7 @@ import io.circe.{ Decoder, Encoder }
 import io.circe.generic.semiauto.{ deriveDecoder, deriveEncoder }

/**
 * Configuration for periodic savepoints written during a migration run.
 *
 * @param intervalSeconds
 *   How often, in seconds, a savepoint should be written.
 * @param path
 *   Filesystem path (directory or prefix) where savepoint data will be stored.
 * @param enableParquetFileTracking
 *   When `true` (the default), enables tracking of already-processed Parquet files
 *   as part of the savepoint state. This prevents the same Parquet file from
 *   being migrated more than once if the job is restarted or savepoints are
 *   resumed.
 *
 *   Set this to `false` to keep the legacy behavior where Parquet files are not
 *   tracked in savepoints. Disabling tracking may be useful for backwards
 *   compatibility with older savepoints or when file tracking is handled by an
 *   external mechanism, but it means repeated runs may reprocess the same
 *   Parquet files.
 */
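A minimal construction sketch under assumptions: the scaladoc above implies a case class with intervalSeconds, path, and enableParquetFileTracking defaulting to true, but the actual declaration is not part of this excerpt, so the stand-in below mirrors it rather than quoting it. Field types, defaults, and the paths shown are assumptions.

object SavepointsSketchExample {
  // Assumed mirror of the real Savepoints declaration, for illustration only.
  final case class SavepointsSketch(intervalSeconds: Int,
                                    path: String,
                                    enableParquetFileTracking: Boolean = true)

  // Write a savepoint every five minutes and keep Parquet file tracking enabled (the default).
  val savepoints = SavepointsSketch(
    intervalSeconds = 300,
    path = "/var/lib/scylla-migrator/savepoints"
  )

  // Opt out of file tracking to retain the legacy behavior described above.
  val legacySavepoints = savepoints.copy(enableParquetFileTracking = false)
}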
New file: FileCompletionListener (com.scylladb.migrator.readers)
@@ -0,0 +1,93 @@
package com.scylladb.migrator.readers

import org.apache.log4j.LogManager
import org.apache.spark.scheduler.{ SparkListener, SparkListenerTaskEnd }
import org.apache.spark.Success

import scala.collection.concurrent.TrieMap

/**
 * SparkListener that tracks partition completion and aggregates it to file-level completion.
 *
 * This listener monitors Spark task completion events and maintains mappings between
 * partitions and files. When all partitions belonging to a file have been successfully
 * completed, it marks the file as processed via the ParquetSavepointsManager.
 *
 * @param partitionToFiles Mapping from Spark partition ID to source file paths
 * @param fileToPartitions Mapping from file path to the set of partition IDs reading from it
 * @param savepointsManager Manager to notify when files are completed
 */
class FileCompletionListener(
  partitionToFiles: Map[Int, Set[String]],
  fileToPartitions: Map[String, Set[Int]],
  savepointsManager: ParquetSavepointsManager
) extends SparkListener {

  private val log = LogManager.getLogger("com.scylladb.migrator.readers.FileCompletionListener")

  private val completedPartitions = TrieMap.empty[Int, Boolean]

  private val completedFiles = TrieMap.empty[String, Boolean]

  log.info(
    s"FileCompletionListener initialized: tracking ${fileToPartitions.size} files " +
      s"across ${partitionToFiles.size} partitions")

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit =
    if (taskEnd.reason == Success) {
      val partitionId = taskEnd.taskInfo.partitionId

      partitionToFiles.get(partitionId) match {
        case Some(filenames) =>
          if (completedPartitions.putIfAbsent(partitionId, true).isEmpty) {
            filenames.foreach { filename =>
              log.debug(s"Partition $partitionId completed (file: $filename)")
              checkFileCompletion(filename)
            }
          }

        case None =>
          log.trace(s"Task completed for untracked partition $partitionId")
      }
    } else {
      log.debug(
        s"Task for partition ${taskEnd.taskInfo.partitionId} did not complete successfully: ${taskEnd.reason}")
    }

  private def checkFileCompletion(filename: String): Unit = {
    if (completedFiles.contains(filename)) {
      return
    }

    fileToPartitions.get(filename) match {
      case Some(allPartitions) =>
        val allComplete = allPartitions.forall(completedPartitions.contains)

        if (allComplete) {
          if (completedFiles.putIfAbsent(filename, true).isEmpty) {
            savepointsManager.markFileAsProcessed(filename)

            val progress = s"${completedFiles.size}/${fileToPartitions.size}"
            log.info(s"File completed: $filename (progress: $progress)")
          }
        } else {
          val completedCount = allPartitions.count(completedPartitions.contains)
          log.trace(s"File $filename: $completedCount/${allPartitions.size} partitions complete")
        }

      case None =>
        log.warn(s"File $filename not found in fileToPartitions map (this shouldn't happen)")
    }
  }

  def getCompletedFilesCount: Int = completedFiles.size

  def getTotalFilesCount: Int = fileToPartitions.size

  def getProgressReport: String = {
    val filesCompleted = getCompletedFilesCount
    val totalFiles = getTotalFilesCount

    s"Progress: $filesCompleted/$totalFiles files"
  }
}

(Several review threads on this file were marked as resolved; their contents are hidden in the diff view.)
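A hedged wiring sketch for the listener (not the PR's exact code): how the PR derives partitionToFiles from the DataFrame's input partitions is not shown in this excerpt, so the maps and paths below are illustrative. The fragment is assumed to run on the driver, inside something like Parquet.migrateToScylla, where spark (SparkSession) and savepointsManager (ParquetSavepointsManager) are in scope.

  val partitionToFiles: Map[Int, Set[String]] = Map(
    0 -> Set("s3a://bucket/data/part-00000.parquet"),
    1 -> Set("s3a://bucket/data/part-00000.parquet", "s3a://bucket/data/part-00001.parquet")
  )

  // Invert partition -> files into file -> partitions so the listener can tell when every
  // partition that reads a given file has finished.
  val fileToPartitions: Map[String, Set[Int]] =
    partitionToFiles.toSeq
      .flatMap { case (pid, files) => files.map(f => f -> pid) }
      .groupBy { case (file, _) => file }
      .map { case (file, pairs) => file -> pairs.map(_._2).toSet }

  // Register the listener before triggering the write action so task-end events are observed.
  spark.sparkContext.addSparkListener(
    new FileCompletionListener(partitionToFiles, fileToPartitions, savepointsManager))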
Parquet (com.scylladb.migrator.readers)
@@ -1,22 +1,121 @@
 package com.scylladb.migrator.readers

-import com.scylladb.migrator.config.SourceSettings
-import com.scylladb.migrator.scylla.SourceDataFrame
+import com.scylladb.migrator.config.{ MigratorConfig, SourceSettings, TargetSettings }
+import com.scylladb.migrator.scylla.{ ScyllaParquetMigrator, SourceDataFrame }
 import org.apache.log4j.LogManager
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{ AnalysisException, SparkSession }
+import scala.util.Using

 object Parquet {
   val log = LogManager.getLogger("com.scylladb.migrator.readers.Parquet")

-  def readDataFrame(spark: SparkSession, source: SourceSettings.Parquet): SourceDataFrame = {
+  def migrateToScylla(config: MigratorConfig,
+                      source: SourceSettings.Parquet,
+                      target: TargetSettings.Scylla)(implicit spark: SparkSession): Unit = {
+    log.info("Starting Parquet migration with parallel processing and file-level savepoints")
+
+    configureHadoopCredentials(spark, source)
+
+    val allFiles = listParquetFiles(spark, source.path)
+    val skipFiles = config.getSkipParquetFilesOrEmptySet
+    val filesToProcess = allFiles.filterNot(skipFiles.contains)
+
+    if (filesToProcess.isEmpty) {
+      log.info("No Parquet files to process. Migration is complete.")
+      return
+    }
+
+    log.info(s"Processing ${filesToProcess.size} Parquet files")
+
+    val df = if (skipFiles.isEmpty) {
+      spark.read.parquet(source.path)
+    } else {
+      spark.read.parquet(filesToProcess: _*)
+    }

Review thread on the file list passed to spark.read.parquet:

Contributor: Shuffle the files before passing them over?

Contributor: Or is Spark shuffling this by itself?

Author (collaborator): Spark handles partition sizing on its own using spark.sql.files.maxPartitionBytes, so […] However, I can add […]
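The helpers configureHadoopCredentials and listParquetFiles referenced above are not shown in this excerpt. Below is a hedged sketch of what a file-listing helper could look like, built on Hadoop's FileSystem API; the PR's actual implementation may differ (for example, it may recurse into subdirectories or handle _SUCCESS markers).

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession

object ParquetFileListingSketch {
  // Lists the immediate .parquet files under `path`, using the Hadoop configuration of the session.
  def listParquetFiles(spark: SparkSession, path: String): Seq[String] = {
    val root = new Path(path)
    val fs   = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
    fs.listStatus(root)
      .filter(status => status.isFile && status.getPath.getName.endsWith(".parquet"))
      .map(_.getPath.toString)
      .toSeq
  }
}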
Review thread on lines +52 to +56, with a suggested change to always read the explicit file list:

-    val df = if (skipFiles.isEmpty) {
-      spark.read.parquet(source.path)
-    } else {
-      spark.read.parquet(filesToProcess: _*)
-    }
+    val df = spark.read.parquet(filesToProcess: _*)

Reviewer: Why false when you support savepoints now?

Reviewer: I think this needs a better look, since the savepoint manager basically needs to dump its state to the savepoint file periodically (and on failure, of course), and I am wondering whether that periodic write to the savepoint file will still happen if this is false.

Reviewer: (Or is the manager self-contained, so it will dump savepoints automatically once it is instantiated?)

Author:
> Why false when you support savepoints now?
Basically, this false flag means no savepoint manager is created for the basic ScyllaMigrator, and it is not used in the ParquetMigrator at all. It is necessary so that we do not create an additional SavepointsManager, since we have an external one created in the Parquet object. Actually, I see the current architecture / naming is confusing.
> (Or is the manager self-contained, so it will dump savepoints automatically once it is instantiated?)
Yeah, it is 😌

Copilot AI (Jan 22, 2026): Resource management concern: the ParquetSavepointsManager is closed by the Using.resource block, but it is passed to ScyllaParquetMigrator.migrate, which internally marks it with shouldCloseManager = false to prevent double-closing. However, this creates a brittle design where the lifecycle management is split between two places. If an exception occurs during migration (line 80), the savepointsManager.dumpMigrationState("completed") at line 82 will never be called, and only the "final" dump in ScyllaMigrator will execute. Consider consolidating the resource management logic or adding explicit error handling to ensure proper state dumping in all scenarios.

Reviewer: The change below has nothing to do with Parquet, but OK, I will just close one eye. PR-cleanliness-wise it just confuses people, so unrelated changes should go in separate PRs/commits.

Author: Yes, thank you. It looks like a leftover from the earlier approach that had both parallel and sequential migrations.
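To make the lifecycle concern concrete, here is a small self-contained sketch (the class and method names are stand-ins, not the PR's classes): with Using.resource the resource is always closed, but a statement placed after a failing body, such as a final "completed" state dump, is skipped. That is the scenario the Copilot comment describes.

import scala.util.{ Try, Using }

object UsingLifecycleSketch extends App {
  // Stand-in for a savepoints manager that is AutoCloseable, as the discussion above implies.
  final class ManagerSketch extends AutoCloseable {
    def dumpMigrationState(reason: String): Unit = println(s"dump: $reason")
    def close(): Unit = println("closed")
  }

  def migrateOnce(failMidway: Boolean): Unit =
    Using.resource(new ManagerSketch) { manager =>
      if (failMidway) throw new RuntimeException("simulated failure mid-migration")
      manager.dumpMigrationState("completed") // only reached when the migration body succeeds
    }

  Try(migrateOnce(failMidway = true))  // prints only "closed": the "completed" dump is skipped
  migrateOnce(failMidway = false)      // prints "dump: completed" then "closed"
}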
New file: ParquetSavepointsManager (com.scylladb.migrator.readers)
@@ -0,0 +1,36 @@
package com.scylladb.migrator.readers

import com.scylladb.migrator.SavepointsManager
import com.scylladb.migrator.config.MigratorConfig
import com.scylladb.migrator.alternator.StringSetAccumulator
import org.apache.spark.SparkContext

class ParquetSavepointsManager(migratorConfig: MigratorConfig,
                               filesAccumulator: StringSetAccumulator)
    extends SavepointsManager(migratorConfig) {

  def describeMigrationState(): String = {
    val processedCount = filesAccumulator.value.size
    s"Processed files: $processedCount"
  }

  def updateConfigWithMigrationState(): MigratorConfig =
    migratorConfig.copy(skipParquetFiles = Some(filesAccumulator.value))

  def markFileAsProcessed(filePath: String): Unit = {
    filesAccumulator.add(filePath)
    log.debug(s"Marked file as processed: $filePath")
  }
}

object ParquetSavepointsManager {

  def apply(migratorConfig: MigratorConfig, spark: SparkContext): ParquetSavepointsManager = {
    val filesAccumulator =
      StringSetAccumulator(migratorConfig.skipParquetFiles.getOrElse(Set.empty))

    spark.register(filesAccumulator, "processed-parquet-files")

    new ParquetSavepointsManager(migratorConfig, filesAccumulator)
  }
}
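A hedged usage sketch tying the pieces together (not from the PR): the manager is created for a run that may resume from a savepoint, the listener reports completed files to it, and the updated config it produces is what a future run loads to skip those files. config is assumed to be a loaded MigratorConfig and spark a SparkSession in scope; the file path is illustrative.

  val manager = ParquetSavepointsManager(config, spark.sparkContext)

  // Called by FileCompletionListener once every partition of a file has finished.
  manager.markFileAsProcessed("s3a://bucket/data/part-00000.parquet")

  // The next savepoint dump serializes a config whose skipParquetFiles includes that file,
  // so a restarted migration filters it out via getSkipParquetFilesOrEmptySet.
  val updatedConfig = manager.updateConfigWithMigrationState()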