-
Notifications
You must be signed in to change notification settings - Fork 7
Expand file tree
/
Copy pathquery.py
More file actions
202 lines (154 loc) · 6.8 KB
/
query.py
File metadata and controls
202 lines (154 loc) · 6.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
import re
from itertools import islice

from sqlalchemy import text

from .locus import Locus, parse_region_string
from .reader import RecordReader, RecordSource
from .s3 import list_objects
def fetch(config, engine, index, qss, restricted=None):
    """
    Run multiple queries in parallel and chain the readers returned
    into a single reader.

    Each element of `qss` is one query: a sequence of key values whose
    length must equal the index schema's arity.

    Raises ValueError if any query has the wrong number of keys.
    """
    # validate every query tuple, not just the first, so a malformed
    # query fails fast here instead of erroring inside the SQL layer
    for qs in qss:
        if len(qs) != index.schema.arity:
            raise ValueError(f'Arity mismatch for index schema "{index.schema}"')

    return _run_queries(config, engine, index, qss, restricted)
def fetch_all(config, index, restricted=None, key_limit=None):
    """
    Scans for all the S3 files in the schema and creates a dummy cursor
    to read all the records from all the files. Returns a RecordReader
    of the results.

    If `key_limit` is given, at most that many S3 objects are read.
    """
    s3_objects = list_objects(config.s3_bucket, config.s3_path(index.s3_prefix), max_keys=key_limit)

    # enforce the key limit here as well — max_keys is passed to
    # list_objects, but we defensively cap the iterator ourselves
    # (presumably max_keys may not be honored exactly — TODO confirm)
    if key_limit:
        s3_objects = islice(s3_objects, key_limit)

    # create a RecordSource for each object; no per-record filter applies
    sources = [RecordSource.from_s3_object(obj, record_filter=None) for obj in s3_objects]

    # create the reader object, begin reading the records
    return RecordReader(config, sources, index, restricted=restricted)
def fetch_keys(engine, index, columns, restricted=None, key_limit=None):
    """
    Fetch all unique keys from schema index (e.g. to fill a self-updating dropdown menu)

    If `columns` is None, all key columns are selected; otherwise only the
    key columns present in `columns` are selected.

    Raises ValueError if the index is not built, or if `columns` filters
    out every key column (which would generate invalid SQL).

    NOTE(review): `restricted` and `key_limit` are currently unused here —
    confirm whether they should limit the result set.
    """
    if not index.built:
        raise ValueError(f'Index "{index.name}" is not built')

    # restrict to the requested subset of key columns (None = all of them)
    if columns is None:
        key_columns = list(index.schema.key_columns)
    else:
        key_columns = [col for col in index.schema.key_columns if col in columns]

    # an empty selection would produce "SELECT DISTINCT  FROM ..." (invalid)
    if not key_columns:
        raise ValueError(f'No matching key columns for index "{index.name}"')

    column_name_str = ', '.join(f'`{col}`' for col in key_columns)

    sql = (
        f'SELECT DISTINCT {column_name_str} '
        f'FROM `{index.table}`'
    )

    with engine.connect() as conn:
        cursor = conn.execute(text(sql))
        return [list(row) for row in cursor.fetchall()]
def count(config, engine, index, q):
    """
    Estimate the number of records that will be returned by a query.

    An empty query scans everything; otherwise the query is run against
    the index. A small sample of records is read, and the total is
    extrapolated from the fraction of bytes consumed.
    """
    if len(q) == 0:
        reader = fetch_all(config, index)
    else:
        reader = _run_queries(config, engine, index, [q], None)

    # pull up to 500 records to learn how many bytes they consume
    sample = list(zip(range(500), reader.records))

    # the reader ran dry, so the count is exact
    if reader.at_end:
        return reader.count

    # extrapolate: sampled records scaled by total-bytes / bytes-read
    return int(len(sample) * reader.bytes_total / reader.bytes_read)
