Skip to content

Commit 46c8025

Browse files
committed
[server] Reorganize the update mechanisms for logStartOffset, localStartOffset, and remoteLogStartOffset
1 parent c6af64d commit 46c8025

File tree

27 files changed

+803
-45
lines changed

27 files changed

+803
-45
lines changed

fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetchTest.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,10 @@ void testSimple(byte recordBatchMagic) throws Exception {
100100
TableBucket tb = new TableBucket(DATA2_TABLE_ID, bucketId);
101101
FetchLogResultForBucket resultForBucket0 =
102102
new FetchLogResultForBucket(
103-
tb, createMemoryLogRecords(DATA2, LogFormat.ARROW, recordBatchMagic), 10L);
103+
tb,
104+
createMemoryLogRecords(DATA2, LogFormat.ARROW, recordBatchMagic),
105+
10L,
106+
0L);
104107
DefaultCompletedFetch defaultCompletedFetch =
105108
makeCompletedFetch(tb, resultForBucket0, fetchOffset);
106109
List<ScanRecord> scanRecords = defaultCompletedFetch.fetchRecords(8);
@@ -123,7 +126,10 @@ void testNegativeFetchCount(byte recordBatchMagic) throws Exception {
123126
TableBucket tb = new TableBucket(DATA2_TABLE_ID, bucketId);
124127
FetchLogResultForBucket resultForBucket0 =
125128
new FetchLogResultForBucket(
126-
tb, createMemoryLogRecords(DATA2, LogFormat.ARROW, recordBatchMagic), 10L);
129+
tb,
130+
createMemoryLogRecords(DATA2, LogFormat.ARROW, recordBatchMagic),
131+
10L,
132+
0L);
127133
DefaultCompletedFetch defaultCompletedFetch =
128134
makeCompletedFetch(tb, resultForBucket0, fetchOffset);
129135
List<ScanRecord> scanRecords = defaultCompletedFetch.fetchRecords(-10);
@@ -136,7 +142,7 @@ void testNoRecordsInFetch() {
136142
int bucketId = 0; // records for 0-10.
137143
TableBucket tb = new TableBucket(DATA2_TABLE_ID, bucketId);
138144
FetchLogResultForBucket resultForBucket0 =
139-
new FetchLogResultForBucket(tb, MemoryLogRecords.EMPTY, 0L);
145+
new FetchLogResultForBucket(tb, MemoryLogRecords.EMPTY, 0L, 0L);
140146
DefaultCompletedFetch defaultCompletedFetch =
141147
makeCompletedFetch(tb, resultForBucket0, fetchOffset);
142148
List<ScanRecord> scanRecords = defaultCompletedFetch.fetchRecords(10);
@@ -178,7 +184,7 @@ void testProjection(LogFormat logFormat, byte magic) throws Exception {
178184
memoryLogRecords = createMemoryLogRecords(DATA2, LogFormat.INDEXED, magic);
179185
}
180186
FetchLogResultForBucket resultForBucket0 =
181-
new FetchLogResultForBucket(tb, memoryLogRecords, 10L);
187+
new FetchLogResultForBucket(tb, memoryLogRecords, 10L, 0L);
182188
DefaultCompletedFetch defaultCompletedFetch =
183189
makeCompletedFetch(tb, resultForBucket0, fetchOffset, projection);
184190
List<ScanRecord> scanRecords = defaultCompletedFetch.fetchRecords(8);
@@ -269,7 +275,8 @@ void testComplexTypeFetch() throws Exception {
269275
LOG_MAGIC_VALUE_V0,
270276
complexData,
271277
LogFormat.ARROW),
272-
3L);
278+
3L,
279+
0L);
273280
DefaultCompletedFetch defaultCompletedFetch =
274281
new DefaultCompletedFetch(
275282
tb,

fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetchBufferTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -260,7 +260,8 @@ private boolean await(LogFetchBuffer buffer, Duration waitTime) throws Interrupt
260260
private DefaultCompletedFetch makeCompletedFetch(TableBucket tableBucket) throws Exception {
261261
return new DefaultCompletedFetch(
262262
tableBucket,
263-
new FetchLogResultForBucket(tableBucket, genMemoryLogRecordsByObject(DATA1), 10L),
263+
new FetchLogResultForBucket(
264+
tableBucket, genMemoryLogRecordsByObject(DATA1), 10L, 0L),
264265
readContext,
265266
logScannerStatus,
266267
true,

fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetchCollectorTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ void testNormal() throws Exception {
8585
int bucketId = 0; // records for 0-10.
8686
TableBucket tb = new TableBucket(DATA1_TABLE_ID, bucketId);
8787
FetchLogResultForBucket resultForBucket0 =
88-
new FetchLogResultForBucket(tb, genMemoryLogRecordsByObject(DATA1), 10L);
88+
new FetchLogResultForBucket(tb, genMemoryLogRecordsByObject(DATA1), 10L, 0L);
8989
CompletedFetch completedFetch = makeCompletedFetch(tb, resultForBucket0, fetchOffset);
9090

9191
// Validate that the buffer is empty until after we add the fetch data.
@@ -135,9 +135,9 @@ void testCollectAfterUnassign() throws Exception {
135135
logScannerStatus.assignScanBuckets(scanBuckets);
136136

137137
FetchLogResultForBucket resultForBucket1 =
138-
new FetchLogResultForBucket(tb1, genMemoryLogRecordsByObject(DATA1), 10L);
138+
new FetchLogResultForBucket(tb1, genMemoryLogRecordsByObject(DATA1), 10L, 0L);
139139
FetchLogResultForBucket resultForBucket2 =
140-
new FetchLogResultForBucket(tb2, genMemoryLogRecordsByObject(DATA1), 10L);
140+
new FetchLogResultForBucket(tb2, genMemoryLogRecordsByObject(DATA1), 10L, 0L);
141141
CompletedFetch completedFetch1 = makeCompletedFetch(tb1, resultForBucket1, 0L);
142142
CompletedFetch completedFetch2 = makeCompletedFetch(tb2, resultForBucket2, 0L);
143143

fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -631,6 +631,22 @@ public class ConfigOptions {
631631
+ "we would fsync after every message; if it were 5 we would fsync after every "
632632
+ "five messages.");
633633

634+
public static final ConfigOption<Duration> LOG_FLUSH_RECOVERY_OFFSET_CHECKPOINT_INTERVAL =
635+
key("log.flush.recovery-offset.checkpoint-interval")
636+
.durationType()
637+
.defaultValue(Duration.ofMinutes(1))
638+
.withDescription(
639+
"The frequency with which the log start offset is saved out to disk. "
640+
+ "The default setting is 1 minute.");
641+
642+
public static final ConfigOption<Duration> LOG_FLUSH_START_OFFSET_CHECKPOINT_INTERVAL =
643+
key("log.flush.start-offset.checkpoint-interval")
644+
.durationType()
645+
.defaultValue(Duration.ofMinutes(1))
646+
.withDescription(
647+
"The frequency with which the log start offset is saved out to disk. "
648+
+ "The default setting is 1 minute.");
649+
634650
public static final ConfigOption<Duration> LOG_REPLICA_HIGH_WATERMARK_CHECKPOINT_INTERVAL =
635651
key("log.replica.high-watermark.checkpoint-interval")
636652
.durationType()

fluss-rpc/src/main/java/org/apache/fluss/rpc/entity/FetchLogResultForBucket.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,28 +35,34 @@ public class FetchLogResultForBucket extends ResultForBucket {
3535
private final @Nullable RemoteLogFetchInfo remoteLogFetchInfo;
3636
private final @Nullable LogRecords records;
3737
private final long highWatermark;
38+
private final long logStartOffset;
3839

3940
public FetchLogResultForBucket(
40-
TableBucket tableBucket, LogRecords records, long highWatermark) {
41+
TableBucket tableBucket, LogRecords records, long highWatermark, long logStartOffset) {
4142
this(
4243
tableBucket,
4344
null,
4445
checkNotNull(records, "records can not be null"),
4546
highWatermark,
47+
logStartOffset,
4648
ApiError.NONE);
4749
}
4850

4951
public FetchLogResultForBucket(TableBucket tableBucket, ApiError error) {
50-
this(tableBucket, null, null, -1L, error);
52+
this(tableBucket, null, null, -1L, 0L, error);
5153
}
5254

5355
public FetchLogResultForBucket(
54-
TableBucket tableBucket, RemoteLogFetchInfo remoteLogFetchInfo, long highWatermark) {
56+
TableBucket tableBucket,
57+
RemoteLogFetchInfo remoteLogFetchInfo,
58+
long highWatermark,
59+
long logStartOffset) {
5560
this(
5661
tableBucket,
5762
checkNotNull(remoteLogFetchInfo, "remote log fetch info can not be null"),
5863
null,
5964
highWatermark,
65+
logStartOffset,
6066
ApiError.NONE);
6167
}
6268

@@ -65,11 +71,13 @@ private FetchLogResultForBucket(
6571
@Nullable RemoteLogFetchInfo remoteLogFetchInfo,
6672
@Nullable LogRecords records,
6773
long highWatermark,
74+
long logStartOffset,
6875
ApiError error) {
6976
super(tableBucket, error);
7077
this.remoteLogFetchInfo = remoteLogFetchInfo;
7178
this.records = records;
7279
this.highWatermark = highWatermark;
80+
this.logStartOffset = logStartOffset;
7381
}
7482

7583
/**
@@ -102,4 +110,8 @@ public LogRecords recordsOrEmpty() {
102110
public long getHighWatermark() {
103111
return highWatermark;
104112
}
113+
114+
public long getLogStartOffset() {
115+
return logStartOffset;
116+
}
105117
}

fluss-rpc/src/main/java/org/apache/fluss/rpc/util/CommonRpcMessageUtils.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -195,15 +195,22 @@ public static FetchLogResultForBucket getFetchLogResultForBucket(
195195
pbRlfInfo.getFirstStartPos());
196196
fetchLogResultForBucket =
197197
new FetchLogResultForBucket(
198-
tb, rlFetchInfo, respForBucket.getHighWatermark());
198+
tb,
199+
rlFetchInfo,
200+
respForBucket.getHighWatermark(),
201+
respForBucket.getLogStartOffset());
199202
} else {
200203
ByteBuffer recordsBuffer = toByteBuffer(respForBucket.getRecordsSlice());
201204
LogRecords records =
202205
respForBucket.hasRecords()
203206
? MemoryLogRecords.pointToByteBuffer(recordsBuffer)
204207
: MemoryLogRecords.EMPTY;
205208
fetchLogResultForBucket =
206-
new FetchLogResultForBucket(tb, records, respForBucket.getHighWatermark());
209+
new FetchLogResultForBucket(
210+
tb,
211+
records,
212+
respForBucket.getHighWatermark(),
213+
respForBucket.getLogStartOffset());
207214
}
208215
}
209216

fluss-rpc/src/main/proto/FlussApi.proto

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -674,7 +674,7 @@ message PbFetchLogRespForBucket {
674674
optional int32 error_code = 3;
675675
optional string error_message = 4;
676676
optional int64 high_watermark = 5;
677-
optional int64 log_start_offset = 6; // TODO now we don't introduce log start offset, but remain it in protobuf
677+
optional int64 log_start_offset = 6;
678678
optional PbRemoteLogFetchInfo remote_log_fetch_info = 7;
679679
optional bytes records = 8;
680680
}

fluss-server/src/main/java/org/apache/fluss/server/log/LoadedLogOffsets.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,24 @@
2424
/** A class to represent the loaded log offsets. */
2525
@Internal
2626
final class LoadedLogOffsets {
27+
private final long logStartOffset;
2728
private final long recoveryPoint;
2829
private final LogOffsetMetadata nextOffsetMetadata;
2930

30-
public LoadedLogOffsets(final long recoveryPoint, final LogOffsetMetadata nextOffsetMetadata) {
31+
public LoadedLogOffsets(
32+
final long logStartOffset,
33+
final long recoveryPoint,
34+
final LogOffsetMetadata nextOffsetMetadata) {
35+
this.logStartOffset = logStartOffset;
3136
this.recoveryPoint = recoveryPoint;
3237
this.nextOffsetMetadata =
3338
Objects.requireNonNull(nextOffsetMetadata, "nextOffsetMetadata should not be null");
3439
}
3540

41+
public long getLogStartOffset() {
42+
return logStartOffset;
43+
}
44+
3645
public long getRecoveryPoint() {
3746
return recoveryPoint;
3847
}
@@ -50,20 +59,24 @@ public boolean equals(Object o) {
5059
return false;
5160
}
5261
final LoadedLogOffsets that = (LoadedLogOffsets) o;
53-
return recoveryPoint == that.recoveryPoint
62+
return logStartOffset == that.logStartOffset
63+
&& recoveryPoint == that.recoveryPoint
5464
&& nextOffsetMetadata.equals(that.nextOffsetMetadata);
5565
}
5666

5767
@Override
5868
public int hashCode() {
59-
int result = Long.hashCode(recoveryPoint);
69+
int result = Long.hashCode(logStartOffset);
70+
result = 31 * result + Long.hashCode(recoveryPoint);
6071
result = 31 * result + nextOffsetMetadata.hashCode();
6172
return result;
6273
}
6374

6475
@Override
6576
public String toString() {
6677
return "LoadedLogOffsets("
78+
+ "logStartOffset="
79+
+ logStartOffset
6780
+ ", recoveryPoint="
6881
+ recoveryPoint
6982
+ ", nextOffsetMetadata="

fluss-server/src/main/java/org/apache/fluss/server/log/LocalLog.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ void checkIfMemoryMappedBufferClosed() {
169169
}
170170
}
171171

172-
private void updateRecoveryPoint(long newRecoveryPoint) {
172+
public void updateRecoveryPoint(long newRecoveryPoint) {
173173
recoveryPoint = newRecoveryPoint;
174174
}
175175

@@ -329,6 +329,7 @@ LogSegment createAndDeleteSegment(
329329
if (newOffset != segmentToDelete.getBaseOffset()) {
330330
segments.remove(segmentToDelete.getBaseOffset());
331331
}
332+
deleteSegmentFiles(Collections.singletonList(segmentToDelete), reason);
332333
return newSegment;
333334
}
334335

fluss-server/src/main/java/org/apache/fluss/server/log/LogLoader.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ final class LogLoader {
5353
private final Configuration conf;
5454
private final LogSegments logSegments;
5555
private final long recoveryPointCheckpoint;
56+
private final long logStartOffsetCheckpoint;
5657
private final LogFormat logFormat;
5758
private final WriterStateManager writerStateManager;
5859
private final boolean isCleanShutdown;
@@ -62,13 +63,15 @@ public LogLoader(
6263
Configuration conf,
6364
LogSegments logSegments,
6465
long recoveryPointCheckpoint,
66+
long logStartOffsetCheckpoint,
6567
LogFormat logFormat,
6668
WriterStateManager writerStateManager,
6769
boolean isCleanShutdown) {
6870
this.logTabletDir = logTabletDir;
6971
this.conf = conf;
7072
this.logSegments = logSegments;
7173
this.recoveryPointCheckpoint = recoveryPointCheckpoint;
74+
this.logStartOffsetCheckpoint = logStartOffsetCheckpoint;
7275
this.logFormat = logFormat;
7376
this.writerStateManager = writerStateManager;
7477
this.isCleanShutdown = isCleanShutdown;
@@ -119,6 +122,7 @@ public LoadedLogOffsets load() throws IOException {
119122
LogSegment activeSegment = logSegments.lastSegment().get();
120123
activeSegment.resizeIndexes((int) conf.get(ConfigOptions.LOG_INDEX_FILE_SIZE).getBytes());
121124
return new LoadedLogOffsets(
125+
logStartOffsetCheckpoint,
122126
newRecoveryPoint,
123127
new LogOffsetMetadata(
124128
nextOffset, activeSegment.getBaseOffset(), activeSegment.getSizeInBytes()));

0 commit comments

Comments
 (0)