Skip to content

Commit e0b9692

Browse files
committed
rewrite and optimize SQLite code
1 parent b6c71cd commit e0b9692

5 files changed

Lines changed: 104 additions & 89 deletions

File tree

bakta/ips.py

Lines changed: 17 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
import logging
2-
import sqlite3
32

43
from concurrent.futures import ThreadPoolExecutor
54
from typing import Sequence, Tuple
65

76
import bakta.config as cfg
87
import bakta.constants as bc
8+
import bakta.utils as bu
99

1010

1111
############################################################################
@@ -28,20 +28,17 @@ def lookup(features: Sequence[dict]) -> Tuple[Sequence[dict], Sequence[dict]]:
2828
features_found = []
2929
features_not_found = []
3030
rec_futures = []
31-
with sqlite3.connect(f"file:{cfg.db_path.joinpath('bakta.db')}?mode=ro&nolock=1&cache=shared", uri=True, check_same_thread=False) as conn:
32-
conn.execute('PRAGMA omit_readlock;')
33-
conn.row_factory = sqlite3.Row
34-
with ThreadPoolExecutor(max_workers=max(10, cfg.threads)) as tpe: # use min 10 threads for IO bound non-CPU lookups
35-
for feature in features:
36-
if('truncated' not in feature): # skip truncated CDS
37-
uniref100_id = feature.get('ups', {}).get('uniref100_id', None)
38-
if(uniref100_id):
39-
future = tpe.submit(fetch_db_ips_result, conn, feature)
40-
rec_futures.append((feature, future))
41-
else:
42-
features_not_found.append(feature)
31+
with ThreadPoolExecutor(max_workers=max(10, cfg.threads)) as tpe: # use min 10 threads for IO bound non-CPU lookups
32+
for feature in features:
33+
if('truncated' not in feature): # skip truncated CDS
34+
uniref100_id = feature.get('ups', {}).get('uniref100_id', None)
35+
if(uniref100_id):
36+
future = tpe.submit(fetch_db_ips_result, feature)
37+
rec_futures.append((feature, future))
4338
else:
4439
features_not_found.append(feature)
40+
else:
41+
features_not_found.append(feature)
4542

4643
for (feature, future) in rec_futures:
4744
rec = future.result()
@@ -63,12 +60,13 @@ def lookup(features: Sequence[dict]) -> Tuple[Sequence[dict], Sequence[dict]]:
6360
raise Exception('SQL error!', ex)
6461

6562

66-
def fetch_db_ips_result(conn: sqlite3.Connection, feature: dict):
67-
c = conn.cursor()
68-
c.execute('select * from ips where uniref100_id=?', (feature['ups']['uniref100_id'][10:],))
69-
rec = c.fetchone()
70-
c.close()
71-
return rec
63+
def fetch_db_ips_result(feature: dict):
    """Fetch the ``ips`` table row matching a feature's UniRef100 ID.

    Args:
        feature: feature dict; ``feature['ups']['uniref100_id']`` is read.

    Returns:
        The first matching row, or ``None`` if no row matches.
    """
    # The first 10 characters are dropped before the lookup -- presumably the
    # 'UniRef100_' prefix; TODO confirm against the DB prefix constants.
    uniref100_id = feature['ups']['uniref100_id'][10:]
    # NOTE: sqlite3's connection context manager only ends the transaction on
    # exit; it does not close the connection, so the connection stays reusable.
    with bu.get_db_connection() as conn:
        c = conn.cursor()
        try:
            c.execute('select * from ips where uniref100_id=?', (uniref100_id,))
            return c.fetchone()
        finally:
            c.close()  # release the cursor even if the query raises
7270

7371

7472
def parse_annotation(rec) -> dict:

bakta/psc.py

Lines changed: 23 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
import logging
22
import subprocess as sp
3-
import sqlite3
43

54
from concurrent.futures import ThreadPoolExecutor
65
from typing import Sequence, Tuple
76

87
import bakta.config as cfg
98
import bakta.constants as bc
109
import bakta.features.orf as orf
10+
import bakta.utils as bu
1111

1212

