Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions docs/pinecone-connector-refactor-summary.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# Pinecone Connector Phase 1 Summary

## Overview
Phase 1 of the Pinecone connector refactor modernizes connection management, reliability tooling, and automated validation within the VectorDB subsystem. The work introduces shared helpers, centralized client lifecycle handling, and targeted unit tests to harden production behavior while laying groundwork for future observability and documentation.

## Change Summary
| Area | File(s) | Description |
| --- | --- | --- |
| Typed configuration | `packages/core/src/subsystems/IO/VectorDB.service/connectors/pinecone/types.ts` | Defines Pinecone connector settings covering auth, retry, timeout, and health inputs to keep runtime code declarative. |
| Async utilities | `packages/core/src/subsystems/IO/VectorDB.service/connectors/pinecone/async-utils.ts` | Adds cancellable timeout wrapper and jittered sleep helper to standardize operation control flow. |
| Resilient retries | `packages/core/src/subsystems/IO/VectorDB.service/connectors/pinecone/retry-utils.ts` | Implements exponential backoff with jitter, per-operation timeouts, and attempt callbacks. |
| Connection lifecycle | `packages/core/src/subsystems/IO/VectorDB.service/connectors/pinecone/connection-manager.ts` | Centralizes Pinecone client creation, credential lookups (ManagedVault/Vault/direct), and local caching. |
| Main connector refactor | `packages/core/src/subsystems/IO/VectorDB.service/connectors/PineconeVectorDB.class.ts` | Consumes the new helpers for namespace CRUD, query, and upsert flows with retry + timeout safety. |
| Automated validation | `packages/core/tests/unit/004-VectorDB/pinecone-helpers.test.ts` | Covers helper utilities and connection manager behavior using vitest without external dependencies. |

## Rationale and Improvements
- **Configuration clarity**. Moving Pinecone-specific knobs into `types.ts` allows safer merges and runtime validation while de-duplicating hard-coded constants.
- **Deterministic async control**. `async-utils.ts` enables uniform timeout enforcement and jittered backoff, preventing thread starvation and aligning with platform abort semantics.
- **Consistent resiliency posture**. `retry-utils.ts` enforces explicit retry policies per operation, reducing copy/paste retry logic and concentrating future enhancements (e.g., metrics hooks).
- **Credential hygiene**. `connection-manager.ts` encapsulates ManagedVault and Vault resolution with local caching, minimizing secret fetches and ensuring fallbacks for direct configuration.
- **Safer connector operations**. `PineconeVectorDB.class.ts` now delegates to the connection manager and wraps I/O calls in timeouts/retries, shielding consumers from transient Pinecone failures.
- **Regression coverage**. Adding `pinecone-helpers.test.ts` establishes a dedicated test harness, increasing confidence in helper evolution and supporting CI enforcement.

## Business Impact
- **Reliability for Production Agents**. Centralized connection lifecycle management, retries, and jittered backoff significantly reduce transient Pinecone failures. These improvements reinforce uptime and stability, a requirement for production-grade AI agent runtimes.
- **Enterprise Security and Compliance**. Vault-based credential resolution with caching formalizes secret management. This creates an auditable and secure flow, aligning with enterprise adoption requirements and SLA-driven commitments.
- **Developer Velocity and Ecosystem Growth**. Typed configuration, shared helpers, and streamlined retry logic simplify extension and maintenance. This reduces cognitive load and accelerates safe feature delivery, directly supporting SmythOS’s goal of rapid ecosystem growth.
- **Operational Efficiency**. The connection manager exposes explicit reset and shutdown pathways. This simplifies incident response playbooks, improves recoverability, and enhances operational resilience.
- **Pathway to Enterprise Observability**. Retry and connection abstractions provide natural integration points for structured logging, metrics, and circuit breaking. These hooks lay the groundwork for future observability enhancements and SLA enforcement.

## Testing Instructions
- **Install dependencies**. From the repository root run `pnpm install` if workspace packages have not been bootstrapped.
- **Prepare environment**. Ensure any required `.env` secrets for vault integration are stubbed or mocked as the unit suite relies on internal mocks and does not call external services.
- **Execute targeted suite**. Run `pnpm vitest run packages/core/tests/unit/004-VectorDB/pinecone-helpers.test.ts` from the repository root to validate the Pinecone helper and connection manager behavior.
- **Inspect output**. Confirm all tests pass and review console warnings for context on mocked vault connectors.

