diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBMigratingSessionStoreWithHeaders.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBMigratingSessionStoreWithHeaders.java index a54a846cb459d..f4d782ec9d8a3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBMigratingSessionStoreWithHeaders.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBMigratingSessionStoreWithHeaders.java @@ -67,24 +67,34 @@ void openRocksDB(final DBOptions dbOptions, final ColumnFamilyHandle withHeadersColumnFamily = columnFamilies.get(1); final ColumnFamilyHandle offsetsCf = columnFamilies.get(2); - final RocksIterator noHeadersIter = db.newIterator(noHeadersColumnFamily); - noHeadersIter.seekToFirst(); - if (noHeadersIter.isValid()) { - log.info("Opening store {} in upgrade mode", name); - cfAccessor = new DualColumnFamilyAccessor( - offsetsCf, - noHeadersColumnFamily, - withHeadersColumnFamily, - HeadersBytesStore::convertToHeaderFormat, - this, - open - ); - } else { - log.info("Opening store {} in regular mode", name); - cfAccessor = new SingleColumnFamilyAccessor(offsetsCf, withHeadersColumnFamily); - noHeadersColumnFamily.close(); + boolean success = false; + try { + try (final RocksIterator noHeadersIter = db.newIterator(noHeadersColumnFamily)) { + noHeadersIter.seekToFirst(); + if (noHeadersIter.isValid()) { + log.info("Opening store {} in upgrade mode", name); + cfAccessor = new DualColumnFamilyAccessor( + offsetsCf, + noHeadersColumnFamily, + withHeadersColumnFamily, + HeadersBytesStore::convertToHeaderFormat, + this, + open + ); + } else { + log.info("Opening store {} in regular mode", name); + cfAccessor = new SingleColumnFamilyAccessor(offsetsCf, withHeadersColumnFamily); + noHeadersColumnFamily.close(); + } + } + success = true; + } finally { + if (!success) { + for (final ColumnFamilyHandle handle : columnFamilies) { + handle.close(); + } + } } - noHeadersIter.close(); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java index c2628a863af4c..d766b54d5f798 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java @@ -249,15 +249,23 @@ void openDB(final Map configs, final File stateDir) { // Setup statistics before the database is opened, otherwise the statistics are not updated // with the measurements from Rocks DB setupStatistics(configs, dbOptions); - openRocksDB(dbOptions, columnFamilyOptions); - dbAccessor = new DirectDBAccessor(db, fOptions, wOptions); + boolean success = false; try { - cfAccessor.open(dbAccessor, !eosEnabled); - } catch (final StreamsException fatal) { - final String fatalMessage = "State store " + name + " didn't find a valid state, since under EOS it has the risk of getting uncommitted data in stores"; - throw new ProcessorStateException(fatalMessage, fatal); - } catch (final RocksDBException e) { - throw new ProcessorStateException("Error opening store " + name, e); + openRocksDB(dbOptions, columnFamilyOptions); + dbAccessor = new DirectDBAccessor(db, fOptions, wOptions); + try { + cfAccessor.open(dbAccessor, !eosEnabled); + } catch (final StreamsException fatal) { + final String fatalMessage = "State store " + name + " didn't find a valid state, since under EOS it has the risk of getting uncommitted data in stores"; + throw new ProcessorStateException(fatalMessage, fatal); + } catch (final RocksDBException e) { + throw new ProcessorStateException("Error opening store " + name, e); + } + success = true; + } finally { + if (!success) { + closeNativeResources(); + } } addValueProvidersToMetricsRecorder(); @@ -359,10 +367,27 @@ protected List openRocksDB(final DBOptions dbOptions, .filter(descriptor -> allExisting.stream().noneMatch(existing -> Arrays.equals(existing, descriptor.getName()))) .collect(Collectors.toList()); final List existingColumnFamilies = new ArrayList<>(existingDescriptors.size()); + final List createdColumnFamilies = new ArrayList<>(); db = RocksDB.open(dbOptions, absolutePath, existingDescriptors, existingColumnFamilies); - final List createdColumnFamilies = db.createColumnFamilies(toCreate); - - return mergeColumnFamilyHandleLists(existingColumnFamilies, createdColumnFamilies, allDescriptors); + boolean openSuccess = false; + try { + createdColumnFamilies.addAll(db.createColumnFamilies(toCreate)); + final List result = + mergeColumnFamilyHandleLists(existingColumnFamilies, createdColumnFamilies, allDescriptors); + openSuccess = true; + return result; + } finally { + if (!openSuccess) { + for (final ColumnFamilyHandle handle : existingColumnFamilies) { + handle.close(); + } + for (final ColumnFamilyHandle handle : createdColumnFamilies) { + handle.close(); + } + db.close(); + db = null; + } + } } catch (final RocksDBException e) { throw new ProcessorStateException("Error opening store " + name + " at location " + dbDir.toString(), e); @@ -776,6 +801,64 @@ public synchronized void close() { statistics = null; } + /** + * Close all native RocksDB resources with null-safety. + * Used only by the error cleanup path in {@link #openDB} where some resources + * may not have been initialized yet. + */ + private void closeNativeResources() { + closeDbAndAccessors(); + closeOptionsAndFilters(); + } + + private void closeDbAndAccessors() { + if (cfAccessor != null) { + try { + if (dbAccessor != null) { + cfAccessor.close(dbAccessor); + } + } catch (final Exception e) { + log.error("Error while closing column family handles for store " + name, e); + } + cfAccessor = null; + } + if (dbAccessor != null) { + dbAccessor.close(); + dbAccessor = null; + } + if (db != null) { + db.close(); + db = null; + } + } + + private void closeOptionsAndFilters() { + if (userSpecifiedOptions != null) { + userSpecifiedOptions.close(); + userSpecifiedOptions = null; + } + if (wOptions != null) { + wOptions.close(); + wOptions = null; + } + if (fOptions != null) { + fOptions.close(); + fOptions = null; + } + if (filter != null) { + filter.close(); + filter = null; + } + if (cache != null) { + cache.close(); + cache = null; + } + if (statistics != null) { + statistics.close(); + statistics = null; + } + } + private void closeOpenIterators() { final HashSet> iterators; synchronized (openIterators) { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java index b7f8fd64f3b10..f9faf594ab18f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java @@ -63,23 +63,33 @@ void openRocksDB(final DBOptions dbOptions, final ColumnFamilyHandle withTimestampColumnFamily = columnFamilies.get(1); final ColumnFamilyHandle offsetsColumnFamily = columnFamilies.get(2); - final RocksIterator noTimestampsIter = db.newIterator(noTimestampColumnFamily); - noTimestampsIter.seekToFirst(); - if (noTimestampsIter.isValid()) { - log.info("Opening store {} in upgrade mode", name); - cfAccessor = new DualColumnFamilyAccessor( - offsetsColumnFamily, - noTimestampColumnFamily, - withTimestampColumnFamily, - TimestampedBytesStore::convertToTimestampedFormat, - this, open - ); - } else { - log.info("Opening store {} in regular mode", name); - cfAccessor = new SingleColumnFamilyAccessor(offsetsColumnFamily, withTimestampColumnFamily); - noTimestampColumnFamily.close(); + boolean success = false; + try { + try (final RocksIterator noTimestampsIter = db.newIterator(noTimestampColumnFamily)) { + noTimestampsIter.seekToFirst(); + if (noTimestampsIter.isValid()) { + log.info("Opening store {} in upgrade mode", name); + cfAccessor = new DualColumnFamilyAccessor( + offsetsColumnFamily, + noTimestampColumnFamily, + withTimestampColumnFamily, + TimestampedBytesStore::convertToTimestampedFormat, + this, open + ); + } else { + log.info("Opening store {} in regular mode", name); + cfAccessor = new SingleColumnFamilyAccessor(offsetsColumnFamily, withTimestampColumnFamily); + noTimestampColumnFamily.close(); + } + } + success = true; + } finally { + if (!success) { + for (final ColumnFamilyHandle handle : columnFamilies) { + handle.close(); + } + } } - noTimestampsIter.close(); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java index ea8ff9bc91764..2d86ae39f1e44 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java @@ -105,23 +105,33 @@ private void openFromDefaultStore(final DBOptions dbOptions, final ColumnFamilyHandle headersCf = columnFamilies.get(1); final ColumnFamilyHandle offsetsCf = columnFamilies.get(2); - // Check if default CF has data (plain store upgrade) - try (final RocksIterator defaultIter = db.newIterator(defaultCf)) { - defaultIter.seekToFirst(); - if (defaultIter.isValid()) { - log.info("Opening store {} in upgrade mode from plain key value store", name); - cfAccessor = new DualColumnFamilyAccessor( - offsetsCf, - defaultCf, - headersCf, - HeadersBytesStore::convertFromPlainToHeaderFormat, - this, - open - ); - } else { - log.info("Opening store {} in regular headers-aware mode", name); - cfAccessor = new SingleColumnFamilyAccessor(offsetsCf, headersCf); - defaultCf.close(); + boolean success = false; + try { + // Check if default CF has data (plain store upgrade) + try (final RocksIterator defaultIter = db.newIterator(defaultCf)) { + defaultIter.seekToFirst(); + if (defaultIter.isValid()) { + log.info("Opening store {} in upgrade mode from plain key value store", name); + cfAccessor = new DualColumnFamilyAccessor( + offsetsCf, + defaultCf, + headersCf, + HeadersBytesStore::convertFromPlainToHeaderFormat, + this, + open + ); + } else { + log.info("Opening store {} in regular headers-aware mode", name); + cfAccessor = new SingleColumnFamilyAccessor(offsetsCf, headersCf); + defaultCf.close(); + } + } + success = true; + } finally { + if (!success) { + for (final ColumnFamilyHandle handle : columnFamilies) { + handle.close(); + } } } } @@ -137,51 +147,56 @@ private void openFromTimestampedStore(final DBOptions dbOptions, new ColumnFamilyDescriptor(OFFSETS_COLUMN_FAMILY_NAME, columnFamilyOptions) ); - // verify and close empty Default ColumnFamily - try (final RocksIterator defaultIter = db.newIterator(columnFamilies.get(0))) { - defaultIter.seekToFirst(); - if (defaultIter.isValid()) { - // Close all column family handles before throwing - columnFamilies.get(0).close(); - columnFamilies.get(1).close(); - columnFamilies.get(2).close(); - throw new ProcessorStateException( - "Inconsistent store state for " + name + ". " + - "Cannot have both plain (DEFAULT) and timestamped data simultaneously. " + - "Headers store can upgrade from either plain or timestamped format, but not both." - ); + boolean success = false; + try { + // verify and close empty Default ColumnFamily + try (final RocksIterator defaultIter = db.newIterator(columnFamilies.get(0))) { + defaultIter.seekToFirst(); + if (defaultIter.isValid()) { + throw new ProcessorStateException( + "Inconsistent store state for " + name + ". " + + "Cannot have both plain (DEFAULT) and timestamped data simultaneously. " + + "Headers store can upgrade from either plain or timestamped format, but not both." + ); + } } // close default column family handle columnFamilies.get(0).close(); - } - final ColumnFamilyHandle legacyTimestampedCf = columnFamilies.get(1); - final ColumnFamilyHandle headersCf = columnFamilies.get(2); - final ColumnFamilyHandle offsetsCf = columnFamilies.get(3); - - - // Check if legacy timestamped CF has data - try (final RocksIterator legacyIter = db.newIterator(legacyTimestampedCf)) { - legacyIter.seekToFirst(); - if (legacyIter.isValid()) { - log.info("Opening store {} in upgrade mode from timestamped store", name); - cfAccessor = new DualColumnFamilyAccessor( - offsetsCf, - legacyTimestampedCf, - headersCf, - HeadersBytesStore::convertToHeaderFormat, - this, - open - ); - } else { - log.info("Opening store {} in regular headers-aware mode", name); - cfAccessor = new SingleColumnFamilyAccessor(offsetsCf, headersCf); - try { - db.dropColumnFamily(legacyTimestampedCf); - } catch (final RocksDBException e) { - throw new RuntimeException(e); - } finally { - legacyTimestampedCf.close(); + final ColumnFamilyHandle legacyTimestampedCf = columnFamilies.get(1); + final ColumnFamilyHandle headersCf = columnFamilies.get(2); + final ColumnFamilyHandle offsetsCf = columnFamilies.get(3); + + // Check if legacy timestamped CF has data + try (final RocksIterator legacyIter = db.newIterator(legacyTimestampedCf)) { + legacyIter.seekToFirst(); + if (legacyIter.isValid()) { + log.info("Opening store {} in upgrade mode from timestamped store", name); + cfAccessor = new DualColumnFamilyAccessor( + offsetsCf, + legacyTimestampedCf, + headersCf, + HeadersBytesStore::convertToHeaderFormat, + this, + open + ); + } else { + log.info("Opening store {} in regular headers-aware mode", name); + cfAccessor = new SingleColumnFamilyAccessor(offsetsCf, headersCf); + try { + db.dropColumnFamily(legacyTimestampedCf); + } catch (final RocksDBException e) { + throw new RuntimeException(e); + } finally { + legacyTimestampedCf.close(); + } + } + } + success = true; + } finally { + if (!success) { + for (final ColumnFamilyHandle handle : columnFamilies) { + handle.close(); } } }