1313
############################################################################
@@ -108,25 +108,21 @@ def lookup(features: Sequence[dict], pseudo: bool = False):
108108
no_psc_lookups = 0
109109
try:
110110
rec_futures = []
111-
with sqlite3.connect(f"file:{cfg.db_path.joinpath('bakta.db')}?mode=ro&nolock=1&cache=shared", uri=True, check_same_thread=False) as conn:
112-
conn.execute('PRAGMA omit_readlock;')
113-
conn.row_factory = sqlite3.Row
114-
with ThreadPoolExecutor(max_workers=max(10, cfg.threads)) as tpe: # use min 10 threads for IO bound non-CPU lookups
115-
for feature in features:
116-
uniref90_id = None
117-
if(pseudo): # if pseudogene use pseudogene info
118-
uniref90_id = feature[bc.PSEUDOGENE]['inference'][DB_PSC_COL_UNIREF90]
119-
else:
120-
if('psc' in feature):
121-
uniref90_id = feature['psc'].get(DB_PSC_COL_UNIREF90, None)
122-
elif('ips' in feature):
123-
uniref90_id = feature['ips'].get(DB_PSC_COL_UNIREF90, None)
124-
125-
if(uniref90_id is not None):
126-
if(bc.DB_PREFIX_UNIREF_90 in uniref90_id):
127-
uniref90_id = uniref90_id[9:] # remove 'UniRef90_' prefix
128-
future = tpe.submit(fetch_db_psc_result, conn, uniref90_id)
129-
rec_futures.append((feature, future))
111+
with ThreadPoolExecutor(max_workers=max(10, cfg.threads)) as tpe: # use min 10 threads for IO bound non-CPU lookups
112+
for feature in features:
113+
uniref90_id = None
114+
if(pseudo): # if pseudogene use pseudogene info
115+
uniref90_id = feature[bc.PSEUDOGENE]['inference'][DB_PSC_COL_UNIREF90]
116+
else:
117+
if('psc' in feature):
118+
uniref90_id = feature['psc'].get(DB_PSC_COL_UNIREF90, None)
119+
elif('ips' in feature):
120+
uniref90_id = feature['ips'].get(DB_PSC_COL_UNIREF90, None)
121+
if(uniref90_id is not None):
122+
if(bc.DB_PREFIX_UNIREF_90 in uniref90_id):
123+
uniref90_id = uniref90_id[9:] # remove 'UniRef90_' prefix
124+
future = tpe.submit(fetch_db_psc_result, uniref90_id)
125+
rec_futures.append((feature, future))
130126

131127
for (feature, future) in rec_futures:
132128
rec = future.result()
@@ -153,12 +149,13 @@ def lookup(features: Sequence[dict], pseudo: bool = False):
153149
log.info('looked-up=%i', no_psc_lookups)
154150

155151

156-
def fetch_db_psc_result(conn: sqlite3.Connection, uniref90_id: str):
157-
c = conn.cursor()
158-
c.execute('select * from psc where uniref90_id=?', (uniref90_id,))
159-
rec = c.fetchone()
160-
c.close()
161-
return rec
152+
def fetch_db_psc_result(uniref90_id: str):
    """Fetch the ``psc`` table row for a UniRef90 ID.

    Args:
        uniref90_id: value matched against the ``uniref90_id`` column.

    Returns:
        The first matching row, or ``None`` if no row matches.
    """
    # NOTE: sqlite3's connection context manager only ends the transaction on
    # exit; it does not close the connection, so the connection stays reusable.
    with bu.get_db_connection() as conn:
        c = conn.cursor()
        try:
            c.execute('select * from psc where uniref90_id=?', (uniref90_id,))
            return c.fetchone()
        finally:
            c.close()  # release the cursor even if the query raises
162159

163160

164161
def parse_annotation(rec) -> dict:

bakta/pscc.py

Lines changed: 24 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
import logging
22
import subprocess as sp
3-
import sqlite3
43

54
from concurrent.futures import ThreadPoolExecutor
65
from typing import Sequence, Tuple
76

87
import bakta.config as cfg
98
import bakta.constants as bc
109
import bakta.features.orf as orf
10+
import bakta.utils as bu
1111

1212

