diff --git a/aws/s3/main.go b/aws/s3/main.go index 2dfa453..1880be7 100644 --- a/aws/s3/main.go +++ b/aws/s3/main.go @@ -3,6 +3,7 @@ package s3 import ( "log" + gopilosa "github.com/pilosa/go-pilosa" "github.com/pilosa/pdk" "github.com/pkg/errors" ) @@ -59,7 +60,9 @@ func (m *Main) Run() error { mapper := pdk.NewCollapsingMapper() mapper.Framer = &m.Framer - indexer, err := pdk.SetupPilosa(m.PilosaHosts, m.Index, []pdk.FrameSpec{}, m.BatchSize) + indexer, err := pdk.SetupPilosa(m.PilosaHosts, m.Index, []pdk.FrameSpec{}, + gopilosa.OptImportStrategy(gopilosa.BatchImport), + gopilosa.OptImportBatchSize(int(m.BatchSize))) if err != nil { return errors.Wrap(err, "setting up Pilosa") } diff --git a/http/command.go b/http/command.go index 4667ebc..02d15b1 100644 --- a/http/command.go +++ b/http/command.go @@ -2,7 +2,9 @@ package http import ( "log" + "time" + gopilosa "github.com/pilosa/go-pilosa" "github.com/pilosa/pdk" "github.com/pkg/errors" ) @@ -14,25 +16,31 @@ type SubjecterOpts struct { // Main holds the config for the http command. type Main struct { - Bind string `help:"Listen for post requests on this address."` - PilosaHosts []string `help:"List of host:port pairs for Pilosa cluster."` - Index string `help:"Pilosa index to write to."` - BatchSize uint `help:"Batch size for Pilosa imports."` - Framer pdk.DashFrame - Subjecter SubjecterOpts - Proxy string `help:"Bind to this address to proxy and translate requests to Pilosa"` + Bind string `help:"Listen for post requests on this address."` + PilosaHosts []string `help:"List of host:port pairs for Pilosa cluster."` + Index string `help:"Pilosa index to write to."` + BatchSize uint `help:"Batch size for Pilosa imports. Default: 100000"` + ImportStrategy string `help:"Import strategy. One of 'batch' or 'timeout'. Default: batch"` + ThreadCount uint `help:"Number of import workers. Default: 1"` + ImportTimeout uint `help:"Timeout in milliseconds for the import strategy. Default: 100"` + Framer pdk.DashFrame + Subjecter SubjecterOpts + Proxy string `help:"Bind to this address to proxy and translate requests to Pilosa"` } // NewMain gets a new Main with default values. func NewMain() *Main { return &Main{ - Bind: ":12121", - PilosaHosts: []string{"localhost:10101"}, - Index: "jsonhttp", - BatchSize: 10, - Framer: pdk.DashFrame{}, - Subjecter: SubjecterOpts{}, - Proxy: ":13131", + Bind: ":12121", + PilosaHosts: []string{"localhost:10101"}, + Index: "jsonhttp", + BatchSize: 100000, + ImportStrategy: "batch", + ThreadCount: 1, + ImportTimeout: 200, + Framer: pdk.DashFrame{}, + Subjecter: SubjecterOpts{}, + Proxy: ":13131", } } @@ -79,7 +87,33 @@ func (m *Main) Run() error { mapper := pdk.NewCollapsingMapper() mapper.Framer = &m.Framer - indexer, err := pdk.SetupPilosa(m.PilosaHosts, m.Index, []pdk.FrameSpec{}, m.BatchSize) + importOptions := []gopilosa.ImportOption{} + + if m.BatchSize < 1 { + return errors.New("Batch size should be greater than 0") + } + importOptions = append(importOptions, gopilosa.OptImportBatchSize(int(m.BatchSize))) + + if m.ImportTimeout < 1 { + return errors.New("Import timeout should be greater than 0") + } + importOptions = append(importOptions, gopilosa.OptImportTimeout(time.Duration(m.ImportTimeout)*time.Millisecond)) + + if m.ThreadCount < 1 { + return errors.New("Number of import workers should be greater than 0") + } + importOptions = append(importOptions, gopilosa.OptImportThreadCount(int(m.ThreadCount))) + + switch m.ImportStrategy { + case "batch": + importOptions = append(importOptions, gopilosa.OptImportStrategy(gopilosa.BatchImport)) + case "timeout": + importOptions = append(importOptions, gopilosa.OptImportStrategy(gopilosa.TimeoutImport)) + default: + return errors.New("Import strategy should be one of: batch, timeout") + } + + indexer, err := pdk.SetupPilosa(m.PilosaHosts, m.Index, []pdk.FrameSpec{}, importOptions...) if err != nil { return errors.Wrap(err, "setting up Pilosa") } diff --git a/kafka/main.go b/kafka/main.go index d454ca1..439bc57 100644 --- a/kafka/main.go +++ b/kafka/main.go @@ -3,6 +3,7 @@ package kafka import ( "log" + gopilosa "github.com/pilosa/go-pilosa" "github.com/pilosa/pdk" "github.com/pkg/errors" ) @@ -61,7 +62,9 @@ func (m *Main) Run() error { mapper := pdk.NewCollapsingMapper() mapper.Framer = &m.Framer - indexer, err := pdk.SetupPilosa(m.PilosaHosts, m.Index, []pdk.FrameSpec{}, m.BatchSize) + indexer, err := pdk.SetupPilosa(m.PilosaHosts, m.Index, []pdk.FrameSpec{}, + gopilosa.OptImportStrategy(gopilosa.BatchImport), + gopilosa.OptImportBatchSize(int(m.BatchSize))) if err != nil { return errors.Wrap(err, "setting up Pilosa") } diff --git a/pilosa.go b/pilosa.go index 4d73092..cd08945 100644 --- a/pilosa.go +++ b/pilosa.go @@ -11,20 +11,21 @@ import ( ) type Index struct { - client *gopilosa.Client - batchSize uint + client *gopilosa.Client lock sync.RWMutex index *gopilosa.Index importWG sync.WaitGroup bitChans map[string]chanBitIterator fieldChans map[string]map[string]chanValIterator + frames map[string]struct{} // frames tracks which frames have been setup } func newIndex() *Index { return &Index{ bitChans: make(map[string]chanBitIterator), fieldChans: make(map[string]map[string]chanValIterator), + frames: make(map[string]struct{}), } } @@ -124,6 +125,8 @@ type FrameSpec struct { InverseEnabled bool TimeQuantum gopilosa.TimeQuantum Fields []FieldSpec + + importOptions []gopilosa.ImportOption } func (f FrameSpec) toOptions() *gopilosa.FrameOptions { @@ -145,23 +148,27 @@ type FieldSpec struct { // NewRankedFrameSpec returns a new FrameSpec with the cache type ranked and the // given name and size. -func NewRankedFrameSpec(name string, size int) FrameSpec { +func NewRankedFrameSpec(name string, size int, importOptions ...gopilosa.ImportOption) FrameSpec { fs := FrameSpec{ Name: name, CacheType: gopilosa.CacheTypeRanked, CacheSize: uint(size), + + importOptions: importOptions, } return fs } // NewFieldFrameSpec creates a frame which is dedicated to a single BSI field // which will have the same name as the frame -func NewFieldFrameSpec(name string, min int, max int) FrameSpec { +func NewFieldFrameSpec(name string, min int, max int, importOptions ...gopilosa.ImportOption) FrameSpec { fs := FrameSpec{ Name: name, CacheType: gopilosa.CacheType(""), CacheSize: 0, Fields: []FieldSpec{{Name: name, Min: min, Max: max}}, + + importOptions: importOptions, } return fs } @@ -171,6 +178,12 @@ func NewFieldFrameSpec(name string, min int, max int) FrameSpec { // threadsafe - callers must hold i.lock.Lock() or guarantee that they have // exclusive access to Index before calling. func (i *Index) setupFrame(frame FrameSpec) error { + // If this frame has already been set up, don't set it up again. + if _, ok := i.frames[frame.Name]; ok { + return nil + } + i.frames[frame.Name] = struct{}{} + var fram *gopilosa.Frame var err error if _, ok := i.bitChans[frame.Name]; !ok { @@ -182,19 +195,26 @@ func (i *Index) setupFrame(frame FrameSpec) error { if err != nil { return errors.Wrapf(err, "creating frame '%v'", frame) } - i.bitChans[frame.Name] = newChanBitIterator() - i.importWG.Add(1) - go func(fram *gopilosa.Frame, cbi chanBitIterator) { - defer i.importWG.Done() - err := i.client.ImportFrame(fram, cbi, gopilosa.OptImportStrategy(gopilosa.BatchImport), gopilosa.OptImportBatchSize(int(i.batchSize))) - if err != nil { - log.Println(errors.Wrapf(err, "starting frame import for %v", frame.Name)) - } - }(fram, i.bitChans[frame.Name]) + + // Don't handle bits for a frame with fields. + if len(frame.Fields) == 0 { + i.bitChans[frame.Name] = newChanBitIterator() + i.importWG.Add(1) + go func(fram *gopilosa.Frame, cbi chanBitIterator) { + defer i.importWG.Done() + err := i.client.ImportFrame(fram, cbi, frame.importOptions...) + if err != nil { + log.Println(errors.Wrapf(err, "starting frame import for %v", frame.Name)) + } + }(fram, i.bitChans[frame.Name]) + } } else { + // TODO: Currently this is assuming that the frame already exists. + // Should options be passed just in case? It seems odd that there + // is effectively only a getOrCreateFrame in the client. fram, err = i.index.Frame(frame.Name, nil) if err != nil { - return errors.Wrap(err, "making frame: %v") + return errors.Wrap(err, "getting frame: %v") } } if _, ok := i.fieldChans[frame.Name]; !ok { @@ -213,7 +233,7 @@ func (i *Index) setupFrame(frame FrameSpec) error { i.importWG.Add(1) go func(fram *gopilosa.Frame, field FieldSpec, cvi chanValIterator) { defer i.importWG.Done() - err := i.client.ImportValueFrame(fram, field.Name, cvi, gopilosa.OptImportStrategy(gopilosa.BatchImport), gopilosa.OptImportBatchSize(int(i.batchSize))) + err := i.client.ImportValueFrame(fram, field.Name, cvi, frame.importOptions...) if err != nil { log.Println(errors.Wrapf(err, "starting field import for %v", field)) } @@ -231,9 +251,8 @@ func (i *Index) ensureField(frame *gopilosa.Frame, fieldSpec FieldSpec) error { } // SetupPilosa returns a new Indexer after creating the given frames and starting importers. -func SetupPilosa(hosts []string, index string, frames []FrameSpec, batchsize uint) (Indexer, error) { +func SetupPilosa(hosts []string, index string, frames []FrameSpec) (Indexer, error) { indexer := newIndex() - indexer.batchSize = batchsize client, err := gopilosa.NewClient(hosts, gopilosa.SocketTimeout(time.Minute*60), gopilosa.ConnectTimeout(time.Second*60)) diff --git a/pilosa_test.go b/pilosa_test.go index 7918620..ada5f9a 100644 --- a/pilosa_test.go +++ b/pilosa_test.go @@ -52,7 +52,9 @@ func TestSetupPilosa(t *testing.T) { }, } - indexer, err := pdk.SetupPilosa(hosts, "newindex", frames, 2) + indexer, err := pdk.SetupPilosa(hosts, "newindex", frames, + gopilosa.OptImportStrategy(gopilosa.BatchImport), + gopilosa.OptImportBatchSize(2)) if err != nil { t.Fatalf("SetupPilosa: %v", err) } diff --git a/usecase/ssb/cmd.go b/usecase/ssb/cmd.go index ea17ef3..c365fc6 100644 --- a/usecase/ssb/cmd.go +++ b/usecase/ssb/cmd.go @@ -11,6 +11,7 @@ import ( "sync" "time" + gopilosa "github.com/pilosa/go-pilosa" "github.com/pilosa/pdk" "github.com/pkg/errors" ) @@ -49,7 +50,9 @@ func (m *Main) Run() (err error) { return errors.Wrap(err, "getting new translator") } log.Println("setting up pilosa") - m.index, err = pdk.SetupPilosa(m.Hosts, m.Index, frames, 1000000) + m.index, err = pdk.SetupPilosa(m.Hosts, m.Index, frames, + gopilosa.OptImportStrategy(gopilosa.BatchImport), + gopilosa.OptImportBatchSize(1000000)) if err != nil { return errors.Wrap(err, "setting up Pilosa") } diff --git a/usecase/taxi/main.go b/usecase/taxi/main.go index 0b95840..06d834d 100644 --- a/usecase/taxi/main.go +++ b/usecase/taxi/main.go @@ -17,6 +17,7 @@ import ( // for profiling _ "net/http/pprof" + gopilosa "github.com/pilosa/go-pilosa" "github.com/pilosa/pdk" "github.com/pkg/errors" ) @@ -111,7 +112,9 @@ func (m *Main) Run() error { pdk.NewRankedFrameSpec("pickup_elevation", 10000), pdk.NewRankedFrameSpec("drop_elevation", 10000), } - m.indexer, err = pdk.SetupPilosa([]string{m.PilosaHost}, m.Index, frames, uint(m.BufferSize)) + m.indexer, err = pdk.SetupPilosa([]string{m.PilosaHost}, m.Index, frames, + gopilosa.OptImportStrategy(gopilosa.BatchImport), + gopilosa.OptImportBatchSize(m.BufferSize)) if err != nil { return errors.Wrap(err, "setting up indexer") } diff --git a/usecase/weather/main.go b/usecase/weather/main.go index 77e4a8e..27d83dc 100644 --- a/usecase/weather/main.go +++ b/usecase/weather/main.go @@ -120,7 +120,9 @@ func (m *Main) Run() (err error) { pdk.NewRankedFrameSpec("pressure_i", 0), pdk.NewRankedFrameSpec("humidity", 0), } - m.importer, err = pdk.SetupPilosa([]string{m.PilosaHost}, m.Index, writeFrames, uint(m.BufferSize)) + m.importer, err = pdk.SetupPilosa([]string{m.PilosaHost}, m.Index, writeFrames, + gopilosa.OptImportStrategy(gopilosa.BatchImport), + gopilosa.OptImportBatchSize(m.BufferSize)) if err != nil { return errors.Wrap(err, "setting up pilosa") }