Commit 5f3835c

Merge pull request #27 from microbiomedata/22-create-a-general-batch-query-function
22 create a general batch query function
2 parents 8d28e75 + 15a3f4e · commit 5f3835c

File tree

6 files changed (+147, -6 lines)

nmdc_api_utilities/collection_search.py

Lines changed: 36 additions & 4 deletions
@@ -175,7 +175,7 @@ def get_record_by_id(
         results = response.json()
         return results

-    def check_ids_exist(self, ids: list, chunk_size=100) -> bool:
+    def check_ids_exist(self, ids: list, chunk_size=100, return_missing_ids=False) -> bool:
         """
         Check if the IDs exist in the collection.

@@ -187,6 +187,8 @@ def check_ids_exist(self, ids: list, chunk_size=100) -> bool:
             A list of IDs to check if they exist in the collection.
         chunk_size : int
             The number of IDs to check in each query. Default is 100.
+        return_missing_ids : bool
+            If True, and if IDs are missing in the collection, return the list of IDs that do not exist in the collection. Default is False.
         Returns
         -------
         bool

@@ -201,16 +203,46 @@ def check_ids_exist(self, ids: list, chunk_size=100) -> bool:
         # to avoid the maximum URL length limit
         ids_test = list(set(ids))
         for i in range(0, len(ids_test), chunk_size):
-            chunk = ids[i:i + chunk_size]
+            chunk = ids_test[i:i + chunk_size]
             filter_dict = {
                 "id": {"$in": chunk}
             }
             filter_json_string = json.dumps(filter_dict, separators=(',', ':'))

             results = self.get_records(filter=filter_json_string, max_page_size=len(chunk), fields="id")
-            if len(results) != len(chunk):
-                raise ValueError(f"IDs not found in collection: {set(chunk) - set([r['id'] for r in results])}")
+            if len(results) != len(chunk) and return_missing_ids:
+                missing_ids = list(set(chunk) - set([record["id"] for record in results]))
+                return False, missing_ids
+            elif len(results) != len(chunk) and not return_missing_ids:
+                return False
         return True
+
+    def get_batch_records(self, id_list: list, search_field: str, chunk_size=100, fields="") -> list:
+        """
+        Get a batch of records from the collection by a list of input IDs. This method is used to identify records that include any of the IDs from the input list, matching the search_field.
+        This uses the MongoDB filter keyword $in to identify records that include the input IDs.
+        params:
+            id_list: list
+                A list of IDs to get records for.
+            search_field: str
+                The field to search on. This must match a field from the NMDC Schema.
+            chunk_size: int
+                The number of IDs to query in each chunk. Default is 100.
+            fields: str
+                The fields to return. Default is all fields.
+        returns:
+            list: A list of records.
+        """
+        dp = DataProcessing()
+        results = []
+        id_list = list(set(id_list))
+        chunks = dp.split_list(input_list=id_list, chunk_size=chunk_size)
+        for chunk in chunks:
+            chunk = dp._string_mongo_list(data=chunk)
+            filter = f'{{"{search_field}": {{"$in": {chunk}}}}}'
+            res = self.get_records(filter=filter, max_page_size=len(chunk), fields=fields, all_pages=True)
+            results += res
+        return results


 if __name__ == "__main__":

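For orientation, a minimal usage sketch of the two methods this diff touches. The collection name, the IDs, and the bare constructor call are illustrative (the test suite constructs CollectionSearch("biosample_set", env=ENV)). Note that check_ids_exist returns a plain bool when everything is found but a (bool, list) tuple on a miss when return_missing_ids=True, so callers should check before unpacking:

from nmdc_api_utilities.collection_search import CollectionSearch

collection = CollectionSearch("biosample_set")

# True when every ID exists; (False, missing) when some do not
result = collection.check_ids_exist(
    ["nmdc:bsm-11-002vgm56", "nmdc:bsm-11-006pnx90"],
    return_missing_ids=True,
)
if result is True:
    print("all IDs found")
else:
    exists, missing = result
    print(f"missing IDs: {missing}")

# get_batch_records chunks the ID list and issues one $in query per chunk,
# returning every record whose search_field matches an input ID
records = collection.get_batch_records(
    id_list=["nmdc:bsm-11-002vgm56", "nmdc:bsm-11-006pnx90"],
    search_field="id",
    fields="id",
)
print(len(records))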
nmdc_api_utilities/data_processing.py

Lines changed: 36 additions & 1 deletion
@@ -24,7 +24,22 @@ def convert_to_df(self, data: list) -> pd.DataFrame:
             A list of dictionaries.
         """
         return pd.DataFrame(data)
-
+    def split_list(self, input_list: list, chunk_size: int = 100) -> list:
+        """
+        Split a list into chunks of a specified size.
+        params:
+            input_list: list
+                The list to split.
+            chunk_size: int
+                The size of each chunk.
+        returns:
+            list: A list of lists.
+        """
+        result = []
+        for i in range(0, len(input_list), chunk_size):
+            result.append(input_list[i:i + chunk_size])
+
+        return result
     def rename_columns(self, df: pd.DataFrame, new_col_names: list) -> pd.DataFrame:
         """
         Rename columns in a pandas dataframe.

@@ -117,3 +132,23 @@ def build_filter(self, attributes, exact_match=False):
         clean = self._string_mongo_list(filter_dict)
         logging.debug(f"Filter cleaned: {clean}")
         return clean
+
+    def extract_field(self, api_results: list, field_name: str) -> list:
+        """
+        This function is used to extract a field from the API results.
+        params:
+            api_results: list
+                A list of dictionaries.
+            field_name: str
+                The name of the field to extract.
+        returns:
+            list: A list of IDs.
+        """
+        field_list = []
+        for item in api_results:
+            if type(item[field_name]) == str:
+                field_list.append(item[field_name])
+            elif type(item[field_name]) == list:
+                for another_item in item[field_name]:
+                    field_list.append(another_item)
+        return field_list
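The two helpers above compose with get_batch_records: split_list produces the query chunks, and extract_field flattens results back into a flat ID list. A small behavior sketch, inferred from the code in this diff (the record dicts are made up):

from nmdc_api_utilities.data_processing import DataProcessing

dp = DataProcessing()

# split_list chunks a list into sublists of at most chunk_size items
print(dp.split_list([1, 2, 3, 4, 5], chunk_size=2))
# [[1, 2], [3, 4], [5]]

# extract_field pulls a named field out of every record, flattening
# list-valued fields into the output
records = [
    {"id": "nmdc:dobj-1", "has_output": "nmdc:wfe-1"},
    {"id": "nmdc:dobj-2", "has_output": ["nmdc:wfe-2", "nmdc:wfe-3"]},
]
print(dp.extract_field(records, "has_output"))
# ['nmdc:wfe-1', 'nmdc:wfe-2', 'nmdc:wfe-3']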
Lines changed: 36 additions & 0 deletions
@@ -0,0 +1,36 @@
+from nmdc_api_utilities.data_processing import DataProcessing
+from nmdc_api_utilities.data_object_search import DataObjectSearch
+from nmdc_api_utilities.workflow_execution_search import WorkflowExecutionSearch
+
+dos_client = DataObjectSearch()
+
+dp_client = DataProcessing()
+
+# Using the DataObjectSearch class to get records from the DataObject collection. We are looking for records with the attribute 'data_object_type' equal to 'FT ICR-MS Analysis Results'.
+# We request up to 100 records per page, include only the fields 'id', 'md5_checksum', and 'url', and fetch all pages of results.
+processed_nom = dos_client.get_record_by_attribute(attribute_name='data_object_type', attribute_value='FT ICR-MS Analysis Results', max_page_size=100, fields='id,md5_checksum,url', all_pages=True)
+# clarify names
+for dataobject in processed_nom:
+    dataobject["processed_nom_id"] = dataobject.pop("id")
+    dataobject["processed_nom_md5_checksum"] = dataobject.pop("md5_checksum")
+    dataobject["processed_nom_url"] = dataobject.pop("url")
+
+# convert to df
+processed_nom_df = dp_client.convert_to_df(processed_nom)
+print(processed_nom_df.head())
+# Next, we query the WorkflowExecution collection. To do so, we create an instance of its search class.
+we_client = WorkflowExecutionSearch()
+# use utility function to get a list of the ids from processed_nom
+result_ids = dp_client.extract_field(processed_nom, "processed_nom_id")
+# Using the WorkflowExecutionSearch class to get records from the WorkflowExecution collection. We are looking for records whose 'has_output' field includes any of the ids from the previous step.
+# We use the get_batch_records method to identify records that include any of the ids from the input list, matching the 'has_output' field.
+analysis_dataobj = we_client.get_batch_records(id_list=result_ids, search_field="has_output", fields="id,has_input,has_output", chunk_size=100)
+
+# clarify names
+for dataobject in analysis_dataobj:
+    dataobject["analysis_id"] = dataobject.pop("id")
+    dataobject["analysis_has_input"] = dataobject.pop("has_input")
+    dataobject["analysis_has_output"] = dataobject.pop("has_output")
+
+# convert to data frame
+analysis_dataobj_df = dp_client.convert_to_df(analysis_dataobj)
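A hypothetical follow-up, not part of this commit: because analysis_has_output holds lists of data object IDs, pandas' explode plus merge links each workflow execution back to the NOM data object it produced:

linked_df = analysis_dataobj_df.explode("analysis_has_output").merge(
    processed_nom_df,
    left_on="analysis_has_output",
    right_on="processed_nom_id",
    how="inner",
)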

nmdc_api_utilities/test/test_collection.py

Lines changed: 1 addition & 0 deletions
@@ -47,6 +47,7 @@ def test_check_ids_exist(self):
     def test_check_ids_exist_multiple(self):
         # simple test to check if the check_ids_exist method returns a boolean
         ids = ["nmdc:bsm-11-002vgm56","nmdc:bsm-11-006pnx90","nmdc:bsm-11-00dkyf35","nmdc:bsm-11-00hrxp98","nmdc:bsm-11-00m15h97","nmdc:bsm-11-00yhef97","nmdc:bsm-11-011z7z70","nmdc:bsm-11-0169zs66","nmdc:bsm-11-01bbrr08","nmdc:bsm-11-01f6m423","nmdc:bsm-11-01g9wf51","nmdc:bsm-11-01jah904","nmdc:bsm-11-01teww33","nmdc:bsm-11-01vt2q72","nmdc:bsm-11-024rsd62","nmdc:bsm-11-02kcw433","nmdc:bsm-11-02n85875","nmdc:bsm-11-02v78297","nmdc:bsm-11-02x97z84","nmdc:bsm-11-034x5t48"]
+        # ids = ['nmdc:bsm-11-002vgm56','nmdc:bsm-11-006pnx90']
         collection = CollectionSearch("biosample_set", env=ENV)
         results = collection.check_ids_exist(ids)
         assert results == True
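A companion test the new flag suggests (hypothetical, not in this commit; the second ID is deliberately bogus):

    def test_check_ids_exist_missing(self):
        # one real ID plus one invented ID that should come back as missing
        ids = ["nmdc:bsm-11-002vgm56", "nmdc:bsm-11-notreal00"]
        collection = CollectionSearch("biosample_set", env=ENV)
        exists, missing = collection.check_ids_exist(ids, return_missing_ids=True)
        assert exists == False
        assert "nmdc:bsm-11-notreal00" in missing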
Lines changed: 35 additions & 0 deletions
@@ -0,0 +1,35 @@
+from nmdc_api_utilities.data_processing import DataProcessing
+from nmdc_api_utilities.data_object_search import DataObjectSearch
+from nmdc_api_utilities.workflow_execution_search import WorkflowExecutionSearch
+def test_nom_notebook():
+
+    dos_client = DataObjectSearch()
+
+    dp_client = DataProcessing()
+    processed_nom = dos_client.get_record_by_attribute(attribute_name='data_object_type', attribute_value='FT ICR-MS Analysis Results', max_page_size=100, fields='id,md5_checksum,url', all_pages=True)
+    # clarify names
+    for dataobject in processed_nom:
+        dataobject["processed_nom_id"] = dataobject.pop("id")
+        dataobject["processed_nom_md5_checksum"] = dataobject.pop("md5_checksum")
+        dataobject["processed_nom_url"] = dataobject.pop("url")
+
+    # convert to df
+    processed_nom_df = dp_client.convert_to_df(processed_nom)
+
+    # since we are querying the WorkflowExecution collection, we need to create an instance of it
+    we_client = WorkflowExecutionSearch()
+    # use utility function to get a list of the ids from processed_nom
+    result_ids = dp_client.extract_field(processed_nom, "processed_nom_id")
+    # get the analysis data objects
+    analysis_dataobj = we_client.get_batch_records(id_list=result_ids, search_field="has_output", fields="id,has_input,has_output", chunk_size=100)
+
+    # clarify names
+    for dataobject in analysis_dataobj:
+        dataobject["analysis_id"] = dataobject.pop("id")
+        dataobject["analysis_has_input"] = dataobject.pop("has_input")
+        dataobject["analysis_has_output"] = dataobject.pop("has_output")
+
+    # convert to data frame
+    analysis_dataobj_df = dp_client.convert_to_df(analysis_dataobj)
+    assert analysis_dataobj_df.shape[0] > 2000
+test_nom_notebook()

pyproject.toml

Lines changed: 3 additions & 1 deletion
@@ -4,7 +4,9 @@ build-backend = "setuptools.build_meta"
 
 [project]
 name = "nmdc_api_utilities"
-version = "0.3.6"
+
+version = "0.3.7"
+
 description = "A Python library for general research functions using NMDC APIs"
 authors = [
     { name = "Olivia Hess", email = "[email protected]" },
