Skip to content

Commit c5257fd

Browse files
authored
[client] Get TableInfo via the Admin API instead of via metadata updater (#2016)
1 parent e70d743 commit c5257fd

File tree

22 files changed

+170
-155
lines changed

22 files changed

+170
-155
lines changed

fluss-client/src/main/java/org/apache/fluss/client/FlussConnection.java

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
import org.apache.fluss.config.Configuration;
3434
import org.apache.fluss.exception.FlussRuntimeException;
3535
import org.apache.fluss.fs.FileSystem;
36-
import org.apache.fluss.metadata.TableInfo;
3736
import org.apache.fluss.metadata.TablePath;
3837
import org.apache.fluss.metrics.registry.MetricRegistry;
3938
import org.apache.fluss.rpc.GatewayClientProxy;
@@ -61,6 +60,7 @@ public final class FlussConnection implements Connection {
6160
private volatile LookupClient lookupClient;
6261
private volatile RemoteFileDownloader remoteFileDownloader;
6362
private volatile SecurityTokenManager securityTokenManager;
63+
private volatile Admin admin;
6464

6565
FlussConnection(Configuration conf) {
6666
this(conf, MetricRegistry.create(conf, null));
@@ -93,19 +93,16 @@ public Configuration getConfiguration() {
9393

9494
@Override
9595
public Admin getAdmin() {
96-
return new FlussAdmin(rpcClient, metadataUpdater);
96+
return getOrCreateAdmin();
9797
}
9898

9999
@Override
100100
public Table getTable(TablePath tablePath) {
101-
// force to update the table info from server to avoid stale data in cache
101+
// force to update the table info from server to avoid stale data in cache.
102102
metadataUpdater.updateTableOrPartitionMetadata(tablePath, null);
103-
TableInfo tableInfo = metadataUpdater.getTableInfoOrElseThrow(tablePath);
104-
return new FlussTable(this, tablePath, tableInfo);
105-
}
106103

107-
public RpcClient getRpcClient() {
108-
return rpcClient;
104+
Admin admin = getOrCreateAdmin();
105+
return new FlussTable(this, tablePath, admin.getTableInfo(tablePath).join());
109106
}
110107

111108
public MetadataUpdater getMetadataUpdater() {
@@ -140,6 +137,17 @@ public LookupClient getOrCreateLookupClient() {
140137
return lookupClient;
141138
}
142139

140+
public Admin getOrCreateAdmin() {
141+
if (admin == null) {
142+
synchronized (this) {
143+
if (admin == null) {
144+
admin = new FlussAdmin(rpcClient, metadataUpdater);
145+
}
146+
}
147+
}
148+
return admin;
149+
}
150+
143151
public RemoteFileDownloader getOrCreateRemoteFileDownloader() {
144152
if (remoteFileDownloader == null) {
145153
synchronized (this) {

fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -420,15 +420,16 @@ private ListOffsetsResult listOffsets(
420420
OffsetSpec offsetSpec) {
421421
Long partitionId = null;
422422
metadataUpdater.updateTableOrPartitionMetadata(physicalTablePath.getTablePath(), null);
423-
long tableId = metadataUpdater.getTableId(physicalTablePath.getTablePath());
423+
TableInfo tableInfo = getTableInfo(physicalTablePath.getTablePath()).join();
424+
424425
// if partition name is not null, we need to check and update partition metadata
425426
if (physicalTablePath.getPartitionName() != null) {
426427
metadataUpdater.updatePhysicalTableMetadata(Collections.singleton(physicalTablePath));
427428
partitionId = metadataUpdater.getPartitionIdOrElseThrow(physicalTablePath);
428429
}
429430
Map<Integer, ListOffsetsRequest> requestMap =
430431
prepareListOffsetsRequests(
431-
metadataUpdater, tableId, partitionId, buckets, offsetSpec);
432+
metadataUpdater, tableInfo.getTableId(), partitionId, buckets, offsetSpec);
432433
Map<Integer, CompletableFuture<Long>> bucketToOffsetMap = MapUtils.newConcurrentHashMap();
433434
for (int bucket : buckets) {
434435
bucketToOffsetMap.put(bucket, new CompletableFuture<>());

fluss-client/src/main/java/org/apache/fluss/client/metadata/MetadataUpdater.java

Lines changed: 0 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
import org.apache.fluss.exception.StaleMetadataException;
3333
import org.apache.fluss.metadata.PhysicalTablePath;
3434
import org.apache.fluss.metadata.TableBucket;
35-
import org.apache.fluss.metadata.TableInfo;
3635
import org.apache.fluss.metadata.TablePartition;
3736
import org.apache.fluss.metadata.TablePath;
3837
import org.apache.fluss.rpc.GatewayClientProxy;
@@ -87,10 +86,6 @@ public Cluster getCluster() {
8786
return cluster.getCoordinatorServer();
8887
}
8988

90-
public long getTableId(TablePath tablePath) {
91-
return cluster.getTableId(tablePath);
92-
}
93-
9489
public Optional<Long> getPartitionId(PhysicalTablePath physicalTablePath) {
9590
return cluster.getPartitionId(physicalTablePath);
9691
}
@@ -99,26 +94,10 @@ public Long getPartitionIdOrElseThrow(PhysicalTablePath physicalTablePath) {
9994
return cluster.getPartitionIdOrElseThrow(physicalTablePath);
10095
}
10196

102-
public TableInfo getTableInfoOrElseThrow(TablePath tablePath) {
103-
return cluster.getTableOrElseThrow(tablePath);
104-
}
105-
10697
public Optional<BucketLocation> getBucketLocation(TableBucket tableBucket) {
10798
return cluster.getBucketLocation(tableBucket);
10899
}
109100

110-
private Optional<TableInfo> getTableInfo(TablePath tablePath) {
111-
return cluster.getTable(tablePath);
112-
}
113-
114-
public TableInfo getTableInfoOrElseThrow(long tableId) {
115-
return getTableInfo(cluster.getTablePathOrElseThrow(tableId))
116-
.orElseThrow(
117-
() ->
118-
new FlussRuntimeException(
119-
"Table not found for table id: " + tableId));
120-
}
121-
122101
public int leaderFor(TableBucket tableBucket) {
123102
Integer serverNode = cluster.leaderFor(tableBucket);
124103
if (serverNode == null) {

fluss-client/src/main/java/org/apache/fluss/client/table/writer/AppendWriterImpl.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ class AppendWriterImpl extends AbstractTableWriter implements AppendWriter {
4545
private final LogFormat logFormat;
4646
private final IndexedRowEncoder indexedRowEncoder;
4747
private final FieldGetter[] fieldGetters;
48+
private final TableInfo tableInfo;
4849

4950
AppendWriterImpl(TablePath tablePath, TableInfo tableInfo, WriterClient writerClient) {
5051
super(tablePath, tableInfo, writerClient);
@@ -60,6 +61,7 @@ class AppendWriterImpl extends AbstractTableWriter implements AppendWriter {
6061
this.logFormat = tableInfo.getTableConfig().getLogFormat();
6162
this.indexedRowEncoder = new IndexedRowEncoder(tableInfo.getRowType());
6263
this.fieldGetters = InternalRow.createFieldGetters(tableInfo.getRowType());
64+
this.tableInfo = tableInfo;
6365
}
6466

6567
/**
@@ -77,10 +79,10 @@ public CompletableFuture<AppendResult> append(InternalRow row) {
7779
final WriteRecord record;
7880
if (logFormat == LogFormat.INDEXED) {
7981
IndexedRow indexedRow = encodeIndexedRow(row);
80-
record = WriteRecord.forIndexedAppend(physicalPath, indexedRow, bucketKey);
82+
record = WriteRecord.forIndexedAppend(tableInfo, physicalPath, indexedRow, bucketKey);
8183
} else {
8284
// ARROW format supports general internal row
83-
record = WriteRecord.forArrowAppend(physicalPath, row, bucketKey);
85+
record = WriteRecord.forArrowAppend(tableInfo, physicalPath, row, bucketKey);
8486
}
8587
return send(record).thenApply(ignored -> APPEND_SUCCESS);
8688
}

fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriterImpl.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ class UpsertWriterImpl extends AbstractTableWriter implements UpsertWriter {
5252
private final KvFormat kvFormat;
5353
private final RowEncoder rowEncoder;
5454
private final FieldGetter[] fieldGetters;
55+
private final TableInfo tableInfo;
5556

5657
UpsertWriterImpl(
5758
TablePath tablePath,
@@ -75,6 +76,7 @@ class UpsertWriterImpl extends AbstractTableWriter implements UpsertWriter {
7576
this.kvFormat = tableInfo.getTableConfig().getKvFormat();
7677
this.rowEncoder = RowEncoder.create(kvFormat, rowType);
7778
this.fieldGetters = InternalRow.createFieldGetters(rowType);
79+
this.tableInfo = tableInfo;
7880
}
7981

8082
private static void sanityCheck(
@@ -129,7 +131,12 @@ public CompletableFuture<UpsertResult> upsert(InternalRow row) {
129131
bucketKeyEncoder == primaryKeyEncoder ? key : bucketKeyEncoder.encodeKey(row);
130132
WriteRecord record =
131133
WriteRecord.forUpsert(
132-
getPhysicalPath(row), encodeRow(row), key, bucketKey, targetColumns);
134+
tableInfo,
135+
getPhysicalPath(row),
136+
encodeRow(row),
137+
key,
138+
bucketKey,
139+
targetColumns);
133140
return send(record).thenApply(ignored -> UPSERT_SUCCESS);
134141
}
135142

@@ -146,7 +153,8 @@ public CompletableFuture<DeleteResult> delete(InternalRow row) {
146153
byte[] bucketKey =
147154
bucketKeyEncoder == primaryKeyEncoder ? key : bucketKeyEncoder.encodeKey(row);
148155
WriteRecord record =
149-
WriteRecord.forDelete(getPhysicalPath(row), key, bucketKey, targetColumns);
156+
WriteRecord.forDelete(
157+
tableInfo, getPhysicalPath(row), key, bucketKey, targetColumns);
150158
return send(record).thenApply(ignored -> DELETE_SUCCESS);
151159
}
152160

fluss-client/src/main/java/org/apache/fluss/client/write/ArrowLogWriteBatch.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,11 @@ public ArrowLogWriteBatch(
6262
MemoryLogRecordsArrowBuilder.builder(schemaId, arrowWriter, outputView, true);
6363
}
6464

65+
@Override
66+
public boolean isLogBatch() {
67+
return true;
68+
}
69+
6570
@Override
6671
public boolean tryAppend(WriteRecord writeRecord, WriteCallback callback) throws Exception {
6772
InternalRow row = writeRecord.getRow();

fluss-client/src/main/java/org/apache/fluss/client/write/DynamicPartitionCreator.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.apache.fluss.exception.PartitionNotExistException;
2424
import org.apache.fluss.metadata.PhysicalTablePath;
2525
import org.apache.fluss.metadata.ResolvedPartitionSpec;
26-
import org.apache.fluss.metadata.TableInfo;
2726
import org.apache.fluss.metadata.TablePath;
2827
import org.apache.fluss.utils.ExceptionUtils;
2928

@@ -64,7 +63,8 @@ public DynamicPartitionCreator(
6463
this.fatalErrorHandler = fatalErrorHandler;
6564
}
6665

67-
public void checkAndCreatePartitionAsync(PhysicalTablePath physicalTablePath) {
66+
public void checkAndCreatePartitionAsync(
67+
PhysicalTablePath physicalTablePath, List<String> partitionKeys) {
6868
String partitionName = physicalTablePath.getPartitionName();
6969
if (partitionName == null) {
7070
// no need to check and create partition
@@ -89,7 +89,7 @@ public void checkAndCreatePartitionAsync(PhysicalTablePath physicalTablePath) {
8989
// if the partition is not in inflightPartitionsToCreate, we should create it.
9090
// this means that the partition is not being created by other threads.
9191
LOG.info("Dynamically creating partition partition for {}", physicalTablePath);
92-
createPartition(physicalTablePath);
92+
createPartition(physicalTablePath, partitionKeys);
9393
} else {
9494
// if the partition is already in inflightPartitionsToCreate, we should skip
9595
// creating it.
@@ -121,12 +121,10 @@ private boolean forceCheckPartitionExist(PhysicalTablePath physicalTablePath) {
121121
return idExist;
122122
}
123123

124-
private void createPartition(PhysicalTablePath physicalTablePath) {
124+
private void createPartition(PhysicalTablePath physicalTablePath, List<String> partitionKeys) {
125125
String partitionName = physicalTablePath.getPartitionName();
126126
TablePath tablePath = physicalTablePath.getTablePath();
127127
checkArgument(partitionName != null, "Partition name shouldn't be null.");
128-
TableInfo tableInfo = metadataUpdater.getTableInfoOrElseThrow(tablePath);
129-
List<String> partitionKeys = tableInfo.getPartitionKeys();
130128
ResolvedPartitionSpec resolvedPartitionSpec =
131129
ResolvedPartitionSpec.fromPartitionName(partitionKeys, partitionName);
132130

fluss-client/src/main/java/org/apache/fluss/client/write/IndexedLogWriteBatch.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,11 @@ public IndexedLogWriteBatch(
6161
MemoryLogRecordsIndexedBuilder.builder(schemaId, writeLimit, outputView, true);
6262
}
6363

64+
@Override
65+
public boolean isLogBatch() {
66+
return true;
67+
}
68+
6469
@Override
6570
public boolean tryAppend(WriteRecord writeRecord, WriteCallback callback) throws Exception {
6671
checkNotNull(callback, "write callback must be not null");

fluss-client/src/main/java/org/apache/fluss/client/write/KvWriteBatch.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,11 @@ public KvWriteBatch(
6767
this.targetColumns = targetColumns;
6868
}
6969

70+
@Override
71+
public boolean isLogBatch() {
72+
return false;
73+
}
74+
7075
@Override
7176
public boolean tryAppend(WriteRecord writeRecord, WriteCallback callback) throws Exception {
7277
// currently, we throw exception directly when the target columns of the write record is

0 commit comments

Comments
 (0)