@@ -157,7 +157,7 @@ void testCreateInvalidDatabaseAndTable() {
157157 .createTable (
158158 newCreateTableRequest (
159159 new TablePath ("db" , "=invalid_table!" ),
160- newTable (),
160+ newPkTable (),
161161 true ))
162162 .get ())
163163 .cause ()
@@ -170,7 +170,7 @@ void testCreateInvalidDatabaseAndTable() {
170170 .createTable (
171171 newCreateTableRequest (
172172 new TablePath ("" , "=invalid_table!" ),
173- newTable (),
173+ newPkTable (),
174174 true ))
175175 .get ())
176176 .cause ()
@@ -272,7 +272,7 @@ void testTableManagement(boolean isCoordinatorServer) throws Exception {
272272 adminGateway .dropTable (newDropTableRequest (db1 , tb1 , true )).get ();
273273
274274 // then create a table
275- TableDescriptor tableDescriptor = newTable ();
275+ TableDescriptor tableDescriptor = newPkTable ();
276276 adminGateway .createTable (newCreateTableRequest (tablePath , tableDescriptor , false )).get ();
277277
278278 // the table should exist then
@@ -453,7 +453,7 @@ void testMetadata(boolean isCoordinatorServer) throws Exception {
453453 TablePath tablePath = TablePath .of (db1 , tb1 );
454454 // first create a database
455455 adminGateway .createDatabase (newCreateDatabaseRequest (db1 , false )).get ();
456- TableDescriptor tableDescriptor = newTable ();
456+ TableDescriptor tableDescriptor = newPkTable ();
457457 adminGateway .createTable (newCreateTableRequest (tablePath , tableDescriptor , false )).get ();
458458 GetTableInfoResponse response =
459459 gateway .getTableInfo (newGetTableInfoRequest (tablePath )).get ();
@@ -715,7 +715,7 @@ private static void checkAssignmentWithReplicaFactor(
715715 }
716716
717717 private static TableDescriptor newTableWithoutSettingDistribution () {
718- return TableDescriptor .builder ().schema (newSchema ()).comment ("first table" ).build ();
718+ return TableDescriptor .builder ().schema (newPkSchema ()).comment ("first table" ).build ();
719719 }
720720
721721 private static TableDescriptor newPartitionedTable () {
@@ -749,15 +749,15 @@ private static TableDescriptor.Builder newPartitionedTableBuilder(
749749 .property (ConfigOptions .TABLE_AUTO_PARTITION_NUM_PRECREATE , 1 );
750750 }
751751
752- private static TableDescriptor newTable () {
752+ private static TableDescriptor newPkTable () {
753753 return TableDescriptor .builder ()
754- .schema (newSchema ())
754+ .schema (newPkSchema ())
755755 .comment ("first table" )
756756 .distributedBy (3 , "a" )
757757 .build ();
758758 }
759759
760- private static Schema newSchema () {
760+ private static Schema newPkSchema () {
761761 return Schema .newBuilder ()
762762 .column ("a" , DataTypes .INT ())
763763 .withComment ("a comment" )
@@ -798,4 +798,153 @@ private UpdateMetadataRequest makeLegacyUpdateMetadataRequest(
798798 });
799799 return updateMetadataRequest ;
800800 }
801+
802+ // Test methods for table creation restrictions
803+
804+ @ Test
805+ void testLogTableCreationRestriction () throws Exception {
806+ // Test with cluster that disallows log table creation
807+ FlussClusterExtension kvCluster =
808+ FlussClusterExtension .builder ()
809+ .setNumOfTabletServers (3 )
810+ .setCoordinatorServerListeners (
811+ String .format (
812+ "%s://localhost:0, %s://localhost:0" ,
813+ DEFAULT_LISTENER_NAME , CLIENT_LISTENER ))
814+ .setTabletServerListeners (
815+ String .format (
816+ "%s://localhost:0, %s://localhost:0" ,
817+ DEFAULT_LISTENER_NAME , CLIENT_LISTENER ))
818+ .setClusterConf (initLogRestrictedConf ())
819+ .build ();
820+
821+ try {
822+ kvCluster .start ();
823+
824+ AdminGateway kvClusterGateway = kvCluster .newCoordinatorClient ();
825+
826+ String tb1 = "log_table" ;
827+ TablePath tablePath = TablePath .of ("fluss" , tb1 );
828+
829+ // Try to create a log table (table without primary key), should fail
830+ TableDescriptor logTableDescriptor = newLogTable ();
831+ assertThatThrownBy (
832+ () ->
833+ kvClusterGateway
834+ .createTable (
835+ newCreateTableRequest (
836+ tablePath , logTableDescriptor , false ))
837+ .get ())
838+ .cause ()
839+ .isInstanceOf (InvalidTableException .class )
840+ .hasMessageContaining ("Creation of Log Tables is disallowed in the cluster." );
841+
842+ // Try to create a kv table (table with primary key), should succeed
843+ String tb2 = "kv_table" ;
844+ TablePath kvTablePath = TablePath .of ("fluss" , tb2 );
845+ TableDescriptor kvTableDescriptor = newPkTable ();
846+ kvClusterGateway
847+ .createTable (newCreateTableRequest (kvTablePath , kvTableDescriptor , false ))
848+ .get ();
849+
850+ // Verify the kv table was created successfully
851+ assertThat (
852+ kvClusterGateway
853+ .tableExists (newTableExistsRequest (kvTablePath ))
854+ .get ()
855+ .isExists ())
856+ .isTrue ();
857+ } finally {
858+ kvCluster .close ();
859+ }
860+ }
861+
862+ @ Test
863+ void testKvTableCreationRestriction () throws Exception {
864+ // Test with cluster that disallows kv table creation
865+ FlussClusterExtension logCluster =
866+ FlussClusterExtension .builder ()
867+ .setNumOfTabletServers (3 )
868+ .setCoordinatorServerListeners (
869+ String .format (
870+ "%s://localhost:0, %s://localhost:0" ,
871+ DEFAULT_LISTENER_NAME , CLIENT_LISTENER ))
872+ .setTabletServerListeners (
873+ String .format (
874+ "%s://localhost:0, %s://localhost:0" ,
875+ DEFAULT_LISTENER_NAME , CLIENT_LISTENER ))
876+ .setClusterConf (initKvRestrictedConf ())
877+ .build ();
878+
879+ try {
880+ logCluster .start ();
881+ AdminGateway logClusterGateway = logCluster .newCoordinatorClient ();
882+
883+ String tb1 = "kv_table" ;
884+ TablePath tablePath = TablePath .of ("fluss" , tb1 );
885+
886+ // Try to create a kv table (table with primary key), should fail
887+ TableDescriptor kvTableDescriptor = newPkTable ();
888+ assertThatThrownBy (
889+ () ->
890+ logClusterGateway
891+ .createTable (
892+ newCreateTableRequest (
893+ tablePath , kvTableDescriptor , false ))
894+ .get ())
895+ .cause ()
896+ .isInstanceOf (InvalidTableException .class )
897+ .hasMessageContaining (
898+ "Creation of Primary Key Tables is disallowed in the cluster." );
899+
900+ // Try to create a log table (table without primary key), should succeed
901+ String tb2 = "log_table" ;
902+ TablePath logTablePath = TablePath .of ("fluss" , tb2 );
903+ TableDescriptor logTableDescriptor = newLogTable ();
904+ logClusterGateway
905+ .createTable (newCreateTableRequest (logTablePath , logTableDescriptor , false ))
906+ .get ();
907+
908+ // Verify the log table was created successfully
909+ assertThat (
910+ logClusterGateway
911+ .tableExists (newTableExistsRequest (logTablePath ))
912+ .get ()
913+ .isExists ())
914+ .isTrue ();
915+ } finally {
916+ logCluster .close ();
917+ }
918+ }
919+
920+ private static Configuration initLogRestrictedConf () {
921+ Configuration conf = new Configuration ();
922+ conf .set (ConfigOptions .AUTO_PARTITION_CHECK_INTERVAL , Duration .ofSeconds (1 ));
923+ conf .set (ConfigOptions .LOG_TABLE_ALLOW_CREATION , false );
924+ return conf ;
925+ }
926+
927+ private static Configuration initKvRestrictedConf () {
928+ Configuration conf = new Configuration ();
929+ conf .set (ConfigOptions .AUTO_PARTITION_CHECK_INTERVAL , Duration .ofSeconds (1 ));
930+ conf .set (ConfigOptions .KV_TABLE_ALLOW_CREATION , false );
931+ return conf ;
932+ }
933+
934+ // Helper methods for creating different table types
935+ private static TableDescriptor newLogTable () {
936+ return TableDescriptor .builder ()
937+ .schema (newLogSchema ())
938+ .comment ("log table without primary key" )
939+ .distributedBy (3 , "a" )
940+ .build ();
941+ }
942+
943+ private static Schema newLogSchema () {
944+ return Schema .newBuilder ()
945+ .column ("a" , DataTypes .INT ())
946+ .withComment ("a comment" )
947+ .column ("b" , DataTypes .STRING ())
948+ .build ();
949+ }
801950}
0 commit comments