Commit abb3b86

[client] Support dynamically create partition when writing (#1002)
1 parent ed42b36 commit abb3b86

31 files changed (+665, -249 lines)
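
What the change means for users: writing to a partition that does not exist yet no longer fails; the client creates the partition on the fly during the write. A minimal usage sketch, assuming the Append API shown in the FlussTable/TableAppend diffs below; the getTable, append and flush calls and the row(...) helper are illustrative assumptions, not confirmed by this commit:

// Hedged sketch: append a row whose target partition ("2024-06-01") may not exist yet.
// With this commit the client is expected to create that partition dynamically
// instead of failing the write.
Table table = connection.getTable(TablePath.of("fluss", "orders"));  // assumed lookup API
AppendWriter writer = table.newAppend().createWriter();              // newAppend()/createWriter() appear in the diffs
writer.append(row("2024-06-01", 1001L, "item-42"));                  // append(...) and row(...) are assumed helpers
writer.flush();                                                      // assumed flush method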

fluss-client/src/main/java/com/alibaba/fluss/client/FlussConnection.java

Lines changed: 3 additions & 1 deletion
@@ -119,7 +119,9 @@ public WriterClient getOrCreateWriterClient() {
         if (writerClient == null) {
             synchronized (this) {
                 if (writerClient == null) {
-                    writerClient = new WriterClient(conf, metadataUpdater, clientMetricGroup);
+                    writerClient =
+                            new WriterClient(
+                                    conf, metadataUpdater, clientMetricGroup, this.getAdmin());
                 }
             }
         }

fluss-client/src/main/java/com/alibaba/fluss/client/admin/Admin.java

Lines changed: 7 additions & 0 deletions
@@ -21,6 +21,7 @@
 import com.alibaba.fluss.client.metadata.KvSnapshots;
 import com.alibaba.fluss.client.metadata.LakeSnapshot;
 import com.alibaba.fluss.cluster.ServerNode;
+import com.alibaba.fluss.config.ConfigOptions;
 import com.alibaba.fluss.exception.DatabaseAlreadyExistException;
 import com.alibaba.fluss.exception.DatabaseNotEmptyException;
 import com.alibaba.fluss.exception.DatabaseNotExistException;
@@ -36,6 +37,8 @@
 import com.alibaba.fluss.exception.TableAlreadyExistException;
 import com.alibaba.fluss.exception.TableNotExistException;
 import com.alibaba.fluss.exception.TableNotPartitionedException;
+import com.alibaba.fluss.exception.TooManyBucketsException;
+import com.alibaba.fluss.exception.TooManyPartitionsException;
 import com.alibaba.fluss.metadata.DatabaseDescriptor;
 import com.alibaba.fluss.metadata.DatabaseInfo;
 import com.alibaba.fluss.metadata.PartitionInfo;
@@ -255,6 +258,10 @@ CompletableFuture<Void> createTable(
      * <li>{@link PartitionAlreadyExistsException} if the partition already exists and {@code
      *     ignoreIfExists} is false.
      * <li>{@link InvalidPartitionException} if the input partition spec is invalid.
+     * <li>{@link TooManyPartitionsException} if the number of partitions is larger than the
+     *     maximum number of partitions of one table, see {@link ConfigOptions#MAX_PARTITION_NUM}.
+     * <li>{@link TooManyBucketsException} if the number of buckets is larger than the maximum
+     *     number of buckets of one table, see {@link ConfigOptions#MAX_BUCKET_NUM}.
      * </ul>
      *
      * @param tablePath The table path of the table.
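
The new list items document two more reasons createPartition can fail. A hedged sketch of calling it and reacting to those errors; the exact createPartition signature (table path, partition spec, ignoreIfExists flag) is inferred from the javadoc above and may differ:

// Hedged sketch: create a partition and handle the newly documented failures.
try {
    admin.createPartition(tablePath, partitionSpec, false).join();
} catch (CompletionException e) {
    Throwable cause = e.getCause();
    if (cause instanceof TooManyPartitionsException) {
        // the table already holds the maximum number of partitions (ConfigOptions#MAX_PARTITION_NUM)
    } else if (cause instanceof TooManyBucketsException) {
        // adding the partition would exceed the table's bucket limit (ConfigOptions#MAX_BUCKET_NUM)
    } else if (cause instanceof PartitionAlreadyExistsException) {
        // only reachable because ignoreIfExists was passed as false
    }
}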

fluss-client/src/main/java/com/alibaba/fluss/client/metadata/MetadataUpdater.java

Lines changed: 6 additions & 5 deletions
@@ -86,12 +86,12 @@ public long getTableId(TablePath tablePath) {
         return cluster.getTableId(tablePath);
     }
 
-    public Long getPartitionIdOrElseThrow(PhysicalTablePath physicalTablePath) {
-        return cluster.getPartitionIdOrElseThrow(physicalTablePath);
+    public Optional<Long> getPartitionId(PhysicalTablePath physicalTablePath) {
+        return cluster.getPartitionId(physicalTablePath);
     }
 
-    public String getPartitionNameOrElseThrow(long partitionId) {
-        return cluster.getPartitionNameOrElseThrow(partitionId);
+    public Long getPartitionIdOrElseThrow(PhysicalTablePath physicalTablePath) {
+        return cluster.getPartitionIdOrElseThrow(physicalTablePath);
     }
 
     public TableInfo getTableInfoOrElseThrow(TablePath tablePath) {
@@ -186,10 +186,11 @@ public void checkAndUpdateTableMetadata(Set<TablePath> tablePaths) {
      *
      * <p>and update partition metadata .
      */
-    public void checkAndUpdatePartitionMetadata(PhysicalTablePath physicalTablePath) {
+    public boolean checkAndUpdatePartitionMetadata(PhysicalTablePath physicalTablePath) {
         if (!cluster.getPartitionId(physicalTablePath).isPresent()) {
             updateMetadata(null, Collections.singleton(physicalTablePath), null);
         }
+        return cluster.getPartitionId(physicalTablePath).isPresent();
     }
 
     /**
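
Returning a boolean lets callers distinguish "partition known after the refresh" from "partition really missing", which is what enables creating it on demand. A minimal caller sketch, assuming the caller also holds an Admin; where exactly this logic lives in the commit is not visible in this section, and partitionSpecOf(...) is a hypothetical helper:

// Hedged sketch: refresh partition metadata, and create the partition if it is still unknown.
if (!metadataUpdater.checkAndUpdatePartitionMetadata(physicalTablePath)) {
    TablePath tablePath =
            TablePath.of(physicalTablePath.getDatabaseName(), physicalTablePath.getTableName());
    // ignoreIfExists = true tolerates a concurrent writer creating the same partition;
    // the createPartition arguments and partitionSpecOf(...) are assumptions.
    admin.createPartition(tablePath, partitionSpecOf(physicalTablePath), true).join();
    metadataUpdater.checkAndUpdatePartitionMetadata(physicalTablePath);
}
Long partitionId = metadataUpdater.getPartitionIdOrElseThrow(physicalTablePath);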

fluss-client/src/main/java/com/alibaba/fluss/client/table/FlussTable.java

Lines changed: 2 additions & 4 deletions
@@ -73,8 +73,7 @@ public Append newAppend() {
                 !hasPrimaryKey,
                 "Table %s is not a Log Table and doesn't support AppendWriter.",
                 tablePath);
-        return new TableAppend(
-                tablePath, tableInfo, conn.getMetadataUpdater(), conn.getOrCreateWriterClient());
+        return new TableAppend(tablePath, tableInfo, conn.getOrCreateWriterClient());
     }
 
     @Override
@@ -83,8 +82,7 @@ public Upsert newUpsert() {
                 hasPrimaryKey,
                 "Table %s is not a Primary Key Table and doesn't support UpsertWriter.",
                 tablePath);
-        return new TableUpsert(
-                tablePath, tableInfo, conn.getMetadataUpdater(), conn.getOrCreateWriterClient());
+        return new TableUpsert(tablePath, tableInfo, conn.getOrCreateWriterClient());
     }
 
     @Override

fluss-client/src/main/java/com/alibaba/fluss/client/table/writer/AbstractTableWriter.java

Lines changed: 2 additions & 11 deletions
@@ -16,7 +16,6 @@
 
 package com.alibaba.fluss.client.table.writer;
 
-import com.alibaba.fluss.client.metadata.MetadataUpdater;
 import com.alibaba.fluss.client.table.getter.PartitionGetter;
 import com.alibaba.fluss.client.write.WriteRecord;
 import com.alibaba.fluss.client.write.WriterClient;
@@ -38,21 +37,16 @@ public abstract class AbstractTableWriter implements TableWriter {
     protected final WriterClient writerClient;
     protected final int fieldCount;
     private final @Nullable PartitionGetter partitionFieldGetter;
-    private final MetadataUpdater metadataUpdater;
 
     protected AbstractTableWriter(
-            TablePath tablePath,
-            TableInfo tableInfo,
-            MetadataUpdater metadataUpdater,
-            WriterClient writerClient) {
+            TablePath tablePath, TableInfo tableInfo, WriterClient writerClient) {
         this.tablePath = tablePath;
         this.writerClient = writerClient;
         this.fieldCount = tableInfo.getRowType().getFieldCount();
         this.partitionFieldGetter =
                 tableInfo.isPartitioned()
                         ? new PartitionGetter(tableInfo.getRowType(), tableInfo.getPartitionKeys())
                         : null;
-        this.metadataUpdater = metadataUpdater;
     }
 
     /**
@@ -87,10 +81,7 @@ protected PhysicalTablePath getPhysicalPath(InternalRow row) {
         } else {
             // partitioned table, extract partition from the row
             String partition = partitionFieldGetter.getPartition(row);
-            PhysicalTablePath partitionPath = PhysicalTablePath.of(tablePath, partition);
-            // may update partition info
-            metadataUpdater.checkAndUpdatePartitionMetadata(partitionPath);
-            return partitionPath;
+            return PhysicalTablePath.of(tablePath, partition);
         }
     }
 
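
After this change the writer only derives the partition name from the row; whether that partition exists, and creating it when it does not, is handled further down the write path, which now has access to the Admin (see the FlussConnection change above). A small illustration of the remaining derivation step, assuming a table partitioned by a single date column; the row value shown is made up:

// Hedged sketch: the partition name is read straight from the row's partition-key field(s).
PartitionGetter partitionGetter =
        new PartitionGetter(tableInfo.getRowType(), tableInfo.getPartitionKeys());
String partition = partitionGetter.getPartition(row);              // e.g. "2024-06-01"
PhysicalTablePath physicalPath = PhysicalTablePath.of(tablePath, partition);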

fluss-client/src/main/java/com/alibaba/fluss/client/table/writer/AppendWriterImpl.java

Lines changed: 3 additions & 8 deletions
@@ -16,7 +16,6 @@
 
 package com.alibaba.fluss.client.table.writer;
 
-import com.alibaba.fluss.client.metadata.MetadataUpdater;
 import com.alibaba.fluss.client.write.WriteRecord;
 import com.alibaba.fluss.client.write.WriterClient;
 import com.alibaba.fluss.metadata.DataLakeFormat;
@@ -46,12 +45,8 @@ class AppendWriterImpl extends AbstractTableWriter implements AppendWriter {
     private final IndexedRowEncoder indexedRowEncoder;
     private final FieldGetter[] fieldGetters;
 
-    AppendWriterImpl(
-            TablePath tablePath,
-            TableInfo tableInfo,
-            MetadataUpdater metadataUpdater,
-            WriterClient writerClient) {
-        super(tablePath, tableInfo, metadataUpdater, writerClient);
+    AppendWriterImpl(TablePath tablePath, TableInfo tableInfo, WriterClient writerClient) {
+        super(tablePath, tableInfo, writerClient);
         List<String> bucketKeys = tableInfo.getBucketKeys();
         if (bucketKeys.isEmpty()) {
             this.bucketKeyEncoder = null;
@@ -86,7 +81,7 @@ record = WriteRecord.forIndexedAppend(physicalPath, indexedRow, bucketKey);
             // ARROW format supports general internal row
             record = WriteRecord.forArrowAppend(physicalPath, row, bucketKey);
         }
-        return send(record).thenApply(r -> APPEND_SUCCESS);
+        return send(record).thenApply(ignored -> APPEND_SUCCESS);
     }
 
     private IndexedRow encodeIndexedRow(InternalRow row) {

fluss-client/src/main/java/com/alibaba/fluss/client/table/writer/TableAppend.java

Lines changed: 2 additions & 9 deletions
@@ -16,7 +16,6 @@
 
 package com.alibaba.fluss.client.table.writer;
 
-import com.alibaba.fluss.client.metadata.MetadataUpdater;
 import com.alibaba.fluss.client.write.WriterClient;
 import com.alibaba.fluss.metadata.TableInfo;
 import com.alibaba.fluss.metadata.TablePath;
@@ -26,22 +25,16 @@ public class TableAppend implements Append {
 
     private final TablePath tablePath;
     private final TableInfo tableInfo;
-    private final MetadataUpdater metadataUpdater;
     private final WriterClient writerClient;
 
-    public TableAppend(
-            TablePath tablePath,
-            TableInfo tableInfo,
-            MetadataUpdater metadataUpdater,
-            WriterClient writerClient) {
+    public TableAppend(TablePath tablePath, TableInfo tableInfo, WriterClient writerClient) {
         this.tablePath = tablePath;
         this.tableInfo = tableInfo;
-        this.metadataUpdater = metadataUpdater;
        this.writerClient = writerClient;
     }
 
     @Override
     public AppendWriter createWriter() {
-        return new AppendWriterImpl(tablePath, tableInfo, metadataUpdater, writerClient);
+        return new AppendWriterImpl(tablePath, tableInfo, writerClient);
     }
 }

fluss-client/src/main/java/com/alibaba/fluss/client/table/writer/TableUpsert.java

Lines changed: 4 additions & 13 deletions
@@ -16,7 +16,6 @@
 
 package com.alibaba.fluss.client.table.writer;
 
-import com.alibaba.fluss.client.metadata.MetadataUpdater;
 import com.alibaba.fluss.client.write.WriterClient;
 import com.alibaba.fluss.metadata.TableInfo;
 import com.alibaba.fluss.metadata.TablePath;
@@ -32,28 +31,21 @@ public class TableUpsert implements Upsert {
     private final TablePath tablePath;
     private final TableInfo tableInfo;
     private final WriterClient writerClient;
-    private final MetadataUpdater metadataUpdater;
 
     private final @Nullable int[] targetColumns;
 
-    public TableUpsert(
-            TablePath tablePath,
-            TableInfo tableInfo,
-            MetadataUpdater metadataUpdater,
-            WriterClient writerClient) {
-        this(tablePath, tableInfo, metadataUpdater, writerClient, null);
+    public TableUpsert(TablePath tablePath, TableInfo tableInfo, WriterClient writerClient) {
+        this(tablePath, tableInfo, writerClient, null);
     }
 
     private TableUpsert(
             TablePath tablePath,
             TableInfo tableInfo,
-            MetadataUpdater metadataUpdater,
             WriterClient writerClient,
             @Nullable int[] targetColumns) {
         this.tablePath = tablePath;
         this.tableInfo = tableInfo;
         this.writerClient = writerClient;
-        this.metadataUpdater = metadataUpdater;
         this.targetColumns = targetColumns;
     }
 
@@ -75,7 +67,7 @@ public Upsert partialUpdate(@Nullable int[] targetColumns) {
                 }
             }
         }
-        return new TableUpsert(tablePath, tableInfo, metadataUpdater, writerClient, targetColumns);
+        return new TableUpsert(tablePath, tableInfo, writerClient, targetColumns);
     }
 
     @Override
@@ -100,7 +92,6 @@ public Upsert partialUpdate(String... targetColumnNames) {
 
     @Override
     public UpsertWriter createWriter() {
-        return new UpsertWriterImpl(
-                tablePath, tableInfo, targetColumns, writerClient, metadataUpdater);
+        return new UpsertWriterImpl(tablePath, tableInfo, targetColumns, writerClient);
     }
 }
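
For reference, the public entry points of this class are unchanged: partialUpdate(...) still returns a reconfigured TableUpsert and createWriter() builds the UpsertWriterImpl, just without the MetadataUpdater. A hedged usage sketch based only on the method names visible in these diffs; the column names are made up:

// Hedged sketch: configure a partial update on two columns, then create the writer.
UpsertWriter writer =
        table.newUpsert()                                 // shown in the FlussTable diff
                .partialUpdate("item_count", "updated_at") // varargs overload shown above
                .createWriter();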

fluss-client/src/main/java/com/alibaba/fluss/client/table/writer/UpsertWriterImpl.java

Lines changed: 2 additions & 4 deletions
@@ -16,7 +16,6 @@
 
 package com.alibaba.fluss.client.table.writer;
 
-import com.alibaba.fluss.client.metadata.MetadataUpdater;
 import com.alibaba.fluss.client.write.WriteRecord;
 import com.alibaba.fluss.client.write.WriterClient;
 import com.alibaba.fluss.metadata.DataLakeFormat;
@@ -57,9 +56,8 @@ class UpsertWriterImpl extends AbstractTableWriter implements UpsertWriter {
             TablePath tablePath,
             TableInfo tableInfo,
             @Nullable int[] partialUpdateColumns,
-            WriterClient writerClient,
-            MetadataUpdater metadataUpdater) {
-        super(tablePath, tableInfo, metadataUpdater, writerClient);
+            WriterClient writerClient) {
+        super(tablePath, tableInfo, writerClient);
         RowType rowType = tableInfo.getRowType();
         sanityCheck(rowType, tableInfo.getPrimaryKeys(), partialUpdateColumns);
 
fluss-client/src/main/java/com/alibaba/fluss/client/utils/ClientRpcMessageUtils.java

Lines changed: 21 additions & 36 deletions
@@ -23,7 +23,7 @@
 import com.alibaba.fluss.client.metadata.KvSnapshots;
 import com.alibaba.fluss.client.metadata.LakeSnapshot;
 import com.alibaba.fluss.client.write.KvWriteBatch;
-import com.alibaba.fluss.client.write.WriteBatch;
+import com.alibaba.fluss.client.write.ReadyWriteBatch;
 import com.alibaba.fluss.fs.FsPath;
 import com.alibaba.fluss.fs.FsPathAndFileName;
 import com.alibaba.fluss.fs.token.ObtainedSecurityToken;
@@ -48,7 +48,6 @@
 import com.alibaba.fluss.rpc.messages.PbLakeSnapshotForBucket;
 import com.alibaba.fluss.rpc.messages.PbLookupReqForBucket;
 import com.alibaba.fluss.rpc.messages.PbPartitionSpec;
-import com.alibaba.fluss.rpc.messages.PbPhysicalTablePath;
 import com.alibaba.fluss.rpc.messages.PbPrefixLookupReqForBucket;
 import com.alibaba.fluss.rpc.messages.PbProduceLogReqForBucket;
 import com.alibaba.fluss.rpc.messages.PbPutKvReqForBucket;
@@ -77,49 +76,43 @@
 public class ClientRpcMessageUtils {
 
     public static ProduceLogRequest makeProduceLogRequest(
-            long tableId, int acks, int maxRequestTimeoutMs, List<WriteBatch> batches) {
+            long tableId, int acks, int maxRequestTimeoutMs, List<ReadyWriteBatch> readyBatches) {
         ProduceLogRequest request =
                 new ProduceLogRequest()
                         .setTableId(tableId)
                         .setAcks(acks)
                         .setTimeoutMs(maxRequestTimeoutMs);
-        batches.forEach(
-                batch -> {
+        readyBatches.forEach(
+                readyBatch -> {
+                    TableBucket tableBucket = readyBatch.tableBucket();
                     PbProduceLogReqForBucket pbProduceLogReqForBucket =
                             request.addBucketsReq()
-                                    .setBucketId(batch.tableBucket().getBucket())
-                                    .setRecordsBytesView(batch.build());
-                    TableBucket tableBucket = batch.tableBucket();
+                                    .setBucketId(tableBucket.getBucket())
+                                    .setRecordsBytesView(readyBatch.writeBatch().build());
                     if (tableBucket.getPartitionId() != null) {
                         pbProduceLogReqForBucket.setPartitionId(tableBucket.getPartitionId());
                     }
                 });
         return request;
     }
 
-    public static PbPhysicalTablePath fromPhysicalTablePath(PhysicalTablePath physicalPath) {
-        PbPhysicalTablePath pbPath =
-                new PbPhysicalTablePath()
-                        .setDatabaseName(physicalPath.getDatabaseName())
-                        .setTableName(physicalPath.getTableName());
-        if (physicalPath.getPartitionName() != null) {
-            pbPath.setPartitionName(physicalPath.getPartitionName());
-        }
-        return pbPath;
-    }
-
     public static PutKvRequest makePutKvRequest(
-            long tableId, int acks, int maxRequestTimeoutMs, List<WriteBatch> batches) {
+            long tableId,
+            int acks,
+            int maxRequestTimeoutMs,
+            List<ReadyWriteBatch> readyWriteBatches) {
         PutKvRequest request =
                 new PutKvRequest()
                         .setTableId(tableId)
                         .setAcks(acks)
                         .setTimeoutMs(maxRequestTimeoutMs);
         // check the target columns in the batch list should be the same. If not same,
         // we throw exception directly currently.
-        int[] targetColumns = ((KvWriteBatch) batches.get(0)).getTargetColumns();
-        for (int i = 1; i < batches.size(); i++) {
-            int[] currentBatchTargetColumns = ((KvWriteBatch) batches.get(i)).getTargetColumns();
+        int[] targetColumns =
+                ((KvWriteBatch) readyWriteBatches.get(0).writeBatch()).getTargetColumns();
+        for (int i = 1; i < readyWriteBatches.size(); i++) {
+            int[] currentBatchTargetColumns =
+                    ((KvWriteBatch) readyWriteBatches.get(i).writeBatch()).getTargetColumns();
             if (!Arrays.equals(targetColumns, currentBatchTargetColumns)) {
                 throw new IllegalStateException(
                         String.format(
@@ -132,13 +125,13 @@ public static PutKvRequest makePutKvRequest(
         if (targetColumns != null) {
            request.setTargetColumns(targetColumns);
         }
-        batches.forEach(
-                batch -> {
+        readyWriteBatches.forEach(
+                readyBatch -> {
+                    TableBucket tableBucket = readyBatch.tableBucket();
                     PbPutKvReqForBucket pbPutKvReqForBucket =
                             request.addBucketsReq()
-                                    .setBucketId(batch.tableBucket().getBucket())
-                                    .setRecordsBytesView(batch.build());
-                    TableBucket tableBucket = batch.tableBucket();
+                                    .setBucketId(tableBucket.getBucket())
+                                    .setRecordsBytesView(readyBatch.writeBatch().build());
                     if (tableBucket.getPartitionId() != null) {
                         pbPutKvReqForBucket.setPartitionId(tableBucket.getPartitionId());
                     }
@@ -355,14 +348,6 @@ public static Map<String, String> toKeyValueMap(List<PbKeyValue> pbKeyValues) {
                                 PbKeyValue::getKey, PbKeyValue::getValue));
     }
 
-    public static PbPartitionSpec makePbPartitionSpec(PartitionSpec partitionSpec) {
-        Map<String, String> partitionSpecMap = partitionSpec.getSpecMap();
-        List<PbKeyValue> pbKeyValues = new ArrayList<>(partitionSpecMap.size());
-        partitionSpecMap.forEach(
-                (key, value) -> pbKeyValues.add(new PbKeyValue().setKey(key).setValue(value)));
-        return new PbPartitionSpec().addAllPartitionKeyValues(pbKeyValues);
-    }
-
     public static ResolvedPartitionSpec toResolvedPartitionSpec(PbPartitionSpec pbPartitionSpec) {
         List<String> partitionKeys = new ArrayList<>();
         List<String> partitionValues = new ArrayList<>();
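
Both request builders now consume ReadyWriteBatch instead of WriteBatch: a batch is paired with its destination TableBucket only once that bucket, including the id of a possibly just-created partition, is known, rather than at batch-creation time. The ReadyWriteBatch class itself is added elsewhere in this commit; a hedged sketch of the shape implied by its usage above (only the tableBucket() and writeBatch() accessors are confirmed by this diff):

// Hedged sketch: a write batch paired with the bucket it is ready to be sent to.
public final class ReadyWriteBatch {
    private final TableBucket tableBucket;
    private final WriteBatch writeBatch;

    public ReadyWriteBatch(TableBucket tableBucket, WriteBatch writeBatch) {
        this.tableBucket = tableBucket;
        this.writeBatch = writeBatch;
    }

    // Accessor names match how makeProduceLogRequest/makePutKvRequest use the type.
    public TableBucket tableBucket() {
        return tableBucket;
    }

    public WriteBatch writeBatch() {
        return writeBatch;
    }
}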
