-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcorpus_indexing.py
88 lines (70 loc) · 2.97 KB
/
corpus_indexing.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
import asyncio
import copy
from itertools import islice
from typing import Any, Dict, Generator, Tuple

from corpus import WikipediaCorpus
from embeddings import embeddings_loader
from opensearch_indexer import OpenSearchIndexer
def document_source(corpus, embeddings, max_items) -> Generator[Tuple[str, Dict[str, Any]], None, None]:
    """Yield (doc_id, document) pairs ready for bulk indexing.

    Pairs each corpus entry with its precomputed embedding, attaches the
    embedding vector under the 'text_knn_lucene' field, and stops after
    max_items pairs have been produced.

    Args:
        corpus: object exposing iterator() that yields (id, document_dict).
        embeddings: iterable of embedding vectors (objects with .tolist()),
            aligned one-to-one with the corpus iteration order.
        max_items: maximum number of documents to yield.

    Yields:
        (document id as str, document dict including the embedding field).
    """
    # zip() pairs documents with their embeddings; islice caps the stream.
    for (doc_id, doc), embedding in islice(zip(corpus.iterator(), embeddings), max_items):
        # Shallow copy so the caller's original document is not mutated.
        opensearch_document = copy.copy(doc)
        opensearch_document['text_knn_lucene'] = embedding.tolist()
        yield str(doc_id), opensearch_document
async def index_wikipedia(opensearch_host: str, opensearch_port: int, index_name: str, index_mapping: Dict[str, Any],
        document_source, total_items: int, index_shards: int, index_replicas: int, batch_size: int, parallelism: int):
    """Create the target index and bulk-load documents into OpenSearch.

    Args:
        opensearch_host: OpenSearch hostname.
        opensearch_port: OpenSearch port.
        index_name: name of the index to create and populate.
        index_mapping: full index mapping body (see INDEX_MAPPING).
        document_source: iterable/generator of (id, document) pairs.
        total_items: expected total document count (used for progress/batching).
        index_shards: number of primary shards for the new index.
        index_replicas: number of replicas for the new index.
        batch_size: documents per bulk request.
        parallelism: number of bulk batches sent concurrently.
    """
    # NOTE(review): renamed the local from `os` — it shadowed the stdlib module name.
    indexer = OpenSearchIndexer(opensearch_host, opensearch_port, index_name)
    try:
        await indexer.create_index(shards=index_shards, replicas=index_replicas, mappings=index_mapping)
        await indexer.index(document_source, total_items, batch_size=batch_size, parallelism=parallelism)
    finally:
        # Always release the client connection, even if index creation or
        # bulk indexing raises — previously a failure leaked the connection.
        await indexer.close()
# The corpus to index (provides iterator() over (id, document) and num_entries()).
CORPUS = WikipediaCorpus()
# Location pattern of pre-generated embedding files for the corpus;
# {i} is the file index consumed by embeddings_loader.
EMBEDDINGS_FILES = "embeddings/wikipedia_embeddings_{i}.npy"
# OpenSearch coordinates
OPENSEARCH_HOST = "localhost"
OPENSEARCH_PORT = 9200
# The index settings, currently only shards and replicas
INDEX_SHARDS = 1
INDEX_REPLICAS = 0
# Number of documents to send in one bulk batch
BATCH_SIZE = 1000
# Number of batches to send in parallel
INDEXING_PARALLELISM = 4
# How many corpus items to index; the commented alternatives below select
# a 10% / 1% sample and should be used together with the matching INDEX name.
MAX_ITEMS = CORPUS.num_entries()
# MAX_ITEMS = int(CORPUS.num_entries()/10)
# MAX_ITEMS = int(CORPUS.num_entries()/100)
INDEX = "wikipedia_en_100pct"
# INDEX = "wikipedia_en_10pct"
# INDEX = "wikipedia_en_1pct"
# Index mapping: plain text fields plus a 384-dim Lucene HNSW kNN vector
# (cosine similarity) matching the embedding model's output dimension.
INDEX_MAPPING = {
    "mappings": {
        "properties": {
            "title": {
                "type": "text"
            },
            "text": {
                "type": "text"
            },
            "text_knn_lucene": {
                "type": "knn_vector",
                "dimension": 384,
                "space_type": "cosinesimil",
                "method": {
                    "name": "hnsw",
                    "engine": "lucene",
                    "parameters": {
                        "ef_construction": 100,
                        "m": 16
                    }
                }
            }
        }
    }
}
if __name__ == '__main__':
    # Entry point: stream corpus documents paired with precomputed
    # embeddings into the configured OpenSearch index.
    print(f"Indexing {MAX_ITEMS} corpus items to {INDEX}")
    documents = document_source(
        corpus=CORPUS,
        embeddings=embeddings_loader(EMBEDDINGS_FILES),
        max_items=MAX_ITEMS,
    )
    asyncio.run(
        index_wikipedia(
            opensearch_host=OPENSEARCH_HOST,
            opensearch_port=OPENSEARCH_PORT,
            index_name=INDEX,
            index_mapping=INDEX_MAPPING,
            document_source=documents,
            total_items=MAX_ITEMS,
            index_shards=INDEX_SHARDS,
            index_replicas=INDEX_REPLICAS,
            batch_size=BATCH_SIZE,
            parallelism=INDEXING_PARALLELISM,
        )
    )