1313
############################################################################
@@ -96,25 +96,22 @@ def lookup(features: Sequence[dict], pseudo: bool = False):
9696
no_pscc_lookups = 0
9797
try:
9898
rec_futures = []
99-
with sqlite3.connect(f"file:{cfg.db_path.joinpath('bakta.db')}?mode=ro&nolock=1&cache=shared", uri=True, check_same_thread=False) as conn:
100-
conn.execute('PRAGMA omit_readlock;')
101-
conn.row_factory = sqlite3.Row
102-
with ThreadPoolExecutor(max_workers=max(10, cfg.threads)) as tpe: # use min 10 threads for IO bound non-CPU lookups
103-
for feature in features:
104-
uniref50_id = None
105-
if(pseudo): # if pseudogene use pseudogene info
106-
if('psc' in feature[bc.PSEUDOGENE]):
107-
uniref50_id = feature[bc.PSEUDOGENE]['psc'].get(DB_PSCC_COL_UNIREF50, None)
108-
else:
109-
if('psc' in feature):
110-
uniref50_id = feature['psc'].get(DB_PSCC_COL_UNIREF50, None)
111-
elif('pscc' in feature):
112-
uniref50_id = feature['pscc'].get(DB_PSCC_COL_UNIREF50, None)
113-
if(uniref50_id is not None):
114-
if(bc.DB_PREFIX_UNIREF_50 in uniref50_id):
115-
uniref50_id = uniref50_id[9:] # remove 'UniRef50_' prefix
116-
future = tpe.submit(fetch_db_pscc_result, conn, uniref50_id)
117-
rec_futures.append((feature, future))
99+
with ThreadPoolExecutor(max_workers=max(10, cfg.threads)) as tpe: # use min 10 threads for IO bound non-CPU lookups
100+
for feature in features:
101+
uniref50_id = None
102+
if(pseudo): # if pseudogene use pseudogene info
103+
if('psc' in feature[bc.PSEUDOGENE]):
104+
uniref50_id = feature[bc.PSEUDOGENE]['psc'].get(DB_PSCC_COL_UNIREF50, None)
105+
else:
106+
if('psc' in feature):
107+
uniref50_id = feature['psc'].get(DB_PSCC_COL_UNIREF50, None)
108+
elif('pscc' in feature):
109+
uniref50_id = feature['pscc'].get(DB_PSCC_COL_UNIREF50, None)
110+
if(uniref50_id is not None):
111+
if(bc.DB_PREFIX_UNIREF_50 in uniref50_id):
112+
uniref50_id = uniref50_id[9:] # remove 'UniRef50_' prefix
113+
future = tpe.submit(fetch_db_pscc_result, uniref50_id)
114+
rec_futures.append((feature, future))
118115

119116
for (feature, future) in rec_futures:
120117
rec = future.result()
@@ -140,12 +137,13 @@ def lookup(features: Sequence[dict], pseudo: bool = False):
140137
log.info('looked-up=%i', no_pscc_lookups)
141138

142139

143-
def fetch_db_pscc_result(conn: sqlite3.Connection, uniref50_id: str):
144-
c = conn.cursor()
145-
c.execute('select * from pscc where uniref50_id=?', (uniref50_id,))
146-
rec = c.fetchone()
147-
c.close()
148-
return rec
140+
def fetch_db_pscc_result(uniref50_id: str):
    """Fetch the ``pscc`` table row for a UniRef50 ID.

    Args:
        uniref50_id: value matched against the ``uniref50_id`` column.

    Returns:
        The first matching row, or ``None`` if no row matches.
    """
    # NOTE: sqlite3's connection context manager only ends the transaction on
    # exit; it does not close the connection, so the connection stays reusable.
    with bu.get_db_connection() as conn:
        c = conn.cursor()
        try:
            c.execute('select * from pscc where uniref50_id=?', (uniref50_id,))
            return c.fetchone()
        finally:
            c.close()  # release the cursor even if the query raises
149147

150148

151149
def parse_annotation(rec) -> dict:

bakta/ups.py

Lines changed: 15 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
import logging
2-
import sqlite3
32

43
from concurrent.futures import ThreadPoolExecutor
54
from typing import Sequence
65

76
import bakta.config as cfg
87
import bakta.constants as bc
8+
import bakta.utils as bu
99

1010

1111
############################################################################
@@ -27,16 +27,13 @@ def lookup(features: Sequence[dict]):
2727
features_found = []
2828
features_not_found = []
2929
rec_futures = []
30-
with sqlite3.connect(f"file:{cfg.db_path.joinpath('bakta.db')}?mode=ro&nolock=1&cache=shared", uri=True, check_same_thread=False) as conn:
31-
conn.execute('PRAGMA omit_readlock;')
32-
conn.row_factory = sqlite3.Row
33-
with ThreadPoolExecutor(max_workers=max(10, cfg.threads)) as tpe: # use min 10 threads for IO bound non-CPU lookups
34-
for feature in features:
35-
if('truncated' not in feature): # skip truncated CDS
36-
future = tpe.submit(fetch_db_ups_result, conn, feature)
37-
rec_futures.append((feature, future))
38-
else:
39-
features_not_found.append(feature)
30+
with ThreadPoolExecutor(max_workers=max(10, cfg.threads)) as tpe: # use min 10 threads for IO bound non-CPU lookups
31+
for feature in features:
32+
if('truncated' not in feature): # skip truncated CDS
33+
future = tpe.submit(fetch_db_ups_result, feature)
34+
rec_futures.append((feature, future))
35+
else:
36+
features_not_found.append(feature)
4037

