Skip to content

Commit 481b2cd

Browse files
committed
[server] Fix ReplicaFetcherThread keeps throwing OutOfOrderSequenceException because of writer id expire
1 parent 3a48f6d commit 481b2cd

File tree

14 files changed

+331
-25
lines changed

14 files changed

+331
-25
lines changed

fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ public class MetricNames {
4141
public static final String TABLE_COUNT = "tableCount";
4242
public static final String BUCKET_COUNT = "bucketCount";
4343
public static final String REPLICAS_TO_DELETE_COUNT = "replicasToDeleteCount";
44+
public static final String PARTITION_COUNT = "partitionCount";
45+
public static final String TOTAL_PARTITION_COUNT = "totalPartitionCount";
4446

4547
// for coordinator event processor
4648
public static final String EVENT_QUEUE_SIZE = "eventQueueSize";

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,8 @@ public class CoordinatorContext {
104104
private ServerInfo coordinatorServerInfo = null;
105105
private int coordinatorEpoch = INITIAL_COORDINATOR_EPOCH;
106106

107+
private Runnable partitionCountUpdateCallback = null;
108+
107109
public CoordinatorContext() {}
108110

109111
public int getCoordinatorEpoch() {
@@ -232,6 +234,17 @@ public void putTableInfo(TableInfo tableInfo) {
232234
public void putPartition(long partitionId, PhysicalTablePath physicalTablePath) {
233235
this.pathByPartitionId.put(partitionId, physicalTablePath);
234236
this.partitionIdByPath.put(physicalTablePath, partitionId);
237+
updatePartitionCountMetrics();
238+
}
239+
240+
public void setPartitionCountUpdateCallback(Runnable callback) {
241+
this.partitionCountUpdateCallback = callback;
242+
}
243+
244+
private void updatePartitionCountMetrics() {
245+
if (partitionCountUpdateCallback != null) {
246+
partitionCountUpdateCallback.run();
247+
}
235248
}
236249

237250
public TableInfo getTableInfoById(long tableId) {
@@ -551,6 +564,23 @@ public Set<TablePartition> getPartitionsToBeDeleted() {
551564
return partitionsToBeDeleted;
552565
}
553566

567+
public int getTotalPartitionCount() {
568+
return partitionAssignments.size();
569+
}
570+
571+
public int getPartitionCountForTable(long tableId) {
572+
return (int)
573+
partitionAssignments.keySet().stream()
574+
.filter(partition -> partition.getTableId() == tableId)
575+
.count();
576+
}
577+
578+
public Set<TablePartition> getPartitionsForTable(long tableId) {
579+
return partitionAssignments.keySet().stream()
580+
.filter(partition -> partition.getTableId() == tableId)
581+
.collect(Collectors.toSet());
582+
}
583+
554584
public boolean isToBeDeleted(TableBucket tableBucket) {
555585
if (tableBucket.getPartitionId() == null) {
556586
return isTableQueuedForDeletion(tableBucket.getTableId());
@@ -614,6 +644,8 @@ public void removePartition(TablePartition tablePartition) {
614644
if (physicalTablePath != null) {
615645
partitionIdByPath.remove(physicalTablePath);
616646
}
647+
// Update partition count metrics when partition is removed
648+
updatePartitionCountMetrics();
617649
}
618650

619651
private void clearTablesState() {

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,16 @@ protected void startServices() throws Exception {
176176
this.coordinatorContext = new CoordinatorContext();
177177
this.metadataCache = new CoordinatorMetadataCache();
178178

179+
coordinatorContext.setPartitionCountUpdateCallback(
180+
() -> {
181+
if (serverMetricGroup != null) {
182+
int totalPartitionCount = coordinatorContext.getTotalPartitionCount();
183+
((CoordinatorMetricGroup.MutableGauge<Integer>)
184+
serverMetricGroup.totalPartitionCount())
185+
.setValue(totalPartitionCount);
186+
}
187+
});
188+
179189
this.authorizer = AuthorizerLoader.createAuthorizer(conf, zkClient, pluginManager);
180190
if (authorizer != null) {
181191
authorizer.startup();

fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -643,7 +643,7 @@ private LogAppendInfo append(MemoryLogRecords records, boolean appendAsLeader)
643643
// now that we have valid records, offsets assigned, we need to validate the idempotent
644644
// state of the writers and collect some metadata.
645645
Either<WriterStateEntry.BatchMetadata, Collection<WriterAppendInfo>> validateResult =
646-
analyzeAndValidateWriterState(validRecords);
646+
analyzeAndValidateWriterState(validRecords, appendAsLeader);
647647

648648
if (validateResult.isLeft()) {
649649
// have duplicated batch metadata, skip the append and update append info.
@@ -1006,7 +1006,7 @@ private LogAppendInfo analyzeAndValidateRecords(MemoryLogRecords records) {
10061006

10071007
/** Returns either the duplicated batch metadata (left) or the updated writers (right). */
10081008
private Either<WriterStateEntry.BatchMetadata, Collection<WriterAppendInfo>>
1009-
analyzeAndValidateWriterState(MemoryLogRecords records) {
1009+
analyzeAndValidateWriterState(MemoryLogRecords records, boolean isAppendAsLeader) {
10101010
Map<Long, WriterAppendInfo> updatedWriters = new HashMap<>();
10111011

10121012
for (LogRecordBatch batch : records.batches()) {
@@ -1023,14 +1023,15 @@ private LogAppendInfo analyzeAndValidateRecords(MemoryLogRecords records) {
10231023
}
10241024

10251025
// update write append info.
1026-
updateWriterAppendInfo(writerStateManager, batch, updatedWriters);
1026+
updateWriterAppendInfo(writerStateManager, batch, updatedWriters, isAppendAsLeader);
10271027
}
10281028
}
10291029

10301030
return Either.right(updatedWriters.values());
10311031
}
10321032

1033-
void removeExpiredWriter(long currentTimeMs) {
1033+
@VisibleForTesting
1034+
public void removeExpiredWriter(long currentTimeMs) {
10341035
synchronized (lock) {
10351036
writerStateManager.removeExpiredWriters(currentTimeMs);
10361037
}
@@ -1110,14 +1111,16 @@ private void deleteSegments(List<LogSegment> deletableSegments, SegmentDeletionR
11101111
private static void updateWriterAppendInfo(
11111112
WriterStateManager writerStateManager,
11121113
LogRecordBatch batch,
1113-
Map<Long, WriterAppendInfo> writers) {
1114+
Map<Long, WriterAppendInfo> writers,
1115+
boolean isAppendAsLeader) {
11141116
long writerId = batch.writerId();
11151117
// update writers.
11161118
WriterAppendInfo appendInfo =
11171119
writers.computeIfAbsent(writerId, id -> writerStateManager.prepareUpdate(writerId));
11181120
appendInfo.append(
11191121
batch,
1120-
writerStateManager.isWriterInBatchExpired(System.currentTimeMillis(), batch));
1122+
writerStateManager.isWriterInBatchExpired(System.currentTimeMillis(), batch),
1123+
isAppendAsLeader);
11211124
}
11221125

11231126
static void rebuildWriterState(
@@ -1230,7 +1233,7 @@ private static void loadWritersFromRecords(
12301233
Map<Long, WriterAppendInfo> loadedWriters = new HashMap<>();
12311234
for (LogRecordBatch batch : records.batches()) {
12321235
if (batch.hasWriterId()) {
1233-
updateWriterAppendInfo(writerStateManager, batch, loadedWriters);
1236+
updateWriterAppendInfo(writerStateManager, batch, loadedWriters, true);
12341237
}
12351238
}
12361239
loadedWriters.values().forEach(writerStateManager::update);

fluss-server/src/main/java/org/apache/fluss/server/log/WriterAppendInfo.java

Lines changed: 53 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -44,13 +44,15 @@ public long writerId() {
4444
return writerId;
4545
}
4646

47-
public void append(LogRecordBatch batch, boolean isWriterInBatchExpired) {
47+
public void append(
48+
LogRecordBatch batch, boolean isWriterInBatchExpired, boolean isAppendAsLeader) {
4849
LogOffsetMetadata firstOffsetMetadata = new LogOffsetMetadata(batch.baseLogOffset());
4950
appendDataBatch(
5051
batch.batchSequence(),
5152
firstOffsetMetadata,
5253
batch.lastLogOffset(),
5354
isWriterInBatchExpired,
55+
isAppendAsLeader,
5456
batch.commitTimestamp());
5557
}
5658

@@ -59,8 +61,9 @@ public void appendDataBatch(
5961
LogOffsetMetadata firstOffsetMetadata,
6062
long lastOffset,
6163
boolean isWriterInBatchExpired,
64+
boolean isAppendAsLeader,
6265
long batchTimestamp) {
63-
maybeValidateDataBatch(batchSequence, isWriterInBatchExpired, lastOffset);
66+
maybeValidateDataBatch(batchSequence, isWriterInBatchExpired, lastOffset, isAppendAsLeader);
6467
updatedEntry.addBath(
6568
batchSequence,
6669
lastOffset,
@@ -69,13 +72,16 @@ public void appendDataBatch(
6972
}
7073

7174
private void maybeValidateDataBatch(
72-
int appendFirstSeq, boolean isWriterInBatchExpired, long lastOffset) {
75+
int appendFirstSeq,
76+
boolean isWriterInBatchExpired,
77+
long lastOffset,
78+
boolean isAppendAsLeader) {
7379
int currentLastSeq =
7480
!updatedEntry.isEmpty()
7581
? updatedEntry.lastBatchSequence()
7682
: currentEntry.lastBatchSequence();
7783
// must be in sequence, even for the first batch should start from 0
78-
if (!inSequence(currentLastSeq, appendFirstSeq, isWriterInBatchExpired)) {
84+
if (!inSequence(currentLastSeq, appendFirstSeq, isWriterInBatchExpired, isAppendAsLeader)) {
7985
throw new OutOfOrderSequenceException(
8086
String.format(
8187
"Out of order batch sequence for writer %s at offset %s in "
@@ -93,16 +99,53 @@ public WriterStateEntry toEntry() {
9399
* three scenarios will be judged as in sequence:
94100
*
95101
* <ul>
96-
* <li>If lastBatchSeq equals NO_BATCH_SEQUENCE, we need to check whether the committed
97-
* timestamp of the next batch under the current writerId has expired. If it has expired,
98-
* we consider this a special case caused by writerId expiration, for this case, to ensure
99-
* the correctness of follower sync, we still treat it as in sequence.
102+
* <li>If lastBatchSeq equals NO_BATCH_SEQUENCE, the following two scenarios will be judged as
103+
* in sequence:
104+
* <ul>
105+
* <li>If the committed timestamp of the next batch under the current writerId has
106+
* expired, we consider this a special case caused by writerId expiration, for this
107+
* case, to ensure the correctness of follower sync, we still treat it as in
108+
* sequence.
109+
* <li>If the append request is from the follower, we consider this is a special case
110+
* caused by inconsistent expiration of writerId between the leader and follower. To
111+
* prevent continuous fetch failures on the follower side, we still treat it as in
112+
* sequence. Here is a detailed example: The expiration of a writer is triggered
113+
* asynchronously by the {@code PeriodicWriterIdExpirationCheck} thread at intervals
114+
* defined by {@code server.writer-id.expiration-check-interval}, which can result
115+
* in slight differences in the actual expiration times of the same writer on the
116+
* leader replica and follower replicas. This slight difference leads to a dreadful
117+
* corner case. Imagine the following scenario(set {@code
118+
* server.writer-id.expiration-check-interval}: 10min, {@code
119+
* server.writer-id.expiration-time}: 12h):
120+
* <pre>{@code
121+
* Step Time Action of Leader Action of Follower
122+
* 1 00:03:38 receive batch 0 of writer 101
123+
* 2 00:03:38 fetch batch 0 of writer 101
124+
* 3 12:05:00 remove state of writer 101
125+
* 4 12:10:02 receive batch 1 of writer 101
126+
* 5 12:10:02 fetch batch 0 of writer 101
127+
* 6 12:11:00 remove state of writer 101
128+
*
129+
* }</pre>
130+
* In step 3, the follower removes the state of writer 101 first, since it has been
131+
* more than 12 hours since writer 101's last batch write, making it safe to remove.
132+
* However, since the expiration of writer 101 has not yet occurred on the leader,
133+
* and a new batch 1 is received at this time, it is successfully written on the
134+
* leader. At this point, the fetcher pulls batch 1 from the leader, but since the
135+
* state of writer 101 has already been cleaned up, an {@link
136+
* OutOfOrderSequenceException} will occur during to write if we don't treat it as
137+
* in sequence.
138+
* </ul>
100139
* <li>nextBatchSeq == lastBatchSeq + 1L
101140
* <li>lastBatchSeq reaches its maximum value
102141
* </ul>
103142
*/
104-
private boolean inSequence(int lastBatchSeq, int nextBatchSeq, boolean isWriterInBatchExpired) {
105-
return (lastBatchSeq == NO_BATCH_SEQUENCE && isWriterInBatchExpired)
143+
private boolean inSequence(
144+
int lastBatchSeq,
145+
int nextBatchSeq,
146+
boolean isWriterInBatchExpired,
147+
boolean isAppendAsLeader) {
148+
return (lastBatchSeq == NO_BATCH_SEQUENCE && (isWriterInBatchExpired || !isAppendAsLeader))
106149
|| nextBatchSeq == lastBatchSeq + 1L
107150
|| (nextBatchSeq == 0 && lastBatchSeq == Integer.MAX_VALUE);
108151
}

fluss-server/src/main/java/org/apache/fluss/server/metadata/TabletServerMetadataCache.java

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import org.apache.fluss.metadata.TableInfo;
2626
import org.apache.fluss.metadata.TablePath;
2727
import org.apache.fluss.server.coordinator.MetadataManager;
28+
import org.apache.fluss.server.metrics.group.PhysicalTableMetricGroup;
29+
import org.apache.fluss.server.metrics.group.TabletServerMetricGroup;
2830
import org.apache.fluss.server.tablet.TabletServer;
2931
import org.apache.fluss.server.zk.ZooKeeperClient;
3032

@@ -68,11 +70,16 @@ public class TabletServerMetadataCache implements ServerMetadataCache {
6870

6971
private final MetadataManager metadataManager;
7072
private final ZooKeeperClient zkClient;
73+
private final TabletServerMetricGroup metricGroup;
7174

72-
public TabletServerMetadataCache(MetadataManager metadataManager, ZooKeeperClient zkClient) {
75+
public TabletServerMetadataCache(
76+
MetadataManager metadataManager,
77+
ZooKeeperClient zkClient,
78+
TabletServerMetricGroup metricGroup) {
7379
this.serverMetadataSnapshot = ServerMetadataSnapshot.empty();
7480
this.metadataManager = metadataManager;
7581
this.zkClient = zkClient;
82+
this.metricGroup = metricGroup;
7683
}
7784

7885
@Override
@@ -269,9 +276,36 @@ public void updateClusterMetadata(ClusterMetadata clusterMetadata) {
269276
partitionIdByPath,
270277
bucketMetadataMapForTables,
271278
bucketMetadataMapForPartitions);
279+
280+
// Update partition count metrics
281+
updatePartitionCountMetrics(partitionIdByPath);
272282
});
273283
}
274284

285+
private void updatePartitionCountMetrics(Map<PhysicalTablePath, Long> partitionIdByPath) {
286+
if (metricGroup != null) {
287+
Map<TablePath, Integer> partitionCountByTable = new HashMap<>();
288+
for (PhysicalTablePath physicalTablePath : partitionIdByPath.keySet()) {
289+
TablePath tablePath = physicalTablePath.getTablePath();
290+
partitionCountByTable.merge(tablePath, 1, Integer::sum);
291+
}
292+
293+
for (Map.Entry<TablePath, Integer> entry : partitionCountByTable.entrySet()) {
294+
TablePath tablePath = entry.getKey();
295+
int partitionCount = entry.getValue();
296+
297+
PhysicalTablePath physicalTablePath = PhysicalTablePath.of(tablePath, null);
298+
PhysicalTableMetricGroup tableMetricGroup =
299+
metricGroup.getPhysicalTableMetricGroup(physicalTablePath);
300+
if (tableMetricGroup != null) {
301+
((PhysicalTableMetricGroup.MutableGauge<Integer>)
302+
tableMetricGroup.partitionCount())
303+
.setValue(partitionCount);
304+
}
305+
}
306+
}
307+
}
308+
275309
@VisibleForTesting
276310
public void clearTableMetadata() {
277311
inLock(

fluss-server/src/main/java/org/apache/fluss/server/metrics/group/CoordinatorMetricGroup.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
package org.apache.fluss.server.metrics.group;
1919

2020
import org.apache.fluss.metrics.CharacterFilter;
21+
import org.apache.fluss.metrics.Gauge;
22+
import org.apache.fluss.metrics.MetricNames;
2123
import org.apache.fluss.metrics.groups.AbstractMetricGroup;
2224
import org.apache.fluss.metrics.registry.MetricRegistry;
2325

@@ -32,12 +34,17 @@ public class CoordinatorMetricGroup extends AbstractMetricGroup {
3234
protected final String hostname;
3335
protected final String serverId;
3436

37+
private final MutableGauge<Integer> totalPartitionCount;
38+
3539
public CoordinatorMetricGroup(
3640
MetricRegistry registry, String clusterId, String hostname, String serverId) {
3741
super(registry, new String[] {clusterId, hostname, NAME}, null);
3842
this.clusterId = clusterId;
3943
this.hostname = hostname;
4044
this.serverId = serverId;
45+
46+
this.totalPartitionCount = new MutableGauge<>(0);
47+
gauge(MetricNames.TOTAL_PARTITION_COUNT, totalPartitionCount);
4148
}
4249

4350
@Override
@@ -51,4 +58,25 @@ protected final void putVariables(Map<String, String> variables) {
5158
variables.put("host", hostname);
5259
variables.put("server_id", serverId);
5360
}
61+
62+
public Gauge<Integer> totalPartitionCount() {
63+
return totalPartitionCount;
64+
}
65+
66+
public static class MutableGauge<T> implements Gauge<T> {
67+
private volatile T value;
68+
69+
public MutableGauge(T initialValue) {
70+
this.value = initialValue;
71+
}
72+
73+
@Override
74+
public T getValue() {
75+
return value;
76+
}
77+
78+
public void setValue(T value) {
79+
this.value = value;
80+
}
81+
}
5482
}

0 commit comments

Comments
 (0)