Skip to content

Commit 10f7e1f

Browse files
dmsmith25claude
and committed
Add Pinecone bulk import support as alternative to upsert-based population
Introduces bulk import from cloud storage (GCS) using Pinecone's import API, allowing large datasets to be loaded without client-side upserts. Adds BulkImportUser, CLI args (--pinecone_bulk_import, --pinecone_import_uri, --pinecone_import_error_mode), skip_passages optimization to avoid downloading passage data locally, and GCS URIs for all supported workloads. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent e5f62e7 commit 10f7e1f

File tree

13 files changed

+501
-33
lines changed

13 files changed

+501
-33
lines changed

vsb/cmdline_args.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -316,6 +316,26 @@ def add_vsb_cmdline_args(
316316
default=1,
317317
help="Number of replicas for dedicated read nodes. Default is %(default)s.",
318318
)
319+
pinecone_group.add_argument(
320+
"--pinecone_bulk_import",
321+
action=argparse.BooleanOptionalAction,
322+
default=False,
323+
help="Use Pinecone's bulk import API instead of upsert for population.",
324+
)
325+
pinecone_group.add_argument(
326+
"--pinecone_import_uri",
327+
type=str,
328+
default=None,
329+
help="Cloud storage URI for bulk import (e.g., gs://bucket/path/).",
330+
env_var="VSB__PINECONE_IMPORT_URI",
331+
)
332+
pinecone_group.add_argument(
333+
"--pinecone_import_error_mode",
334+
type=str,
335+
default="abort",
336+
choices=["abort", "continue"],
337+
help="Error handling: 'abort' stops on first error, 'continue' skips failed records.",
338+
)
319339

