Skip to content

Commit d0715ea

Browse files
committed
#4855 Fix Plan B session state range
1 parent 57576ac commit d0715ea

File tree

9 files changed

+160
-16
lines changed

9 files changed

+160
-16
lines changed

stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/db/RangedStateDb.java

+17-2
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
public class RangedStateDb extends AbstractDb<Key, StateValue> {
1919

20+
private static final ByteBuffer ZERO = ByteBuffer.allocateDirect(0);
21+
2022
RangedStateDb(final Path path,
2123
final ByteBufferFactory byteBufferFactory) {
2224
this(
@@ -53,11 +55,24 @@ public static RangedStateDb create(final Path path,
5355
public Optional<RangedState> getState(final RangedStateRequest request) {
5456
final ByteBuffer start = byteBufferFactory.acquire(Long.BYTES);
5557
try {
56-
start.putLong(request.key());
58+
start.putLong(request.key() + 1);
5759
start.flip();
5860

59-
final KeyRange<ByteBuffer> keyRange = KeyRange.atLeastBackward(start);
61+
// read(readTxn -> {
62+
// try (final CursorIterable<ByteBuffer> cursor = dbi.iterate(readTxn)) {
63+
// final Iterator<KeyVal<ByteBuffer>> iterator = cursor.iterator();
64+
// while (iterator.hasNext()
65+
// && !Thread.currentThread().isInterrupted()) {
66+
// final BBKV kv = BBKV.create(iterator.next());
67+
// final long keyStart = kv.key().getLong(0);
68+
// final long keyEnd = kv.key().getLong(Long.BYTES);
69+
// System.out.println("start=" + keyStart + ", keyEnd=" + keyEnd);
70+
// }
71+
// }
72+
// return Optional.empty();
73+
// });
6074

75+
final KeyRange<ByteBuffer> keyRange = KeyRange.openBackward(start, ZERO);
6176
return read(readTxn -> {
6277
try (final CursorIterable<ByteBuffer> cursor = dbi.iterate(readTxn, keyRange)) {
6378
final Iterator<KeyVal<ByteBuffer>> iterator = cursor.iterator();

stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/db/SessionDb.java

+6-6
Original file line numberDiff line numberDiff line change
@@ -181,16 +181,16 @@ public Optional<Session> getState(final SessionRequest request) {
181181
final long nameHash = LongHashFunction.xx3().hashBytes(request.name());
182182
final long time = request.time();
183183
final ByteBuffer start = byteBufferFactory.acquire(Long.BYTES + Long.BYTES);
184-
final ByteBuffer end = byteBufferFactory.acquire(Long.BYTES);
184+
final ByteBuffer stop = byteBufferFactory.acquire(Long.BYTES);
185185
try {
186186
start.putLong(nameHash);
187-
start.putLong(time);
187+
start.putLong(time + 1);
188188
start.flip();
189189

190-
end.putLong(nameHash);
191-
end.flip();
190+
stop.putLong(nameHash);
191+
stop.flip();
192192

193-
final KeyRange<ByteBuffer> keyRange = KeyRange.closedBackward(start, end);
193+
final KeyRange<ByteBuffer> keyRange = KeyRange.openBackward(start, stop);
194194
return read(readTxn -> {
195195
try (final CursorIterable<ByteBuffer> cursor = dbi.iterate(readTxn, keyRange)) {
196196
final Iterator<KeyVal<ByteBuffer>> iterator = cursor.iterator();
@@ -219,7 +219,7 @@ public Optional<Session> getState(final SessionRequest request) {
219219
});
220220
} finally {
221221
byteBufferFactory.release(start);
222-
byteBufferFactory.release(end);
222+
byteBufferFactory.release(stop);
223223
}
224224
}
225225

stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/db/TemporalRangedStateDb.java

+25-3
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
public class TemporalRangedStateDb extends AbstractDb<Key, StateValue> {
1919

20+
private static final ByteBuffer ZERO = ByteBuffer.allocateDirect(0);
21+
2022
TemporalRangedStateDb(final Path path,
2123
final ByteBufferFactory byteBufferFactory) {
2224
this(
@@ -53,10 +55,30 @@ public static TemporalRangedStateDb create(final Path path,
5355
public Optional<TemporalRangedState> getState(final TemporalRangedStateRequest request) {
5456
final ByteBuffer start = byteBufferFactory.acquire(Long.BYTES);
5557
try {
56-
start.putLong(request.key());
58+
start.putLong(request.key() + 1);
5759
start.flip();
5860

59-
final KeyRange<ByteBuffer> keyRange = KeyRange.atLeastBackward(start);
61+
// read(readTxn -> {
62+
// try (final CursorIterable<ByteBuffer> cursor = dbi.iterate(readTxn)) {
63+
// final Iterator<KeyVal<ByteBuffer>> iterator = cursor.iterator();
64+
// while (iterator.hasNext()
65+
// && !Thread.currentThread().isInterrupted()) {
66+
// final BBKV kv = BBKV.create(iterator.next());
67+
// final long keyStart = kv.key().getLong(0);
68+
// final long keyEnd = kv.key().getLong(Long.BYTES);
69+
// final long effectiveTime = kv.key().getLong(Long.BYTES + Long.BYTES);
70+
// System.out.println("start=" +
71+
// keyStart +
72+
// ", keyEnd=" +
73+
// keyEnd +
74+
// ", effectiveTime=" +
75+
// DateUtil.createNormalDateTimeString(effectiveTime));
76+
// }
77+
// }
78+
// return Optional.empty();
79+
// });
80+
81+
final KeyRange<ByteBuffer> keyRange = KeyRange.openBackward(start, ZERO);
6082
return read(readTxn -> {
6183
Optional<TemporalRangedState> result = Optional.empty();
6284
try (final CursorIterable<ByteBuffer> cursor = dbi.iterate(readTxn, keyRange)) {
@@ -69,7 +91,7 @@ public Optional<TemporalRangedState> getState(final TemporalRangedStateRequest r
6991
final long effectiveTime = kv.key().getLong(Long.BYTES + Long.BYTES);
7092
if (keyEnd < request.key()) {
7193
return result;
72-
} else if (effectiveTime >= request.effectiveTime() &&
94+
} else if (effectiveTime <= request.effectiveTime() &&
7395
keyStart <= request.key()) {
7496
final Key key = Key
7597
.builder()

stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/db/TemporalStateDb.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -58,13 +58,13 @@ public Optional<TemporalState> getState(final TemporalStateRequest request) {
5858
final ByteBuffer stop = byteBufferFactory.acquire(Long.BYTES);
5959
try {
6060
start.putLong(rowHash);
61-
start.putLong(request.effectiveTime());
61+
start.putLong(request.effectiveTime() + 1);
6262
start.flip();
6363

6464
stop.putLong(rowHash);
6565
stop.flip();
6666

67-
final KeyRange<ByteBuffer> keyRange = KeyRange.closedBackward(start, stop);
67+
final KeyRange<ByteBuffer> keyRange = KeyRange.openBackward(start, stop);
6868
return read(readTxn -> {
6969
try (final CursorIterable<ByteBuffer> cursor = dbi.iterate(readTxn, keyRange)) {
7070
final Iterator<KeyVal<ByteBuffer>> iterator = cursor.iterator();

stroom-state/stroom-planb-impl/src/test/java/stroom/planb/impl/db/TestRangedStateDb.java

+21-3
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,13 @@ void test(@TempDir Path tempDir) {
5757
assertThat(db.count()).isEqualTo(1);
5858
testGet(db);
5959

60-
final RangedStateRequest stateRequest =
61-
new RangedStateRequest(11);
62-
final Optional<RangedState> optional = db.getState(stateRequest);
60+
checkState(db, 9, false);
61+
for (int i = 10; i <= 30; i++) {
62+
checkState(db, i, true);
63+
}
64+
checkState(db, 31, false);
65+
66+
final Optional<RangedState> optional = getState(db, 11);
6367
assertThat(optional).isNotEmpty();
6468
final RangedState res = optional.get();
6569
assertThat(res.key().keyStart()).isEqualTo(10);
@@ -96,6 +100,20 @@ void test(@TempDir Path tempDir) {
96100
}
97101
}
98102

103+
private Optional<RangedState> getState(final RangedStateDb db, final long key) {
104+
final RangedStateRequest request =
105+
new RangedStateRequest(key);
106+
return db.getState(request);
107+
}
108+
109+
private void checkState(final RangedStateDb db,
110+
final long key,
111+
final boolean expected) {
112+
final Optional<RangedState> optional = getState(db, key);
113+
final boolean actual = optional.isPresent();
114+
assertThat(actual).isEqualTo(expected);
115+
}
116+
99117
@Test
100118
void testMerge(@TempDir final Path rootDir) throws IOException {
101119
final Path dbPath1 = rootDir.resolve("db1");

stroom-state/stroom-planb-impl/src/test/java/stroom/planb/impl/db/TestSessionDb.java

+16
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,12 @@ void test(@TempDir Path tempDir) {
6767
assertThat(db.count()).isEqualTo(109);
6868
testGet(db, key, refTime, 10);
6969

70+
checkState(db, key, highRange.max(), true);
71+
checkState(db, key, highRange.min(), true);
72+
checkState(db, key, lowRange.max(), true);
73+
checkState(db, key, lowRange.min(), true);
74+
checkState(db, key, highRange.max().plusMillis(1), false);
75+
checkState(db, key, lowRange.min().minusMillis(1), false);
7076

7177
// final SessionRequest sessionRequest = SessionRequest.builder().name("TEST").time(refTime).build();
7278
// final Optional<Session> optional = reader.getState(sessionRequest);
@@ -251,6 +257,16 @@ private void testGet(final SessionDb db,
251257
assertThat(res.key()).isEqualTo(key);
252258
}
253259

260+
private void checkState(final SessionDb db,
261+
final byte[] key,
262+
final Instant time,
263+
final boolean expected) {
264+
final SessionRequest request = new SessionRequest(key, time.toEpochMilli());
265+
final Optional<Session> optional = db.getState(request);
266+
final boolean actual = optional.isPresent();
267+
assertThat(actual).isEqualTo(expected);
268+
}
269+
254270
// @Test
255271
// void testRemoveOldData() {
256272
// ScyllaDbUtil.test((sessionProvider, tableName) -> {

stroom-state/stroom-planb-impl/src/test/java/stroom/planb/impl/db/TestTemporalRangedStateDb.java

+29
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,24 @@ void test(@TempDir Path tempDir) {
5959
assertThat(db.count()).isEqualTo(100);
6060
testGet(db);
6161

62+
// Check exact time states.
63+
checkState(db, 9, refTime.toEpochMilli(), false);
64+
for (int i = 10; i <= 30; i++) {
65+
checkState(db, i, refTime.toEpochMilli(), true);
66+
}
67+
checkState(db, 31, refTime.toEpochMilli(), false);
68+
69+
// Check before time states.
70+
for (int i = 9; i <= 31; i++) {
71+
checkState(db, i, refTime.toEpochMilli() - 1, false);
72+
}
73+
74+
// Check after time states.
75+
checkState(db, 9, refTime.toEpochMilli() + 1, false);
76+
for (int i = 10; i <= 30; i++) {
77+
checkState(db, i, refTime.toEpochMilli() + 1, true);
78+
}
79+
checkState(db, 31, refTime.toEpochMilli() + 1, false);
6280

6381
final TemporalRangedStateRequest stateRequest =
6482
new TemporalRangedStateRequest(11, refTime.toEpochMilli());
@@ -165,6 +183,17 @@ private void testGet(final TemporalRangedStateDb db) {
165183
assertThat(res.toString()).isEqualTo("test");
166184
}
167185

186+
private void checkState(final TemporalRangedStateDb db,
187+
final long key,
188+
final long effectiveTime,
189+
final boolean expected) {
190+
final TemporalRangedStateRequest request =
191+
new TemporalRangedStateRequest(key, effectiveTime);
192+
final Optional<TemporalRangedState> optional = db.getState(request);
193+
final boolean actual = optional.isPresent();
194+
assertThat(actual).isEqualTo(expected);
195+
}
196+
168197
//
169198
// @Test
170199
// void testRemoveOldData() {

stroom-state/stroom-planb-impl/src/test/java/stroom/planb/impl/db/TestTemporalStateDb.java

+20
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,15 @@ void test(@TempDir Path tempDir) {
5656
TemporalStateSettings.builder().build(),
5757
true)) {
5858
assertThat(db.count()).isEqualTo(100);
59+
60+
final byte[] byteKey = "TEST_KEY".getBytes(StandardCharsets.UTF_8);
61+
// Check exact time states.
62+
checkState(db, byteKey, refTime.toEpochMilli(), true);
63+
// Check before time states.
64+
checkState(db, byteKey, refTime.toEpochMilli() - 1, false);
65+
// Check after time states.
66+
checkState(db, byteKey, refTime.toEpochMilli() + 1, true);
67+
5968
// final TemporalStateRequest stateRequest =
6069
// new TemporalStateRequest("TEST_MAP", "TEST_KEY", refTime);
6170
final TemporalState.Key key = TemporalState.Key.builder().name("TEST_KEY").effectiveTime(refTime).build();
@@ -224,4 +233,15 @@ private void insertData(final TemporalStateDb db,
224233
}
225234
});
226235
}
236+
237+
private void checkState(final TemporalStateDb db,
238+
final byte[] key,
239+
final long effectiveTime,
240+
final boolean expected) {
241+
final TemporalStateRequest request =
242+
new TemporalStateRequest(key, effectiveTime);
243+
final Optional<TemporalState> optional = db.getState(request);
244+
final boolean actual = optional.isPresent();
245+
assertThat(actual).isEqualTo(expected);
246+
}
227247
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
* Issue **#4855** : Fix Plan B session state range.
2+
3+
4+
```sh
5+
# ********************************************************************************
6+
# Issue title: PlanB Session State
7+
# Issue link: https://github.com/gchq/stroom/issues/4855
8+
# ********************************************************************************
9+
10+
# ONLY the top line will be included as a change entry in the CHANGELOG.
11+
# The entry should be in GitHub flavour markdown and should be written on a SINGLE
12+
# line with no hard breaks. You can have multiple change files for a single GitHub issue.
13+
# The entry should be written in the imperative mood, i.e. 'Fix nasty bug' rather than
14+
# 'Fixed nasty bug'.
15+
#
16+
# Examples of acceptable entries are:
17+
#
18+
#
19+
# * Issue **123** : Fix bug with an associated GitHub issue in this repository
20+
#
21+
# * Issue **namespace/other-repo#456** : Fix bug with an associated GitHub issue in another repository
22+
#
23+
# * Fix bug with no associated GitHub issue.
24+
```

0 commit comments

Comments
 (0)