Skip to content

Commit f2ea311

Browse files
committed
wip
1 parent 453d64b commit f2ea311

File tree

21 files changed

+404
-33
lines changed

21 files changed

+404
-33
lines changed

fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -631,6 +631,14 @@ public class ConfigOptions {
631631
+ "we would fsync after every message; if it were 5 we would fsync after every "
632632
+ "five messages.");
633633

634+
public static final ConfigOption<Duration> LOG_FLUSH_START_OFFSET_CHECKPOINT_INTERVAL =
635+
key("log.flush.start-offset.checkpoint-interval")
636+
.durationType()
637+
.defaultValue(Duration.ofMinutes(1))
638+
.withDescription(
639+
"The frequency with which the log start offset is saved out to disk. "
640+
+ "The default setting is 1 minute.");
641+
634642
public static final ConfigOption<Duration> LOG_REPLICA_HIGH_WATERMARK_CHECKPOINT_INTERVAL =
635643
key("log.replica.high-watermark.checkpoint-interval")
636644
.durationType()

fluss-rpc/src/main/java/org/apache/fluss/rpc/entity/FetchLogResultForBucket.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,28 +35,34 @@ public class FetchLogResultForBucket extends ResultForBucket {
3535
private final @Nullable RemoteLogFetchInfo remoteLogFetchInfo;
3636
private final @Nullable LogRecords records;
3737
private final long highWatermark;
38+
private final long logStartOffset;
3839

3940
public FetchLogResultForBucket(
40-
TableBucket tableBucket, LogRecords records, long highWatermark) {
41+
TableBucket tableBucket, LogRecords records, long highWatermark, long logStartOffset) {
4142
this(
4243
tableBucket,
4344
null,
4445
checkNotNull(records, "records can not be null"),
4546
highWatermark,
47+
logStartOffset,
4648
ApiError.NONE);
4749
}
4850

4951
public FetchLogResultForBucket(TableBucket tableBucket, ApiError error) {
50-
this(tableBucket, null, null, -1L, error);
52+
this(tableBucket, null, null, -1L, 0L, error);
5153
}
5254

5355
public FetchLogResultForBucket(
54-
TableBucket tableBucket, RemoteLogFetchInfo remoteLogFetchInfo, long highWatermark) {
56+
TableBucket tableBucket,
57+
RemoteLogFetchInfo remoteLogFetchInfo,
58+
long highWatermark,
59+
long logStartOffset) {
5560
this(
5661
tableBucket,
5762
checkNotNull(remoteLogFetchInfo, "remote log fetch info can not be null"),
5863
null,
5964
highWatermark,
65+
logStartOffset,
6066
ApiError.NONE);
6167
}
6268

@@ -65,11 +71,13 @@ private FetchLogResultForBucket(
6571
@Nullable RemoteLogFetchInfo remoteLogFetchInfo,
6672
@Nullable LogRecords records,
6773
long highWatermark,
74+
long logStartOffset,
6875
ApiError error) {
6976
super(tableBucket, error);
7077
this.remoteLogFetchInfo = remoteLogFetchInfo;
7178
this.records = records;
7279
this.highWatermark = highWatermark;
80+
this.logStartOffset = logStartOffset;
7381
}
7482

7583
/**
@@ -102,4 +110,8 @@ public LogRecords recordsOrEmpty() {
102110
public long getHighWatermark() {
103111
return highWatermark;
104112
}
113+
114+
public long getLogStartOffset() {
115+
return logStartOffset;
116+
}
105117
}

fluss-rpc/src/main/java/org/apache/fluss/rpc/util/CommonRpcMessageUtils.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -195,15 +195,22 @@ public static FetchLogResultForBucket getFetchLogResultForBucket(
195195
pbRlfInfo.getFirstStartPos());
196196
fetchLogResultForBucket =
197197
new FetchLogResultForBucket(
198-
tb, rlFetchInfo, respForBucket.getHighWatermark());
198+
tb,
199+
rlFetchInfo,
200+
respForBucket.getHighWatermark(),
201+
respForBucket.getLogStartOffset());
199202
} else {
200203
ByteBuffer recordsBuffer = toByteBuffer(respForBucket.getRecordsSlice());
201204
LogRecords records =
202205
respForBucket.hasRecords()
203206
? MemoryLogRecords.pointToByteBuffer(recordsBuffer)
204207
: MemoryLogRecords.EMPTY;
205208
fetchLogResultForBucket =
206-
new FetchLogResultForBucket(tb, records, respForBucket.getHighWatermark());
209+
new FetchLogResultForBucket(
210+
tb,
211+
records,
212+
respForBucket.getHighWatermark(),
213+
respForBucket.getLogStartOffset());
207214
}
208215
}
209216

fluss-rpc/src/main/proto/FlussApi.proto

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -674,7 +674,7 @@ message PbFetchLogRespForBucket {
674674
optional int32 error_code = 3;
675675
optional string error_message = 4;
676676
optional int64 high_watermark = 5;
677-
optional int64 log_start_offset = 6; // TODO now we don't introduce log start offset, but remain it in protobuf
677+
optional int64 log_start_offset = 6;
678678
optional PbRemoteLogFetchInfo remote_log_fetch_info = 7;
679679
optional bytes records = 8;
680680
}

fluss-server/src/main/java/org/apache/fluss/server/log/LoadedLogOffsets.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,24 @@
2424
/** A class to represent the loaded log offsets. */
2525
@Internal
2626
final class LoadedLogOffsets {
27+
private final long logStartOffset;
2728
private final long recoveryPoint;
2829
private final LogOffsetMetadata nextOffsetMetadata;
2930

30-
public LoadedLogOffsets(final long recoveryPoint, final LogOffsetMetadata nextOffsetMetadata) {
31+
public LoadedLogOffsets(
32+
final long logStartOffset,
33+
final long recoveryPoint,
34+
final LogOffsetMetadata nextOffsetMetadata) {
35+
this.logStartOffset = logStartOffset;
3136
this.recoveryPoint = recoveryPoint;
3237
this.nextOffsetMetadata =
3338
Objects.requireNonNull(nextOffsetMetadata, "nextOffsetMetadata should not be null");
3439
}
3540

41+
public long getLogStartOffset() {
42+
return logStartOffset;
43+
}
44+
3645
public long getRecoveryPoint() {
3746
return recoveryPoint;
3847
}
@@ -50,20 +59,24 @@ public boolean equals(Object o) {
5059
return false;
5160
}
5261
final LoadedLogOffsets that = (LoadedLogOffsets) o;
53-
return recoveryPoint == that.recoveryPoint
62+
return logStartOffset == that.logStartOffset
63+
&& recoveryPoint == that.recoveryPoint
5464
&& nextOffsetMetadata.equals(that.nextOffsetMetadata);
5565
}
5666

5767
@Override
5868
public int hashCode() {
59-
int result = Long.hashCode(recoveryPoint);
69+
int result = Long.hashCode(logStartOffset);
70+
result = 31 * result + Long.hashCode(recoveryPoint);
6071
result = 31 * result + nextOffsetMetadata.hashCode();
6172
return result;
6273
}
6374

6475
@Override
6576
public String toString() {
6677
return "LoadedLogOffsets("
78+
+ "logStartOffset="
79+
+ logStartOffset
6780
+ ", recoveryPoint="
6881
+ recoveryPoint
6982
+ ", nextOffsetMetadata="

fluss-server/src/main/java/org/apache/fluss/server/log/LocalLog.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ void checkIfMemoryMappedBufferClosed() {
169169
}
170170
}
171171

172-
private void updateRecoveryPoint(long newRecoveryPoint) {
172+
public void updateRecoveryPoint(long newRecoveryPoint) {
173173
recoveryPoint = newRecoveryPoint;
174174
}
175175

fluss-server/src/main/java/org/apache/fluss/server/log/LogLoader.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ final class LogLoader {
5353
private final Configuration conf;
5454
private final LogSegments logSegments;
5555
private final long recoveryPointCheckpoint;
56+
private final long logStartOffsetCheckpoint;
5657
private final LogFormat logFormat;
5758
private final WriterStateManager writerStateManager;
5859
private final boolean isCleanShutdown;
@@ -62,13 +63,15 @@ public LogLoader(
6263
Configuration conf,
6364
LogSegments logSegments,
6465
long recoveryPointCheckpoint,
66+
long logStartOffsetCheckpoint,
6567
LogFormat logFormat,
6668
WriterStateManager writerStateManager,
6769
boolean isCleanShutdown) {
6870
this.logTabletDir = logTabletDir;
6971
this.conf = conf;
7072
this.logSegments = logSegments;
7173
this.recoveryPointCheckpoint = recoveryPointCheckpoint;
74+
this.logStartOffsetCheckpoint = logStartOffsetCheckpoint;
7275
this.logFormat = logFormat;
7376
this.writerStateManager = writerStateManager;
7477
this.isCleanShutdown = isCleanShutdown;
@@ -94,6 +97,8 @@ public LoadedLogOffsets load() throws IOException {
9497
newRecoveryPoint = result.f0;
9598
nextOffset = result.f1;
9699

100+
// TODO: revisit here if we need to update logStartOffset if remote log disable
101+
97102
// Any segment loading or recovery code must not use writerStateManager, so that we can
98103
// build the full state here from scratch.
99104
if (!writerStateManager.isEmpty()) {
@@ -119,6 +124,7 @@ public LoadedLogOffsets load() throws IOException {
119124
LogSegment activeSegment = logSegments.lastSegment().get();
120125
activeSegment.resizeIndexes((int) conf.get(ConfigOptions.LOG_INDEX_FILE_SIZE).getBytes());
121126
return new LoadedLogOffsets(
127+
logStartOffsetCheckpoint,
122128
newRecoveryPoint,
123129
new LogOffsetMetadata(
124130
nextOffset, activeSegment.getBaseOffset(), activeSegment.getSizeInBytes()));

0 commit comments

Comments
 (0)