Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Validation improvement #48

Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public abstract class DeviceDriver extends AbstractService implements AutoClosea
private static final String CREATE_SCOPE_KEY = "CREATE_SCOPE";

public DeviceDriver(DeviceDriverConfig config) {
this.config = Preconditions.checkNotNull(config, "deviceDriverConfig");
this.config = Preconditions.checkNotNull(config, "config");
LOGGER.info("Create Scope: {}", isCreateScope());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
*/
package io.pravega.sensor.collector.accelerometer;

import com.google.common.base.Preconditions;
import org.apache.commons.codec.binary.Hex;

/**
Expand All @@ -18,7 +19,7 @@ public class AccelerometerRawData {
public final byte[] bytes;

public AccelerometerRawData(byte[] bytes) {
this.bytes = bytes;
this.bytes = Preconditions.checkNotNull(bytes, "bytes");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.pravega.client.stream.Transaction;
import io.pravega.client.stream.TxnFailedException;
import io.pravega.client.stream.impl.ByteArraySerializer;
import io.pravega.keycloak.com.google.common.base.Preconditions;
import io.pravega.sensor.collector.util.EventWriter;
import io.pravega.sensor.collector.util.FileNameWithOffset;
import io.pravega.sensor.collector.util.FileUtils;
Expand Down Expand Up @@ -59,11 +60,12 @@ public abstract class FileProcessor {
private final Path movedFilesDirectory;

public FileProcessor(FileConfig config, TransactionStateDB state, EventWriter<byte[]> writer, TransactionCoordinator transactionCoordinator) {
this.config = config;
this.state = state;
this.writer = writer;
this.transactionCoordinator = transactionCoordinator;
this.eventGenerator = getEventGenerator(config);
this.config = Preconditions.checkNotNull(config, "config");
Preconditions.checkNotNull(config.stateDatabaseFileName, "config.stateDatabaseFileName");
this.state = Preconditions.checkNotNull(state, "state");
this.writer = Preconditions.checkNotNull(writer, "writer");
this.transactionCoordinator = Preconditions.checkNotNull(transactionCoordinator, "transactionCoordinator");
this.eventGenerator = Preconditions.checkNotNull(getEventGenerator(config), "eventGenerator");
this.movedFilesDirectory = Paths.get(config.stateDatabaseFileName).getParent();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Preconditions;
import com.google.common.io.CountingInputStream;
import io.pravega.sensor.collector.file.EventGenerator;
import io.pravega.sensor.collector.util.PravegaWriterEvent;
Expand Down Expand Up @@ -41,10 +42,10 @@ public class CsvFileEventGenerator implements EventGenerator {
private final ObjectMapper mapper;

public CsvFileEventGenerator(String routingKey, int maxRecordsPerEvent, ObjectNode eventTemplate, ObjectMapper mapper) {
this.routingKey = routingKey;
this.routingKey = Preconditions.checkNotNull(routingKey, "routingKey");
this.maxRecordsPerEvent = maxRecordsPerEvent;
this.eventTemplate = eventTemplate;
this.mapper = mapper;
this.mapper = Preconditions.checkNotNull(mapper, "objectMapper");
}

public static CsvFileEventGenerator create(String routingKey, int maxRecordsPerEvent, String eventTemplateStr, String writerId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.io.CountingInputStream;
import io.pravega.keycloak.com.google.common.base.Preconditions;
import io.pravega.sensor.collector.file.EventGenerator;
import io.pravega.sensor.collector.util.PravegaWriterEvent;
import org.apache.avro.Schema;
Expand Down Expand Up @@ -57,10 +58,10 @@ public class ParquetEventGenerator implements EventGenerator {
private final ObjectMapper mapper;

public ParquetEventGenerator(String routingKey, int maxRecordsPerEvent, ObjectNode eventTemplate, ObjectMapper mapper) {
this.routingKey = routingKey;
this.routingKey = Preconditions.checkNotNull(routingKey, "routingKey");
this.maxRecordsPerEvent = maxRecordsPerEvent;
this.eventTemplate = eventTemplate;
this.mapper = mapper;
this.mapper = Preconditions.checkNotNull(mapper, "objectMapper");
}

public static ParquetEventGenerator create(String routingKey, int maxRecordsPerEvent, String eventTemplateStr, String writerId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Preconditions;
import com.google.common.io.CountingInputStream;
import io.pravega.sensor.collector.file.EventGenerator;
import io.pravega.sensor.collector.util.PravegaWriterEvent;
Expand All @@ -37,9 +38,9 @@ public class RawEventGenerator implements EventGenerator {
private final ObjectMapper mapper;

public RawEventGenerator(String routingKey, ObjectNode eventTemplate, ObjectMapper mapper) {
this.routingKey = routingKey;
this.routingKey = Preconditions.checkNotNull(routingKey, "routingKey");
this.eventTemplate = eventTemplate;
this.mapper = mapper;
this.mapper = Preconditions.checkNotNull(mapper, "objectMapper");
}

public static RawEventGenerator create(String routingKey, String eventTemplateStr, String writerId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
package io.pravega.sensor.collector.simple;

import com.google.common.util.concurrent.AbstractExecutionThreadService;
import io.pravega.keycloak.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -28,9 +29,9 @@ public class DataCollectorService<R, S extends Samples> extends AbstractExecutio
private final SimpleDeviceDriver<R, S> driver;

public DataCollectorService(String instanceName, BlockingQueue<R> memoryQueue, SimpleDeviceDriver<R, S> driver) {
this.instanceName = instanceName;
this.memoryQueue = memoryQueue;
this.driver = driver;
this.instanceName = Preconditions.checkNotNull(instanceName, "instanceName");
this.memoryQueue = Preconditions.checkNotNull(memoryQueue, "memoryQueue");
this.driver = Preconditions.checkNotNull(driver, "driver");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
package io.pravega.sensor.collector.simple;

import com.google.common.util.concurrent.AbstractExecutionThreadService;
import io.pravega.keycloak.com.google.common.base.Preconditions;
import io.pravega.sensor.collector.util.Statistics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -31,10 +32,10 @@ public class MemoryQueueToPersistentQueueService<R, S extends Samples> extends A
private final int samplesPerEvent;

public MemoryQueueToPersistentQueueService(String instanceName, BlockingQueue<R> memoryQueue, PersistentQueue persistentQueue, SimpleDeviceDriver<R, S> driver, int samplesPerEvent) {
this.instanceName = instanceName;
this.memoryQueue = memoryQueue;
this.persistentQueue = persistentQueue;
this.driver = driver;
this.instanceName = Preconditions.checkNotNull(instanceName, "instanceName");
this.memoryQueue = Preconditions.checkNotNull(memoryQueue, "memoryQueue");
this.persistentQueue = Preconditions.checkNotNull(persistentQueue, "persistentQueue");
this.driver = Preconditions.checkNotNull(driver, "driver");
this.samplesPerEvent = samplesPerEvent;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
*/
package io.pravega.sensor.collector.simple;

import com.google.common.base.Preconditions;
import io.pravega.sensor.collector.util.AutoRollback;
import io.pravega.sensor.collector.util.TransactionCoordinator;
import org.slf4j.Logger;
Expand Down Expand Up @@ -48,8 +49,8 @@ public class PersistentQueue implements AutoCloseable {
*/
public PersistentQueue(Connection connection, TransactionCoordinator transactionCoordinator, long capacity) {
try {
this.connection = connection;
this.transactionCoordinator = transactionCoordinator;
this.connection = Preconditions.checkNotNull(connection, "connection");
this.transactionCoordinator = Preconditions.checkNotNull(transactionCoordinator, "transactionCoordinator");
final long initialSize = getDatabaseRecordCount();
LOGGER.info("Persistent queue has {} elements.", initialSize);
final int permits = (int) Long.max(Integer.MIN_VALUE, Long.min(Integer.MAX_VALUE, capacity - initialSize));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
*/
package io.pravega.sensor.collector.simple;

import com.google.common.base.Preconditions;

import java.nio.charset.StandardCharsets;

public class PersistentQueueElement {
Expand All @@ -19,8 +21,8 @@ public class PersistentQueueElement {

public PersistentQueueElement(long id, byte[] bytes, String routingKey, long timestamp) {
this.id = id;
this.bytes = bytes;
this.routingKey = routingKey;
this.bytes = Preconditions.checkNotNull(bytes, "bytes");
this.routingKey = Preconditions.checkNotNull(routingKey, "routingKey");
this.timestamp = timestamp;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
package io.pravega.sensor.collector.simple;

import com.google.common.util.concurrent.AbstractExecutionThreadService;
import io.pravega.keycloak.com.google.common.base.Preconditions;
import io.pravega.sensor.collector.util.EventWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -31,9 +32,9 @@ public class PersistentQueueToPravegaService extends AbstractExecutionThreadServ
private final long delayBetweenWriteBatchesMs;

public PersistentQueueToPravegaService(String instanceName, PersistentQueue persistentQueue, EventWriter<byte[]> writer, int maxEventsPerWriteBatch, long delayBetweenWriteBatchesMs) {
this.instanceName = instanceName;
this.persistentQueue = persistentQueue;
this.writer = writer;
this.instanceName = Preconditions.checkNotNull(instanceName, "instanceName");
this.persistentQueue = Preconditions.checkNotNull(persistentQueue, "persistentQueue");
this.writer = Preconditions.checkNotNull(writer, "writer");
this.maxEventsPerWriteBatch = maxEventsPerWriteBatch;
this.delayBetweenWriteBatchesMs = delayBetweenWriteBatchesMs;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
*
*/
package io.pravega.sensor.collector.stateful;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.AbstractExecutionThreadService;

import org.slf4j.Logger;
Expand All @@ -28,9 +29,9 @@ public class DataCollectorService<S> extends AbstractExecutionThreadService {

public DataCollectorService(String instanceName, PersistentQueue persistentQueue,
StatefulSensorDeviceDriver<S> driver) {
this.instanceName = instanceName;
this.persistentQueue = persistentQueue;
this.driver = driver;
this.instanceName = Preconditions.checkNotNull(instanceName, "instanceName");
this.persistentQueue = Preconditions.checkNotNull(persistentQueue, "persistentQueue");
this.driver = Preconditions.checkNotNull(driver, "driver");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,16 @@

import java.util.List;

import com.google.common.base.Preconditions;
import io.pravega.sensor.collector.simple.PersistentQueueElement;

public class PollResponse<S> {
public final List<PersistentQueueElement> events;
public final S state;

public PollResponse(List<PersistentQueueElement> events, S state) {
this.events = events;
this.state = state;
this.events = Preconditions.checkNotNull(events, "events");
this.state = Preconditions.checkNotNull(state, "state");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
*/
package io.pravega.sensor.collector.stateful;

import io.pravega.keycloak.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -25,7 +26,7 @@ public class ReadingState {
private static final Logger LOGGER = LoggerFactory.getLogger(ReadingState.class);
public final Connection connection;
public ReadingState(Connection connection) {
this.connection = connection;
this.connection = Preconditions.checkNotNull(connection, "connection");
try {
try (final Statement statement = connection.createStatement()) {
statement.execute("create table if not exists LastReadingState (" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
*/
package io.pravega.sensor.collector.util;

import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -25,7 +26,7 @@ public class AutoRollback implements AutoCloseable {
private boolean committed;

public AutoRollback(Connection connection) {
this.connection = connection;
this.connection = Preconditions.checkNotNull(connection, "connection");
}

public void commit() throws SQLException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
*/
package io.pravega.sensor.collector.util;

import io.pravega.keycloak.com.google.common.base.Preconditions;

import java.util.Objects;

/**
Expand All @@ -25,7 +27,7 @@ public class FileNameWithOffset implements Comparable<FileNameWithOffset> {
public final long offset;

public FileNameWithOffset(String fileName, long offset) {
this.fileName = fileName;
this.fileName = Preconditions.checkNotNull(fileName, "fileName");
this.offset = offset;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
*/
package io.pravega.sensor.collector.util;

import com.google.common.base.Preconditions;
import io.pravega.client.stream.EventStreamWriter;
import io.pravega.client.stream.Transaction;
import io.pravega.client.stream.TxnFailedException;
Expand All @@ -27,7 +28,7 @@ public class NonTransactionalEventWriter<T> implements EventWriter<T> {
private final EventStreamWriter<T> writer;

public NonTransactionalEventWriter(EventStreamWriter<T> writer) {
this.writer = writer;
this.writer = Preconditions.checkNotNull(writer, "writer");
}

public void writeEvent(String routingKey, T event) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
*/
package io.pravega.sensor.collector.util;

import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -24,6 +25,7 @@ public class PersistentId {
private final UUID persistentId;

public PersistentId(Connection connection) {
Preconditions.checkNotNull(connection, "connection");
try {
try (final Statement statement = connection.createStatement()) {
// Create table if needed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
*/
package io.pravega.sensor.collector.util;

import com.google.common.base.Preconditions;

/**
* Event generated from file and its sequence number
*/
Expand All @@ -18,9 +20,9 @@ public class PravegaWriterEvent {
public final byte[] bytes;

public PravegaWriterEvent(String routingKey, long sequenceNumber, byte[] bytes) {
this.routingKey = routingKey;
this.routingKey = Preconditions.checkNotNull(routingKey, "routingKey");
this.sequenceNumber = sequenceNumber;
this.bytes = bytes;
this.bytes = Preconditions.checkNotNull(bytes, "bytes");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
*/
package io.pravega.sensor.collector.util;

import com.google.common.base.Preconditions;
import io.pravega.client.stream.TxnFailedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -65,8 +66,8 @@ public class TransactionCoordinator {
private final EventWriter<byte[]> writer;

public TransactionCoordinator(Connection connection, EventWriter<byte[]> writer) {
this.connection = connection;
this.writer = writer;
this.connection = Preconditions.checkNotNull(connection, "connection");
this.writer = Preconditions.checkNotNull(writer, "writer");
try {
try (final Statement statement = connection.createStatement()) {
statement.execute(
Expand Down
Loading