Skip to content

Commit df77b5c

Browse files
[server] Add configuration options to disable PK or Log table creation (apache#1503)
Co-authored-by: ocean.wy <[email protected]> Co-authored-by: Jark Wu <[email protected]>
1 parent bc20370 commit df77b5c

File tree

4 files changed

+210
-8
lines changed

4 files changed

+210
-8
lines changed

fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,24 @@ public class ConfigOptions {
122122
"The interval of auto partition check. "
123123
+ "The default value is 10 minutes.");
124124

125+
public static final ConfigOption<Boolean> LOG_TABLE_ALLOW_CREATION =
126+
key("allow.create.log.tables")
127+
.booleanType()
128+
.defaultValue(true)
129+
.withDescription(
130+
"Whether to allow creation of log tables. When set to false, "
131+
+ "attempts to create log tables (tables without primary key) will be rejected. "
132+
+ "The default value is true.");
133+
134+
public static final ConfigOption<Boolean> KV_TABLE_ALLOW_CREATION =
135+
key("allow.create.kv.tables")
136+
.booleanType()
137+
.defaultValue(true)
138+
.withDescription(
139+
"Whether to allow creation of kv tables (primary key tables). When set to false, "
140+
+ "attempts to create kv tables (tables with primary key) will be rejected. "
141+
+ "The default value is true.");
142+
125143
public static final ConfigOption<Integer> MAX_PARTITION_NUM =
126144
key("max.partition.num")
127145
.intType()

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,8 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina
135135

136136
private final int defaultBucketNumber;
137137
private final int defaultReplicationFactor;
138+
private final boolean logTableAllowCreation;
139+
private final boolean kvTableAllowCreation;
138140
private final Supplier<EventManager> eventManagerSupplier;
139141
private final Supplier<Integer> coordinatorEpochSupplier;
140142
private final ServerMetadataCache metadataCache;
@@ -157,6 +159,8 @@ public CoordinatorService(
157159
super(remoteFileSystem, ServerType.COORDINATOR, zkClient, metadataManager, authorizer);
158160
this.defaultBucketNumber = conf.getInt(ConfigOptions.DEFAULT_BUCKET_NUMBER);
159161
this.defaultReplicationFactor = conf.getInt(ConfigOptions.DEFAULT_REPLICATION_FACTOR);
162+
this.logTableAllowCreation = conf.getBoolean(ConfigOptions.LOG_TABLE_ALLOW_CREATION);
163+
this.kvTableAllowCreation = conf.getBoolean(ConfigOptions.KV_TABLE_ALLOW_CREATION);
160164
this.eventManagerSupplier =
161165
() -> coordinatorEventProcessorSupplier.get().getCoordinatorEventManager();
162166
this.coordinatorEpochSupplier =
@@ -245,6 +249,9 @@ public CompletableFuture<CreateTableResponse> createTable(CreateTableRequest req
245249
}
246250
}
247251

252+
// Check table creation permissions based on table type
253+
validateTableCreationPermission(tableDescriptor, tablePath);
254+
248255
// apply system defaults if the config is not set
249256
tableDescriptor = applySystemDefaults(tableDescriptor);
250257

@@ -651,4 +658,30 @@ private static List<BucketMetadata> getBucketMetadataFromContext(
651658
});
652659
return bucketMetadataList;
653660
}
661+
662+
/**
663+
* Validates whether the table creation is allowed based on the table type and configuration.
664+
*
665+
* @param tableDescriptor the table descriptor to validate
666+
* @param tablePath the table path for error reporting
667+
* @throws InvalidTableException if table creation is not allowed
668+
*/
669+
private void validateTableCreationPermission(
670+
TableDescriptor tableDescriptor, TablePath tablePath) {
671+
boolean hasPrimaryKey = tableDescriptor.hasPrimaryKey();
672+
673+
if (hasPrimaryKey) {
674+
// This is a KV table (Primary Key Table)
675+
if (!kvTableAllowCreation) {
676+
throw new InvalidTableException(
677+
"Creation of Primary Key Tables is disallowed in the cluster.");
678+
}
679+
} else {
680+
// This is a Log table
681+
if (!logTableAllowCreation) {
682+
throw new InvalidTableException(
683+
"Creation of Log Tables is disallowed in the cluster.");
684+
}
685+
}
686+
}
654687
}

fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java

