Skip to content

Commit fab147b

Browse files
committed
update async_add method to follow pinecone-client example
1 parent 084a95b commit fab147b

File tree

1 file changed

+38
-18
lines changed

1 file changed

+38
-18
lines changed

llama_index/vector_stores/pinecone.py

+38-18
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333

3434
DEFAULT_BATCH_SIZE = 200
3535

36+
SEM_MAX_CONCURRENT = 10
37+
3638
_logger = logging.getLogger(__name__)
3739

3840

@@ -98,6 +100,14 @@ def _to_pinecone_filter(standard_filters: MetadataFilters) -> dict:
98100
return filters
99101

100102

103+
async def async_upload(index, vectors, batch_size, semaphore):
104+
async def send_batch(batch):
105+
async with semaphore:
106+
return await asyncio.to_thread(index.upsert, batch, async_req=True)
107+
108+
await asyncio.gather(*[send_batch(chunk) for chunk in iter_batch(vectors, size=batch_size)])
109+
110+
101111
import_err_msg = (
102112
"`pinecone` package not found, please run `pip install pinecone-client`"
103113
)
@@ -224,39 +234,42 @@ def from_params(
224234
def class_name(cls) -> str:
    # NOTE(review): "PinconeVectorStore" looks like a typo for
    # "PineconeVectorStore", but this string may already be persisted in
    # serialized indexes — confirm before renaming; do not change silently.
    return "PinconeVectorStore"
226236

227-
def add(
228-
self,
229-
nodes: List[BaseNode],
230-
) -> List[str]:
231-
"""Add nodes to index.
232-
233-
Args:
234-
nodes: List[BaseNode]: list of nodes with embeddings
235-
236-
"""
237-
ids = []
237+
def _prepare_entries_for_upsert(self, nodes: List[BaseNode]) -> List[Dict]:
    """Convert nodes into Pinecone upsert entry dicts.

    Args:
        nodes: nodes with embeddings already computed.

    Returns:
        One dict per node keyed by the module-level ``ID_KEY`` /
        ``VECTOR_KEY`` / ``METADATA_KEY`` constants, with an optional
        ``SPARSE_VECTOR_KEY`` when sparse vectors are enabled.
    """
    entries = []
    for node in nodes:
        metadata = node_to_metadata_dict(
            node, remove_text=False, flat_metadata=self.flat_metadata
        )

        entry = {
            ID_KEY: node.node_id,
            VECTOR_KEY: node.get_embedding(),
            METADATA_KEY: metadata,
        }

        # Also guard on the tokenizer: generate_sparse_vectors requires one,
        # and the refactor dropped the ``self._tokenizer is not None`` check
        # that the original ``add`` implementation performed.
        if self.add_sparse_vector and self._tokenizer is not None:
            sparse_vector = generate_sparse_vectors(
                [node.get_content(metadata_mode=MetadataMode.EMBED)],
                self._tokenizer,
            )[0]
            entry[SPARSE_VECTOR_KEY] = sparse_vector

        entries.append(entry)

    return entries
260+
261+
def add(
262+
self,
263+
nodes: List[BaseNode],
264+
) -> List[str]:
265+
"""Add nodes to index.
266+
267+
Args:
268+
nodes: List[BaseNode]: list of nodes with embeddings
269+
270+
"""
271+
272+
entries = self._prepare_entries_for_upsert(nodes)
260273

261274
[
262275
self._pinecone_index.upsert(
@@ -266,7 +279,7 @@ def add(
266279
for batch in iter_batch(entries, self.batch_size)
267280
]
268281

269-
return ids
282+
return [entry[ID_KEY] for entry in entries]
270283

271284
async def async_add(
272285
self,
@@ -280,7 +293,14 @@ async def async_add(
280293
Returns:
281294
List[str]: List of IDs of the added documents.
282295
"""
283-
return await asyncio.to_thread(self.add, nodes) # type: ignore
296+
297+
entries = self._prepare_entries_for_upsert(nodes)
298+
299+
semaphore = asyncio.Semaphore(SEM_MAX_CONCURRENT)
300+
await async_upload(self._pinecone_index, entries, DEFAULT_BATCH_SIZE, semaphore)
301+
302+
return [entry[ID_KEY] for entry in entries]
303+
284304

285305
def delete(self, ref_doc_id: str, **delete_kwargs: Any) -> None:
286306
"""

0 commit comments

Comments
 (0)