-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathbeir.py
More file actions
155 lines (126 loc) · 4.34 KB
/
beir.py
File metadata and controls
155 lines (126 loc) · 4.34 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
import csv
import zipfile
from collections.abc import AsyncIterator
from pathlib import Path
import httpx
import msgspec
import rich.progress
from vechord.embedding import GeminiDenseEmbedding
from vechord.evaluate import BaseEvaluator
from vechord.registry import VechordRegistry
from vechord.spec import Table, Vector
BASE_URL = "https://public.ukp.informatik.tu-darmstadt.de/thakur/BEIR/datasets/{}.zip"
DEFAULT_DATASET = "scifact"
TOP_K = 10
emb = GeminiDenseEmbedding()
DenseVector = Vector[3072]
def download_dataset(dataset: str, output: Path):
output.mkdir(parents=True, exist_ok=True)
zip = output / f"{dataset}.zip"
if not zip.is_file():
with (
zip.open("wb") as f,
httpx.stream("GET", BASE_URL.format(dataset)) as stream,
):
total = int(stream.headers["Content-Length"])
with rich.progress.Progress(
"[progress.percentage]{task.percentage:>3.0f}%",
rich.progress.BarColumn(bar_width=None),
rich.progress.DownloadColumn(),
rich.progress.TransferSpeedColumn(),
) as progress:
download_task = progress.add_task("Download", total=total)
for chunk in stream.iter_bytes():
f.write(chunk)
progress.update(
download_task, completed=stream.num_bytes_downloaded
)
unzip_dir = output / dataset
if not unzip_dir.is_dir():
with zipfile.ZipFile(zip, "r") as f:
f.extractall(output)
return unzip_dir
class Corpus(Table):
uid: str
text: str
title: str
vector: DenseVector
class Query(Table):
uid: str
cid: str
text: str
vector: DenseVector
class Evaluation(msgspec.Struct):
map: float
ndcg: float
recall: float
vr = VechordRegistry(
DEFAULT_DATASET,
"postgresql://postgres:postgres@172.17.0.1:5432/",
tables=[Corpus, Query],
)
@vr.inject(output=Corpus)
async def load_corpus(dataset: str, output: Path) -> AsyncIterator[Corpus]:
file = output / dataset / "corpus.jsonl"
decoder = msgspec.json.Decoder()
with file.open("r") as f:
for line in f:
item = decoder.decode(line)
title = item.get("title", "")
text = item.get("text", "")
try:
vector = await emb.vectorize_chunk(f"{title}\n{text}")
except Exception as e:
print(f"failed to vectorize {title}: {e}")
continue
yield Corpus(
uid=item["_id"],
text=text,
title=title,
vector=DenseVector(vector),
)
@vr.inject(output=Query)
async def load_query(dataset: str, output: Path) -> AsyncIterator[Query]:
file = output / dataset / "queries.jsonl"
truth = output / dataset / "qrels" / "test.tsv"
table = {}
with open(truth, "r") as f:
reader = csv.reader(f, delimiter="\t", quoting=csv.QUOTE_MINIMAL)
next(reader) # skip header
for row in reader:
table[row[0]] = row[1]
decoder = msgspec.json.Decoder()
with file.open("r") as f:
for line in f:
item = decoder.decode(line)
uid = item["_id"]
if uid not in table:
continue
text = item.get("text", "")
yield Query(
uid=uid,
cid=table[uid],
text=text,
vector=DenseVector(await emb.vectorize_query(text)),
)
@vr.inject(input=Query)
async def evaluate(cid: str, vector: DenseVector) -> Evaluation:
docs = await vr.search_by_vector(Corpus, vector, topk=TOP_K)
score = BaseEvaluator.evaluate_one(cid, [doc.uid for doc in docs])
return Evaluation(
map=score.get("map"),
ndcg=score.get("ndcg"),
recall=score.get(f"recall_{TOP_K}"),
)
async def main():
save_dir = Path("datasets")
download_dataset(DEFAULT_DATASET, save_dir)
async with vr, emb:
await load_corpus(DEFAULT_DATASET, save_dir)
await load_query(DEFAULT_DATASET, save_dir)
res: list[Evaluation] = await evaluate()
print("ndcg", sum(r.ndcg for r in res) / len(res))
print("recall@10", sum(r.recall for r in res) / len(res))
if __name__ == "__main__":
import asyncio
asyncio.run(main())