Skip to content

Commit 097d221

Browse files
authored
[server] Added thread-safety to CoordinatorEventProcessor for Completed Snapshots (#2221)
1 parent 6a2586f commit 097d221

File tree

2 files changed

+382
-28
lines changed

2 files changed

+382
-28
lines changed

fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotStore.java

Lines changed: 48 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import org.slf4j.Logger;
2727
import org.slf4j.LoggerFactory;
2828

29+
import javax.annotation.concurrent.ThreadSafe;
30+
2931
import java.util.ArrayDeque;
3032
import java.util.ArrayList;
3133
import java.util.Collection;
@@ -35,8 +37,10 @@
3537
import java.util.List;
3638
import java.util.Optional;
3739
import java.util.concurrent.Executor;
40+
import java.util.concurrent.locks.ReentrantLock;
3841

3942
import static org.apache.fluss.utils.Preconditions.checkNotNull;
43+
import static org.apache.fluss.utils.concurrent.LockUtils.inLock;
4044

4145
/* This file is based on source code of Apache Flink Project (https://flink.apache.org/), licensed by the Apache
4246
* Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
@@ -47,6 +51,7 @@
4751
* managing the completed snapshots including store/subsume/get completed snapshots for single one
4852
* table bucket.
4953
*/
54+
@ThreadSafe
5055
public class CompletedSnapshotStore {
5156

5257
private static final Logger LOG = LoggerFactory.getLogger(CompletedSnapshotStore.class);
@@ -62,6 +67,8 @@ public class CompletedSnapshotStore {
6267
private final Executor ioExecutor;
6368
private final SnapshotsCleaner snapshotsCleaner;
6469

70+
private final ReentrantLock lock = new ReentrantLock();
71+
6572
/**
6673
* Local copy of the completed snapshots in snapshot store. This is restored from snapshot
6774
* handel store when recovering.
@@ -84,15 +91,19 @@ public CompletedSnapshotStore(
8491
}
8592

8693
public void add(final CompletedSnapshot completedSnapshot) throws Exception {
87-
addSnapshotAndSubsumeOldestOne(completedSnapshot, snapshotsCleaner, () -> {});
94+
inLock(
95+
lock,
96+
() ->
97+
addSnapshotAndSubsumeOldestOne(
98+
completedSnapshot, snapshotsCleaner, () -> {}));
8899
}
89100

90101
public long getPhysicalStorageRemoteKvSize() {
91102
return sharedKvFileRegistry.getFileSize();
92103
}
93104

94105
public long getNumSnapshots() {
95-
return completedSnapshots.size();
106+
return inLock(lock, () -> completedSnapshots.size());
96107
}
97108

98109
/**
@@ -117,34 +128,43 @@ CompletedSnapshot addSnapshotAndSubsumeOldestOne(
117128
snapshot.getTableBucket(), snapshot.getSnapshotID(), completedSnapshotHandle);
118129

119130
// Now add the new one. If it fails, we don't want to lose existing data.
120-
completedSnapshots.addLast(snapshot);
121-
122-
// Remove completed snapshot from queue and snapshotStateHandleStore, not discard.
123-
Optional<CompletedSnapshot> subsume =
124-
subsume(
125-
completedSnapshots,
126-
maxNumberOfSnapshotsToRetain,
127-
completedSnapshot -> {
128-
remove(
129-
completedSnapshot.getTableBucket(),
130-
completedSnapshot.getSnapshotID());
131-
snapshotsCleaner.addSubsumedSnapshot(completedSnapshot);
132-
});
133-
134-
findLowest(completedSnapshots)
135-
.ifPresent(
136-
id -> {
137-
// unregister the unused kv file, which will then cause the kv file
138-
// deletion
139-
sharedKvFileRegistry.unregisterUnusedKvFile(id);
140-
snapshotsCleaner.cleanSubsumedSnapshots(
141-
id, Collections.emptySet(), postCleanup, ioExecutor);
142-
});
143-
return subsume.orElse(null);
131+
return inLock(
132+
lock,
133+
() -> {
134+
completedSnapshots.addLast(snapshot);
135+
136+
// Remove completed snapshot from queue and snapshotStateHandleStore, not
137+
// discard.
138+
Optional<CompletedSnapshot> subsume =
139+
subsume(
140+
completedSnapshots,
141+
maxNumberOfSnapshotsToRetain,
142+
completedSnapshot -> {
143+
remove(
144+
completedSnapshot.getTableBucket(),
145+
completedSnapshot.getSnapshotID());
146+
snapshotsCleaner.addSubsumedSnapshot(completedSnapshot);
147+
});
148+
149+
findLowest(completedSnapshots)
150+
.ifPresent(
151+
id -> {
152+
// unregister the unused kv file, which will then cause the
153+
// kv file
154+
// deletion
155+
sharedKvFileRegistry.unregisterUnusedKvFile(id);
156+
snapshotsCleaner.cleanSubsumedSnapshots(
157+
id,
158+
Collections.emptySet(),
159+
postCleanup,
160+
ioExecutor);
161+
});
162+
return subsume.orElse(null);
163+
});
144164
}
145165

146166
public List<CompletedSnapshot> getAllSnapshots() {
147-
return new ArrayList<>(completedSnapshots);
167+
return inLock(lock, () -> new ArrayList<>(completedSnapshots));
148168
}
149169

150170
private static Optional<CompletedSnapshot> subsume(
@@ -211,7 +231,7 @@ protected static Optional<Long> findLowest(Deque<CompletedSnapshot> unSubsumedSn
211231
* added.
212232
*/
213233
public Optional<CompletedSnapshot> getLatestSnapshot() {
214-
return Optional.ofNullable(completedSnapshots.peekLast());
234+
return inLock(lock, () -> Optional.ofNullable(completedSnapshots.peekLast()));
215235
}
216236

217237
/**

0 commit comments

Comments
 (0)