Skip to content

Commit 25e0903

Browse files
committed
add test for compound database and mappers
1 parent 88ef8a9 commit 25e0903

File tree

1 file changed

+211
-0
lines changed

1 file changed

+211
-0
lines changed

tests/test_compound_database.py

Lines changed: 211 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,211 @@
1+
import sqlite3
2+
from pathlib import Path
3+
import numpy as np
4+
import pandas as pd
5+
import pytest
6+
7+
from ms2query.compound_database import (
8+
CompoundDatabase,
9+
SpecToCompoundMap,
10+
map_from_spectraldb_metadata,
11+
get_unique_compounds_from_spectraldb,
12+
compute_fingerprints,
13+
inchikey14_from_full,
14+
)
15+
16+
# -------------------------
17+
# Helpers
18+
# -------------------------
19+
20+
def make_tmp_db(tmp_path: Path, name: str = "test.sqlite") -> str:
    """Return the path of a fresh (non-existent) database file under *tmp_path*.

    Any file already present at that location is removed, so the caller
    always starts from an empty database.

    Args:
        tmp_path: Directory in which the database file will live.
        name: File name of the database inside *tmp_path*.

    Returns:
        The database path as a string.
    """
    db_file = tmp_path / name
    # EAFP: remove a stale file in one step instead of exists() + unlink(),
    # which would race if the file disappeared between the two calls.
    db_file.unlink(missing_ok=True)
    return str(db_file)
25+
26+
# Example InChIKeys used as fixtures throughout the tests.
# The compound names are informational only; the tests rely on the keys'
# 14-character prefixes, not on the chemistry.
IK_FULL_1 = "BSYNRYMUTXBXSQ-UHFFFAOYSA-N"  # aspirin (acetylsalicylic acid)
IK_FULL_2 = "BSYNRYMUTXBXSQ-UHFFFAOYSA-O"  # same first 14 chars as IK_FULL_1; only the final character differs
IK_FULL_3 = "BQJCRHHNABKAKU-KBQPJGBKSA-N"  # morphine
IK14_1 = "BSYNRYMUTXBXSQ"  # first 14 characters of IK_FULL_1 / IK_FULL_2
IK14_3 = "BQJCRHHNABKAKU"  # first 14 characters of IK_FULL_3
32+
33+
# -------------------------
34+
# Tests: low-level utilities
35+
# -------------------------
36+
37+
def test_inchikey14():
    """``inchikey14_from_full`` yields the upper-cased 14-char prefix, or None."""
    expected_by_input = {
        IK_FULL_1: IK14_1,
        "bsynrymutxbxsq-uhfffaoysa-n": IK14_1,  # lower-case input
        "BQJCRHHNABKAKU-KBQPJGBKSA-N": IK14_3,
    }
    for full_key, short_key in expected_by_input.items():
        assert inchikey14_from_full(full_key) == short_key

    # Input shorter than 14 characters cannot produce a prefix.
    assert inchikey14_from_full("SHORT") is None
42+
43+
def test_compute_fingerprints_placeholder():
    """``compute_fingerprints`` returns the fixed uint8 array [0, 1, 0, 1]."""
    expected = np.array([0, 1, 0, 1], dtype=np.uint8)

    result = compute_fingerprints("C(CO)O", None)

    assert isinstance(result, np.ndarray)
    assert result.dtype == np.uint8
    np.testing.assert_array_equal(result, expected)
