Skip to content

Commit d3b6bfb

Browse files
fix concurrency issue
1 parent 53ce847 commit d3b6bfb

File tree

4 files changed

+75
-54
lines changed

4 files changed

+75
-54
lines changed

fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java

Lines changed: 37 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -275,29 +275,44 @@ public void registerMetrics(BucketMetricGroup bucketMetricGroup) {
275275
new MeterView(kvPreWriteBuffer.getTruncateAsErrorCount()));
276276

277277
// Initialize RocksDB metrics reporter
278-
if (rocksDBMetricsCollector == null && !isClosed) {
279-
try {
280-
// Get RocksDB Statistics from options container
281-
org.rocksdb.Statistics statistics = rocksDBKv.getOptionsContainer().getStatistics();
282-
if (statistics != null) {
283-
// Set server metric group in manager if not already set
284-
RocksDBMetricsManager.getInstance().setServerMetricGroup(serverMetricGroup);
285-
286-
rocksDBMetricsCollector =
287-
new RocksDBMetricsCollector(
288-
rocksDBKv.getDb(),
289-
statistics,
290-
tableBucket,
291-
physicalPath.getTablePath(),
292-
physicalPath.getPartitionName(),
293-
rocksDBKv.getOptionsContainer());
278+
// Use double-check locking pattern to ensure thread-safe initialization
279+
if (rocksDBMetricsCollector == null) {
280+
synchronized (this) {
281+
// Double-check after acquiring lock
282+
if (rocksDBMetricsCollector == null) {
283+
// Check if closed while holding lock
284+
boolean closed;
285+
synchronized (kvLock) {
286+
closed = isClosed;
287+
}
288+
if (!closed) {
289+
try {
290+
// Get RocksDB Statistics from options container
291+
org.rocksdb.Statistics statistics =
292+
rocksDBKv.getOptionsContainer().getStatistics();
293+
if (statistics != null) {
294+
// Set server metric group in manager if not already set
295+
RocksDBMetricsManager.getInstance()
296+
.setServerMetricGroup(serverMetricGroup);
297+
298+
rocksDBMetricsCollector =
299+
new RocksDBMetricsCollector(
300+
rocksDBKv.getDb(),
301+
statistics,
302+
tableBucket,
303+
physicalPath.getTablePath(),
304+
physicalPath.getPartitionName(),
305+
rocksDBKv.getOptionsContainer());
306+
}
307+
} catch (Exception e) {
308+
LOG.warn(
309+
"Failed to initialize RocksDB metrics reporter for table {} bucket {}",
310+
physicalPath,
311+
tableBucket.getBucket(),
312+
e);
313+
}
314+
}
294315
}
295-
} catch (Exception e) {
296-
LOG.warn(
297-
"Failed to initialize RocksDB metrics reporter for table {} bucket {}",
298-
physicalPath,
299-
tableBucket.getBucket(),
300-
e);
301316
}
302317
}
303318
}

fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBMetricsCollector.java

Lines changed: 29 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -329,30 +329,38 @@ private long computeBlockCachePinnedUsage() {
329329

330330
@Override
331331
public void close() {
332-
LOG.info(
333-
"Closing RocksDB metrics collector for table {} bucket {}.",
334-
tablePath,
335-
tableBucket.getBucket());
336-
337-
// Set cleanup in progress flag to prevent new operations
338-
cleanupInProgress = true;
339-
340-
// Set closed flag to prevent new operations
341-
closed = true;
332+
// Use synchronized block to ensure thread-safe close operation
333+
synchronized (this) {
334+
// Check if already closed to prevent double closing
335+
if (closed) {
336+
return;
337+
}
342338

343-
// Unregister from global collector
344-
if (registered) {
345-
try {
346-
RocksDBMetricsManager.getInstance().unregisterCollector(this);
347-
} catch (Exception e) {
348-
LOG.warn("Error unregistering collector from manager: {}", e.getMessage());
349-
} finally {
350-
registered = false;
339+
LOG.info(
340+
"Closing RocksDB metrics collector for table {} bucket {}.",
341+
tablePath,
342+
tableBucket.getBucket());
343+
344+
// Set cleanup in progress flag to prevent new operations
345+
cleanupInProgress = true;
346+
347+
// Set closed flag to prevent new operations
348+
closed = true;
349+
350+
// Unregister from global collector
351+
if (registered) {
352+
try {
353+
RocksDBMetricsManager.getInstance().unregisterCollector(this);
354+
} catch (Exception e) {
355+
LOG.warn("Error unregistering collector from manager: {}", e.getMessage());
356+
} finally {
357+
registered = false;
358+
}
351359
}
352-
}
353360

354-
// Signal cleanup completion
355-
cleanupLatch.countDown();
361+
// Signal cleanup completion
362+
cleanupLatch.countDown();
363+
}
356364
}
357365

358366
public TableBucket getTableBucket() {

fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBMetricsManager.java

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -154,16 +154,14 @@ public void unregisterCollector(RocksDBMetricsCollector collector) {
154154

155155
synchronized (collectorLock) {
156156
String key = createCollectorKey(collector);
157-
synchronized (key.intern()) {
158-
RocksDBMetricsCollector removed = collectors.remove(key);
159-
if (removed != null) {
160-
LOG.debug(
161-
"Unregistered RocksDB metrics collector for {}, remaining collectors: {}",
162-
key,
163-
collectors.size());
164-
} else {
165-
LOG.debug("Collector {} was not found in registry", key);
166-
}
157+
RocksDBMetricsCollector removed = collectors.remove(key);
158+
if (removed != null) {
159+
LOG.debug(
160+
"Unregistered RocksDB metrics collector for {}, remaining collectors: {}",
161+
key,
162+
collectors.size());
163+
} else {
164+
LOG.debug("Collector {} was not found in registry", key);
167165
}
168166
}
169167
}

fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBResourceContainer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -303,7 +303,7 @@ private ColumnFamilyOptions setColumnFamilyOptionsFromConfigurableOptions(
303303

304304
// Create explicit cache instance instead of using setBlockCacheSize
305305
long blockCacheSize = internalGetOption(ConfigOptions.KV_BLOCK_CACHE_SIZE).getBytes();
306-
if (blockCacheSize > 0) {
306+
if (blockCacheSize > 0 && blockCache != null) {
307307
blockBasedTableConfig.setBlockCache(blockCache);
308308
}
309309

0 commit comments

Comments
 (0)