Skip to content

Commit 01a0b3b

Browse files
committed
address comment again
1 parent efc1411 commit 01a0b3b

File tree

7 files changed

+182
-25
lines changed

7 files changed

+182
-25
lines changed

fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTable.java

Lines changed: 60 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.io.IOException;
3131
import java.util.Collections;
3232
import java.util.List;
33+
import java.util.Objects;
3334

3435
import static org.apache.fluss.metrics.registry.MetricRegistry.LOG;
3536
import static org.apache.fluss.utils.Preconditions.checkNotNull;
@@ -51,7 +52,7 @@ public class LakeTable {
5152

5253
// Version 2 (current):
5354
// a list of lake snapshot metadata, record the metadata for different lake snapshots
54-
@Nullable private final List<LakeSnapshotMetadata> lakeSnapshotMetadata;
55+
@Nullable private final List<LakeSnapshotMetadata> lakeSnapshotMetadatas;
5556

5657
// Version 1 (legacy): the full lake table snapshot info stored in ZK, will be null in version2
5758
@Nullable private final LakeTableSnapshot lakeTableSnapshot;
@@ -77,30 +78,30 @@ public LakeTable(LakeSnapshotMetadata lakeSnapshotMetadata) {
7778
/**
7879
* Creates a LakeTable with a list of lake snapshot metadata (version 2 format).
7980
*
80-
* @param lakeSnapshotMetadata the list of lake snapshot metadata
81+
* @param lakeSnapshotMetadatas the list of lake snapshot metadata
8182
*/
82-
public LakeTable(List<LakeSnapshotMetadata> lakeSnapshotMetadata) {
83-
this(null, lakeSnapshotMetadata);
83+
public LakeTable(List<LakeSnapshotMetadata> lakeSnapshotMetadatas) {
84+
this(null, lakeSnapshotMetadatas);
8485
}
8586

8687
private LakeTable(
8788
@Nullable LakeTableSnapshot lakeTableSnapshot,
88-
List<LakeSnapshotMetadata> lakeSnapshotMetadata) {
89+
List<LakeSnapshotMetadata> lakeSnapshotMetadatas) {
8990
this.lakeTableSnapshot = lakeTableSnapshot;
90-
this.lakeSnapshotMetadata = lakeSnapshotMetadata;
91+
this.lakeSnapshotMetadatas = lakeSnapshotMetadatas;
9192
}
9293

9394
@Nullable
94-
public LakeSnapshotMetadata getLakeTableLatestSnapshot() {
95-
if (lakeSnapshotMetadata != null && !lakeSnapshotMetadata.isEmpty()) {
96-
return lakeSnapshotMetadata.get(0);
95+
public LakeSnapshotMetadata getLatestLakeSnapshotMetadata() {
96+
if (lakeSnapshotMetadatas != null && !lakeSnapshotMetadatas.isEmpty()) {
97+
return lakeSnapshotMetadatas.get(0);
9798
}
9899
return null;
99100
}
100101

101102
@Nullable
102-
public List<LakeSnapshotMetadata> getLakeSnapshotMetadata() {
103-
return lakeSnapshotMetadata;
103+
public List<LakeSnapshotMetadata> getLakeSnapshotMetadatas() {
104+
return lakeSnapshotMetadatas;
104105
}
105106

106107
/**
@@ -116,7 +117,7 @@ public LakeTableSnapshot getLatestTableSnapshot() throws Exception {
116117
return lakeTableSnapshot;
117118
}
118119
FsPath tieredOffsetsFilePath =
119-
checkNotNull(getLakeTableLatestSnapshot()).tieredOffsetsFilePath;
120+
checkNotNull(getLatestLakeSnapshotMetadata()).tieredOffsetsFilePath;
120121
FSDataInputStream inputStream =
121122
tieredOffsetsFilePath.getFileSystem().open(tieredOffsetsFilePath);
122123
try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
@@ -178,5 +179,52 @@ private void delete(FsPath fsPath) {
178179
LOG.warn("Error deleting filePath at {}", fsPath, e);
179180
}
180181
}
182+
183+
@Override
184+
public boolean equals(Object o) {
185+
if (this == o) {
186+
return true;
187+
}
188+
if (!(o instanceof LakeSnapshotMetadata)) {
189+
return false;
190+
}
191+
LakeSnapshotMetadata that = (LakeSnapshotMetadata) o;
192+
return snapshotId == that.snapshotId
193+
&& Objects.equals(tieredOffsetsFilePath, that.tieredOffsetsFilePath)
194+
&& Objects.equals(readableOffsetsFilePath, that.readableOffsetsFilePath);
195+
}
196+
197+
@Override
198+
public int hashCode() {
199+
return Objects.hash(snapshotId, tieredOffsetsFilePath, readableOffsetsFilePath);
200+
}
201+
}
202+
203+
@Override
204+
public boolean equals(Object o) {
205+
if (this == o) {
206+
return true;
207+
}
208+
if (!(o instanceof LakeTable)) {
209+
return false;
210+
}
211+
LakeTable lakeTable = (LakeTable) o;
212+
return Objects.equals(lakeSnapshotMetadatas, lakeTable.lakeSnapshotMetadatas)
213+
&& Objects.equals(lakeTableSnapshot, lakeTable.lakeTableSnapshot);
214+
}
215+
216+
@Override
217+
public int hashCode() {
218+
return Objects.hash(lakeSnapshotMetadatas, lakeTableSnapshot);
219+
}
220+
221+
@Override
222+
public String toString() {
223+
return "LakeTable{"
224+
+ "lakeSnapshotMetadatas="
225+
+ lakeSnapshotMetadatas
226+
+ ", lakeTableSnapshot="
227+
+ lakeTableSnapshot
228+
+ '}';
181229
}
182230
}

fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTableHelper.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ public void upsertLakeTable(
9898
if (optPreviousLakeTable.isPresent()) {
9999
// discard previous latest lake snapshot
100100
LakeTable.LakeSnapshotMetadata previousLakeSnapshotMetadata =
101-
optPreviousLakeTable.get().getLakeTableLatestSnapshot();
101+
optPreviousLakeTable.get().getLatestLakeSnapshotMetadata();
102102
if (previousLakeSnapshotMetadata != null) {
103103
previousLakeSnapshotMetadata.discard();
104104
}

fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTableJsonSerde.java

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ public void serialize(LakeTable lakeTable, JsonGenerator generator) throws IOExc
6666

6767
generator.writeArrayFieldStart(LAKE_SNAPSHOTS);
6868
for (LakeTable.LakeSnapshotMetadata lakeSnapshotMetadata :
69-
checkNotNull(lakeTable.getLakeSnapshotMetadata())) {
69+
checkNotNull(lakeTable.getLakeSnapshotMetadatas())) {
7070
generator.writeStartObject();
7171

7272
generator.writeNumberField(SNAPSHOT_ID_KEY, lakeSnapshotMetadata.getSnapshotId());
@@ -100,22 +100,24 @@ public LakeTable deserialize(JsonNode node) {
100100
"Invalid lake_snapshots field in version 2 format");
101101
}
102102

103-
List<LakeTable.LakeSnapshotMetadata> lakeSnapshotMetadata = new ArrayList<>();
103+
List<LakeTable.LakeSnapshotMetadata> lakeSnapshotMetadatas = new ArrayList<>();
104104
Iterator<JsonNode> elements = lakeSnapshotsNode.elements();
105105
while (elements.hasNext()) {
106106
JsonNode snapshotNode = elements.next();
107107
long snapshotId = snapshotNode.get(SNAPSHOT_ID_KEY).asLong();
108108
String tieredOffsetsPath = snapshotNode.get(TIERED_OFFSETS_KEY).asText();
109-
String readableOffsetsPath = snapshotNode.get(READABLE_OFFSETS_KEY).asText();
109+
JsonNode readableOffsetsNode = snapshotNode.get(READABLE_OFFSETS_KEY);
110+
FsPath readableOffsetsPath =
111+
readableOffsetsNode != null
112+
? new FsPath(readableOffsetsNode.asText())
113+
: null;
110114

111115
LakeTable.LakeSnapshotMetadata metadata =
112116
new LakeTable.LakeSnapshotMetadata(
113-
snapshotId,
114-
new FsPath(tieredOffsetsPath),
115-
new FsPath(readableOffsetsPath));
116-
lakeSnapshotMetadata.add(metadata);
117+
snapshotId, new FsPath(tieredOffsetsPath), readableOffsetsPath);
118+
lakeSnapshotMetadatas.add(metadata);
117119
}
118-
return new LakeTable(lakeSnapshotMetadata);
120+
return new LakeTable(lakeSnapshotMetadatas);
119121
} else {
120122
throw new IllegalArgumentException("Unsupported version: " + version);
121123
}

fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ public void setup() throws Exception {
168168
conf.setString(ConfigOptions.DATA_DIR, tempDir.getAbsolutePath());
169169
conf.setString(ConfigOptions.COORDINATOR_HOST, "localhost");
170170
conf.set(ConfigOptions.REMOTE_DATA_DIR, tempDir.getAbsolutePath() + "/remote_data_dir");
171-
conf.set(ConfigOptions.REMOTE_LOG_DATA_TRANSFER_THREAD_NUM, 1);
171+
conf.set(ConfigOptions.SERVER_IO_POOL_SIZE, 2);
172172
// set snapshot interval to 1 seconds for test purpose
173173
conf.set(ConfigOptions.KV_SNAPSHOT_INTERVAL, Duration.ofSeconds(1));
174174

fluss-server/src/test/java/org/apache/fluss/server/zk/data/lake/LakeTableHelperTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -140,12 +140,12 @@ void testUpsertLakeTableCompatible(@TempDir Path tempDir) throws Exception {
140140
Optional<LakeTable> optLakeTableAfter = zooKeeperClient.getLakeTable(tableId);
141141
assertThat(optLakeTableAfter).isPresent();
142142
LakeTable lakeTableAfter = optLakeTableAfter.get();
143-
assertThat(lakeTableAfter.getLakeTableLatestSnapshot())
143+
assertThat(lakeTableAfter.getLatestLakeSnapshotMetadata())
144144
.isNotNull(); // Version 2 has file path
145145

146146
// Verify: The lake snapshot file exists
147147
FsPath snapshot2FileHandle =
148-
lakeTableAfter.getLakeTableLatestSnapshot().getReadableOffsetsFilePath();
148+
lakeTableAfter.getLatestLakeSnapshotMetadata().getReadableOffsetsFilePath();
149149
FileSystem fileSystem = snapshot2FileHandle.getFileSystem();
150150
assertThat(fileSystem.exists(snapshot2FileHandle)).isTrue();
151151

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.server.zk.data.lake;
19+
20+
import org.apache.fluss.fs.FsPath;
21+
import org.apache.fluss.utils.json.JsonSerdeTestBase;
22+
23+
import java.util.ArrayList;
24+
import java.util.Collections;
25+
import java.util.List;
26+
27+
/** Test for {@link LakeTableJsonSerde}. */
28+
class LakeTableJsonSerdeTest extends JsonSerdeTestBase<LakeTable> {
29+
30+
LakeTableJsonSerdeTest() {
31+
super(LakeTableJsonSerde.INSTANCE);
32+
}
33+
34+
@Override
35+
protected LakeTable[] createObjects() {
36+
// Test case 1: Empty lake snapshots list
37+
LakeTable lakeTable1 = new LakeTable(Collections.emptyList());
38+
39+
// Test case 2: Single snapshot metadata with readable offsets
40+
LakeTable.LakeSnapshotMetadata metadata1 =
41+
new LakeTable.LakeSnapshotMetadata(
42+
1L, new FsPath("/path/to/tiered1"), new FsPath("/path/to/readable1"));
43+
LakeTable lakeTable2 = new LakeTable(Collections.singletonList(metadata1));
44+
45+
// Test case 3: Single snapshot metadata without readable offsets
46+
LakeTable.LakeSnapshotMetadata metadata2 =
47+
new LakeTable.LakeSnapshotMetadata(2L, new FsPath("/path/to/tiered2"), null);
48+
LakeTable lakeTable3 = new LakeTable(Collections.singletonList(metadata2));
49+
50+
// Test case 4: Multiple snapshot metadata
51+
List<LakeTable.LakeSnapshotMetadata> metadatas = new ArrayList<>();
52+
metadatas.add(
53+
new LakeTable.LakeSnapshotMetadata(
54+
3L, new FsPath("/path/to/tiered3"), new FsPath("/path/to/readable3")));
55+
metadatas.add(
56+
new LakeTable.LakeSnapshotMetadata(
57+
4L, new FsPath("/path/to/tiered4"), new FsPath("/path/to/readable4")));
58+
metadatas.add(new LakeTable.LakeSnapshotMetadata(5L, new FsPath("/path/to/tiered5"), null));
59+
LakeTable lakeTable4 = new LakeTable(metadatas);
60+
61+
return new LakeTable[] {lakeTable1, lakeTable2, lakeTable3, lakeTable4};
62+
}
63+
64+
@Override
65+
protected String[] expectedJsons() {
66+
return new String[] {
67+
// Test case 1: Empty lake snapshots list
68+
"{\"version\":2,\"lake_snapshots\":[]}",
69+
// Test case 2: Single snapshot metadata with readable offsets
70+
"{\"version\":2,\"lake_snapshots\":[{\"snapshot_id\":1,\"tiered_offsets\":\"/path/to/tiered1\",\"readable_offsets\":\"/path/to/readable1\"}]}",
71+
// Test case 3: Single snapshot metadata without readable offsets
72+
"{\"version\":2,\"lake_snapshots\":[{\"snapshot_id\":2,\"tiered_offsets\":\"/path/to/tiered2\"}]}",
73+
// Test case 4: Multiple snapshot metadata
74+
"{\"version\":2,\"lake_snapshots\":[{\"snapshot_id\":3,\"tiered_offsets\":\"/path/to/tiered3\",\"readable_offsets\":\"/path/to/readable3\"},{\"snapshot_id\":4,\"tiered_offsets\":\"/path/to/tiered4\",\"readable_offsets\":\"/path/to/readable4\"},{\"snapshot_id\":5,\"tiered_offsets\":\"/path/to/tiered5\"}]}"
75+
};
76+
}
77+
}

website/docs/maintenance/operations/upgrade-notes-0.9.md

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,4 +10,34 @@ These upgrade notes discuss important aspects, such as configuration, behavior,
1010

1111
## Deprecation / End of Support
1212

13-
TODO
13+
### Configuration Options Deprecated
14+
15+
Several configuration options have been deprecated in Fluss 0.9 and replaced with a unified `server.io-pool.size` option. This change simplifies configuration management by consolidating IO thread pool settings across different components.
16+
17+
🔧 **Action Required**: Update your configuration files to use the new option.
18+
19+
#### Deprecated Options
20+
21+
The following options are deprecated and will be removed in a future version:
22+
23+
| Deprecated Option | Replacement | Description |
24+
|---------------------------------------|-----------------------|------------------------------------------------------------------|
25+
| `coordinator.io-pool.size` | `server.io-pool.size` | The size of the IO thread pool for coordinator server operations |
26+
| `remote.log.data-transfer-thread-num` | `server.io-pool.size` | The number of threads for transferring remote log files |
27+
| `kv.snapshot.transfer-thread-num` | `server.io-pool.size` | The number of threads for transferring KV snapshot files |
28+
29+
#### Migration Steps
30+
31+
1. **Identify deprecated options in your configuration**:
32+
- Check your `server.yaml` configuration file for any of the deprecated options listed above
33+
34+
2. **Replace with the unified option**:
35+
- Remove the deprecated options from your configuration
36+
- Add or update `server.io-pool.size` with an appropriate value
37+
- The default value is `10`, which should work for most use cases
38+
39+
#### Benefits of the Change
40+
41+
- **Simplified Configuration**: One option instead of multiple options for IO thread pool management
42+
- **Better Resource Management**: Unified thread pool allows better resource sharing across different IO operations
43+
- **Consistent Behavior**: All IO operations (remote log, KV snapshot, etc.) now use the same thread pool configuration

0 commit comments

Comments
 (0)