def match(config, engine, index, q):
    """
    Returns a subset of unique keys that match the query.

    All elements of `q` except the last must match their key columns
    exactly; the last element is treated as a LIKE pattern against the
    key column at position len(q) - 1, and matching values of that
    column are yielded one at a time (streamed from the database).

    If the final column being indexed is a locus, it is an error and
    no keys will be returned.

    Raises ValueError if `q` is empty, has more elements than there are
    key columns, or if the index is not built.
    """
    if not 0 < len(q) <= len(index.schema.key_columns):
        raise ValueError(f'Too few/many keys for index schema "{index.schema}"')

    # ensure the index is built
    if not index.built:
        raise ValueError(f'Index "{index.name}" is not built')

    # which column will be returned? (the one the last query term targets)
    distinct_column = index.schema.key_columns[len(q) - 1]

    # exact-equality tests for all key columns before the last query term
    tests = [f'`{k}` = :{k}' for k in index.schema.key_columns[:len(q) - 1]]

    # append the LIKE test for the matched column
    tests.append(f'`{distinct_column}` LIKE :{distinct_column}')

    # build the SQL statement (force use of the schema index)
    sql = (
        f'SELECT `{distinct_column}` FROM `{index.table}` '
        f'USE INDEX (`schema_idx`) '
    )

    # add match conditionals
    if len(tests) > 0:
        sql += f'WHERE {" AND ".join(tests)} '

    # create the match pattern: a bare '_' or '*' matches everything;
    # otherwise insert '%' before each literal '_'/'%' in the term and
    # append a trailing '%' (the '$' match), making it a prefix-style
    # LIKE pattern
    pattern = '%' if q[-1] in ['_', '*'] else re.sub(r'_|%|$', lambda m: f'%{m.group(0)}', q[-1])

    # last key yielded, used to collapse adjacent duplicate rows
    prev_key = None

    # fetch all the results, streaming rows rather than buffering them
    with engine.connect() as conn:
        # zip truncates to the shorter sequence, so only the exact-match
        # columns receive values from q[:-1]
        params = dict(zip(index.schema.key_columns, q[:-1]))
        params.update({distinct_column: pattern})
        cursor = conn.execution_options(stream_results=True).execute(text(sql), params)

        # yield all the results until no more matches
        # NOTE(review): dedup via prev_key only removes ADJACENT
        # duplicates — this assumes rows come back grouped/ordered by the
        # matched column (there is no ORDER BY or DISTINCT in the SQL);
        # presumably the index scan guarantees that — confirm
        for r in cursor:
            if r[0] != prev_key:
                yield r[0]
                # don't return this key again
                prev_key = r[0]
def _run_queries(config, engine, index, qss, restricted):
    """
    Resolve every query in `qss` to its record sources and wrap them all
    in a single RecordReader.
    """
    record_sources = _get_sources(config, engine, index, qss)
    return RecordReader(config, record_sources, index, restricted=restricted)
def _get_sources(config, engine, index, qss):
    """
    Construct a SQL query to fetch S3 objects and byte offsets. Run it and
    return a list of RecordSources to the results.

    Raises ValueError if the index is not built.
    """
    # validate the index once — it does not change between queries
    if not index.built:
        raise ValueError(f'Index "{index.name}" is not built')

    # the SQL is identical for every query; build it once outside the loop
    sql = (
        f'SELECT `__Keys`.`key`, MIN(`start_offset`), MAX(`end_offset`) '
        f'FROM `{index.table}` '
        f'INNER JOIN `__Keys` '
        f'ON `__Keys`.`id` = `{index.table}`.`key` '
        f'WHERE {index.schema.sql_filters} '
        f'GROUP BY `key` '
        f'ORDER BY `key` ASC'
    )

    # bind-parameter names with '|' replaced (not valid in a bind name)
    escaped_column_names = [col.replace("|", "_") for col in index.schema.schema_columns]

    sources = []

    for q in qss:
        record_filter = None

        # query parameter list (may be replaced by a dict below)
        query_params = q

        # if the schema has a locus, parse the query parameter
        if index.schema.has_locus:
            if index.schema.locus_is_template:
                chromosome, start, stop = index.schema.locus_class(q[-1]).region()
            else:
                chromosome, start, stop = parse_region_string(q[-1], config)

            # positions are stepped, and need to be between stepped ranges
            step_start = (start // Locus.LOCUS_STEP) * Locus.LOCUS_STEP
            step_stop = (stop // Locus.LOCUS_STEP) * Locus.LOCUS_STEP

            # replace the last query parameter with the locus
            query_params = dict(zip(escaped_column_names, q[:-1]))
            query_params.update({"chromosome": chromosome, "start_pos": step_start, "end_pos": step_stop})

            # BUGFIX: the filter runs AFTER this loop has moved on, so the
            # current locus values must be bound now via a factory. A
            # closure defined inline here would capture the shared loop
            # variables and every filter would see the LAST query's locus.
            record_filter = _make_locus_filter(index, q[-1], chromosome, start, stop)

        with engine.connect() as conn:
            # positional parameter lists are mapped onto the escaped names
            if isinstance(query_params, list):
                query_params = dict(zip(escaped_column_names, query_params))
            cursor = conn.execute(text(sql), query_params)
            rows = cursor.fetchall()

        # create a RecordSource for each entry in the database
        sources += [RecordSource(*row, record_filter) for row in rows]

    return sources


def _make_locus_filter(index, locus_param, chromosome, start, stop):
    """
    Build a row predicate for one query's locus: templated loci must match
    the query term exactly; otherwise the row's locus must overlap the
    queried region. Binding the values as arguments keeps each query's
    filter independent of later loop iterations.
    """
    def overlaps(row):
        if index.schema.locus_is_template:
            return row[index.schema.locus_columns[0]] == locus_param
        return index.schema.locus_of_row(row).overlaps(chromosome, start, stop)

    return overlaps