Skip to content

Commit d8e5e97

Browse files
committed
fix
1 parent 7a05278 commit d8e5e97

File tree

4 files changed

+69
-63
lines changed

4 files changed

+69
-63
lines changed

paimon-common/src/main/java/org/apache/paimon/utils/RetryUtils.java

Lines changed: 0 additions & 58 deletions
This file was deleted.

paimon-core/src/main/java/org/apache/paimon/consumer/Consumer.java

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,14 @@
2121
import org.apache.paimon.fs.FileIO;
2222
import org.apache.paimon.fs.Path;
2323
import org.apache.paimon.utils.JsonSerdeUtil;
24-
import org.apache.paimon.utils.RetryUtils;
2524

2625
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
2726
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
2827
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
2928
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
3029

30+
import java.io.IOException;
31+
import java.io.UncheckedIOException;
3132
import java.util.Optional;
3233

3334
/** Consumer which contains next snapshot. */
@@ -57,7 +58,33 @@ public static Consumer fromJson(String json) {
5758
}
5859

5960
public static Optional<Consumer> fromPath(FileIO fileIO, Path path) {
60-
return RetryUtils.retry(
61-
() -> fileIO.readOverwrittenFileUtf8(path).orElse(null), Consumer::fromJson);
61+
int retryNumber = 0;
62+
Exception exception = null;
63+
while (retryNumber++ < 10) {
64+
Optional<String> content;
65+
try {
66+
content = fileIO.readOverwrittenFileUtf8(path);
67+
} catch (IOException e) {
68+
throw new UncheckedIOException(e);
69+
}
70+
71+
if (!content.isPresent()) {
72+
return Optional.empty();
73+
}
74+
75+
try {
76+
return content.map(Consumer::fromJson);
77+
} catch (Exception e) {
78+
// retry
79+
exception = e;
80+
try {
81+
Thread.sleep(200);
82+
} catch (InterruptedException ie) {
83+
Thread.currentThread().interrupt();
84+
throw new RuntimeException(ie);
85+
}
86+
}
87+
}
88+
throw new RuntimeException("Retry fail after 10 times", exception);
6289
}
6390
}

paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -770,7 +770,31 @@ public static Snapshot fromPath(FileIO fileIO, Path path) {
770770
}
771771

772772
public static Snapshot tryFromPath(FileIO fileIO, Path path) throws FileNotFoundException {
773-
return RetryUtils.retry(() -> fileIO.readFileUtf8(path), Snapshot::fromJson)
774-
.orElseThrow(() -> new RuntimeException("Cannot read snapshot from " + path));
773+
int retryNumber = 0;
774+
Exception exception = null;
775+
while (retryNumber++ < 10) {
776+
String content;
777+
try {
778+
content = fileIO.readFileUtf8(path);
779+
} catch (FileNotFoundException e) {
780+
throw e;
781+
} catch (IOException e) {
782+
throw new RuntimeException("Fails to read snapshot from path " + path, e);
783+
}
784+
785+
try {
786+
return Snapshot.fromJson(content);
787+
} catch (Exception e) {
788+
// retry
789+
exception = e;
790+
try {
791+
Thread.sleep(200);
792+
} catch (InterruptedException ie) {
793+
Thread.currentThread().interrupt();
794+
throw new RuntimeException(ie);
795+
}
796+
}
797+
}
798+
throw new RuntimeException("Retry fail after 10 times", exception);
775799
}
776800
}

paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333

3434
import javax.annotation.Nullable;
3535

36+
import java.io.File;
3637
import java.io.IOException;
3738
import java.util.ArrayList;
3839
import java.util.HashSet;
@@ -46,6 +47,7 @@
4647
import static org.apache.paimon.SnapshotTest.newSnapshotManager;
4748
import static org.apache.paimon.catalog.Identifier.DEFAULT_MAIN_BRANCH;
4849
import static org.assertj.core.api.Assertions.assertThat;
50+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
4951
import static org.assertj.core.api.Assertions.fail;
5052
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
5153

@@ -54,6 +56,17 @@ public class SnapshotManagerTest {
5456

5557
@TempDir java.nio.file.Path tempDir;
5658

59+
@Test
60+
public void testRetry() throws IOException {
61+
SnapshotManager snapshotManager =
62+
newSnapshotManager(LocalFileIO.create(), new Path(tempDir.toString()));
63+
File file = new File(tempDir.toFile(), "/snapshot/snapshot-1");
64+
file.getParentFile().mkdir();
65+
file.createNewFile();
66+
assertThatThrownBy(() -> snapshotManager.snapshot(1))
67+
.hasMessageContaining("Retry fail after 10 times");
68+
}
69+
5770
@Test
5871
public void testSnapshotPath() {
5972
SnapshotManager snapshotManager =

0 commit comments

Comments
 (0)