8686import java .util .stream .Stream ;
8787
8888import static org .apache .fluss .config .ConfigOptions .DATALAKE_FORMAT ;
89+ import static org .apache .fluss .config .ConfigOptions .TABLE_DATALAKE_ENABLED ;
90+ import static org .apache .fluss .config .ConfigOptions .TABLE_DATALAKE_FORMAT ;
8991import static org .apache .fluss .metadata .DataLakeFormat .PAIMON ;
9092import static org .apache .fluss .record .TestData .DATA1_SCHEMA ;
9193import static org .apache .fluss .testutils .DataTestUtils .row ;
94+ import static org .apache .fluss .testutils .common .CommonTestUtils .waitUntil ;
9295import static org .assertj .core .api .Assertions .assertThat ;
9396import static org .assertj .core .api .Assertions .assertThatThrownBy ;
9497
@@ -1070,7 +1073,7 @@ tablePath, newPartitionSpec("age", "11"), false)
10701073 }
10711074
10721075 @ Test
1073- void testDynamicConfigs () throws ExecutionException , InterruptedException {
1076+ void testDynamicConfigs () throws Exception {
10741077 assertThat (
10751078 FLUSS_CLUSTER_EXTENSION
10761079 .getCoordinatorServer ()
@@ -1092,20 +1095,37 @@ void testDynamicConfigs() throws ExecutionException, InterruptedException {
10921095 assertConfigEntry (
10931096 DATALAKE_FORMAT .key (), null , ConfigEntry .ConfigSource .DYNAMIC_SERVER_CONFIG );
10941097
1095- // Delete dynamic configs to use the initial value(from server.yaml)
10961098 admin .alterClusterConfigs (
1097- Collections .singletonList (
1099+ Arrays .asList (
1100+ new AlterConfig (
1101+ DATALAKE_FORMAT .key (), "paimon" , AlterConfigOpType .SET ),
10981102 new AlterConfig (
1099- DATALAKE_FORMAT .key (), null , AlterConfigOpType .DELETE )))
1103+ "datalake.paimon.warehouse" ,
1104+ "test-warehouse" ,
1105+ AlterConfigOpType .SET )))
11001106 .get ();
1101- assertThat (
1102- FLUSS_CLUSTER_EXTENSION
1103- .getCoordinatorServer ()
1104- .getCoordinatorService ()
1105- .getDataLakeFormat ())
1106- .isEqualTo (PAIMON );
1107- assertConfigEntry (
1108- DATALAKE_FORMAT .key (), "paimon" , ConfigEntry .ConfigSource .INITIAL_SERVER_CONFIG );
1107+ TablePath tablePath = TablePath .of ("test_db" , "test_table" );
1108+ createTable (
1109+ tablePath ,
1110+ TableDescriptor .builder ()
1111+ .schema (DEFAULT_SCHEMA )
1112+ .property (TABLE_DATALAKE_ENABLED , true )
1113+ .build (),
1114+ true );
1115+
1116+ waitUntil (
1117+ () -> {
1118+ TableInfo tableInfo = admin .getTableInfo (tablePath ).get ();
1119+ Map <String , String > tableProperties = tableInfo .getProperties ().toMap ();
1120+ return tableProperties .containsKey (TABLE_DATALAKE_FORMAT .key ())
1121+ && tableProperties .containsKey ("table.datalake.paimon.warehouse" )
1122+ && PAIMON .toString ()
1123+ .equals (tableProperties .get (TABLE_DATALAKE_FORMAT .key ()))
1124+ && "test-warehouse"
1125+ .equals (tableProperties .get ("table.datalake.paimon.warehouse" ));
1126+ },
1127+ Duration .ofMinutes (1 ),
1128+ "Get lakehouse info" );
11091129 }
11101130
11111131 private void assertConfigEntry (
0 commit comments