Skip to content

Commit 131d1a1

Browse files
committed
[server] Cdc log visible only when the data already flush to rocksDb from preWriteBuffer
1 parent 0626b16 commit 131d1a1

File tree

4 files changed

+64
-6
lines changed

4 files changed

+64
-6
lines changed

fluss-server/src/main/java/com/alibaba/fluss/server/log/LogTablet.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -385,7 +385,7 @@ public void updateHighWatermark(long highWatermark) {
385385
updateHighWatermarkMetadata(newHighWatermarkMetadata);
386386
}
387387

388-
private void updateHighWatermarkMetadata(LogOffsetMetadata newHighWatermark) {
388+
public void updateHighWatermarkMetadata(LogOffsetMetadata newHighWatermark) {
389389
if (newHighWatermark.getMessageOffset() < 0) {
390390
throw new IllegalArgumentException("High watermark offset should be non-negative");
391391
}
@@ -403,11 +403,14 @@ private void updateHighWatermarkMetadata(LogOffsetMetadata newHighWatermark) {
403403
}
404404

405405
/**
406-
* Update the highWatermark to a new value if and only if it is larger than the old value. It is
407-
* an error to update to a value which is larger than the log end offset.
406+
* Judge whether to update the highWatermark to a new value if and only if it is larger than the
407+
* old value. It is an error to update to a value which is larger than the log end offset.
408408
*
409409
* <p>This method is intended to be used by the leader to update the highWatermark after
410410
* follower fetch offsets have been updated.
411+
*
412+
* @param newHighWatermark the suggested new value for the high watermark.
413+
* @return the old high watermark if the new high watermark can be updated, otherwise empty.
411414
*/
412415
public Optional<LogOffsetMetadata> maybeIncrementHighWatermark(
413416
LogOffsetMetadata newHighWatermark) throws IOException {
@@ -425,7 +428,6 @@ public Optional<LogOffsetMetadata> maybeIncrementHighWatermark(
425428
if (oldHighWatermark.getMessageOffset() < newHighWatermark.getMessageOffset()
426429
|| (oldHighWatermark.getMessageOffset() == newHighWatermark.getMessageOffset()
427430
&& oldHighWatermark.onOlderSegment(newHighWatermark))) {
428-
updateHighWatermarkMetadata(newHighWatermark);
429431
return Optional.of(oldHighWatermark);
430432
} else {
431433
return Optional.empty();

fluss-server/src/main/java/com/alibaba/fluss/server/replica/Replica.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -936,8 +936,12 @@ private boolean maybeIncrementLeaderHW(LogTablet leaderLog, long currentTimeMs)
936936
leaderLog.maybeIncrementHighWatermark(newHighWatermark);
937937
if (oldWatermark.isPresent()) {
938938
LOG.debug("High watermark update from {} to {}.", oldWatermark.get(), newHighWatermark);
939-
// when watermark advanced, we may need to flush kv if it's kv replica
939+
// when the watermark can be advanced, we may need to flush kv first if it's kv replica,
940+
// and then update highWatermark.
941+
// TODO The flushKV and updateHighWatermark need to be atomic operation. See
942+
// https://github.com/alibaba/fluss/issues/513
940943
mayFlushKv(newHighWatermark.getMessageOffset());
944+
logTablet.updateHighWatermarkMetadata(newHighWatermark);
941945
return true;
942946
} else {
943947
return false;

fluss-server/src/test/java/com/alibaba/fluss/server/log/LogTabletTest.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,11 @@ void testHighWatermarkMaintenance() throws Exception {
161161
logTablet.appendAsLeader(mr1);
162162
assertHighWatermark(0L);
163163
// Update highWatermark as leader.
164-
logTablet.maybeIncrementHighWatermark(new LogOffsetMetadata(1L));
164+
LogOffsetMetadata logOffsetMetadata = new LogOffsetMetadata(1L);
165+
Optional<LogOffsetMetadata> oldLogOffsetMetadata =
166+
logTablet.maybeIncrementHighWatermark(logOffsetMetadata);
167+
assertThat(oldLogOffsetMetadata).isPresent();
168+
logTablet.updateHighWatermarkMetadata(logOffsetMetadata);
165169
assertHighWatermark(1L);
166170
// Cannot update past the log end offset.
167171
logTablet.updateHighWatermark(5L);

fluss-server/src/test/java/com/alibaba/fluss/server/replica/ReplicaManagerTest.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@
6464

6565
import javax.annotation.Nullable;
6666

67+
import java.time.Duration;
6768
import java.util.ArrayList;
6869
import java.util.Arrays;
6970
import java.util.Collections;
@@ -80,6 +81,7 @@
8081
import static com.alibaba.fluss.record.TestData.DATA1_KEY_TYPE;
8182
import static com.alibaba.fluss.record.TestData.DATA1_ROW_TYPE;
8283
import static com.alibaba.fluss.record.TestData.DATA1_SCHEMA;
84+
import static com.alibaba.fluss.record.TestData.DATA1_SCHEMA_PK;
8385
import static com.alibaba.fluss.record.TestData.DATA1_TABLE_ID;
8486
import static com.alibaba.fluss.record.TestData.DATA1_TABLE_ID_PK;
8587
import static com.alibaba.fluss.record.TestData.DATA1_TABLE_PATH;
@@ -102,6 +104,7 @@
102104
import static com.alibaba.fluss.testutils.DataTestUtils.genMemoryLogRecordsByObject;
103105
import static com.alibaba.fluss.testutils.DataTestUtils.getKeyValuePairs;
104106
import static com.alibaba.fluss.testutils.DataTestUtils.row;
107+
import static com.alibaba.fluss.testutils.common.CommonTestUtils.retry;
105108
import static org.assertj.core.api.Assertions.assertThat;
106109
import static org.assertj.core.api.Assertions.assertThatThrownBy;
107110

@@ -1163,6 +1166,51 @@ void testStopReplica() throws Exception {
11631166
replicaManager.getReplicaOrException(tb);
11641167
}
11651168

1169+
@Test
1170+
void testKvDataVisibility() throws Exception {
1171+
// The CDC log is only visible after the KV has been flushed to RocksDB. In other words,
1172+
// when we can read the CDC log, the associated kv record must have been
1173+
// inserted/updated/deleted in RocksDB. The reason for ensuring this visibility is that we
1174+
// first buffer the data in memory before flushing it to RocksDB. Thus, we need to guarantee
1175+
// visibility.
1176+
TableBucket tb = new TableBucket(DATA1_TABLE_ID_PK, 1);
1177+
makeKvTableAsLeader(DATA1_TABLE_ID_PK, DATA1_TABLE_PATH_PK, tb.getBucket());
1178+
Replica replica = replicaManager.getReplicaOrException(tb);
1179+
1180+
// retry send kv records to kv store, if the highWatermark increased, the kv record must be
1181+
// visible in rocksdb.
1182+
int round = 1000;
1183+
CompactedKeyEncoder keyEncoder = new CompactedKeyEncoder(DATA1_ROW_TYPE, new int[] {0});
1184+
CompletableFuture<List<PutKvResultForBucket>> future;
1185+
for (int i = 0; i < round; i++) {
1186+
future = new CompletableFuture<>();
1187+
Object[] key = {i};
1188+
Object[] value = {i, "a"};
1189+
byte[] keyBytes = keyEncoder.encodeKey(row(key));
1190+
byte[] valueBytes =
1191+
ValueEncoder.encodeValue(
1192+
DEFAULT_SCHEMA_ID, compactedRow(DATA1_SCHEMA_PK.getRowType(), value));
1193+
List<byte[]> lookups = replica.lookups(Collections.singletonList(keyBytes));
1194+
assertThat(lookups.size()).isEqualTo(1);
1195+
assertThat(lookups.get(0)).isNull();
1196+
replicaManager.putRecordsToKv(
1197+
20000,
1198+
1,
1199+
Collections.singletonMap(
1200+
tb, genKvRecordBatch(Collections.singletonList(Tuple2.of(key, value)))),
1201+
null,
1202+
future::complete);
1203+
assertThat(future.get()).containsOnly(new PutKvResultForBucket(tb, i + 1));
1204+
1205+
int expectedHw = i + 1;
1206+
retry(
1207+
Duration.ofMinutes(1),
1208+
() -> assertThat(replica.getLogHighWatermark()).isEqualTo(expectedHw));
1209+
assertThat(replica.lookups(Collections.singletonList(keyBytes)))
1210+
.containsExactlyInAnyOrder(valueBytes);
1211+
}
1212+
}
1213+
11661214
@Test
11671215
void testSnapshotKvReplicas() throws Exception {
11681216
// create multiple kv replicas and all do the snapshot operation

0 commit comments

Comments
 (0)