Skip to content

Commit 6321586

Browse files
authored
[test] Add test for recovering coordinator server with schema evolution (#2176)
1 parent 35d92c5 commit 6321586

File tree

2 files changed

+67
-1
lines changed

2 files changed

+67
-1
lines changed

fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.apache.fluss.metadata.Schema;
3737
import org.apache.fluss.metadata.TableBucket;
3838
import org.apache.fluss.metadata.TableDescriptor;
39+
import org.apache.fluss.metadata.TableInfo;
3940
import org.apache.fluss.metadata.TablePath;
4041
import org.apache.fluss.metrics.registry.MetricRegistry;
4142
import org.apache.fluss.rpc.GatewayClientProxy;
@@ -49,6 +50,7 @@
4950
import org.apache.fluss.rpc.messages.ListDatabasesRequest;
5051
import org.apache.fluss.rpc.messages.MetadataRequest;
5152
import org.apache.fluss.rpc.messages.MetadataResponse;
53+
import org.apache.fluss.rpc.messages.PbAddColumn;
5254
import org.apache.fluss.rpc.messages.PbAlterConfig;
5355
import org.apache.fluss.rpc.messages.PbBucketMetadata;
5456
import org.apache.fluss.rpc.messages.PbPartitionMetadata;
@@ -62,6 +64,8 @@
6264
import org.apache.fluss.server.zk.data.BucketAssignment;
6365
import org.apache.fluss.server.zk.data.TableAssignment;
6466
import org.apache.fluss.types.DataTypes;
67+
import org.apache.fluss.utils.json.DataTypeJsonSerde;
68+
import org.apache.fluss.utils.json.JsonSerdeUtils;
6569

6670
import org.junit.jupiter.api.BeforeEach;
6771
import org.junit.jupiter.api.Test;
@@ -298,6 +302,7 @@ void testTableManagement(boolean isCoordinatorServer) throws Exception {
298302
newAlterTableRequest(
299303
tablePath,
300304
alterTableProperties(setProperties, resetProperties),
305+
Collections.emptyList(),
301306
false))
302307
.get();
303308
// get the table and check it
@@ -655,6 +660,48 @@ void testMetadataCompatibility(boolean isCoordinatorServer) throws Exception {
655660
FLUSS_CLUSTER_EXTENSION.getTabletServerNodes());
656661
}
657662

663+
@Test
664+
void testSchemaEvolution() throws Exception {
665+
AdminReadOnlyGateway gateway = getAdminOnlyGateway(true);
666+
AdminGateway adminGateway = getAdminGateway();
667+
668+
// create database and table
669+
String db1 = "db1";
670+
String tb1 = "tb1";
671+
TablePath tablePath = TablePath.of(db1, tb1);
672+
adminGateway.createDatabase(newCreateDatabaseRequest(db1, false)).get();
673+
TableDescriptor tableDescriptor = newPkTable();
674+
adminGateway.createTable(newCreateTableRequest(tablePath, tableDescriptor, false)).get();
675+
676+
// add column
677+
adminGateway
678+
.alterTable(
679+
newAlterTableRequest(
680+
tablePath, Collections.emptyList(), alterTableAddColumns(), false))
681+
.get();
682+
683+
// restart coordinatorServer
684+
FLUSS_CLUSTER_EXTENSION.stopCoordinatorServer();
685+
FLUSS_CLUSTER_EXTENSION.startCoordinatorServer();
686+
687+
// check metadata response
688+
MetadataResponse metadataResponse =
689+
gateway.metadata(newMetadataRequest(Collections.singletonList(tablePath))).get();
690+
assertThat(metadataResponse.getTableMetadatasCount()).isEqualTo(1);
691+
PbTableMetadata pbTableMetadata = metadataResponse.getTableMetadataAt(0);
692+
assertThat(pbTableMetadata.getSchemaId()).isEqualTo(2);
693+
TableInfo tableInfo =
694+
TableInfo.of(
695+
tablePath,
696+
pbTableMetadata.getTableId(),
697+
pbTableMetadata.getSchemaId(),
698+
TableDescriptor.fromJsonBytes(pbTableMetadata.getTableJson()),
699+
pbTableMetadata.getCreatedTime(),
700+
pbTableMetadata.getModifiedTime());
701+
List<Schema.Column> columns = tableInfo.getSchema().getColumns();
702+
assertThat(columns.size()).isEqualTo(3);
703+
}
704+
658705
private void checkBucketMetadata(int expectBucketCount, List<PbBucketMetadata> bucketMetadata) {
659706
Set<Integer> liveServers =
660707
FLUSS_CLUSTER_EXTENSION.getTabletServerNodes().stream()
@@ -781,6 +828,20 @@ private static List<PbAlterConfig> alterTableProperties(
781828
return res;
782829
}
783830

831+
private static List<PbAddColumn> alterTableAddColumns() {
832+
List<PbAddColumn> addColumns = new ArrayList<>();
833+
PbAddColumn newColumn = new PbAddColumn();
834+
newColumn
835+
.setColumnName("new_column")
836+
.setDataTypeJson(
837+
JsonSerdeUtils.writeValueAsBytes(
838+
DataTypes.STRING(), DataTypeJsonSerde.INSTANCE))
839+
.setComment("new_column")
840+
.setColumnPositionType(0);
841+
addColumns.add(newColumn);
842+
return addColumns;
843+
}
844+
784845
private static Schema newPkSchema() {
785846
return Schema.newBuilder()
786847
.column("a", DataTypes.INT())

fluss-server/src/test/java/org/apache/fluss/server/testutils/RpcMessageTestUtils.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import org.apache.fluss.rpc.messages.ListTablesRequest;
4949
import org.apache.fluss.rpc.messages.LookupRequest;
5050
import org.apache.fluss.rpc.messages.MetadataRequest;
51+
import org.apache.fluss.rpc.messages.PbAddColumn;
5152
import org.apache.fluss.rpc.messages.PbAlterConfig;
5253
import org.apache.fluss.rpc.messages.PbFetchLogReqForBucket;
5354
import org.apache.fluss.rpc.messages.PbFetchLogReqForTable;
@@ -145,9 +146,13 @@ public static CreateTableRequest newCreateTableRequest(
145146
}
146147

147148
public static AlterTableRequest newAlterTableRequest(
148-
TablePath tablePath, List<PbAlterConfig> alterConfigs, boolean ignoreIfExists) {
149+
TablePath tablePath,
150+
List<PbAlterConfig> alterConfigs,
151+
List<PbAddColumn> addColumns,
152+
boolean ignoreIfExists) {
149153
AlterTableRequest request = new AlterTableRequest();
150154
request.addAllConfigChanges(alterConfigs)
155+
.addAllAddColumns(addColumns)
151156
.setIgnoreIfNotExists(ignoreIfExists)
152157
.setTablePath()
153158
.setDatabaseName(tablePath.getDatabaseName())

0 commit comments

Comments
 (0)