1717
1818package org .apache .fluss .flink .catalog ;
1919
20+ import org .apache .fluss .client .Connection ;
21+ import org .apache .fluss .client .ConnectionFactory ;
22+ import org .apache .fluss .client .admin .Admin ;
2023import org .apache .fluss .cluster .ServerNode ;
2124import org .apache .fluss .config .ConfigOptions ;
2225import org .apache .fluss .config .Configuration ;
2528import org .apache .fluss .exception .InvalidConfigException ;
2629import org .apache .fluss .exception .InvalidTableException ;
2730import org .apache .fluss .metadata .DataLakeFormat ;
31+ import org .apache .fluss .metadata .TableInfo ;
2832import org .apache .fluss .metadata .TablePath ;
2933import org .apache .fluss .server .testutils .FlussClusterExtension ;
3034
@@ -586,7 +590,9 @@ void testTableWithExpression() throws Exception {
586590 + " cost AS price * quantity,\n "
587591 + " order_time TIMESTAMP(3),\n "
588592 + " WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND\n "
589- + ") with ('k1' = 'v1')" );
593+ + ") with ('k1' = 'v1', 'bucket.num' = '2', "
594+ + "'table.datalake.format' = 'paimon', "
595+ + "'client.connect-timeout' = '120s')" );
590596 CatalogTable table =
591597 (CatalogTable ) catalog .getTable (new ObjectPath (DEFAULT_DB , "expression_test" ));
592598 Schema .Builder schemaBuilder = Schema .newBuilder ();
@@ -606,9 +612,36 @@ void testTableWithExpression() throws Exception {
606612 Map <String , String > expectedOptions = new HashMap <>();
607613 expectedOptions .put ("k1" , "v1" );
608614 expectedOptions .put (BUCKET_KEY .key (), "user" );
609- expectedOptions .put (BUCKET_NUMBER .key (), "1 " );
615+ expectedOptions .put (BUCKET_NUMBER .key (), "2 " );
610616 expectedOptions .put ("table.datalake.format" , "paimon" );
617+ expectedOptions .put ("client.connect-timeout" , "120s" );
611618 assertOptionsEqual (table .getOptions (), expectedOptions );
619+
620+ // assert the stored table/custom configs
621+ Configuration clientConfig = FLUSS_CLUSTER_EXTENSION .getClientConfig ();
622+ try (Connection conn = ConnectionFactory .createConnection (clientConfig )) {
623+ Admin admin = conn .getAdmin ();
624+ TableInfo tableInfo =
625+ admin .getTableInfo (TablePath .of (DEFAULT_DB , "expression_test" )).get ();
626+
627+ Map <String , String > expectedTableProperties = new HashMap <>();
628+ expectedTableProperties .put ("table.datalake.format" , "paimon" );
629+ expectedTableProperties .put ("table.replication.factor" , "1" );
630+ assertThat (tableInfo .getTableConfig ()).isEqualTo (expectedTableProperties );
631+
632+ Map <String , String > expectedCustomProperties = new HashMap <>();
633+ expectedCustomProperties .put ("k1" , "v1" );
634+ expectedCustomProperties .put ("client.connect-timeout" , "120s" );
635+ expectedCustomProperties .put (
636+ "schema.watermark.0.strategy.expr" , "`order_time` - INTERVAL '5' SECOND" );
637+ expectedCustomProperties .put ("schema.watermark.0.rowtime" , "order_time" );
638+ expectedCustomProperties .put ("schema.watermark.0.data-type" , "TIMESTAMP(3)" );
639+ expectedCustomProperties .put ("schema.4.name" , "cost" );
640+ expectedCustomProperties .put ("schema.4.expr" , "`price` * `quantity`" );
641+ expectedCustomProperties .put ("schema.4.data-type" , "DOUBLE" );
642+ expectedCustomProperties .put ("bucket.num" , "2" );
643+ assertThat (tableInfo .getCustomProperties ()).isEqualTo (expectedCustomProperties );
644+ }
612645 }
613646
614647 @ Test
0 commit comments