Skip to content

Commit 14e3683

Browse files
committed
fix
1 parent 617bb18 commit 14e3683

File tree

1 file changed

+61
-78
lines changed

1 file changed

+61
-78
lines changed

paimon-common/src/main/java/org/apache/paimon/lookup/sort/db/SimpleLsmKvDb.java

Lines changed: 61 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,7 @@
4040
import java.util.HashMap;
4141
import java.util.List;
4242
import java.util.Map;
43-
import java.util.concurrent.ConcurrentSkipListMap;
44-
import java.util.concurrent.atomic.AtomicLong;
43+
import java.util.TreeMap;
4544

4645
/**
4746
* A simple LSM-Tree based KV database built on top of {@link SortLookupStoreFactory}.
@@ -80,10 +79,10 @@ public class SimpleLsmKvDb implements Closeable {
8079
private final LsmCompactor compactor;
8180

8281
/** Active MemTable: key -> value bytes (empty byte[] = tombstone). */
83-
private ConcurrentSkipListMap<MemorySlice, byte[]> memTable;
82+
private TreeMap<MemorySlice, byte[]> memTable;
8483

8584
/** Estimated size of the current MemTable in bytes. */
86-
private final AtomicLong memTableSize;
85+
private long memTableSize;
8786

8887
/**
8988
* Multi-level SST file storage. Each level contains a list of {@link SstFileMetadata} ordered
@@ -95,8 +94,8 @@ public class SimpleLsmKvDb implements Closeable {
9594
/** Cached readers for SST files, keyed by file path. Lazily populated on first lookup. */
9695
private final Map<File, SortLookupStoreReader> readerCache;
9796

98-
private final AtomicLong fileSequence;
99-
private volatile boolean closed;
97+
private long fileSequence;
98+
private boolean closed;
10099

