Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integration test #49

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@ commonsCLIVersion=1.4
commonsCSVVersion=1.8
commonsCodecVersion=1.14
commonsMath3Version=3.6.1
grizzlyVersion=3.1.3
grizzlyVersion=2.35

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why we are going to lower version here @dada-dell-emc ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because of higher version there is conflicting versions dependency on glassfish library.

gsonVersion=2.10.1
includePravegaCredentials=true
jacksonVersion=2.15.2
junitVersion=5.6.2
junitVersion=5.10.1
junitPlatformVersion=1.10.1
jakartaBindVersion=2.3.2
jaxbVersion=2.3.2
javaxServletApiVersion=3.0.1
Expand Down
1 change: 1 addition & 0 deletions parquet-file-sample-data/test_file/hello-world.parquet
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Hello World.
8 changes: 6 additions & 2 deletions pravega-sensor-collector/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,10 @@ dependencies {

implementation "io.pravega:pravega-client:${pravegaVersion}",
"io.pravega:pravega-common:${pravegaVersion}",
"commons-cli:commons-cli:${commonsCLIVersion}"
"commons-cli:commons-cli:${commonsCLIVersion}",
"io.pravega:pravega-standalone:${pravegaVersion}",
"io.pravega:pravega-test-integration:${pravegaVersion}"


if (includePravegaCredentials.toBoolean()) {
implementation "io.pravega:pravega-keycloak-client:${pravegaCredentialsVersion}"
Expand All @@ -68,7 +71,7 @@ dependencies {
testImplementation "org.junit.jupiter:junit-jupiter-api:${junitVersion}"
testImplementation "org.junit.vintage:junit-vintage-engine:${junitVersion}"
testRuntimeOnly "org.junit.jupiter:junit-jupiter-engine:${junitVersion}"
testImplementation "org.junit.platform:junit-platform-launcher"
testImplementation "org.junit.platform:junit-platform-launcher:${junitPlatformVersion}"

testImplementation "org.mockito:mockito-core:${mockitoVersion}"

Expand Down Expand Up @@ -105,6 +108,7 @@ startScripts {
}

shadowJar{
zip64 true
archiveBaseName = 'pravega-sensor-collector'
archiveClassifier = ''
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,20 @@ protected void doStart() {
LOGGER.info("Starting device drivers");
final DeviceDriverFactory factory = new DeviceDriverFactory();
drivers = configs.stream().map(factory::create).collect(Collectors.toList());
drivers.stream().forEach((driver) -> driver.startAsync());
drivers.stream().forEach((driver) -> driver.awaitRunning());
drivers.forEach(AbstractService::startAsync);
drivers.forEach(AbstractService::awaitRunning);
LOGGER.info("All device drivers started successfully");
notifyStarted();
}

@Override
protected void doStop() {
drivers.stream().forEach((driver) -> driver.stopAsync());
drivers.stream().forEach((driver) -> driver.awaitTerminated());
LOGGER.info("Stopping all device drivers");
drivers.forEach(AbstractService::stopAsync);
drivers.forEach(AbstractService::awaitTerminated);
drivers = null;
LOGGER.info("Stopped all device drivers");
notifyStopped();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,20 @@ public static String getEnvPrefix() {
return ENV_PREFIX;
}

public static Map<String, String> getProperties() {
final String fileName = getPropertiesFileName();
return getProperties(fileName);
}
/**
* Combines properties from:
* 1. properties file (if specified)
* 2. system environment
* Values in the system environment will override values in the properties file.
* It is intended that properties files only be used when developing in an IDE.
*/
public static Map<String, String> getProperties() {
public static Map<String, String> getProperties(final String fileName) {
Map<String, String> map = new HashMap<>();
final String fileName = getPropertiesFileName();
if (!fileName.isEmpty()) {
if (fileName != null && !fileName.isEmpty()) {
log.info("Reading properties from file {}", fileName);
Properties properties = new Properties();
try (FileInputStream inputStream = new FileInputStream(fileName)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public abstract class FileIngestService extends DeviceDriver {
private static final String EXACTLY_ONCE_KEY = "EXACTLY_ONCE";
private static final String TRANSACTION_TIMEOUT_MINUTES_KEY = "TRANSACTION_TIMEOUT_MINUTES";
private static final String MIN_TIME_IN_MILLIS_TO_UPDATE_FILE_KEY = "MIN_TIME_IN_MILLIS_TO_UPDATE_FILE";
private static final String DELETE_COMPLETED_FILES_INTERVAL_IN_MINUTES_KEY = "DELETE_COMPLETED_FILES_INTERVAL_IN_MINUTES";
private static final String DELETE_COMPLETED_FILES_INTERVAL_IN_SECONDS_KEY = "DELETE_COMPLETED_FILES_INTERVAL_IN_SECONDS";
private static final String ENABLE_LARGE_EVENT = "ENABLE_LARGE_EVENT";

private static final int DEFAULT_SAMPLES_PER_EVENT_KEY = 100;
Expand Down Expand Up @@ -137,8 +137,8 @@ long getMinTimeInMillisToUpdateFile() {
return Long.parseLong(getProperty(MIN_TIME_IN_MILLIS_TO_UPDATE_FILE_KEY, "5000"));
}

long getDeleteCompletedFilesIntervalInMinutes() {
return Long.parseLong(getProperty(DELETE_COMPLETED_FILES_INTERVAL_IN_MINUTES_KEY, "720"));
long getDeleteCompletedFilesIntervalInSeconds() {
return Long.parseLong(getProperty(DELETE_COMPLETED_FILES_INTERVAL_IN_SECONDS_KEY, "43200"));

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it like are we trying to delete completed file every 12 hours? @dada-dell-emc

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes

}

boolean getLargeEventEnable() {
Expand Down Expand Up @@ -167,14 +167,14 @@ protected void processFiles() {
}

protected void deleteCompletedFiles() {
LOG.trace("deleteCompletedFiles: BEGIN");
LOG.debug("deleteCompletedFiles: BEGIN");
try {
processor.deleteCompletedFiles();
} catch (Exception e) {
LOG.error("deleteCompletedFiles: Delete file error", e);
// Continue on any errors. We will retry on the next iteration.
}
LOG.trace("deleteCompletedFiles: END");
LOG.debug("deleteCompletedFiles: END");
}

@Override
Expand All @@ -199,17 +199,19 @@ protected void doStart() {
deleteFileTask = executor.scheduleAtFixedRate(
this::deleteCompletedFiles,
1,
getDeleteCompletedFilesIntervalInMinutes(),
TimeUnit.MINUTES);
getDeleteCompletedFilesIntervalInSeconds(),
TimeUnit.SECONDS);

notifyStarted();
}

@Override
protected void doStop() {
LOG.info("doStop: Cancelling ingestion task and process file task");
LOG.info("doStop: Cancelling ingestion, process and delete file task");
watchFileTask.cancel(false);
processFileTask.cancel(false);
deleteFileTask.cancel(false);
LOG.info("doStop: Cancelled ingestion, process and delete file task");
notifyStopped();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class TransactionStateSQLiteImpl implements AutoCloseable, TransactionSt

public TransactionStateSQLiteImpl(Connection connection, TransactionCoordinator transactionCoordinator) {
this.connection = Preconditions.checkNotNull(connection, "connection");
this.transactionCoordinator = Preconditions.checkNotNull(transactionCoordinator, "transactionCoordinator");
this.transactionCoordinator = transactionCoordinator;
}

@Override
Expand Down Expand Up @@ -124,7 +124,9 @@ public void addCompletedFileRecord(String fileName, long beginOffset, long endOf
deletePendingFileStatement.setString(1, fileName);
deletePendingFileStatement.setLong(2, beginOffset);
deletePendingFileStatement.execute();
transactionCoordinator.addTransactionToCommit(txnId);
if(transactionCoordinator!=null) {
transactionCoordinator.addTransactionToCommit(txnId);
}
autoRollback.commit();
}
}
Expand Down Expand Up @@ -171,7 +173,9 @@ public void addCompletedFileRecord(String fileName, long beginOffset, long endOf
*/
@Override
public void deleteTransactionToCommit(Optional<UUID> txnId) {
transactionCoordinator.deleteTransactionToCommit(txnId);
if(transactionCoordinator!=null) {
transactionCoordinator.deleteTransactionToCommit(txnId);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
package io.pravega.sensor.collector;

import com.google.common.util.concurrent.Service;
import io.pravega.client.ClientConfig;
import io.pravega.client.EventStreamClientFactory;
import io.pravega.client.admin.ReaderGroupManager;
import io.pravega.client.admin.StreamManager;
import io.pravega.client.stream.*;
import io.pravega.client.stream.impl.UTF8StringSerializer;
import io.pravega.sensor.collector.util.FileNameWithOffset;
import io.pravega.sensor.collector.util.SQliteDBUtility;
import io.pravega.sensor.collector.util.TransactionStateDB;
import io.pravega.sensor.collector.util.TransactionStateSQLiteImpl;
import io.pravega.test.integration.utils.SetupUtils;

import java.io.IOException;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.sql.Connection;
import java.sql.SQLException;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.junit.jupiter.api.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PravegaSensorCollectorIntegrationTests {
private static final Logger log = LoggerFactory.getLogger(PravegaSensorCollectorIntegrationTests.class);
private final SetupUtils setupUtils = new SetupUtils();
static String fileName = "./src/test/resources/RawFileIngest-integration-test.properties";
Map<String, String> properties = null;
@BeforeEach
public void setup() {
log.info("Setup");
properties = Parameters.getProperties(fileName);
try {
setupUtils.startAllServices();

Files.deleteIfExists(Paths.get(properties.get("PRAVEGA_SENSOR_COLLECTOR_RAW1_DATABASE_FILE")));
} catch (Exception e) {
throw new RuntimeException(e);
}
properties.put("PRAVEGA_SENSOR_COLLECTOR_RAW1_PRAVEGA_CONTROLLER_URI", setupUtils.getControllerUri().toString());
log.debug("Properties: {}", properties);
}

@AfterEach
public void tearDown() {
log.info("TearDown");
try {
setupUtils.stopAllServices();
} catch (Exception e) {
throw new RuntimeException(e);
}

try {
Files.deleteIfExists(Paths.get(properties.get("PRAVEGA_SENSOR_COLLECTOR_RAW1_DATABASE_FILE")));
} catch (IOException e) {
throw new RuntimeException(e);
}
properties = null;
}

@Test
@Timeout(value = 1, unit = TimeUnit.MINUTES)
public void testPSCDataIntegration() {
try {
copyHelloWorldFile();
} catch (IOException e) {
throw new RuntimeException(e);
}
URI controllerURI = setupUtils.getControllerUri();
String scope = "test-psc-data-integration";
String streamName ="test-psc-data-integration-stream";

properties.put("PRAVEGA_SENSOR_COLLECTOR_RAW1_SCOPE",scope);
properties.put("PRAVEGA_SENSOR_COLLECTOR_RAW1_STREAM",streamName);

final DeviceDriverManager deviceDriverManager = new DeviceDriverManager(properties);
Service startService = deviceDriverManager.startAsync();
try {
startService.awaitRunning(Duration.ofSeconds(30));
Thread.sleep(12000);
} catch (InterruptedException | TimeoutException e) {
throw new RuntimeException(e);
}
final Connection connection = SQliteDBUtility.createDatabase(properties.get("PRAVEGA_SENSOR_COLLECTOR_RAW1_DATABASE_FILE"));
final TransactionStateDB state = new TransactionStateSQLiteImpl(connection, null);

try {
List<FileNameWithOffset> completedFiles = state.getCompletedFileRecords();
Assertions.assertEquals(1, completedFiles.size());

validateStreamData(controllerURI, scope, streamName, new String(Files.readAllBytes(Paths.get("../parquet-file-sample-data/test_file/hello-world.parquet"))));

Thread.sleep(5000);

Service stopService = deviceDriverManager.stopAsync();
stopService.awaitTerminated(Duration.ofSeconds(10));

// Till this time all the completed files should get deleted
completedFiles = state.getCompletedFileRecords();
Assertions.assertEquals(0, completedFiles.size());
connection.close();
} catch (SQLException | InterruptedException | TimeoutException | IOException e) {
throw new RuntimeException(e);
}
}

private static void validateStreamData(URI controllerURI, String scope, String streamName, String content) {
StreamManager streamManager = StreamManager.create(controllerURI);

final String readerGroup = UUID.randomUUID().toString().replace("-", "");
final ReaderGroupConfig readerGroupConfig = ReaderGroupConfig.builder()
.stream(Stream.of(scope, streamName))
.build();
try (ReaderGroupManager readerGroupManager = ReaderGroupManager.withScope(scope, controllerURI)) {
readerGroupManager.createReaderGroup(readerGroup, readerGroupConfig);
}

try (EventStreamClientFactory clientFactory = EventStreamClientFactory.withScope(scope,
ClientConfig.builder().controllerURI(controllerURI).build());
EventStreamReader<String> reader = clientFactory.createReader("reader",
readerGroup,
new UTF8StringSerializer(),
ReaderConfig.builder().build())) {
System.out.format("Reading all the events from %s/%s%n", scope, streamName);
EventRead<String> eventRead = null;
try {
while ((eventRead = reader.readNextEvent(2000)).getEvent() != null) {
String event = eventRead.getEvent();
System.out.format("Read event: %s", event);
Assertions.assertNotNull(event);
Assertions.assertFalse(event.isEmpty());
Assertions.assertEquals(content, event);
}
} catch (ReinitializationRequiredException e) {
//There are certain circumstances where the reader needs to be reinitialized
e.printStackTrace();
}
System.out.format("No more events from %s/%s%n", scope, streamName);
}
}

public void copyHelloWorldFile() throws IOException {
Path sourcePath = Paths.get("../parquet-file-sample-data/test_file/hello-world.parquet");
Path targetPath = Paths.get("../parquet-file-sample-data/integration-test/hello-world.parquet");
Files.copy(sourcePath, targetPath, StandardCopyOption.REPLACE_EXISTING);
}

@Test
@Timeout(value = 1, unit = TimeUnit.MINUTES)
public void testRawFile() {
try {
copyFile();
} catch (IOException e) {
throw new RuntimeException(e);
}

final DeviceDriverManager deviceDriverManager = new DeviceDriverManager(properties);
Service startService = deviceDriverManager.startAsync();
try {
startService.awaitRunning(Duration.ofSeconds(30));
Thread.sleep(15000);
} catch (InterruptedException | TimeoutException e) {
throw new RuntimeException(e);
}
final Connection connection = SQliteDBUtility.createDatabase(properties.get("PRAVEGA_SENSOR_COLLECTOR_RAW1_DATABASE_FILE"));
final TransactionStateDB state = new TransactionStateSQLiteImpl(connection, null);

try {
List<FileNameWithOffset> completedFiles = state.getCompletedFileRecords();
Assertions.assertEquals(3, completedFiles.size());

Thread.sleep(5000);

Service stopService = deviceDriverManager.stopAsync();
stopService.awaitTerminated(Duration.ofSeconds(10));

// Till this time all the completed files should get deleted
completedFiles = state.getCompletedFileRecords();
Assertions.assertEquals(0, completedFiles.size());
connection.close();
} catch (SQLException | InterruptedException | TimeoutException e) {
throw new RuntimeException(e);
}
}

public void copyFile() throws IOException {
Path sourcePath = Paths.get("../parquet-file-sample-data/test_file/sub1.parquet");
Path targetPath = Paths.get("../parquet-file-sample-data/integration-test/sub1.parquet");
Files.copy(sourcePath, targetPath, StandardCopyOption.REPLACE_EXISTING);
sourcePath = Paths.get("../parquet-file-sample-data/test_file/sub2.parquet");
targetPath = Paths.get("../parquet-file-sample-data/integration-test/sub2.parquet");
Files.copy(sourcePath, targetPath, StandardCopyOption.REPLACE_EXISTING);
sourcePath = Paths.get("../parquet-file-sample-data/test_file/sub3.parquet");
targetPath = Paths.get("../parquet-file-sample-data/integration-test/sub3.parquet");
Files.copy(sourcePath, targetPath, StandardCopyOption.REPLACE_EXISTING);
log.info("copyFile: Files copied successfully");
}
}
Loading