320340
opensearch_group = parser.add_argument_group(
321341
"Options specific to OpenSearch database"

vsb/databases/base.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,3 +110,9 @@ def get_record_count(self) -> int:
110110
Returns the total number of records in the database.
111111
"""
112112
pass
113+
114+
def supports_bulk_import(self) -> bool:
115+
"""
116+
Returns True if this database supports bulk import from cloud storage.
117+
"""
118+
return False

vsb/databases/pinecone/pinecone.py

Lines changed: 131 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
import vsb
99
from vsb import logger
10-
from pinecone import PineconeException, NotFoundException, UnauthorizedException
10+
from pinecone import Pinecone, PineconeException, NotFoundException, UnauthorizedException, ImportErrorMode
1111
from pinecone.grpc import PineconeGRPC, GRPCIndex
1212
from tenacity import retry, stop_after_attempt, wait_exponential_jitter, after_log
1313
import grpc.experimental.gevent as grpc_gevent
@@ -175,29 +175,46 @@ def __init__(
175175
self.dedicated_shards = config.get("pinecone_dedicated_shards", 1)
176176
self.dedicated_replicas = config.get("pinecone_dedicated_replicas", 1)
177177

178+
# Bulk import configuration
179+
self.use_bulk_import = config.get("pinecone_bulk_import", False)
180+
self.import_uri = config.get("pinecone_import_uri")
181+
self.import_error_mode = config.get("pinecone_import_error_mode", "abort")
182+
self.active_import_id = None
183+
184+
# When bulk import is enabled, use "passages" namespace by default
185+
# since that's where dataset records are stored in GCS
186+
if self.use_bulk_import and self.namespace == "__default__":
187+
self.namespace = "passages"
188+
logger.info(f"PineconeDB: Bulk import enabled, using namespace 'passages'")
189+
178190
if self.index_name is None:
179191
# None specified, default to "vsb-<workload>"
180192
self.index_name = f"vsb-{name}"
181193
spec = config["pinecone_index_spec"]
182-
try:
183-
self.index = self.pc.Index(name=self.index_name)
184-
self.created_index = False
185-
except UnauthorizedException:
186-
api_key = config["pinecone_api_key"]
187-
masked_api_key = api_key[:4] + "*" * (len(api_key) - 8) + api_key[-4:]
188-
logger.critical(
189-
f"PineconeDB: Got UnauthorizedException when attempting to connect "
190-
f"to index '{self.index_name}' using API key '{masked_api_key}' - check "
191-
f"your API key and permissions"
192-
)
193-
raise StopUser()
194-
except NotFoundException:
194+
195+
# When bulk import is enabled and the index already exists, generate
196+
# a unique name by appending -2, -3, etc. so each import creates a
197+
# fresh index.
198+
if self.use_bulk_import:
199+
original_name = self.index_name
200+
suffix = 1
201+
while True:
202+
try:
203+
self.pc.Index(name=self.index_name)
204+
# Index exists - increment suffix and try again
205+
suffix += 1
206+
self.index_name = f"{original_name}-{suffix}"
207+
logger.info(
208+
f"PineconeDB: Index '{original_name if suffix == 2 else original_name + '-' + str(suffix - 1)}' "
209+
f"already exists, trying '{self.index_name}'"
210+
)
211+
except (NotFoundException, UnauthorizedException):
212+
# Name is available
213+
break
214+
195215
logger.info(
196-
f"PineconeDB: Specified index '{self.index_name}' was not found, or the "
197-
f"specified API key cannot access it. Creating new index '{self.index_name}'."
216+
f"PineconeDB: Creating new index '{self.index_name}' for bulk import"
198217
)
199-
200-
# Use REST API if dedicated read nodes are enabled
201218
if self.use_dedicated_read_nodes:
202219
_create_index_with_dedicated_read_nodes(
203220
api_key=self.api_key,
@@ -216,9 +233,49 @@ def __init__(
216233
metric=metric.value,
217234
spec=spec,
218235
)
219-
220236
self.index = self.pc.Index(name=self.index_name)
221237
self.created_index = True
238+
else:
239+
try:
240+
self.index = self.pc.Index(name=self.index_name)
241+
self.created_index = False
242+
except UnauthorizedException:
243+
api_key = config["pinecone_api_key"]
244+
masked_api_key = api_key[:4] + "*" * (len(api_key) - 8) + api_key[-4:]
245+
logger.critical(
246+
f"PineconeDB: Got UnauthorizedException when attempting to connect "
247+
f"to index '{self.index_name}' using API key '{masked_api_key}' - check "
248+
f"your API key and permissions"
249+
)
250+
raise StopUser()
251+
except NotFoundException:
252+
logger.info(
253+
f"PineconeDB: Specified index '{self.index_name}' was not found, or the "
254+
f"specified API key cannot access it. Creating new index '{self.index_name}'."
255+
)
256+
257+
# Use REST API if dedicated read nodes are enabled
258+
if self.use_dedicated_read_nodes:
259+
_create_index_with_dedicated_read_nodes(
260+
api_key=self.api_key,
261+
index_name=self.index_name,
262+
dimension=dimensions,
263+
metric=metric.value,
264+
spec=spec,
265+
node_type=self.dedicated_node_type,
266+
shards=self.dedicated_shards,
267+
replicas=self.dedicated_replicas,
268+
)
269+
else:
270+
self.pc.create_index(
271+
name=self.index_name,
272+
dimension=dimensions,
273+
metric=metric.value,
274+
spec=spec,
275+
)
276+
277+
self.index = self.pc.Index(name=self.index_name)
278+
self.created_index = True
222279

223280
info = self.pc.describe_index(self.index_name)
224281
index_dims = info["dimension"]
@@ -232,6 +289,11 @@ def __init__(
232289
f"PineconeDB index '{self.index_name}' has incorrect metric - expected:{metric.value}, found:{index_metric}"
233290
)
234291

292+
# Create a REST-based client for bulk import operations (not available in gRPC)
293+
if self.use_bulk_import:
294+
self.rest_pc = Pinecone(api_key=self.api_key)
295+
self.rest_index = self.rest_pc.Index(name=self.index_name)
296+
235297
def close(self):
236298
self.index.close()
237299

@@ -343,3 +405,53 @@ def check_namespace_exists(self, namespace: str) -> bool:
343405
f"PineconeDB: Error while listing namespaces in index '{self.index_name}' - {e}"
344406
)
345407
return False
408+
409+
def supports_bulk_import(self) -> bool:
410+
"""Return True if bulk import is enabled."""
411+
return self.use_bulk_import
412+
413+
def start_bulk_import(self, uri: str, namespace: str) -> str:
414+
"""Start bulk import, return import_id. Uses REST API (not available in gRPC)."""
415+
error_mode = (
416+
ImportErrorMode.ABORT
417+
if self.import_error_mode == "abort"
418+
else ImportErrorMode.CONTINUE
419+
)
420+
result = self.rest_index.start_import(
421+
uri=uri,
422+
error_mode=error_mode,
423+
)
424+
self.active_import_id = result.id
425+
logger.info(f"PineconeDB: Started bulk import with id={result.id} from uri={uri}")
426+
return result.id
427+
428+
def get_import_status(self, import_id: str):
429+
"""Get import status. Uses REST API (not available in gRPC)."""
430+
return self.rest_index.describe_import(id=import_id)
431+
432+
def wait_for_import(self, import_id: str, poll_interval: float = 10.0):
433+
"""Poll until import completes or fails."""
434+
while True:
435+
status = self.get_import_status(import_id)
436+
if status.status == "Completed":
437+
logger.info(f"PineconeDB: Bulk import {import_id} completed successfully")
438+
return status
439+
elif status.status == "Failed":
440+
raise Exception(f"Import failed: {status}")
441+
time.sleep(poll_interval)
442+
443+
def delete_import_namespace(self, namespace: str):
444+
"""Delete a namespace from the index. Used to remove the 'queries'
445+
namespace that gets created during bulk import from GCS datasets
446+
that contain both passages/ and queries/ folders."""
447+
try:
448+
self.index.delete_namespace(namespace=namespace)
449+
logger.info(
450+
f"PineconeDB: Deleted namespace '{namespace}' from index '{self.index_name}'"
451+
)
452+
except PineconeException as e:
453+
# Namespace may not exist if the GCS dataset didn't have a queries folder
454+
logger.debug(
455+
f"PineconeDB: Could not delete namespace '{namespace}' "
456+
f"(may not exist): {e}"
457+
)

vsb/locustfile.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
# Note: These are _not_ unused, they are required to register our User
3535
# and custom LoadShape classes with locust.
3636
import users
37-
from users import SetupUser, PopulateUser, FinalizeUser, RunUser, LoadShape
37+
from users import SetupUser, PopulateUser, BulkImportUser, FinalizeUser, RunUser, LoadShape
3838

3939
# Display stats of benchmark so far to console very 5 seconds.
4040
locust.stats.CONSOLE_STATS_INTERVAL_SEC = 5

0 commit comments

Comments (0)