Skip to content

Commit

Permalink
Fix deadlock when connection fails
Browse files Browse the repository at this point in the history
Wraps sqlite3.connect() in try/except
and stores the exception to raise it in check_raise_error()
  • Loading branch information
Arnau Orriols committed Jun 21, 2019
1 parent 39e8dce commit 2e5b23a
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 43 deletions.
95 changes: 57 additions & 38 deletions sqlitedict.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,12 @@ def __init__(self, filename=None, tablename='unnamed', flag='c',

logger.info("opening Sqlite table %r in %s" % (tablename, filename))
MAKE_TABLE = 'CREATE TABLE IF NOT EXISTS "%s" (key TEXT PRIMARY KEY, value BLOB)' % self.tablename
self.conn = self._new_conn()
self.conn.execute(MAKE_TABLE)
self.conn.commit()
try:
self.conn = self._new_conn()
self.conn.execute(MAKE_TABLE)
self.conn.commit()
except sqlite3.OperationalError as e:
raise RuntimeError(str(e))
if flag == 'w':
self.clear()

Expand Down Expand Up @@ -381,20 +384,14 @@ def __init__(self, filename, autocommit, journal_mode):
self.setDaemon(True) # python2.5-compatible
self.exception = None
self.log = logging.getLogger('sqlitedict.SqliteMultithread')
self.connect()
self.start()

def run(self):
if self.autocommit:
conn = sqlite3.connect(self.filename, isolation_level=None, check_same_thread=False)
else:
conn = sqlite3.connect(self.filename, check_same_thread=False)
conn.execute('PRAGMA journal_mode = %s' % self.journal_mode)
conn.text_factory = str
cursor = conn.cursor()
conn.commit()
cursor.execute('PRAGMA synchronous=OFF')

res = None
conn = None
cursor = None
while True:
req, arg, res, outer_stack = self.reqs.get()
if req == '--close--':
Expand All @@ -404,35 +401,25 @@ def run(self):
conn.commit()
if res:
res.put('--no more--')
elif req == '--connect--':
try:
if self.autocommit:
conn = sqlite3.connect(self.filename, isolation_level=None, check_same_thread=False)
else:
conn = sqlite3.connect(self.filename, check_same_thread=False)
except Exception as err:
self.store_error(outer_stack, err)
else:
conn.execute('PRAGMA journal_mode = %s' % self.journal_mode)
conn.text_factory = str
cursor = conn.cursor()
conn.commit()
cursor.execute('PRAGMA synchronous=OFF')
else:
try:
cursor.execute(req, arg)
except Exception as err:
self.exception = (e_type, e_value, e_tb) = sys.exc_info()
inner_stack = traceback.extract_stack()

# An exception occurred in our thread, but we may not
# immediately able to throw it in our calling thread, if it has
# no return `res` queue: log as level ERROR both the inner and
# outer exception immediately.
#
# Any iteration of res.get() or any next call will detect the
# inner exception and re-raise it in the calling Thread; though
# it may be confusing to see an exception for an unrelated
# statement, an ERROR log statement from the 'sqlitedict.*'
# namespace contains the original outer stack location.
self.log.error('Inner exception:')
for item in traceback.format_list(inner_stack):
self.log.error(item)
self.log.error('') # deliniate traceback & exception w/blank line
for item in traceback.format_exception_only(e_type, e_value):
self.log.error(item)

self.log.error('') # exception & outer stack w/blank line
self.log.error('Outer stack:')
for item in traceback.format_list(outer_stack):
self.log.error(item)
self.log.error('Exception will be re-raised at next call.')
self.store_error(outer_stack, err)

if res:
for rec in cursor:
Expand All @@ -443,9 +430,38 @@ def run(self):
conn.commit()

self.log.debug('received: %s, send: --no more--', req)
conn.close()
if conn is not None:
conn.close()
res.put('--no more--')

def store_error(self, outer_stack, err):
""" Store error to be raised in any next call """
self.exception = (e_type, e_value, e_tb) = sys.exc_info()
inner_stack = traceback.extract_stack()

# An exception occurred in our thread, but we may not
# immediately able to throw it in our calling thread, if it has
# no return `res` queue: log as level ERROR both the inner and
# outer exception immediately.
#
# Any iteration of res.get() or any next call will detect the
# inner exception and re-raise it in the calling Thread; though
# it may be confusing to see an exception for an unrelated
# statement, an ERROR log statement from the 'sqlitedict.*'
# namespace contains the original outer stack location.
self.log.error('Inner exception:')
for item in traceback.format_list(inner_stack):
self.log.error(item)
self.log.error('') # deliniate traceback & exception w/blank line
for item in traceback.format_exception_only(e_type, e_value):
self.log.error(item)

self.log.error('') # exception & outer stack w/blank line
self.log.error('Outer stack:')
for item in traceback.format_list(outer_stack):
self.log.error(item)
self.log.error('Exception will be re-raised at next call.')

def check_raise_error(self):
"""
Check for and raise exception for any previous sqlite query.
Expand Down Expand Up @@ -527,6 +543,9 @@ def commit(self, blocking=True):
# otherwise, we fire and forget as usual.
self.execute('--commit--')

def connect(self):
self.execute('--connect--')

def close(self, force=False):
if force:
# If a SqliteDict is being killed or garbage-collected, then select_one()
Expand Down
20 changes: 15 additions & 5 deletions tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,16 @@ def test_directory_notfound(self):
with self.assertRaises(RuntimeError):
SqliteDict(filename=os.path.join(folder, 'nonexistent'))

def test_failed_connection(self):
""" Verify error when connecting does not deadlock """
# given: a non-existent directory,
folder = tempfile.mkdtemp(prefix='sqlitedict-test')
# os.rmdir(folder)
# exercise,
with self.assertRaises(RuntimeError):
SqliteDict(filename=folder)
os.rmdir(folder)

def test_commit_nonblocking(self):
"""Coverage for non-blocking commit."""
# given,
Expand Down Expand Up @@ -133,8 +143,8 @@ def attempt_clear():
def attempt_terminate():
readonly_db.terminate()

attempt_funcs = [attempt_write,
attempt_update,
attempt_funcs = [attempt_write,
attempt_update,
attempt_delete,
attempt_clear,
attempt_terminate]
Expand All @@ -159,7 +169,7 @@ def test_irregular_tablenames(self):

with self.assertRaisesRegexp(ValueError, r'^Invalid tablename '):
SqliteDict(':memory:', '"')

def test_overwrite_using_flag_w(self):
"""Re-opening of a database with flag='w' destroys only the target table."""
# given,
Expand Down Expand Up @@ -277,6 +287,6 @@ def test_tablenames(self):
self.assertEqual(SqliteDict.get_tablenames(fname), ['table1'])
with SqliteDict(fname,tablename='table2') as db2:
self.assertEqual(SqliteDict.get_tablenames(fname), ['table1','table2'])

tablenames = SqliteDict.get_tablenames('tests/db/tablenames-test-2.sqlite')
self.assertEqual(tablenames, ['table1','table2'])
self.assertEqual(tablenames, ['table1','table2'])

0 comments on commit 2e5b23a

Please sign in to comment.