2626import org .slf4j .Logger ;
2727import org .slf4j .LoggerFactory ;
2828
29+ import javax .annotation .concurrent .ThreadSafe ;
30+
2931import java .util .ArrayDeque ;
3032import java .util .ArrayList ;
3133import java .util .Collection ;
3537import java .util .List ;
3638import java .util .Optional ;
3739import java .util .concurrent .Executor ;
40+ import java .util .concurrent .locks .ReentrantLock ;
3841
3942import static org .apache .fluss .utils .Preconditions .checkNotNull ;
43+ import static org .apache .fluss .utils .concurrent .LockUtils .inLock ;
4044
4145/* This file is based on source code of Apache Flink Project (https://flink.apache.org/), licensed by the Apache
4246 * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
4751 * managing the completed snapshots including store/subsume/get completed snapshots for single one
4852 * table bucket.
4953 */
54+ @ ThreadSafe
5055public class CompletedSnapshotStore {
5156
5257 private static final Logger LOG = LoggerFactory .getLogger (CompletedSnapshotStore .class );
@@ -62,6 +67,8 @@ public class CompletedSnapshotStore {
6267 private final Executor ioExecutor ;
6368 private final SnapshotsCleaner snapshotsCleaner ;
6469
70+ private final ReentrantLock lock = new ReentrantLock ();
71+
6572 /**
6673 * Local copy of the completed snapshots in snapshot store. This is restored from snapshot
6774 * handel store when recovering.
@@ -84,15 +91,19 @@ public CompletedSnapshotStore(
8491 }
8592
8693 public void add (final CompletedSnapshot completedSnapshot ) throws Exception {
87- addSnapshotAndSubsumeOldestOne (completedSnapshot , snapshotsCleaner , () -> {});
94+ inLock (
95+ lock ,
96+ () ->
97+ addSnapshotAndSubsumeOldestOne (
98+ completedSnapshot , snapshotsCleaner , () -> {}));
8899 }
89100
90101 public long getPhysicalStorageRemoteKvSize () {
91102 return sharedKvFileRegistry .getFileSize ();
92103 }
93104
94105 public long getNumSnapshots () {
95- return completedSnapshots .size ();
106+ return inLock ( lock , () -> completedSnapshots .size () );
96107 }
97108
98109 /**
@@ -117,34 +128,43 @@ CompletedSnapshot addSnapshotAndSubsumeOldestOne(
117128 snapshot .getTableBucket (), snapshot .getSnapshotID (), completedSnapshotHandle );
118129
119130 // Now add the new one. If it fails, we don't want to lose existing data.
120- completedSnapshots .addLast (snapshot );
121-
122- // Remove completed snapshot from queue and snapshotStateHandleStore, not discard.
123- Optional <CompletedSnapshot > subsume =
124- subsume (
125- completedSnapshots ,
126- maxNumberOfSnapshotsToRetain ,
127- completedSnapshot -> {
128- remove (
129- completedSnapshot .getTableBucket (),
130- completedSnapshot .getSnapshotID ());
131- snapshotsCleaner .addSubsumedSnapshot (completedSnapshot );
132- });
133-
134- findLowest (completedSnapshots )
135- .ifPresent (
136- id -> {
137- // unregister the unused kv file, which will then cause the kv file
138- // deletion
139- sharedKvFileRegistry .unregisterUnusedKvFile (id );
140- snapshotsCleaner .cleanSubsumedSnapshots (
141- id , Collections .emptySet (), postCleanup , ioExecutor );
142- });
143- return subsume .orElse (null );
131+ return inLock (
132+ lock ,
133+ () -> {
134+ completedSnapshots .addLast (snapshot );
135+
136+ // Remove completed snapshot from queue and snapshotStateHandleStore, not
137+ // discard.
138+ Optional <CompletedSnapshot > subsume =
139+ subsume (
140+ completedSnapshots ,
141+ maxNumberOfSnapshotsToRetain ,
142+ completedSnapshot -> {
143+ remove (
144+ completedSnapshot .getTableBucket (),
145+ completedSnapshot .getSnapshotID ());
146+ snapshotsCleaner .addSubsumedSnapshot (completedSnapshot );
147+ });
148+
149+ findLowest (completedSnapshots )
150+ .ifPresent (
151+ id -> {
152+ // unregister the unused kv file, which will then cause the
153+ // kv file
154+ // deletion
155+ sharedKvFileRegistry .unregisterUnusedKvFile (id );
156+ snapshotsCleaner .cleanSubsumedSnapshots (
157+ id ,
158+ Collections .emptySet (),
159+ postCleanup ,
160+ ioExecutor );
161+ });
162+ return subsume .orElse (null );
163+ });
144164 }
145165
146166 public List <CompletedSnapshot > getAllSnapshots () {
147- return new ArrayList <>(completedSnapshots );
167+ return inLock ( lock , () -> new ArrayList <>(completedSnapshots ) );
148168 }
149169
150170 private static Optional <CompletedSnapshot > subsume (
@@ -211,7 +231,7 @@ protected static Optional<Long> findLowest(Deque<CompletedSnapshot> unSubsumedSn
211231 * added.
212232 */
213233 public Optional <CompletedSnapshot > getLatestSnapshot () {
214- return Optional .ofNullable (completedSnapshots .peekLast ());
234+ return inLock ( lock , () -> Optional .ofNullable (completedSnapshots .peekLast () ));
215235 }
216236
217237 /**
0 commit comments