Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -67,24 +67,34 @@ void openRocksDB(final DBOptions dbOptions,
final ColumnFamilyHandle withHeadersColumnFamily = columnFamilies.get(1);
final ColumnFamilyHandle offsetsCf = columnFamilies.get(2);

final RocksIterator noHeadersIter = db.newIterator(noHeadersColumnFamily);
noHeadersIter.seekToFirst();
if (noHeadersIter.isValid()) {
log.info("Opening store {} in upgrade mode", name);
cfAccessor = new DualColumnFamilyAccessor(
offsetsCf,
noHeadersColumnFamily,
withHeadersColumnFamily,
HeadersBytesStore::convertToHeaderFormat,
this,
open
);
} else {
log.info("Opening store {} in regular mode", name);
cfAccessor = new SingleColumnFamilyAccessor(offsetsCf, withHeadersColumnFamily);
noHeadersColumnFamily.close();
boolean success = false;
try {
try (final RocksIterator noHeadersIter = db.newIterator(noHeadersColumnFamily)) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updating here with try-with resources

noHeadersIter.seekToFirst();
if (noHeadersIter.isValid()) {
log.info("Opening store {} in upgrade mode", name);
cfAccessor = new DualColumnFamilyAccessor(
offsetsCf,
noHeadersColumnFamily,
withHeadersColumnFamily,
HeadersBytesStore::convertToHeaderFormat,
this,
open
);
} else {
log.info("Opening store {} in regular mode", name);
cfAccessor = new SingleColumnFamilyAccessor(offsetsCf, withHeadersColumnFamily);
noHeadersColumnFamily.close();
}
}
success = true;
} finally {
if (!success) {
for (final ColumnFamilyHandle handle : columnFamilies) {
handle.close();
}
}
}
noHeadersIter.close();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -249,15 +249,23 @@ void openDB(final Map<String, Object> configs, final File stateDir) {
// Setup statistics before the database is opened, otherwise the statistics are not updated
// with the measurements from Rocks DB
setupStatistics(configs, dbOptions);
openRocksDB(dbOptions, columnFamilyOptions);
dbAccessor = new DirectDBAccessor(db, fOptions, wOptions);
boolean success = false;
try {
cfAccessor.open(dbAccessor, !eosEnabled);
} catch (final StreamsException fatal) {
final String fatalMessage = "State store " + name + " didn't find a valid state, since under EOS it has the risk of getting uncommitted data in stores";
throw new ProcessorStateException(fatalMessage, fatal);
} catch (final RocksDBException e) {
throw new ProcessorStateException("Error opening store " + name, e);
openRocksDB(dbOptions, columnFamilyOptions);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method internally calls createColumnFamilies() or mergeColumnFamilyHandleLists() which might fail

dbAccessor = new DirectDBAccessor(db, fOptions, wOptions);
try {
cfAccessor.open(dbAccessor, !eosEnabled);
} catch (final StreamsException fatal) {
final String fatalMessage = "State store " + name + " didn't find a valid state, since under EOS it has the risk of getting uncommitted data in stores";
throw new ProcessorStateException(fatalMessage, fatal);
} catch (final RocksDBException e) {
throw new ProcessorStateException("Error opening store " + name, e);
}
success = true;
} finally {
if (!success) {
closeNativeResources();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if openRocksDB() or cfAccessor.open() fail

}
}

addValueProvidersToMetricsRecorder();
Expand Down Expand Up @@ -359,10 +367,27 @@ protected List<ColumnFamilyHandle> openRocksDB(final DBOptions dbOptions,
.filter(descriptor -> allExisting.stream().noneMatch(existing -> Arrays.equals(existing, descriptor.getName())))
.collect(Collectors.toList());
final List<ColumnFamilyHandle> existingColumnFamilies = new ArrayList<>(existingDescriptors.size());
final List<ColumnFamilyHandle> createdColumnFamilies = new ArrayList<>();
db = RocksDB.open(dbOptions, absolutePath, existingDescriptors, existingColumnFamilies);
final List<ColumnFamilyHandle> createdColumnFamilies = db.createColumnFamilies(toCreate);

return mergeColumnFamilyHandleLists(existingColumnFamilies, createdColumnFamilies, allDescriptors);
boolean openSuccess = false;
try {
createdColumnFamilies.addAll(db.createColumnFamilies(toCreate));
final List<ColumnFamilyHandle> result =
mergeColumnFamilyHandleLists(existingColumnFamilies, createdColumnFamilies, allDescriptors);
openSuccess = true;
return result;
} finally {
if (!openSuccess) {
for (final ColumnFamilyHandle handle : existingColumnFamilies) {
handle.close();
}
for (final ColumnFamilyHandle handle : createdColumnFamilies) {
handle.close();
}
db.close();
db = null;
}
}

} catch (final RocksDBException e) {
throw new ProcessorStateException("Error opening store " + name + " at location " + dbDir.toString(), e);
Expand Down Expand Up @@ -776,6 +801,64 @@ public synchronized void close() {
statistics = null;
}

/**
* Close all native RocksDB resources with null-safety.
* Used only by the error cleanup path in {@link #openDB} where some resources
* may not have been initialized yet.
*/
private void closeNativeResources() {
closeDbAndAccessors();
closeOptionsAndFilters();
}

private void closeDbAndAccessors() {
if (cfAccessor != null) {
try {
if (dbAccessor != null) {
cfAccessor.close(dbAccessor);
}
} catch (final Exception e) {
log.error("Error while closing column family handles for store " + name, e);
}
cfAccessor = null;
}
if (dbAccessor != null) {
dbAccessor.close();
dbAccessor = null;
}
if (db != null) {
db.close();
db = null;
}
}

private void closeOptionsAndFilters() {
if (userSpecifiedOptions != null) {
userSpecifiedOptions.close();
userSpecifiedOptions = null;
}
if (wOptions != null) {
wOptions.close();
wOptions = null;
}
if (fOptions != null) {
fOptions.close();
fOptions = null;
}
if (filter != null) {
filter.close();
filter = null;
}
if (cache != null) {
cache.close();
cache = null;
}
if (statistics != null) {
statistics.close();
statistics = null;
}
}

private void closeOpenIterators() {
final HashSet<KeyValueIterator<Bytes, byte[]>> iterators;
synchronized (openIterators) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,23 +63,33 @@ void openRocksDB(final DBOptions dbOptions,
final ColumnFamilyHandle withTimestampColumnFamily = columnFamilies.get(1);
final ColumnFamilyHandle offsetsColumnFamily = columnFamilies.get(2);

final RocksIterator noTimestampsIter = db.newIterator(noTimestampColumnFamily);
noTimestampsIter.seekToFirst();
if (noTimestampsIter.isValid()) {
log.info("Opening store {} in upgrade mode", name);
cfAccessor = new DualColumnFamilyAccessor(
offsetsColumnFamily,
noTimestampColumnFamily,
withTimestampColumnFamily,
TimestampedBytesStore::convertToTimestampedFormat,
this, open
);
} else {
log.info("Opening store {} in regular mode", name);
cfAccessor = new SingleColumnFamilyAccessor(offsetsColumnFamily, withTimestampColumnFamily);
noTimestampColumnFamily.close();
boolean success = false;
try {
try (final RocksIterator noTimestampsIter = db.newIterator(noTimestampColumnFamily)) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updating with try-resources

noTimestampsIter.seekToFirst();
if (noTimestampsIter.isValid()) {
log.info("Opening store {} in upgrade mode", name);
cfAccessor = new DualColumnFamilyAccessor(
offsetsColumnFamily,
noTimestampColumnFamily,
withTimestampColumnFamily,
TimestampedBytesStore::convertToTimestampedFormat,
this, open
);
} else {
log.info("Opening store {} in regular mode", name);
cfAccessor = new SingleColumnFamilyAccessor(offsetsColumnFamily, withTimestampColumnFamily);
noTimestampColumnFamily.close();
}
}
success = true;
} finally {
if (!success) {
for (final ColumnFamilyHandle handle : columnFamilies) {
handle.close();
}
}
}
noTimestampsIter.close();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -105,23 +105,33 @@ private void openFromDefaultStore(final DBOptions dbOptions,
final ColumnFamilyHandle headersCf = columnFamilies.get(1);
final ColumnFamilyHandle offsetsCf = columnFamilies.get(2);

// Check if default CF has data (plain store upgrade)
try (final RocksIterator defaultIter = db.newIterator(defaultCf)) {
defaultIter.seekToFirst();
if (defaultIter.isValid()) {
log.info("Opening store {} in upgrade mode from plain key value store", name);
cfAccessor = new DualColumnFamilyAccessor(
offsetsCf,
defaultCf,
headersCf,
HeadersBytesStore::convertFromPlainToHeaderFormat,
this,
open
);
} else {
log.info("Opening store {} in regular headers-aware mode", name);
cfAccessor = new SingleColumnFamilyAccessor(offsetsCf, headersCf);
defaultCf.close();
boolean success = false;
try {
// Check if default CF has data (plain store upgrade)
try (final RocksIterator defaultIter = db.newIterator(defaultCf)) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also updating with try-with resources

defaultIter.seekToFirst();
if (defaultIter.isValid()) {
log.info("Opening store {} in upgrade mode from plain key value store", name);
cfAccessor = new DualColumnFamilyAccessor(
offsetsCf,
defaultCf,
headersCf,
HeadersBytesStore::convertFromPlainToHeaderFormat,
this,
open
);
} else {
log.info("Opening store {} in regular headers-aware mode", name);
cfAccessor = new SingleColumnFamilyAccessor(offsetsCf, headersCf);
defaultCf.close();
}
}
success = true;
} finally {
if (!success) {
for (final ColumnFamilyHandle handle : columnFamilies) {
handle.close();
}
}
}
}
Expand All @@ -137,51 +147,56 @@ private void openFromTimestampedStore(final DBOptions dbOptions,
new ColumnFamilyDescriptor(OFFSETS_COLUMN_FAMILY_NAME, columnFamilyOptions)
);

// verify and close empty Default ColumnFamily
try (final RocksIterator defaultIter = db.newIterator(columnFamilies.get(0))) {
defaultIter.seekToFirst();
if (defaultIter.isValid()) {
// Close all column family handles before throwing
columnFamilies.get(0).close();
columnFamilies.get(1).close();
columnFamilies.get(2).close();
throw new ProcessorStateException(
"Inconsistent store state for " + name + ". " +
"Cannot have both plain (DEFAULT) and timestamped data simultaneously. " +
"Headers store can upgrade from either plain or timestamped format, but not both."
);
boolean success = false;
try {
// verify and close empty Default ColumnFamily
try (final RocksIterator defaultIter = db.newIterator(columnFamilies.get(0))) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updating here as well in the same way try with resources

defaultIter.seekToFirst();
if (defaultIter.isValid()) {
throw new ProcessorStateException(
"Inconsistent store state for " + name + ". " +
"Cannot have both plain (DEFAULT) and timestamped data simultaneously. " +
"Headers store can upgrade from either plain or timestamped format, but not both."
);
}
}
// close default column family handle
columnFamilies.get(0).close();
}

final ColumnFamilyHandle legacyTimestampedCf = columnFamilies.get(1);
final ColumnFamilyHandle headersCf = columnFamilies.get(2);
final ColumnFamilyHandle offsetsCf = columnFamilies.get(3);


// Check if legacy timestamped CF has data
try (final RocksIterator legacyIter = db.newIterator(legacyTimestampedCf)) {
legacyIter.seekToFirst();
if (legacyIter.isValid()) {
log.info("Opening store {} in upgrade mode from timestamped store", name);
cfAccessor = new DualColumnFamilyAccessor(
offsetsCf,
legacyTimestampedCf,
headersCf,
HeadersBytesStore::convertToHeaderFormat,
this,
open
);
} else {
log.info("Opening store {} in regular headers-aware mode", name);
cfAccessor = new SingleColumnFamilyAccessor(offsetsCf, headersCf);
try {
db.dropColumnFamily(legacyTimestampedCf);
} catch (final RocksDBException e) {
throw new RuntimeException(e);
} finally {
legacyTimestampedCf.close();
final ColumnFamilyHandle legacyTimestampedCf = columnFamilies.get(1);
final ColumnFamilyHandle headersCf = columnFamilies.get(2);
final ColumnFamilyHandle offsetsCf = columnFamilies.get(3);

// Check if legacy timestamped CF has data
try (final RocksIterator legacyIter = db.newIterator(legacyTimestampedCf)) {
legacyIter.seekToFirst();
if (legacyIter.isValid()) {
log.info("Opening store {} in upgrade mode from timestamped store", name);
cfAccessor = new DualColumnFamilyAccessor(
offsetsCf,
legacyTimestampedCf,
headersCf,
HeadersBytesStore::convertToHeaderFormat,
this,
open
);
} else {
log.info("Opening store {} in regular headers-aware mode", name);
cfAccessor = new SingleColumnFamilyAccessor(offsetsCf, headersCf);
try {
db.dropColumnFamily(legacyTimestampedCf);
} catch (final RocksDBException e) {
throw new RuntimeException(e);
} finally {
legacyTimestampedCf.close();
}
}
}
success = true;
} finally {
if (!success) {
for (final ColumnFamilyHandle handle : columnFamilies) {
handle.close();
}
}
}
Expand Down