Skip to content

Commit 1ff4fc6

Browse files
committed
address jark's comments
1 parent 131d1a1 commit 1ff4fc6

File tree

3 files changed

+10
-17
lines changed

3 files changed

+10
-17
lines changed

fluss-server/src/main/java/com/alibaba/fluss/server/log/LogTablet.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -385,7 +385,7 @@ public void updateHighWatermark(long highWatermark) {
385385
updateHighWatermarkMetadata(newHighWatermarkMetadata);
386386
}
387387

388-
public void updateHighWatermarkMetadata(LogOffsetMetadata newHighWatermark) {
388+
private void updateHighWatermarkMetadata(LogOffsetMetadata newHighWatermark) {
389389
if (newHighWatermark.getMessageOffset() < 0) {
390390
throw new IllegalArgumentException("High watermark offset should be non-negative");
391391
}
@@ -403,14 +403,11 @@ public void updateHighWatermarkMetadata(LogOffsetMetadata newHighWatermark) {
403403
}
404404

405405
/**
406-
* Judge whether to update the highWatermark to a new value if and only if it is larger than the
407-
* old value. It is an error to update to a value which is larger than the log end offset.
406+
* Update the highWatermark to a new value if and only if it is larger than the old value. It is
407+
* an error to update to a value which is larger than the log end offset.
408408
*
409409
* <p>This method is intended to be used by the leader to update the highWatermark after
410410
* follower fetch offsets have been updated.
411-
*
412-
* @param newHighWatermark the suggested new value for the high watermark.
413-
* @return the old high watermark if the new high watermark can be updated, otherwise empty.
414411
*/
415412
public Optional<LogOffsetMetadata> maybeIncrementHighWatermark(
416413
LogOffsetMetadata newHighWatermark) throws IOException {
@@ -428,6 +425,7 @@ public Optional<LogOffsetMetadata> maybeIncrementHighWatermark(
428425
if (oldHighWatermark.getMessageOffset() < newHighWatermark.getMessageOffset()
429426
|| (oldHighWatermark.getMessageOffset() == newHighWatermark.getMessageOffset()
430427
&& oldHighWatermark.onOlderSegment(newHighWatermark))) {
428+
updateHighWatermarkMetadata(newHighWatermark);
431429
return Optional.of(oldHighWatermark);
432430
} else {
433431
return Optional.empty();

fluss-server/src/main/java/com/alibaba/fluss/server/replica/Replica.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -932,16 +932,15 @@ private boolean maybeIncrementLeaderHW(LogTablet leaderLog, long currentTimeMs)
932932
}
933933
}
934934

935+
// when the watermark can be advanced, we may need to flush kv first if it's kv replica,
936+
// and then update highWatermark.
937+
// TODO The flushKV and updateHighWatermark need to be atomic operation. See
938+
// https://github.com/alibaba/fluss/issues/513
939+
mayFlushKv(newHighWatermark.getMessageOffset());
935940
Optional<LogOffsetMetadata> oldWatermark =
936941
leaderLog.maybeIncrementHighWatermark(newHighWatermark);
937942
if (oldWatermark.isPresent()) {
938943
LOG.debug("High watermark update from {} to {}.", oldWatermark.get(), newHighWatermark);
939-
// when the watermark can be advanced, we may need to flush kv first if it's kv replica,
940-
// and then update highWatermark.
941-
// TODO The flushKV and updateHighWatermark need to be atomic operation. See
942-
// https://github.com/alibaba/fluss/issues/513
943-
mayFlushKv(newHighWatermark.getMessageOffset());
944-
logTablet.updateHighWatermarkMetadata(newHighWatermark);
945944
return true;
946945
} else {
947946
return false;

fluss-server/src/test/java/com/alibaba/fluss/server/log/LogTabletTest.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -161,11 +161,7 @@ void testHighWatermarkMaintenance() throws Exception {
161161
logTablet.appendAsLeader(mr1);
162162
assertHighWatermark(0L);
163163
// Update highWatermark as leader.
164-
LogOffsetMetadata logOffsetMetadata = new LogOffsetMetadata(1L);
165-
Optional<LogOffsetMetadata> oldLogOffsetMetadata =
166-
logTablet.maybeIncrementHighWatermark(logOffsetMetadata);
167-
assertThat(oldLogOffsetMetadata).isPresent();
168-
logTablet.updateHighWatermarkMetadata(logOffsetMetadata);
164+
logTablet.maybeIncrementHighWatermark(new LogOffsetMetadata(1L));
169165
assertHighWatermark(1L);
170166
// Cannot update past the log end offset.
171167
logTablet.updateHighWatermark(5L);

0 commit comments

Comments
 (0)