
Conversation

@pizzaeueu
Collaborator

No description provided.

Artem added 16 commits November 21, 2025 08:51
Add skipParquetFiles field to track already-processed Parquet files
during sequential migrations with savepoints. This allows the migrator
to resume from where it left off by skipping files that have already
been successfully migrated.

Also add getSkipParquetFilesOrEmptySet() helper method for convenient
access with a sensible default.
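
As a rough illustration of the shape this describes (the other MigratorConfig fields are elided; this is not the PR's exact definition):

// Sketch only: other MigratorConfig fields are omitted.
case class MigratorConfig(
  // ... existing fields ...
  skipParquetFiles: Option[Set[String]]
) {
  // Convenience accessor with a sensible default when the field is absent.
  def getSkipParquetFilesOrEmptySet(): Set[String] =
    skipParquetFiles.getOrElse(Set.empty)
}
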
Add StringSetAccumulator, a Spark accumulator for efficiently tracking
a set of strings across distributed operations. This will be used to
track which Parquet files have been processed during migration.

The accumulator provides:
- Thread-safe add() operation for marking items as processed
- Set-based storage to avoid duplicates
- Proper Spark accumulator semantics for distributed execution

Include comprehensive unit tests covering initialization, adding
elements, merging accumulators, and handling duplicates.
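
The accumulator's full source appears further down in this review; purely as an illustration, registration and use follow the standard AccumulatorV2 pattern (the sample paths below are placeholders):

import org.apache.spark.sql.SparkSession
import com.scylladb.migrator.alternator.StringSetAccumulator // package as in the PR snippet below

val spark = SparkSession.builder().appName("accumulator-example").master("local[*]").getOrCreate()

// Register once on the driver; tasks call add(); the driver reads value back.
val processedFiles = StringSetAccumulator()
spark.sparkContext.register(processedFiles, "processed-parquet-files")

spark.sparkContext
  .parallelize(Seq("s3a://bucket/a.parquet", "s3a://bucket/b.parquet"))
  .foreach(path => processedFiles.add(path))

val done: Set[String] = processedFiles.value // both paths, duplicates collapsed
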
Introduce ParquetProcessingStrategy trait to enable different migration
strategies for Parquet sources. This applies the Strategy pattern to
allow switching between parallel (default) and sequential (with
savepoints) processing modes.

The interface defines a single migrate() method that implementations
will use to execute their specific migration logic.
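
A sketch of the interface as described; the parameter list is an assumption and may differ from the PR:

trait ParquetProcessingStrategy {
  // Execute the strategy-specific migration for a Parquet source.
  def migrate(config: MigratorConfig,
              source: SourceSettings.Parquet,
              target: TargetSettings.Scylla)(implicit spark: SparkSession): Unit
}
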
Implement the parallel processing strategy that reads all Parquet files
at once using Spark's native parquet reader. This is the traditional
approach that maximizes parallelism and performance.

Key characteristics:
- Reads all files in one operation
- No savepoint support (savepointsSupported = false)
- Best performance for migrations expected to complete uninterrupted
- Default behavior matching existing migrations

This strategy maintains backward compatibility with the original
Parquet migration behavior.
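
In essence the parallel path reduces to a single read over the source path; a sketch under the same assumed signature (the SourceDataFrame call mirrors the diff shown later in this review):

class ParallelParquetStrategy extends ParquetProcessingStrategy {
  def migrate(config: MigratorConfig,
              source: SourceSettings.Parquet,
              target: TargetSettings.Scylla)(implicit spark: SparkSession): Unit = {
    // One read over the whole path: Spark parallelizes across all files.
    val df = spark.read.parquet(source.path)
    // No per-file savepoints for this mode.
    val sourceDF = SourceDataFrame(df, None, savepointsSupported = false)
    ScyllaMigrator.migrate(config, target, sourceDF)
  }
}
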
Extract ScyllaMigratorBase trait to enable different savepoint
strategies. This refactoring introduces extension points that allow
external savepoints managers to be injected, enabling file-by-file
Parquet migration with savepoints.

Key changes:
- Extract ScyllaMigratorBase trait with template method pattern
- Add externalSavepointsManager hook for injecting custom managers
- Add createSavepointsManager() for strategy-specific manager creation
- Add shouldCloseManager() to control manager lifecycle
- Refactor migrate() to use external or created manager
- Update exception handling to support different accumulator types

The existing ScyllaMigrator object maintains full backward
compatibility by implementing the trait with original CQL savepoints
behavior.
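
Roughly, the extension points above give the trait the following shape (the createSavepointsManager and shouldCloseManager signatures are taken from the diff further down; the migrate() body is a simplified sketch, the real one also handles exceptions and accumulator types):

trait ScyllaMigratorBase {
  // Hook: an externally owned manager (used by the Parquet path).
  protected def externalSavepointsManager: Option[SavepointsManager] = None

  // Hook: strategy-specific manager creation (the CQL path builds its own here).
  protected def createSavepointsManager(sourceDF: SourceDataFrame)(
    implicit spark: SparkSession): Option[SavepointsManager]

  // Hook: whether this migrator owns the manager's lifecycle.
  protected def shouldCloseManager(manager: SavepointsManager): Boolean

  def migrate(config: MigratorConfig, target: TargetSettings.Scylla, sourceDF: SourceDataFrame)(
    implicit spark: SparkSession): Unit = {
    val manager = externalSavepointsManager.orElse(createSavepointsManager(sourceDF))
    try {
      // ... write sourceDF to Scylla ...
    } finally {
      manager.foreach { m =>
        m.dumpMigrationState("final")
        if (shouldCloseManager(m)) m.close()
      }
    }
  }
}
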
Introduce ScyllaParquetMigrator class to handle Parquet migrations
with external savepoints management. This enables file-by-file
processing where each file's completion is tracked.

Key design decisions:
- Accepts external ParquetSavepointsManager in constructor
- Does not create its own savepoints manager (returns None)
- Does not close the external manager (returns false from
  shouldCloseManager), allowing the caller to manage lifecycle

This separation of concerns allows SequentialParquetStrategy to
orchestrate multiple migrations while maintaining a single savepoints
manager across all file operations.
Implement savepoints manager for tracking processed Parquet files.
Uses StringSetAccumulator to maintain state across Spark operations
and persists the set of processed files to configuration.

Key features:
- Tracks processed files via StringSetAccumulator
- Implements SavepointsManager interface for consistency
- Updates MigratorConfig with skipParquetFiles on state dump
- Factory method initializes accumulator from existing skipParquetFiles

This enables resumable Parquet migrations by remembering which files
have been successfully processed.
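
A sketch under stated assumptions: markFileAsProcessed and the (config, SparkContext) factory shape come from the diffs in this review, while the base-class members of SavepointsManager are elided and the updatedConfig hook name is illustrative:

import org.apache.spark.SparkContext

class ParquetSavepointsManager(config: MigratorConfig, processedFiles: StringSetAccumulator)
    extends SavepointsManager /* abstract members elided in this sketch */ {

  // Called by the file-completion listener once a file has been fully written.
  def markFileAsProcessed(file: String): Unit =
    processedFiles.add(file)

  // On each state dump, fold the processed-file set back into the config.
  def updatedConfig: MigratorConfig =
    config.copy(skipParquetFiles = Some(processedFiles.value))
}

object ParquetSavepointsManager {
  // Factory: seed the accumulator from files recorded by a previous run.
  def apply(config: MigratorConfig, sc: SparkContext): ParquetSavepointsManager = {
    val acc = StringSetAccumulator(config.getSkipParquetFilesOrEmptySet())
    sc.register(acc, "processed-parquet-files")
    new ParquetSavepointsManager(config, acc)
  }
}
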
Implement sequential Parquet processing strategy that reads and
migrates files one at a time with savepoint support. Each file's
successful migration is recorded before moving to the next.

Key features:
- Processes files sequentially using prepareParquetReader
- Skips already-processed files from skipParquetFiles config
- Uses ParquetSavepointsManager to track progress
- Marks each file as processed after successful migration
- Creates final savepoint when all files complete
- Uses Using.resource for proper manager cleanup

Trade-offs:
- Lower parallelism than parallel mode (processes one file at a time)
- Enables resumability for long-running migrations
- Ideal for large datasets or unreliable environments
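
Put together, the sequential path boils down to the loop sketched below (assumptions: listParquetFiles returns full file URIs, and the per-file hand-off to ScyllaParquetMigrator is simplified):

import scala.util.Using

def migrateSequentially(config: MigratorConfig,
                        source: SourceSettings.Parquet,
                        target: TargetSettings.Scylla)(implicit spark: SparkSession): Unit =
  Using.resource(ParquetSavepointsManager(config, spark.sparkContext)) { manager =>
    val alreadyDone = config.getSkipParquetFilesOrEmptySet()
    val remaining   = listParquetFiles(source.path).filterNot(alreadyDone)

    remaining.foreach { file =>
      val df       = spark.read.parquet(file)
      val sourceDF = SourceDataFrame(df, None, savepointsSupported = false)
      ScyllaParquetMigrator.migrate(config, target, sourceDF, manager)
      // Record the file only after its migration succeeded.
      manager.markFileAsProcessed(file)
    }

    // Final savepoint once all files are done.
    manager.dumpMigrationState("final")
  }
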
Implement utility methods for Parquet file discovery and AWS credential
configuration to support both parallel and sequential processing modes.

New functionality:
- listParquetFiles(): Discovers all Parquet files at given path using
  Spark's recursive file lookup
- prepareParquetReader(): Creates ParquetReaderWithSavepoints that
  filters out already-processed files
- configureHadoopCredentials(): Sets up S3A credentials including
  support for session tokens and regional endpoints

The configureHadoopCredentials method now includes comprehensive
documentation explaining Hadoop's TemporaryAWSCredentialsProvider
requirement and links to official documentation.

These utilities enable the sequential strategy to enumerate files
and track progress while maintaining credential compatibility.
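
A hedged sketch of configureHadoopCredentials: the parameter shape is an assumption, but the property keys are the standard Hadoop S3A settings that TemporaryAWSCredentialsProvider requires:

import org.apache.spark.sql.SparkSession

def configureHadoopCredentials(spark: SparkSession,
                               accessKey: String,
                               secretKey: String,
                               sessionToken: Option[String],
                               region: Option[String]): Unit = {
  val conf = spark.sparkContext.hadoopConfiguration
  conf.set("fs.s3a.access.key", accessKey)
  conf.set("fs.s3a.secret.key", secretKey)
  sessionToken.foreach { token =>
    // Session tokens only work with the temporary-credentials provider.
    conf.set("fs.s3a.session.token", token)
    conf.set("fs.s3a.aws.credentials.provider",
             "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider")
  }
  // Regional endpoint, when targeting a specific AWS region.
  region.foreach(r => conf.set("fs.s3a.endpoint.region", r))
}
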
Complete the Strategy pattern implementation by adding strategy
selection logic based on parquetProcessingMode configuration.

The migrateToScylla method now:
- Reads parquetProcessingMode from config.savepoints
- Instantiates ParallelParquetStrategy (default) or
  SequentialParquetStrategy based on mode
- Delegates migration to the selected strategy
- Logs the selected mode for observability

This provides a clean separation between the two migration approaches
while maintaining backward compatibility (parallel is default).
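
As a sketch, the selection amounts to a match on the configured mode (the exact field access path, default handling, and logger are assumptions):

def migrateToScylla(config: MigratorConfig,
                    source: SourceSettings.Parquet,
                    target: TargetSettings.Scylla)(implicit spark: SparkSession): Unit = {
  val mode = config.savepoints.parquetProcessingMode // defaults to Parallel per the config commit
  log.info(s"Using Parquet processing mode: $mode")   // `log` assumed to be an available logger
  val strategy: ParquetProcessingStrategy = mode match {
    case ParquetProcessingMode.Parallel   => new ParallelParquetStrategy
    case ParquetProcessingMode.Sequential => new SequentialParquetStrategy
  }
  strategy.migrate(config, source, target)
}
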
Add comprehensive unit tests for the Parquet savepoints feature:

ParquetSavepointsTest:
- Tests ParquetSavepointsManager state tracking
- Verifies file marking and accumulator behavior
- Tests config update with processed files
- Ensures proper state serialization

ParquetConfigSerializationTest:
- Tests Circe encoding/decoding of ParquetProcessingMode
- Verifies parallel and sequential mode serialization
- Tests Savepoints config with parquetProcessingMode field
- Validates backward compatibility with missing field
- Tests error handling for invalid mode values

These tests ensure configuration serialization and state management
work correctly across migration restarts.
Add integration test suite for end-to-end Parquet migration validation:

ParquetMigratorSuite:
- Base test utilities for Parquet migration testing
- Shared fixtures and helper methods
- Test data generation and validation

ParquetParallelModeTest:
- Integration test for parallel processing mode
- Verifies default behavior and performance characteristics

ParquetSavepointsIntegrationTest:
- Integration test for sequential mode with savepoints
- Tests migration interruption and resumption
- Verifies skipParquetFiles configuration handling
- Validates that already-processed files are skipped
- Ensures data consistency after resume

These tests validate the complete migration flow including Spark
DataFrame operations, ScyllaDB writes, and savepoint persistence.
Add YAML configuration files for testing both Parquet processing modes:

parquet-to-scylla-parallel.yaml:
- Configuration for parallel processing mode (default)
- No parquetProcessingMode specified (uses default)
- Validates backward compatibility

parquet-to-scylla-savepoints.yaml:
- Configuration for sequential processing with savepoints
- Sets parquetProcessingMode: sequential
- Example configuration for resumable migrations

These configurations serve as both test fixtures and documentation
examples for users implementing Parquet migrations.
Simplify ParquetToScyllaBasicMigrationTest to work with the new
Strategy pattern architecture. The test now uses the standard
Parquet migration path which automatically selects the appropriate
strategy based on configuration.

Changes:
- Use Parquet.migrateToScylla instead of direct DataFrame creation
- Remove manual DataFrame construction
- Rely on parallel strategy (default) for basic migration test
- Maintain same test coverage with cleaner implementation

This refactoring validates that the new architecture maintains
backward compatibility with existing migration workflows.
Introduce ParquetProcessingMode enum with two variants:
- Parallel: Process all Parquet files at once (default, no savepoints)
- Sequential: Process files one by one with savepoint support

Add parquetProcessingMode field to Savepoints configuration with
appropriate Circe encoders/decoders. The mode defaults to Parallel
to maintain backward compatibility with existing configurations.

This lays the foundation for implementing savepoint support for
Parquet migrations, allowing users to resume interrupted migrations
without reprocessing already-migrated files.
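
One way to encode such a two-variant mode with Circe, shown here as an illustration only (the PR itself uses io.circe.generic.extras, per the import hunk later in this review):

import io.circe.{Decoder, Encoder}

sealed trait ParquetProcessingMode
object ParquetProcessingMode {
  case object Parallel   extends ParquetProcessingMode
  case object Sequential extends ParquetProcessingMode

  implicit val encoder: Encoder[ParquetProcessingMode] =
    Encoder.encodeString.contramap {
      case Parallel   => "parallel"
      case Sequential => "sequential"
    }

  implicit val decoder: Decoder[ParquetProcessingMode] =
    Decoder.decodeString.emap {
      case "parallel"   => Right(Parallel)
      case "sequential" => Right(Sequential)
      case other        => Left(s"Unknown parquetProcessingMode: $other")
    }
}
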
Copilot AI left a comment

Pull Request Overview

This PR adds comprehensive savepoints support for Parquet-to-Scylla migrations, enabling resumable data migrations by tracking processed Parquet files at a granular level. The implementation handles both single-partition and multi-partition files, supports parallel processing, and provides detailed progress tracking through Spark listeners.

Key changes:

  • Implemented file-level savepoints tracking with partition awareness to support resumable Parquet migrations
  • Refactored ScyllaMigrator into a trait-based architecture for extensibility with specialized Parquet migration logic
  • Added FileCompletionListener to track Spark task completion and aggregate partition-level events into file-level completion

Reviewed Changes

Copilot reviewed 29 out of 29 changed files in this pull request and generated 5 comments.

Show a summary per file
File: Description
ParquetToScyllaBasicMigrationTest.scala: Refactored to use new ParquetMigratorSuite base class with fixture-based resource management
ParquetSavepointsTest.scala: Unit tests for Parquet file listing, filtering, and savepoints manager state tracking
ParquetSavepointsIntegrationTest.scala: Integration test verifying savepoints include all processed files after migration
ParquetResumeIntegrationTest.scala: Tests for migration resumption after interruption and idempotency with pre-processed files
ParquetParallelModeTest.scala: Tests parallel Parquet file processing with savepoints in both simple and concurrent scenarios
ParquetMultiPartitionTest.scala: Tests handling of large Parquet files split across multiple Spark partitions
ParquetMigratorSuite.scala: New base test suite providing fixtures for tables, Parquet directories, and savepoints
PartitionMetadataReaderTest.scala: Tests for reading Spark partition metadata and building file-to-partition mappings
FileCompletionListenerTest.scala: Tests for Spark listener tracking partition completion and file-level aggregation
ParquetConfigSerializationTest.scala: Tests YAML serialization/deserialization of skipParquetFiles configuration
StringSetAccumulatorTest.scala: Tests thread-safe string set accumulator for tracking processed files
parquet-to-scylla-*.yaml: Configuration files for various test scenarios including savepoints, resume, and parallel modes
.gitignore: Updated to track spark-master directory for savepoints storage
ScyllaMigrator.scala: Refactored into trait-based architecture with specialized ScyllaParquetMigrator implementation
PartitionMetadataReader.scala: Reads Spark partition metadata to build file-partition mappings for tracking
ParquetSavepointsManager.scala: Manages Parquet savepoints state using string set accumulator for processed files
Parquet.scala: Refactored to implement parallel file processing with savepoints and file completion tracking
FileCompletionListener.scala: Spark listener aggregating partition completions into file-level tracking
MigratorConfig.scala: Added skipParquetFiles field and helper method for configuration
StringSetAccumulator.scala: Thread-safe accumulator for collecting processed file paths across Spark tasks
Migrator.scala: Updated Parquet-to-Scylla migration path to use new migrateToScylla method


@pizzaeueu pizzaeueu force-pushed the parquet-savepoint-rebased branch from 7df7cdc to 076ccad on November 21, 2025 09:05
spark.sparkContext.addSparkListener(listener)

try {
  val sourceDF = SourceDataFrame(df, None, savepointsSupported = false)
Contributor

why false when you support savepoints now?

Contributor

I think this needs a closer look:
the savepoints manager basically needs to dump its state to the savepoint file periodically (and in case of failure, of course),
and I am wondering whether that periodic write to the savepoint file will still happen if this is false ?!?!?

Contributor
@tarzanek tarzanek Nov 21, 2025

(or is the manager self-contained, so it will dump savepoints automatically once it's instantiated?)

Collaborator Author

why false when you support savepoints now?

Basically, this false flag prevents the basic ScyllaMigrator from creating its own savepoints manager, and it is not used in the ParquetMigrator at all.
It is needed so that no additional SavepointsManager is created, since we already have an external one created in the Parquet object.

That said, I can see the current architecture / naming is confusing.

(or the manager is self contained and will dump savepoints automatically once it's instantiated?)

Yeah, it is 😌

}
}

/**
Contributor

The code below has nothing to do with Parquet, but OK, I will turn a blind eye this time. PR-cleanliness-wise, though, it just confuses people; unrelated changes should go into separate PRs/commits.

Collaborator Author

Yes, thank you.
It looks like a leftover from the earlier approach that supported both parallel and sequential migrations.

sourceDF: SourceDataFrame
)(implicit spark: SparkSession): Option[SavepointsManager]

protected def shouldCloseManager(manager: SavepointsManager): Boolean
Contributor

Why do we need this?

I mean, the manager will be closed anyway, and should be closed anyway:
that's why it periodically dumps its state, and of course it should close cleanly when any failure happens.

savePointsManger.dumpMigrationState("final")
savePointsManger.close()
if (shouldCloseManager(savePointsManger)) {
savePointsManger.close()
Contributor

What will happen with the Parquet manager?
Who will close it then? Is this a leak? :-D

Collaborator Author

Yeah, the problem here is probably the over-unification of the different savepoints approaches.

The idea is:

  • The Parquet object is responsible for opening the SavepointsManager, so the same object should be responsible for closing it (and that is done safely with a Using statement).

However, it makes sense to reconsider the approach to avoid unnecessary mess 🤔

Contributor
@tarzanek tarzanek left a comment

please check my comments

@pizzaeueu pizzaeueu requested a review from tarzanek November 25, 2025 06:05
Copilot AI left a comment

Pull request overview

Copilot reviewed 29 out of 29 changed files in this pull request and generated 1 comment.



Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
@tarzanek
Contributor

I have no other comments from my end, so let's merge.

Contributor
@tarzanek tarzanek left a comment

Looks OK. We still have a real-life test in flight for this, so let's merge if no problems are seen.

val df = if (skipFiles.isEmpty) {
  spark.read.parquet(source.path)
} else {
  spark.read.parquet(filesToProcess: _*)
Contributor

Shuffle the files before passing them over?

Contributor

Or does Spark shuffle this by itself?

Collaborator Author

Spark handles partition sizing on its own using spark.sql.files.maxPartitionBytes, so:

  • small files will be combined into a single partition
  • big files will be split across several partitions

However, I can add scala.util.Random.shuffle to be safer; the overhead is negligible.
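
For reference, such a shuffle is a one-liner over the file list before it is handed to the reader (Spark still packs the shuffled paths into partitions by size):

import scala.util.Random

val shuffledFiles: Seq[String] = Random.shuffle(filesToProcess)
val df = spark.read.parquet(shuffledFiles: _*)
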

.rdd
.mapPartitionsWithIndex { (partitionId, iter) =>
  val files = iter.map(row => row.getString(0)).toSet
  files.map(filename => (partitionId, filename)).iterator
Contributor

How does this behave when you have lots of 100-200 MB files and a few files that are around 5 GB (how is the splitting done)?

@tarzanek
Contributor

Before merging we are checking the logic of the partition-to-file mapping. In tests with a case with lots of small (~200 MB) files and a few big (~5 GB) files (with large partitions!) we saw delays and problems; if the mapping isn't correct, the job might not finish.

@tarzanek
Contributor

tarzanek commented Jan 2, 2026

The Alternator test might be impacted due to the 2025.4 release: https://forum.scylladb.com/t/release-scylladb-2025-4-0/5222

@tarzanek
Contributor

tarzanek commented Jan 2, 2026

Caused by: software.amazon.awssdk.services.dynamodb.model.DynamoDbException: GlobalSecondaryIndexes and LocalSecondaryIndexes with tablets require the rf_rack_valid_keyspaces option to be enabled. (Service: DynamoDb, Status Code: 400, Request ID: null)

Or, better, set tablets_mode_for_new_keyspaces to disabled on the command line when starting Scylla in Docker
(until scylladb/spark-scylladb-connector#19 is merged we don't support tablets properly anyway).

@tarzanek
Contributor

tarzanek commented Jan 2, 2026

Note: adaptive query execution (https://spark.apache.org/docs/latest/sql-performance-tuning.html#adaptive-query-execution)
might need to be disabled to support Parquet resume. Is setting spark.sql.adaptive.enabled to false needed?
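
If it does turn out to be needed, disabling AQE is a single configuration switch, for example:

spark.conf.set("spark.sql.adaptive.enabled", "false")        // at runtime on the session
// or: spark-submit --conf spark.sql.adaptive.enabled=false  // at submit time
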

Copilot AI left a comment

Pull request overview

Copilot reviewed 40 out of 41 changed files in this pull request and generated 7 comments.

Comments suppressed due to low confidence (1)

migrator/src/main/scala/com/scylladb/migrator/scylla/ScyllaMigrator.scala:95

  • The error handling in the catch block swallows the exception after logging it. This means that even if the migration fails with an error, the method will not propagate the failure to the caller. The application will appear to complete successfully despite data not being fully migrated. The exception should be re-thrown after dumping the savepoint state to ensure proper error propagation and to prevent silent failures.
  } catch {
    case NonFatal(e) => // Catching everything on purpose to try and dump the accumulator state
      log.error(
        "Caught error while writing the DataFrame. Will create a savepoint before exiting",
        e)


Comment on lines +57 to +72
private def checkFileCompletion(filename: String): Unit = {
  if (completedFiles.contains(filename)) {
    return
  }

  fileToPartitions.get(filename) match {
    case Some(allPartitions) =>
      val allComplete = allPartitions.forall(completedPartitions.contains)

      if (allComplete) {
        if (completedFiles.putIfAbsent(filename, true).isEmpty) {
          savepointsManager.markFileAsProcessed(filename)

          val progress = s"${completedFiles.size}/${fileToPartitions.size}"
          log.info(s"File completed: $filename (progress: $progress)")
        }
Copilot AI Jan 22, 2026

Race condition in concurrent file completion checking. Between the check at line 58 (if (completedFiles.contains(filename))) and the putIfAbsent at line 67, multiple threads could pass the first check and all attempt to mark the file as complete. While putIfAbsent prevents duplicate entries in the map, this could result in savepointsManager.markFileAsProcessed(filename) being called multiple times for the same file. Consider moving the completion check inside a synchronized block or using completedFiles.putIfAbsent earlier to gate the entire completion logic.
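
For illustration, the gating suggested here could look like the following (same identifiers as the snippet above; only the ordering changes, so putIfAbsent guards the whole completion branch):

private def checkFileCompletion(filename: String): Unit =
  fileToPartitions.get(filename).foreach { allPartitions =>
    val allComplete = allPartitions.forall(completedPartitions.contains)
    // putIfAbsent returns None only for the first caller, so the body runs at most once per file.
    if (allComplete && completedFiles.putIfAbsent(filename, true).isEmpty) {
      savepointsManager.markFileAsProcessed(filename)
      val progress = s"${completedFiles.size}/${fileToPartitions.size}"
      log.info(s"File completed: $filename (progress: $progress)")
    }
  }
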

Comment on lines +67 to +92
Using.resource(ParquetSavepointsManager(config, spark.sparkContext)) { savepointsManager =>
  val listener = new FileCompletionListener(
    partitionToFiles,
    fileToPartitions,
    savepointsManager
  )
  spark.sparkContext.addSparkListener(listener)

  try {
    val sourceDF = SourceDataFrame(df, None, savepointsSupported = false)

    log.info("Created DataFrame from Parquet source")

    ScyllaParquetMigrator.migrate(config, target, sourceDF, savepointsManager)

    savepointsManager.dumpMigrationState("completed")

    log.info(
      s"Parquet migration completed successfully: " +
        s"${listener.getCompletedFilesCount}/${listener.getTotalFilesCount} files processed")

  } finally {
    spark.sparkContext.removeSparkListener(listener)
    log.info(s"Final progress: ${listener.getProgressReport}")
  }
}
Copilot AI Jan 22, 2026

Resource management concern: The ParquetSavepointsManager is closed by the Using.resource block, but it's passed to ScyllaParquetMigrator.migrate which internally marks it with shouldCloseManager = false to prevent double-closing. However, this creates a brittle design where the lifecycle management is split between two places. If an exception occurs during migration (line 80), the savepointsManager.dumpMigrationState("completed") at line 82 will never be called, and only the "final" dump in ScyllaMigrator will execute. Consider consolidating the resource management logic or adding explicit error handling to ensure proper state dumping in all scenarios.

Comment on lines +52 to +56
val df = if (skipFiles.isEmpty) {
  spark.read.parquet(source.path)
} else {
  spark.read.parquet(filesToProcess: _*)
}
Copilot AI Jan 22, 2026

Potential inconsistency in DataFrame creation. When skipFiles.isEmpty is true (line 52), the code reads from source.path which may include newly added files that weren't in the original allFiles list. However, when skipFiles is not empty, it reads from the specific filesToProcess list. This could lead to different behavior: in the first case, files added after the listParquetFiles call would be processed, while in the second case they would not. Consider always using the explicit file list for consistency.

Suggested change
- val df = if (skipFiles.isEmpty) {
-   spark.read.parquet(source.path)
- } else {
-   spark.read.parquet(filesToProcess: _*)
- }
+ val df = spark.read.parquet(filesToProcess: _*)

import io.circe.Codec
import io.circe.generic.extras.Configuration
import io.circe.generic.extras.semiauto._

Copilot AI Jan 22, 2026

Missing documentation for the new enableParquetFileTracking parameter. The Savepoints case class should include Scaladoc comments explaining what this parameter does, when it should be set to true vs false, and its default value. This is especially important since it controls a significant behavior change in how Parquet migrations work.

Suggested change
/**
* Configuration for periodic savepoints written during a migration run.
*
* @param intervalSeconds
* How often, in seconds, a savepoint should be written.
* @param path
* Filesystem path (directory or prefix) where savepoint data will be stored.
* @param enableParquetFileTracking
* When `true` (the default), enables tracking of already-processed Parquet files
* as part of the savepoint state. This prevents the same Parquet file from
* being migrated more than once if the job is restarted or savepoints are
* resumed.
*
* Set this to `false` to keep the legacy behavior where Parquet files are not
* tracked in savepoints. Disabling tracking may be useful for backwards
* compatibility with older savepoints or when file tracking is handled by an
* external mechanism, but it means repeated runs may reprocess the same
* Parquet files.
*/

savepoints: Savepoints,
skipTokenRanges: Option[Set[(Token[_], Token[_])]],
skipSegments: Option[Set[Int]],
skipParquetFiles: Option[Set[String]],
Copilot AI Jan 22, 2026

Missing documentation for the new skipParquetFiles configuration field. This field should have Scaladoc comments explaining its purpose, format (Set of file URIs), and how it's used in the Parquet migration process. Without documentation, users won't understand what format the file paths should be in or when this field is populated.

Comment on lines +31 to +37
      val message = "Several FileSourceScanExec were found in plan"
      logger.error(s"$message. Plan: ${plan.treeString}")
      throw new IllegalArgumentException(message)
    case list if list.isEmpty =>
      val message = "DataFrame is not based on file source (FileSourceScanExec not found in plan)"
      logger.error(s"$message. Plan: ${plan.treeString}")
      throw new IllegalArgumentException(message)
Copilot AI Jan 22, 2026

Inconsistent indentation detected. Lines 31-32 and 35-36 use tabs instead of spaces. The codebase should use consistent spacing (2 spaces based on other files in the project). Please replace tabs with spaces to maintain consistency.

Suggested change (the replacement block is identical except that the tab indentation is replaced with two-space indentation):
      val message = "Several FileSourceScanExec were found in plan"
      logger.error(s"$message. Plan: ${plan.treeString}")
      throw new IllegalArgumentException(message)
    case list if list.isEmpty =>
      val message = "DataFrame is not based on file source (FileSourceScanExec not found in plan)"
      logger.error(s"$message. Plan: ${plan.treeString}")
      throw new IllegalArgumentException(message)

Comment on lines +1 to +38
package com.scylladb.migrator.alternator

import org.apache.spark.util.AccumulatorV2
import java.util.concurrent.atomic.AtomicReference

/**
 * Accumulator for tracking processed Parquet file paths during migration.
 *
 * This accumulator collects the set of Parquet file paths that have been processed
 * as part of a migration job. It is useful for monitoring progress, avoiding duplicate
 * processing, and debugging migration workflows. The accumulator is thread-safe and
 * can be used in distributed Spark jobs.
 *
 * @param initialValue The initial set of processed file paths (usually empty).
 */
class StringSetAccumulator(initialValue: Set[String] = Set.empty)
    extends AccumulatorV2[String, Set[String]] {

  private val ref = new AtomicReference(initialValue)

  // Note: isZero may be momentarily inconsistent in concurrent scenarios,
  // as it reads the current value of the set without synchronization.
  // This is eventually consistent and thread-safe, but may not reflect the most recent updates.
  def isZero: Boolean = ref.get.isEmpty
  def copy(): StringSetAccumulator = new StringSetAccumulator(ref.get)
  def reset(): Unit = ref.set(Set.empty)
  def add(v: String): Unit = ref.getAndUpdate(_ + v)

  def merge(other: AccumulatorV2[String, Set[String]]): Unit =
    ref.getAndUpdate(_ ++ other.value)

  def value: Set[String] = ref.get
}

object StringSetAccumulator {
  def apply(initialValue: Set[String] = Set.empty): StringSetAccumulator =
    new StringSetAccumulator(initialValue)
}
Copilot AI Jan 22, 2026

The StringSetAccumulator class is placed in the alternator package, but it's a general-purpose utility used for Parquet migration tracking. This creates an incorrect package coupling. Consider moving this class to a more appropriate location such as com.scylladb.migrator.util or com.scylladb.migrator.accumulator to better reflect its purpose and avoid confusion.
