Skip to content

Demo Branch: Test demo branch For File Copy #3078

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 14 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public class FileCopyBasedReplicationConfig {
public FileCopyBasedReplicationConfig(VerifiableProperties verifiableProperties) {
fileCopyMetaDataFileName = verifiableProperties.getString(FILE_COPY_META_DATA_FILE_NAME, "segments_metadata_file");
fileCopyParallelPartitionHydrationCountPerDisk = verifiableProperties.getInt(FILE_COPY_PARALLEL_PARTITION_HYDRATION_COUNT_PER_DISK, 1);
fileCopyNumberOfFileCopyThreads = verifiableProperties.getInt(FILE_COPY_NUMBER_OF_FILE_COPY_THREADS, 4);
fileCopyNumberOfFileCopyThreads = verifiableProperties.getInt(FILE_COPY_NUMBER_OF_FILE_COPY_THREADS, 1);
fileCopyDataFlushIntervalInMbs = verifiableProperties.getLong(FILE_COPY_DATA_FLUSH_INTERVAL_IN_MBS, 1000);
fileCopyReplicaTimeoutSecs = verifiableProperties.getLong(FILE_COPY_REPLICA_TIMEOUT_SECS, 36000);
fileCopySchedulerWaitTimeSecs = verifiableProperties.getLong(FILE_COPY_SCHEDULER_WAIT_TIME_SECS, 30);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,6 @@ StoreFileChunk readStoreFileChunkFromDisk(String fileName, long offset, long siz
* @throws IOException if an I/O error occurs during the move operation
*/
void moveAllRegularFiles(String srcDirPath, String destDirPath) throws IOException;

/**
 * Cleans up the directory at the given path.
 * NOTE(review): this declaration does not say whether the directory itself is removed or only
 * its contents, nor how failures are reported (no checked exception is declared) — confirm
 * against the implementation and document the contract here.
 * @param srcPath path of the directory to clean up
 */
void cleanUpDirectory(String srcPath);
}
Original file line number Diff line number Diff line change
Expand Up @@ -882,6 +882,14 @@ public void onPartitionBecomeBootstrapFromOffline(String partitionName) {
try {
// 1. take actions in storage manager (add new replica if necessary)
onPartitionBecomePreBootstrapFromOffline(partitionName);

PartitionStateChangeListener fileCopyStateChangeListener =
partitionStateChangeListeners.get(StateModelListenerType.FileCopyManagerListener);

if (fileCopyStateChangeListener != null) {
fileCopyStateChangeListener.onPartitionBecomeBootstrapFromOffline(partitionName);
}

onPartitionBecomeBootstrapFromPreBootstrap(partitionName);
// 2. take actions in replication manager (add new replica if necessary)
PartitionStateChangeListener replicationManagerListener =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ public List<DiskId> getDiskIdsToHydrate() {
public void submitReplicaForHydration(ReplicaId replicaId, FileCopyStatusListener fileCopyStatusListener,
FileCopyHandler fileCopyHandler) {
try {
logger.info("FCH TEST: Submit");
threadQueueLock.lock();
DiskId diskId = replicaId.getDiskId();
FileCopyThread fileCopyThread = new FileCopyThread(fileCopyHandler, fileCopyStatusListener);
Expand All @@ -134,18 +135,18 @@ public void submitReplicaForHydration(ReplicaId replicaId, FileCopyStatusListene
@Override
public void stopAndRemoveReplicaFromThreadPool(ReplicaId replicaId) throws InterruptedException {
threadQueueLock.lock();
logger.info("Stopping and removing replica {} from thread pool", replicaId);
logger.info("FCH TEST: Stopping and removing replica {} from thread pool", replicaId);
FileCopyThread fileCopyThread = replicaToFileCopyThread.get(replicaId);

if (fileCopyThread == null || !fileCopyThread.isAlive()) {
logger.info("No thread found for replica {}. Nothing to stop.", replicaId);
logger.info("FCH TEST: No thread found for replica {}. Nothing to stop.", replicaId);
threadQueueLock.unlock();
return;
}
long threadShutDownInitiationTime = System.currentTimeMillis();
logger.info("Stopping thread for replica {}", replicaId);
logger.info("FCH TEST: Stopping thread for replica {}", replicaId);
fileCopyThread.shutDown();
logger.info("Thread for replica {} stopped in {} ms", replicaId,
logger.info("FCH TEST: Thread for replica {} stopped in {} ms", replicaId,
System.currentTimeMillis() - threadShutDownInitiationTime);
threadQueueLock.unlock();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.codahale.metrics.MetricRegistry;
import com.github.ambry.clustermap.ClusterMap;
import com.github.ambry.clustermap.ClusterParticipant;
import com.github.ambry.clustermap.PartitionId;
import com.github.ambry.clustermap.PartitionStateChangeListener;
import com.github.ambry.clustermap.ReplicaId;
import com.github.ambry.clustermap.ReplicaSyncUpManager;
Expand All @@ -31,7 +32,13 @@
import com.github.ambry.replica.prioritization.PrioritizationManagerFactory;
import com.github.ambry.server.StoreManager;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -81,7 +88,7 @@ public FileCopyBasedReplicationManager(FileCopyBasedReplicationConfig fileCopyBa
if (clusterParticipant != null) {
clusterParticipant.registerPartitionStateChangeListener(StateModelListenerType.FileCopyManagerListener,
new PartitionStateChangeListenerImpl());
logger.info("File Copy Manager's state change listener registered!");
logger.info("FCH TEST: File Copy Manager's state change listener registered!");
} else {
throw new InstantiationException("File Copy Manager cannot be instantiated without a ClusterParticipant");
}
Expand All @@ -90,6 +97,7 @@ public FileCopyBasedReplicationManager(FileCopyBasedReplicationConfig fileCopyBa
this.prioritizationManager = prioritizationManager;
this.fileCopyBasedReplicationScheduler = fileCopyBasedReplicationSchedulerFactory.getFileCopyBasedReplicationScheduler();
this.fileCopyBasedReplicationSchedulerThread = new Thread(fileCopyBasedReplicationScheduler);

if(!prioritizationManager.isRunning()) {
throw new InstantiationException("File Copy cannot run when Prioritization Manager is not running");
}
Expand All @@ -103,23 +111,57 @@ public void start() throws InterruptedException, IOException {
logger.info("Starting FileCopyBasedReplicationManager");
fileCopyBasedReplicationSchedulerThread.start();
isRunning = true;
logger.info("FileCopyBasedReplicationManager started");
logger.info("FCH TEST: FileCopyBasedReplicationManager started");
PartitionStateChangeListenerImpl partitionStateChangeListener = new PartitionStateChangeListenerImpl();
// List<Long> partitionIds = Arrays.asList(20l, 127l);
//
// logger.info("FCH TEST: All Partitions to be hydrated up: {}", storeManager.getLocalPartitions().stream().map(
// PartitionId::getId).collect(Collectors.toList()));
//
// List<PartitionId> partitionIdList =
// storeManager.getLocalPartitions().stream().filter(p -> partitionIds.contains(p.getId())).collect(Collectors.toList());
//
// logger.info("FCH TEST: Partitions to be hydrated up: {}", partitionIdList);
// //Integrate clean up.
// ExecutorService executor = Executors.newFixedThreadPool(30); // use appropriate number of threads
//
// for (PartitionId partitionId : partitionIdList) {
// executor.execute(() -> {
// try {
// partitionStateChangeListener.onPartitionBecomeBootstrapFromOffline(String.valueOf(partitionId.getId()));
// } catch (Exception e) {
// logger.error("FCH TEST: Failed to build state for file copy for partition {}", partitionId, e);
// }
// });
// }
}

public void shutdown() throws InterruptedException {
logger.info("Shutting down FileCopyBasedReplicationManager");
logger.info("FCH TEST: Shutting down FileCopyBasedReplicationManager");
fileCopyBasedReplicationScheduler.shutdown();
fileCopyBasedReplicationSchedulerThread.join();
isRunning = false;
logger.info("FileCopyBasedReplicationManager shutdown");
logger.info("FCH TEST: FileCopyBasedReplicationManager shutdown");
}

class PartitionStateChangeListenerImpl implements PartitionStateChangeListener {

@Override
public void onPartitionBecomeBootstrapFromOffline(String partitionName) {
List<Long> partitionIds = Arrays.asList(20l);

logger.info("FCH TEST: All Partitions to be hydrated up: {}", storeManager.getLocalPartitions().stream().map(
PartitionId::getId).collect(Collectors.toList()));

List<PartitionId> partitionIdList =
storeManager.getLocalPartitions().stream().filter(p -> partitionIds.contains(p.getId())).collect(Collectors.toList());
if(!partitionIdList.stream().map(x -> x.getId()).collect(Collectors.toList()).contains(partitionName)) {
logger.warn("FCH TEST: Partition {} is not part of the list of partitions to be hydrated up. Ignoring state change", partitionName);
return;
}

if(!isRunning){
logger.info("FileCopyBasedReplicationManager is not running. Ignoring state change for partition: {}", partitionName);
logger.info("FCH TEST: FileCopyBasedReplicationManager is not running. Ignoring state change for partition: {}", partitionName);
throw new StateTransitionException("FileCopyBasedReplicationManager is not running. Ignoring state "
+ "change for partition: " + partitionName, StateTransitionException.
TransitionErrorCode.FileCopyBasedReplicationManagerNotRunning);
Expand All @@ -138,20 +180,21 @@ public void onPartitionBecomeBootstrapFromOffline(String partitionName) {
* If the file copy was already completed, then no need to do it again.
*/
if(storeManager.isFileExists(replicaId.getPartitionId(), storeConfig.storeFileCopyCompletedFileName)){
logger.info("File Copy Was Completed For Replica: " + replicaId.getPartitionId().toPathString());
logger.info("FCH TEST: File Copy Was Completed For Replica: " + replicaId.getPartitionId().toPathString());
return;
}

logger.info("Initiated File Copy Wait On ReplicaSyncUpManager for Replica: {}", replicaId.getPartitionId().toPathString());
logger.info("FCH TEST: Initiated File Copy Wait On ReplicaSyncUpManager for Replica: {}", replicaId.getPartitionId().toPathString());

replicaSyncUpManager.initiateFileCopy(replicaId);

logger.info("Adding Replica to Prioritization Manager For Replica: {}", replicaId.getPartitionId().toPathString());
logger.info("FCH TEST: Adding Replica to Prioritization Manager For Replica: {}", replicaId.getPartitionId().toPathString());
prioritizationManager.addReplica(replicaId);

try {
logger.info("Waiting for File Copy to be completed for Replica: {}", replicaId.getPartitionId().toPathString());
logger.info("FCH TEST: Waiting for File Copy to be completed for Replica: {}", replicaId.getPartitionId().toPathString());
replicaSyncUpManager.waitForFileCopyCompleted(partitionName);
logger.info("File Copy Completed for Replica: {}", replicaId.getPartitionId().toPathString());
logger.info("FCH TEST: File Copy Completed for Replica: {}", replicaId.getPartitionId().toPathString());
} catch (InterruptedException e) {
logger.error("File copy for partition {} was interrupted", partitionName);
throw new StateTransitionException("File copy for partition " + partitionName + " was interrupted",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import org.slf4j.LoggerFactory;


class FileCopyBasedReplicationSchedulerImpl implements FileCopyBasedReplicationScheduler{
class FileCopyBasedReplicationSchedulerImpl implements FileCopyBasedReplicationScheduler {
private final FileCopyBasedReplicationConfig fileCopyBasedReplicationConfig;
private final FileCopyHandlerFactory fileCopyHandlerFactory;
private final ClusterMap clusterMap;
Expand Down Expand Up @@ -88,6 +88,7 @@ public FileCopyBasedReplicationSchedulerImpl(@Nonnull FileCopyHandlerFactory fil
this.replicaToStatusListenerMap = new ConcurrentHashMap<>();
}

@Override
public void run(){
isRunning = true;
logger.info("FileCopyBasedReplicationSchedulerImpl Started");
Expand All @@ -113,7 +114,7 @@ List<ReplicaId> findStarvedReplicas() {
&& System.currentTimeMillis() / 1000 - replicaToStartTimeMap.get(replica)
> fileCopyBasedReplicationConfig.fileCopyReplicaTimeoutSecs) {

logger.info("Replica: {} is starved for hydration. Time since start: {} seconds",
logger.info("FCH TEST: Replica: {} is starved for hydration. Time since start: {} seconds",
replica.getPartitionId().toPathString(),
System.currentTimeMillis() / 1000 - replicaToStartTimeMap.get(replica));
replicasToDropFromHydration.add(replica);
Expand All @@ -140,17 +141,17 @@ public void shutdown() throws InterruptedException {

@Override
public void scheduleFileCopy() throws InterruptedException {
logger.info("Starting File Copy Scheduler");
logger.info("FCH TEST: Starting File Copy Scheduler");
while(isRunning){

logger.info("FCH TEST: Sleeping For File Copy Scheduler Wait Time: " + fileCopyBasedReplicationConfig.fileCopySchedulerWaitTimeSecs);
Thread.sleep(fileCopyBasedReplicationConfig.fileCopySchedulerWaitTimeSecs*1000);

List<ReplicaId> replicasToDropForHydration = findStarvedReplicas();
if(!replicasToDropForHydration.isEmpty()){
logger.info("Found Replicas To Drop From Hydration: " + replicasToDropForHydration.stream()
logger.info("FCH TEST: Found Replicas To Drop From Hydration: " + replicasToDropForHydration.stream()
.map(replicaId -> replicaId.getPartitionId().toPathString()).collect(Collectors.toList()));
} else{
logger.info("No Replicas To Drop From Hydration In Current Cycle");
logger.info("FCH TEST: No Replicas To Drop From Hydration In Current Cycle");
}

for(ReplicaId replica: replicasToDropForHydration){
Expand All @@ -171,9 +172,14 @@ public void scheduleFileCopy() throws InterruptedException {
}

List<DiskId> disksToHydrate = fileCopyBasedReplicationThreadPoolManager.getDiskIdsToHydrate();

for(DiskId diskId: disksToHydrate){
List<ReplicaId> replicaIds = getNextReplicaToHydrate(diskId, fileCopyBasedReplicationConfig.fileCopyParallelPartitionHydrationCountPerDisk);
logger.info("Starting Hydration For Disk: {} with ReplicaId: {}", diskId, replicaIds.stream().map(replicaId -> replicaId.getPartitionId().toPathString()));
if(replicaIds == null || replicaIds.isEmpty()){
logger.info("FCH TEST: No Replicas To Hydrate For Disk: " + diskId.getMountPath());
continue;
}
logger.info("FCH TEST: Starting Hydration For Disk: {} with ReplicaId: {}", diskId, replicaIds.stream().map(replicaId -> replicaId.getPartitionId().toPathString()));

if(!replicaIds.isEmpty()){
for(ReplicaId replicaId: replicaIds) {
Expand All @@ -193,6 +199,18 @@ public void scheduleFileCopy() throws InterruptedException {
fileCopyStatusListener.onFileCopyFailure(e);
continue;
}
try{
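/*
 * Use FileCopyTemporaryDirectoryName to create a temporary directory for file copy.
 * This directory is used to write files that are not yet fully written, and it can be
 * cleaned up.
 * NOTE(review): the original sentence was truncated after "cleaned up without" — confirm the
 * intended ending.
 */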
createTemporaryDirectoryForFileCopyIfAbsent(replicaId, fileCopyBasedReplicationConfig);
} catch (IOException e){
logger.error("Error Creating Temporary Directory For Replica: " + replicaId.getPartitionId().toPathString());
fileCopyStatusListener.onFileCopyFailure(e);
continue;
}

fileCopyBasedReplicationThreadPoolManager.submitReplicaForHydration(replicaId,
fileCopyStatusListener, fileCopyHandler);
Expand All @@ -202,7 +220,7 @@ public void scheduleFileCopy() throws InterruptedException {
replicaToStartTimeMap.put(replicaId, System.currentTimeMillis()/1000);
}
} else{
logger.info("No Replicas To Hydrate For Disk: " + diskId);
logger.info("FCH TEST: No Replicas To Hydrate For Disk: " + diskId);
}
}
}
Expand All @@ -222,6 +240,15 @@ void createFileCopyInProgressFileIfAbsent(ReplicaId replica) throws IOException
}
}

/**
 * Ensures that the temporary file-copy directory exists under the replica's path, creating it
 * (and any missing parents) when absent.
 *
 * The directory is resolved as {@code fileCopyTemporaryDirectoryName} under
 * {@code replica.getReplicaPath()}.
 *
 * @param replica replica whose path is the parent of the temporary directory
 * @param fileCopyBasedReplicationConfig config supplying {@code fileCopyTemporaryDirectoryName}
 * @throws IOException if the directory does not exist and could not be created, including the
 *                     case where the path exists but is not a directory
 */
void createTemporaryDirectoryForFileCopyIfAbsent(ReplicaId replica, FileCopyBasedReplicationConfig fileCopyBasedReplicationConfig) throws IOException {
  logger.info("FCH TEST: Creating Temporary Directory For File Copy: " + fileCopyBasedReplicationConfig.fileCopyTemporaryDirectoryName);
  File fileCopyTemporaryDirectory = new File(replica.getReplicaPath(), fileCopyBasedReplicationConfig.fileCopyTemporaryDirectoryName);
  if (fileCopyTemporaryDirectory.isDirectory()) {
    // Already present; nothing to do.
    return;
  }
  // File.mkdirs() signals failure only through its return value. Re-check isDirectory() so that a
  // concurrent creator winning the race is not treated as an error.
  if (!fileCopyTemporaryDirectory.mkdirs() && !fileCopyTemporaryDirectory.isDirectory()) {
    throw new IOException("Failed to create temporary directory for file copy: "
        + fileCopyTemporaryDirectory.getAbsolutePath());
  }
}


@Override
public int getThreadPoolSize() {
return fileCopyBasedReplicationThreadPoolManager.getThreadPoolSize();
Expand All @@ -243,6 +270,7 @@ public ReplicaId getReplicaId() {

/**
 * Success callback for a file copy: logs the completed replica's partition path, removes the
 * replica via {@code removeReplicaFromFileCopy}, and then calls
 * {@code replicaSyncUpManager.onFileCopyComplete} for it.
 */
@Override
public void onFileCopySuccess() {
  final String hydratedPartitionPath = replicaId.getPartitionId().toPathString();
  logger.info("FCH TEST: Hydration Completed For Replica: " + hydratedPartitionPath);

  // Remove the replica first, then notify the sync-up manager (same order as before).
  removeReplicaFromFileCopy(replicaId);
  replicaSyncUpManager.onFileCopyComplete(replicaId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,21 +63,28 @@ public class FileCopyThread extends Thread {

@Override
public void run() {
logger.info("Starting FileCopyThread: {} for replicaId: {}", threadName, fileCopyStatusListener.getReplicaId());
logger.info("FCH TEST: Starting FileCopyThread: {} for replicaId: {}", threadName, fileCopyStatusListener.getReplicaId());

try {
ReplicaId replicaId = fileCopyStatusListener.getReplicaId();
if (replicaId == null) {
throw new IllegalStateException("ReplicaId cannot be null");
}

logger.info("FCH TEST: ReplicaId Mount Path Is {}", replicaId.getMountPath());

//TODO add logic to get the source and target replica id
ReplicaId targetReplicaId = FileCopyUtils.getPeerForFileCopy(replicaId.getPartitionId(), replicaId.getDataNodeId().getDatacenterName());

if(targetReplicaId == null) {
throw new IllegalStateException("Target ReplicaId cannot be null");
if (targetReplicaId == null) {
logger.warn("No peer replica found for file copy for replicaId: {}", replicaId);
fileCopyStatusListener.onFileCopyFailure(new IOException("No peer replica found for file copy"));
return;
}

logger.info("FCH TEST: Starting file copy from {} to {}", replicaId.getDataNodeId(), targetReplicaId.getDataNodeId());



FileCopyInfo fileCopyInfo = new FileCopyInfo(START_CORRELATION_ID, CLIENT_ID, replicaId, targetReplicaId);
fileCopyHandler.start();
// Start the file copy process
Expand Down
Loading
Loading