1515// specific language governing permissions and limitations
1616// under the License.
1717
18- use std:: { fs :: read_dir , sync:: Arc } ;
18+ use std:: sync:: Arc ;
1919
2020use color_eyre:: { Report , Result } ;
2121use datafusion:: {
@@ -26,34 +26,26 @@ use datafusion::{
2626 } ,
2727 prelude:: SessionContext ,
2828} ;
29- use log:: { error , info} ;
29+ use log:: info;
3030
3131use crate :: config:: DbConfig ;
3232
3333pub async fn register_db ( ctx : & SessionContext , db_config : & DbConfig ) -> Result < ( ) > {
3434 info ! ( "registering tables to database" ) ;
35- let tables_path = db_config. path . join ( "tables" ) ;
36- if !tables_path. exists ( ) || !tables_path. is_dir ( ) {
37- info ! ( "no tables directory configured, skipping table registration" ) ;
38- return Ok ( ( ) ) ;
39- }
40- let catalogs = read_dir ( tables_path) ?;
41- info ! ( "...reading catalogs" ) ;
42- for maybe_catalog in catalogs {
43- let catalog = maybe_catalog?;
44- let catalog_file_name = catalog. file_name ( ) ;
45- let catalog_name = catalog_file_name. to_str ( ) . ok_or ( Report :: msg ( format ! (
46- "invalid catalog path {catalog_file_name:?}"
47- ) ) ) ?;
48- // Every catalog should be a directory
49- if !catalog. path ( ) . is_dir ( ) {
50- error ! ( "catalog {catalog_name:?} is not a directory, skipping" ) ;
51- continue ;
52- }
53- let catalog_path = catalog. path ( ) ;
54- info ! ( "...handling {:?} catalog" , catalog_name) ;
35+ let tables_url = db_config. path . join ( "tables" ) ?;
36+ let listing_tables_url = ListingTableUrl :: parse ( tables_url. clone ( ) ) ?;
37+ let store_url = listing_tables_url. object_store ( ) ;
38+ let store = ctx. runtime_env ( ) . object_store ( store_url) ?;
39+ let tables_path = object_store:: path:: Path :: from_url_path ( tables_url. path ( ) ) ?;
40+ let catalogs = store. list_with_delimiter ( Some ( & tables_path) ) . await ?;
41+ for catalog in catalogs. common_prefixes {
42+ let catalog_name = catalog
43+ . filename ( )
44+ . ok_or ( Report :: msg ( "missing catalog name" ) ) ?;
45+ info ! ( "...handling {catalog_name} catalog" ) ;
5546 let maybe_catalog = ctx. catalog ( catalog_name) ;
5647 let catalog_provider = match maybe_catalog {
48+ Some ( catalog) => catalog,
5749 None => {
5850 info ! ( "...catalog does not exist, createing" ) ;
5951 let mem_catalog_provider = Arc :: new ( MemoryCatalogProvider :: new ( ) ) ;
@@ -62,23 +54,16 @@ pub async fn register_db(ctx: &SessionContext, db_config: &DbConfig) -> Result<(
6254 "missing catalog {catalog_name}, shouldnt be possible"
6355 ) ) ) ?
6456 }
65- Some ( catalog) => catalog,
6657 } ;
67- for maybe_schema in read_dir ( & catalog_path) ? {
68- let schema = maybe_schema?;
69- let schema_file_name = schema. file_name ( ) ;
70- let schema_name = schema_file_name. to_str ( ) . ok_or ( Report :: msg ( format ! (
71- "invalid schema path {schema_file_name:?}"
72- ) ) ) ?;
73- // Every schema should be a directory
74- if !schema. path ( ) . is_dir ( ) {
75- error ! ( "schema {schema_name:?} is not a directory, skipping" , ) ;
76- continue ;
77- }
78- let schema_path = schema. path ( ) ;
79- info ! ( "...handling {:?} schema" , schema_name) ;
58+ let schemas = store. list_with_delimiter ( Some ( & catalog) ) . await ?;
59+ for schema in schemas. common_prefixes {
60+ let schema_name = schema
61+ . filename ( )
62+ . ok_or ( Report :: msg ( "missing schema name" ) ) ?;
63+ info ! ( "...handling {schema_name} schema" ) ;
8064 let maybe_schema = catalog_provider. schema ( schema_name) ;
8165 let schema_provider = match maybe_schema {
66+ Some ( schema) => schema,
8267 None => {
8368 info ! ( "...schema does not exist, creating" ) ;
8469 let mem_schema_provider = Arc :: new ( MemorySchemaProvider :: new ( ) ) ;
@@ -89,24 +74,19 @@ pub async fn register_db(ctx: &SessionContext, db_config: &DbConfig) -> Result<(
8974 "missing schema {schema_name}, shouldnt be possible"
9075 ) ) ) ?
9176 }
92- Some ( schema) => schema,
9377 } ;
94- for maybe_table in read_dir ( schema_path) ? {
95- let table = maybe_table?;
96- // Every table should be a directory even if there is a single data file
97- if !table. path ( ) . is_dir ( ) {
98- error ! ( "table {:?} is not a directory, skipping" , catalog. path( ) ) ;
99- continue ;
100- }
101- let table_path = table. path ( ) ;
102- let table_file_name = table. file_name ( ) ;
103- let table_name = table_file_name. to_str ( ) . ok_or ( Report :: msg ( format ! (
104- "invalid table path {table_file_name:?}"
105- ) ) ) ?;
106- info ! ( "...handling table {table_name:?}" ) ;
107- let table_url = ListingTableUrl :: parse ( table_path. to_str ( ) . ok_or ( Report :: msg (
108- format ! ( "Invalid table path for {table_path:?}" ) ,
109- ) ) ?) ?;
78+ let tables = store. list_with_delimiter ( Some ( & schema) ) . await ?;
79+ for table_path in tables. common_prefixes {
80+ let table_name = table_path
81+ . filename ( )
82+ . ok_or ( Report :: msg ( "missing table name" ) ) ?;
83+ info ! ( "...handling table \" {catalog_name}.{schema_name}.{table_name}\" " ) ;
84+
85+ let p = tables_url
86+ . join ( & format ! ( "{catalog_name}/" ) ) ?
87+ . join ( & format ! ( "{schema_name}/" ) ) ?
88+ . join ( & format ! ( "{table_name}/" ) ) ?;
89+ let table_url = ListingTableUrl :: parse ( p) ?;
11090 let file_format = ParquetFormat :: new ( ) ;
11191 let listing_options =
11292 ListingOptions :: new ( Arc :: new ( file_format) ) . with_file_extension ( ".parquet" ) ;
@@ -119,11 +99,12 @@ pub async fn register_db(ctx: &SessionContext, db_config: &DbConfig) -> Result<(
11999 . with_schema ( resolved_schema) ;
120100 // Create a new TableProvider
121101 let provider = Arc :: new ( ListingTable :: try_new ( config) ?) ;
122- info ! ( "...registering {table_name} " ) ;
102+ info ! ( "...table registered " ) ;
123103 schema_provider. register_table ( table_name. to_string ( ) , provider) ?;
124104 }
125105 }
126106 }
107+
127108 Ok ( ( ) )
128109}
129110
@@ -179,9 +160,9 @@ mod test {
179160 let ctx = setup ( ) ;
180161 let dir = tempfile:: tempdir ( ) . unwrap ( ) ;
181162 let db_path = dir. path ( ) . join ( "db" ) ;
182- let config = DbConfig {
183- path : db_path . clone ( ) ,
184- } ;
163+ let path = format ! ( "file://{}/" , db_path . to_str ( ) . unwrap ( ) ) ;
164+ let db_url = url :: Url :: parse ( & path ) . unwrap ( ) ;
165+ let config = DbConfig { path : db_url } ;
185166 let data_path = db_path. join ( "tables" ) . join ( "dft" ) . join ( "stuff" ) . join ( "hi" ) ;
186167
187168 let df = ctx. sql ( "SELECT 1" ) . await . unwrap ( ) ;
@@ -231,9 +212,9 @@ mod test {
231212 let ctx = setup ( ) ;
232213 let dir = tempfile:: tempdir ( ) . unwrap ( ) ;
233214 let db_path = dir. path ( ) . join ( "db" ) ;
234- let config = DbConfig {
235- path : db_path . clone ( ) ,
236- } ;
215+ let path = format ! ( "file://{}/" , db_path . to_str ( ) . unwrap ( ) ) ;
216+ let db_url = url :: Url :: parse ( & path ) . unwrap ( ) ;
217+ let config = DbConfig { path : db_url } ;
237218 let data_1_path = db_path. join ( "tables" ) . join ( "dft" ) . join ( "stuff" ) . join ( "hi" ) ;
238219 let data_2_path = db_path. join ( "tables" ) . join ( "dft" ) . join ( "stuff" ) . join ( "bye" ) ;
239220
@@ -290,9 +271,9 @@ mod test {
290271 let ctx = setup ( ) ;
291272 let dir = tempfile:: tempdir ( ) . unwrap ( ) ;
292273 let db_path = dir. path ( ) . join ( "db" ) ;
293- let config = DbConfig {
294- path : db_path . clone ( ) ,
295- } ;
274+ let path = format ! ( "file://{}/" , db_path . to_str ( ) . unwrap ( ) ) ;
275+ let db_url = url :: Url :: parse ( & path ) . unwrap ( ) ;
276+ let config = DbConfig { path : db_url } ;
296277 let data_1_path = db_path. join ( "tables" ) . join ( "dft" ) . join ( "stuff" ) . join ( "hi" ) ;
297278 let data_2_path = db_path
298279 . join ( "tables" )
@@ -353,9 +334,9 @@ mod test {
353334 let ctx = setup ( ) ;
354335 let dir = tempfile:: tempdir ( ) . unwrap ( ) ;
355336 let db_path = dir. path ( ) . join ( "db" ) ;
356- let config = DbConfig {
357- path : db_path . clone ( ) ,
358- } ;
337+ let path = format ! ( "file://{}/" , db_path . to_str ( ) . unwrap ( ) ) ;
338+ let db_url = url :: Url :: parse ( & path ) . unwrap ( ) ;
339+ let config = DbConfig { path : db_url } ;
359340 let data_1_path = db_path. join ( "tables" ) . join ( "dft2" ) . join ( "stuff" ) . join ( "hi" ) ;
360341 let data_2_path = db_path
361342 . join ( "tables" )
0 commit comments