@@ -1651,10 +1651,18 @@ def run_transaction(self):
1651
1651
1652
1652
def execute (self , sql : str | Select | TextClause , params = None ):
1653
1653
"""Simple passthrough to SQLAlchemy connectable"""
1654
+ from sqlalchemy .exc import DBAPIError as SQLAlchemyDatabaseError
1655
+
1654
1656
args = [] if params is None else [params ]
1655
1657
if isinstance (sql , str ):
1656
- return self .con .exec_driver_sql (sql , * args )
1657
- return self .con .execute (sql , * args )
1658
+ execute_function = self .con .exec_driver_sql
1659
+ else :
1660
+ execute_function = self .con .execute
1661
+
1662
+ try :
1663
+ return execute_function (sql , * args )
1664
+ except SQLAlchemyDatabaseError as exc :
1665
+ raise DatabaseError (f"Execution failed on sql '{ sql } ': { exc } " ) from exc
1658
1666
1659
1667
def read_table (
1660
1668
self ,
@@ -2108,17 +2116,19 @@ def run_transaction(self):
2108
2116
self .con .commit ()
2109
2117
2110
2118
def execute (self , sql : str | Select | TextClause , params = None ):
2119
+ from adbc_driver_manager import DatabaseError as ADBCDatabaseError
2120
+
2111
2121
if not isinstance (sql , str ):
2112
2122
raise TypeError ("Query must be a string unless using sqlalchemy." )
2113
2123
args = [] if params is None else [params ]
2114
2124
cur = self .con .cursor ()
2115
2125
try :
2116
2126
cur .execute (sql , * args )
2117
2127
return cur
2118
- except Exception as exc :
2128
+ except ADBCDatabaseError as exc :
2119
2129
try :
2120
2130
self .con .rollback ()
2121
- except Exception as inner_exc : # pragma: no cover
2131
+ except ADBCDatabaseError as inner_exc : # pragma: no cover
2122
2132
ex = DatabaseError (
2123
2133
f"Execution failed on sql: { sql } \n { exc } \n unable to rollback"
2124
2134
)
@@ -2207,8 +2217,7 @@ def read_table(
2207
2217
else :
2208
2218
stmt = f"SELECT { select_list } FROM { table_name } "
2209
2219
2210
- with self .con .cursor () as cur :
2211
- cur .execute (stmt )
2220
+ with self .execute (stmt ) as cur :
2212
2221
pa_table = cur .fetch_arrow_table ()
2213
2222
df = arrow_table_to_pandas (pa_table , dtype_backend = dtype_backend )
2214
2223
@@ -2278,8 +2287,7 @@ def read_query(
2278
2287
if chunksize :
2279
2288
raise NotImplementedError ("'chunksize' is not implemented for ADBC drivers" )
2280
2289
2281
- with self .con .cursor () as cur :
2282
- cur .execute (sql )
2290
+ with self .execute (sql ) as cur :
2283
2291
pa_table = cur .fetch_arrow_table ()
2284
2292
df = arrow_table_to_pandas (pa_table , dtype_backend = dtype_backend )
2285
2293
@@ -2335,6 +2343,9 @@ def to_sql(
2335
2343
engine : {'auto', 'sqlalchemy'}, default 'auto'
2336
2344
Raises NotImplementedError if not set to 'auto'
2337
2345
"""
2346
+ from adbc_driver_manager import DatabaseError as ADBCDatabaseError
2347
+ import pyarrow as pa
2348
+
2338
2349
if index_label :
2339
2350
raise NotImplementedError (
2340
2351
"'index_label' is not implemented for ADBC drivers"
@@ -2364,22 +2375,25 @@ def to_sql(
2364
2375
if if_exists == "fail" :
2365
2376
raise ValueError (f"Table '{ table_name } ' already exists." )
2366
2377
elif if_exists == "replace" :
2367
- with self . con . cursor () as cur :
2368
- cur .execute (f"DROP TABLE { table_name } " )
2378
+ sql_statement = f"DROP TABLE { table_name } "
2379
+ self .execute (sql_statement ). close ( )
2369
2380
elif if_exists == "append" :
2370
2381
mode = "append"
2371
2382
2372
- import pyarrow as pa
2373
-
2374
2383
try :
2375
2384
tbl = pa .Table .from_pandas (frame , preserve_index = index )
2376
2385
except pa .ArrowNotImplementedError as exc :
2377
2386
raise ValueError ("datatypes not supported" ) from exc
2378
2387
2379
2388
with self .con .cursor () as cur :
2380
- total_inserted = cur .adbc_ingest (
2381
- table_name = name , data = tbl , mode = mode , db_schema_name = schema
2382
- )
2389
+ try :
2390
+ total_inserted = cur .adbc_ingest (
2391
+ table_name = name , data = tbl , mode = mode , db_schema_name = schema
2392
+ )
2393
+ except ADBCDatabaseError as exc :
2394
+ raise DatabaseError (
2395
+ f"Failed to insert records on table={ name } with { mode = } "
2396
+ ) from exc
2383
2397
2384
2398
self .con .commit ()
2385
2399
return total_inserted
@@ -2496,9 +2510,9 @@ def sql_schema(self) -> str:
2496
2510
return str (";\n " .join (self .table ))
2497
2511
2498
2512
def _execute_create (self ) -> None :
2499
- with self .pd_sql .run_transaction () as conn :
2513
+ with self .pd_sql .run_transaction ():
2500
2514
for stmt in self .table :
2501
- conn . execute (stmt )
2515
+ self . pd_sql . execute (stmt ). close ( )
2502
2516
2503
2517
def insert_statement (self , * , num_rows : int ) -> str :
2504
2518
names = list (map (str , self .frame .columns ))
@@ -2520,8 +2534,13 @@ def insert_statement(self, *, num_rows: int) -> str:
2520
2534
return insert_statement
2521
2535
2522
2536
def _execute_insert (self , conn , keys , data_iter ) -> int :
2537
+ from sqlite3 import DatabaseError as SQLiteDatabaseError
2538
+
2523
2539
data_list = list (data_iter )
2524
- conn .executemany (self .insert_statement (num_rows = 1 ), data_list )
2540
+ try :
2541
+ conn .executemany (self .insert_statement (num_rows = 1 ), data_list )
2542
+ except SQLiteDatabaseError as exc :
2543
+ raise DatabaseError ("Execution failed" ) from exc
2525
2544
return conn .rowcount
2526
2545
2527
2546
def _execute_insert_multi (self , conn , keys , data_iter ) -> int :
@@ -2643,17 +2662,19 @@ def run_transaction(self):
2643
2662
cur .close ()
2644
2663
2645
2664
def execute (self , sql : str | Select | TextClause , params = None ):
2665
+ from sqlite3 import DatabaseError as SQLiteDatabaseError
2666
+
2646
2667
if not isinstance (sql , str ):
2647
2668
raise TypeError ("Query must be a string unless using sqlalchemy." )
2648
2669
args = [] if params is None else [params ]
2649
2670
cur = self .con .cursor ()
2650
2671
try :
2651
2672
cur .execute (sql , * args )
2652
2673
return cur
2653
- except Exception as exc :
2674
+ except SQLiteDatabaseError as exc :
2654
2675
try :
2655
2676
self .con .rollback ()
2656
- except Exception as inner_exc : # pragma: no cover
2677
+ except SQLiteDatabaseError as inner_exc : # pragma: no cover
2657
2678
ex = DatabaseError (
2658
2679
f"Execution failed on sql: { sql } \n { exc } \n unable to rollback"
2659
2680
)
0 commit comments