From 2e5b23a3fbafd077f54c0d669afdf2322dca20a8 Mon Sep 17 00:00:00 2001 From: Arnau Orriols Date: Fri, 21 Jun 2019 02:14:18 +0200 Subject: [PATCH] Fix deadlock when connection fails Wraps sqlite3.connect() in try/except and stores the exception to raise it in check_raise_error() --- sqlitedict.py | 95 +++++++++++++++++++++++++++------------------- tests/test_core.py | 20 +++++++--- 2 files changed, 72 insertions(+), 43 deletions(-) diff --git a/sqlitedict.py b/sqlitedict.py index 7b60235..5cd34f2 100755 --- a/sqlitedict.py +++ b/sqlitedict.py @@ -170,9 +170,12 @@ def __init__(self, filename=None, tablename='unnamed', flag='c', logger.info("opening Sqlite table %r in %s" % (tablename, filename)) MAKE_TABLE = 'CREATE TABLE IF NOT EXISTS "%s" (key TEXT PRIMARY KEY, value BLOB)' % self.tablename - self.conn = self._new_conn() - self.conn.execute(MAKE_TABLE) - self.conn.commit() + try: + self.conn = self._new_conn() + self.conn.execute(MAKE_TABLE) + self.conn.commit() + except sqlite3.OperationalError as e: + raise RuntimeError(str(e)) if flag == 'w': self.clear() @@ -381,20 +384,14 @@ def __init__(self, filename, autocommit, journal_mode): self.setDaemon(True) # python2.5-compatible self.exception = None self.log = logging.getLogger('sqlitedict.SqliteMultithread') + self.connect() self.start() def run(self): - if self.autocommit: - conn = sqlite3.connect(self.filename, isolation_level=None, check_same_thread=False) - else: - conn = sqlite3.connect(self.filename, check_same_thread=False) - conn.execute('PRAGMA journal_mode = %s' % self.journal_mode) - conn.text_factory = str - cursor = conn.cursor() - conn.commit() - cursor.execute('PRAGMA synchronous=OFF') res = None + conn = None + cursor = None while True: req, arg, res, outer_stack = self.reqs.get() if req == '--close--': @@ -404,35 +401,25 @@ def run(self): conn.commit() if res: res.put('--no more--') + elif req == '--connect--': + try: + if self.autocommit: + conn = sqlite3.connect(self.filename, isolation_level=None, check_same_thread=False) + else: + conn = sqlite3.connect(self.filename, check_same_thread=False) + except Exception as err: + self.store_error(outer_stack, err) + else: + conn.execute('PRAGMA journal_mode = %s' % self.journal_mode) + conn.text_factory = str + cursor = conn.cursor() + conn.commit() + cursor.execute('PRAGMA synchronous=OFF') else: try: cursor.execute(req, arg) except Exception as err: - self.exception = (e_type, e_value, e_tb) = sys.exc_info() - inner_stack = traceback.extract_stack() - - # An exception occurred in our thread, but we may not - # immediately able to throw it in our calling thread, if it has - # no return `res` queue: log as level ERROR both the inner and - # outer exception immediately. - # - # Any iteration of res.get() or any next call will detect the - # inner exception and re-raise it in the calling Thread; though - # it may be confusing to see an exception for an unrelated - # statement, an ERROR log statement from the 'sqlitedict.*' - # namespace contains the original outer stack location. - self.log.error('Inner exception:') - for item in traceback.format_list(inner_stack): - self.log.error(item) - self.log.error('') # deliniate traceback & exception w/blank line - for item in traceback.format_exception_only(e_type, e_value): - self.log.error(item) - - self.log.error('') # exception & outer stack w/blank line - self.log.error('Outer stack:') - for item in traceback.format_list(outer_stack): - self.log.error(item) - self.log.error('Exception will be re-raised at next call.') + self.store_error(outer_stack, err) if res: for rec in cursor: @@ -443,9 +430,38 @@ def run(self): conn.commit() self.log.debug('received: %s, send: --no more--', req) - conn.close() + if conn is not None: + conn.close() res.put('--no more--') + def store_error(self, outer_stack, err): + """ Store error to be raised in any next call """ + self.exception = (e_type, e_value, e_tb) = sys.exc_info() + inner_stack = traceback.extract_stack() + + # An exception occurred in our thread, but we may not + # immediately able to throw it in our calling thread, if it has + # no return `res` queue: log as level ERROR both the inner and + # outer exception immediately. + # + # Any iteration of res.get() or any next call will detect the + # inner exception and re-raise it in the calling Thread; though + # it may be confusing to see an exception for an unrelated + # statement, an ERROR log statement from the 'sqlitedict.*' + # namespace contains the original outer stack location. + self.log.error('Inner exception:') + for item in traceback.format_list(inner_stack): + self.log.error(item) + self.log.error('') # deliniate traceback & exception w/blank line + for item in traceback.format_exception_only(e_type, e_value): + self.log.error(item) + + self.log.error('') # exception & outer stack w/blank line + self.log.error('Outer stack:') + for item in traceback.format_list(outer_stack): + self.log.error(item) + self.log.error('Exception will be re-raised at next call.') + def check_raise_error(self): """ Check for and raise exception for any previous sqlite query. @@ -527,6 +543,9 @@ def commit(self, blocking=True): # otherwise, we fire and forget as usual. self.execute('--commit--') + def connect(self): + self.execute('--connect--') + def close(self, force=False): if force: # If a SqliteDict is being killed or garbage-collected, then select_one() diff --git a/tests/test_core.py b/tests/test_core.py index 1e43c4f..c680ed4 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -59,6 +59,16 @@ def test_directory_notfound(self): with self.assertRaises(RuntimeError): SqliteDict(filename=os.path.join(folder, 'nonexistent')) + def test_failed_connection(self): + """ Verify error when connecting does not deadlock """ + # given: a non-existent directory, + folder = tempfile.mkdtemp(prefix='sqlitedict-test') + # os.rmdir(folder) + # exercise, + with self.assertRaises(RuntimeError): + SqliteDict(filename=folder) + os.rmdir(folder) + def test_commit_nonblocking(self): """Coverage for non-blocking commit.""" # given, @@ -133,8 +143,8 @@ def attempt_clear(): def attempt_terminate(): readonly_db.terminate() - attempt_funcs = [attempt_write, - attempt_update, + attempt_funcs = [attempt_write, + attempt_update, attempt_delete, attempt_clear, attempt_terminate] @@ -159,7 +169,7 @@ def test_irregular_tablenames(self): with self.assertRaisesRegexp(ValueError, r'^Invalid tablename '): SqliteDict(':memory:', '"') - + def test_overwrite_using_flag_w(self): """Re-opening of a database with flag='w' destroys only the target table.""" # given, @@ -277,6 +287,6 @@ def test_tablenames(self): self.assertEqual(SqliteDict.get_tablenames(fname), ['table1']) with SqliteDict(fname,tablename='table2') as db2: self.assertEqual(SqliteDict.get_tablenames(fname), ['table1','table2']) - + tablenames = SqliteDict.get_tablenames('tests/db/tablenames-test-2.sqlite') - self.assertEqual(tablenames, ['table1','table2']) \ No newline at end of file + self.assertEqual(tablenames, ['table1','table2'])