Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -244,15 +244,6 @@ public void remove(final Bytes key) {
segment.delete(key);
}

@Override
public void remove(final Bytes key, final long timestamp) {
final Bytes keyBytes = keySchema.toStoreBinaryKeyPrefix(key, timestamp);
final S segment = segments.segmentForTimestamp(timestamp);
if (segment != null) {
segment.deleteRange(keyBytes, keyBytes);
}
}

@Override
public void put(final Bytes key,
final byte[] value) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,12 +244,6 @@ KeyValueIterator<Bytes, byte[]> fetch(final Bytes keyFrom,
forward);
}


@Override
public void remove(final Bytes key, final long timestamp) {
throw new UnsupportedOperationException("Not supported operation");
}

@Override
public KeyValueIterator<Bytes, byte[]> fetchAll(final long timeFrom,
final long timeTo) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ abstract class AbstractSegments<S extends Segment> implements Segments<S> {
this.formatter.setTimeZone(new SimpleTimeZone(0, "UTC"));
}

protected abstract S createSegment(long segmentId, String segmentName);

protected abstract void openSegmentDB(final S segment, final StateStoreContext context);

public void setPosition(final Position position) {
this.position = position;
}
Expand All @@ -80,6 +84,23 @@ public S segmentForTimestamp(final long timestamp) {
return segments.get(segmentId(timestamp));
}

@Override
public S getOrCreateSegment(final long segmentId,
final StateStoreContext context) {
if (segments.containsKey(segmentId)) {
return segments.get(segmentId);
} else {
final S newSegment = createSegment(segmentId, segmentName(segmentId));

if (segments.put(segmentId, newSegment) != null) {
throw new IllegalStateException(newSegment.getClass().getSimpleName() + " already exists. Possible concurrent access.");
}

openSegmentDB(newSegment, context);
return newSegment;
}
}