4138
for (feature, future) in rec_futures:
4239
rec = future.result()
@@ -58,13 +55,13 @@ def lookup(features: Sequence[dict]):
5855
raise Exception("SQL error!", ex)
5956

6057

61-
def fetch_db_ups_result(conn: sqlite3.Connection, feature: dict):
62-
c = conn.cursor()
63-
print(f"Type: {type(feature['aa_digest'])}, Value: {feature['aa_digest']}")
64-
c.execute('select * from ups where hash=?', (feature['aa_digest'],))
65-
rec = c.fetchone()
66-
c.close()
67-
return rec
58+
def fetch_db_ups_result(feature: dict):
    """Fetch the ``ups`` table row whose ``hash`` equals the feature's digest.

    Args:
        feature: feature dict; ``feature['aa_digest']`` is used as the lookup key.

    Returns:
        The first matching row, or ``None`` if no row matches.
    """
    aa_digest = feature['aa_digest']
    # NOTE: sqlite3's connection context manager only ends the transaction on
    # exit; it does not close the connection, so the connection stays reusable.
    with bu.get_db_connection() as conn:
        c = conn.cursor()
        try:
            c.execute('select * from ups where hash=?', (aa_digest,))
            return c.fetchone()
        finally:
            c.close()  # release the cursor even if the query raises
6865

6966

7067
def parse_annotation(rec: dict) -> dict:

bakta/utils.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99
import shutil
1010
import sys
1111
import subprocess as sp
12+
import sqlite3
13+
import threading
1214

1315
from argparse import Namespace
1416
from datetime import datetime
@@ -50,6 +52,9 @@ def print_version(self):
5052
DEPENDENCY_PYCIRCLIZE = (Version(1, 7, 0), Version(VERSION_MAX_DIGIT, VERSION_MAX_DIGIT, VERSION_MAX_DIGIT), VERSION_REGEX, 'pyCirclize', (sys.executable, '-c', 'import pycirclize; print(pycirclize.__version__)'), ['--skip-plot'])
5153

5254

55+
_local = threading.local()  # thread-local storage; get_db_connection() caches one DB connection per thread here
56+
57+
5358
def init_parser(sub_command: str=''):
5459
parser = argparse.ArgumentParser(
5560
prog=f'bakta{sub_command}',
@@ -507,3 +512,23 @@ def extract_feature_sequence(feature: dict, sequence: dict) -> str:
507512
if(feature['strand'] == bc.STRAND_REVERSE):
508513
nt = str(Seq(nt).reverse_complement())
509514
return nt
515+
516+
517+
def get_db_connection():
    """Return the calling thread's cached read-only connection to ``bakta.db``.

    The first call in a thread opens the database below ``cfg.db_path``,
    applies the tuning PRAGMAs, and stores the connection in the module's
    thread-local storage. Later calls from the same thread return that same
    connection object.

    Returns:
        A ``sqlite3.Connection`` whose ``row_factory`` is ``sqlite3.Row``.

    Raises:
        sqlite3.Error: if the database cannot be opened or configured.
    """
    conn = getattr(_local, 'connection', None)
    if conn is None:
        db_file = cfg.db_path.joinpath('bakta.db').absolute().resolve()
        # as_uri() percent-encodes the path, so characters such as '?', '#'
        # or '%' in a directory name are not misread as URI syntax.
        # immutable=1: disables locking/coordination for NAS speed
        # mode=ro: explicitly read-only
        db_uri = f"{db_file.as_uri()}?immutable=1&mode=ro"

        conn = sqlite3.connect(db_uri, uri=True, check_same_thread=False)
        try:
            # performance tuning
            conn.execute('PRAGMA journal_mode = OFF')
            conn.execute('PRAGMA cache_size = 4000')  # 4000 pages per connection (~16 MB assuming 4 KiB pages)
            conn.execute(f'PRAGMA mmap_size = {1 * 1024 * 1024 * 1024}')  # 1 GiB memory mapping
        except sqlite3.Error:
            conn.close()  # do not leak a half-configured connection
            raise
        conn.row_factory = sqlite3.Row

        _local.connection = conn

    return conn

0 commit comments

Comments
 (0)