
Commit b49b000

Address review comments

1 parent b0db17c

File tree

5 files changed: +64, -159 lines changed


fluss-flink/fluss-flink-common/pom.xml

Lines changed: 0 additions & 47 deletions

```diff
@@ -177,53 +177,6 @@
             <type>test-jar</type>
         </dependency>

-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-hdfs-client</artifactId>
-            <version>${fluss.hadoop.version}</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-common</artifactId>
-            <scope>test</scope>
-            <exclusions>
-                <exclusion>
-                    <artifactId>avro</artifactId>
-                    <groupId>org.apache.avro</groupId>
-                </exclusion>
-                <exclusion>
-                    <artifactId>log4j</artifactId>
-                    <groupId>log4j</groupId>
-                </exclusion>
-                <exclusion>
-                    <artifactId>slf4j-log4j12</artifactId>
-                    <groupId>org.slf4j</groupId>
-                </exclusion>
-                <exclusion>
-                    <groupId>ch.qos.reload4j</groupId>
-                    <artifactId>reload4j</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>slf4j-reload4j</artifactId>
-                </exclusion>
-                <exclusion>
-                    <artifactId>jdk.tools</artifactId>
-                    <groupId>jdk.tools</groupId>
-                </exclusion>
-                <exclusion>
-                    <artifactId>protobuf-java</artifactId>
-                    <groupId>com.google.protobuf</groupId>
-                </exclusion>
-                <exclusion>
-                    <artifactId>commons-io</artifactId>
-                    <groupId>commons-io</groupId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-
-
     </dependencies>

     <build>
```

fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java

Lines changed: 6 additions & 6 deletions

```diff
@@ -64,7 +64,6 @@
 import static com.alibaba.fluss.flink.tiering.source.enumerator.TieringSourceEnumerator.HeartBeatHelper.heartBeatWithRequestNewTieringTable;
 import static com.alibaba.fluss.flink.tiering.source.enumerator.TieringSourceEnumerator.HeartBeatHelper.tieringTableHeartBeat;
 import static com.alibaba.fluss.flink.tiering.source.enumerator.TieringSourceEnumerator.HeartBeatHelper.waitHeartbeatResponse;
-import static com.alibaba.fluss.utils.Preconditions.checkState;

 /**
  * An implementation of {@link SplitEnumerator} used to request {@link TieringSplit} from Fluss
@@ -169,8 +168,7 @@ public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname
     public void addSplitsBack(List<TieringSplit> splits, int subtaskId) {
         readersAwaitingSplit.add(subtaskId);
         pendingSplits.addAll(splits);
-        this.context.callAsync(
-                this::requestTieringTableSplitsViaHeartBeat, this::generateAndAssignSplits);
+        assignSplits();
     }

     @Override
@@ -187,8 +185,10 @@ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
     }

     private void generateAndAssignSplits(
-            Tuple3<Long, Long, TablePath> tieringTable, Throwable throwable) {
-        checkState(throwable == null, "Failed to request tiering table due to:", throwable);
+            @Nullable Tuple3<Long, Long, TablePath> tieringTable, Throwable throwable) {
+        if (throwable != null) {
+            LOG.warn("Failed to request tiering table, will retry later.", throwable);
+        }
         if (tieringTable != null) {
             generateTieringSplits(tieringTable);
         }
@@ -212,7 +212,7 @@ private void assignSplits() {
         }
     }

-    private Tuple3<Long, Long, TablePath> requestTieringTableSplitsViaHeartBeat() {
+    private @Nullable Tuple3<Long, Long, TablePath> requestTieringTableSplitsViaHeartBeat() {
         LakeTieringHeartbeatRequest tieringHeartbeatRequest =
                 tieringTableHeartBeat(
                         basicHeartBeat(), this.tieringTableEpochs, this.flussCoordinatorEpoch);
```
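
The behavioral change in this file: a failed async heartbeat no longer trips `checkState` (which would fail the enumerator); the callback now logs a warning and skips split generation until the next request, and `addSplitsBack` reassigns the already-pending splits directly via `assignSplits()` instead of issuing a fresh heartbeat. Below is a minimal, self-contained sketch of that failure-tolerant callback pattern; `callAsync` here is a stand-in for Flink's `SplitEnumeratorContext#callAsync(Callable, BiConsumer)`, and the table name and error message are invented for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

public class TolerantCallbackSketch {

    /** Stand-in for Flink's SplitEnumeratorContext#callAsync(Callable, BiConsumer). */
    static <T> void callAsync(Callable<T> callable, BiConsumer<T, Throwable> handler) {
        CompletableFuture.<T>supplyAsync(
                        () -> {
                            try {
                                return callable.call();
                            } catch (Exception e) {
                                throw new RuntimeException(e);
                            }
                        })
                .whenComplete(handler);
    }

    /** Mirrors the new generateAndAssignSplits: tolerate failure, act only on a result. */
    static void generateAndAssignSplits(String tieringTable, Throwable throwable) {
        if (throwable != null) {
            // Old behavior: checkState(throwable == null, ...) failed the whole job here.
            // New behavior: warn and fall through; a later heartbeat retries the request.
            System.err.println("Failed to request tiering table, will retry later: " + throwable);
        }
        if (tieringTable != null) {
            System.out.println("Generating splits for " + tieringTable);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Failure path: the handler logs and returns without throwing.
        callAsync(
                (Callable<String>) () -> { throw new RuntimeException("heartbeat timed out"); },
                TolerantCallbackSketch::generateAndAssignSplits);
        // Success path: splits are generated for the returned table.
        callAsync(() -> "fluss_db.orders", TolerantCallbackSketch::generateAndAssignSplits);
        Thread.sleep(500); // give the demo's async callbacks time to run
    }
}
```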

fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/tiering/source/TieringTestBase.java

Lines changed: 11 additions & 62 deletions

```diff
@@ -38,15 +38,10 @@
 import com.alibaba.fluss.row.encode.KeyEncoder;
 import com.alibaba.fluss.rpc.gateway.CoordinatorGateway;
 import com.alibaba.fluss.server.testutils.FlussClusterExtension;
-import com.alibaba.fluss.server.zk.ZooKeeperClient;
 import com.alibaba.fluss.types.DataTypes;
 import com.alibaba.fluss.types.RowType;

 import org.apache.flink.test.util.AbstractTestBase;
-import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.catalog.CatalogContext;
-import org.apache.paimon.catalog.CatalogFactory;
-import org.apache.paimon.options.Options;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
@@ -58,11 +53,8 @@
 import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Optional;

-import static com.alibaba.fluss.server.utils.LakeStorageUtils.extractLakeProperties;
 import static com.alibaba.fluss.testutils.DataTestUtils.row;
-import static com.alibaba.fluss.testutils.common.CommonTestUtils.waitValue;

 /** A base class for testing {@link TieringSource} with Fluss cluster prepared. */
 public class TieringTestBase extends AbstractTestBase {
@@ -153,7 +145,6 @@ public class TieringTestBase extends AbstractTestBase {
     protected static Connection conn;
     protected static Admin admin;
     protected static CoordinatorGateway coordinatorGateway;
-    protected static Catalog paimonCatalog;

     protected static Configuration clientConf;
     protected static String bootstrapServers;
@@ -206,10 +197,6 @@ private static Configuration flussClusterConfig() {
         }
         conf.setString("datalake.paimon.warehouse", warehousePath);

-        paimonCatalog =
-                CatalogFactory.createCatalog(
-                        CatalogContext.create(Options.fromMap(extractLakeProperties(conf))));
-
         return conf;
     }

@@ -236,10 +223,10 @@ protected void waitUntilSnapshot(long tableId, long snapshotId) {
     }

     protected void waitUntilPartitionTableSnapshot(
-            long tableId, Map<Long, String> partitionNameByIds, long snapshotId) {
-        for (Map.Entry<Long, String> entry : partitionNameByIds.entrySet()) {
+            long tableId, Map<String, Long> partitionNameByIds, long snapshotId) {
+        for (Map.Entry<String, Long> entry : partitionNameByIds.entrySet()) {
             for (int i = 0; i < DEFAULT_BUCKET_NUM; i++) {
-                TableBucket tableBucket = new TableBucket(tableId, entry.getKey(), i);
+                TableBucket tableBucket = new TableBucket(tableId, entry.getValue(), i);
                 FLUSS_CLUSTER_EXTENSION.waitUtilSnapshotFinished(tableBucket, snapshotId);
             }
         }
@@ -252,53 +239,19 @@ protected void waitUntilSnapshot(long tableId, @Nullable Long partitionId, long
         }
     }

-    /**
-     * Wait until the default number of partitions is created. Return the map from partition id to
-     * partition name.
-     */
-    public static Map<Long, String> waitUntilPartitions(
-            ZooKeeperClient zooKeeperClient, TablePath tablePath) {
-        return waitUntilPartitions(
-                zooKeeperClient,
-                tablePath,
-                ConfigOptions.TABLE_AUTO_PARTITION_NUM_PRECREATE.defaultValue());
-    }
-
-    /**
-     * Wait until the given number of partitions is created. Return the map from partition id to
-     * partition name.
-     */
-    public static Map<Long, String> waitUntilPartitions(
-            ZooKeeperClient zooKeeperClient, TablePath tablePath, int expectPartitions) {
-        return waitValue(
-                () -> {
-                    Map<Long, String> gotPartitions =
-                            zooKeeperClient.getPartitionIdAndNames(tablePath);
-                    return expectPartitions == gotPartitions.size()
-                            ? Optional.of(gotPartitions)
-                            : Optional.empty();
-                },
-                Duration.ofMinutes(1),
-                String.format("expect %d table partition has not been created", expectPartitions));
-    }
-
     protected static Map<Long, Map<Integer, Long>> upsertRowForPartitionedTable(
             TablePath tablePath,
             TableDescriptor tableDescriptor,
-            Map<Long, String> partitionNameByIds,
+            Map<String, Long> partitionNameByIds,
             int pkStart,
             int rowsNum)
             throws Exception {
         Map<Long, Map<Integer, Long>> bucketRows = new HashMap<>();
-        for (Map.Entry<Long, String> partitionEntry : partitionNameByIds.entrySet()) {
+        for (Map.Entry<String, Long> partitionEntry : partitionNameByIds.entrySet()) {
             Map<Integer, Long> bucketRowsForPartition =
                     upsertRow(
-                            tablePath,
-                            tableDescriptor,
-                            pkStart,
-                            rowsNum,
-                            partitionEntry.getValue());
-            bucketRows.put(partitionEntry.getKey(), bucketRowsForPartition);
+                            tablePath, tableDescriptor, pkStart, rowsNum, partitionEntry.getKey());
+            bucketRows.put(partitionEntry.getValue(), bucketRowsForPartition);
         }
         return bucketRows;
     }
@@ -343,20 +296,16 @@ protected static Map<Integer, Long> upsertRow(
     protected static Map<Long, Map<Integer, Long>> appendRowForPartitionedTable(
             TablePath tablePath,
             TableDescriptor tableDescriptor,
-            Map<Long, String> partitionNameByIds,
+            Map<String, Long> partitionNameByIds,
             int pkStart,
             int rowsNum)
             throws Exception {
         Map<Long, Map<Integer, Long>> bucketRows = new HashMap<>();
-        for (Map.Entry<Long, String> partitionEntry : partitionNameByIds.entrySet()) {
+        for (Map.Entry<String, Long> partitionEntry : partitionNameByIds.entrySet()) {
             Map<Integer, Long> bucketRowsForPartition =
                     appendRow(
-                            tablePath,
-                            tableDescriptor,
-                            pkStart,
-                            rowsNum,
-                            partitionEntry.getValue());
-            bucketRows.put(partitionEntry.getKey(), bucketRowsForPartition);
+                            tablePath, tableDescriptor, pkStart, rowsNum, partitionEntry.getKey());
+            bucketRows.put(partitionEntry.getValue(), bucketRowsForPartition);
         }
         return bucketRows;
     }
```
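
The test-base change flips the partition map orientation from `Map<Long, String>` (partition id to name) to `Map<String, Long>` (partition name to id): callers now key on the name, which the row writers need, and look up the id only when addressing a `TableBucket`. A tiny illustrative sketch of consuming the flipped map follows; the partition names and ids are invented.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class PartitionMapSketch {
    public static void main(String[] args) {
        // Before this commit the tests passed around Map<Long, String> (id -> name);
        // after it they use Map<String, Long> (name -> id). Values here are made up.
        Map<String, Long> partitionIdByName = new LinkedHashMap<>();
        partitionIdByName.put("2024", 1001L);
        partitionIdByName.put("2025", 1002L);

        for (Map.Entry<String, Long> entry : partitionIdByName.entrySet()) {
            String partitionName = entry.getKey(); // used when writing rows to a partition
            long partitionId = entry.getValue();   // used to build TableBucket(tableId, partitionId, bucket)
            System.out.printf("write rows to %s, then wait on partition id %d%n",
                    partitionName, partitionId);
        }
    }
}
```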
