Skip to content

Commit 1fc7bc9

Browse files
committed
gh-4610 Added compaction after purge, WIP on OffHeapStoreInfo
1 parent ba309dc commit 1fc7bc9

30 files changed

+1735
-222
lines changed

stroom-lmdb/src/main/java/stroom/lmdb/AbstractLmdbDb.java

+15-8
Original file line numberDiff line numberDiff line change
@@ -108,13 +108,15 @@ public abstract class AbstractLmdbDb<K, V>
108108
private final Serde<K> keySerde;
109109
private final Serde<V> valueSerde;
110110
private final String dbName;
111-
private final Dbi<ByteBuffer> lmdbDbi;
111+
private final DbiFlags[] dbiFlags;
112112
private final LmdbEnv lmdbEnvironment;
113113
private final ByteBufferPool byteBufferPool;
114114

115115
private final int keyBufferCapacity;
116116
private final int valueBufferCapacity;
117117

118+
private final DbiProxy lmdbDbi;
119+
118120
/**
119121
* @param lmdbEnvironment The LMDB {@link Env} to add this DB to.
120122
* @param byteBufferPool A self loading pool of reusable ByteBuffers.
@@ -133,8 +135,9 @@ public AbstractLmdbDb(final LmdbEnv lmdbEnvironment,
133135
this.keySerde = keySerde;
134136
this.valueSerde = valueSerde;
135137
this.dbName = dbName;
138+
this.dbiFlags = dbiFlags;
136139
this.lmdbEnvironment = lmdbEnvironment;
137-
this.lmdbDbi = lmdbEnvironment.openDbi(dbName, dbiFlags);
140+
this.lmdbDbi = openDbi();
138141
this.byteBufferPool = byteBufferPool;
139142

140143
int keySerdeCapacity = keySerde.getBufferCapacity();
@@ -149,6 +152,10 @@ public AbstractLmdbDb(final LmdbEnv lmdbEnvironment,
149152
this.valueBufferCapacity = valueSerde.getBufferCapacity();
150153
}
151154

155+
private DbiProxy openDbi() {
156+
return lmdbEnvironment.openDbi(dbName, dbiFlags);
157+
}
158+
152159
private static Dbi<ByteBuffer> openDbi(final Env<ByteBuffer> env,
153160
final String name,
154161
final DbiFlags... dbiFlags) {
@@ -168,7 +175,7 @@ public String getDbName() {
168175
return dbName;
169176
}
170177

171-
public Dbi<ByteBuffer> getLmdbDbi() {
178+
public DbiProxy getLmdbDbi() {
172179
return lmdbDbi;
173180
}
174181

@@ -419,7 +426,7 @@ public <T> T streamEntriesAsBytes(final Txn<ByteBuffer> txn,
419426
final KeyRange<ByteBuffer> keyRange,
420427
final Function<Stream<CursorIterable.KeyVal<ByteBuffer>>, T> streamFunction) {
421428

422-
try (CursorIterable<ByteBuffer> cursorIterable = getLmdbDbi().iterate(txn, keyRange)) {
429+
try (CursorIterable<ByteBuffer> cursorIterable = lmdbDbi.iterate(txn, keyRange)) {
423430
final Stream<CursorIterable.KeyVal<ByteBuffer>> stream =
424431
StreamSupport.stream(cursorIterable.spliterator(), false);
425432

@@ -927,7 +934,7 @@ public long getEntryCount() {
927934
public void logDatabaseContents(final Txn<ByteBuffer> txn, Consumer<String> logEntryConsumer) {
928935
LmdbUtils.logDatabaseContents(
929936
lmdbEnvironment,
930-
lmdbDbi,
937+
lmdbDbi.getDbi(),
931938
txn,
932939
keyBuffer -> deserializeKey(keyBuffer).toString(),
933940
valueBuffer -> deserializeValue(valueBuffer).toString(),
@@ -950,7 +957,7 @@ public void logDatabaseContents(final Txn<ByteBuffer> txn) {
950957
public void logDatabaseContents(Consumer<String> logEntryConsumer) {
951958
LmdbUtils.logDatabaseContents(
952959
lmdbEnvironment,
953-
lmdbDbi,
960+
lmdbDbi.getDbi(),
954961
byteBuffer -> keySerde.deserialize(byteBuffer).toString(),
955962
byteBuffer -> valueSerde.deserialize(byteBuffer).toString(),
956963
logEntryConsumer);
@@ -965,7 +972,7 @@ public void logDatabaseContents() {
965972
public void logRawDatabaseContents(final Txn<ByteBuffer> txn, Consumer<String> logEntryConsumer) {
966973
LmdbUtils.logRawDatabaseContents(
967974
lmdbEnvironment,
968-
lmdbDbi,
975+
lmdbDbi.getDbi(),
969976
txn,
970977
logEntryConsumer);
971978
}
@@ -986,7 +993,7 @@ public void logRawDatabaseContents(final Txn<ByteBuffer> txn) {
986993
public void logRawDatabaseContents(Consumer<String> logEntryConsumer) {
987994
LmdbUtils.logRawDatabaseContents(
988995
lmdbEnvironment,
989-
lmdbDbi,
996+
lmdbDbi.getDbi(),
990997
logEntryConsumer);
991998
}
992999

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,210 @@
1+
package stroom.lmdb;
2+
3+
import stroom.util.io.FileUtil;
4+
import stroom.util.logging.LambdaLogger;
5+
import stroom.util.logging.LambdaLoggerFactory;
6+
import stroom.util.logging.LogUtil;
7+
8+
import org.lmdbjava.Cursor;
9+
import org.lmdbjava.CursorIterable;
10+
import org.lmdbjava.Dbi;
11+
import org.lmdbjava.DbiFlags;
12+
import org.lmdbjava.KeyRange;
13+
import org.lmdbjava.PutFlags;
14+
import org.lmdbjava.Stat;
15+
import org.lmdbjava.Txn;
16+
17+
import java.nio.ByteBuffer;
18+
import java.util.Comparator;
19+
import java.util.List;
20+
import java.util.Objects;
21+
import java.util.function.Consumer;
22+
import java.util.function.Supplier;
23+
24+
/**
25+
* A thin wrapper around a {@link Dbi} to abstract the user of this class from the actual
26+
* {@link Dbi} instance. This allows us to recreate the {@link Dbi} instance, e.g. after
27+
* doing a copy/compact of the {@link org.lmdbjava.Env}.
28+
* <p>
29+
* {@link DbiProxy} is constructed with a {@link Supplier} so the {@link Dbi} can be renewed
30+
* when required
31+
* </p>
32+
*/
33+
public class DbiProxy {
34+
35+
private static final LambdaLogger LOGGER = LambdaLoggerFactory.getLogger(DbiProxy.class);
36+
37+
private final LmdbEnv lmdbEnv;
38+
private final Supplier<Dbi<ByteBuffer>> dbiSupplier;
39+
private final String name;
40+
private volatile Dbi<ByteBuffer> dbi;
41+
42+
private DbiProxy(final LmdbEnv lmdbEnv,
43+
final String name,
44+
final Supplier<Dbi<ByteBuffer>> dbiSupplier) {
45+
this.lmdbEnv = lmdbEnv;
46+
this.name = name;
47+
this.dbiSupplier = Objects.requireNonNull(dbiSupplier);
48+
this.dbi = dbiSupplier.get();
49+
LOGGER.debug(() -> LogUtil.message("Initialising dbi {}:{} with name '{}' in lmdbEnv: {}",
50+
System.identityHashCode(this),
51+
System.identityHashCode(dbi),
52+
name,
53+
lmdbEnv.getName().orElseGet(() -> lmdbEnv.getLocalDir().toString())));
54+
}
55+
56+
/**
57+
* @param lmdbEnv The {@link LmdbEnv} for logging purposes
58+
* @param name The name of the Dbi
59+
* @param dbiSupplier {@link Supplier#get()} will be called to initialise the {@link DbiProxy}
60+
*/
61+
public static DbiProxy create(final LmdbEnv lmdbEnv,
62+
final String name,
63+
final Supplier<Dbi<ByteBuffer>> dbiSupplier) {
64+
return new DbiProxy(lmdbEnv, name, dbiSupplier);
65+
}
66+
67+
/**
68+
* Remove the {@link Dbi} so it can no longer be used.
69+
*/
70+
void clear() {
71+
LOGGER.debug(() -> LogUtil.message("Clearing dbi {}:{} with name '{}' in lmdbEnv: {}",
72+
System.identityHashCode(this),
73+
System.identityHashCode(dbi),
74+
name,
75+
lmdbEnv.getName().orElseGet(() -> lmdbEnv.getLocalDir().toString())));
76+
dbi = null;
77+
}
78+
79+
/**
80+
* If there is no {@link Dbi} create a new one using the internal supplier.
81+
*/
82+
void renew() {
83+
if (dbi == null) {
84+
synchronized (this) {
85+
if (dbi == null) {
86+
dbi = dbiSupplier.get();
87+
LOGGER.debug(() -> LogUtil.message("Renewing dbi {}:{} with name '{}' in lmdbEnv: {}",
88+
System.identityHashCode(this),
89+
System.identityHashCode(dbi),
90+
name,
91+
lmdbEnv.getName().orElseGet(() -> lmdbEnv.getLocalDir().toString())));
92+
}
93+
}
94+
}
95+
}
96+
97+
/**
98+
* @return The underlying dbi. Do NOT hold onto the dbi instance as it may be mutated.
99+
*/
100+
Dbi<ByteBuffer> getDbi() {
101+
try {
102+
return dbi;
103+
} catch (NullPointerException e) {
104+
throw new IllegalStateException(LogUtil.message(
105+
"dbi with name '{}' in env '{}' is not initialised",
106+
name,
107+
lmdbEnv.getName().orElseGet(() ->
108+
FileUtil.getCanonicalPath(lmdbEnv.getLocalDir()))));
109+
}
110+
}
111+
112+
/**
113+
* Use the underlying {@link Dbi} instance. Do NOT hold onto the dbi instance as it may be mutated.
114+
* Best avoid using it.
115+
*/
116+
public void withDbi(final Consumer<Dbi<ByteBuffer>> dbiConsumer) {
117+
dbiConsumer.accept(dbi);
118+
}
119+
120+
@Override
121+
public String toString() {
122+
return "DbiProxy{" +
123+
"name='" + name + '\'' +
124+
" env='" + lmdbEnv.getName().orElseGet(() ->
125+
FileUtil.getCanonicalPath(lmdbEnv.getLocalDir())) + '\'' +
126+
'}';
127+
}
128+
129+
130+
// --------------------------------------------------------------------------------
131+
// Following methods all delegate to the underlying Dbi
132+
// --------------------------------------------------------------------------------
133+
134+
public void close() {
135+
getDbi().close();
136+
}
137+
138+
public boolean delete(final ByteBuffer key) {
139+
return getDbi().delete(key);
140+
}
141+
142+
public boolean delete(final Txn<ByteBuffer> txn, final ByteBuffer key) {
143+
return getDbi().delete(txn, key);
144+
}
145+
146+
public boolean delete(final Txn<ByteBuffer> txn, final ByteBuffer key, final ByteBuffer val) {
147+
return getDbi().delete(txn, key, val);
148+
}
149+
150+
public void drop(final Txn<ByteBuffer> txn) {
151+
getDbi().drop(txn);
152+
}
153+
154+
public void drop(final Txn<ByteBuffer> txn, final boolean delete) {
155+
getDbi().drop(txn, delete);
156+
}
157+
158+
public ByteBuffer get(final Txn<ByteBuffer> txn, final ByteBuffer key) {
159+
return getDbi().get(txn, key);
160+
}
161+
162+
public byte[] getName() {
163+
return getDbi().getName();
164+
}
165+
166+
public CursorIterable<ByteBuffer> iterate(final Txn<ByteBuffer> txn) {
167+
return getDbi().iterate(txn);
168+
}
169+
170+
public CursorIterable<ByteBuffer> iterate(final Txn<ByteBuffer> txn,
171+
final KeyRange<ByteBuffer> range) {
172+
return getDbi().iterate(txn, range);
173+
}
174+
175+
public CursorIterable<ByteBuffer> iterate(final Txn<ByteBuffer> txn,
176+
final KeyRange<ByteBuffer> range,
177+
final Comparator<ByteBuffer> comparator) {
178+
return getDbi().iterate(txn, range, comparator);
179+
}
180+
181+
public List<DbiFlags> listFlags(final Txn<ByteBuffer> txn) {
182+
return getDbi().listFlags(txn);
183+
}
184+
185+
public Cursor<ByteBuffer> openCursor(final Txn<ByteBuffer> txn) {
186+
return getDbi().openCursor(txn);
187+
}
188+
189+
public void put(final ByteBuffer key, final ByteBuffer val) {
190+
getDbi().put(key, val);
191+
}
192+
193+
public boolean put(final Txn<ByteBuffer> txn,
194+
final ByteBuffer key,
195+
final ByteBuffer val,
196+
final PutFlags... flags) {
197+
return getDbi().put(txn, key, val, flags);
198+
}
199+
200+
public ByteBuffer reserve(final Txn<ByteBuffer> txn,
201+
final ByteBuffer key,
202+
final int size,
203+
final PutFlags... op) {
204+
return getDbi().reserve(txn, key, size, op);
205+
}
206+
207+
public Stat stat(final Txn<ByteBuffer> txn) {
208+
return getDbi().stat(txn);
209+
}
210+
}

0 commit comments

Comments
 (0)