diff --git a/pkg/plugin/processor/builtin/impl/cohere/command.go b/pkg/plugin/processor/builtin/impl/cohere/command.go index f6f94160a..7a30308d9 100644 --- a/pkg/plugin/processor/builtin/impl/cohere/command.go +++ b/pkg/plugin/processor/builtin/impl/cohere/command.go @@ -146,7 +146,11 @@ func (p *CommandProcessor) Process(ctx context.Context, records []opencdc.Record return append(out, sdk.ErrorRecord{Error: fmt.Errorf("failed to resolve reference %v: %w", p.config.RequestBodyRef, err)}) } - content := fmt.Sprintf(p.config.Prompt, p.getInput(requestRef.Get())) + input, err := p.getInput(requestRef.Get()) + if err != nil { + return append(out, sdk.ErrorRecord{Error: fmt.Errorf("failed to get input: %w", err)}) + } + content := fmt.Sprintf(p.config.Prompt, input) for { resp, err := p.client.command(ctx, content) attempt := p.backoffCfg.Attempt() @@ -248,14 +252,16 @@ func unmarshalChatResponse(res []byte) (*ChatResponse, error) { return response, nil } -func (p *CommandProcessor) getInput(val any) string { +func (p *CommandProcessor) getInput(val any) (string, error) { switch v := val.(type) { - case opencdc.RawData: - return string(v) - case opencdc.StructuredData: - return string(v.Bytes()) + case opencdc.Position: + return string(v), nil + case opencdc.Data: + return string(v.Bytes()), nil + case string: + return v, nil default: - return fmt.Sprintf("%v", v) + return "", fmt.Errorf("unsupported type %T", v) } } @@ -266,12 +272,12 @@ func (p *CommandProcessor) setField(r *opencdc.Record, refRes *sdk.ReferenceReso ref, err := refRes.Resolve(r) if err != nil { - return fmt.Errorf("error reference resolver: %w", err) + return fmt.Errorf("error resolving reference: %w", err) } err = ref.Set(data) if err != nil { - return fmt.Errorf("error reference set: %w", err) + return fmt.Errorf("error setting reference: %w", err) } return nil diff --git a/pkg/plugin/processor/builtin/impl/cohere/command_examples_test.go b/pkg/plugin/processor/builtin/impl/cohere/command_examples_test.go index f1fcd45f1..b3074c746 100644 --- a/pkg/plugin/processor/builtin/impl/cohere/command_examples_test.go +++ b/pkg/plugin/processor/builtin/impl/cohere/command_examples_test.go @@ -33,7 +33,7 @@ func ExampleCommandProcessor() { Summary: `Generate responses using Cohere's command model`, Description: ` This example demonstrates how to use the Cohere command processor to generate responses for a record's ` + "`.Payload.After`" + ` field. -The processor sends the input text to the Cohere API and replaces it with the model's response.`, +The processor sends the input text from the configured "request.body" to the Cohere API and stores the model's response into the configured "response.body"`, Config: config.Config{ commandProcessorConfigApiKey: "apikey", commandProcessorConfigPrompt: "hello", diff --git a/pkg/plugin/processor/builtin/impl/cohere/embed.go b/pkg/plugin/processor/builtin/impl/cohere/embed.go index 50d3e472f..5cc6aa83f 100644 --- a/pkg/plugin/processor/builtin/impl/cohere/embed.go +++ b/pkg/plugin/processor/builtin/impl/cohere/embed.go @@ -53,7 +53,9 @@ type embedProcConfig struct { // The maximum waiting time before retrying. BackoffRetryMax time.Duration `json:"backoffRetry.max" default:"5s"` // Specifies the field from which the request body should be created. - InputField string `json:"inputField" validate:"regex=^\\.(Payload|Key).*" default:".Payload.After"` + InputField string `json:"inputField" default:".Payload.After"` + // OutputField specifies which field will the response body be saved at. + OutputField string `json:"outputField" default:".Payload.After"` // MaxTextsPerRequest controls the number of texts sent in each Cohere embedding API call (max 96) MaxTextsPerRequest int `json:"maxTextsPerRequest" default:"96"` } @@ -75,8 +77,9 @@ type embedModel interface { type EmbedProcessor struct { sdk.UnimplementedProcessor - inputFieldRefResolver *sdk.ReferenceResolver - logger log.CtxLogger + inputFieldRefResolver *sdk.ReferenceResolver + outputFieldRefResolver *sdk.ReferenceResolver + logger log.CtxLogger config embedProcConfig backoffCfg *backoff.Backoff @@ -109,10 +112,16 @@ func (p *EmbedProcessor) Configure(ctx context.Context, cfg config.Config) error func (p *EmbedProcessor) Open(ctx context.Context) error { inputResolver, err := sdk.NewReferenceResolver(p.config.InputField) if err != nil { - return cerrors.Errorf(`failed to create a field resolver for %v parameter: %w`, p.config.InputField, err) + return cerrors.Errorf("failed to create a field resolver for %v parameter: %w", p.config.InputField, err) } p.inputFieldRefResolver = &inputResolver + outputResolver, err := sdk.NewReferenceResolver(p.config.OutputField) + if err != nil { + return cerrors.Errorf("failed to create a field resolver for %v parameter: %w", p.config.OutputField, err) + } + p.outputFieldRefResolver = &outputResolver + // Initialize the client only if it hasn't been injected if p.client == nil { p.client = &embedClient{ @@ -136,12 +145,14 @@ func (p *EmbedProcessor) Specification() (sdk.Specification, error) { // parameters it expects. return sdk.Specification{ - Name: "cohere.embed", - Summary: "Conduit processor for Cohere's embed model.", - Description: "Conduit processor for Cohere's embed model.", - Version: "v0.1.0", - Author: "Meroxa, Inc.", - Parameters: embedProcConfig{}.Parameters(), + Name: "cohere.embed", + Summary: "Conduit processor for Cohere's embed model.", + Description: "The Cohere embed processor extracts text from the configured inputField, generates embeddings " + + "using Cohere's embedding model, and stores the result in the configured outputField. " + + "The embeddings are compressed using the zstd algorithm for efficient storage and transmission.", + Version: "v0.1.0", + Author: "Meroxa, Inc.", + Parameters: embedProcConfig{}.Parameters(), }, nil } @@ -174,7 +185,12 @@ func (p *EmbedProcessor) processBatch(ctx context.Context, records []opencdc.Rec if err != nil { return out, cerrors.Errorf("failed to resolve reference %v: %w", p.config.InputField, err) } - embeddingInputs = append(embeddingInputs, p.getEmbeddingInput(inRef.Get())) + + input, err := p.getInput(inRef.Get()) + if err != nil { + return out, cerrors.Errorf("failed to get input: %w", err) + } + embeddingInputs = append(embeddingInputs, input) } var embeddings [][]float64 @@ -231,12 +247,8 @@ func (p *EmbedProcessor) processBatch(ctx context.Context, records []opencdc.Rec return out, cerrors.Errorf("failed to compress embeddings: %w", err) } - // Store the embedding in .Payload.After - switch record.Payload.After.(type) { - case opencdc.RawData: - record.Payload.After = opencdc.RawData(compressedEmbedding) - case opencdc.StructuredData: - record.Payload.After = opencdc.StructuredData{"embedding": compressedEmbedding} + if err := p.setField(&record, p.outputFieldRefResolver, compressedEmbedding); err != nil { + return out, cerrors.Errorf("failed to set output: %w", err) } out = append(out, sdk.SingleRecord(record)) @@ -245,14 +257,34 @@ func (p *EmbedProcessor) processBatch(ctx context.Context, records []opencdc.Rec return out, nil } -func (p *EmbedProcessor) getEmbeddingInput(val any) string { +func (p *EmbedProcessor) setField(r *opencdc.Record, refRes *sdk.ReferenceResolver, data any) error { + if refRes == nil { + return nil + } + + ref, err := refRes.Resolve(r) + if err != nil { + return cerrors.Errorf("error resolving reference: %w", err) + } + + err = ref.Set(data) + if err != nil { + return cerrors.Errorf("error setting reference: %w", err) + } + + return nil +} + +func (p *EmbedProcessor) getInput(val any) (string, error) { switch v := val.(type) { - case opencdc.RawData: - return string(v) - case opencdc.StructuredData: - return string(v.Bytes()) + case opencdc.Position: + return string(v), nil + case opencdc.Data: + return string(v.Bytes()), nil + case string: + return v, nil default: - return fmt.Sprintf("%v", v) + return "", fmt.Errorf("unsupported type %T", v) } } diff --git a/pkg/plugin/processor/builtin/impl/cohere/embed_examples_test.go b/pkg/plugin/processor/builtin/impl/cohere/embed_examples_test.go index 97682adfb..8102ebbbd 100644 --- a/pkg/plugin/processor/builtin/impl/cohere/embed_examples_test.go +++ b/pkg/plugin/processor/builtin/impl/cohere/embed_examples_test.go @@ -15,39 +15,69 @@ package cohere import ( + "context" + "fmt" + "github.com/conduitio/conduit-commons/config" "github.com/conduitio/conduit-commons/opencdc" sdk "github.com/conduitio/conduit-processor-sdk" "github.com/conduitio/conduit/pkg/foundation/log" "github.com/conduitio/conduit/pkg/plugin/processor/builtin/internal/exampleutil" + "github.com/goccy/go-json" ) func ExampleEmbedProcessor() { p := NewEmbedProcessor(log.Nop()) p.client = mockEmbedClient{} + embedding, err := p.client.embed(context.Background(), []string{"test input"}) + if err != nil { + panic(fmt.Sprintf("failed to get embedding: %v", err)) + } + if len(embedding) == 0 { + panic("no embeddings found") + } + + embeddingJSON, err := json.Marshal(embedding[0]) + if err != nil { + panic(fmt.Sprintf("failed to marshal embeddings: %v", err)) + } + + // Compress the embedding using zstd + compressedEmbedding, err := compressData(embeddingJSON) + if err != nil { + panic(fmt.Sprintf("failed to compress embeddings: %v", err)) + } + exampleutil.RunExample(p, exampleutil.Example{ Summary: `Generate embeddings using Cohere's embedding model`, Description: ` This example demonstrates how to use the Cohere embedding processor to generate embeddings for a record. -The processor extracts text from the specified input field (default: ".Payload.After"), sends it to the Cohere API, -and stores the resulting embeddings in the record's ".Payload.After" field as compressed data using the zstd algorithm. +The processor extracts text from the configured "inputField" (default: ".Payload.After"), sends it to the Cohere API, +and stores the resulting embeddings in the configured "outputField" as compressed data using the zstd algorithm. In this example, the processor is configured with a mock client and an API key. The input record's metadata is updated -to include the embedding model used ("embed-english-v2.0"). Note that the compressed embeddings cannot be directly compared -in this test, so the focus is on verifying the metadata update.`, +to include the embedding model used ("embed-english-v2.0").`, Config: config.Config{ - "apiKey": "fake-api-key", + "apiKey": "fake-api-key", + "inputField": ".Payload.After", + "outputField": ".Payload.After", }, Have: opencdc.Record{ Operation: opencdc.OperationCreate, Position: opencdc.Position("pos-1"), Metadata: map[string]string{}, + Payload: opencdc.Change{ + After: opencdc.RawData("test input"), + }, }, Want: sdk.SingleRecord{ Operation: opencdc.OperationCreate, Position: opencdc.Position("pos-1"), Metadata: opencdc.Metadata{"cohere.embed.model": "embed-english-v2.0"}, + Payload: opencdc.Change{ + After: opencdc.RawData(compressedEmbedding), + }, }, }) @@ -66,7 +96,8 @@ in this test, so the focus is on verifying the metadata update.`, // "key": null, // "payload": { // "before": null, - // "after": null + // - "after": "test input" + // + "after": "(\ufffd/\ufffd\u0004\u0000i\u0000\u0000[0.1,0.2,0.3]\ufffd^xH" // } // } } diff --git a/pkg/plugin/processor/builtin/impl/cohere/embed_test.go b/pkg/plugin/processor/builtin/impl/cohere/embed_test.go index 21494964b..1c4bee0d2 100644 --- a/pkg/plugin/processor/builtin/impl/cohere/embed_test.go +++ b/pkg/plugin/processor/builtin/impl/cohere/embed_test.go @@ -162,9 +162,10 @@ func TestEmbedProcessor_Process(t *testing.T) { wantErr string }{ { - name: "successful process with single record", + name: "successful process single record to replace .Payload.After with result of the request", config: config.Config{ - embedProcConfigApiKey: "api-key", + embedProcConfigApiKey: "api-key", + embedProcConfigOutputField: ".Payload.After", }, records: []opencdc.Record{ { @@ -201,9 +202,26 @@ func TestEmbedProcessor_Process(t *testing.T) { wantErr: "", }, { - name: "successful process with single record having structured data in payload", + name: "failed to process single record to set new field 'response' in .Payload.After having raw data", config: config.Config{ - embedProcConfigApiKey: "api-key", + embedProcConfigApiKey: "api-key", + embedProcConfigOutputField: ".Payload.After.response", + }, + records: []opencdc.Record{ + { + Payload: opencdc.Change{ + After: opencdc.RawData("test input"), + }, + Metadata: map[string]string{}, + }, + }, + wantErr: `failed to set output: error resolving reference: could not resolve field "response": .Payload.After does not contain structured data: cannot resolve reference`, + }, + { + name: "successful process single record to set new field 'response' in .Payload.After having structured data", + config: config.Config{ + embedProcConfigApiKey: "api-key", + embedProcConfigOutputField: ".Payload.After.response", }, records: []opencdc.Record{ { @@ -228,7 +246,7 @@ func TestEmbedProcessor_Process(t *testing.T) { result := []sdk.ProcessedRecord{ sdk.SingleRecord(opencdc.Record{ Payload: opencdc.Change{ - After: opencdc.StructuredData{"embedding": compressedEmbedding}, + After: opencdc.StructuredData{"test": "testInput", "response": compressedEmbedding}, }, Metadata: map[string]string{ EmbedModelMetadata: "embed-english-v2.0", diff --git a/pkg/plugin/processor/builtin/impl/cohere/paramgen_embed_proc.go b/pkg/plugin/processor/builtin/impl/cohere/paramgen_embed_proc.go index 5ec85d7f0..1a74be070 100644 --- a/pkg/plugin/processor/builtin/impl/cohere/paramgen_embed_proc.go +++ b/pkg/plugin/processor/builtin/impl/cohere/paramgen_embed_proc.go @@ -4,8 +4,6 @@ package cohere import ( - "regexp" - "github.com/conduitio/conduit-commons/config" ) @@ -19,6 +17,7 @@ const ( embedProcConfigInputType = "inputType" embedProcConfigMaxTextsPerRequest = "maxTextsPerRequest" embedProcConfigModel = "model" + embedProcConfigOutputField = "outputField" ) func (embedProcConfig) Parameters() map[string]config.Parameter { @@ -63,9 +62,7 @@ func (embedProcConfig) Parameters() map[string]config.Parameter { Default: ".Payload.After", Description: "Specifies the field from which the request body should be created.", Type: config.ParameterTypeString, - Validations: []config.Validation{ - config.ValidationRegex{Regex: regexp.MustCompile("^\\.(Payload|Key).*")}, - }, + Validations: []config.Validation{}, }, embedProcConfigInputType: { Default: "", @@ -85,5 +82,11 @@ func (embedProcConfig) Parameters() map[string]config.Parameter { Type: config.ParameterTypeString, Validations: []config.Validation{}, }, + embedProcConfigOutputField: { + Default: ".Payload.After", + Description: "OutputField specifies which field will the response body be saved at.", + Type: config.ParameterTypeString, + Validations: []config.Validation{}, + }, } } diff --git a/pkg/plugin/processor/builtin/impl/cohere/rerank.go b/pkg/plugin/processor/builtin/impl/cohere/rerank.go index 499b0be44..56f50876f 100644 --- a/pkg/plugin/processor/builtin/impl/cohere/rerank.go +++ b/pkg/plugin/processor/builtin/impl/cohere/rerank.go @@ -147,7 +147,11 @@ func (p *RerankProcessor) Process(ctx context.Context, records []opencdc.Record) return append(out, sdk.ErrorRecord{Error: fmt.Errorf("failed to resolve reference %v: %w", p.config.RequestBodyRef, err)}) } - documents = append(documents, p.getInput(requestRef.Get())) + input, err := p.getInput(requestRef.Get()) + if err != nil { + return append(out, sdk.ErrorRecord{Error: fmt.Errorf("failed to get input: %w", err)}) + } + documents = append(documents, input) } var resp []RerankResult @@ -265,14 +269,16 @@ func unmarshalRerankResponse(res []byte) (*RerankResponse, error) { return response, nil } -func (p *RerankProcessor) getInput(val any) string { +func (p *RerankProcessor) getInput(val any) (string, error) { switch v := val.(type) { - case opencdc.RawData: - return string(v) - case opencdc.StructuredData: - return string(v.Bytes()) + case opencdc.Position: + return string(v), nil + case opencdc.Data: + return string(v.Bytes()), nil + case string: + return v, nil default: - return fmt.Sprintf("%v", v) + return "", fmt.Errorf("unsupported type %T", v) } } @@ -283,12 +289,12 @@ func (p *RerankProcessor) setField(r *opencdc.Record, refRes *sdk.ReferenceResol ref, err := refRes.Resolve(r) if err != nil { - return fmt.Errorf("error reference resolver: %w", err) + return fmt.Errorf("error resolving reference: %w", err) } err = ref.Set(data) if err != nil { - return fmt.Errorf("error reference set: %w", err) + return fmt.Errorf("error setting reference: %w", err) } return nil diff --git a/pkg/plugin/processor/builtin/impl/cohere/rerank_examples_test.go b/pkg/plugin/processor/builtin/impl/cohere/rerank_examples_test.go index 2386b3aa7..681f8ffcf 100644 --- a/pkg/plugin/processor/builtin/impl/cohere/rerank_examples_test.go +++ b/pkg/plugin/processor/builtin/impl/cohere/rerank_examples_test.go @@ -34,8 +34,8 @@ func ExampleRerankProcessor() { Summary: `Generate responses using Cohere's rerank model`, Description: ` This example demonstrates how to use the Cohere rerank processor.This takes in a query and a list of texts and produces an ordered -array with each text assigned a relevance score. The processor extracts text from the specified input field (default: ".Payload.After"), -sends it to the Cohere API, and stores the response in the record's ".Payload.After" field. +array with each text assigned a relevance score. The processor extracts text from the configured "request.body" (default: ".Payload.After"), +sends it to the Cohere API, and stores the response in the configured "response.body". In this example, the processor is configured with a mock client and an API key. The input record's metadata is updated to include the rerank model used ("rerank-v3.5").`, diff --git a/pkg/plugin/processor/builtin/internal/exampleutil/specs/cohere.command.json b/pkg/plugin/processor/builtin/internal/exampleutil/specs/cohere.command.json index 260b450df..c15f6ac5b 100644 --- a/pkg/plugin/processor/builtin/internal/exampleutil/specs/cohere.command.json +++ b/pkg/plugin/processor/builtin/internal/exampleutil/specs/cohere.command.json @@ -109,7 +109,7 @@ "examples": [ { "summary": "Generate responses using Cohere's command model", - "description": "\nThis example demonstrates how to use the Cohere command processor to generate responses for a record's `.Payload.After` field.\nThe processor sends the input text to the Cohere API and replaces it with the model's response.", + "description": "\nThis example demonstrates how to use the Cohere command processor to generate responses for a record's `.Payload.After` field.\nThe processor sends the input text from the configured \"request.body\" to the Cohere API and stores the model's response into the configured \"response.body\"", "config": { "apiKey": "apikey", "backoffRetry.count": "0", diff --git a/pkg/plugin/processor/builtin/internal/exampleutil/specs/cohere.embed.json b/pkg/plugin/processor/builtin/internal/exampleutil/specs/cohere.embed.json index fa812c87c..3c58e3d78 100644 --- a/pkg/plugin/processor/builtin/internal/exampleutil/specs/cohere.embed.json +++ b/pkg/plugin/processor/builtin/internal/exampleutil/specs/cohere.embed.json @@ -2,7 +2,7 @@ "specification": { "name": "cohere.embed", "summary": "Conduit processor for Cohere's embed model.", - "description": "Conduit processor for Cohere's embed model.", + "description": "The Cohere embed processor extracts text from the configured inputField, generates embeddings using Cohere's embedding model, and stores the result in the configured outputField. The embeddings are compressed using the zstd algorithm for efficient storage and transmission.", "version": "v0.1.0", "author": "Meroxa, Inc.", "parameters": { @@ -55,12 +55,7 @@ "default": ".Payload.After", "description": "Specifies the field from which the request body should be created.", "type": "string", - "validations": [ - { - "type": "regex", - "value": "^\\.(Payload|Key).*" - } - ] + "validations": [] }, "inputType": { "default": "", @@ -80,6 +75,12 @@ "type": "string", "validations": [] }, + "outputField": { + "default": ".Payload.After", + "description": "OutputField specifies which field will the response body be saved at.", + "type": "string", + "validations": [] + }, "sdk.schema.decode.key.enabled": { "default": "true", "description": "Whether to decode the record key using its corresponding schema from the schema registry.", @@ -109,7 +110,7 @@ "examples": [ { "summary": "Generate embeddings using Cohere's embedding model", - "description": "\nThis example demonstrates how to use the Cohere embedding processor to generate embeddings for a record.\nThe processor extracts text from the specified input field (default: \".Payload.After\"), sends it to the Cohere API,\nand stores the resulting embeddings in the record's \".Payload.After\" field as compressed data using the zstd algorithm.\n\nIn this example, the processor is configured with a mock client and an API key. The input record's metadata is updated\nto include the embedding model used (\"embed-english-v2.0\"). Note that the compressed embeddings cannot be directly compared\nin this test, so the focus is on verifying the metadata update.", + "description": "\nThis example demonstrates how to use the Cohere embedding processor to generate embeddings for a record.\nThe processor extracts text from the configured \"inputField\" (default: \".Payload.After\"), sends it to the Cohere API,\nand stores the resulting embeddings in the configured \"outputField\" as compressed data using the zstd algorithm.\n\nIn this example, the processor is configured with a mock client and an API key. The input record's metadata is updated\nto include the embedding model used (\"embed-english-v2.0\").", "config": { "apiKey": "fake-api-key", "backoffRetry.count": "0", @@ -118,7 +119,8 @@ "backoffRetry.min": "100ms", "inputField": ".Payload.After", "maxTextsPerRequest": "96", - "model": "embed-english-v2.0" + "model": "embed-english-v2.0", + "outputField": ".Payload.After" }, "have": { "position": "cG9zLTE=", @@ -127,7 +129,7 @@ "key": null, "payload": { "before": null, - "after": null + "after": "test input" } }, "want": { @@ -139,7 +141,7 @@ "key": null, "payload": { "before": null, - "after": null + "after": "(\ufffd/\ufffd\u0004\u0000i\u0000\u0000[0.1,0.2,0.3]\ufffd^xH" } } } diff --git a/pkg/plugin/processor/builtin/internal/exampleutil/specs/cohere.rerank.json b/pkg/plugin/processor/builtin/internal/exampleutil/specs/cohere.rerank.json index 0627b5eb2..0daf7da13 100644 --- a/pkg/plugin/processor/builtin/internal/exampleutil/specs/cohere.rerank.json +++ b/pkg/plugin/processor/builtin/internal/exampleutil/specs/cohere.rerank.json @@ -109,7 +109,7 @@ "examples": [ { "summary": "Generate responses using Cohere's rerank model", - "description": "\nThis example demonstrates how to use the Cohere rerank processor.This takes in a query and a list of texts and produces an ordered \narray with each text assigned a relevance score. The processor extracts text from the specified input field (default: \".Payload.After\"), \nsends it to the Cohere API, and stores the response in the record's \".Payload.After\" field.\n\nIn this example, the processor is configured with a mock client and an API key. The input record's metadata is updated\nto include the rerank model used (\"rerank-v3.5\").", + "description": "\nThis example demonstrates how to use the Cohere rerank processor.This takes in a query and a list of texts and produces an ordered \narray with each text assigned a relevance score. The processor extracts text from the configured \"request.body\" (default: \".Payload.After\"), \nsends it to the Cohere API, and stores the response in the configured \"response.body\".\n\nIn this example, the processor is configured with a mock client and an API key. The input record's metadata is updated\nto include the rerank model used (\"rerank-v3.5\").", "config": { "apiKey": "fakeapiKey", "backoffRetry.count": "0",