@@ -18,6 +18,49 @@ const _RESERVED_INPUT_COLUMNS = Set(["offset", "size"])
1818const _REQUIRED_METADATA_COLUMNS = Set ([" name" , " offset" , " size" ])
1919
2020
# Shared DuckDB connection used by every parquet helper here and by
# Reader.read.
#
# Both refs hold `nothing` until `_init_duckdb!` has run and again after
# `_close_duckdb!`. `_DUCKDB_LOCK` serializes use of the shared connection.

const _DUCKDB_DB = Ref{Union{Nothing,DuckDB.DB}}(nothing)
const _DUCKDB_CON = Ref{Union{Nothing,DuckDB.Connection}}(nothing)
const _DUCKDB_LOCK = ReentrantLock()


"""
    _init_duckdb!() -> Nothing

Open the shared DuckDB database and connection and store them in
`_DUCKDB_DB` / `_DUCKDB_CON`. Does nothing when a connection is already
stored.

The two handles are published together, and only after both were created
successfully: if connecting fails, the freshly opened database is closed
again and the original error is rethrown, so no half-initialized state is
left behind.
"""
function _init_duckdb!()
    lock(_DUCKDB_LOCK) do
        _DUCKDB_CON[] === nothing || return nothing
        db = DuckDB.DB()
        con = try
            DBInterface.connect(db)
        catch
            # Do not leak the database handle when the connection cannot be made.
            try
                DBInterface.close!(db)
            catch close_err
                @debug "cozip: failed to close DuckDB database after connect error" exception =
                    (close_err, catch_backtrace())
            end
            rethrow()
        end
        _DUCKDB_DB[] = db
        _DUCKDB_CON[] = con
        return nothing
    end
    return nothing
end
35+
36+
"""
    _close_duckdb!() -> Nothing

Close the shared DuckDB connection and then the database, if they are set,
and reset `_DUCKDB_CON` / `_DUCKDB_DB` to `nothing`.

Teardown is best-effort: a failure while closing one handle is logged as a
warning instead of being thrown, and the corresponding ref is cleared
regardless, so the second handle is still attempted.
"""
function _close_duckdb!()
    # Connection first, then the database it was opened from.
    for (label, ref) in (("connection", _DUCKDB_CON), ("database", _DUCKDB_DB))
        handle = ref[]
        handle === nothing && continue
        try
            DBInterface.close!(handle)
        catch err
            # Report rather than silently swallow; never abort the teardown.
            @warn "cozip: failed to close DuckDB $label" exception =
                (err, catch_backtrace())
        finally
            ref[] = nothing
        end
    end
    return nothing
end
54+
55+
"""
    _duckdb_con()

Return the connection currently stored in `_DUCKDB_CON`.

Raises an error when no connection has been stored yet.
"""
function _duckdb_con()
    handle = _DUCKDB_CON[]
    if handle === nothing
        error("cozip: DuckDB connection not initialized; Cozip.__init__ must run first")
    end
    return handle
end
62+
63+
2164"""
2265 stage_metadata(table) -> (metadata, paths)
2366
@@ -305,31 +348,25 @@ function _alloc_user_entries(
305348end
306349
307350
308- # Cached: short-lived connections trigger Windows heap corruption (duckdb#10787).
309- const _DUCKDB_DB = Ref {Any} (nothing )
310- const _DUCKDB_CON = Ref {Any} (nothing )
311-
312- function _duckdb_con ()
313- if _DUCKDB_CON[] === nothing
314- _DUCKDB_DB[] = DuckDB. DB ()
315- _DUCKDB_CON[] = DBInterface. connect (_DUCKDB_DB[])
"""
    _read_parquet_columns(path::String) -> Vector{String}

Return the column names of the parquet file at `path`.

Runs a `LIMIT 0` query over `read_parquet` on the shared DuckDB connection
while holding `_DUCKDB_LOCK`, and reads the names off the resulting
`DataFrame`.
"""
function _read_parquet_columns(path::String)::Vector{String}
    con = _duckdb_con()
    query = "SELECT * FROM read_parquet('$(_sql_escape(path))') LIMIT 0"
    lock(_DUCKDB_LOCK)
    try
        return names(DataFrame(DBInterface.execute(con, query)))
    finally
        unlock(_DUCKDB_LOCK)
    end
end
319359
320360
321- function _read_parquet_columns (path:: String ):: Vector{String}
322- result = DBInterface. execute (_duckdb_con (),
323- " SELECT * FROM read_parquet('$(_sql_escape (path)) ') LIMIT 0" )
324- return names (DataFrame (result))
325- end
326-
"""
    _read_parquet_subset(path::String, columns::Vector{String}) -> DataFrame

Read only `columns` from the parquet file at `path` and return them as a
`DataFrame`.

The query runs on the shared DuckDB connection while holding
`_DUCKDB_LOCK`. Every column name is emitted as a double-quoted SQL
identifier, with any embedded `"` doubled so that it cannot terminate the
identifier early.
"""
function _read_parquet_subset(path::String, columns::Vector{String})::DataFrame
    con = _duckdb_con()
    # Identifiers are always quoted: `offset` and `size` are reserved in DuckDB SQL.
    cols_sql = join(
        (string("\"", replace(c, "\"" => "\"\""), "\"") for c in columns), ", ")
    query = "SELECT $cols_sql FROM read_parquet('$(_sql_escape(path))')"
    return lock(_DUCKDB_LOCK) do
        DataFrame(DBInterface.execute(con, query))
    end
end
334371
335372
@@ -390,22 +427,27 @@ function _write_metadata_parquet(df::DataFrame, out_path::AbstractString)
390427 cols = names (df)
391428 col_types = [string (" \" " , c, " \" " , _julia_to_duckdb_type (eltype (df[! , c])))
392429 for c in cols]
393- tbl = " tmp_cozip_ $( getpid ()) _ $( time_ns ()) "
430+
394431 con = _duckdb_con ()
395432
396- try
397- DBInterface. execute (con,
398- " CREATE TABLE $tbl ($(join (col_types, " , " )) )" )
399- for i in 1 : nrow (df)
400- vals = join ((_sql_literal (df[i, c]) for c in cols), " , " )
401- DBInterface. execute (con, " INSERT INTO $tbl VALUES ($vals )" )
433+ lock (_DUCKDB_LOCK) do
434+ try
435+ DBInterface. execute (con,
436+ " CREATE OR REPLACE TEMP TABLE tmp_cozip ($(join (col_types, " , " )) )" )
437+
438+ for i in 1 : nrow (df)
439+ vals = join ((_sql_literal (df[i, c]) for c in cols), " , " )
440+ DBInterface. execute (con, " INSERT INTO tmp_cozip VALUES ($vals )" )
441+ end
442+
443+ kv_sql = _kv_metadata_sql (_table_metadata (df))
444+ DBInterface. execute (con,
445+ " COPY tmp_cozip TO '$(_sql_escape (String (out_path))) ' (FORMAT parquet$kv_sql )" )
446+ finally
447+ DBInterface. execute (con, " DROP TABLE IF EXISTS tmp_cozip" )
402448 end
403- kv_sql = _kv_metadata_sql (_table_metadata (df))
404- DBInterface. execute (con,
405- " COPY $tbl TO '$(_sql_escape (String (out_path))) ' (FORMAT parquet$kv_sql )" )
406- finally
407- DBInterface. execute (con, " DROP TABLE IF EXISTS $tbl " )
408449 end
450+
409451 return nothing
410452end
411453
0 commit comments