@@ -814,6 +814,12 @@ def __init__(self, vespa_client: Vespa, hosts: List[Dict], client_options: Dict)
814814 self ._namespace = client_options .get ("namespace" , "benchmark" )
815815 self ._cluster = client_options .get ("cluster" , None )
816816
817+ # Concurrency settings for bulk operations
818+ # Vespa has no native bulk API, so we send parallel HTTP requests.
819+ # Default is conservative (5) to avoid 429 errors. With 8 workers, this = 40 concurrent.
820+ # Vespa's default queue limit is 256, so stay well under that.
821+ self ._max_concurrent = client_options .get ("max_concurrent" , 5 )
822+
817823 # Create application package for schema management
818824 self ._app_package = ApplicationPackage (name = self ._app_name )
819825
@@ -831,14 +837,23 @@ def __init__(self, vespa_client: Vespa, hosts: List[Dict], client_options: Dict)
831837
832838 # aiohttp session (lazy initialized)
833839 self ._session : Optional [aiohttp .ClientSession ] = None
834- self ._session_lock = asyncio .Lock ()
840+ self ._session_lock : Optional [asyncio .Lock ] = None
841+
842+ # Unique client ID for debugging
843+ import os
844+ self ._client_id = f"pid{ os .getpid ()} _{ id (self )} "
845+ self .logger .info ("VespaDatabaseClient created: client_id=%s, base_url=%s" , self ._client_id , self ._base_url )
835846
836847 async def _ensure_session (self ):
837848 """Initialize aiohttp session with timing hooks for benchmarking."""
838849 # Fast path: session already exists
839850 if self ._session is not None :
840851 return
841852
853+ # Create lock lazily in async context (must be in same event loop)
854+ if self ._session_lock is None :
855+ self ._session_lock = asyncio .Lock ()
856+
842857 # Slow path: acquire lock and create session (double-checked locking)
843858 async with self ._session_lock :
844859 # Double-check after acquiring lock (another coroutine may have created it)
@@ -932,7 +947,8 @@ async def bulk(self, body: Any,
932947 Returns:
933948 Dict with bulk operation results in OpenSearch format
934949 """
935- self .logger .info ("DEBUG: VespaDatabaseClient.bulk() called with index=%s" , index )
950+ self .logger .info ("DEBUG: VespaDatabaseClient.bulk() called: client_id=%s, index=%s, session=%s" ,
951+ self ._client_id , index , "exists" if self ._session else "None" )
936952 await self ._ensure_session ()
937953
938954 document_type = index or self ._app_name
@@ -945,7 +961,7 @@ async def bulk(self, body: Any,
945961 if documents :
946962 self .logger .info ("DEBUG: First document (of %d): %s" , len (documents ), str (documents [0 ])[:500 ])
947963
948- max_concurrent = kwargs .get ("max_concurrent" , 50 )
964+ max_concurrent = kwargs .get ("max_concurrent" , self . _max_concurrent )
949965 timeout_val = aiohttp .ClientTimeout (total = kwargs .get ("request_timeout" , 30 ))
950966
951967 # Build query params for Vespa
0 commit comments