Lines changed: 157 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ void testCreateInvalidDatabaseAndTable() {
157157
.createTable(
158158
newCreateTableRequest(
159159
new TablePath("db", "=invalid_table!"),
160-
newTable(),
160+
newPkTable(),
161161
true))
162162
.get())
163163
.cause()
@@ -170,7 +170,7 @@ void testCreateInvalidDatabaseAndTable() {
170170
.createTable(
171171
newCreateTableRequest(
172172
new TablePath("", "=invalid_table!"),
173-
newTable(),
173+
newPkTable(),
174174
true))
175175
.get())
176176
.cause()
@@ -272,7 +272,7 @@ void testTableManagement(boolean isCoordinatorServer) throws Exception {
272272
adminGateway.dropTable(newDropTableRequest(db1, tb1, true)).get();
273273

274274
// then create a table
275-
TableDescriptor tableDescriptor = newTable();
275+
TableDescriptor tableDescriptor = newPkTable();
276276
adminGateway.createTable(newCreateTableRequest(tablePath, tableDescriptor, false)).get();
277277

278278
// the table should exist then
@@ -453,7 +453,7 @@ void testMetadata(boolean isCoordinatorServer) throws Exception {
453453
TablePath tablePath = TablePath.of(db1, tb1);
454454
// first create a database
455455
adminGateway.createDatabase(newCreateDatabaseRequest(db1, false)).get();
456-
TableDescriptor tableDescriptor = newTable();
456+
TableDescriptor tableDescriptor = newPkTable();
457457
adminGateway.createTable(newCreateTableRequest(tablePath, tableDescriptor, false)).get();
458458
GetTableInfoResponse response =
459459
gateway.getTableInfo(newGetTableInfoRequest(tablePath)).get();
@@ -715,7 +715,7 @@ private static void checkAssignmentWithReplicaFactor(
715715
}
716716

717717
private static TableDescriptor newTableWithoutSettingDistribution() {
718-
return TableDescriptor.builder().schema(newSchema()).comment("first table").build();
718+
return TableDescriptor.builder().schema(newPkSchema()).comment("first table").build();
719719
}
720720

721721
private static TableDescriptor newPartitionedTable() {
@@ -749,15 +749,15 @@ private static TableDescriptor.Builder newPartitionedTableBuilder(
749749
.property(ConfigOptions.TABLE_AUTO_PARTITION_NUM_PRECREATE, 1);
750750
}
751751

752-
private static TableDescriptor newTable() {
752+
private static TableDescriptor newPkTable() {
753753
return TableDescriptor.builder()
754-
.schema(newSchema())
754+
.schema(newPkSchema())
755755
.comment("first table")
756756
.distributedBy(3, "a")
757757
.build();
758758
}
759759

760-
private static Schema newSchema() {
760+
private static Schema newPkSchema() {
761761
return Schema.newBuilder()
762762
.column("a", DataTypes.INT())
763763
.withComment("a comment")
@@ -798,4 +798,153 @@ private UpdateMetadataRequest makeLegacyUpdateMetadataRequest(
798798
});
799799
return updateMetadataRequest;
800800
}
801+
802+
// Test methods for table creation restrictions
803+
804+
@Test
805+
void testLogTableCreationRestriction() throws Exception {
806+
// Test with cluster that disallows log table creation
807+
FlussClusterExtension kvCluster =
808+
FlussClusterExtension.builder()
809+
.setNumOfTabletServers(3)
810+
.setCoordinatorServerListeners(
811+
String.format(
812+
"%s://localhost:0, %s://localhost:0",
813+
DEFAULT_LISTENER_NAME, CLIENT_LISTENER))
814+
.setTabletServerListeners(
815+
String.format(
816+
"%s://localhost:0, %s://localhost:0",
817+
DEFAULT_LISTENER_NAME, CLIENT_LISTENER))
818+
.setClusterConf(initLogRestrictedConf())
819+
.build();
820+
821+
try {
822+
kvCluster.start();
823+
824+
AdminGateway kvClusterGateway = kvCluster.newCoordinatorClient();
825+
826+
String tb1 = "log_table";
827+
TablePath tablePath = TablePath.of("fluss", tb1);
828+
829+
// Try to create a log table (table without primary key), should fail
830+
TableDescriptor logTableDescriptor = newLogTable();
831+
assertThatThrownBy(
832+
() ->
833+
kvClusterGateway
834+
.createTable(
835+
newCreateTableRequest(
836+
tablePath, logTableDescriptor, false))
837+
.get())
838+
.cause()
839+
.isInstanceOf(InvalidTableException.class)
840+
.hasMessageContaining("Creation of Log Tables is disallowed in the cluster.");
841+
842+
// Try to create a kv table (table with primary key), should succeed
843+
String tb2 = "kv_table";
844+
TablePath kvTablePath = TablePath.of("fluss", tb2);
845+
TableDescriptor kvTableDescriptor = newPkTable();
846+
kvClusterGateway
847+
.createTable(newCreateTableRequest(kvTablePath, kvTableDescriptor, false))
848+
.get();
849+
850+
// Verify the kv table was created successfully
851+
assertThat(
852+
kvClusterGateway
853+
.tableExists(newTableExistsRequest(kvTablePath))
854+
.get()
855+
.isExists())
856+
.isTrue();
857+
} finally {
858+
kvCluster.close();
859+
}
860+
}
861+
862+
@Test
863+
void testKvTableCreationRestriction() throws Exception {
864+
// Test with cluster that disallows kv table creation
865+
FlussClusterExtension logCluster =
866+
FlussClusterExtension.builder()
867+
.setNumOfTabletServers(3)
868+
.setCoordinatorServerListeners(
869+
String.format(
870+
"%s://localhost:0, %s://localhost:0",
871+
DEFAULT_LISTENER_NAME, CLIENT_LISTENER))
872+
.setTabletServerListeners(
873+
String.format(
874+
"%s://localhost:0, %s://localhost:0",
875+
DEFAULT_LISTENER_NAME, CLIENT_LISTENER))
876+
.setClusterConf(initKvRestrictedConf())
877+
.build();
878+
879+
try {
880+
logCluster.start();
881+
AdminGateway logClusterGateway = logCluster.newCoordinatorClient();
882+
883+
String tb1 = "kv_table";
884+
TablePath tablePath = TablePath.of("fluss", tb1);
885+
886+
// Try to create a kv table (table with primary key), should fail
887+
TableDescriptor kvTableDescriptor = newPkTable();
888+
assertThatThrownBy(
889+
() ->
890+
logClusterGateway
891+
.createTable(
892+
newCreateTableRequest(
893+
tablePath, kvTableDescriptor, false))
894+
.get())
895+
.cause()
896+
.isInstanceOf(InvalidTableException.class)
897+
.hasMessageContaining(
898+
"Creation of Primary Key Tables is disallowed in the cluster.");
899+
900+
// Try to create a log table (table without primary key), should succeed
901+
String tb2 = "log_table";
902+
TablePath logTablePath = TablePath.of("fluss", tb2);
903+
TableDescriptor logTableDescriptor = newLogTable();
904+
logClusterGateway
905+
.createTable(newCreateTableRequest(logTablePath, logTableDescriptor, false))
906+
.get();
907+
908+
// Verify the log table was created successfully
909+
assertThat(
910+
logClusterGateway
911+
.tableExists(newTableExistsRequest(logTablePath))
912+
.get()
913+
.isExists())
914+
.isTrue();
915+
} finally {
916+
logCluster.close();
917+
}
918+
}
919+
920+
private static Configuration initLogRestrictedConf() {
921+
Configuration conf = new Configuration();
922+
conf.set(ConfigOptions.AUTO_PARTITION_CHECK_INTERVAL, Duration.ofSeconds(1));
923+
conf.set(ConfigOptions.LOG_TABLE_ALLOW_CREATION, false);
924+
return conf;
925+
}
926+
927+
private static Configuration initKvRestrictedConf() {
928+
Configuration conf = new Configuration();
929+
conf.set(ConfigOptions.AUTO_PARTITION_CHECK_INTERVAL, Duration.ofSeconds(1));
930+
conf.set(ConfigOptions.KV_TABLE_ALLOW_CREATION, false);
931+
return conf;
932+
}
933+
934+
// Helper methods for creating different table types
935+
private static TableDescriptor newLogTable() {
936+
return TableDescriptor.builder()
937+
.schema(newLogSchema())
938+
.comment("log table without primary key")
939+
.distributedBy(3, "a")
940+
.build();
941+
}
942+
943+
private static Schema newLogSchema() {
944+
return Schema.newBuilder()
945+
.column("a", DataTypes.INT())
946+
.withComment("a comment")
947+
.column("b", DataTypes.STRING())
948+
.build();
949+
}
801950
}

0 commit comments

Comments
 (0)