@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
1414limitations under the License.
1515*/
1616
17- use std:: { collections:: HashMap , fmt :: Write , time:: Duration } ;
17+ use std:: { collections:: HashMap , time:: Duration } ;
1818
1919use anyhow:: { Result , anyhow} ;
2020use async_trait:: async_trait;
@@ -67,6 +67,11 @@ struct StdioArgs {
6767 #[ arg( long, env = "DATABRICKS_SCHEMA" , default_value = "tpch" ) ]
6868 databricks_schema : String ,
6969
70+ /// Storage credential name for accessing external S3 locations.
71+ /// If set, CREATE TABLE statements will include WITH (CREDENTIAL <name>).
72+ #[ arg( long, env = "DATABRICKS_STORAGE_CREDENTIAL" ) ]
73+ databricks_storage_credential : Option < String > ,
74+
7075 /// Drop created tables during teardown
7176 #[ arg( long, env = "DATABRICKS_DROP_TABLES_ON_TEARDOWN" , default_value_t = false ) ]
7277 drop_tables_on_teardown : bool ,
@@ -91,6 +96,7 @@ struct AdapterConfig {
9196 warehouse_id : String ,
9297 catalog : String ,
9398 schema : String ,
99+ storage_credential : Option < String > ,
94100 drop_tables_on_teardown : bool ,
95101}
96102
@@ -144,6 +150,7 @@ impl AdapterConfig {
144150 warehouse_id,
145151 catalog : args. databricks_catalog ,
146152 schema : args. databricks_schema ,
153+ storage_credential : args. databricks_storage_credential ,
147154 drop_tables_on_teardown : args. drop_tables_on_teardown ,
148155 } )
149156 }
@@ -362,17 +369,31 @@ impl Handler for DatabricksAdapter {
362369 . map_err ( |e| format ! ( "Invalid dataset '{dataset_name}' config: {e}" ) ) ?;
363370
364371 let table_name = dataset_name;
365- let mut sql = String :: new ( ) ;
366- let _ = write ! (
367- sql,
368- "CREATE OR REPLACE TABLE {}.{}.{} USING PARQUET LOCATION {}" ,
372+ let fqn = format ! (
373+ "{}.{}.{}" ,
369374 Self :: quoted_identifier( & self . config. catalog) ,
370375 Self :: quoted_identifier( & self . config. schema) ,
371376 Self :: quoted_identifier( & table_name) ,
377+ ) ;
378+
379+ let drop_sql = format ! ( "DROP TABLE IF EXISTS {fqn}" ) ;
380+ self . execute_sql ( & drop_sql)
381+ . await
382+ . map_err ( |e| format ! ( "Failed to drop existing table '{table_name}': {e}" ) ) ?;
383+
384+ let mut create_sql = format ! (
385+ "CREATE TABLE {fqn} USING PARQUET LOCATION {}" ,
372386 Self :: sql_string_literal( & location)
373387 ) ;
374388
375- self . execute_sql ( & sql)
389+ if let Some ( credential) = & self . config . storage_credential {
390+ create_sql. push_str ( & format ! (
391+ " WITH (CREDENTIAL {})" ,
392+ Self :: quoted_identifier( credential)
393+ ) ) ;
394+ }
395+
396+ self . execute_sql ( & create_sql)
376397 . await
377398 . map_err ( |e| format ! ( "Failed to create table '{table_name}': {e}" ) ) ?;
378399
0 commit comments