@@ -75,15 +75,8 @@ public static int mainInternal(String[] args) {
7575 GlobalConfiguration .loadConfiguration (
7676 configDirAndDynamicConfig .f0 , configDirAndDynamicConfig .f1 );
7777
78- // validate datalake format
79- DataLakeFormat datalakeFormat = globalConfiguration .get (ConfigOptions .DATALAKE_FORMAT );
80- if (datalakeFormat == null ) {
81- throw new IllegalArgumentException (
82- "The datalake format is not set,"
83- + " please set the configuration "
84- + ConfigOptions .DATALAKE_FORMAT .key ());
85- }
86-
78+ // get config for lake storage
79+ Map <String , String > lakeStorageConfig = getLakeStorageConfig (globalConfiguration );
8780 // get config for fluss source, now only bootstrap server is required
8881 String flussBootstrapServer = getFlussBootStrapServers (globalConfiguration );
8982
@@ -100,7 +93,14 @@ public static int mainInternal(String[] args) {
10093 flinkConfig .set (PipelineOptions .NAME , DEFAULT_PIPELINE_NAME );
10194
10295 // now, start execute
103- retCode = run (runAction , jarFile , flussBootstrapServer , flinkConfig , dynamicConfigs );
96+ retCode =
97+ run (
98+ runAction ,
99+ jarFile ,
100+ flussBootstrapServer ,
101+ lakeStorageConfig ,
102+ flinkConfig ,
103+ dynamicConfigs );
104104 } catch (Throwable t ) {
105105 t .printStackTrace (System .err );
106106 }
@@ -125,6 +125,7 @@ private static int run(
125125 String runAction ,
126126 String jarFile ,
127127 String flussBootstrapServer ,
128+ Map <String , String > lakeStorageConfig ,
128129 org .apache .flink .configuration .Configuration flinkConfig ,
129130 Map <String , String > dynamicConfigs ) {
130131 List <CustomCommandLine > customCommandLines = new ArrayList <>();
@@ -139,6 +140,10 @@ private static int run(
139140 jarFile ,
140141 "--" + ConfigOptions .BOOTSTRAP_SERVERS .key (),
141142 flussBootstrapServer ));
143+ for (Map .Entry <String , String > entry : lakeStorageConfig .entrySet ()) {
144+ arguments .add ("--" + entry .getKey ());
145+ arguments .add (entry .getValue ());
146+ }
142147 for (Map .Entry <String , String > entry : dynamicConfigs .entrySet ()) {
143148 arguments .add ("--" + entry .getKey ());
144149 arguments .add (entry .getValue ());
@@ -147,6 +152,25 @@ private static int run(
147152 return cliFrontend .parseAndRun (newArgs );
148153 }
149154
155+ private static Map <String , String > getLakeStorageConfig (Configuration configuration ) {
156+ // validate datalake format
157+ DataLakeFormat datalakeFormat = configuration .get (ConfigOptions .DATALAKE_FORMAT );
158+ if (datalakeFormat == null ) {
159+ throw new IllegalArgumentException (
160+ "The datalake format is not set,"
161+ + " please set the configuration "
162+ + ConfigOptions .DATALAKE_FORMAT .key ());
163+ }
164+ String datalakeConfigPrefix = "datalake." + datalakeFormat + "." ;
165+ Map <String , String > lakeStorageConfig = new HashMap <>();
166+ for (Map .Entry <String , String > entry : configuration .toMap ().entrySet ()) {
167+ if (entry .getKey ().startsWith (datalakeConfigPrefix )) {
168+ lakeStorageConfig .put (entry .getKey (), entry .getValue ());
169+ }
170+ }
171+ return lakeStorageConfig ;
172+ }
173+
150174 private static String getFlussBootStrapServers (Configuration configuration ) {
151175 Map <String , String > configMap = configuration .toMap ();
152176 String bootstrapServers = configMap .get (ConfigOptions .BOOTSTRAP_SERVERS .key ());
0 commit comments