Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(checkpoint): Add PostgreSQL store implementation with vector sea… #887

Open
wants to merge 2 commits into
base: main
Choose a base branch
from

Conversation

MohitTilwani15
Copy link

…rch support

Introduces a new PostgreSQL-backed store for checkpoints with advanced features:

  • Full CRUD operations with namespaced key-value storage
  • Optional vector embeddings using pgvector
  • Configurable vector indexing (HNSW, IVF Flat)
  • Supports complex filtering and similarity search
  • Automatic schema and index migrations

Fixes # (issue)

…rch support

Introduces a new PostgreSQL-backed store for checkpoints with advanced features:
- Full CRUD operations with namespaced key-value storage
- Optional vector embeddings using pgvector
- Configurable vector indexing (HNSW, IVF Flat)
- Supports complex filtering and similarity search
- Automatic schema and index migrations
…h method

Extends the search functionality in AsyncBatchedStore to support an optional query parameter, enhancing flexibility for complex search operations.
Copy link
Collaborator

@benjamincburns benjamincburns left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi Mohit,

First, thanks so much for this submission - it's really impressive work! In order to accept this, I'll need to request a few changes, however.

We like to keep the main @langchain/langgraph-checkpoint library generic - can you please create a new package in the libs directory called store-postgres and move this logic there? Please be sure to include a README.md, as that'll be helpful for us when documenting this.

I assume the changes to yarn.lock were a result of running yarn dedupe or similar? I think it'd be best if you could please revert that change, as it's unrelated to this PR and it will be hard to review in full.

The logic in this class is rather complex in places. It would be easier to review this if you broke things down a bit more into smaller functions with more readable names. It can be helpful to have typedoc summary comments on those as well, but don't feel compelled to go crazy documenting every little detail - just breaking it down with readable function names will go a long way.

It looks like you've done a good job of using positional parameters in most places, but there are a few places where it looks like you're concatenating queries together from items that could be user input, or manipulated by user input. Please make sure that you've done proper validation and/or escaping on these values where this could be the case.

I notice that you don't have any tests included. I'd recommend using the TestContainers project in your test suite in order to spin up and tear down test databases quickly. You can see how this works in the checkpoint-validation package.

Here are some links to the important operations that you'll need.

I made a few other comments inline in the code. I'll do a more thorough review once you're able to make the changes mentioned above.

Once again, thanks so much for submitting this PR - it's really nice work!

Comment on lines +93 to +122
/**
* PostgreSQL-backed store with optional vector search using pgvector.
*
* --- Basic setup and usage ---
* const store = new PostgresStore({
* connectionString: "postgresql://user:pass@localhost:5432/dbname"
* });
* await store.setup(); // Run migrations once
*
* --- Store and retrieve data ---
* await store.put(["users", "123"], "prefs", { theme: "dark" });
* const item = await store.get(["users", "123"], "prefs");
*
* --- Vector search with embeddings ---
* const store = new PostgresStore({
* connectionString: "postgresql://user:pass@localhost:5432/dbname",
* index: {
* dims: 1536,
* embeddings: new OpenAIEmbeddings({ modelName: "text-embedding-3-small" }),
* fields: ["text"] // specify which fields to embed
* }
* });
*
* --- Store documents ---
* await store.put(["docs"], "doc1", { text: "Python tutorial" });
* await store.put(["docs"], "doc2", { text: "TypeScript guide" });
*
* --- Search by similarity ---
* const results = await store.search(["docs"], { query: "programming guides" });
*/
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Including doc comments like this is great, but please use typedoc comment tags so these code samples render correctly when we generate API docs from them.

https://typedoc.org/documents/Tags._example.html#:~:text=The%20example%20tag%20indicates%20that,how%20to%20use%20the%20function.

Comment on lines +592 to +604
return [`value->$${startIndex} = $${startIndex + 1}::jsonb`, [key, JSON.stringify(value)]];
case "$gt":
return [`value->>$${startIndex} > $${startIndex + 1}`, [key, String(value)]];
case "$gte":
return [`value->>$${startIndex} >= $${startIndex + 1}`, [key, String(value)]];
case "$lt":
return [`value->>$${startIndex} < $${startIndex + 1}`, [key, String(value)]];
case "$lte":
return [`value->>$${startIndex} <= $${startIndex + 1}`, [key, String(value)]];
case "$ne":
return [`value->$${startIndex} != $${startIndex + 1}::jsonb`, [key, JSON.stringify(value)]];
default:
throw new Error(`Unsupported operator: ${op}`);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Am I correct that the arguments to this function may have been user input? If that's the case, you'll need to put startIndex through a parseInt to validate it, otherwise it's a potential SQL injection vulnerability.

Comment on lines +170 to +186
// Base migrations
const migrations = [
`
CREATE TABLE IF NOT EXISTS store (
prefix text NOT NULL,
key text NOT NULL,
value jsonb NOT NULL,
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (prefix, key)
)
`,
`
CREATE INDEX CONCURRENTLY IF NOT EXISTS store_prefix_idx
ON store USING btree (prefix text_pattern_ops)
`,
];
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd recommend moving the migrations logic to its own file, as we'll likely add migrations over time if the scope of this tool changes. You can look to the checkpoint-postgres project to see how that was organized, if you want some inspiration. Alternatively, feel free to add a dependency on a decent migration management tool that works with postgres.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants