Skip to content

Commit 87a3f48

Browse files
authored
[server] Introduce jitter for upload remote log (apache#872)
1 parent 5f2324d commit 87a3f48

File tree

3 files changed

+50
-20
lines changed

3 files changed

+50
-20
lines changed

fluss-server/src/main/java/com/alibaba/fluss/server/log/remote/RemoteLogManager.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import java.util.concurrent.Future;
5353
import java.util.concurrent.ScheduledExecutorService;
5454
import java.util.concurrent.ScheduledFuture;
55+
import java.util.concurrent.ThreadLocalRandom;
5556
import java.util.concurrent.TimeUnit;
5657
import java.util.stream.Collectors;
5758

@@ -283,7 +284,10 @@ private void doHandleLeaderReplica(
283284
LOG.info("Created a new remote log task: {} and getting scheduled", task);
284285
ScheduledFuture<?> future =
285286
rlManagerScheduledThreadPool.scheduleWithFixedDelay(
286-
task, 0, taskInterval, TimeUnit.MILLISECONDS);
287+
task,
288+
Math.abs(ThreadLocalRandom.current().nextLong(taskInterval)),
289+
taskInterval,
290+
TimeUnit.MILLISECONDS);
287291
return new TaskWithFuture(task, future);
288292
});
289293
}

fluss-server/src/test/java/com/alibaba/fluss/server/log/remote/CommitRemoteLogManifestITCase.java

Lines changed: 32 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,9 @@
2828
import org.junit.jupiter.api.extension.RegisterExtension;
2929

3030
import java.time.Duration;
31-
import java.util.Arrays;
31+
import java.util.List;
3232
import java.util.Objects;
33+
import java.util.stream.Collectors;
3334
import java.util.stream.Stream;
3435

3536
import static com.alibaba.fluss.record.TestData.DATA1;
@@ -78,32 +79,44 @@ void testDeleteOutOfSyncReplicaLogAfterCommit() throws Exception {
7879
i * 10L);
7980
}
8081

81-
// stop a replica to mock follower is out of sync
82-
int stopFollower = Stream.of(0, 1, 2).filter(i -> i != leader).findFirst().get();
83-
FLUSS_CLUSTER_EXTENSION.stopReplica(stopFollower, tb, 1);
82+
// stop replicas to mock followers are out of sync
83+
List<Integer> stopFollowers =
84+
Stream.of(0, 1, 2).filter(i -> i != leader).collect(Collectors.toList());
85+
for (int stopFollower : stopFollowers) {
86+
FLUSS_CLUSTER_EXTENSION.stopReplica(stopFollower, tb, 1);
87+
}
8488
leaderGateWay
8589
.produceLog(
8690
newProduceLogRequest(tableId, 0, -1, genMemoryLogRecordsByObject(DATA1)))
8791
.get();
88-
FLUSS_CLUSTER_EXTENSION.waitUtilReplicaShrinkFromIsr(tb, stopFollower);
89-
90-
LogTablet stopfollowerLogTablet =
91-
FLUSS_CLUSTER_EXTENSION.waitAndGetFollowerReplica(tb, stopFollower).getLogTablet();
92-
assertThat(stopfollowerLogTablet.logSegments()).hasSize(3);
92+
for (int stopFollower : stopFollowers) {
93+
FLUSS_CLUSTER_EXTENSION.waitUtilReplicaShrinkFromIsr(tb, stopFollower);
94+
LogTablet stopfollowerLogTablet =
95+
FLUSS_CLUSTER_EXTENSION
96+
.waitAndGetFollowerReplica(tb, stopFollower)
97+
.getLogTablet();
98+
assertThat(stopfollowerLogTablet.logSegments()).hasSize(3);
99+
}
93100

94-
// send notify leader to make remote log tier happen immediately
95-
FLUSS_CLUSTER_EXTENSION.notifyLeaderAndIsr(
101+
// restart the leader server with a small log tiering interval
102+
FLUSS_CLUSTER_EXTENSION.restartTabletServer(
96103
leader,
97-
DATA1_TABLE_PATH,
98-
tb,
99-
FLUSS_CLUSTER_EXTENSION.getZooKeeperClient().getLeaderAndIsr(tb).get(),
100-
Arrays.asList(0, 1, 2));
104+
new Configuration()
105+
.set(
106+
ConfigOptions.REMOTE_LOG_TASK_INTERVAL_DURATION,
107+
Duration.ofMillis(1)));
101108
FLUSS_CLUSTER_EXTENSION.waitUtilSomeLogSegmentsCopyToRemote(tb);
102109

103-
// check has two remote log segments for the stopped replica
104-
retry(
105-
Duration.ofMinutes(1),
106-
() -> assertThat(stopfollowerLogTablet.logSegments()).hasSize(2));
110+
// check only has two remote log segments for the stopped replicas
111+
for (int stopFollower : stopFollowers) {
112+
LogTablet stopfollowerLogTablet =
113+
FLUSS_CLUSTER_EXTENSION
114+
.waitAndGetFollowerReplica(tb, stopFollower)
115+
.getLogTablet();
116+
retry(
117+
Duration.ofMinutes(1),
118+
() -> assertThat(stopfollowerLogTablet.logSegments()).hasSize(2));
119+
}
107120
}
108121

109122
private static Configuration initConfig() {

fluss-server/src/test/java/com/alibaba/fluss/server/testutils/FlussClusterExtension.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -273,13 +273,21 @@ public void startTabletServer(int serverId, boolean forceStartIfExists) throws E
273273
"Tablet server " + serverId + " already exists.");
274274
}
275275
}
276+
startTabletServer(serverId, null);
277+
}
278+
279+
private void startTabletServer(int serverId, @Nullable Configuration overwriteConfig)
280+
throws Exception {
276281
String dataDir = getDataDir(serverId);
277282
Configuration tabletServerConf = new Configuration(clusterConf);
278283
tabletServerConf.set(ConfigOptions.TABLET_SERVER_ID, serverId);
279284
tabletServerConf.set(ConfigOptions.DATA_DIR, dataDir);
280285
tabletServerConf.setString(
281286
ConfigOptions.ZOOKEEPER_ADDRESS, zooKeeperServer.getConnectString());
282287
tabletServerConf.setString(ConfigOptions.BIND_LISTENERS, tabletServerListeners);
288+
if (overwriteConfig != null) {
289+
tabletServerConf.addAll(overwriteConfig);
290+
}
283291

284292
setRemoteDataDir(tabletServerConf);
285293

@@ -295,6 +303,11 @@ public void startTabletServer(int serverId, boolean forceStartIfExists) throws E
295303
tabletServerInfos.put(serverId, serverInfo);
296304
}
297305

306+
public void restartTabletServer(int serverId, Configuration overwriteConfig) throws Exception {
307+
stopTabletServer(serverId);
308+
startTabletServer(serverId, overwriteConfig);
309+
}
310+
298311
public void assertHasTabletServerNumber(int tabletServerNumber) {
299312
CoordinatorGateway coordinatorGateway = newCoordinatorClient();
300313
retry(

0 commit comments

Comments
 (0)