11
11
)
12
12
from contextlib import (
13
13
ExitStack ,
14
+ closing ,
14
15
contextmanager ,
15
16
)
16
17
from datetime import (
@@ -1651,10 +1652,18 @@ def run_transaction(self):
1651
1652
1652
1653
def execute (self , sql : str | Select | TextClause , params = None ):
1653
1654
"""Simple passthrough to SQLAlchemy connectable"""
1655
+ from sqlalchemy .exc import DBAPIError as SQLAlchemyDatabaseError
1656
+
1654
1657
args = [] if params is None else [params ]
1655
1658
if isinstance (sql , str ):
1656
- return self .con .exec_driver_sql (sql , * args )
1657
- return self .con .execute (sql , * args )
1659
+ execute_function = self .con .exec_driver_sql
1660
+ else :
1661
+ execute_function = self .con .execute
1662
+
1663
+ try :
1664
+ return execute_function (sql , * args )
1665
+ except SQLAlchemyDatabaseError as exc :
1666
+ raise DatabaseError (f"Execution failed on sql '{ sql } ': { exc } " ) from exc
1658
1667
1659
1668
def read_table (
1660
1669
self ,
@@ -2108,17 +2117,19 @@ def run_transaction(self):
2108
2117
self .con .commit ()
2109
2118
2110
2119
def execute (self , sql : str | Select | TextClause , params = None ):
2120
+ from adbc_driver_manager import DatabaseError as ADBCDatabaseError
2121
+
2111
2122
if not isinstance (sql , str ):
2112
2123
raise TypeError ("Query must be a string unless using sqlalchemy." )
2113
2124
args = [] if params is None else [params ]
2114
2125
cur = self .con .cursor ()
2115
2126
try :
2116
2127
cur .execute (sql , * args )
2117
2128
return cur
2118
- except Exception as exc :
2129
+ except ADBCDatabaseError as exc :
2119
2130
try :
2120
2131
self .con .rollback ()
2121
- except Exception as inner_exc : # pragma: no cover
2132
+ except ADBCDatabaseError as inner_exc : # pragma: no cover
2122
2133
ex = DatabaseError (
2123
2134
f"Execution failed on sql: { sql } \n { exc } \n unable to rollback"
2124
2135
)
@@ -2207,8 +2218,7 @@ def read_table(
2207
2218
else :
2208
2219
stmt = f"SELECT { select_list } FROM { table_name } "
2209
2220
2210
- with self .con .cursor () as cur :
2211
- cur .execute (stmt )
2221
+ with closing (self .execute (stmt )) as cur :
2212
2222
pa_table = cur .fetch_arrow_table ()
2213
2223
df = arrow_table_to_pandas (pa_table , dtype_backend = dtype_backend )
2214
2224
@@ -2278,8 +2288,7 @@ def read_query(
2278
2288
if chunksize :
2279
2289
raise NotImplementedError ("'chunksize' is not implemented for ADBC drivers" )
2280
2290
2281
- with self .con .cursor () as cur :
2282
- cur .execute (sql )
2291
+ with closing (self .execute (sql )) as cur :
2283
2292
pa_table = cur .fetch_arrow_table ()
2284
2293
df = arrow_table_to_pandas (pa_table , dtype_backend = dtype_backend )
2285
2294
@@ -2335,6 +2344,9 @@ def to_sql(
2335
2344
engine : {'auto', 'sqlalchemy'}, default 'auto'
2336
2345
Raises NotImplementedError if not set to 'auto'
2337
2346
"""
2347
+ from adbc_driver_manager import DatabaseError as ADBCDatabaseError
2348
+ import pyarrow as pa
2349
+
2338
2350
if index_label :
2339
2351
raise NotImplementedError (
2340
2352
"'index_label' is not implemented for ADBC drivers"
@@ -2364,22 +2376,25 @@ def to_sql(
2364
2376
if if_exists == "fail" :
2365
2377
raise ValueError (f"Table '{ table_name } ' already exists." )
2366
2378
elif if_exists == "replace" :
2367
- with self . con . cursor () as cur :
2368
- cur .execute (f"DROP TABLE { table_name } " )
2379
+ sql_statement = f"DROP TABLE { table_name } "
2380
+ self .execute (sql_statement ). close ( )
2369
2381
elif if_exists == "append" :
2370
2382
mode = "append"
2371
2383
2372
- import pyarrow as pa
2373
-
2374
2384
try :
2375
2385
tbl = pa .Table .from_pandas (frame , preserve_index = index )
2376
2386
except pa .ArrowNotImplementedError as exc :
2377
2387
raise ValueError ("datatypes not supported" ) from exc
2378
2388
2379
2389
with self .con .cursor () as cur :
2380
- total_inserted = cur .adbc_ingest (
2381
- table_name = name , data = tbl , mode = mode , db_schema_name = schema
2382
- )
2390
+ try :
2391
+ total_inserted = cur .adbc_ingest (
2392
+ table_name = name , data = tbl , mode = mode , db_schema_name = schema
2393
+ )
2394
+ except ADBCDatabaseError as exc :
2395
+ raise DatabaseError (
2396
+ f"Failed to insert records on table={ name } with { mode = } "
2397
+ ) from exc
2383
2398
2384
2399
self .con .commit ()
2385
2400
return total_inserted
@@ -2496,9 +2511,9 @@ def sql_schema(self) -> str:
2496
2511
return str (";\n " .join (self .table ))
2497
2512
2498
2513
def _execute_create (self ) -> None :
2499
- with self .pd_sql .run_transaction () as conn :
2514
+ with self .pd_sql .run_transaction ():
2500
2515
for stmt in self .table :
2501
- conn . execute (stmt )
2516
+ self . pd_sql . execute (stmt ). close ( )
2502
2517
2503
2518
def insert_statement (self , * , num_rows : int ) -> str :
2504
2519
names = list (map (str , self .frame .columns ))
@@ -2520,8 +2535,13 @@ def insert_statement(self, *, num_rows: int) -> str:
2520
2535
return insert_statement
2521
2536
2522
2537
def _execute_insert (self , conn , keys , data_iter ) -> int :
2538
+ from sqlite3 import DatabaseError as SQLiteDatabaseError
2539
+
2523
2540
data_list = list (data_iter )
2524
- conn .executemany (self .insert_statement (num_rows = 1 ), data_list )
2541
+ try :
2542
+ conn .executemany (self .insert_statement (num_rows = 1 ), data_list )
2543
+ except SQLiteDatabaseError as exc :
2544
+ raise DatabaseError ("Execution failed" ) from exc
2525
2545
return conn .rowcount
2526
2546
2527
2547
def _execute_insert_multi (self , conn , keys , data_iter ) -> int :
@@ -2643,17 +2663,19 @@ def run_transaction(self):
2643
2663
cur .close ()
2644
2664
2645
2665
def execute (self , sql : str | Select | TextClause , params = None ):
2666
+ from sqlite3 import DatabaseError as SQLiteDatabaseError
2667
+
2646
2668
if not isinstance (sql , str ):
2647
2669
raise TypeError ("Query must be a string unless using sqlalchemy." )
2648
2670
args = [] if params is None else [params ]
2649
2671
cur = self .con .cursor ()
2650
2672
try :
2651
2673
cur .execute (sql , * args )
2652
2674
return cur
2653
- except Exception as exc :
2675
+ except SQLiteDatabaseError as exc :
2654
2676
try :
2655
2677
self .con .rollback ()
2656
- except Exception as inner_exc : # pragma: no cover
2678
+ except SQLiteDatabaseError as inner_exc : # pragma: no cover
2657
2679
ex = DatabaseError (
2658
2680
f"Execution failed on sql: { sql } \n { exc } \n unable to rollback"
2659
2681
)
0 commit comments