Skip to content

Commit 872a475

Browse files
committed
Revert "fix: handle BerkeleyJE DB interruption [tp-tests]" CTR [tp-tests]
This reverts commit 90b9694. Reason: tp-tests are failing after this commit. Signed-off-by: Oleksandr Porunov <alexandr.porunov@gmail.com>
1 parent 9f09767 commit 872a475

File tree

9 files changed

+135
-259
lines changed

9 files changed

+135
-259
lines changed

janusgraph-berkeleyje/src/main/java/org/janusgraph/diskstorage/berkeleyje/BerkeleyJEKeyValueStore.java

Lines changed: 17 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,11 @@
2727
import com.sleepycat.je.OperationStatus;
2828
import com.sleepycat.je.Put;
2929
import com.sleepycat.je.ReadOptions;
30-
import com.sleepycat.je.ThreadInterruptedException;
3130
import com.sleepycat.je.Transaction;
3231
import com.sleepycat.je.WriteOptions;
33-
import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalInterruptedException;
3432
import org.janusgraph.diskstorage.BackendException;
3533
import org.janusgraph.diskstorage.PermanentBackendException;
3634
import org.janusgraph.diskstorage.StaticBuffer;
37-
import org.janusgraph.diskstorage.TemporaryBackendException;
3835
import org.janusgraph.diskstorage.keycolumnvalue.StoreTransaction;
3936
import org.janusgraph.diskstorage.keycolumnvalue.keyvalue.KVQuery;
4037
import org.janusgraph.diskstorage.keycolumnvalue.keyvalue.KeySelector;
@@ -63,10 +60,10 @@ public class BerkeleyJEKeyValueStore implements OrderedKeyValueStore {
6360
public static Function<Integer, Integer> ttlConverter = ttl -> (int) Math.max(1, Duration.of(ttl, ChronoUnit.SECONDS).toHours());
6461

6562

66-
private volatile Database db;
63+
private final Database db;
6764
private final String name;
6865
private final BerkeleyJEStoreManager manager;
69-
private volatile boolean isOpen;
66+
private boolean isOpen;
7067

7168
public BerkeleyJEKeyValueStore(String n, Database data, BerkeleyJEStoreManager m) {
7269
db = data;
@@ -78,11 +75,6 @@ public BerkeleyJEKeyValueStore(String n, Database data, BerkeleyJEStoreManager m
7875
public DatabaseConfig getConfiguration() throws BackendException {
7976
try {
8077
return db.getConfig();
81-
} catch (ThreadInterruptedException e) {
82-
Thread.currentThread().interrupt();
83-
throw (TraversalInterruptedException) new TraversalInterruptedException().initCause(e);
84-
} catch (EnvironmentFailureException e) {
85-
throw new TemporaryBackendException(e);
8678
} catch (DatabaseException e) {
8779
throw new PermanentBackendException(e);
8880
}
@@ -103,24 +95,15 @@ private Cursor openCursor(StoreTransaction txh) throws BackendException {
10395
return ((BerkeleyJETx) txh).openCursor(db);
10496
}
10597

106-
private static void closeCursor(StoreTransaction txh, Cursor cursor) throws BackendException {
98+
private static void closeCursor(StoreTransaction txh, Cursor cursor) {
10799
Preconditions.checkArgument(txh!=null);
108100
((BerkeleyJETx) txh).closeCursor(cursor);
109101
}
110102

111-
public void reopen(final Database db) {
112-
this.db = db;
113-
}
114-
115103
@Override
116104
public synchronized void close() throws BackendException {
117105
try {
118106
if(isOpen) db.close();
119-
} catch (ThreadInterruptedException e) {
120-
Thread.currentThread().interrupt();
121-
throw (TraversalInterruptedException) new TraversalInterruptedException().initCause(e);
122-
} catch (EnvironmentFailureException e) {
123-
throw new TemporaryBackendException(e);
124107
} catch (DatabaseException e) {
125108
throw new PermanentBackendException(e);
126109
}
@@ -144,11 +127,6 @@ public StaticBuffer get(StaticBuffer key, StoreTransaction txh) throws BackendEx
144127
} else {
145128
return null;
146129
}
147-
} catch (ThreadInterruptedException e) {
148-
Thread.currentThread().interrupt();
149-
throw (TraversalInterruptedException) new TraversalInterruptedException().initCause(e);
150-
} catch (EnvironmentFailureException e) {
151-
throw new TemporaryBackendException(e);
152130
} catch (DatabaseException e) {
153131
throw new PermanentBackendException(e);
154132
}
@@ -183,11 +161,7 @@ public RecordIterator<KeyValueEntry> getSlice(KVQuery query, StoreTransaction tx
183161
@Override
184162
public boolean hasNext() {
185163
if (current == null) {
186-
try {
187-
current = getNextEntry();
188-
} catch (BackendException e) {
189-
throw new RuntimeException(e);
190-
}
164+
current = getNextEntry();
191165
}
192166
return current != null;
193167
}
@@ -202,26 +176,16 @@ public KeyValueEntry next() {
202176
return next;
203177
}
204178

205-
private KeyValueEntry getNextEntry() throws BackendException {
179+
private KeyValueEntry getNextEntry() {
206180
if (status != null && status != OperationStatus.SUCCESS) {
207181
return null;
208182
}
209183
while (!selector.reachedLimit()) {
210-
try {
211-
if (status == null) {
212-
status = cursor.get(foundKey, foundData, Get.SEARCH_GTE, getReadOptions(txh)) == null ? OperationStatus.NOTFOUND : OperationStatus.SUCCESS;
213-
} else {
214-
status = cursor.get(foundKey, foundData, Get.NEXT, getReadOptions(txh)) == null ? OperationStatus.NOTFOUND : OperationStatus.SUCCESS;
215-
}
216-
} catch (ThreadInterruptedException e) {
217-
Thread.currentThread().interrupt();
218-
throw (TraversalInterruptedException) new TraversalInterruptedException().initCause(e);
219-
} catch (EnvironmentFailureException e) {
220-
throw new TemporaryBackendException(e);
221-
} catch (DatabaseException e) {
222-
throw new PermanentBackendException(e);
184+
if (status == null) {
185+
status = cursor.get(foundKey, foundData, Get.SEARCH_GTE, getReadOptions(txh)) == null ? OperationStatus.NOTFOUND : OperationStatus.SUCCESS;
186+
} else {
187+
status = cursor.get(foundKey, foundData, Get.NEXT, getReadOptions(txh)) == null ? OperationStatus.NOTFOUND : OperationStatus.SUCCESS;
223188
}
224-
225189
if (status != OperationStatus.SUCCESS) {
226190
break;
227191
}
@@ -241,11 +205,7 @@ private KeyValueEntry getNextEntry() throws BackendException {
241205

242206
@Override
243207
public void close() {
244-
try {
245-
closeCursor(txh, cursor);
246-
} catch (BackendException e) {
247-
throw new RuntimeException(e);
248-
}
208+
closeCursor(txh, cursor);
249209
}
250210

251211
@Override
@@ -277,22 +237,13 @@ public void insert(StaticBuffer key, StaticBuffer value, StoreTransaction txh, b
277237
int convertedTtl = ttlConverter.apply(ttl);
278238
writeOptions.setTTL(convertedTtl, TimeUnit.HOURS);
279239
}
280-
try {
281-
if (allowOverwrite) {
282-
OperationResult result = db.put(tx, key.as(ENTRY_FACTORY), value.as(ENTRY_FACTORY), Put.OVERWRITE, writeOptions);
283-
EnvironmentFailureException.assertState(result != null);
284-
status = OperationStatus.SUCCESS;
285-
} else {
286-
OperationResult result = db.put(tx, key.as(ENTRY_FACTORY), value.as(ENTRY_FACTORY), Put.NO_OVERWRITE, writeOptions);
287-
status = result == null ? OperationStatus.KEYEXIST : OperationStatus.SUCCESS;
288-
}
289-
} catch (ThreadInterruptedException e) {
290-
Thread.currentThread().interrupt();
291-
throw (TraversalInterruptedException) new TraversalInterruptedException().initCause(e);
292-
} catch (EnvironmentFailureException e) {
293-
throw new TemporaryBackendException(e);
294-
} catch (DatabaseException e) {
295-
throw new PermanentBackendException(e);
240+
if (allowOverwrite) {
241+
OperationResult result = db.put(tx, key.as(ENTRY_FACTORY), value.as(ENTRY_FACTORY), Put.OVERWRITE, writeOptions);
242+
EnvironmentFailureException.assertState(result != null);
243+
status = OperationStatus.SUCCESS;
244+
} else {
245+
OperationResult result = db.put(tx, key.as(ENTRY_FACTORY), value.as(ENTRY_FACTORY), Put.NO_OVERWRITE, writeOptions);
246+
status = result == null ? OperationStatus.KEYEXIST : OperationStatus.SUCCESS;
296247
}
297248

298249
if (status != OperationStatus.SUCCESS) {
@@ -310,11 +261,6 @@ public void delete(StaticBuffer key, StoreTransaction txh) throws BackendExcepti
310261
if (status != OperationStatus.SUCCESS && status != OperationStatus.NOTFOUND) {
311262
throw new PermanentBackendException("Could not remove: " + status);
312263
}
313-
} catch (ThreadInterruptedException e) {
314-
Thread.currentThread().interrupt();
315-
throw (TraversalInterruptedException) new TraversalInterruptedException().initCause(e);
316-
} catch (EnvironmentFailureException e) {
317-
throw new TemporaryBackendException(e);
318264
} catch (DatabaseException e) {
319265
throw new PermanentBackendException(e);
320266
}

janusgraph-berkeleyje/src/main/java/org/janusgraph/diskstorage/berkeleyje/BerkeleyJEStoreManager.java

Lines changed: 21 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -22,17 +22,13 @@
2222
import com.sleepycat.je.DatabaseException;
2323
import com.sleepycat.je.Environment;
2424
import com.sleepycat.je.EnvironmentConfig;
25-
import com.sleepycat.je.EnvironmentFailureException;
2625
import com.sleepycat.je.LockMode;
27-
import com.sleepycat.je.ThreadInterruptedException;
2826
import com.sleepycat.je.Transaction;
2927
import com.sleepycat.je.TransactionConfig;
30-
import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalInterruptedException;
3128
import org.janusgraph.diskstorage.BackendException;
3229
import org.janusgraph.diskstorage.BaseTransactionConfig;
3330
import org.janusgraph.diskstorage.PermanentBackendException;
3431
import org.janusgraph.diskstorage.StaticBuffer;
35-
import org.janusgraph.diskstorage.TemporaryBackendException;
3632
import org.janusgraph.diskstorage.common.LocalStoreManager;
3733
import org.janusgraph.diskstorage.configuration.ConfigNamespace;
3834
import org.janusgraph.diskstorage.configuration.ConfigOption;
@@ -52,10 +48,9 @@
5248
import org.slf4j.Logger;
5349
import org.slf4j.LoggerFactory;
5450

51+
import java.util.HashMap;
5552
import java.util.List;
5653
import java.util.Map;
57-
import java.util.concurrent.ConcurrentHashMap;
58-
import java.util.concurrent.ConcurrentMap;
5954

6055
import static org.janusgraph.diskstorage.configuration.ConfigOption.disallowEmpty;
6156

@@ -93,16 +88,19 @@ public class BerkeleyJEStoreManager extends LocalStoreManager implements Ordered
9388
ConfigOption.Type.MASKABLE, String.class,
9489
IsolationLevel.REPEATABLE_READ.toString(), disallowEmpty(String.class));
9590

96-
private final ConcurrentMap<String, BerkeleyJEKeyValueStore> stores;
91+
private final Map<String, BerkeleyJEKeyValueStore> stores;
9792

98-
protected volatile Environment environment;
93+
protected Environment environment;
9994
protected final StoreFeatures features;
10095

10196
public BerkeleyJEStoreManager(Configuration configuration) throws BackendException {
10297
super(configuration);
103-
stores = new ConcurrentHashMap<>();
98+
stores = new HashMap<>();
10499

105-
initialize();
100+
int cachePercentage = configuration.get(JVM_CACHE);
101+
boolean sharedCache = configuration.get(SHARED_CACHE);
102+
CacheMode cacheMode = ConfigOption.getEnumValue(configuration.get(CACHE_MODE), CacheMode.class);
103+
initialize(cachePercentage, sharedCache, cacheMode);
106104

107105
features = new StandardStoreFeatures.Builder()
108106
.orderedScan(true)
@@ -113,24 +111,14 @@ public BerkeleyJEStoreManager(Configuration configuration) throws BackendExcepti
113111
.scanTxConfig(GraphDatabaseConfiguration.buildGraphConfiguration()
114112
.set(ISOLATION_LEVEL, IsolationLevel.READ_UNCOMMITTED.toString())
115113
)
116-
.supportsInterruption(true)
114+
.supportsInterruption(false)
117115
.cellTTL(true)
118116
.optimisticLocking(false)
119117
.build();
120118
}
121119

122-
private synchronized void initialize() throws BackendException {
120+
private void initialize(int cachePercent, final boolean sharedCache, final CacheMode cacheMode) throws BackendException {
123121
try {
124-
if (environment != null && environment.isValid()) {
125-
return;
126-
}
127-
128-
close(true);
129-
130-
int cachePercent = storageConfig.get(JVM_CACHE);
131-
boolean sharedCache = storageConfig.get(SHARED_CACHE);
132-
CacheMode cacheMode = ConfigOption.getEnumValue(storageConfig.get(CACHE_MODE), CacheMode.class);
133-
134122
EnvironmentConfig envConfig = new EnvironmentConfig();
135123
envConfig.setAllowCreate(true);
136124
envConfig.setTransactional(transactional);
@@ -143,28 +131,15 @@ private synchronized void initialize() throws BackendException {
143131
envConfig.setConfigParam(EnvironmentConfig.ENV_RUN_CLEANER, "false");
144132
}
145133

146-
// Open the environment
134+
//Open the environment
147135
environment = new Environment(directory, envConfig);
148136

149-
// Reopen any existing DB connections
150-
for (String storeName : stores.keySet()) {
151-
openDatabase(storeName, true);
152-
}
153137
} catch (DatabaseException e) {
154138
throw new PermanentBackendException("Error during BerkeleyJE initialization: ", e);
155139
}
156140

157141
}
158142

159-
private synchronized void reInitialize(DatabaseException exception) throws BackendException {
160-
initialize();
161-
162-
if (exception instanceof ThreadInterruptedException) {
163-
Thread.currentThread().interrupt();
164-
throw (TraversalInterruptedException) new TraversalInterruptedException().initCause(exception);
165-
}
166-
}
167-
168143
@Override
169144
public StoreFeatures getFeatures() {
170145
return features;
@@ -175,7 +150,8 @@ public List<KeyRange> getLocalKeyPartition() throws BackendException {
175150
throw new UnsupportedOperationException();
176151
}
177152

178-
private BerkeleyJETx beginTransaction(final BaseTransactionConfig txCfg, boolean retryEnvironmentFailure) throws BackendException {
153+
@Override
154+
public BerkeleyJETx beginTransaction(final BaseTransactionConfig txCfg) throws BackendException {
179155
try {
180156
Transaction tx = null;
181157

@@ -206,27 +182,15 @@ private BerkeleyJETx beginTransaction(final BaseTransactionConfig txCfg, boolean
206182
}
207183

208184
return btx;
209-
} catch (EnvironmentFailureException e) {
210-
reInitialize(e);
211-
212-
if (retryEnvironmentFailure) {
213-
return beginTransaction(txCfg, false);
214-
}
215-
216-
throw new TemporaryBackendException("Could not start BerkeleyJE transaction", e);
217185
} catch (DatabaseException e) {
218186
throw new PermanentBackendException("Could not start BerkeleyJE transaction", e);
219187
}
220188
}
221189

222190
@Override
223-
public BerkeleyJETx beginTransaction(final BaseTransactionConfig txCfg) throws BackendException {
224-
return beginTransaction(txCfg, true);
225-
}
226-
227-
private BerkeleyJEKeyValueStore openDatabase(String name, boolean force, boolean retryEnvironmentFailure) throws BackendException {
191+
public BerkeleyJEKeyValueStore openDatabase(String name) throws BackendException {
228192
Preconditions.checkNotNull(name);
229-
if (stores.containsKey(name) && !force) {
193+
if (stores.containsKey(name)) {
230194
return stores.get(name);
231195
}
232196
try {
@@ -245,34 +209,13 @@ private BerkeleyJEKeyValueStore openDatabase(String name, boolean force, boolean
245209
log.debug("Opened database {}", name);
246210

247211
BerkeleyJEKeyValueStore store = new BerkeleyJEKeyValueStore(name, db, this);
248-
if (stores.containsKey(name)) {
249-
stores.get(name).reopen(db);
250-
} else {
251-
stores.put(name, store);
252-
}
212+
stores.put(name, store);
253213
return store;
254-
} catch (EnvironmentFailureException e) {
255-
reInitialize(e);
256-
257-
if (retryEnvironmentFailure) {
258-
return openDatabase(name, force, false);
259-
}
260-
261-
throw new TemporaryBackendException("Could not open BerkeleyJE data store", e);
262214
} catch (DatabaseException e) {
263215
throw new PermanentBackendException("Could not open BerkeleyJE data store", e);
264216
}
265217
}
266218

267-
private BerkeleyJEKeyValueStore openDatabase(String name, boolean force) throws BackendException {
268-
return openDatabase(name, force, true);
269-
}
270-
271-
@Override
272-
public BerkeleyJEKeyValueStore openDatabase(String name) throws BackendException {
273-
return openDatabase(name, false, true);
274-
}
275-
276219
@Override
277220
public void mutateMany(Map<String, KVMutation> mutations, StoreTransaction txh) throws BackendException {
278221
for (Map.Entry<String,KVMutation> mutation : mutations.entrySet()) {
@@ -309,16 +252,18 @@ void removeDatabase(BerkeleyJEKeyValueStore db) {
309252
log.debug("Removed database {}", name);
310253
}
311254

312-
public void close(boolean force) throws BackendException {
255+
256+
@Override
257+
public void close() throws BackendException {
313258
if (environment != null) {
314-
if (!force && !stores.isEmpty())
259+
if (!stores.isEmpty())
315260
throw new IllegalStateException("Cannot shutdown manager since some databases are still open");
316261
try {
317262
// TODO this looks like a race condition
318263
//Wait just a little bit before closing so that independent transaction threads can clean up.
319264
Thread.sleep(30);
320265
} catch (InterruptedException e) {
321-
Thread.currentThread().interrupt();
266+
//Ignore
322267
}
323268
try {
324269
environment.close();
@@ -329,11 +274,6 @@ public void close(boolean force) throws BackendException {
329274

330275
}
331276

332-
@Override
333-
public void close() throws BackendException {
334-
close(false);
335-
}
336-
337277
private static final Transaction NULL_TRANSACTION = null;
338278

339279
@Override

0 commit comments

Comments
 (0)