Skip to content

Commit 7a05278

Browse files
committed
[core] Fix retry in Consumer and SnapshotManager
1 parent c8ae95a commit 7a05278

File tree

4 files changed

+75
-44
lines changed

4 files changed

+75
-44
lines changed
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.utils;
20+
21+
import java.io.IOException;
22+
import java.io.UncheckedIOException;
23+
import java.util.Optional;
24+
import java.util.function.Function;
25+
26+
/** Utils for retry operations. */
27+
public class RetryUtils {
28+
29+
public static <I, R> Optional<R> retry(
30+
SupplierWithIOException<I> inputSupplier, Function<I, R> retryFunc) {
31+
int retryNumber = 0;
32+
Exception exception = null;
33+
while (retryNumber++ < 10) {
34+
I input;
35+
try {
36+
input = inputSupplier.get();
37+
} catch (IOException e) {
38+
throw new UncheckedIOException(e);
39+
}
40+
if (input == null) {
41+
return Optional.empty();
42+
}
43+
try {
44+
return Optional.ofNullable(retryFunc.apply(input));
45+
} catch (Exception e) {
46+
// retry
47+
exception = e;
48+
try {
49+
Thread.sleep(200);
50+
} catch (InterruptedException ie) {
51+
Thread.currentThread().interrupt();
52+
throw new RuntimeException(ie);
53+
}
54+
}
55+
}
56+
throw new RuntimeException("Retry fail after 10 times.", exception);
57+
}
58+
}

paimon-core/src/main/java/org/apache/paimon/consumer/Consumer.java

Lines changed: 3 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,13 @@
2121
import org.apache.paimon.fs.FileIO;
2222
import org.apache.paimon.fs.Path;
2323
import org.apache.paimon.utils.JsonSerdeUtil;
24+
import org.apache.paimon.utils.RetryUtils;
2425

2526
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
2627
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
2728
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
2829
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
29-
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.exc.MismatchedInputException;
3030

31-
import java.io.IOException;
32-
import java.io.UncheckedIOException;
3331
import java.util.Optional;
3432

3533
/** Consumer which contains next snapshot. */
@@ -59,24 +57,7 @@ public static Consumer fromJson(String json) {
5957
}
6058

6159
public static Optional<Consumer> fromPath(FileIO fileIO, Path path) {
62-
int retryNumber = 0;
63-
MismatchedInputException exception = null;
64-
while (retryNumber++ < 10) {
65-
try {
66-
return fileIO.readOverwrittenFileUtf8(path).map(Consumer::fromJson);
67-
} catch (MismatchedInputException e) {
68-
// retry
69-
exception = e;
70-
try {
71-
Thread.sleep(1_000);
72-
} catch (InterruptedException ie) {
73-
Thread.currentThread().interrupt();
74-
throw new RuntimeException(ie);
75-
}
76-
} catch (IOException e) {
77-
throw new UncheckedIOException(e);
78-
}
79-
}
80-
throw new UncheckedIOException(exception);
60+
return RetryUtils.retry(
61+
() -> fileIO.readOverwrittenFileUtf8(path).orElse(null), Consumer::fromJson);
8162
}
8263
}

paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java

Lines changed: 2 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import org.apache.paimon.table.Instant;
2525

2626
import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
27-
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.exc.MismatchedInputException;
2827

2928
import org.slf4j.Logger;
3029
import org.slf4j.LoggerFactory;
@@ -771,26 +770,7 @@ public static Snapshot fromPath(FileIO fileIO, Path path) {
771770
}
772771

773772
public static Snapshot tryFromPath(FileIO fileIO, Path path) throws FileNotFoundException {
774-
int retryNumber = 0;
775-
MismatchedInputException exception = null;
776-
while (retryNumber++ < 10) {
777-
try {
778-
return Snapshot.fromJson(fileIO.readFileUtf8(path));
779-
} catch (MismatchedInputException e) {
780-
// retry
781-
exception = e;
782-
try {
783-
Thread.sleep(1_000);
784-
} catch (InterruptedException ie) {
785-
Thread.currentThread().interrupt();
786-
throw new RuntimeException(ie);
787-
}
788-
} catch (FileNotFoundException e) {
789-
throw e;
790-
} catch (IOException e) {
791-
throw new RuntimeException("Fails to read snapshot from path " + path, e);
792-
}
793-
}
794-
throw new UncheckedIOException(exception);
773+
return RetryUtils.retry(() -> fileIO.readFileUtf8(path), Snapshot::fromJson)
774+
.orElseThrow(() -> new RuntimeException("Cannot read snapshot from " + path));
795775
}
796776
}

paimon-core/src/test/java/org/apache/paimon/consumer/ConsumerManagerTest.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,15 @@
2525
import org.junit.jupiter.api.Test;
2626
import org.junit.jupiter.api.io.TempDir;
2727

28+
import java.io.File;
29+
import java.io.IOException;
2830
import java.nio.file.Path;
2931
import java.time.LocalDateTime;
3032
import java.util.Optional;
3133
import java.util.OptionalLong;
3234

3335
import static org.assertj.core.api.Assertions.assertThat;
36+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
3437

3538
/** Test for {@link ConsumerManager}. */
3639
public class ConsumerManagerTest {
@@ -53,6 +56,15 @@ public void before() {
5356
"branch1");
5457
}
5558

59+
@Test
60+
public void testRetry() throws IOException {
61+
File file = new File(tempDir.toFile(), "consumer/consumer-id1");
62+
file.getParentFile().mkdir();
63+
file.createNewFile();
64+
assertThatThrownBy(() -> manager.consumer("id1"))
65+
.hasMessageContaining("Retry fail after 10 times");
66+
}
67+
5668
@Test
5769
public void test() {
5870
Optional<Consumer> consumer = manager.consumer("id1");

0 commit comments

Comments
 (0)