
Commit 50e52ab

Add multimodal image support to VectorDBConnector (#14760)
1 parent 81c919b commit 50e52ab

4 files changed

Lines changed: 695 additions & 110 deletions

docs/en/annotator_entries/VectorDBConnector.md

Lines changed: 108 additions & 54 deletions
@@ -5,39 +5,34 @@ VectorDBConnector
 {%- capture description -%}
 Connector for storing and retrieving embeddings from vector databases.
 
-This annotator takes embeddings from previous annotators (like `BertEmbeddings`,
-`SentenceEmbeddings`, `OpenAIEmbeddings`, etc.) and stores them in a vector database for
+This annotator takes embeddings from previous annotators (like `BertSentenceEmbeddings`,
+`SentenceEmbeddings`, `E5VEmbeddings`, etc.) and stores them in a vector database for
 similarity search and retrieval. Currently supports [**Pinecone**](https://app.pinecone.io/) with more providers planned.
 
 The annotator automatically manages vector IDs, metadata, and batch operations, making it easy
 to integrate vector database capabilities into your Spark NLP pipelines without additional
 boilerplate code.
 
-**Key Features:**
-- **Automatic ID Management**: Generates UUIDs or uses custom ID columns
-- **Metadata Support**: Store additional context with vectors (e.g., text, category, source)
-- **Batch Processing**: Efficiently upsert vectors to Pinecone with configurable batch sizes
-- **Namespace Support**: Organize vectors within index partitions
-- **Error Handling**: Graceful handling of connection and processing errors
+**Supports two modality modes:**
+- **`text`** (default): expects `DOCUMENT + SENTENCE_EMBEDDINGS` input columns.
+- **`image`**: expects `IMAGE + IMAGE_EMBEDDINGS` input columns.
 
 For more extended examples see the
 [VectorDBConnector_Pinecone_Demo.ipynb](https://github.com/JohnSnowLabs/spark-nlp/tree/master/examples/python/annotation/text/english/vector-db/VectorDBConnector_Pinecone_Demo.ipynb) and [VectorDBConnectorTest](https://github.com/JohnSnowLabs/spark-nlp/tree/master/src/test/scala/com/johnsnowlabs/ml/ai/VectorDBConnectorTest.scala).
 {%- endcapture -%}
 
 {%- capture python_example -%}
-import sparknlp
 from sparknlp.base import *
 from sparknlp.annotator import *
 from pyspark.ml import Pipeline
+from pyspark.sql.functions import lit
+
+## Upserted Text Embeddings
 
 documentAssembler = DocumentAssembler() \
     .setInputCol("text") \
     .setOutputCol("document")
 
-tokenizer = Tokenizer() \
-    .setInputCols(["document"]) \
-    .setOutputCol("token")
-
 embeddings = BertSentenceEmbeddings.pretrained("sent_small_bert_L2_768") \
     .setInputCols(["document"]) \
     .setOutputCol("sentence_embeddings")
@@ -46,16 +41,14 @@ vectorDB = VectorDBConnector() \
     .setInputCols(["document", "sentence_embeddings"]) \
     .setOutputCol("vectordb_result") \
     .setProvider("pinecone") \
-    .setIndexName("my-semantic-index") \
-    .setNamespace("production") \
+    .setIndexName('text-mode') \
+    .setNamespace('upserted-test-embeddings') \
     .setIdColumn("doc_id") \
     .setMetadataColumns(["text", "category"]) \
-    .setBatchSize(100)
 
 pipeline = Pipeline() \
     .setStages([
         documentAssembler,
-        tokenizer,
         embeddings,
         vectorDB
     ])
@@ -68,30 +61,58 @@ data = spark.createDataFrame([
 
 result = pipeline.fit(data).transform(data)
 
-# View the output - contains vector IDs returned from Pinecone
-result.select("vectordb_result").show(truncate=False)
-+-----------------------------------------------------------+
-|vectordb_result |
-+-----------------------------------------------------------+
-|[[document, 0, 29, doc_001, {vectordb_status -> upserted}, |
-|[[document, 31, 64, doc_002, {vectordb_status -> upserted},|
-|[[document, 65, 95, doc_003, {vectordb_status -> upserted},|
-+-----------------------------------------------------------+
+## Upserted Image Embeddings
+
+image_folder = "/content/test_images"
+image_df = spark.read.format("image") \
+    .option("dropInvalid", True) \
+    .load(image_folder)
+
+image_prompt = (
+    "<|start_header_id|>user<|end_header_id|>\n\n"
+    "<image>\\nSummary above image in one word: "
+    "<|eot_id|><|start_header_id|>assistant<|end_header_id|>\n\n"
+)
+
+test_df = image_df.withColumn("text", lit(image_prompt))
+
+image_assembler = ImageAssembler() \
+    .setInputCol("image") \
+    .setOutputCol("image_assembler")
+
+e5v_embeddings = E5VEmbeddings.pretrained() \
+    .setInputCols(["image_assembler"]) \
+    .setOutputCol("image_embeddings")
+
+vector_db = VectorDBConnector() \
+    .setInputCols(['image_assembler', 'image_embeddings']) \
+    .setOutputCol('vectordb_result') \
+    .setProvider('pinecone') \
+    .setIndexName('e5v-test') \
+    .setNamespace('image-integration-test-updated-metadata') \
+    .setModalityMode('image')
+
+pipeline = Pipeline().setStages([
+    image_assembler,
+    e5v_embeddings,
+    vector_db
+])
+
+image_result = pipeline.fit(test_df).transform(test_df)
 {%- endcapture -%}
 
 {%- capture scala_example -%}
-import spark.implicits._
-import com.johnsnowlabs.nlp._
+import com.johnsnowlabs.nlp.base._
+import com.johnsnowlabs.nlp.annotator._
 import org.apache.spark.ml.Pipeline
+import org.apache.spark.sql.functions.lit
+
+// Upserted Text Embeddings
 
 val documentAssembler = new DocumentAssembler()
   .setInputCol("text")
   .setOutputCol("document")
 
-val tokenizer = new Tokenizer()
-  .setInputCols(Array("document"))
-  .setOutputCol("token")
-
 val embeddings = BertSentenceEmbeddings.pretrained("sent_small_bert_L2_768")
   .setInputCols(Array("document"))
   .setOutputCol("sentence_embeddings")
@@ -100,37 +121,64 @@ val vectorDB = new VectorDBConnector()
   .setInputCols(Array("document", "sentence_embeddings"))
   .setOutputCol("vectordb_result")
   .setProvider("pinecone")
-  .setIndexName("my-semantic-index")
-  .setNamespace("production")
+  .setIndexName("text-mode")
+  .setNamespace("upserted-test-embeddings")
   .setIdColumn("doc_id")
   .setMetadataColumns(Array("text", "category"))
-  .setBatchSize(100)
 
 val pipeline = new Pipeline()
   .setStages(Array(
     documentAssembler,
-    tokenizer,
     embeddings,
     vectorDB
   ))
 
-val data = Seq(
-  ("doc_001", "Spark NLP is a powerful library", "technology"),
-  ("doc_002", "Vector databases enable semantic search", "technology"),
-  ("doc_003", "Machine learning requires quality data", "data-science")
-).toDF("doc_id", "text", "category")
+val data = spark.createDataFrame(Seq(
+  ("doc_001", "Spark NLP is a powerful library", "technology"),
+  ("doc_002", "Vector databases enable semantic search", "technology"),
+  ("doc_003", "Machine learning requires quality data", "data-science")
+)).toDF("doc_id", "text", "category")
 
 val result = pipeline.fit(data).transform(data)
 
-// View the output - contains vector IDs returned from Pinecone
-result.select("vectordb_result").show(false)
-+-----------------------------------------------------------+
-|vectordb_result |
-+-----------------------------------------------------------+
-|[[document, 0, 29, doc_001, {vectordb_status -> upserted},_|
-|[[document, 31, 64, doc_002, {vectordb_status -> upserted},|
-|[[document, 65, 95, doc_003, {vectordb_status -> upserted},|
-+-----------------------------------------------------------+
+// Upserted Image Embeddings
+
+val imageFolder = "/content/test_images"
+val imageDf = spark.read.format("image")
+  .option("dropInvalid", value = true)
+  .load(imageFolder)
+
+val imagePrompt =
+  "<|start_header_id|>user<|end_header_id|>\n\n" +
+  "<image>\nSummary above image in one word: " +
+  "<|eot_id|><|start_header_id|>assistant<|end_header_id|>\n\n"
+
+val testDf = imageDf.withColumn("text", lit(imagePrompt))
+
+val imageAssembler = new ImageAssembler()
+  .setInputCol("image")
+  .setOutputCol("image_assembler")
+
+val e5vEmbeddings = E5VEmbeddings.pretrained()
+  .setInputCols(Array("image_assembler"))
+  .setOutputCol("image_embeddings")
+
+val vectorDbImage = new VectorDBConnector()
+  .setInputCols(Array("image_assembler", "image_embeddings"))
+  .setOutputCol("vectordb_result")
+  .setProvider("pinecone")
+  .setIndexName("e5v-test")
+  .setNamespace("image-integration-test-updated-metadata")
+  .setModalityMode("image")
+
+val imagePipeline = new Pipeline()
+  .setStages(Array(
+    imageAssembler,
+    e5vEmbeddings,
+    vectorDbImage
+  ))
+
+val imageResult = imagePipeline.fit(testDf).transform(testDf)
 {%- endcapture -%}
 
 {%- capture note -%}
@@ -146,30 +194,36 @@ The annotator returns annotations with the vector ID stored in the `result` fiel
 
 **ID Management:**
 - If `idColumn` is set, values from that column are used as vector IDs
-- If `idColumn` is not set, UUIDs are automatically generated
+- In text mode, if `idColumn` is not set, random UUIDs are automatically generated
+- In image mode, if `idColumn` is not set, a deterministic UUID-v3 derived from the image file path (origin) is used, ensuring stable re-indexing of the same image
 - The ID is returned in the output annotation's `result` field
 
+**Modality Modes:**
+- `text` (default): Pipeline must include a `DocumentAssembler` followed by a sentence embeddings annotator. Input columns must be `[document_col, embeddings_col]`.
+- `image`: Pipeline must include an `ImageAssembler` followed by an image embeddings annotator (e.g. `E5VEmbeddings`). Input columns must be `[image_col, embeddings_col]`. Output annotations are synthesized DOCUMENT annotations carrying image metadata (`modality`, `image_origin`, `image_filename`, `image_width`, `image_height`, `image_nChannels`).
+
 **Namespace Support:**
 The `namespace` parameter is optional and provider-specific. In Pinecone, it's used to partition vectors within an index, enabling multi-tenant scenarios or logical data separation.
 
 **Batch Processing:**
 Vectors are automatically grouped into batches (default: 100) before being sent to Pinecone. This improves performance and reduces network overhead. The batch size can be customized with `setBatchSize()`.
+
 {%- endcapture -%}
 
 {%- capture input_anno -%}
-DOCUMENT, SENTENCE_EMBEDDINGS
+DOCUMENT, SENTENCE_EMBEDDINGS or IMAGE, SENTENCE_EMBEDDINGS
 {%- endcapture -%}
 
 {%- capture output_anno -%}
 DOCUMENT
 {%- endcapture -%}
 
 {%- capture api_link -%}
-[VectorDBConnector](/api/com/johnsnowlabs/nlp/ml/ai/VectorDBConnector)
+[VectorDBConnector](/api/com/johnsnowlabs/ml/ai/VectorDBConnector)
 {%- endcapture -%}
 
 {%- capture python_api_link -%}
-[VectorDBConnector](/api/python/reference/autosummary/sparknlp/annotator/vector_db_connector/index.html)
+[VectorDBConnector](/api/python/reference/autosummary/sparknlp/annotator/vector_db/vector_db_connector/index.html)
 {%- endcapture -%}
 
 {%- capture source_link -%}
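
The ID Management note in the diff above separates two fallbacks when `idColumn` is not set: random UUIDs in text mode, and a deterministic UUID-v3 derived from the image origin in image mode. The difference can be illustrated with Python's standard `uuid` module. This is only a sketch under assumptions: the helper names `image_vector_id` and `text_vector_id` are hypothetical, and `uuid.NAMESPACE_URL` is a stand-in, since the diff does not say which namespace the connector actually uses.

```python
import uuid

# Assumption: NAMESPACE_URL is illustrative only; the connector's real
# namespace for UUID-v3 generation is not stated in the diff.
ID_NAMESPACE = uuid.NAMESPACE_URL


def image_vector_id(origin: str) -> str:
    """Name-based (version 3, MD5) UUID: the same origin always maps to the same ID."""
    return str(uuid.uuid3(ID_NAMESPACE, origin))


def text_vector_id() -> str:
    """Random (version 4) UUID: a fresh ID on every call."""
    return str(uuid.uuid4())


# Hypothetical image origin, following the /content/test_images folder in the examples.
first_id = image_vector_id("file:///content/test_images/cat.jpg")
second_id = image_vector_id("file:///content/test_images/cat.jpg")

print(first_id == second_id)                 # same origin, same ID
print(text_vector_id() == text_vector_id())  # random IDs differ between calls
```

Under this scheme, re-running an image pipeline over the same folder would produce the same IDs and overwrite existing vectors rather than duplicate them, which is presumably what the note means by stable re-indexing.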

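
The Batch Processing note in the diff says vectors are grouped into batches (default: 100) before being sent to Pinecone. A minimal sketch of that grouping step in plain Python follows. The `batched` helper and the sample `vectors` list are hypothetical and only illustrate the chunking; they are not the connector's implementation, and no Pinecone call is made.

```python
from typing import Iterator, List, Sequence, TypeVar

T = TypeVar("T")


def batched(items: Sequence[T], batch_size: int = 100) -> Iterator[List[T]]:
    """Yield consecutive slices of at most batch_size items."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    for start in range(0, len(items), batch_size):
        yield list(items[start:start + batch_size])


# Hypothetical payload: 250 (id, embedding) pairs with dummy 3-dimensional vectors.
vectors = [(f"doc_{i:03d}", [0.0, 0.0, 0.0]) for i in range(250)]

batch_sizes = [len(batch) for batch in batched(vectors)]
print(batch_sizes)  # [100, 100, 50]
```

With the default of 100, the 250 vectors go out in three requests instead of 250, which is the reduction in network overhead the note refers to.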