Changes from 18 of 23 commits
cbc05e2
config: Add skipParquetFiles to MigratorConfig
Nov 9, 2025
8d56051
alternator: Introduce StringSetAccumulator utility
Nov 9, 2025
8fc62f8
readers: Extract ParquetProcessingStrategy interface
Nov 9, 2025
4eb7038
readers: Implement ParallelParquetStrategy
Nov 9, 2025
ea4005b
scylla: Refactor ScyllaMigrator to support external savepoints
Nov 9, 2025
11f25f0
scylla: Add ScyllaParquetMigrator for file-by-file migration
Nov 9, 2025
f7d5332
readers: Add ParquetSavepointsManager
Nov 9, 2025
c1a25d7
readers: Implement SequentialParquetStrategy
Nov 9, 2025
03ca5de
readers: Add file discovery and credential configuration to Parquet
Nov 9, 2025
1156c79
readers: Wire up strategy selection in Parquet.migrateToScylla
Nov 9, 2025
7421afc
tests: Add unit tests for ParquetSavepoints functionality
Nov 9, 2025
f36b901
tests: Add integration tests for Parquet migration modes
Nov 9, 2025
6438b2f
tests: Add test configurations for Parquet modes
Nov 9, 2025
f4640b5
tests: Refactor ParquetToScyllaBasicMigrationTest
Nov 9, 2025
f09f96c
tests: Add gitignore for spark-master test artifacts
Nov 21, 2025
9712e8c
config: Add parquetProcessingMode to Savepoints configuration
Nov 9, 2025
076ccad
Replace Parquet strategy pattern with unified parallel approach
Nov 21, 2025
1e500e5
Optimize debug logging during parquet partition reading
Nov 25, 2025
2571dae
Apply suggestions from code review (copilot)
pizzaeueu Nov 26, 2025
cbb0cae
Extracts partition-to-file mappings from Spark execution plan
Jan 1, 2026
5da74e1
Merge branch 'parquet-savepoint-rebased' of github.com:pizzaeueu/scyl…
Jan 1, 2026
40fe4a0
Disable tablets for tests
Jan 6, 2026
324c0e7
Add flag to support parquet migration with and without savepoints
Jan 20, 2026
3 changes: 1 addition & 2 deletions migrator/src/main/scala/com/scylladb/migrator/Migrator.scala
@@ -40,8 +40,7 @@ object Migrator {
migratorConfig.getSkipTokenRangesOrEmptySet)
ScyllaMigrator.migrate(migratorConfig, scyllaTarget, sourceDF)
case (parquetSource: SourceSettings.Parquet, scyllaTarget: TargetSettings.Scylla) =>
val sourceDF = readers.Parquet.readDataFrame(spark, parquetSource)
ScyllaMigrator.migrate(migratorConfig, scyllaTarget, sourceDF)
readers.Parquet.migrateToScylla(migratorConfig, parquetSource, scyllaTarget)(spark)
case (dynamoSource: SourceSettings.DynamoDB, alternatorTarget: TargetSettings.DynamoDB) =>
AlternatorMigrator.migrateFromDynamoDB(dynamoSource, alternatorTarget, migratorConfig)
case (
migrator/src/main/scala/com/scylladb/migrator/alternator/StringSetAccumulator.scala
@@ -0,0 +1,28 @@
package com.scylladb.migrator.alternator

import org.apache.spark.util.AccumulatorV2
import java.util.concurrent.atomic.AtomicReference

class StringSetAccumulator(initialValue: Set[String] = Set.empty)
extends AccumulatorV2[String, Set[String]] {

private val ref = new AtomicReference(initialValue)

// Note: isZero reads the current set atomically but is not synchronized with
// concurrent add/merge calls, so it may not reflect the most recent updates.
// The accumulator itself remains thread-safe.
def isZero: Boolean = ref.get.isEmpty
def copy(): StringSetAccumulator = new StringSetAccumulator(ref.get)
def reset(): Unit = ref.set(Set.empty)
def add(v: String): Unit = ref.getAndUpdate(_ + v)

def merge(other: AccumulatorV2[String, Set[String]]): Unit =
ref.getAndUpdate(_ ++ other.value)

def value: Set[String] = ref.get
}

object StringSetAccumulator {
def apply(initialValue: Set[String] = Set.empty): StringSetAccumulator =
new StringSetAccumulator(initialValue)
}
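
For context, typical driver-side usage of this accumulator looks roughly like the following sketch (assuming a SparkSession named `spark` is in scope; the file path is hypothetical). The accumulator name matches the one registered by ParquetSavepointsManager further below.

  // Register on the driver, add entries from tasks or listeners, read the merged set back on the driver.
  val processedFiles = StringSetAccumulator()
  spark.sparkContext.register(processedFiles, "processed-parquet-files")
  processedFiles.add("s3a://bucket/data/part-00000.parquet") // hypothetical path
  val alreadyProcessed: Set[String] = processedFiles.value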
@@ -14,6 +14,7 @@ case class MigratorConfig(source: SourceSettings,
savepoints: Savepoints,
skipTokenRanges: Option[Set[(Token[_], Token[_])]],
skipSegments: Option[Set[Int]],
skipParquetFiles: Option[Set[String]],
Copilot AI (Jan 22, 2026):
Missing documentation for the new skipParquetFiles configuration field. This field should have Scaladoc comments explaining its purpose, format (Set of file URIs), and how it's used in the Parquet migration process. Without documentation, users won't understand what format the file paths should be in or when this field is populated.

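A possible shape for that Scaladoc, sketched from how the field is used elsewhere in this diff (the wording is illustrative, not part of the PR):

  /** Fully-qualified URIs of Parquet files that were already migrated in a previous run.
    * Populated from savepoints via ParquetSavepointsManager and used to skip those files
    * when the migration is resumed; `None` or an empty set means nothing is skipped.
    */
  skipParquetFiles: Option[Set[String]],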
validation: Option[Validation]) {
def render: String = this.asJson.asYaml.spaces2

@@ -25,6 +26,8 @@ case class MigratorConfig(source: SourceSettings,

def getSkipTokenRangesOrEmptySet: Set[(Token[_], Token[_])] = skipTokenRanges.getOrElse(Set.empty)

def getSkipParquetFilesOrEmptySet: Set[String] = skipParquetFiles.getOrElse(Set.empty)

}
object MigratorConfig {
implicit val tokenEncoder: Encoder[Token[_]] = Encoder.instance {
@@ -4,6 +4,7 @@ import io.circe.{ Decoder, Encoder }
import io.circe.generic.semiauto.{ deriveDecoder, deriveEncoder }

Copilot AI (Jan 22, 2026):
Missing documentation for the new enableParquetFileTracking parameter. The Savepoints case class should include Scaladoc comments explaining what this parameter does, when it should be set to true vs false, and its default value. This is especially important since it controls a significant behavior change in how Parquet migrations work.

Suggested change
/**
* Configuration for periodic savepoints written during a migration run.
*
* @param intervalSeconds
* How often, in seconds, a savepoint should be written.
* @param path
* Filesystem path (directory or prefix) where savepoint data will be stored.
* @param enableParquetFileTracking
* When `true` (the default), enables tracking of already-processed Parquet files
* as part of the savepoint state. This prevents the same Parquet file from
* being migrated more than once if the job is restarted or savepoints are
* resumed.
*
* Set this to `false` to keep the legacy behavior where Parquet files are not
* tracked in savepoints. Disabling tracking may be useful for backwards
* compatibility with older savepoints or when file tracking is handled by an
* external mechanism, but it means repeated runs may reprocess the same
* Parquet files.
*/

case class Savepoints(intervalSeconds: Int, path: String)

object Savepoints {
implicit val encoder: Encoder[Savepoints] = deriveEncoder[Savepoints]
implicit val decoder: Decoder[Savepoints] = deriveDecoder[Savepoints]
migrator/src/main/scala/com/scylladb/migrator/readers/FileCompletionListener.scala
@@ -0,0 +1,93 @@
package com.scylladb.migrator.readers

import org.apache.log4j.LogManager
import org.apache.spark.scheduler.{ SparkListener, SparkListenerTaskEnd }
import org.apache.spark.Success

import scala.collection.concurrent.TrieMap

/**
* SparkListener that tracks partition completion and aggregates it to file-level completion.
*
* This listener monitors Spark task completion events and maintains mappings between
* partitions and files. When all partitions belonging to a file have been successfully
* completed, it marks the file as processed via the ParquetSavepointsManager.
*
* @param partitionToFiles Mapping from Spark partition ID to source file paths
* @param fileToPartitions Mapping from file path to the set of partition IDs reading from it
* @param savepointsManager Manager to notify when files are completed
*/
class FileCompletionListener(
partitionToFiles: Map[Int, Set[String]],
fileToPartitions: Map[String, Set[Int]],
savepointsManager: ParquetSavepointsManager
) extends SparkListener {

private val log = LogManager.getLogger("com.scylladb.migrator.readers.FileCompletionListener")

private val completedPartitions = TrieMap.empty[Int, Boolean]

private val completedFiles = TrieMap.empty[String, Boolean]

log.info(
s"FileCompletionListener initialized: tracking ${fileToPartitions.size} files " +
s"across ${partitionToFiles.size} partitions")

override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit =
if (taskEnd.reason == Success) {
val partitionId = taskEnd.taskInfo.partitionId

partitionToFiles.get(partitionId) match {
case Some(filenames) =>
if (completedPartitions.putIfAbsent(partitionId, true).isEmpty) {
filenames.foreach { filename =>
log.debug(s"Partition $partitionId completed (file: $filename)")
checkFileCompletion(filename)
}
}

case None =>
log.trace(s"Task completed for untracked partition $partitionId")
}
} else {
log.debug(
s"Task for partition ${taskEnd.taskInfo.partitionId} did not complete successfully: ${taskEnd.reason}")
}

private def checkFileCompletion(filename: String): Unit = {
if (completedFiles.contains(filename)) {
return
}

fileToPartitions.get(filename) match {
case Some(allPartitions) =>
val allComplete = allPartitions.forall(completedPartitions.contains)

if (allComplete) {
if (completedFiles.putIfAbsent(filename, true).isEmpty) {
savepointsManager.markFileAsProcessed(filename)

val progress = s"${completedFiles.size}/${fileToPartitions.size}"
log.info(s"File completed: $filename (progress: $progress)")
}
Comment on lines +57 to +72
Copilot AI (Jan 22, 2026):
Race condition in concurrent file completion checking. Between the check at line 58 (if (completedFiles.contains(filename))) and the putIfAbsent at line 67, multiple threads could pass the first check and all attempt to mark the file as complete. While putIfAbsent prevents duplicate entries in the map, this could result in savepointsManager.markFileAsProcessed(filename) being called multiple times for the same file. Consider moving the completion check inside a synchronized block or using completedFiles.putIfAbsent earlier to gate the entire completion logic.

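One way to gate the whole completion path on putIfAbsent, sketched against the names used in this listener (not part of the PR):

  private def checkFileCompletion(filename: String): Unit =
    fileToPartitions.get(filename).foreach { allPartitions =>
      // putIfAbsent is the single gate: only the thread that wins the insert proceeds,
      // so markFileAsProcessed runs at most once per file under concurrent task-end events.
      if (allPartitions.forall(completedPartitions.contains) &&
          completedFiles.putIfAbsent(filename, true).isEmpty) {
        savepointsManager.markFileAsProcessed(filename)
        log.info(s"File completed: $filename (progress: ${completedFiles.size}/${fileToPartitions.size})")
      }
    }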
} else {
val completedCount = allPartitions.count(completedPartitions.contains)
log.trace(s"File $filename: $completedCount/${allPartitions.size} partitions complete")
}

case None =>
log.warn(s"File $filename not found in fileToPartitions map (this shouldn't happen)")
}
}

def getCompletedFilesCount: Int = completedFiles.size

def getTotalFilesCount: Int = fileToPartitions.size

def getProgressReport: String = {
val filesCompleted = getCompletedFilesCount
val totalFiles = getTotalFilesCount

s"Progress: $filesCompleted/$totalFiles files"
}
}
113 changes: 104 additions & 9 deletions migrator/src/main/scala/com/scylladb/migrator/readers/Parquet.scala
@@ -1,22 +1,121 @@
package com.scylladb.migrator.readers

import com.scylladb.migrator.config.SourceSettings
import com.scylladb.migrator.scylla.SourceDataFrame
import com.scylladb.migrator.config.{ MigratorConfig, SourceSettings, TargetSettings }
import com.scylladb.migrator.scylla.{ ScyllaParquetMigrator, SourceDataFrame }
import org.apache.log4j.LogManager
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.{ AnalysisException, SparkSession }
import scala.util.Using

object Parquet {
val log = LogManager.getLogger("com.scylladb.migrator.readers.Parquet")

def readDataFrame(spark: SparkSession, source: SourceSettings.Parquet): SourceDataFrame = {
def migrateToScylla(config: MigratorConfig,
source: SourceSettings.Parquet,
target: TargetSettings.Scylla)(implicit spark: SparkSession): Unit = {
log.info("Starting Parquet migration with parallel processing and file-level savepoints")

configureHadoopCredentials(spark, source)

val allFiles = listParquetFiles(spark, source.path)
val skipFiles = config.getSkipParquetFilesOrEmptySet
val filesToProcess = allFiles.filterNot(skipFiles.contains)

if (filesToProcess.isEmpty) {
log.info("No Parquet files to process. Migration is complete.")
return
}

log.info(s"Processing ${filesToProcess.size} Parquet files")

val df = if (skipFiles.isEmpty) {
spark.read.parquet(source.path)
} else {
spark.read.parquet(filesToProcess: _*)
Contributor:
shuffle files before passing them over?

Contributor:
or is spark shuffling this by itself?

Collaborator (author):
Spark handles partition sizing on its own using the spark.sql.files.maxPartitionBytes, so

  • small files will be combined into a single partition
  • big files will be split across several partitions

However, I can add scala.util.Random.shuffle to be safer; the overhead is negligible.
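
For reference, that shuffle would be a one-line change along these lines (illustrative, not in the current diff):

  // Randomize the file order before reading, as discussed above.
  val shuffledFiles = scala.util.Random.shuffle(filesToProcess)
  val df = spark.read.parquet(shuffledFiles: _*)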

}
Comment on lines +52 to +56
Copilot AI (Jan 22, 2026):
Potential inconsistency in DataFrame creation. When skipFiles.isEmpty is true (line 52), the code reads from source.path which may include newly added files that weren't in the original allFiles list. However, when skipFiles is not empty, it reads from the specific filesToProcess list. This could lead to different behavior: in the first case, files added after the listParquetFiles call would be processed, while in the second case they would not. Consider always using the explicit file list for consistency.

Suggested change
val df = if (skipFiles.isEmpty) {
spark.read.parquet(source.path)
} else {
spark.read.parquet(filesToProcess: _*)
}
val df = spark.read.parquet(filesToProcess: _*)


log.info("Reading partition metadata for file tracking...")
val metadata = PartitionMetadataReader.readMetadataFromDataFrame(df)

val partitionToFiles = PartitionMetadataReader.buildPartitionToFileMap(metadata)
val fileToPartitions = PartitionMetadataReader.buildFileToPartitionsMap(metadata)

log.info(
s"Discovered ${fileToPartitions.size} files with ${metadata.size} total partitions to process")

Using.resource(ParquetSavepointsManager(config, spark.sparkContext)) { savepointsManager =>
val listener = new FileCompletionListener(
partitionToFiles,
fileToPartitions,
savepointsManager
)
spark.sparkContext.addSparkListener(listener)

try {
val sourceDF = SourceDataFrame(df, None, savepointsSupported = false)
Contributor:
why false when you support savepoints now?

Contributor:
I think this needs a better look:
the savepoint manager basically needs to dump its state to the savepoint file periodically (and on failure, of course),
and I am wondering whether this periodic write to the savepoint file will happen if this is false?

Contributor (@tarzanek, Nov 21, 2025):
(or the manager is self contained and will dump savepoints automatically once it's instantiated?)

Collaborator (author):
Re: "why false when you support savepoints now?"

Basically, this false flag means no savepoint manager is created for the basic ScyllaMigrator, and it is not used in ParquetMigrator at all;
it's necessary so we don't create an additional SavepointsManager, since we have an external one created in the Parquet object.

Actually, I see current architecture / naming is confusing.

Re: "(or the manager is self contained and will dump savepoints automatically once it's instantiated?)"

Yeah, it is 😌


log.info("Created DataFrame from Parquet source")

ScyllaParquetMigrator.migrate(config, target, sourceDF, savepointsManager)

savepointsManager.dumpMigrationState("completed")

log.info(
s"Parquet migration completed successfully: " +
s"${listener.getCompletedFilesCount}/${listener.getTotalFilesCount} files processed")

} finally {
spark.sparkContext.removeSparkListener(listener)
log.info(s"Final progress: ${listener.getProgressReport}")
}
}
Comment on lines +67 to +92
Copilot AI (Jan 22, 2026):
Resource management concern: The ParquetSavepointsManager is closed by the Using.resource block, but it's passed to ScyllaParquetMigrator.migrate which internally marks it with shouldCloseManager = false to prevent double-closing. However, this creates a brittle design where the lifecycle management is split between two places. If an exception occurs during migration (line 80), the savepointsManager.dumpMigrationState("completed") at line 82 will never be called, and only the "final" dump in ScyllaMigrator will execute. Consider consolidating the resource management logic or adding explicit error handling to ensure proper state dumping in all scenarios.

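A sketch of the consolidation the comment suggests, using the names from this diff (the "failed" reason string is an assumption, not an existing convention in the migrator):

  Using.resource(ParquetSavepointsManager(config, spark.sparkContext)) { savepointsManager =>
    val listener = new FileCompletionListener(partitionToFiles, fileToPartitions, savepointsManager)
    spark.sparkContext.addSparkListener(listener)
    try {
      val sourceDF = SourceDataFrame(df, None, savepointsSupported = false)
      ScyllaParquetMigrator.migrate(config, target, sourceDF, savepointsManager)
      savepointsManager.dumpMigrationState("completed")
    } catch {
      case e: Throwable =>
        savepointsManager.dumpMigrationState("failed") // assumed reason label
        throw e
    } finally {
      spark.sparkContext.removeSparkListener(listener)
    }
  }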
}

def listParquetFiles(spark: SparkSession, path: String): Seq[String] = {
log.info(s"Discovering Parquet files in $path")

try {
val dataFrame = spark.read
.option("recursiveFileLookup", "true")
.parquet(path)

val files = dataFrame.inputFiles.toSeq.distinct.sorted

if (files.isEmpty) {
throw new IllegalArgumentException(s"No Parquet files found in $path")
}

log.info(s"Found ${files.size} Parquet file(s)")
files
} catch {
case e: AnalysisException =>
val message = s"Failed to list Parquet files from $path: ${e.getMessage}"
log.error(message)
throw new IllegalArgumentException(message, e)
}
}

/**
Contributor:
The change below has nothing to do with Parquet, but OK, I will close one eye. PR-cleanliness-wise it just confuses people, so unrelated changes should go in separate PRs/commits.

Collaborator (author):
Yes, thank you.
Looks like a leftover from the earlier approach with both parallel and sequential migrations.

* Configures Hadoop S3A credentials for reading from AWS S3.
*
* This method sets the necessary Hadoop configuration properties for AWS access key, secret key,
* and optionally a session token. When a session token is present, it sets the credentials provider
* to TemporaryAWSCredentialsProvider as required by Hadoop.
*
* If a region is specified in the source configuration, this method also sets the S3A endpoint region
* via the `fs.s3a.endpoint.region` property.
*
* For more details, see the official Hadoop AWS documentation:
* https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#Authentication
*/
private[readers] def configureHadoopCredentials(spark: SparkSession,
source: SourceSettings.Parquet): Unit =
source.finalCredentials.foreach { credentials =>
log.info("Loaded AWS credentials from config file")
source.region.foreach { region =>
spark.sparkContext.hadoopConfiguration.set("fs.s3a.endpoint.region", region)
}
spark.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", credentials.accessKey)
spark.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", credentials.secretKey)
// See https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#Using_Session_Credentials_with_TemporaryAWSCredentialsProvider
credentials.maybeSessionToken.foreach { sessionToken =>
spark.sparkContext.hadoopConfiguration.set(
"fs.s3a.aws.credentials.provider",
@@ -27,8 +126,4 @@ object Parquet {
)
}
}

SourceDataFrame(spark.read.parquet(source.path), None, false)
}

}
migrator/src/main/scala/com/scylladb/migrator/readers/ParquetSavepointsManager.scala
@@ -0,0 +1,36 @@
package com.scylladb.migrator.readers

import com.scylladb.migrator.SavepointsManager
import com.scylladb.migrator.config.MigratorConfig
import com.scylladb.migrator.alternator.StringSetAccumulator
import org.apache.spark.SparkContext

class ParquetSavepointsManager(migratorConfig: MigratorConfig,
filesAccumulator: StringSetAccumulator)
extends SavepointsManager(migratorConfig) {

def describeMigrationState(): String = {
val processedCount = filesAccumulator.value.size
s"Processed files: $processedCount"
}

def updateConfigWithMigrationState(): MigratorConfig =
migratorConfig.copy(skipParquetFiles = Some(filesAccumulator.value))

def markFileAsProcessed(filePath: String): Unit = {
filesAccumulator.add(filePath)
log.debug(s"Marked file as processed: $filePath")
}
}

object ParquetSavepointsManager {

def apply(migratorConfig: MigratorConfig, spark: SparkContext): ParquetSavepointsManager = {
val filesAccumulator =
StringSetAccumulator(migratorConfig.skipParquetFiles.getOrElse(Set.empty))

spark.register(filesAccumulator, "processed-parquet-files")

new ParquetSavepointsManager(migratorConfig, filesAccumulator)
}
}