@Override
public S getOrCreateSegmentIfLive(final long segmentId,
final StateStoreContext context,
Expand All @@ -89,7 +110,9 @@ public S getOrCreateSegmentIfLive(final long segmentId,

if (segmentId >= minLiveSegment) {
// The segment is live. get it, ensure it's open, and return it.
return getOrCreateSegment(segmentId, context);
final S segment = getOrCreateSegment(segmentId, context);
cleanupExpiredSegments(streamTime);
return segment;
} else {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@
import java.util.Map;
import java.util.Objects;

class KeyValueSegment extends RocksDBStore implements Comparable<KeyValueSegment>, Segment {
public final long id;
class KeyValueSegment extends RocksDBStore implements Segment {
private final long id;

KeyValueSegment(final String segmentName,
final String windowName,
Expand All @@ -39,6 +39,11 @@ class KeyValueSegment extends RocksDBStore implements Comparable<KeyValueSegment
this.position = position;
}

@Override
public long id() {
return id;
}

@Override
public void destroy() throws IOException {
Utils.delete(dbDir);
Expand All @@ -49,11 +54,6 @@ public synchronized void deleteRange(final Bytes keyFrom, final Bytes keyTo) {
super.deleteRange(keyFrom, keyTo);
}

@Override
public int compareTo(final KeyValueSegment segment) {
return Long.compare(id, segment.id);
}

@Override
public void openDB(final Map<String, Object> configs, final File stateDir) {
super.openDB(configs, stateDir);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,30 +36,13 @@ class KeyValueSegments extends AbstractSegments<KeyValueSegment> {
}

@Override
public KeyValueSegment getOrCreateSegment(final long segmentId,
final StateStoreContext context) {
if (segments.containsKey(segmentId)) {
return segments.get(segmentId);
} else {
final KeyValueSegment newSegment =
new KeyValueSegment(segmentName(segmentId), name, segmentId, position, metricsRecorder);

if (segments.put(segmentId, newSegment) != null) {
throw new IllegalStateException("KeyValueSegment already exists. Possible concurrent access.");
}

newSegment.openDB(context.appConfigs(), context.stateDir());
return newSegment;
}
protected KeyValueSegment createSegment(final long segmentId, final String segmentName) {
return new KeyValueSegment(segmentName, name, segmentId, position, metricsRecorder);
}

@Override
public KeyValueSegment getOrCreateSegmentIfLive(final long segmentId,
final StateStoreContext context,
final long streamTime) {
final KeyValueSegment segment = super.getOrCreateSegmentIfLive(segmentId, context, streamTime);
cleanupExpiredSegments(streamTime);
return segment;
protected void openSegmentDB(final KeyValueSegment segment, final StateStoreContext context) {
segment.openDB(context.appConfigs(), context.stateDir());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
* stores a key into a shared physical store by prepending the key with a prefix (unique to
* the specific logical segment), and storing the combined key into the physical store.
*/
class LogicalKeyValueSegment implements Comparable<LogicalKeyValueSegment>, Segment, VersionedStoreSegment {
class LogicalKeyValueSegment implements Segment, VersionedStoreSegment {
private static final Logger log = LoggerFactory.getLogger(LogicalKeyValueSegment.class);

private final long id;
Expand All @@ -78,11 +78,6 @@ public long id() {
return id;
}

@Override
public int compareTo(final LogicalKeyValueSegment segment) {
return Long.compare(id, segment.id);
}

@Override
public synchronized void destroy() {
if (id < 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,25 +62,18 @@ public void setPosition(final Position position) {
}

@Override
public LogicalKeyValueSegment getOrCreateSegment(final long segmentId,
final StateStoreContext context) {
if (segments.containsKey(segmentId)) {
return segments.get(segmentId);
} else {
if (segmentId < 0) {
throw new IllegalArgumentException(
"Negative segment IDs are reserved for reserved segments, "
+ "and should be created through createReservedSegment() instead");
}

final LogicalKeyValueSegment newSegment = new LogicalKeyValueSegment(segmentId, segmentName(segmentId), physicalStore);

if (segments.put(segmentId, newSegment) != null) {
throw new IllegalStateException("LogicalKeyValueSegment already exists. Possible concurrent access.");
}

return newSegment;
protected LogicalKeyValueSegment createSegment(final long segmentId, final String segmentName) {
if (segmentId < 0) {
throw new IllegalArgumentException(
"Negative segment IDs are reserved for reserved segments, "
+ "and should be created through createReservedSegment() instead");
}
return new LogicalKeyValueSegment(segmentId, segmentName, physicalStore);
}

@Override
protected void openSegmentDB(final LogicalKeyValueSegment segment, final StateStoreContext context) {
// no-op -- a logical segment is just a view on an underlying physical store
}

LogicalKeyValueSegment createReservedSegment(final long segmentId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,16 @@

import java.io.IOException;

public interface Segment extends KeyValueStore<Bytes, byte[]>, BatchWritingStore {
public interface Segment extends KeyValueStore<Bytes, byte[]>, BatchWritingStore, Comparable<Segment> {

long id();

void destroy() throws IOException;

void deleteRange(Bytes keyFrom, Bytes keyTo);

@Override
default int compareTo(final Segment segment) {
return Long.compare(id(), segment.id());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,14 +109,6 @@ public interface SegmentedBytesStore extends StateStore {
*/
void remove(Bytes key);

/**
* Remove all duplicated records with the provided key in the specified timestamp.
*
* @param key the segmented key to remove
* @param timestamp the timestamp to match
*/
void remove(Bytes key, long timestamp);

/**
* Write a new value to the store with the provided key. The key
* should be a composite of the record key, and the timestamp information etc
Expand Down Expand Up @@ -159,18 +151,6 @@ interface KeySchema {
*/
Bytes lowerRange(final Bytes key, final long from);

/**
* Given a record key and a time, construct a Segmented key to search when performing
* prefixed queries.
*
* @param key
* @param timestamp
* @return The key that represents the prefixed Segmented key in bytes.
*/
default Bytes toStoreBinaryKeyPrefix(final Bytes key, final long timestamp) {
throw new UnsupportedOperationException();
}

/**
* Given a range of fixed size record keys and a time, construct a Segmented key that represents
* the upper range of keys to search when performing range queries.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@
import java.util.Map;
import java.util.Objects;

class TimestampedSegment extends RocksDBTimestampedStore implements Comparable<TimestampedSegment>, Segment {
public final long id;
class TimestampedSegment extends RocksDBTimestampedStore implements Segment {
private final long id;

TimestampedSegment(final String segmentName,
final String windowName,
Expand All @@ -39,6 +39,11 @@ class TimestampedSegment extends RocksDBTimestampedStore implements Comparable<T
this.position = position;
}

@Override
public long id() {
return id;
}

@Override
public void destroy() throws IOException {
Utils.delete(dbDir);
Expand All @@ -49,15 +54,9 @@ public void deleteRange(final Bytes keyFrom, final Bytes keyTo) {
throw new UnsupportedOperationException();
}

@Override
public int compareTo(final TimestampedSegment segment) {
return Long.compare(id, segment.id);
}

@Override
public void openDB(final Map<String, Object> configs, final File stateDir) {
super.openDB(configs, stateDir);
// skip the registering step
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,8 @@
* header-aware storage with dual-column-family migration support from
* timestamp-only format to timestamp+headers format.
*/
class TimestampedSegmentWithHeaders extends RocksDBTimestampedStoreWithHeaders
implements Comparable<TimestampedSegmentWithHeaders>, Segment {

public final long id;
class TimestampedSegmentWithHeaders extends RocksDBTimestampedStoreWithHeaders implements Segment {
private final long id;

TimestampedSegmentWithHeaders(final String segmentName,
final String windowName,
Expand All @@ -48,6 +46,11 @@ class TimestampedSegmentWithHeaders extends RocksDBTimestampedStoreWithHeaders
this.position = position;
}

@Override
public long id() {
return id;
}

@Override
public void destroy() throws IOException {
Utils.delete(dbDir);
Expand All @@ -58,15 +61,9 @@ public void deleteRange(final Bytes keyFrom, final Bytes keyTo) {
throw new UnsupportedOperationException();
}

@Override
public int compareTo(final TimestampedSegmentWithHeaders segment) {
return Long.compare(id, segment.id);
}

@Override
public void openDB(final Map<String, Object> configs, final File stateDir) {
super.openDB(configs, stateDir);
// skip the registering step
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,30 +36,13 @@ class TimestampedSegments extends AbstractSegments<TimestampedSegment> {
}

@Override
public TimestampedSegment getOrCreateSegment(final long segmentId,
final StateStoreContext context) {
if (segments.containsKey(segmentId)) {
return segments.get(segmentId);
} else {
final TimestampedSegment newSegment =
new TimestampedSegment(segmentName(segmentId), name, segmentId, position, metricsRecorder);

if (segments.put(segmentId, newSegment) != null) {
throw new IllegalStateException("TimestampedSegment already exists. Possible concurrent access.");
}

newSegment.openDB(context.appConfigs(), context.stateDir());
return newSegment;
}
protected TimestampedSegment createSegment(final long segmentId, final String segmentName) {
return new TimestampedSegment(segmentName, name, segmentId, position, metricsRecorder);
}

@Override
public TimestampedSegment getOrCreateSegmentIfLive(final long segmentId,
final StateStoreContext context,
final long streamTime) {
final TimestampedSegment segment = super.getOrCreateSegmentIfLive(segmentId, context, streamTime);
cleanupExpiredSegments(streamTime);
return segment;
protected void openSegmentDB(final TimestampedSegment segment, final StateStoreContext context) {
segment.openDB(context.appConfigs(), context.stateDir());
}

@Override
Expand Down
Loading