Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix deadlock when connection fails #97

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 57 additions & 38 deletions sqlitedict.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,12 @@ def __init__(self, filename=None, tablename='unnamed', flag='c',

logger.info("opening Sqlite table %r in %s" % (tablename, filename))
MAKE_TABLE = 'CREATE TABLE IF NOT EXISTS "%s" (key TEXT PRIMARY KEY, value BLOB)' % self.tablename
self.conn = self._new_conn()
self.conn.execute(MAKE_TABLE)
self.conn.commit()
try:
self.conn = self._new_conn()
self.conn.execute(MAKE_TABLE)
self.conn.commit()
except sqlite3.OperationalError as e:
raise RuntimeError(str(e))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is it better to raise RuntimeError than sqlite3.OperationalError here?

In my opinion, it's not a good idea because it hides useful information from the user.

if flag == 'w':
self.clear()

Expand Down Expand Up @@ -381,20 +384,14 @@ def __init__(self, filename, autocommit, journal_mode):
self.setDaemon(True) # python2.5-compatible
self.exception = None
self.log = logging.getLogger('sqlitedict.SqliteMultithread')
self.connect()
self.start()

def run(self):
if self.autocommit:
conn = sqlite3.connect(self.filename, isolation_level=None, check_same_thread=False)
else:
conn = sqlite3.connect(self.filename, check_same_thread=False)
conn.execute('PRAGMA journal_mode = %s' % self.journal_mode)
conn.text_factory = str
cursor = conn.cursor()
conn.commit()
cursor.execute('PRAGMA synchronous=OFF')

res = None
conn = None
cursor = None
while True:
req, arg, res, outer_stack = self.reqs.get()
if req == '--close--':
Expand All @@ -404,35 +401,25 @@ def run(self):
conn.commit()
if res:
res.put('--no more--')
elif req == '--connect--':
try:
if self.autocommit:
conn = sqlite3.connect(self.filename, isolation_level=None, check_same_thread=False)
else:
conn = sqlite3.connect(self.filename, check_same_thread=False)
except Exception as err:
self.store_error(outer_stack, err)
else:
conn.execute('PRAGMA journal_mode = %s' % self.journal_mode)
conn.text_factory = str
cursor = conn.cursor()
conn.commit()
cursor.execute('PRAGMA synchronous=OFF')
else:
try:
cursor.execute(req, arg)
except Exception as err:
self.exception = (e_type, e_value, e_tb) = sys.exc_info()
inner_stack = traceback.extract_stack()

# An exception occurred in our thread, but we may not
# immediately able to throw it in our calling thread, if it has
# no return `res` queue: log as level ERROR both the inner and
# outer exception immediately.
#
# Any iteration of res.get() or any next call will detect the
# inner exception and re-raise it in the calling Thread; though
# it may be confusing to see an exception for an unrelated
# statement, an ERROR log statement from the 'sqlitedict.*'
# namespace contains the original outer stack location.
self.log.error('Inner exception:')
for item in traceback.format_list(inner_stack):
self.log.error(item)
self.log.error('') # deliniate traceback & exception w/blank line
for item in traceback.format_exception_only(e_type, e_value):
self.log.error(item)

self.log.error('') # exception & outer stack w/blank line
self.log.error('Outer stack:')
for item in traceback.format_list(outer_stack):
self.log.error(item)
self.log.error('Exception will be re-raised at next call.')
self.store_error(outer_stack, err)

if res:
for rec in cursor:
Expand All @@ -443,9 +430,38 @@ def run(self):
conn.commit()

self.log.debug('received: %s, send: --no more--', req)
conn.close()
if conn is not None:
conn.close()
res.put('--no more--')

def store_error(self, outer_stack, err):
""" Store error to be raised in any next call """
self.exception = (e_type, e_value, e_tb) = sys.exc_info()
inner_stack = traceback.extract_stack()

# An exception occurred in our thread, but we may not
# immediately able to throw it in our calling thread, if it has
# no return `res` queue: log as level ERROR both the inner and
# outer exception immediately.
#
# Any iteration of res.get() or any next call will detect the
# inner exception and re-raise it in the calling Thread; though
# it may be confusing to see an exception for an unrelated
# statement, an ERROR log statement from the 'sqlitedict.*'
# namespace contains the original outer stack location.
self.log.error('Inner exception:')
for item in traceback.format_list(inner_stack):
self.log.error(item)
self.log.error('') # deliniate traceback & exception w/blank line
for item in traceback.format_exception_only(e_type, e_value):
self.log.error(item)

self.log.error('') # exception & outer stack w/blank line
self.log.error('Outer stack:')
for item in traceback.format_list(outer_stack):
self.log.error(item)
self.log.error('Exception will be re-raised at next call.')

def check_raise_error(self):
"""
Check for and raise exception for any previous sqlite query.
Expand Down Expand Up @@ -527,6 +543,9 @@ def commit(self, blocking=True):
# otherwise, we fire and forget as usual.
self.execute('--commit--')

def connect(self):
self.execute('--connect--')

def close(self, force=False):
if force:
# If a SqliteDict is being killed or garbage-collected, then select_one()
Expand Down
20 changes: 15 additions & 5 deletions tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,16 @@ def test_directory_notfound(self):
with self.assertRaises(RuntimeError):
SqliteDict(filename=os.path.join(folder, 'nonexistent'))

def test_failed_connection(self):
""" Verify error when connecting does not deadlock """
# given: a non-existent directory,
folder = tempfile.mkdtemp(prefix='sqlitedict-test')
# os.rmdir(folder)
# exercise,
with self.assertRaises(RuntimeError):
SqliteDict(filename=folder)
os.rmdir(folder)

def test_commit_nonblocking(self):
"""Coverage for non-blocking commit."""
# given,
Expand Down Expand Up @@ -133,8 +143,8 @@ def attempt_clear():
def attempt_terminate():
readonly_db.terminate()

attempt_funcs = [attempt_write,
attempt_update,
attempt_funcs = [attempt_write,
attempt_update,
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hanging indent please (not vertical).

attempt_delete,
attempt_clear,
attempt_terminate]
Expand All @@ -159,7 +169,7 @@ def test_irregular_tablenames(self):

with self.assertRaisesRegexp(ValueError, r'^Invalid tablename '):
SqliteDict(':memory:', '"')

def test_overwrite_using_flag_w(self):
"""Re-opening of a database with flag='w' destroys only the target table."""
# given,
Expand Down Expand Up @@ -277,6 +287,6 @@ def test_tablenames(self):
self.assertEqual(SqliteDict.get_tablenames(fname), ['table1'])
with SqliteDict(fname,tablename='table2') as db2:
self.assertEqual(SqliteDict.get_tablenames(fname), ['table1','table2'])
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mpenkov can we set up an automated git hook to check for these things?

Code with trailing whitespace should never have been merged.


tablenames = SqliteDict.get_tablenames('tests/db/tablenames-test-2.sqlite')
self.assertEqual(tablenames, ['table1','table2'])
self.assertEqual(tablenames, ['table1','table2'])