Skip to content

Commit 8d571b7

Browse files
committed
tmp2
1 parent a795929 commit 8d571b7

File tree

13 files changed

+436
-39
lines changed

13 files changed

+436
-39
lines changed

fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -281,6 +281,7 @@ public CompletableFuture<Void> alterTable(
281281
.setDatabaseName(tablePath.getDatabaseName())
282282
.setTableName(tablePath.getTableName());
283283
gateway.alterTable(request).get();
284+
future.complete( null);
284285
} catch (Throwable t) {
285286
future.completeExceptionally(t);
286287
}

fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -383,7 +383,7 @@ public static PbAlterColumn toPbAlterColumn(TableChange tableChange) {
383383
throw new IllegalArgumentException(
384384
"Unsupported table change: " + tableChange.getClass());
385385
}
386-
return null;
386+
return pbAlterColumn;
387387
}
388388

389389
public static PbAlterConfig toPbAlterConfigs(TableChange tableChange) {

fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,7 @@ void testGetTableInfoAndSchema() throws Exception {
228228
}
229229

230230
@Test
231-
void testAlterTable() throws Exception {
231+
void testAlterTableConfig() throws Exception {
232232
// create table
233233
TablePath tablePath = TablePath.of("test_db", "alter_table_1");
234234
admin.createTable(tablePath, DEFAULT_TABLE_DESCRIPTOR, false).get();
@@ -302,6 +302,38 @@ void testAlterTable() throws Exception {
302302
.get();
303303
}
304304

305+
@Test
306+
void testAlterTableColumn() throws Exception {
307+
// create table
308+
TablePath tablePath = TablePath.of("test_db", "alter_table_1");
309+
admin.createTable(tablePath, DEFAULT_TABLE_DESCRIPTOR, false).get();
310+
311+
assertThatThrownBy(()-> admin.alterTable(tablePath, Collections.singletonList(TableChange.dropColumn("id")), false).get()).hasMessageContaining("Can not change primary / partition / bucket / key column id");
312+
assertThatThrownBy(()-> admin.alterTable(tablePath, Collections.singletonList(TableChange.modifyColumnName(new Schema.Column("id", DataTypes.INT(), "person id", (short) 0), "id2")), false).get()).hasMessageContaining("Can not change primary / partition / bucket / key column id");
313+
assertThatThrownBy(()-> admin.alterTable(tablePath, Collections.singletonList(TableChange.modify("id", new Schema.Column("id", DataTypes.INT(), "person id", (short) 0), null)), false).get()).hasMessageContaining("Can not change primary / partition / bucket / key column id");
314+
315+
assertThatThrownBy(()-> admin.alterTable(tablePath, Collections.singletonList(TableChange.add(new Schema.Column("c1", DataTypes.STRING().copy(false)), TableChange.ColumnPosition.last())), false).get()).hasMessageContaining("Column c1 must be nullable");
316+
assertThatThrownBy(()-> admin.alterTable(tablePath, Collections.singletonList(TableChange.modify("age", new Schema.Column("age", DataTypes.STRING(), "person id", (short) 0), null)), false).get()).hasMessageContaining("Can not change column age from INT to STRING");
317+
assertThatThrownBy(()-> admin.alterTable(tablePath, Collections.singletonList(TableChange.modify("age", new Schema.Column("age", DataTypes.BIGINT().copy( false), "person id", (short) 0), null)), false).get()).hasMessageContaining("Can not change column age from nullable to not null");
318+
admin.alterTable(tablePath, Arrays.asList(
319+
TableChange.add(new Schema.Column("c1", DataTypes.STRING()), TableChange.ColumnPosition.last()),
320+
TableChange.dropColumn("name"),
321+
//todo: rename感觉只需要old name + new name即可
322+
TableChange.modifyColumnName(new Schema.Column("age",DataTypes.INT(), "person age"), "age2")
323+
), false).get();
324+
325+
Schema expectedSchema = Schema.newBuilder()
326+
.primaryKey("id")
327+
.fromColumns(Arrays.asList(
328+
new Schema.Column("id", DataTypes.INT(), "person id", (short) 0),
329+
new Schema.Column("age2", DataTypes.INT(), "person age", (short) 2),
330+
new Schema.Column("c1", DataTypes.STRING(), null, (short) 3)))
331+
.build();
332+
SchemaInfo schemaInfo = admin.getTableSchema(tablePath).get();
333+
assertThat(schemaInfo).isEqualTo(new SchemaInfo(expectedSchema, 2));
334+
}
335+
336+
305337
@Test
306338
void testCreateInvalidDatabaseAndTable() throws Exception {
307339
assertThatThrownBy(
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.config.cluster;
19+
20+
/**
21+
* ColumnPositionType.
22+
*/
23+
public enum AlterColumnOpType {
24+
AddColumn(0),
25+
DropColumn(1),
26+
ModifyColumn(2);
27+
28+
public final int value;
29+
30+
AlterColumnOpType(int value) {
31+
this.value = value;
32+
}
33+
34+
public static AlterColumnOpType from(int opType) {
35+
switch (opType) {
36+
case 0:
37+
return AddColumn;
38+
case 1:
39+
return DropColumn;
40+
case 2:
41+
return ModifyColumn;
42+
default:
43+
throw new IllegalArgumentException("Unsupported AlterColumnOpType: " + opType);
44+
}
45+
}
46+
47+
public int value() {
48+
return this.value;
49+
}
50+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.config.cluster;
19+
20+
/**
21+
* ColumnPositionType.
22+
*/
23+
public enum ColumnPositionType {
24+
LAST(0),
25+
FIRST(1),
26+
AFTER(2);
27+
28+
public final int value;
29+
30+
ColumnPositionType(int value) {
31+
this.value = value;
32+
}
33+
34+
35+
public static ColumnPositionType from(int opType) {
36+
switch (opType) {
37+
case 0:
38+
return LAST;
39+
case 1:
40+
return FIRST;
41+
case 2:
42+
return AFTER;
43+
default:
44+
throw new IllegalArgumentException("Unsupported ColumnPositionType: " + opType);
45+
}
46+
}
47+
48+
public int value() {
49+
return this.value;
50+
}
51+
}

fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,10 @@ public int[] getColumnIndexesByColumnIds(List<Short> columnIds) {
183183
return keyIndexes;
184184
}
185185

186+
public int getHighestFieldId() {
187+
return highestFieldId;
188+
}
189+
186190
@Override
187191
public String toString() {
188192
final List<Object> components = new ArrayList<>(columns);

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,6 @@
137137
import static org.apache.fluss.server.utils.ServerRpcMessageUtils.makeDropAclsResponse;
138138
import static org.apache.fluss.server.utils.ServerRpcMessageUtils.toAlterTableConfigChanges;
139139
import static org.apache.fluss.server.utils.ServerRpcMessageUtils.toAlterTableSchemaChanges;
140-
import static org.apache.fluss.server.utils.ServerRpcMessageUtils.toTableChanges;
141140
import static org.apache.fluss.server.utils.ServerRpcMessageUtils.toTablePath;
142141
import static org.apache.fluss.server.utils.TableAssignmentUtils.generateAssignment;
143142
import static org.apache.fluss.utils.PartitionUtils.validatePartitionSpec;
@@ -325,10 +324,14 @@ public CompletableFuture<AlterTableResponse> alterTable(AlterTableRequest reques
325324
TablePropertyChanges tablePropertyChanges = toTablePropertyChanges(alterTableConfigChanges);
326325
List<TableChange.SchemaChange> alterSchemaChanges = toAlterTableSchemaChanges(request.getColumnChangesList());
327326

327+
Integer schemaId = request.hasSchemaId()?request.getSchemaId():null;
328+
329+
328330
LakeCatalogDynamicLoader.LakeCatalogContainer lakeCatalogContainer =
329331
lakeCatalogDynamicLoader.getLakeCatalogContainer();
330332
metadataManager.alterTableProperties(
331333
tablePath,
334+
schemaId,
332335
alterTableConfigChanges,
333336
tablePropertyChanges,
334337
alterSchemaChanges,

fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java

Lines changed: 64 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.fluss.exception.FlussRuntimeException;
2626
import org.apache.fluss.exception.InvalidAlterTableException;
2727
import org.apache.fluss.exception.InvalidPartitionException;
28+
import org.apache.fluss.exception.InvalidTableException;
2829
import org.apache.fluss.exception.LakeTableAlreadyExistException;
2930
import org.apache.fluss.exception.PartitionAlreadyExistsException;
3031
import org.apache.fluss.exception.PartitionNotExistException;
@@ -39,6 +40,7 @@
3940
import org.apache.fluss.metadata.DatabaseDescriptor;
4041
import org.apache.fluss.metadata.DatabaseInfo;
4142
import org.apache.fluss.metadata.ResolvedPartitionSpec;
43+
import org.apache.fluss.metadata.Schema;
4244
import org.apache.fluss.metadata.SchemaInfo;
4345
import org.apache.fluss.metadata.TableChange;
4446
import org.apache.fluss.metadata.TableDescriptor;
@@ -323,6 +325,7 @@ public long createTable(
323325

324326
public void alterTableProperties(
325327
TablePath tablePath,
328+
@Nullable Integer schemaId,
326329
List<TableChange> tableChanges,
327330
TablePropertyChanges tablePropertyChanges,
328331
List<TableChange.SchemaChange> schemaChanges,
@@ -332,51 +335,76 @@ public void alterTableProperties(
332335
LakeTableTieringManager lakeTableTieringManager,
333336
LakeCatalog.Context lakeCatalogContext) {
334337
try {
338+
335339
// it throws TableNotExistException if the table or database not exists
336340
TableRegistration tableReg = getTableRegistration(tablePath);
337341
SchemaInfo schemaInfo = getLatestSchema(tablePath);
338342
// we can't use MetadataManager#getTable here, because it will add the default
339343
// lake options to the table properties, which may cause the validation failure
340344
TableInfo tableInfo = tableReg.toTableInfo(tablePath, schemaInfo);
341345

342-
// validate the changes
346+
// validate the table properties changes
343347
validateAlterTableProperties(
344348
tableInfo,
345349
tablePropertyChanges.tableKeysToChange(),
346350
tablePropertyChanges.customKeysToChange());
347351

348352
TableDescriptor tableDescriptor = tableInfo.toTableDescriptor();
353+
// validate the table column changes
354+
Schema newSchema = null;
355+
if(!schemaChanges.isEmpty()){
356+
SchemaUpdate schemaUpdate = new SchemaUpdate(tableInfo);
357+
for (TableChange.SchemaChange schemaChange : schemaChanges) {
358+
schemaUpdate.applySchemaChange(schemaChange);
359+
}
360+
newSchema = schemaUpdate.getSchema();
361+
if(schemaId == null){
362+
throw new InvalidTableException("Can't alter table schema without schema id");
363+
}
364+
}
365+
366+
349367
TableDescriptor newDescriptor =
350-
getUpdatedTableDescriptor(tableDescriptor, tablePropertyChanges);
368+
getUpdatedTableDescriptor(tableDescriptor, tablePropertyChanges, newSchema);
351369

352-
if (newDescriptor != null) {
370+
if (newDescriptor != null ) {
353371
// reuse the same validate logic with the createTable() method
354372
validateTableDescriptor(newDescriptor, maxBucketNum);
355373

356-
// pre alter table properties, e.g. create lake table in lake storage if it's to
357-
// enable datalake for the table
358-
preAlterTableProperties(
359-
tablePath,
360-
tableDescriptor,
361-
newDescriptor,
362-
tableChanges,
363-
lakeCatalog,
364-
dataLakeFormat,
365-
lakeCatalogContext);
366-
// update the table to zk
367-
TableRegistration updatedTableRegistration =
368-
tableReg.newProperties(
369-
newDescriptor.getProperties(), newDescriptor.getCustomProperties());
370-
zookeeperClient.updateTable(tablePath, updatedTableRegistration);
371-
372-
// post alter table properties, e.g. add the table to lake table tiering manager if
373-
// it's to enable datalake for the table
374-
postAlterTableProperties(
375-
tablePath,
376-
schemaInfo,
377-
tableDescriptor,
378-
updatedTableRegistration,
379-
lakeTableTieringManager);
374+
if(newSchema != null) {
375+
// cas update the schema
376+
zookeeperClient.registerSchema(tablePath, newSchema, schemaId+1);
377+
}
378+
379+
//todo: 将这个判断换一下,tableChanges不代表properties变了
380+
if(!tableChanges.isEmpty()) {
381+
// pre alter table properties, e.g. create lake table in lake storage if it's to
382+
// enable datalake for the table
383+
preAlterTableProperties(
384+
tablePath,
385+
tableDescriptor,
386+
newDescriptor,
387+
tableChanges,
388+
lakeCatalog,
389+
dataLakeFormat,
390+
lakeCatalogContext);
391+
// update the table to zk
392+
TableRegistration updatedTableRegistration =
393+
tableReg.newProperties(
394+
newDescriptor.getProperties(), newDescriptor.getCustomProperties());
395+
396+
397+
zookeeperClient.updateTable(tablePath, updatedTableRegistration);
398+
399+
// post alter table properties, e.g. add the table to lake table tiering manager if
400+
// it's to enable datalake for the table
401+
postAlterTableProperties(
402+
tablePath,
403+
schemaInfo,
404+
tableDescriptor,
405+
updatedTableRegistration,
406+
lakeTableTieringManager);
407+
}
380408
} else {
381409
LOG.info(
382410
"No properties changed when alter table {}, skip update table.", tablePath);
@@ -480,7 +508,7 @@ private void postAlterTableProperties(
480508
* @return the updated TableDescriptor, or null if no properties updated.
481509
*/
482510
private @Nullable TableDescriptor getUpdatedTableDescriptor(
483-
TableDescriptor tableDescriptor, TablePropertyChanges tablePropertyChanges) {
511+
TableDescriptor tableDescriptor, TablePropertyChanges tablePropertyChanges, @Nullable Schema newSchema) {
484512
Map<String, String> newProperties = new HashMap<>(tableDescriptor.getProperties());
485513
Map<String, String> newCustomProperties =
486514
new HashMap<>(tableDescriptor.getCustomProperties());
@@ -500,10 +528,16 @@ private void postAlterTableProperties(
500528

501529
// no properties change happen
502530
if (newProperties.equals(tableDescriptor.getProperties())
503-
&& newCustomProperties.equals(tableDescriptor.getCustomProperties())) {
531+
&& newCustomProperties.equals(tableDescriptor.getCustomProperties()) && newSchema == null) {
504532
return null;
505533
} else {
506-
return tableDescriptor.withProperties(newProperties, newCustomProperties);
534+
TableDescriptor.Builder builder = TableDescriptor.builder(tableDescriptor);
535+
if(newSchema != null){
536+
builder.schema(newSchema);
537+
}
538+
builder.properties(newProperties);
539+
builder.customProperties(newCustomProperties);
540+
return builder.build();
507541
}
508542
}
509543

0 commit comments

Comments
 (0)