## Future Enhancements
- **Structured logging and metrics** integration within `connection-manager.ts` and `PineconeVectorDB.class.ts` to surface latency, retries, and health signals.
- **Health probe orchestration** leveraging the `health` configuration placeholders to support automated circuit breaking.
- **Extended documentation**: update `docs/core/documents/connectors_vectordb.html` (or source markdown) to reference Pinecone-specific connection guidance and troubleshooting.
- **Integration tests**: add staging exercises with a live Pinecone sandbox, gated behind feature flags, to validate end-to-end pipelines.
- **Config validation tooling** ensuring that runtime-provided retry/timeout/auth options conform to expected ranges before use.
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,7 @@ import { AccessRequest } from '@sre/Security/AccessControl/AccessRequest.class';
import { AccessCandidate } from '@sre/Security/AccessControl/AccessCandidate.class';
import { SecureConnector } from '@sre/Security/SecureConnector.class';
import { VectorDBConnector, DeleteTarget } from '../VectorDBConnector';
import {
DatasourceDto,
IStorageVectorDataSource,
IStorageVectorNamespace,
IVectorDataSourceDto,
QueryOptions,
VectorsResultData,
} from '@sre/types/VectorDB.types';
import { Pinecone } from '@pinecone-database/pinecone';
import { DatasourceDto, IStorageVectorDataSource, IVectorDataSourceDto, QueryOptions, VectorsResultData } from '@sre/types/VectorDB.types';
import { ConnectorService } from '@sre/Core/ConnectorsService';
import { Logger } from '@sre/helpers/Log.helper';
import { NKVConnector } from '@sre/IO/NKV.service/NKVConnector';
Expand All @@ -22,17 +14,20 @@ import { JSONContentHelper } from '@sre/helpers/JsonContent.helper';
import { CacheConnector } from '@sre/MemoryManager/Cache.service/CacheConnector';
import crypto from 'crypto';
import { BaseEmbedding, TEmbeddings } from '../embed/BaseEmbedding';
import { EmbeddingsFactory, SupportedProviders, SupportedModels } from '../embed';
import { EmbeddingsFactory } from '../embed';
import { chunkText } from '@sre/utils/string.utils';
import { jsonrepair } from 'jsonrepair';
import { PineconeConnectionManager } from './pinecone/connection-manager';
import { PineconeAuthConfig, PineconeHealthConfig, PineconeRequestTimeouts, PineconeRetryConfig } from './pinecone/types';
import { withSafeRetry } from './pinecone/retry-utils';

const console = Logger('Pinecone VectorDB');

