Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for transferring raw files to NFS storage #73

Merged
merged 9 commits into from
Mar 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ public static String createCompletedFileName(Path completedFilesDir, String file
}

/*
Move failed files to different directory
Move files to different directory
*/
static void moveFile(Path sourcePath, Path targetPath) throws IOException {
Files.createDirectories(targetPath.getParent());
Expand All @@ -185,9 +185,35 @@ static void moveFile(Path sourcePath, Path targetPath) throws IOException {
}
}
} catch (Exception e) {
LOGGER.warn("Unable to move failed file {}", e.getMessage());
LOGGER.warn("Failed file will be moved on the next iteration.");
LOGGER.warn("Unable to move file {}", e.getMessage());
LOGGER.warn("File will be moved on the next iteration.");
// We can continue on this error. Moving will be retried on the next iteration.
}
}

public static void movetoNFS(FileNameWithOffset fileEntry, Path nfsPath, String userFileSpec) throws IOException {
Path sourcePath = Paths.get(fileEntry.fileName);
LOGGER.debug("source path= {}", sourcePath);
LOGGER.debug("target path= {}", nfsPath);

Path userFileSpecPath = Paths.get(userFileSpec);
Path relativePath = userFileSpecPath.relativize(sourcePath);

String newTarget = nfsPath + File.separator + userFileSpecPath.getName(userFileSpecPath.getNameCount()-1) + File.separator + relativePath;
Path targetFile = Paths.get(newTarget);
LOGGER.info("Target path = {}", targetFile);

File f = new File(targetFile.getParent().toString().replace('\\', '/'));
if(!f.exists()) {
if(!f.mkdirs()){ //creates the directories in targetFile
LOGGER.error("Unable to create directories in target path");
throw new IOException("Unable to create directories");
}
}

Path tempFile = Paths.get(newTarget + ".temp");
Files.copy(sourcePath, tempFile, StandardCopyOption.REPLACE_EXISTING);
Files.move(tempFile, targetFile, StandardCopyOption.REPLACE_EXISTING);

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,16 @@ public class TransactionCoordinator {
public TransactionCoordinator(Connection connection, EventWriter<byte[]> writer) {
this.connection = connection;
this.writer = writer;
initializeDatabase();
}

public TransactionCoordinator(Connection connection) {
this.connection = connection;
this.writer = null;
initializeDatabase();
}

private void initializeDatabase() {
try {
try (final Statement statement = connection.createStatement()) {
statement.execute(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/**
* Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*/
package io.pravega.sensor.collector.writetonfs;

/*
* Configuration file.
*/
public class FileConfig {
public final String stateDatabaseFileName;
public final String fileSpec;
public final String fileExtension;
public final String nfsMountPath;
public final String routingKey;
public final String eventTemplateStr;
public final String fileType;
/**
* Also known as samplesPerEvent.
*/
public final int maxRecordsPerEvent;

public final boolean enableDeleteCompletedFiles;
public final boolean exactlyOnce;
public final double transactionTimeoutMinutes;

public final long minTimeInMillisToUpdateFile;

public FileConfig(String stateDatabaseFileName, String fileSpec, String fileExtension, String nfsMountPath,String routingKey, String eventTemplateStr, int maxRecordsPerEvent, boolean enableDeleteCompletedFiles, boolean exactlyOnce, double transactionTimeoutMinutes, long minTimeInMillisToUpdateFile, String fileType) {
this.stateDatabaseFileName = stateDatabaseFileName;
this.fileSpec = fileSpec;
this.fileExtension = fileExtension;
this.nfsMountPath = nfsMountPath;
this.routingKey = routingKey;
this.eventTemplateStr = eventTemplateStr;
this.maxRecordsPerEvent = maxRecordsPerEvent;
this.enableDeleteCompletedFiles = enableDeleteCompletedFiles;
this.exactlyOnce = exactlyOnce;
this.transactionTimeoutMinutes = transactionTimeoutMinutes;
this.minTimeInMillisToUpdateFile = minTimeInMillisToUpdateFile;
this.fileType = fileType;
}

@Override
public String toString() {
return "FileConfig{"
+ "stateDatabaseFileName='" + stateDatabaseFileName + '\''
+ ", fileSpec='" + fileSpec + '\''
+ ", fileExtension='" + fileExtension + '\''
+ ", nfsMountPath='" + nfsMountPath + '\''
+ ", fileType='" + fileType + '\''
+ ", routingKey='" + routingKey + '\''
+ ", eventTemplateStr='" + eventTemplateStr + '\''
+ ", maxRecordsPerEvent=" + maxRecordsPerEvent
+ ", enableDeleteCompletedFiles=" + enableDeleteCompletedFiles
+ ", exactlyOnce=" + exactlyOnce
+ ", transactionTimeoutMinutes=" + transactionTimeoutMinutes
+ '}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
/**
* Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*/
package io.pravega.sensor.collector.writetonfs;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.pravega.sensor.collector.DeviceDriver;
import io.pravega.sensor.collector.DeviceDriverConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

/**
* File transfer service with common implementation logic for all files.
*/
public abstract class FileMoveService extends DeviceDriver {
private static final Logger LOG = LoggerFactory.getLogger(FileMoveService.class);

private static final String FILE_SPEC_KEY = "FILE_SPEC";
private static final String FILE_EXT = "FILE_EXTENSION";
private static final String NFS_MOUNT_PATH = "NFS_MOUNT_PATH";
private static final String DELETE_COMPLETED_FILES_KEY = "DELETE_COMPLETED_FILES";
private static final String DATABASE_FILE_KEY = "DATABASE_FILE";
private static final String EVENT_TEMPLATE_KEY = "EVENT_TEMPLATE";
private static final String SAMPLES_PER_EVENT_KEY = "SAMPLES_PER_EVENT";
private static final String INTERVAL_MS_KEY = "INTERVAL_MS";

private static final String ROUTING_KEY_KEY = "ROUTING_KEY";
private static final String EXACTLY_ONCE_KEY = "EXACTLY_ONCE";
private static final String TRANSACTION_TIMEOUT_MINUTES_KEY = "TRANSACTION_TIMEOUT_MINUTES";
private static final String MIN_TIME_IN_MILLIS_TO_UPDATE_FILE_KEY = "MIN_TIME_IN_MILLIS_TO_UPDATE_FILE";

private static final int DEFAULT_SAMPLES_PER_EVENT_KEY = 100;

private static final int DEFAULT_INTERVAL_MS_KEY = 10000;

private final FileProcessor processor;
private final ScheduledExecutorService executor;

private ScheduledFuture<?> watchFiletask;
private ScheduledFuture<?> processFileTask;

public FileMoveService(DeviceDriverConfig config) {
super(config);
final FileConfig fileSequenceConfig = new FileConfig(
getDatabaseFileName(),
getFileSpec(),
getFileExtension(),
getNFSMountPath(),
getRoutingKey(),
getEventTemplate(),
getSamplesPerEvent(),
getDeleteCompletedFiles(),
getExactlyOnce(),
getTransactionTimeoutMinutes(),
getMinTimeInMillisToUpdateFile(),
config.getClassName());
LOG.info("File Transfer Config: {}", fileSequenceConfig);
processor = FileProcessor.create(fileSequenceConfig);
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat(
FileMoveService.class.getSimpleName() + "-" + config.getInstanceName() + "-%d").build();
executor = Executors.newScheduledThreadPool(1, namedThreadFactory);
}

String getFileSpec() {
return getProperty(FILE_SPEC_KEY);
}

String getFileExtension() {
return getProperty(FILE_EXT, "");
}

String getNFSMountPath() {
return getProperty(NFS_MOUNT_PATH, "");
}

boolean getDeleteCompletedFiles() {
return Boolean.parseBoolean(getProperty(DELETE_COMPLETED_FILES_KEY, Boolean.toString(true)));
}

String getDatabaseFileName() {
return getProperty(DATABASE_FILE_KEY);
}

String getEventTemplate() {
return getProperty(EVENT_TEMPLATE_KEY, "{}");
}

int getSamplesPerEvent() {
return Integer.parseInt(getProperty(SAMPLES_PER_EVENT_KEY, Integer.toString(DEFAULT_SAMPLES_PER_EVENT_KEY)));
}

long getIntervalMs() {
return Long.parseLong(getProperty(INTERVAL_MS_KEY, Long.toString(DEFAULT_INTERVAL_MS_KEY)));
}

protected String getRoutingKey() {
return getProperty(ROUTING_KEY_KEY, "");
}

boolean getExactlyOnce() {
return Boolean.parseBoolean(getProperty(EXACTLY_ONCE_KEY, Boolean.toString(true)));
}

/**
* This time duration must not exceed the controller property controller.transaction.maxLeaseValue (milliseconds).
*/
double getTransactionTimeoutMinutes() {
return Double.parseDouble(getProperty(TRANSACTION_TIMEOUT_MINUTES_KEY, Double.toString(18.0 * 60.0)));
}

long getMinTimeInMillisToUpdateFile() {
return Long.parseLong(getProperty(MIN_TIME_IN_MILLIS_TO_UPDATE_FILE_KEY, "5000"));
}

protected void watchFiles() {
LOG.trace("watchFiles: BEGIN");
try {
processor.watchFiles();
} catch (Exception e) {
LOG.error("watchFiles: watch file error", e);
// Continue on any errors. We will retry on the next iteration.
}
LOG.trace("watchFiles: END");
}

protected void processFiles() {
LOG.trace("processFiles: BEGIN");
try {
processor.processFiles();
} catch (Exception e) {
LOG.error("processFiles: Process file error", e);
// Continue on any errors. We will retry on the next iteration.
}
LOG.trace("processFiles: END");
}

@Override
protected void doStart() {
watchFiletask = executor.scheduleAtFixedRate(
this::watchFiles,
0,
getIntervalMs(),
TimeUnit.MILLISECONDS);
/*
Submits a periodic action that becomes enabled immediately for the first time,
and subsequently with the delay of 1 milliseconds between the termination of one execution and the commencement of the next
ie immediately after completion of first action.
*/
processFileTask = executor.scheduleWithFixedDelay(
this::processFiles,
0,
1,
TimeUnit.MILLISECONDS);
notifyStarted();
}

@Override
protected void doStop() {
LOG.info("doStop: Cancelling transfer task and process file task");
watchFiletask.cancel(false);
processFileTask.cancel(false);
}
}
Loading
Loading