Skip to content

Commit 6003c42

Browse files
authored
[server] Forbid any table/partition creation that begins with __ (#1706)
* [server] Forbid any table/partition creation that begins with `__` * addressed yunhong's comments * cover test in FlussAdminITCase
1 parent bb76356 commit 6003c42

File tree

6 files changed

+102
-16
lines changed

6 files changed

+102
-16
lines changed

fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -281,7 +281,7 @@ void testAlterTable() throws Exception {
281281
}
282282

283283
@Test
284-
void testCreateInvalidDatabaseAndTable() {
284+
void testCreateInvalidDatabaseAndTable() throws Exception {
285285
assertThatThrownBy(
286286
() ->
287287
admin.createDatabase(
@@ -290,6 +290,15 @@ void testCreateInvalidDatabaseAndTable() {
290290
.isInstanceOf(InvalidDatabaseException.class)
291291
.hasMessageContaining(
292292
"Database name *invalid_db* is invalid: '*invalid_db*' contains one or more characters other than");
293+
// test internal database with '__' prefix is not allowed
294+
assertThatThrownBy(
295+
() ->
296+
admin.createDatabase(
297+
"__internal_db", DatabaseDescriptor.EMPTY, false)
298+
.get())
299+
.isInstanceOf(InvalidDatabaseException.class)
300+
.hasMessageContaining(
301+
"Database name __internal_db is invalid: '__' is not allowed as prefix, since it is reserved for internal databases/internal tables/internal partitions in Fluss server");
293302
assertThatThrownBy(
294303
() ->
295304
admin.createTable(
@@ -309,6 +318,17 @@ void testCreateInvalidDatabaseAndTable() {
309318
.get())
310319
.isInstanceOf(InvalidDatabaseException.class)
311320
.hasMessageContaining("Database name null is invalid: null string is not allowed");
321+
// test internal table with '__' prefix is not allowed
322+
assertThatThrownBy(
323+
() ->
324+
admin.createTable(
325+
TablePath.of("db", "__internal_table"),
326+
DEFAULT_TABLE_DESCRIPTOR,
327+
false)
328+
.get())
329+
.isInstanceOf(InvalidTableException.class)
330+
.hasMessageContaining(
331+
"Table name __internal_table is invalid: '__' is not allowed as prefix, since it is reserved for internal databases/internal tables/internal partitions in Fluss server");
312332
}
313333

314334
@Test
@@ -748,6 +768,21 @@ void testAddAndDropPartitions() throws Exception {
748768
admin.createTable(tablePath, partitionedTable, true).get();
749769
assertPartitionInfo(admin.listPartitionInfos(tablePath).get(), Collections.emptyList());
750770

771+
// test internal partition with '__' prefix is not allowed
772+
assertThatThrownBy(
773+
() ->
774+
admin.createPartition(
775+
tablePath,
776+
newPartitionSpec(
777+
Arrays.asList("age"),
778+
Arrays.asList("__18")),
779+
false)
780+
.get())
781+
.cause()
782+
.isInstanceOf(InvalidPartitionException.class)
783+
.hasMessageContaining(
784+
"The partition value __18 is invalid: '__' is not allowed as prefix, since it is reserved for internal databases/internal tables/internal partitions in Fluss server");
785+
751786
// add two partitions.
752787
admin.createPartition(tablePath, newPartitionSpec("age", "10"), false).get();
753788
admin.createPartition(tablePath, newPartitionSpec("age", "11"), false).get();

fluss-common/src/main/java/org/apache/fluss/metadata/TablePath.java

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,12 @@ public class TablePath implements Serializable {
5454
// RecordAccumulator.ready)
5555
private Integer hash;
5656

57+
// Prefix reserved for internal system names. User-defined database and table and others names
58+
// are not
59+
// allowed to start with this prefix to prevent conflicts with system-generated identifiers.
60+
// This convention aligns with and maintains compatibility with Apache Kafka's naming standards.
61+
private static final String INTERNAL_NAME_PREFIX = "__";
62+
5763
public TablePath(String databaseName, String tableName) {
5864
this.databaseName = databaseName;
5965
this.tableName = tableName;
@@ -127,18 +133,36 @@ public String toString() {
127133

128134
public static void validateDatabaseName(String databaseName) throws InvalidDatabaseException {
129135
String dbError = detectInvalidName(databaseName);
130-
if (dbError != null) {
136+
String dbInternalNameError = validatePrefix(databaseName);
137+
if (dbError != null || dbInternalNameError != null) {
131138
throw new InvalidDatabaseException(
132-
"Database name " + databaseName + " is invalid: " + dbError);
139+
"Database name "
140+
+ databaseName
141+
+ " is invalid: "
142+
+ (dbError != null ? dbError : dbInternalNameError));
133143
}
134144
}
135145

136146
public static void validateTableName(String tableName) throws InvalidTableException {
137147
String tableError = detectInvalidName(tableName);
138-
if (tableError != null) {
148+
String tableInternalNameError = validatePrefix(tableName);
149+
if (tableError != null || tableInternalNameError != null) {
139150
throw new InvalidTableException(
140-
"Table name " + tableName + " is invalid: " + tableError);
151+
"Table name "
152+
+ tableName
153+
+ " is invalid: "
154+
+ (tableError != null ? tableError : tableInternalNameError));
155+
}
156+
}
157+
158+
public static String validatePrefix(String identifier) throws InvalidTableException {
159+
if (identifier != null && identifier.startsWith(INTERNAL_NAME_PREFIX)) {
160+
return "'"
161+
+ INTERNAL_NAME_PREFIX
162+
+ "' is not allowed as prefix, since it is reserved"
163+
+ " for internal databases/internal tables/internal partitions in Fluss server";
141164
}
165+
return null;
142166
}
143167

144168
public static String detectInvalidName(String identifier) {

fluss-common/src/main/java/org/apache/fluss/utils/PartitionUtils.java

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import java.util.Map;
3737

3838
import static org.apache.fluss.metadata.TablePath.detectInvalidName;
39+
import static org.apache.fluss.metadata.TablePath.validatePrefix;
3940

4041
/** Utils for partition. */
4142
public class PartitionUtils {
@@ -65,7 +66,10 @@ public class PartitionUtils {
6566
private static final String HOUR_FORMAT = "yyyyMMddHH";
6667

6768
public static void validatePartitionSpec(
68-
TablePath tablePath, List<String> partitionKeys, PartitionSpec partitionSpec) {
69+
TablePath tablePath,
70+
List<String> partitionKeys,
71+
PartitionSpec partitionSpec,
72+
boolean isCreate) {
6973
Map<String, String> partitionSpecMap = partitionSpec.getSpecMap();
7074
if (partitionKeys.size() != partitionSpecMap.size()) {
7175
throw new InvalidPartitionException(
@@ -86,16 +90,21 @@ public static void validatePartitionSpec(
8690
}
8791
}
8892

89-
validatePartitionValues(reOrderedPartitionValues);
93+
validatePartitionValues(reOrderedPartitionValues, isCreate);
9094
}
9195

9296
@VisibleForTesting
93-
static void validatePartitionValues(List<String> partitionValues) {
97+
static void validatePartitionValues(List<String> partitionValues, boolean isCreate) {
9498
for (String value : partitionValues) {
95-
String invalidName = detectInvalidName(value);
96-
if (invalidName != null) {
99+
String invalidNameError = detectInvalidName(value);
100+
if (invalidNameError != null || (isCreate && validatePrefix(value) != null)) {
97101
throw new InvalidPartitionException(
98-
"The partition value " + value + " is invalid: " + invalidName);
102+
"The partition value "
103+
+ value
104+
+ " is invalid: "
105+
+ (invalidNameError != null
106+
? invalidNameError
107+
: validatePrefix(value)));
99108
}
100109
}
101110
}

fluss-common/src/test/java/org/apache/fluss/metadata/TablePathTest.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,13 @@ void testValidate() {
3737
assertThat(path.isValid()).isTrue();
3838
assertThat(path.toString()).isEqualTo("db_2-abc3.table-1_abc_2");
3939

40+
// assert invalid name prefix
41+
TablePath invalidPath = TablePath.of("db_2", "__table-1");
42+
assertThatThrownBy(invalidPath::validate)
43+
.isInstanceOf(InvalidTableException.class)
44+
.hasMessageContaining(
45+
"Table name __table-1 is invalid: '__' is not allowed as prefix");
46+
4047
// check max length
4148
String longName = StringUtils.repeat("a", 200);
4249
assertThat(TablePath.of(longName, longName).isValid()).isTrue();

fluss-common/src/test/java/org/apache/fluss/utils/PartitionUtilsTest.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,25 +45,35 @@
4545
import static org.apache.fluss.utils.PartitionUtils.validatePartitionSpec;
4646
import static org.apache.fluss.utils.PartitionUtils.validatePartitionValues;
4747
import static org.assertj.core.api.Assertions.assertThat;
48+
import static org.assertj.core.api.Assertions.assertThatNoException;
4849
import static org.assertj.core.api.Assertions.assertThatThrownBy;
4950

5051
/** Test for {@link PartitionUtils}. */
5152
class PartitionUtilsTest {
5253

5354
@Test
5455
void testValidatePartitionValues() {
55-
assertThatThrownBy(() -> validatePartitionValues(Arrays.asList("$1", "2")))
56+
assertThatThrownBy(() -> validatePartitionValues(Arrays.asList("$1", "2"), true))
5657
.isInstanceOf(InvalidPartitionException.class)
5758
.hasMessageContaining(
5859
"The partition value $1 is invalid: '$1' contains one "
5960
+ "or more characters other than ASCII alphanumerics, '_' and '-'");
6061

61-
assertThatThrownBy(() -> validatePartitionValues(Arrays.asList("?1", "2")))
62+
assertThatThrownBy(() -> validatePartitionValues(Arrays.asList("?1", "2"), false))
6263
.isInstanceOf(InvalidPartitionException.class)
6364
.hasMessageContaining(
6465
"The partition value ?1 is invalid: '?1' contains one or more "
6566
+ "characters other than ASCII alphanumerics, '_' and '-'");
6667

68+
assertThatThrownBy(() -> validatePartitionValues(Arrays.asList("__p1", "2"), true))
69+
.isInstanceOf(InvalidPartitionException.class)
70+
.hasMessageContaining(
71+
"The partition value __p1 is invalid: '__' is not allowed as prefix, "
72+
+ "since it is reserved for internal databases/internal tables/internal partitions in Fluss server");
73+
74+
assertThatNoException()
75+
.isThrownBy(() -> validatePartitionValues(Arrays.asList("__p1", "2"), false));
76+
6777
TableDescriptor descriptor =
6878
TableDescriptor.builder()
6979
.schema(DATA1_SCHEMA)
@@ -80,7 +90,8 @@ void testValidatePartitionValues() {
8090
validatePartitionSpec(
8191
tableInfo.getTablePath(),
8292
tableInfo.getPartitionKeys(),
83-
new PartitionSpec(Collections.emptyMap())))
93+
new PartitionSpec(Collections.emptyMap()),
94+
true))
8495
.isInstanceOf(InvalidPartitionException.class)
8596
.hasMessageContaining(
8697
"PartitionSpec size is not equal to partition keys size for "

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -448,7 +448,7 @@ public CompletableFuture<CreatePartitionResponse> createPartition(
448448

449449
// first, validate the partition spec, and get resolved partition spec.
450450
PartitionSpec partitionSpec = getPartitionSpec(request.getPartitionSpec());
451-
validatePartitionSpec(tablePath, table.partitionKeys, partitionSpec);
451+
validatePartitionSpec(tablePath, table.partitionKeys, partitionSpec, true);
452452
ResolvedPartitionSpec partitionToCreate =
453453
ResolvedPartitionSpec.fromPartitionSpec(table.partitionKeys, partitionSpec);
454454

@@ -489,7 +489,7 @@ public CompletableFuture<DropPartitionResponse> dropPartition(DropPartitionReque
489489

490490
// first, validate the partition spec.
491491
PartitionSpec partitionSpec = getPartitionSpec(request.getPartitionSpec());
492-
validatePartitionSpec(tablePath, table.partitionKeys, partitionSpec);
492+
validatePartitionSpec(tablePath, table.partitionKeys, partitionSpec, false);
493493
ResolvedPartitionSpec partitionToDrop =
494494
ResolvedPartitionSpec.fromPartitionSpec(table.partitionKeys, partitionSpec);
495495

0 commit comments

Comments
 (0)