Skip to content

Commit a6c7f69

Browse files
committed
minor fix
1 parent 3889069 commit a6c7f69

File tree

12 files changed

+649
-373
lines changed

12 files changed

+649
-373
lines changed

fluss-common/src/main/java/org/apache/fluss/utils/FlussPaths.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -710,14 +710,14 @@ public static FsPath remoteLakeTableSnapshotDir(
710710
* <p>The path contract:
711711
*
712712
* <pre>
713-
* {$remoteLakeTableSnapshotMetadataDir}/metadata/{uuid}.manifest
713+
* {$remoteLakeTableSnapshotDir}/metadata/{UUID}.offsets
714714
* </pre>
715715
*/
716-
public static FsPath remoteLakeTableSnapshotManifestPath(
716+
public static FsPath remoteLakeTableSnapshotOffsetPath(
717717
String remoteDataDir, TablePath tablePath, long tableId) {
718718
return new FsPath(
719719
String.format(
720-
"%s/metadata/%s.manifest",
720+
"%s/metadata/%s.offsets",
721721
remoteLakeTableSnapshotDir(remoteDataDir, tablePath, tableId),
722722
UUID.randomUUID()));
723723
}

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1305,8 +1305,8 @@ private void handleCommitLakeTableSnapshotV1(
13051305
}
13061306

13071307
// this involves IO operation (ZK), so we do it in ioExecutor
1308-
lakeTableHelper.upsertLakeTable(
1309-
tableId, tablePath, lakeTableSnapshotEntry.getValue());
1308+
lakeTableHelper.upsertLakeTableV1(
1309+
tableId, lakeTableSnapshotEntry.getValue());
13101310
} catch (Exception e) {
13111311
ApiError error = ApiError.fromThrowable(e);
13121312
tableResp.setError(error.error().code(), error.message());
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.fluss.server.zk.data;
20+
21+
import org.apache.fluss.metadata.TableBucket;
22+
23+
import java.util.Map;
24+
import java.util.Objects;
25+
26+
/**
27+
* Represents the offsets for all buckets of a table. This class stores the mapping from {@link
28+
* TableBucket} to their corresponding offsets.
29+
*
30+
* <p>This class is used to track the log end offsets for each bucket in a table. It supports both
31+
* non-partitioned tables (where buckets are identified only by bucket id) and partitioned tables
32+
* (where buckets are identified by partition id and bucket id).
33+
*
34+
* <p>The offsets map contains entries for each bucket that has a valid offset. Missing buckets are
35+
* not included in the map.
36+
*
37+
* @see TableBucketOffsetsJsonSerde for JSON serialization and deserialization.
38+
*/
39+
public class TableBucketOffsets {
40+
41+
/** The table ID that all buckets belong to. */
42+
private final long tableId;
43+
44+
/**
45+
* The mapping from {@link TableBucket} to their offsets. The map contains entries only for
46+
* buckets that have valid offsets.
47+
*/
48+
private final Map<TableBucket, Long> offsets;
49+
50+
/**
51+
* Creates a new {@link TableBucketOffsets} instance.
52+
*
53+
* @param tableId the table ID that all buckets belong to
54+
* @param offsets the mapping from {@link TableBucket} to their offsets
55+
*/
56+
public TableBucketOffsets(long tableId, Map<TableBucket, Long> offsets) {
57+
this.tableId = tableId;
58+
this.offsets = offsets;
59+
}
60+
61+
/**
62+
* Returns the table ID that all buckets belong to.
63+
*
64+
* @return the table ID
65+
*/
66+
public long getTableId() {
67+
return tableId;
68+
}
69+
70+
/**
71+
* Returns the mapping from {@link TableBucket} to their offsets.
72+
*
73+
* @return the offsets map
74+
*/
75+
public Map<TableBucket, Long> getOffsets() {
76+
return offsets;
77+
}
78+
79+
@Override
80+
public boolean equals(Object o) {
81+
if (this == o) {
82+
return true;
83+
}
84+
if (o == null || getClass() != o.getClass()) {
85+
return false;
86+
}
87+
TableBucketOffsets that = (TableBucketOffsets) o;
88+
return tableId == that.tableId && Objects.equals(offsets, that.offsets);
89+
}
90+
91+
@Override
92+
public int hashCode() {
93+
return Objects.hash(tableId, offsets);
94+
}
95+
96+
@Override
97+
public String toString() {
98+
return "TableBucketOffsets{" + "tableId=" + tableId + ", offsets=" + offsets + '}';
99+
}
100+
}
Lines changed: 244 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,244 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.fluss.server.zk.data;
20+
21+
import org.apache.fluss.metadata.TableBucket;
22+
import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
23+
import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
24+
import org.apache.fluss.utils.json.JsonDeserializer;
25+
import org.apache.fluss.utils.json.JsonSerializer;
26+
27+
import java.io.IOException;
28+
import java.util.ArrayList;
29+
import java.util.Comparator;
30+
import java.util.HashMap;
31+
import java.util.Iterator;
32+
import java.util.List;
33+
import java.util.Map;
34+
import java.util.TreeMap;
35+
36+
import static org.apache.fluss.utils.Preconditions.checkNotNull;
37+
import static org.apache.fluss.utils.Preconditions.checkState;
38+
39+
/**
40+
* Json serializer and deserializer for {@link TableBucketOffsets}.
41+
*
42+
* <p>This serde supports the following JSON format:
43+
*
44+
* <ul>
45+
* <li>Non-partition table uses "bucket_offsets": [1234, 5678, 1992], where array index represents
46+
* bucket id (0, 1, 2) and value represents the offset. Missing bucket ids in the sequence are
47+
* filled with -1.
48+
* <li>Partition table uses "partition_offsets": [{"partition_id": 3001, "bucket_offsets": [1234,
49+
* 5678, 1992]}, ...], where each element contains a partition_id and a bucket_offsets array.
50+
* The array index in bucket_offsets represents bucket id and value represents the offset.
51+
* Missing bucket ids in the sequence are filled with -1.
52+
* </ul>
53+
*
54+
* <p>During deserialization, values of -1 are ignored and not added to the offsets map.
55+
*
56+
* <p>The serialized format includes:
57+
*
58+
* <ul>
59+
* <li>"version": 1 - the format version
60+
* <li>"table_id": the table ID that all buckets belong to
61+
* <li>"bucket_offsets": array of offsets for non-partitioned table buckets (optional)
62+
* <li>"partition_offsets": array of partition offset objects for partitioned table buckets
63+
* (optional)
64+
* </ul>
65+
*/
66+
public class TableBucketOffsetsJsonSerde
67+
implements JsonSerializer<TableBucketOffsets>, JsonDeserializer<TableBucketOffsets> {
68+
69+
public static final TableBucketOffsetsJsonSerde INSTANCE = new TableBucketOffsetsJsonSerde();
70+
71+
private static final String VERSION_KEY = "version";
72+
private static final String TABLE_ID_KEY = "table_id";
73+
private static final String BUCKET_OFFSETS_KEY = "bucket_offsets";
74+
private static final String PARTITION_OFFSETS_KEY = "partition_offsets";
75+
private static final String PARTITION_ID_KEY = "partition_id";
76+
77+
private static final int VERSION = 1;
78+
private static final long UNKNOWN_OFFSET = -1;
79+
80+
/**
81+
* Deserializes a JSON node to a {@link TableBucketOffsets} object.
82+
*
83+
* <p>This method reads the JSON format and reconstructs the table bucket offsets map. The array
84+
* index in "bucket_offsets" represents the bucket id, and the value represents the offset.
85+
*
86+
* @param node the JSON node to deserialize
87+
* @return the deserialized {@link TableBucketOffsets} object
88+
* @throws IllegalArgumentException if the version is not supported
89+
*/
90+
@Override
91+
public TableBucketOffsets deserialize(JsonNode node) {
92+
int version = node.get(VERSION_KEY).asInt();
93+
if (version != VERSION) {
94+
throw new IllegalArgumentException("Unsupported version: " + version);
95+
}
96+
97+
long tableId = node.get(TABLE_ID_KEY).asLong();
98+
Map<TableBucket, Long> offsets = new HashMap<>();
99+
100+
// Deserialize non-partitioned table bucket offsets
101+
JsonNode bucketOffsetsNode = node.get(BUCKET_OFFSETS_KEY);
102+
JsonNode partitionBucketOffsetsNode = node.get(PARTITION_OFFSETS_KEY);
103+
if (bucketOffsetsNode != null || partitionBucketOffsetsNode != null) {
104+
if (bucketOffsetsNode != null && partitionBucketOffsetsNode != null) {
105+
throw new IllegalArgumentException(
106+
"Both bucket_offsets and partition_bucket_offsets cannot be present at the same time");
107+
}
108+
109+
if (bucketOffsetsNode != null) {
110+
int bucketId = 0;
111+
for (JsonNode bucketOffsetNode : bucketOffsetsNode) {
112+
long offset = bucketOffsetNode.asLong();
113+
// Ignore unknown offsets (filled for missing bucket ids)
114+
if (offset != UNKNOWN_OFFSET) {
115+
TableBucket tableBucket = new TableBucket(tableId, bucketId);
116+
offsets.put(tableBucket, offset);
117+
}
118+
bucketId++;
119+
}
120+
} else {
121+
for (JsonNode partitionOffsetNode : partitionBucketOffsetsNode) {
122+
long partitionId = partitionOffsetNode.get(PARTITION_ID_KEY).asLong();
123+
JsonNode bucketOffsetsArray = partitionOffsetNode.get(BUCKET_OFFSETS_KEY);
124+
if (bucketOffsetsArray != null && bucketOffsetsArray.isArray()) {
125+
Iterator<JsonNode> elements = bucketOffsetsArray.elements();
126+
int bucketId = 0;
127+
while (elements.hasNext()) {
128+
JsonNode offsetNode = elements.next();
129+
long offset = offsetNode.asLong();
130+
// Ignore unknown offsets (filled for missing bucket ids)
131+
if (offset != UNKNOWN_OFFSET) {
132+
TableBucket tableBucket =
133+
new TableBucket(tableId, partitionId, bucketId);
134+
offsets.put(tableBucket, offset);
135+
}
136+
bucketId++;
137+
}
138+
}
139+
}
140+
}
141+
}
142+
143+
return new TableBucketOffsets(tableId, offsets);
144+
}
145+
146+
/**
147+
* Serializes a {@link TableBucketOffsets} object to JSON format.
148+
*
149+
* <p>This method writes the table bucket offsets in the JSON format. It groups buckets by
150+
* partition_id and writes non-partitioned buckets to "bucket_offsets" array and partitioned
151+
* buckets to "partition_offsets" array. The array index represents the bucket id.
152+
*
153+
* <p>This method validates that all buckets in the offsets map have the same table_id as the
154+
* {@link TableBucketOffsets#getTableId()}.
155+
*
156+
* @param tableBucketOffsets the {@link TableBucketOffsets} object to serialize
157+
* @param generator the JSON generator to write to
158+
* @throws IOException if an I/O error occurs during serialization
159+
* @throws IllegalStateException if buckets have inconsistent table IDs
160+
*/
161+
@Override
162+
public void serialize(TableBucketOffsets tableBucketOffsets, JsonGenerator generator)
163+
throws IOException {
164+
generator.writeStartObject();
165+
long expectedTableId = tableBucketOffsets.getTableId();
166+
generator.writeNumberField(VERSION_KEY, VERSION);
167+
generator.writeNumberField(TABLE_ID_KEY, expectedTableId);
168+
169+
Map<TableBucket, Long> offsets = tableBucketOffsets.getOffsets();
170+
if (!offsets.isEmpty()) {
171+
// Group buckets by partition_id and validate table_id consistency
172+
Map<Long, List<TableBucket>> partitionBuckets = new TreeMap<>();
173+
List<TableBucket> nonPartitionBuckets = new ArrayList<>();
174+
175+
for (TableBucket tableBucket : offsets.keySet()) {
176+
// Check that all buckets have the same table_id
177+
checkState(
178+
tableBucket.getTableId() == expectedTableId,
179+
"All buckets must have the same table_id. Expected: %d, but found: %d in bucket: %s",
180+
expectedTableId,
181+
tableBucket.getTableId(),
182+
tableBucket);
183+
184+
if (tableBucket.getPartitionId() != null) {
185+
partitionBuckets
186+
.computeIfAbsent(tableBucket.getPartitionId(), k -> new ArrayList<>())
187+
.add(tableBucket);
188+
} else {
189+
nonPartitionBuckets.add(tableBucket);
190+
}
191+
}
192+
193+
// Serialize non-partitioned table bucket offsets
194+
if (!nonPartitionBuckets.isEmpty()) {
195+
checkState(
196+
partitionBuckets.isEmpty(),
197+
"partitionBuckets must be empty when nonPartitionBuckets is not empty");
198+
generator.writeArrayFieldStart(BUCKET_OFFSETS_KEY);
199+
serializeBucketLogEndOffset(offsets, nonPartitionBuckets, generator);
200+
generator.writeEndArray();
201+
} else {
202+
// nonPartitionBuckets is empty, partitionBuckets is must not empty
203+
checkState(
204+
!partitionBuckets.isEmpty(),
205+
"partitionBuckets must be not empty when nonPartitionBuckets is empty");
206+
generator.writeArrayFieldStart(PARTITION_OFFSETS_KEY);
207+
for (Map.Entry<Long, List<TableBucket>> entry : partitionBuckets.entrySet()) {
208+
Long partitionId = entry.getKey();
209+
List<TableBucket> buckets = entry.getValue();
210+
generator.writeStartObject();
211+
generator.writeNumberField(PARTITION_ID_KEY, partitionId);
212+
generator.writeArrayFieldStart(BUCKET_OFFSETS_KEY);
213+
serializeBucketLogEndOffset(offsets, buckets, generator);
214+
generator.writeEndArray();
215+
generator.writeEndObject();
216+
}
217+
generator.writeEndArray();
218+
}
219+
}
220+
221+
generator.writeEndObject();
222+
}
223+
224+
private void serializeBucketLogEndOffset(
225+
Map<TableBucket, Long> bucketLogEndOffset,
226+
List<TableBucket> buckets,
227+
JsonGenerator generator)
228+
throws IOException {
229+
// sort by bucket id
230+
buckets.sort(Comparator.comparingInt(TableBucket::getBucket));
231+
int currentBucketId = 0;
232+
for (TableBucket tableBucket : buckets) {
233+
int bucketId = tableBucket.getBucket();
234+
// Fill null values for missing bucket ids
235+
while (currentBucketId < bucketId) {
236+
generator.writeNumber(UNKNOWN_OFFSET);
237+
currentBucketId++;
238+
}
239+
long logEndOffset = checkNotNull(bucketLogEndOffset.get(tableBucket));
240+
generator.writeNumber(logEndOffset);
241+
currentBucketId++;
242+
}
243+
}
244+
}

0 commit comments

Comments
 (0)