export type PineconeConfig = {
/**
* The Pinecone API key
*/
apiKey: string;
apiKey?: string;
/**
* The Pinecone index name
*/
Expand All @@ -41,11 +36,18 @@ export type PineconeConfig = {
* The embeddings model to use
*/
embeddings?: TEmbeddings;
/**
* Auth configuration for vault-based credentials
*/
auth?: PineconeAuthConfig;
requestTimeouts?: PineconeRequestTimeouts;
retry?: PineconeRetryConfig;
health?: PineconeHealthConfig;
};
export class PineconeVectorDB extends VectorDBConnector {
public name = 'PineconeVectorDB';
public id = 'pinecone';
private client: Pinecone;
private connection: PineconeConnectionManager;
private indexName: string;
private cache: CacheConnector;
private accountConnector: AccountConnector;
Expand All @@ -54,20 +56,15 @@ export class PineconeVectorDB extends VectorDBConnector {

constructor(protected _settings: PineconeConfig) {
super(_settings);
if (!_settings.apiKey) {
console.warn('Missing Pinecone API key : returning empty Pinecone connector');
return;
}
if (!_settings.indexName) {
console.warn('Missing Pinecone index name : returning empty Pinecone connector');
return;
}

this.client = new Pinecone({
apiKey: _settings.apiKey,
});
console.info('Pinecone client initialized');
console.info('Pinecone index name:', _settings.indexName);
if (!_settings.apiKey && !_settings.auth?.vaultKey) {
console.warn('Pinecone connector configured without apiKey or auth.vaultKey; operations will fail until credentials are provided at runtime');
}

this.indexName = _settings.indexName;
this.accountConnector = ConnectorService.getAccountConnector();
this.cache = ConnectorService.getCacheConnector();
Expand All @@ -77,8 +74,16 @@ export class PineconeVectorDB extends VectorDBConnector {
}
if (!_settings.embeddings.params) _settings.embeddings.params = { dimensions: 1024 };
if (!_settings.embeddings.params?.dimensions) _settings.embeddings.params.dimensions = 1024;

this.embedder = EmbeddingsFactory.create(_settings.embeddings.provider, _settings.embeddings);

this.connection = new PineconeConnectionManager({
indexName: this.indexName,
embeddings: _settings.embeddings,
auth: _settings.auth ?? (_settings.apiKey ? { apiKey: _settings.apiKey } : undefined),
requestTimeouts: _settings.requestTimeouts,
retry: _settings.retry,
health: _settings.health,
});
}

public async getResourceACL(resourceId: string, candidate: IAccessCandidate): Promise<ACL> {
Expand Down Expand Up @@ -109,19 +114,27 @@ export class PineconeVectorDB extends VectorDBConnector {
displayName: namespace,
...metadata,
};
await this.client
.Index(this.indexName)
.namespace(preparedNs)
.upsert([
{
id: `_reserved_${preparedNs}`,
values: this.embedder.dummyVector,
metadata: {
isSkeletonVector: true,
...nsData,
},
},
]);
const client = await this.connection.getClient(acRequest);
await withSafeRetry(
'createNamespace',
async () => {
await client
.Index(this.indexName)
.namespace(preparedNs)
.upsert([
{
id: `_reserved_${preparedNs}`,
values: this.embedder.dummyVector,
metadata: {
isSkeletonVector: true,
...nsData,
},
},
]);
},
this.settings.retry,
this.settings.requestTimeouts
);

await this.setACL(acRequest, preparedNs, acl.ACL);

Expand All @@ -131,27 +144,37 @@ export class PineconeVectorDB extends VectorDBConnector {
@SecureConnector.AccessControl
protected async namespaceExists(acRequest: AccessRequest, namespace: string): Promise<boolean> {
//const teamId = await this.accountConnector.getCandidateTeam(acRequest.candidate);
const stats = await this.client.Index(this.indexName).describeIndexStats();
const client = await this.connection.getClient(acRequest);
const stats = await withSafeRetry(
'describeIndexStats',
async () => client.Index(this.indexName).describeIndexStats(),
this.settings.retry,
this.settings.requestTimeouts
);
return Object.keys(stats.namespaces).includes(this.constructNsName(acRequest.candidate as AccessCandidate, namespace));
}

@SecureConnector.AccessControl
protected async deleteNamespace(acRequest: AccessRequest, namespace: string): Promise<void> {
//const teamId = await this.accountConnector.getCandidateTeam(acRequest.candidate);
//const candidate = AccessCandidate.team(teamId);
const preparedNs = this.constructNsName(acRequest.candidate as AccessCandidate, namespace);

await this.client
.Index(this.indexName)
.namespace(this.constructNsName(acRequest.candidate as AccessCandidate, namespace))
.deleteAll()
.catch((e) => {
if (e?.name == 'PineconeNotFoundError') {
console.warn(`Namespace ${namespace} does not exist and was requested to be deleted`);
return;
}
throw e;
});
const client = await this.connection.getClient(acRequest);
await withSafeRetry(
'deleteNamespace',
async () =>
client
.Index(this.indexName)
.namespace(preparedNs)
.deleteAll()
.catch((e) => {
if (e?.name == 'PineconeNotFoundError') {
console.warn(`Namespace ${namespace} does not exist and was requested to be deleted`);
return;
}
throw e;
}),
this.settings.retry,
this.settings.requestTimeouts
);

await this.deleteACL(AccessCandidate.clone(acRequest.candidate), namespace);
}
Expand All @@ -165,20 +188,27 @@ export class PineconeVectorDB extends VectorDBConnector {
): Promise<VectorsResultData> {
//const teamId = await this.accountConnector.getCandidateTeam(acRequest.candidate);

const pineconeIndex = this.client.Index(this.indexName).namespace(this.constructNsName(acRequest.candidate as AccessCandidate, namespace));
const client = await this.connection.getClient(acRequest);
const pineconeIndex = client.Index(this.indexName).namespace(this.constructNsName(acRequest.candidate as AccessCandidate, namespace));
let _vector = query;
if (typeof query === 'string') {
_vector = await this.embedder.embedText(query, acRequest.candidate as AccessCandidate);
}

const topK = (options.topK || 10) + 1; //* we increment one in case it included the skeleton vector

const results = await pineconeIndex.query({
topK,
vector: _vector as number[],
includeMetadata: true,
includeValues: true,
});
const results = await withSafeRetry(
'query',
async () =>
pineconeIndex.query({
topK,
vector: _vector as number[],
includeMetadata: true,
includeValues: true,
}),
this.settings.retry,
this.settings.requestTimeouts
);

let matches = [];

Expand Down Expand Up @@ -226,10 +256,17 @@ export class PineconeVectorDB extends VectorDBConnector {
}));

// await pineconeStore.addDocuments(chunks, ids);
await this.client
.Index(this.indexName)
.namespace(this.constructNsName(acRequest.candidate as AccessCandidate, namespace))
.upsert(preparedSource);
const client = await this.connection.getClient(acRequest);
await withSafeRetry(
'upsert',
async () =>
client
.Index(this.indexName)
.namespace(this.constructNsName(acRequest.candidate as AccessCandidate, namespace))
.upsert(preparedSource),
this.settings.retry,
this.settings.requestTimeouts
);

const accessCandidate = acRequest.candidate;

Expand All @@ -251,10 +288,17 @@ export class PineconeVectorDB extends VectorDBConnector {
} else {
const _ids = Array.isArray(deleteTarget) ? deleteTarget : [deleteTarget];

const res = await this.client
.Index(this.indexName)
.namespace(this.constructNsName(acRequest.candidate as AccessCandidate, namespace))
.deleteMany(_ids);
const client = await this.connection.getClient(acRequest);
await withSafeRetry(
'deleteMany',
async () =>
client
.Index(this.indexName)
.namespace(this.constructNsName(acRequest.candidate as AccessCandidate, namespace))
.deleteMany(_ids),
this.settings.retry,
this.settings.requestTimeouts
);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import { setTimeout as sleep } from 'timers/promises';

export type TimeoutOptions = {
timeoutMs: number;
signal?: AbortSignal;
onTimeout?: () => void;
};

export async function sleepWithJitter(delayMs: number, jitterRatio: number, signal?: AbortSignal) {
if (delayMs <= 0) return;
const jitter = delayMs * Math.random() * jitterRatio;
const totalDelay = delayMs + jitter;
await sleep(totalDelay, undefined, { signal });
}

export async function withTimeout<T>(operation: (signal: AbortSignal) => Promise<T>, options: TimeoutOptions): Promise<T> {
const controller = new AbortController();
const { timeoutMs, signal, onTimeout } = options;

const timer = setTimeout(() => {
controller.abort(new Error('Operation timed out'));
onTimeout?.();
}, timeoutMs).unref?.();

const abortListener = () => controller.abort(signal?.reason ?? new Error('Operation aborted'));

if (signal) {
if (signal.aborted) {
clearTimeout(timer as unknown as NodeJS.Timeout);
controller.abort(signal.reason ?? new Error('Operation aborted before start'));
} else {
signal.addEventListener('abort', abortListener, { once: true });
}
}

try {
const result = await Promise.race([
operation(controller.signal),
new Promise<never>((_, reject) => {
const abortHandler = (event: Event) => {
const reason = (event?.target as AbortSignal)?.reason ?? new Error('Operation aborted');
reject(reason instanceof Error ? reason : new Error(String(reason)));
};
controller.signal.addEventListener('abort', abortHandler, { once: true });
}),
]);
return result;
} finally {
clearTimeout(timer as unknown as NodeJS.Timeout);
if (signal) {
signal.removeEventListener('abort', abortListener);
}
}
}
Loading