Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
563 changes: 435 additions & 128 deletions firestore/pipeline.go

Large diffs are not rendered by default.

400 changes: 198 additions & 202 deletions firestore/pipeline_integration_test.go

Large diffs are not rendered by default.

177 changes: 82 additions & 95 deletions firestore/pipeline_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,6 @@

package firestore

import (
"fmt"
"reflect"

pb "cloud.google.com/go/firestore/apiv1/firestorepb"
)

// PipelineSource is a factory for creating Pipeline instances.
// It is obtained by calling [Client.Pipeline()].
//
Expand All @@ -30,71 +23,32 @@ type PipelineSource struct {
client *Client
}

// CollectionHints provides hints to the query planner.
//
// Experimental: Firestore Pipelines is currently in preview and is subject to potential breaking changes in future versions,
// regardless of any other documented package stability guarantees.
type CollectionHints map[string]any

// WithForceIndex specifies an index to force the query to use.
//
// Experimental: Firestore Pipelines is currently in preview and is subject to potential breaking changes in future versions,
// regardless of any other documented package stability guarantees.
func (ch CollectionHints) WithForceIndex(index string) CollectionHints {
newCH := make(CollectionHints, len(ch)+1)
for k, v := range ch {
newCH[k] = v
}
newCH["force_index"] = index
return newCH
func WithForceIndex(index string) CollectionSourceOption {
return newFuncOption(func(options map[string]any) {
options["force_index"] = index
})
}

// WithIgnoreIndexFields specifies fields to ignore when selecting an index.
//
// Experimental: Firestore Pipelines is currently in preview and is subject to potential breaking changes in future versions,
// regardless of any other documented package stability guarantees.
func (ch CollectionHints) WithIgnoreIndexFields(fields ...string) CollectionHints {
newCH := make(CollectionHints, len(ch)+1)
for k, v := range ch {
newCH[k] = v
}
newCH["ignore_index_fields"] = fields
return newCH
}

func (ch CollectionHints) toProto() (map[string]*pb.Value, error) {
if ch == nil {
return nil, nil
}
optsMap := make(map[string]*pb.Value)
for key, val := range ch {
valPb, _, err := toProtoValue(reflect.ValueOf(val))
if err != nil {
return nil, fmt.Errorf("firestore: error converting option %q: %w", key, err)
}
optsMap[key] = valPb
}
return optsMap, nil
}

// collectionStageSettings provides settings for Collection and CollectionGroup pipeline stages.
type collectionStageSettings struct {
Hints CollectionHints
}

func (cs *collectionStageSettings) toProto() (map[string]*pb.Value, error) {
if cs == nil {
return nil, nil
}
return cs.Hints.toProto()
func WithIgnoreIndexFields(fields ...string) CollectionSourceOption {
return newFuncOption(func(options map[string]any) {
options["ignore_index_fields"] = fields
})
}

// CollectionOption is an option for a Collection pipeline stage.
//
// Experimental: Firestore Pipelines is currently in preview and is subject to potential breaking changes in future versions,
// regardless of any other documented package stability guarantees.
type CollectionOption interface {
apply(co *collectionStageSettings)
StageOption
isCollectionOption()
}

Expand All @@ -103,62 +57,50 @@ type CollectionOption interface {
// Experimental: Firestore Pipelines is currently in preview and is subject to potential breaking changes in future versions,
// regardless of any other documented package stability guarantees.
type CollectionGroupOption interface {
apply(co *collectionStageSettings)
StageOption
isCollectionGroupOption()
}

// funcOption wraps a function that modifies collectionStageSettings
// CollectionSourceOption is an option that can be applied to both Collection and CollectionGroup pipeline stages.
//
// Experimental: Firestore Pipelines is currently in preview and is subject to potential breaking changes in future versions,
// regardless of any other documented package stability guarantees.
type CollectionSourceOption interface {
CollectionOption
CollectionGroupOption
}

// funcOption wraps a function that modifies an options map
// into an implementation of the CollectionOption and CollectionGroupOption interfaces.
type funcOption struct {
f func(*collectionStageSettings)
f func(map[string]any)
}

func (fo *funcOption) apply(cs *collectionStageSettings) {
fo.f(cs)
func (fo *funcOption) applyStage(options map[string]any) {
fo.f(options)
}

func (*funcOption) isCollectionOption() {}

func (*funcOption) isCollectionOption() {}
func (*funcOption) isCollectionGroupOption() {}

func newFuncOption(f func(*collectionStageSettings)) *funcOption {
func newFuncOption(f func(map[string]any)) *funcOption {
return &funcOption{
f: f,
}
}

// WithCollectionHints specifies hints for the query planner.
//
// Experimental: Firestore Pipelines is currently in preview and is subject to potential breaking changes in future versions,
// regardless of any other documented package stability guarantees.
func WithCollectionHints(hints CollectionHints) CollectionOption {
return newFuncOption(func(cs *collectionStageSettings) {
cs.Hints = hints
})
}

// WithCollectionGroupHints specifies hints for the query planner.
//
// Experimental: Firestore Pipelines is currently in preview and is subject to potential breaking changes in future versions,
// regardless of any other documented package stability guarantees.
func WithCollectionGroupHints(hints CollectionHints) CollectionGroupOption {
return newFuncOption(func(cs *collectionStageSettings) {
cs.Hints = hints
})
}

// Collection creates a new [Pipeline] that operates on the specified Firestore collection.
//
// Experimental: Firestore Pipelines is currently in preview and is subject to potential breaking changes in future versions,
// regardless of any other documented package stability guarantees.
func (ps *PipelineSource) Collection(path string, opts ...CollectionOption) *Pipeline {
cs := &collectionStageSettings{}
options := make(map[string]any)
for _, opt := range opts {
if opt != nil {
opt.apply(cs)
opt.applyStage(options)
}
}
return newPipeline(ps.client, newInputStageCollection(path, cs))
return newPipeline(ps.client, newInputStageCollection(path, options))
}

// CollectionGroup creates a new [Pipeline] that operates on all documents in a group
Expand All @@ -174,29 +116,59 @@ func (ps *PipelineSource) Collection(path string, opts ...CollectionOption) *Pip
// Experimental: Firestore Pipelines is currently in preview and is subject to potential breaking changes in future versions,
// regardless of any other documented package stability guarantees.
func (ps *PipelineSource) CollectionGroup(collectionID string, opts ...CollectionGroupOption) *Pipeline {
cgs := &collectionStageSettings{}
options := make(map[string]any)
for _, opt := range opts {
if opt != nil {
opt.apply(cgs)
opt.applyStage(options)
}
}
return newPipeline(ps.client, newInputStageCollectionGroup("", collectionID, cgs))
return newPipeline(ps.client, newInputStageCollectionGroup("", collectionID, options))
}

// DatabaseOption is an option for a Database pipeline stage.
//
// Experimental: Firestore Pipelines is currently in preview and is subject to potential breaking changes in future versions,
// regardless of any other documented package stability guarantees.
type DatabaseOption interface {
StageOption
isDatabaseOption()
}

// Database creates a new [Pipeline] that operates on all documents in the Firestore database.
//
// Experimental: Firestore Pipelines is currently in preview and is subject to potential breaking changes in future versions,
// regardless of any other documented package stability guarantees.
func (ps *PipelineSource) Database() *Pipeline {
return newPipeline(ps.client, newInputStageDatabase())
func (ps *PipelineSource) Database(opts ...DatabaseOption) *Pipeline {
options := make(map[string]any)
for _, opt := range opts {
if opt != nil {
opt.applyStage(options)
}
}
return newPipeline(ps.client, newInputStageDatabase(options))
}

// DocumentsOption is an option for a Documents pipeline stage.
//
// Experimental: Firestore Pipelines is currently in preview and is subject to potential breaking changes in future versions,
// regardless of any other documented package stability guarantees.
type DocumentsOption interface {
StageOption
isDocumentsOption()
}

// Documents creates a new [Pipeline] that operates on a specific set of Firestore documents.
//
// Experimental: Firestore Pipelines is currently in preview and is subject to potential breaking changes in future versions,
// regardless of any other documented package stability guarantees.
func (ps *PipelineSource) Documents(refs ...*DocumentRef) *Pipeline {
return newPipeline(ps.client, newInputStageDocuments(refs...))
func (ps *PipelineSource) Documents(refs []*DocumentRef, opts ...DocumentsOption) *Pipeline {
options := make(map[string]any)
for _, opt := range opts {
if opt != nil {
opt.applyStage(options)
}
}
return newPipeline(ps.client, newInputStageDocuments(refs, options))
}

// CreateFromQuery creates a new [Pipeline] from the given [Queryer]. Under the hood, this will
Expand All @@ -217,10 +189,25 @@ func (ps *PipelineSource) CreateFromAggregationQuery(query *AggregationQuery) *P
return query.Pipeline()
}

// LiteralsOption is an option for a Literals pipeline stage.
//
// Experimental: Firestore Pipelines is currently in preview and is subject to potential breaking changes in future versions,
// regardless of any other documented package stability guarantees.
type LiteralsOption interface {
StageOption
isLiteralsOption()
}

// Literals creates a new [Pipeline] that operates on a fixed set of predefined document objects.
//
// Experimental: Firestore Pipelines is currently in preview and is subject to potential breaking changes in future versions,
// regardless of any other documented package stability guarantees.
func (ps *PipelineSource) Literals(documents ...map[string]any) *Pipeline {
return newPipeline(ps.client, newInputStageLiterals(documents...))
func (ps *PipelineSource) Literals(documents []map[string]any, opts ...LiteralsOption) *Pipeline {
options := make(map[string]any)
for _, opt := range opts {
if opt != nil {
opt.applyStage(options)
}
}
return newPipeline(ps.client, newInputStageLiterals(documents, options))
}
Loading
Loading