Skip to content

Commit bc20370

Browse files
[kv] coordinator must handle inconsistency or broken snapshot scenarios (#1470)
Co-authored-by: ocean.wy <[email protected]>
1 parent a7bf007 commit bc20370

File tree

7 files changed

+174
-8
lines changed

7 files changed

+174
-8
lines changed

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CompletedSnapshotStoreManager.java

Lines changed: 58 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.fluss.server.coordinator;
1919

20+
import org.apache.fluss.annotation.VisibleForTesting;
2021
import org.apache.fluss.metadata.TableBucket;
2122
import org.apache.fluss.server.kv.snapshot.CompletedSnapshot;
2223
import org.apache.fluss.server.kv.snapshot.CompletedSnapshotHandle;
@@ -37,6 +38,7 @@
3738
import java.util.Map;
3839
import java.util.Set;
3940
import java.util.concurrent.Executor;
41+
import java.util.function.Function;
4042

4143
import static org.apache.fluss.utils.Preconditions.checkArgument;
4244
import static org.apache.fluss.utils.Preconditions.checkNotNull;
@@ -50,11 +52,12 @@
5052
public class CompletedSnapshotStoreManager {
5153

5254
private static final Logger LOG = LoggerFactory.getLogger(CompletedSnapshotStoreManager.class);
53-
5455
private final int maxNumberOfSnapshotsToRetain;
5556
private final ZooKeeperClient zooKeeperClient;
5657
private final Map<TableBucket, CompletedSnapshotStore> bucketCompletedSnapshotStores;
5758
private final Executor ioExecutor;
59+
private final Function<ZooKeeperClient, CompletedSnapshotHandleStore>
60+
makeZookeeperCompletedSnapshotHandleStore;
5861

5962
public CompletedSnapshotStoreManager(
6063
int maxNumberOfSnapshotsToRetain,
@@ -66,6 +69,23 @@ public CompletedSnapshotStoreManager(
6669
this.zooKeeperClient = zooKeeperClient;
6770
this.bucketCompletedSnapshotStores = new HashMap<>();
6871
this.ioExecutor = ioExecutor;
72+
this.makeZookeeperCompletedSnapshotHandleStore = ZooKeeperCompletedSnapshotHandleStore::new;
73+
}
74+
75+
@VisibleForTesting
76+
CompletedSnapshotStoreManager(
77+
int maxNumberOfSnapshotsToRetain,
78+
Executor ioExecutor,
79+
ZooKeeperClient zooKeeperClient,
80+
Function<ZooKeeperClient, CompletedSnapshotHandleStore>
81+
makeZookeeperCompletedSnapshotHandleStore) {
82+
checkArgument(
83+
maxNumberOfSnapshotsToRetain > 0, "maxNumberOfSnapshotsToRetain must be positive");
84+
this.maxNumberOfSnapshotsToRetain = maxNumberOfSnapshotsToRetain;
85+
this.zooKeeperClient = zooKeeperClient;
86+
this.bucketCompletedSnapshotStores = new HashMap<>();
87+
this.ioExecutor = ioExecutor;
88+
this.makeZookeeperCompletedSnapshotHandleStore = makeZookeeperCompletedSnapshotHandleStore;
6989
}
7090

7191
public CompletedSnapshotStore getOrCreateCompletedSnapshotStore(TableBucket tableBucket) {
@@ -101,7 +121,7 @@ public void removeCompletedSnapshotStoreByTableBuckets(Set<TableBucket> tableBuc
101121
private CompletedSnapshotStore createCompletedSnapshotStore(
102122
TableBucket tableBucket, Executor ioExecutor) throws Exception {
103123
final CompletedSnapshotHandleStore completedSnapshotHandleStore =
104-
new ZooKeeperCompletedSnapshotHandleStore(zooKeeperClient);
124+
this.makeZookeeperCompletedSnapshotHandleStore.apply(zooKeeperClient);
105125

106126
// Get all there is first.
107127
List<CompletedSnapshotHandle> initialSnapshots =
@@ -120,13 +140,47 @@ private CompletedSnapshotStore createCompletedSnapshotStore(
120140
LOG.info("Trying to fetch {} snapshots from storage.", numberOfInitialSnapshots);
121141

122142
for (CompletedSnapshotHandle snapshotStateHandle : initialSnapshots) {
123-
retrievedSnapshots.add(checkNotNull(snapshotStateHandle.retrieveCompleteSnapshot()));
143+
try {
144+
retrievedSnapshots.add(
145+
checkNotNull(snapshotStateHandle.retrieveCompleteSnapshot()));
146+
} catch (Exception e) {
147+
if (e.getMessage()
148+
.contains(CompletedSnapshot.SNAPSHOT_DATA_NOT_EXISTS_ERROR_MESSAGE)) {
149+
LOG.error(
150+
"Metadata not found for snapshot {} of table bucket {}, maybe snapshot already removed or broken.",
151+
snapshotStateHandle.getSnapshotId(),
152+
tableBucket,
153+
e);
154+
try {
155+
completedSnapshotHandleStore.remove(
156+
tableBucket, snapshotStateHandle.getSnapshotId());
157+
} catch (Exception t) {
158+
LOG.error(
159+
"Failed to remove snapshotStateHandle {}.", snapshotStateHandle, t);
160+
throw t;
161+
}
162+
} else {
163+
LOG.error(
164+
"Failed to retrieveCompleteSnapshot for snapshotStateHandle {}.",
165+
snapshotStateHandle,
166+
e);
167+
throw e;
168+
}
169+
}
124170
}
125171

126172
// register all the files to shared kv file registry
127173
SharedKvFileRegistry sharedKvFileRegistry = new SharedKvFileRegistry(ioExecutor);
128174
for (CompletedSnapshot completedSnapshot : retrievedSnapshots) {
129-
sharedKvFileRegistry.registerAllAfterRestored(completedSnapshot);
175+
try {
176+
sharedKvFileRegistry.registerAllAfterRestored(completedSnapshot);
177+
} catch (Exception e) {
178+
LOG.error(
179+
"Failed to registerAllAfterRestored for completedSnapshot {}.",
180+
completedSnapshot,
181+
e);
182+
throw e;
183+
}
130184
}
131185

132186
return new CompletedSnapshotStore(

fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshot.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,8 @@ public class CompletedSnapshot {
7171
/** The location where the snapshot is stored. */
7272
private final FsPath snapshotLocation;
7373

74+
public static final String SNAPSHOT_DATA_NOT_EXISTS_ERROR_MESSAGE = "No such file or directory";
75+
7476
public CompletedSnapshot(
7577
TableBucket tableBucket,
7678
long snapshotID,

fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotHandle.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,11 +40,13 @@
4040
*/
4141
public class CompletedSnapshotHandle {
4242

43+
private final long snapshotId;
4344
private final FsPath metadataFilePath;
4445
private final long logOffset;
4546

46-
public CompletedSnapshotHandle(FsPath metadataFilePath, long logOffset) {
47+
public CompletedSnapshotHandle(long snapshotId, FsPath metadataFilePath, long logOffset) {
4748
checkNotNull(metadataFilePath);
49+
this.snapshotId = snapshotId;
4850
this.metadataFilePath = metadataFilePath;
4951
this.logOffset = logOffset;
5052
}
@@ -78,6 +80,10 @@ public long getLogOffset() {
7880
return logOffset;
7981
}
8082

83+
public long getSnapshotId() {
84+
return snapshotId;
85+
}
86+
8187
public void discard() throws Exception {
8288
final FileSystem fs = getFileSystem();
8389

fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotStore.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,8 @@ private CompletedSnapshotHandle store(CompletedSnapshot snapshot) throws Excepti
223223
try (FSDataOutputStream outStream =
224224
fs.create(filePath, FileSystem.WriteMode.OVERWRITE)) {
225225
outStream.write(jsonBytes);
226-
return new CompletedSnapshotHandle(filePath, snapshot.getLogOffset());
226+
return new CompletedSnapshotHandle(
227+
snapshot.getSnapshotID(), filePath, snapshot.getLogOffset());
227228
} catch (Exception e) {
228229
latestException = e;
229230
}

fluss-server/src/main/java/org/apache/fluss/server/zk/data/BucketSnapshot.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ public long getLogOffset() {
5252
}
5353

5454
public CompletedSnapshotHandle toCompletedSnapshotHandle() {
55-
return new CompletedSnapshotHandle(new FsPath(metadataPath), logOffset);
55+
return new CompletedSnapshotHandle(snapshotId, new FsPath(metadataPath), logOffset);
5656
}
5757

5858
@Override

fluss-server/src/test/java/org/apache/fluss/server/coordinator/CompletedSnapshotStoreManagerTest.java

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@
1919

2020
import org.apache.fluss.metadata.TableBucket;
2121
import org.apache.fluss.server.kv.snapshot.CompletedSnapshot;
22+
import org.apache.fluss.server.kv.snapshot.CompletedSnapshotHandle;
23+
import org.apache.fluss.server.kv.snapshot.CompletedSnapshotHandleStore;
2224
import org.apache.fluss.server.kv.snapshot.CompletedSnapshotStore;
25+
import org.apache.fluss.server.kv.snapshot.TestingCompletedSnapshotHandle;
2326
import org.apache.fluss.server.kv.snapshot.ZooKeeperCompletedSnapshotHandleStore;
2427
import org.apache.fluss.server.testutils.KvTestUtils;
2528
import org.apache.fluss.server.zk.NOPErrorHandler;
@@ -36,11 +39,16 @@
3639
import org.junit.jupiter.params.ParameterizedTest;
3740
import org.junit.jupiter.params.provider.ValueSource;
3841

42+
import java.io.FileNotFoundException;
3943
import java.io.IOException;
4044
import java.nio.file.Path;
45+
import java.util.ArrayList;
46+
import java.util.Comparator;
4147
import java.util.HashMap;
4248
import java.util.HashSet;
49+
import java.util.List;
4350
import java.util.Map;
51+
import java.util.Optional;
4452
import java.util.Set;
4553
import java.util.concurrent.ExecutorService;
4654
import java.util.concurrent.Executors;
@@ -172,6 +180,41 @@ void testRemoveCompletedSnapshotStoreFromManager() throws Exception {
172180
assertThat(completedSnapshotStoreManager.getBucketCompletedSnapshotStores()).isEmpty();
173181
}
174182

183+
@Test
184+
void testMetadataInconsistencyWithMetadataNotExistsException() throws Exception {
185+
// setup test data with mixed valid and invalid snapshots
186+
TableBucket tableBucket = new TableBucket(1, 1);
187+
CompletedSnapshot validSnapshot =
188+
KvTestUtils.mockCompletedSnapshot(tempDir, tableBucket, 1L);
189+
TestingCompletedSnapshotHandle validSnapshotHandle =
190+
new TestingCompletedSnapshotHandle(validSnapshot);
191+
192+
CompletedSnapshot invalidSnapshot =
193+
KvTestUtils.mockCompletedSnapshot(tempDir, tableBucket, 2L);
194+
TestingCompletedSnapshotHandle invalidSnapshotHandle =
195+
new TestingCompletedSnapshotHandleWithFileNotFound(invalidSnapshot);
196+
197+
// create CompletedSnapshotHandleStore with real implementations
198+
CompletedSnapshotHandleStore completedSnapshotHandleStore =
199+
new InMemoryCompletedSnapshotHandleStore();
200+
completedSnapshotHandleStore.add(tableBucket, 1L, validSnapshotHandle);
201+
completedSnapshotHandleStore.add(tableBucket, 2L, invalidSnapshotHandle);
202+
203+
// CompletedSnapshotStoreManager
204+
CompletedSnapshotStoreManager completedSnapshotStoreManager =
205+
new CompletedSnapshotStoreManager(
206+
10,
207+
ioExecutor,
208+
zookeeperClient,
209+
zooKeeperClient -> completedSnapshotHandleStore);
210+
211+
// Verify that only the valid snapshot remains
212+
CompletedSnapshotStore completedSnapshotStore =
213+
completedSnapshotStoreManager.getOrCreateCompletedSnapshotStore(tableBucket);
214+
assertThat(completedSnapshotStore.getAllSnapshots()).hasSize(1);
215+
assertThat(completedSnapshotStore.getAllSnapshots().get(0).getSnapshotID()).isEqualTo(1L);
216+
}
217+
175218
private CompletedSnapshotStoreManager createCompletedSnapshotStoreManager(
176219
int maxNumberOfSnapshotsToRetain) {
177220
return new CompletedSnapshotStoreManager(
@@ -220,4 +263,64 @@ private Set<TableBucket> createTableBuckets(int tableNum, int bucketNum) {
220263
}
221264
return tableBuckets;
222265
}
266+
267+
/**
268+
* A test-specific implementation of CompletedSnapshotHandle that throws FileNotFoundException
269+
* with the specific error message expected by CompletedSnapshotStoreManager.
270+
*/
271+
private static class TestingCompletedSnapshotHandleWithFileNotFound
272+
extends TestingCompletedSnapshotHandle {
273+
274+
public TestingCompletedSnapshotHandleWithFileNotFound(CompletedSnapshot snapshot) {
275+
super(snapshot, false);
276+
}
277+
278+
@Override
279+
public CompletedSnapshot retrieveCompleteSnapshot() throws IOException {
280+
throw new FileNotFoundException(
281+
CompletedSnapshot.SNAPSHOT_DATA_NOT_EXISTS_ERROR_MESSAGE);
282+
}
283+
}
284+
285+
private static class InMemoryCompletedSnapshotHandleStore
286+
implements CompletedSnapshotHandleStore {
287+
private final Map<TableBucket, Map<Long, CompletedSnapshotHandle>> snapshotHandleMap =
288+
new HashMap<>();
289+
290+
@Override
291+
public void add(
292+
TableBucket tableBucket,
293+
long snapshotId,
294+
CompletedSnapshotHandle completedSnapshotHandle)
295+
throws Exception {
296+
snapshotHandleMap
297+
.computeIfAbsent(tableBucket, k -> new HashMap<>())
298+
.put(snapshotId, completedSnapshotHandle);
299+
}
300+
301+
@Override
302+
public void remove(TableBucket tableBucket, long snapshotId) throws Exception {
303+
snapshotHandleMap.computeIfAbsent(tableBucket, k -> new HashMap<>()).remove(snapshotId);
304+
}
305+
306+
@Override
307+
public Optional<CompletedSnapshotHandle> get(TableBucket tableBucket, long snapshotId)
308+
throws Exception {
309+
return Optional.ofNullable(snapshotHandleMap.get(tableBucket))
310+
.map(map -> map.get(snapshotId));
311+
}
312+
313+
@Override
314+
public List<CompletedSnapshotHandle> getAllCompletedSnapshotHandles(TableBucket tableBucket)
315+
throws Exception {
316+
return new ArrayList<>(snapshotHandleMap.get(tableBucket).values());
317+
}
318+
319+
@Override
320+
public Optional<CompletedSnapshotHandle> getLatestCompletedSnapshotHandle(
321+
TableBucket tableBucket) throws Exception {
322+
return new ArrayList<>(snapshotHandleMap.get(tableBucket).values())
323+
.stream().max(Comparator.comparingLong(CompletedSnapshotHandle::getSnapshotId));
324+
}
325+
}
223326
}

fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/TestingCompletedSnapshotHandle.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ public TestingCompletedSnapshotHandle(CompletedSnapshot snapshot) {
3535

3636
public TestingCompletedSnapshotHandle(
3737
CompletedSnapshot snapshot, boolean shouldFailWhenRetrieve) {
38-
super(snapshot.getSnapshotLocation(), snapshot.getLogOffset());
38+
super(snapshot.getSnapshotID(), snapshot.getSnapshotLocation(), snapshot.getLogOffset());
3939
this.snapshot = snapshot;
4040
this.shouldFailWhenRetrieve = shouldFailWhenRetrieve;
4141
}

0 commit comments

Comments
 (0)