Commit c2a824f

refactor

1 parent f31d6f5 commit c2a824f

12 files changed: +250 -253 lines changed

fluss-common/src/main/java/org/apache/fluss/utils/json/TableBucketOffsets.java

Lines changed: 18 additions & 0 deletions
@@ -76,6 +76,24 @@ public Map<TableBucket, Long> getOffsets() {
         return offsets;
     }
 
+    /**
+     * Serialize to a JSON byte array.
+     *
+     * @see TableBucketOffsetsJsonSerde
+     */
+    public byte[] toJsonBytes() {
+        return JsonSerdeUtils.writeValueAsBytes(this, TableBucketOffsetsJsonSerde.INSTANCE);
+    }
+
+    /**
+     * Deserialize from a JSON byte array to an instance of {@link TableBucketOffsets}.
+     *
+     * @see TableBucketOffsetsJsonSerde
+     */
+    public static TableBucketOffsets fromJsonBytes(byte[] json) {
+        return JsonSerdeUtils.readValue(json, TableBucketOffsetsJsonSerde.INSTANCE);
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
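For context, the two new helpers round-trip through TableBucketOffsetsJsonSerde. A minimal sketch (table id and offsets are illustrative; the TableBucketOffsets(long, Map) and TableBucket constructors appear in the serde diff below, while imports and surrounding setup are assumed):

    Map<TableBucket, Long> offsets = new HashMap<>();
    offsets.put(new TableBucket(1001L, 0), 100L);
    offsets.put(new TableBucket(1001L, 2), 250L); // bucket 1 has no offset yet
    TableBucketOffsets original = new TableBucketOffsets(1001L, offsets);

    byte[] json = original.toJsonBytes();
    TableBucketOffsets restored = TableBucketOffsets.fromJsonBytes(json);
    // restored.getOffsets() equals the original map: per the serde's comments,
    // missing bucket ids are filled with -1 (UNKNOWN_OFFSET) on write and
    // dropped again on read.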

fluss-common/src/main/java/org/apache/fluss/utils/json/TableBucketOffsetsJsonSerde.java

Lines changed: 63 additions & 68 deletions
@@ -26,7 +26,6 @@
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;

@@ -54,7 +53,7 @@
  * <p>The serialized format includes:
  *
  * <ul>
- *   <li>"version": 1 - the format version
+ *   <li>"version": the format version
  *   <li>"table_id": the table ID that all buckets belong to
  *   <li>"bucket_offsets": array of offsets for non-partitioned table buckets (optional)
  *   <li>"partition_offsets": array of partition offset objects for partitioned table buckets

@@ -75,72 +74,6 @@ public class TableBucketOffsetsJsonSerde
     private static final int VERSION = 1;
     private static final long UNKNOWN_OFFSET = -1;
 
-    /**
-     * Deserializes a JSON node to a {@link TableBucketOffsets} object.
-     *
-     * <p>This method reads the JSON format and reconstructs the table bucket offsets map. The array
-     * index in "bucket_offsets" represents the bucket id, and the value represents the offset.
-     *
-     * @param node the JSON node to deserialize
-     * @return the deserialized {@link TableBucketOffsets} object
-     * @throws IllegalArgumentException if the version is not supported
-     */
-    @Override
-    public TableBucketOffsets deserialize(JsonNode node) {
-        int version = node.get(VERSION_KEY).asInt();
-        if (version != VERSION) {
-            throw new IllegalArgumentException("Unsupported version: " + version);
-        }
-
-        long tableId = node.get(TABLE_ID_KEY).asLong();
-        Map<TableBucket, Long> offsets = new HashMap<>();
-
-        // Deserialize non-partitioned table bucket offsets
-        JsonNode bucketOffsetsNode = node.get(BUCKET_OFFSETS_KEY);
-        JsonNode partitionBucketOffsetsNode = node.get(PARTITION_OFFSETS_KEY);
-        if (bucketOffsetsNode != null || partitionBucketOffsetsNode != null) {
-            if (bucketOffsetsNode != null && partitionBucketOffsetsNode != null) {
-                throw new IllegalArgumentException(
-                        "Both bucket_offsets and partition_bucket_offsets cannot be present at the same time");
-            }
-
-            if (bucketOffsetsNode != null) {
-                int bucketId = 0;
-                for (JsonNode bucketOffsetNode : bucketOffsetsNode) {
-                    long offset = bucketOffsetNode.asLong();
-                    // Ignore unknown offsets (filled for missing bucket ids)
-                    if (offset != UNKNOWN_OFFSET) {
-                        TableBucket tableBucket = new TableBucket(tableId, bucketId);
-                        offsets.put(tableBucket, offset);
-                    }
-                    bucketId++;
-                }
-            } else {
-                for (JsonNode partitionOffsetNode : partitionBucketOffsetsNode) {
-                    long partitionId = partitionOffsetNode.get(PARTITION_ID_KEY).asLong();
-                    JsonNode bucketOffsetsArray = partitionOffsetNode.get(BUCKET_OFFSETS_KEY);
-                    if (bucketOffsetsArray != null && bucketOffsetsArray.isArray()) {
-                        Iterator<JsonNode> elements = bucketOffsetsArray.elements();
-                        int bucketId = 0;
-                        while (elements.hasNext()) {
-                            JsonNode offsetNode = elements.next();
-                            long offset = offsetNode.asLong();
-                            // Ignore unknown offsets (filled for missing bucket ids)
-                            if (offset != UNKNOWN_OFFSET) {
-                                TableBucket tableBucket =
-                                        new TableBucket(tableId, partitionId, bucketId);
-                                offsets.put(tableBucket, offset);
-                            }
-                            bucketId++;
-                        }
-                    }
-                }
-            }
-        }
-
-        return new TableBucketOffsets(tableId, offsets);
-    }
-
     /**
      * Serializes a {@link TableBucketOffsets} object to JSON format.
      *

@@ -219,6 +152,68 @@ public void serialize(TableBucketOffsets tableBucketOffsets, JsonGenerator gene
         generator.writeEndObject();
     }
 
+    /**
+     * Deserializes a JSON node to a {@link TableBucketOffsets} object.
+     *
+     * <p>This method reads the JSON format and reconstructs the table bucket offsets map. The array
+     * index in "bucket_offsets" represents the bucket id, and the value represents the offset.
+     *
+     * @param node the JSON node to deserialize
+     * @return the deserialized {@link TableBucketOffsets} object
+     * @throws IllegalArgumentException if the version is not supported
+     */
+    @Override
+    public TableBucketOffsets deserialize(JsonNode node) {
+        int version = node.get(VERSION_KEY).asInt();
+        if (version != VERSION) {
+            throw new IllegalArgumentException("Unsupported version: " + version);
+        }
+
+        long tableId = node.get(TABLE_ID_KEY).asLong();
+        Map<TableBucket, Long> offsets = new HashMap<>();
+
+        // Deserialize non-partitioned table bucket offsets
+        JsonNode bucketOffsetsNode = node.get(BUCKET_OFFSETS_KEY);
+        JsonNode partitionBucketOffsetsNode = node.get(PARTITION_OFFSETS_KEY);
+        if (bucketOffsetsNode != null || partitionBucketOffsetsNode != null) {
+            if (bucketOffsetsNode != null && partitionBucketOffsetsNode != null) {
+                throw new IllegalArgumentException(
+                        "Both bucket_offsets and partition_bucket_offsets cannot be present at the same time");
+            }
+
+            if (bucketOffsetsNode != null) {
+                int bucketId = 0;
+                for (JsonNode bucketOffsetNode : bucketOffsetsNode) {
+                    long offset = bucketOffsetNode.asLong();
+                    // Ignore unknown offsets (filled for missing bucket ids)
+                    if (offset != UNKNOWN_OFFSET) {
+                        TableBucket tableBucket = new TableBucket(tableId, bucketId);
+                        offsets.put(tableBucket, offset);
+                    }
+                    bucketId++;
+                }
+            } else {
+                for (JsonNode partitionOffsetNode : partitionBucketOffsetsNode) {
+                    long partitionId = partitionOffsetNode.get(PARTITION_ID_KEY).asLong();
+                    JsonNode bucketOffsetsArray = partitionOffsetNode.get(BUCKET_OFFSETS_KEY);
+                    int bucketId = 0;
+                    for (JsonNode bucketOffsetNode : bucketOffsetsArray) {
+                        long offset = bucketOffsetNode.asLong();
+                        // Ignore unknown offsets (filled for missing bucket ids)
+                        if (offset != UNKNOWN_OFFSET) {
+                            TableBucket tableBucket =
+                                    new TableBucket(tableId, partitionId, bucketId);
+                            offsets.put(tableBucket, offset);
+                        }
+                        bucketId++;
+                    }
+                }
+            }
+        }
+
+        return new TableBucketOffsets(tableId, offsets);
+    }
+
     private void serializeBucketLogEndOffset(
             Map<TableBucket, Long> bucketLogEndOffset,
             List<TableBucket> buckets,
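For reference, a serialized document for a partitioned table would look roughly like this (values illustrative; the key names are those listed in the class Javadoc, and per deserialize the array index is the bucket id while -1, the UNKNOWN_OFFSET sentinel, marks a bucket with no recorded offset):

    {
      "version": 1,
      "table_id": 1001,
      "partition_offsets": [
        {"partition_id": 15, "bucket_offsets": [100, -1, 250]}
      ]
    }

A non-partitioned table carries a top-level "bucket_offsets" array instead; as the deserializer enforces, the two fields never appear together.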

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitter.java

Lines changed: 29 additions & 32 deletions
@@ -26,13 +26,16 @@
 import org.apache.fluss.metrics.registry.MetricRegistry;
 import org.apache.fluss.rpc.GatewayClientProxy;
 import org.apache.fluss.rpc.RpcClient;
+import org.apache.fluss.rpc.gateway.AdminReadOnlyGateway;
 import org.apache.fluss.rpc.gateway.CoordinatorGateway;
 import org.apache.fluss.rpc.messages.CommitLakeTableSnapshotRequest;
+import org.apache.fluss.rpc.messages.PbBucketOffset;
 import org.apache.fluss.rpc.messages.PbCommitLakeTableSnapshotRespForTable;
 import org.apache.fluss.rpc.messages.PbLakeTableOffsetForBucket;
 import org.apache.fluss.rpc.messages.PbLakeTableSnapshotInfo;
 import org.apache.fluss.rpc.messages.PbLakeTableSnapshotMetadata;
 import org.apache.fluss.rpc.messages.PbPrepareCommitLakeTableRespForTable;
+import org.apache.fluss.rpc.messages.PbTableBucketOffsets;
 import org.apache.fluss.rpc.messages.PrepareCommitLakeTableSnapshotRequest;
 import org.apache.fluss.rpc.messages.PrepareCommitLakeTableSnapshotResponse;
 import org.apache.fluss.rpc.metrics.ClientMetricGroup;

@@ -66,6 +69,7 @@ public class FlussTableLakeSnapshotCommitter implements AutoCloseable {
     private final Configuration flussConf;
 
     private CoordinatorGateway coordinatorGateway;
+    private AdminReadOnlyGateway readOnlyGateway;
     private RpcClient rpcClient;
 
     public FlussTableLakeSnapshotCommitter(Configuration flussConf) {

@@ -83,38 +87,40 @@ public void open() {
         this.coordinatorGateway =
                 GatewayClientProxy.createGatewayProxy(
                         metadataUpdater::getCoordinatorServer, rpcClient, CoordinatorGateway.class);
+
+        this.readOnlyGateway =
+                GatewayClientProxy.createGatewayProxy(
+                        metadataUpdater::getRandomTabletServer,
+                        rpcClient,
+                        AdminReadOnlyGateway.class);
     }
 
     String prepareCommit(long tableId, TablePath tablePath, Map<TableBucket, Long> logEndOffsets)
             throws IOException {
-        PbPrepareCommitLakeTableRespForTable prepareCommitResp = null;
-        Exception exception = null;
+        PbPrepareCommitLakeTableRespForTable prepareCommitResp;
         try {
             PrepareCommitLakeTableSnapshotRequest prepareCommitLakeTableSnapshotRequest =
                     toPrepareCommitLakeTableSnapshotRequest(tableId, tablePath, logEndOffsets);
             PrepareCommitLakeTableSnapshotResponse prepareCommitLakeTableSnapshotResponse =
-                    coordinatorGateway
+                    readOnlyGateway
                             .prepareCommitLakeTableSnapshot(prepareCommitLakeTableSnapshotRequest)
                             .get();
             List<PbPrepareCommitLakeTableRespForTable> pbPrepareCommitLakeTableRespForTables =
                     prepareCommitLakeTableSnapshotResponse.getPrepareCommitLakeTableRespsList();
             checkState(pbPrepareCommitLakeTableRespForTables.size() == 1);
             prepareCommitResp = pbPrepareCommitLakeTableRespForTables.get(0);
             if (prepareCommitResp.hasErrorCode()) {
-                exception = ApiError.fromErrorMessage(prepareCommitResp).exception();
+                throw ApiError.fromErrorMessage(prepareCommitResp).exception();
+            } else {
+                return checkNotNull(prepareCommitResp).getLakeTableSnapshotFilePath();
             }
         } catch (Exception e) {
-            exception = e;
-        }
-
-        if (exception != null) {
             throw new IOException(
                     String.format(
                             "Fail to prepare commit table lake snapshot for %s to Fluss.",
                             tablePath),
-                    ExceptionUtils.stripExecutionException(exception));
+                    ExceptionUtils.stripExecutionException(e));
         }
-        return checkNotNull(prepareCommitResp).getLakeTableSnapshotFilePath();
     }

@@ -124,7 +130,6 @@ void commit(
             Map<TableBucket, Long> logEndOffsets,
             Map<TableBucket, Long> logMaxTieredTimestamps)
             throws IOException {
-        Exception exception = null;
         try {
             CommitLakeTableSnapshotRequest request =
                     toCommitLakeTableSnapshotRequest(

@@ -139,13 +144,9 @@
             PbCommitLakeTableSnapshotRespForTable commitLakeTableSnapshotRes =
                     commitLakeTableSnapshotRespForTables.get(0);
             if (commitLakeTableSnapshotRes.hasErrorCode()) {
-                exception = ApiError.fromErrorMessage(commitLakeTableSnapshotRes).exception();
+                throw ApiError.fromErrorMessage(commitLakeTableSnapshotRes).exception();
             }
-        } catch (Exception e) {
-            exception = e;
-        }
-
-        if (exception != null) {
+        } catch (Exception exception) {
             throw new IOException(
                     String.format(
                             "Fail to commit table lake snapshot id %d of table %d to Fluss.",

@@ -166,26 +167,22 @@ private PrepareCommitLakeTableSnapshotRequest toPrepareCommitLakeTableSnapshotRe
             long tableId, TablePath tablePath, Map<TableBucket, Long> logEndOffsets) {
         PrepareCommitLakeTableSnapshotRequest prepareCommitLakeTableSnapshotRequest =
                 new PrepareCommitLakeTableSnapshotRequest();
-        PbLakeTableSnapshotInfo pbLakeTableSnapshotInfo =
-                prepareCommitLakeTableSnapshotRequest.addTablesReq();
-        pbLakeTableSnapshotInfo.setTableId(tableId);
+        PbTableBucketOffsets pbTableBucketOffsets =
+                prepareCommitLakeTableSnapshotRequest.addBucketOffset();
+        pbTableBucketOffsets.setTableId(tableId);
+        pbTableBucketOffsets
+                .setTablePath()
+                .setDatabaseName(tablePath.getDatabaseName())
+                .setTableName(tablePath.getTableName());
 
-        // in prepare phase, we don't know the snapshot id,
-        // set -1 since the field is required
-        pbLakeTableSnapshotInfo.setSnapshotId(-1L);
         for (Map.Entry<TableBucket, Long> logEndOffsetEntry : logEndOffsets.entrySet()) {
-            PbLakeTableOffsetForBucket pbLakeTableOffsetForBucket =
-                    pbLakeTableSnapshotInfo.addBucketsReq();
+            PbBucketOffset pbBucketOffset = pbTableBucketOffsets.addBucketOffset();
             TableBucket tableBucket = logEndOffsetEntry.getKey();
-            pbLakeTableSnapshotInfo
-                    .setTablePath()
-                    .setDatabaseName(tablePath.getDatabaseName())
-                    .setTableName(tablePath.getTableName());
             if (tableBucket.getPartitionId() != null) {
-                pbLakeTableOffsetForBucket.setPartitionId(tableBucket.getPartitionId());
+                pbBucketOffset.setPartitionId(tableBucket.getPartitionId());
            }
-            pbLakeTableOffsetForBucket.setBucketId(tableBucket.getBucket());
-            pbLakeTableOffsetForBucket.setLogEndOffset(logEndOffsetEntry.getValue());
+            pbBucketOffset.setBucketId(tableBucket.getBucket());
+            pbBucketOffset.setLogEndOffset(logEndOffsetEntry.getValue());
         }
         return prepareCommitLakeTableSnapshotRequest;
     }
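Two notes on this file. First, prepareCommitLakeTableSnapshot is now sent through the AdminReadOnlyGateway proxy bound to a random tablet server, while the final commit still goes through the CoordinatorGateway. Second, both prepareCommit and commit replace the nullable exception local with a throw inside the try and a single wrapping catch; schematically (Response, gateway.call, and getResult are placeholders, not real Fluss types):

    try {
        Response resp = gateway.call(request).get();
        if (resp.hasErrorCode()) {
            // surface the server-side error so the catch below wraps it too
            throw ApiError.fromErrorMessage(resp).exception();
        }
        return resp.getResult();
    } catch (Exception e) {
        // single wrap point for transport failures and server-side errors alike
        throw new IOException("...", ExceptionUtils.stripExecutionException(e));
    }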

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java

Lines changed: 1 addition & 1 deletion
@@ -617,7 +617,7 @@ void testPartitionsExpiredInFlussButExistInLake(
                         new TableBucket(tableId, hybridPartitionId, 1), lakeEndOffset,
                         new TableBucket(tableId, hybridPartitionId, 2), lakeEndOffset));
         LakeTableHelper lakeTableHelper = new LakeTableHelper(zooKeeperClient, tempDir.toString());
-        lakeTableHelper.upsertLakeTable(tableId, DEFAULT_TABLE_PATH, lakeTableSnapshot);
+        lakeTableHelper.upsertLakeTableV1(tableId, lakeTableSnapshot);
 
         // Create PartitionInfo for lake partitions
         List<PartitionInfo> lakePartitionInfos = new ArrayList<>();