101100
private SimpleLsmKvDb(
102101
File dataDirectory,
@@ -109,14 +108,14 @@ private SimpleLsmKvDb(
109108
this.storeFactory = storeFactory;
110109
this.keyComparator = keyComparator;
111110
this.memTableFlushThreshold = memTableFlushThreshold;
112-
this.memTable = new ConcurrentSkipListMap<>(keyComparator);
113-
this.memTableSize = new AtomicLong(0);
111+
this.memTable = new TreeMap<>(keyComparator);
112+
this.memTableSize = 0;
114113
this.levels = new ArrayList<>();
115114
for (int i = 0; i < MAX_LEVELS; i++) {
116115
this.levels.add(new ArrayList<SstFileMetadata>());
117116
}
118117
this.readerCache = new HashMap<>();
119-
this.fileSequence = new AtomicLong(0);
118+
this.fileSequence = 0;
120119
this.closed = false;
121120
this.compactor =
122121
new LsmCompactor(
@@ -182,7 +181,7 @@ public void put(byte[] key, byte[] value) throws IOException {
182181
if (oldValue != null) {
183182
delta -= (key.length + oldValue.length);
184183
}
185-
memTableSize.addAndGet(delta);
184+
memTableSize += delta;
186185
maybeFlushMemTable();
187186
}
188187

@@ -209,7 +208,7 @@ public void delete(byte[] key) throws IOException {
209208
if (oldValue != null) {
210209
delta -= (key.length + oldValue.length);
211210
}
212-
memTableSize.addAndGet(delta);
211+
memTableSize += delta;
213212
maybeFlushMemTable();
214213
}
215214

@@ -246,32 +245,30 @@ public byte[] get(byte[] key) throws IOException {
246245
}
247246

248247
// 2. Search each level from L0 to Lmax
249-
synchronized (levels) {
250-
for (int level = 0; level < MAX_LEVELS; level++) {
251-
List<SstFileMetadata> levelFiles = levels.get(level);
252-
if (levelFiles.isEmpty()) {
253-
continue;
254-
}
248+
for (int level = 0; level < MAX_LEVELS; level++) {
249+
List<SstFileMetadata> levelFiles = levels.get(level);
250+
if (levelFiles.isEmpty()) {
251+
continue;
252+
}
255253

256-
if (level == 0) {
257-
// L0: files may have overlapping keys, search newest-first
258-
for (SstFileMetadata meta : levelFiles) {
259-
if (!meta.mightContainKey(wrappedKey, keyComparator)) {
260-
continue;
261-
}
262-
byte[] value = lookupInFile(meta.getFile(), key);
263-
if (value != null) {
264-
return LsmCompactor.isTombstone(value) ? null : value;
265-
}
254+
if (level == 0) {
255+
// L0: files may have overlapping keys, search newest-first
256+
for (SstFileMetadata meta : levelFiles) {
257+
if (!meta.mightContainKey(wrappedKey, keyComparator)) {
258+
continue;
266259
}
267-
} else {
268-
// L1+: files have non-overlapping key ranges, binary search
269-
SstFileMetadata target = findFileForKey(levelFiles, wrappedKey);
270-
if (target != null) {
271-
byte[] value = lookupInFile(target.getFile(), key);
272-
if (value != null) {
273-
return LsmCompactor.isTombstone(value) ? null : value;
274-
}
260+
byte[] value = lookupInFile(meta.getFile(), key);
261+
if (value != null) {
262+
return LsmCompactor.isTombstone(value) ? null : value;
263+
}
264+
}
265+
} else {
266+
// L1+: files have non-overlapping key ranges, binary search
267+
SstFileMetadata target = findFileForKey(levelFiles, wrappedKey);
268+
if (target != null) {
269+
byte[] value = lookupInFile(target.getFile(), key);
270+
if (value != null) {
271+
return LsmCompactor.isTombstone(value) ? null : value;
275272
}
276273
}
277274
}
@@ -303,24 +300,20 @@ public void flush() throws IOException {
303300
return;
304301
}
305302

306-
ConcurrentSkipListMap<MemorySlice, byte[]> snapshot = memTable;
307-
memTable = new ConcurrentSkipListMap<>(keyComparator);
308-
memTableSize.set(0);
303+
TreeMap<MemorySlice, byte[]> snapshot = memTable;
304+
memTable = new TreeMap<>(keyComparator);
305+
memTableSize = 0;
309306

310307
SstFileMetadata metadata = writeMemTableToSst(snapshot);
311308

312-
synchronized (levels) {
313-
levels.get(0).add(0, metadata);
314-
}
309+
levels.get(0).add(0, metadata);
315310

316311
LOG.info(
317312
"Flushed MemTable to L0 SST file: {}, entries: {}",
318313
metadata.getFile().getName(),
319314
snapshot.size());
320315

321-
synchronized (levels) {
322-
compactor.maybeCompact(levels, MAX_LEVELS, this::newSstFile);
323-
}
316+
compactor.maybeCompact(levels, MAX_LEVELS, this::newSstFile);
324317
}
325318

326319
/**
@@ -330,9 +323,7 @@ public void flush() throws IOException {
330323
*/
331324
public void compact() throws IOException {
332325
ensureOpen();
333-
synchronized (levels) {
334-
compactor.fullCompact(levels, MAX_LEVELS, this::newSstFile);
335-
}
326+
compactor.fullCompact(levels, MAX_LEVELS, this::newSstFile);
336327
}
337328

338329
// -------------------------------------------------------------------------
@@ -348,14 +339,12 @@ public void close() throws IOException {
348339

349340
// Flush remaining MemTable data to L0
350341
if (!memTable.isEmpty()) {
351-
ConcurrentSkipListMap<MemorySlice, byte[]> snapshot = memTable;
352-
memTable = new ConcurrentSkipListMap<>(keyComparator);
353-
memTableSize.set(0);
342+
TreeMap<MemorySlice, byte[]> snapshot = memTable;
343+
memTable = new TreeMap<>(keyComparator);
344+
memTableSize = 0;
354345

355346
SstFileMetadata metadata = writeMemTableToSst(snapshot);
356-
synchronized (levels) {
357-
levels.get(0).add(0, metadata);
358-
}
347+
levels.get(0).add(0, metadata);
359348
}
360349

361350
// Close all cached readers
@@ -373,53 +362,47 @@ public void close() throws IOException {
373362

374363
/** Return the total number of SST files across all levels. */
375364
public int getSstFileCount() {
376-
synchronized (levels) {
377-
int count = 0;
378-
for (List<SstFileMetadata> levelFiles : levels) {
379-
count += levelFiles.size();
380-
}
381-
return count;
365+
int count = 0;
366+
for (List<SstFileMetadata> levelFiles : levels) {
367+
count += levelFiles.size();
382368
}
369+
return count;
383370
}
384371

385372
/** Return the number of SST files at a specific level. */
386373
public int getLevelFileCount(int level) {
387-
synchronized (levels) {
388-
if (level < 0 || level >= MAX_LEVELS) {
389-
return 0;
390-
}
391-
return levels.get(level).size();
374+
if (level < 0 || level >= MAX_LEVELS) {
375+
return 0;
392376
}
377+
return levels.get(level).size();
393378
}
394379

395380
/** Return the estimated MemTable size in bytes. */
396381
public long getMemTableSize() {
397-
return memTableSize.get();
382+
return memTableSize;
398383
}
399384

400385
/** Return a human-readable summary of file counts per level. */
401386
public String getLevelStats() {
402-
synchronized (levels) {
403-
StringBuilder sb = new StringBuilder();
404-
for (int i = 0; i < MAX_LEVELS; i++) {
405-
int count = levels.get(i).size();
406-
if (count > 0) {
407-
if (sb.length() > 0) {
408-
sb.append(", ");
409-
}
410-
sb.append("L").append(i).append("=").append(count);
387+
StringBuilder sb = new StringBuilder();
388+
for (int i = 0; i < MAX_LEVELS; i++) {
389+
int count = levels.get(i).size();
390+
if (count > 0) {
391+
if (sb.length() > 0) {
392+
sb.append(", ");
411393
}
394+
sb.append("L").append(i).append("=").append(count);
412395
}
413-
return sb.length() == 0 ? "empty" : sb.toString();
414396
}
397+
return sb.length() == 0 ? "empty" : sb.toString();
415398
}
416399

417400
// -------------------------------------------------------------------------
418401
// Internal Helpers
419402
// -------------------------------------------------------------------------
420403

421404
private void maybeFlushMemTable() throws IOException {
422-
if (memTableSize.get() >= memTableFlushThreshold) {
405+
if (memTableSize >= memTableFlushThreshold) {
423406
flush();
424407
}
425408
}
@@ -452,7 +435,7 @@ private SstFileMetadata findFileForKey(List<SstFileMetadata> sortedFiles, Memory
452435
return null;
453436
}
454437

455-
private SstFileMetadata writeMemTableToSst(ConcurrentSkipListMap<MemorySlice, byte[]> data)
438+
private SstFileMetadata writeMemTableToSst(TreeMap<MemorySlice, byte[]> data)
456439
throws IOException {
457440
File sstFile = newSstFile();
458441
SortLookupStoreWriter writer = storeFactory.createWriter(sstFile, null);
@@ -477,7 +460,7 @@ private SstFileMetadata writeMemTableToSst(ConcurrentSkipListMap<MemorySlice, by
477460
}
478461

479462
private File newSstFile() {
480-
long sequence = fileSequence.getAndIncrement();
463+
long sequence = fileSequence++;
481464
return new File(dataDirectory, String.format("sst-%06d.db", sequence));
482465
}
483466

0 commit comments

Comments
 (0)