Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -67,24 +67,29 @@ void openRocksDB(final DBOptions dbOptions,
final ColumnFamilyHandle withHeadersColumnFamily = columnFamilies.get(1);
final ColumnFamilyHandle offsetsCf = columnFamilies.get(2);

final RocksIterator noHeadersIter = db.newIterator(noHeadersColumnFamily);
noHeadersIter.seekToFirst();
if (noHeadersIter.isValid()) {
log.info("Opening store {} in upgrade mode", name);
cfAccessor = new DualColumnFamilyAccessor(
offsetsCf,
noHeadersColumnFamily,
withHeadersColumnFamily,
HeadersBytesStore::convertToHeaderFormat,
this,
try (final RocksIterator noHeadersIter = db.newIterator(noHeadersColumnFamily)) {
noHeadersIter.seekToFirst();
if (noHeadersIter.isValid()) {
log.info("Opening store {} in upgrade mode", name);
cfAccessor = new DualColumnFamilyAccessor(
offsetsCf,
noHeadersColumnFamily,
withHeadersColumnFamily,
HeadersBytesStore::convertToHeaderFormat,
this,
open
);
} else {
log.info("Opening store {} in regular mode", name);
cfAccessor = new SingleColumnFamilyAccessor(offsetsCf, withHeadersColumnFamily);
noHeadersColumnFamily.close();
);
} else {
log.info("Opening store {} in regular mode", name);
cfAccessor = new SingleColumnFamilyAccessor(offsetsCf, withHeadersColumnFamily);
noHeadersColumnFamily.close();
}
} catch (final RuntimeException e) {
for (final ColumnFamilyHandle handle : columnFamilies) {
handle.close();
}
throw e;
}
noHeadersIter.close();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,6 @@ void openRocksDB(final DBOptions dbOptions,
noHeadersIter.seekToFirst();
if (noHeadersIter.isValid()) {
log.info("Opening window store {} in upgrade mode from plain value format", name);
// Migrate from [value] to [headers][value]
// Add empty headers prefix [0x00] to plain value
cfAccessor = new DualColumnFamilyAccessor(
offsetsCf,
noHeadersColumnFamily,
Expand All @@ -92,6 +90,11 @@ void openRocksDB(final DBOptions dbOptions,
cfAccessor = new SingleColumnFamilyAccessor(offsetsCf, withHeadersColumnFamily);
noHeadersColumnFamily.close();
}
} catch (final RuntimeException e) {
for (final ColumnFamilyHandle handle : columnFamilies) {
handle.close();
}
throw e;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -245,21 +245,26 @@ void openDB(final Map<String, Object> configs, final File stateDir) {
// Setup statistics before the database is opened, otherwise the statistics are not updated
// with the measurements from Rocks DB
setupStatistics(configs, dbOptions);
openRocksDB(dbOptions, columnFamilyOptions);
dbAccessor = new DirectDBAccessor(db, fOptions, wOptions);
try {
final Position existingPositionOrEmpty = cfAccessor.open(dbAccessor, !eosEnabled);
if (position == null) {
position = existingPositionOrEmpty;
} else {
// For segmented stores, the overall position is composed of multiple underlying stores, so merge this store's position into it.
position.merge(existingPositionOrEmpty);
openRocksDB(dbOptions, columnFamilyOptions);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method internally calls createColumnFamilies() or mergeColumnFamilyHandleLists(), either of which might fail.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am wondering if we should do the necessary exception-handling inside openRocksDB instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see openRocksDB() already handles its own cleanup (CF handles + db), but the closeNativeResources() in openDB() covers a different failure scope that openRocksDB() can't handle.

These 6 native resources (userSpecifiedOptions, cache, filter, wOptions, fOptions, statistics) are all created in openDB() before openRocksDB() is ever called and openRocksDB() doesn't own them and has no references to clean them up.

If openRocksDB() itself fails, it will cleanup everything which is in finally block.

If openRocksDB() does not fail but cfAccessor.open() fails — now db is open. Hence closeNativeResources handles full teardown.

Other option is to move everything into openRocksDB(), but this would change the method signature across all overrides.

dbAccessor = new DirectDBAccessor(db, fOptions, wOptions);
try {
final Position existingPositionOrEmpty = cfAccessor.open(dbAccessor, !eosEnabled);
if (position == null) {
position = existingPositionOrEmpty;
} else {
// For segmented stores, the overall position is composed of multiple underlying stores, so merge this store's position into it.
position.merge(existingPositionOrEmpty);
}
} catch (final StreamsException fatal) {
final String fatalMessage = "State store " + name + " didn't find a valid state, since under EOS it has the risk of getting uncommitted data in stores";
throw new ProcessorStateException(fatalMessage, fatal);
} catch (final RocksDBException e) {
throw new ProcessorStateException("Error opening store " + name, e);
}
} catch (final StreamsException fatal) {
final String fatalMessage = "State store " + name + " didn't find a valid state, since under EOS it has the risk of getting uncommitted data in stores";
throw new ProcessorStateException(fatalMessage, fatal);
} catch (final RocksDBException e) {
throw new ProcessorStateException("Error opening store " + name, e);
} catch (final RuntimeException e) {
closeNativeResources();
throw e;
}

addValueProvidersToMetricsRecorder();
Expand Down Expand Up @@ -361,10 +366,20 @@ protected List<ColumnFamilyHandle> openRocksDB(final DBOptions dbOptions,
.filter(descriptor -> allExisting.stream().noneMatch(existing -> Arrays.equals(existing, descriptor.getName())))
.collect(Collectors.toList());
final List<ColumnFamilyHandle> existingColumnFamilies = new ArrayList<>(existingDescriptors.size());
db = RocksDB.open(dbOptions, absolutePath, existingDescriptors, existingColumnFamilies);
final List<ColumnFamilyHandle> createdColumnFamilies = db.createColumnFamilies(toCreate);

return mergeColumnFamilyHandleLists(existingColumnFamilies, createdColumnFamilies, allDescriptors);
final List<ColumnFamilyHandle> createdColumnFamilies = new ArrayList<>();
try {
db = RocksDB.open(dbOptions, absolutePath, existingDescriptors, existingColumnFamilies);
createdColumnFamilies.addAll(db.createColumnFamilies(toCreate));
return mergeColumnFamilyHandleLists(existingColumnFamilies, createdColumnFamilies, allDescriptors);
} catch (final Exception e) {
for (final ColumnFamilyHandle handle : existingColumnFamilies) {
handle.close();
}
for (final ColumnFamilyHandle handle : createdColumnFamilies) {
handle.close();
}
throw e;
}

} catch (final RocksDBException e) {
throw new ProcessorStateException("Error opening store " + name + " at location " + dbDir.toString(), e);
Expand Down Expand Up @@ -787,6 +802,64 @@ public synchronized void close() {
statistics = null;
}

/**
* Close all native RocksDB resources with null-safety.
* Used only by the error cleanup path in {@link #openDB} where some resources
* may not have been initialized yet.
*/
private void closeNativeResources() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We already have code in close() which closes all these resources -- is this new helper necessary? I would expect the runtime to ensure that RocksDBStore.close() is called if init() (which calls openDB()) fails.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the close() method, we check isOpen(), which in turn checks open.get().

If openDB()/openRocksDB() fails, open.get() returns false.

When it returns false, close() bails out early and therefore does not release all the resources allocated so far.

closeDbAndAccessors();
closeOptionsAndFilters();
}

/**
 * Close the column-family accessor, the DB accessor, and the RocksDB handle,
 * null-guarding each field so this is safe on a partially-initialized store
 * (e.g. when called from the error-cleanup path of {@code openDB}).
 * NOTE: the order matters — the accessors wrap the db handle, so they must be
 * released before {@code db.close()} is invoked.
 */
private void closeDbAndAccessors() {
if (cfAccessor != null) {
try {
// cfAccessor.close() needs the db accessor; only call it when both exist
if (dbAccessor != null) {
cfAccessor.close(dbAccessor);
}
} catch (final Exception e) {
// best-effort teardown: log and keep going so the remaining resources still close
log.error("Error while closing column family handles for store " + name, e);
}
cfAccessor = null;
}
if (dbAccessor != null) {
dbAccessor.close();
dbAccessor = null;
}
if (db != null) {
// close the native RocksDB handle last, after everything that wraps it
db.close();
db = null;
}
}

/**
 * Close the native RocksDB option/filter/cache/statistics objects with
 * null-safety, and null the fields so a repeated call is a no-op.
 * Each of these is a native resource that must be closed explicitly;
 * fields may be null if {@code openDB} failed before creating them.
 */
private void closeOptionsAndFilters() {
if (userSpecifiedOptions != null) {
userSpecifiedOptions.close();
userSpecifiedOptions = null;
}
if (wOptions != null) {
wOptions.close();
wOptions = null;
}
if (fOptions != null) {
fOptions.close();
fOptions = null;
}
// close the filter before the cache it may reference
if (filter != null) {
filter.close();
filter = null;
}
if (cache != null) {
cache.close();
cache = null;
}
if (statistics != null) {
statistics.close();
statistics = null;
}
}

private void closeOpenIterators() {
final HashSet<KeyValueIterator<Bytes, byte[]>> iterators;
synchronized (openIterators) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,23 +63,29 @@ void openRocksDB(final DBOptions dbOptions,
final ColumnFamilyHandle withTimestampColumnFamily = columnFamilies.get(1);
final ColumnFamilyHandle offsetsColumnFamily = columnFamilies.get(2);

final RocksIterator noTimestampsIter = db.newIterator(noTimestampColumnFamily);
noTimestampsIter.seekToFirst();
if (noTimestampsIter.isValid()) {
log.info("Opening store {} in upgrade mode", name);
cfAccessor = new DualColumnFamilyAccessor(
offsetsColumnFamily,
noTimestampColumnFamily,
withTimestampColumnFamily,
TimestampedBytesStore::convertToTimestampedFormat,
this, open
);
} else {
log.info("Opening store {} in regular mode", name);
cfAccessor = new SingleColumnFamilyAccessor(offsetsColumnFamily, withTimestampColumnFamily);
noTimestampColumnFamily.close();
try (final RocksIterator noTimestampsIter = db.newIterator(noTimestampColumnFamily)) {
noTimestampsIter.seekToFirst();
if (noTimestampsIter.isValid()) {
log.info("Opening store {} in upgrade mode", name);
cfAccessor = new DualColumnFamilyAccessor(
offsetsColumnFamily,
noTimestampColumnFamily,
withTimestampColumnFamily,
TimestampedBytesStore::convertToTimestampedFormat,
this,
open
);
} else {
log.info("Opening store {} in regular mode", name);
cfAccessor = new SingleColumnFamilyAccessor(offsetsColumnFamily, withTimestampColumnFamily);
noTimestampColumnFamily.close();
}
} catch (final RuntimeException e) {
for (final ColumnFamilyHandle handle : columnFamilies) {
handle.close();
}
throw e;
}
noTimestampsIter.close();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -116,13 +116,18 @@ private void openFromDefaultStore(final DBOptions dbOptions,
headersCf,
HeadersBytesStore::convertFromPlainToHeaderFormat,
this,
open
open
);
} else {
log.info("Opening store {} in regular headers-aware mode", name);
cfAccessor = new SingleColumnFamilyAccessor(offsetsCf, headersCf);
defaultCf.close();
}
} catch (final RuntimeException e) {
for (final ColumnFamilyHandle handle : columnFamilies) {
handle.close();
}
throw e;
}
}

Expand All @@ -137,53 +142,55 @@ private void openFromTimestampedStore(final DBOptions dbOptions,
new ColumnFamilyDescriptor(OFFSETS_COLUMN_FAMILY_NAME, columnFamilyOptions)
);

// verify and close empty Default ColumnFamily
try (final RocksIterator defaultIter = db.newIterator(columnFamilies.get(0))) {
defaultIter.seekToFirst();
if (defaultIter.isValid()) {
// Close all column family handles before throwing
columnFamilies.get(0).close();
columnFamilies.get(1).close();
columnFamilies.get(2).close();
throw new ProcessorStateException(
"Inconsistent store state for " + name + ". " +
"Cannot have both plain (DEFAULT) and timestamped data simultaneously. " +
"Headers store can upgrade from either plain or timestamped format, but not both."
);
try {
// verify and close empty Default ColumnFamily
try (final RocksIterator defaultIter = db.newIterator(columnFamilies.get(0))) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updating here as well, in the same way, to use try-with-resources.

defaultIter.seekToFirst();
if (defaultIter.isValid()) {
throw new ProcessorStateException(
"Inconsistent store state for " + name + ". " +
"Cannot have both plain (DEFAULT) and timestamped data simultaneously. " +
"Headers store can upgrade from either plain or timestamped format, but not both."
);
}
}
// close default column family handle
columnFamilies.get(0).close();
}

final ColumnFamilyHandle legacyTimestampedCf = columnFamilies.get(1);
final ColumnFamilyHandle headersCf = columnFamilies.get(2);
final ColumnFamilyHandle offsetsCf = columnFamilies.get(3);


// Check if legacy timestamped CF has data
try (final RocksIterator legacyIter = db.newIterator(legacyTimestampedCf)) {
legacyIter.seekToFirst();
if (legacyIter.isValid()) {
log.info("Opening store {} in upgrade mode from timestamped store", name);
cfAccessor = new DualColumnFamilyAccessor(
offsetsCf,
legacyTimestampedCf,
headersCf,
HeadersBytesStore::convertToHeaderFormat,
this,
open
);
} else {
log.info("Opening store {} in regular headers-aware mode", name);
cfAccessor = new SingleColumnFamilyAccessor(offsetsCf, headersCf);
try {
db.dropColumnFamily(legacyTimestampedCf);
} catch (final RocksDBException e) {
throw new RuntimeException(e);
} finally {
legacyTimestampedCf.close();
final ColumnFamilyHandle legacyTimestampedCf = columnFamilies.get(1);
final ColumnFamilyHandle headersCf = columnFamilies.get(2);
final ColumnFamilyHandle offsetsCf = columnFamilies.get(3);

// Check if legacy timestamped CF has data
try (final RocksIterator legacyIter = db.newIterator(legacyTimestampedCf)) {
legacyIter.seekToFirst();
if (legacyIter.isValid()) {
log.info("Opening store {} in upgrade mode from timestamped store", name);
cfAccessor = new DualColumnFamilyAccessor(
offsetsCf,
legacyTimestampedCf,
headersCf,
HeadersBytesStore::convertToHeaderFormat,
this,
open
);
} else {
log.info("Opening store {} in regular headers-aware mode", name);
cfAccessor = new SingleColumnFamilyAccessor(offsetsCf, headersCf);
try {
db.dropColumnFamily(legacyTimestampedCf);
} catch (final RocksDBException e) {
throw new RuntimeException(e);
} finally {
legacyTimestampedCf.close();
}
}
}
} catch (final RuntimeException e) {
for (final ColumnFamilyHandle handle : columnFamilies) {
handle.close();
}
throw e;
}
}

Expand Down
Loading