48+
49+
# -------------------------
50+
# Tests: CompoundDatabase
51+
# -------------------------
52+
53+
def test_compound_upsert_and_get(tmp_path):
    """Upsert single compounds and read them back with ``get_compound``.

    Checks that ``upsert_compound`` returns the 14-character comp_id, that
    the stored row exposes the full InChIKey and a NumPy fingerprint equal to
    ``[0, 1, 0, 1]``, and that a second compound can be stored and fetched.
    """
    db_path = make_tmp_db(tmp_path)
    cdb = CompoundDatabase(db_path)
    try:
        # Upsert a compound
        cid = cdb.upsert_compound(
            smiles="C(CO)O",
            inchi="InChI=1S/C2H6O/c1-2-3/h3H,2H2,1H3",
            inchikey=IK_FULL_3,
            classyfire_class="Alcohols",
            classyfire_superclass="Organic compounds",
        )
        assert cid == IK14_3

        row = cdb.get_compound(cid)
        assert row is not None
        assert row["inchikey"] == IK_FULL_3
        assert isinstance(row["fingerprint"], np.ndarray)
        np.testing.assert_array_equal(
            row["fingerprint"], np.array([0, 1, 0, 1], dtype=np.uint8)
        )

        # Upsert a second compound. IK_FULL_1 has a different 14-char prefix
        # than IK_FULL_3, so this is stored under a different comp_id.
        cid2 = cdb.upsert_compound(
            smiles="C6H12O6",
            inchi=None,
            inchikey=IK_FULL_1,
            classyfire_class="Carbohydrates",
            classyfire_superclass="Organic compounds",
        )
        assert cid2 == IK14_1
        row2 = cdb.get_compound(IK14_1)
        # Fail with a clear assertion instead of a TypeError on subscripting.
        assert row2 is not None
        assert row2["inchikey"] == IK_FULL_1
    finally:
        # Always release the connection, even when an assertion fails.
        cdb.close()
86+
87+
def test_compound_upsert_many(tmp_path):
    """``upsert_many`` collapses rows sharing a 14-char prefix into one compound.

    Three input rows are given, two of which share the same comp_id, so the
    compounds table must end up with exactly two rows.
    """
    db_path = make_tmp_db(tmp_path)
    cdb = CompoundDatabase(db_path)
    try:
        # Rows 1 and 2 collapse to the same comp_id (same first 14 chars).
        comp_ids = cdb.upsert_many([
            {"smiles": "X", "inchi": None, "inchikey": IK_FULL_1, "classyfire_class": "A"},
            {"smiles": "Y", "inchi": None, "inchikey": IK_FULL_2, "classyfire_class": "B"},
            {"smiles": "Z", "inchi": None, "inchikey": IK_FULL_3, "classyfire_class": "C"},
        ])
        assert set(comp_ids) == {IK14_1, IK14_3}

        row = cdb.get_compound(IK14_1)
        # Which of the two colliding rows survives depends on how the
        # database resolves the conflict; this test accepts either one.
        assert row["smiles"] in {"X", "Y"}
        assert row["inchikey"] in {IK_FULL_1, IK_FULL_2}
        assert row["classyfire_class"] in {"A", "B"}

        count = cdb.sql_query("SELECT COUNT(*) as n FROM compounds")["n"].iloc[0]
        assert count == 2
    finally:
        # Always release the connection, even when an assertion fails.
        cdb.close()
109+
110+
# -------------------------
111+
# Tests: SpecToCompoundMap + integration
112+
# -------------------------
113+
114+
def create_min_spectral_table(sqlite_path: str, rows) -> None:
    """Create a minimal ``spectra`` table (spec_id, inchikey) and insert rows.

    Args:
        sqlite_path: Path of the SQLite database file; created if missing.
        rows: Iterable of InChIKey values (strings or ``None``), one per
            spectrum. ``spec_id`` is assigned automatically in insertion order.
    """
    con = sqlite3.connect(sqlite_path)
    try:
        cur = con.cursor()
        cur.executescript("""
            PRAGMA journal_mode=WAL;
            CREATE TABLE IF NOT EXISTS spectra(
                spec_id INTEGER PRIMARY KEY AUTOINCREMENT,
                inchikey TEXT
            );
        """)
        cur.executemany(
            "INSERT INTO spectra(inchikey) VALUES (?)",
            ((inchikey,) for inchikey in rows),
        )
        con.commit()
    finally:
        # Close the handle even if table creation or insertion raised, so the
        # temporary database file is not left locked.
        con.close()
