Skip to content

Commit e543d1c

Browse files
authored
[hotfix] Refactor fluss-offsets naming in lake snapshot to align with origin design (#1772)
1 parent 8dfd695 commit e543d1c

File tree

7 files changed

+63
-14
lines changed

7 files changed

+63
-14
lines changed

fluss-common/src/main/java/org/apache/fluss/utils/json/BucketOffsetJsonSerde.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,9 @@ public class BucketOffsetJsonSerde
2929

3030
public static final BucketOffsetJsonSerde INSTANCE = new BucketOffsetJsonSerde();
3131
private static final String PARTITION_ID = "partition_id";
32-
private static final String BUCKET_ID = "bucket_id";
32+
private static final String BUCKET_ID = "bucket";
3333
private static final String PARTITION_NAME = "partition_name";
34-
private static final String LOG_OFFSET = "log_offset";
34+
private static final String LOG_OFFSET = "offset";
3535

3636
@Override
3737
public BucketOffset deserialize(JsonNode node) {

fluss-common/src/test/java/org/apache/fluss/utils/json/BucketOffsetJsonSerdeTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,8 @@ protected BucketOffset[] createObjects() {
3737
@Override
3838
protected String[] expectedJsons() {
3939
return new String[] {
40-
"{\"partition_id\":1,\"bucket_id\":1,\"partition_name\":\"country=eu-central/year=2023/month=12\",\"log_offset\":10}",
41-
"{\"bucket_id\":2,\"log_offset\":20}"
40+
"{\"partition_id\":1,\"bucket\":1,\"partition_name\":\"country=eu-central/year=2023/month=12\",\"offset\":10}",
41+
"{\"bucket\":2,\"offset\":20}"
4242
};
4343
}
4444
}

fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringITCase.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ void testTiering() throws Exception {
188188
{
189189
put(
190190
FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY,
191-
"[{\"bucket_id\":0,\"log_offset\":3}]");
191+
"[{\"bucket\":0,\"offset\":3}]");
192192
}
193193
};
194194
checkSnapshotPropertyInIceberg(t1, properties);
@@ -402,8 +402,8 @@ private void testPartitionedTableTiering() throws Exception {
402402
put(
403403
FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY,
404404
"["
405-
+ "{\"partition_id\":0,\"bucket_id\":0,\"partition_name\":\"date=2025\",\"log_offset\":3},"
406-
+ "{\"partition_id\":1,\"bucket_id\":0,\"partition_name\":\"date=2026\",\"log_offset\":3}"
405+
+ "{\"partition_id\":0,\"bucket\":0,\"partition_name\":\"date=2025\",\"offset\":3},"
406+
+ "{\"partition_id\":1,\"bucket\":0,\"partition_name\":\"date=2026\",\"offset\":3}"
407407
+ "]");
408408
}
409409
};

fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/tiering/LanceTieringITCase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ void testTiering() throws Exception {
102102
{
103103
put(
104104
FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY,
105-
"[{\"bucket_id\":0,\"log_offset\":30}]");
105+
"[{\"bucket\":0,\"offset\":30}]");
106106
put("commit-user", FLUSS_LAKE_TIERING_COMMIT_USER);
107107
}
108108
};

fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ void testTiering() throws Exception {
104104
{
105105
put(
106106
FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY,
107-
"[{\"bucket_id\":0,\"log_offset\":3}]");
107+
"[{\"bucket\":0,\"offset\":3}]");
108108
}
109109
};
110110
checkSnapshotPropertyInPaimon(t1, properties);
@@ -219,7 +219,7 @@ void testTieringForAlterTable() throws Exception {
219219
{
220220
put(
221221
FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY,
222-
"[{\"bucket_id\":0,\"log_offset\":3}]");
222+
"[{\"bucket\":0,\"offset\":3}]");
223223
}
224224
};
225225
checkSnapshotPropertyInPaimon(t1, properties);
@@ -315,7 +315,7 @@ void testTieringForAlterTable() throws Exception {
315315

316316
private String getPartitionOffsetStr(Map<Long, String> partitionNameByIds) {
317317
String raw =
318-
"{\"partition_id\":%s,\"bucket_id\":0,\"partition_name\":\"date=%s\",\"log_offset\":3}";
318+
"{\"partition_id\":%s,\"bucket\":0,\"partition_name\":\"date=%s\",\"offset\":3}";
319319
List<Long> partitionIds = new ArrayList<>(partitionNameByIds.keySet());
320320
Collections.sort(partitionIds);
321321
List<String> partitionOffsetStrs = new ArrayList<>();

fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -389,8 +389,8 @@ void testThreePartitionTiering() throws Exception {
389389
String offsetProperty = getSnapshotLogOffsetProperty(tablePath, snapshot);
390390
assertThat(offsetProperty)
391391
.isEqualTo(
392-
"[{\"partition_id\":1,\"bucket_id\":0,\"partition_name\":\"region=us-east/year=2024/month=01\",\"log_offset\":2},"
393-
+ "{\"partition_id\":2,\"bucket_id\":0,\"partition_name\":\"region=eu-central/year=2023/month=12\",\"log_offset\":2}]");
392+
"[{\"partition_id\":1,\"bucket\":0,\"partition_name\":\"region=us-east/year=2024/month=01\",\"offset\":2},"
393+
+ "{\"partition_id\":2,\"bucket\":0,\"partition_name\":\"region=eu-central/year=2023/month=12\",\"offset\":2}]");
394394

395395
// Verify data for each partition
396396
for (String partition : partitionIdAndName.values()) {

website/docs/streaming-lakehouse/integrate-data-lakes/paimon.md

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,4 +174,53 @@ The following table shows the mapping between [Fluss data types](table-design/da
174174
| TIMESTAMP | TIMESTAMP |
175175
| TIMESTAMP WITH LOCAL TIMEZONE | TIMESTAMP WITH LOCAL TIMEZONE |
176176
| BINARY | BINARY |
177-
| BYTES | BYTES |
177+
| BYTES | BYTES |
178+
179+
## 📊 Snapshot Metadata
180+
181+
Fluss adds specific metadata to Paimon snapshots for traceability:
182+
183+
- **commit-user**: Set to `__fluss_lake_tiering` to identify Fluss-generated snapshots
184+
- **fluss-offsets**: JSON string containing the Fluss bucket offset mapping to track the tiering progress
185+
186+
#### Non-Partitioned Tables
187+
188+
For non-partitioned tables, the metadata structure of `fluss-offsets` is:
189+
190+
```json
191+
[
192+
{"bucket": 0, "offset": 1234},
193+
{"bucket": 1, "offset": 5678},
194+
{"bucket": 2, "offset": 9012}
195+
]
196+
```
197+
198+
#### Partitioned Tables
199+
200+
For partitioned tables, the metadata structure includes partition information:
201+
202+
```json
203+
[
204+
{
205+
"partition_name": "date=2025",
206+
"partition_id": 0,
207+
"bucket": 0,
208+
"offset": 3
209+
},
210+
{
211+
"partition_name": "date=2025",
212+
"partition_id": 1,
213+
"bucket": 0,
214+
"offset": 3
215+
}
216+
]
217+
```
218+
219+
#### Metadata Fields Explanation
220+
221+
| Field | Description | Example |
222+
|------------------|----------------------------------------------|------------------------------|
223+
| `partition_id` | Unique identifier in Fluss for the partition | `0`, `1` |
224+
| `bucket` | Bucket identifier within the partition | `0`, `1`, `2` |
225+
| `partition_name` | Human-readable partition name | `"date=2025"`, `"date=2026"` |
226+
| `offset`         | Log offset within the bucket                 | `3`, `1000`                  |

0 commit comments

Comments
 (0)