128+
129+
def test_mapping_and_compound_creation(tmp_path):
    """``map_from_spectraldb_metadata`` maps spectra to compounds in one database.

    A minimal spectra table with two InChIKeys and one NULL is created; the
    mapping step must map the two keyed spectra and create compound rows
    whose comp_ids are 14 characters long.
    """
    db_path = make_tmp_db(tmp_path)

    # Create minimal spectra table (3 rows; one is NULL inchikey)
    create_min_spectral_table(db_path, [IK_FULL_1, IK_FULL_2, None])

    # Run mapping (same db hosts compounds + mapping)
    n_mapped, n_new = map_from_spectraldb_metadata(db_path)
    assert n_mapped == 2  # two spectra had inchikeys
    # Both keys share one 14-char prefix; accept either way of counting them.
    assert n_new in (1, 2)

    # Validate mapping contents
    mapper = SpecToCompoundMap(db_path)
    try:
        df_map = mapper.get_comp_id_for_specs([1, 2, 3])
        assert set(df_map.columns) == {"spec_id", "comp_id"}
        # spec_id 3 has no inchikey -> may be missing
        assert set(df_map["spec_id"]) <= {1, 2, 3}
        # comp_ids are 14 chars
        assert all(len(c) == 14 for c in df_map["comp_id"])
    finally:
        # Release the handle even when an assertion above fails.
        mapper.close()

    # Validate compounds exist
    cdb = CompoundDatabase(db_path)
    try:
        dfc = cdb.sql_query("SELECT comp_id, inchikey FROM compounds")
        assert not dfc.empty
        assert all(len(cid) == 14 for cid in dfc["comp_id"])
    finally:
        cdb.close()
156+
157+
def test_mapper_link_and_get(tmp_path):
    """``SpecToCompoundMap`` links spectra to a compound and reads them back.

    Covers ``link``, ``link_many``, ``get_specs_for_comp`` and
    ``get_comp_id_for_specs``; an unlinked spec_id must be absent from the
    lookup result.
    """
    db_path = make_tmp_db(tmp_path)

    # Make sure a matching compound row exists before linking spectra to it.
    cdb = CompoundDatabase(db_path)
    try:
        cdb.upsert_compound(inchikey=IK_FULL_1)
    finally:
        cdb.close()

    mapper = SpecToCompoundMap(db_path)
    try:
        mapper.link(123, IK14_1)
        mapper.link_many([(124, IK14_1), (125, IK14_1)])

        ids = mapper.get_specs_for_comp(IK14_1)
        assert set(ids) == {123, 124, 125}

        # spec_id 122 was never linked and must not show up in the result.
        df = mapper.get_comp_id_for_specs([122, 123, 124, 125])
        assert set(df.columns) == {"spec_id", "comp_id"}
        assert set(df["spec_id"]) == {123, 124, 125}
    finally:
        # Release the handle even when an assertion above fails.
        mapper.close()
177+
178+
# -------------------------
179+
# Tests: get_unique_compounds_from_spectraldb
180+
# -------------------------
181+
182+
def test_get_unique_compounds_basic(tmp_path):
    """Unique compounds are grouped by 14-char prefix with per-group counts."""
    db_path = make_tmp_db(tmp_path)
    # Four spectra: two sharing one 14-char prefix, one distinct, one NULL.
    spectra_inchikeys = [IK_FULL_1, IK_FULL_2, IK_FULL_3, None]
    create_min_spectral_table(db_path, spectra_inchikeys)

    uniq = get_unique_compounds_from_spectraldb(db_path)

    # The first three columns come in a fixed order.
    leading_columns = list(uniq.columns[:3])
    assert leading_columns == ["inchikey14", "n_spectra", "inchikey"]

    # Exactly two distinct prefixes are expected.
    assert set(uniq["inchikey14"]) == {IK14_1, IK14_3}

    # IK14_1 is backed by two spectra, IK14_3 by one.
    n_spectra_by_key = dict(zip(uniq["inchikey14"], uniq["n_spectra"]))
    assert n_spectra_by_key[IK14_1] == 2
    assert n_spectra_by_key[IK14_3] == 1
195+
196+
def test_get_unique_compounds_with_external_merge(tmp_path):
    """Columns from ``external_meta`` are merged onto the unique-compound table."""
    db_path = make_tmp_db(tmp_path)
    create_min_spectral_table(db_path, [IK_FULL_1, IK_FULL_3])

    # One external row matches a compound in the database, the other does not.
    external = pd.DataFrame(
        {
            "inchikey14": [IK14_1, "NOPE0000000000"],
            "my_tag": ["hit", "miss"],
            "score": [0.9, 0.1],
        }
    )

    uniq = get_unique_compounds_from_spectraldb(db_path, external_meta=external)

    # The extra metadata columns must be present after the merge.
    assert "my_tag" in uniq.columns
    assert "score" in uniq.columns

    # The matching compound carries the external values.
    matched = uniq.loc[uniq["inchikey14"] == IK14_1].iloc[0]
    assert matched["my_tag"] == "hit"
    assert matched["score"] == pytest.approx(0.9)

0 commit comments

Comments
 (0)