Skip to content
This repository was archived by the owner on Sep 28, 2022. It is now read-only.

WIP: Import options - better gopilosa.StatusChan support #83

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion aws/s3/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package s3
import (
"log"

gopilosa "github.com/pilosa/go-pilosa"
"github.com/pilosa/pdk"
"github.com/pkg/errors"
)
Expand Down Expand Up @@ -59,7 +60,9 @@ func (m *Main) Run() error {
mapper := pdk.NewCollapsingMapper()
mapper.Framer = &m.Framer

indexer, err := pdk.SetupPilosa(m.PilosaHosts, m.Index, []pdk.FrameSpec{}, m.BatchSize)
indexer, err := pdk.SetupPilosa(m.PilosaHosts, m.Index, []pdk.FrameSpec{},
gopilosa.OptImportStrategy(gopilosa.BatchImport),
gopilosa.OptImportBatchSize(int(m.BatchSize)))
if err != nil {
return errors.Wrap(err, "setting up Pilosa")
}
Expand Down
64 changes: 49 additions & 15 deletions http/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package http

import (
"log"
"time"

gopilosa "github.com/pilosa/go-pilosa"
"github.com/pilosa/pdk"
"github.com/pkg/errors"
)
Expand All @@ -14,25 +16,31 @@ type SubjecterOpts struct {

// Main holds the config for the http command.
//
// Run rejects a BatchSize, ImportTimeout or ThreadCount below 1 and an
// ImportStrategy other than "batch" or "timeout" before setting up Pilosa.
//
// NOTE(review): the field list appears twice because this listing seems to
// interleave the earlier and the later revision of the struct; the second
// group is the one that carries the import-option fields Run reads.
type Main struct {
Bind string `help:"Listen for post requests on this address."`
PilosaHosts []string `help:"List of host:port pairs for Pilosa cluster."`
Index string `help:"Pilosa index to write to."`
BatchSize uint `help:"Batch size for Pilosa imports."`
Framer pdk.DashFrame
Subjecter SubjecterOpts
Proxy string `help:"Bind to this address to proxy and translate requests to Pilosa"`
Bind string `help:"Listen for post requests on this address."`
PilosaHosts []string `help:"List of host:port pairs for Pilosa cluster."`
Index string `help:"Pilosa index to write to."`
// BatchSize is passed to gopilosa.OptImportBatchSize by Run.
BatchSize uint `help:"Batch size for Pilosa imports. Default: 100000"`
// ImportStrategy selects gopilosa.BatchImport ("batch") or
// gopilosa.TimeoutImport ("timeout") in Run.
ImportStrategy string `help:"Import strategy. One of 'batch' or 'timeout'. Default: batch"`
// ThreadCount is passed to gopilosa.OptImportThreadCount by Run.
ThreadCount uint `help:"Number of import workers. Default: 1"`
// ImportTimeout is interpreted as milliseconds and passed to
// gopilosa.OptImportTimeout by Run.
// NOTE(review): the help text says "Default: 100" but NewMain sets
// ImportTimeout to 200 — confirm which value is intended.
ImportTimeout uint `help:"Timeout in milliseconds for the import strategy. Default: 100"`
Framer pdk.DashFrame
Subjecter SubjecterOpts
Proxy string `help:"Bind to this address to proxy and translate requests to Pilosa"`
}

// NewMain gets a new Main with default values.
//
// NOTE(review): every key appears twice in the literal below because this
// listing seems to interleave the earlier and the later revision of the
// defaults. The second group (BatchSize 100000, ImportStrategy "batch",
// ThreadCount 1) agrees with the defaults stated in Main's help text, except
// ImportTimeout: the value here is 200 while the help text says 100 — confirm
// which is intended.
func NewMain() *Main {
return &Main{
Bind: ":12121",
PilosaHosts: []string{"localhost:10101"},
Index: "jsonhttp",
BatchSize: 10,
Framer: pdk.DashFrame{},
Subjecter: SubjecterOpts{},
Proxy: ":13131",
Bind: ":12121",
PilosaHosts: []string{"localhost:10101"},
Index: "jsonhttp",
BatchSize: 100000,
ImportStrategy: "batch",
ThreadCount: 1,
// Run converts this value to a time.Duration in milliseconds.
ImportTimeout: 200,
Framer: pdk.DashFrame{},
Subjecter: SubjecterOpts{},
Proxy: ":13131",
}
}

Expand Down Expand Up @@ -79,7 +87,33 @@ func (m *Main) Run() error {
mapper := pdk.NewCollapsingMapper()
mapper.Framer = &m.Framer

indexer, err := pdk.SetupPilosa(m.PilosaHosts, m.Index, []pdk.FrameSpec{}, m.BatchSize)
importOptions := []gopilosa.ImportOption{}

if m.BatchSize < 1 {
return errors.New("Batch size should be greater than 0")
}
importOptions = append(importOptions, gopilosa.OptImportBatchSize(int(m.BatchSize)))

if m.ImportTimeout < 1 {
return errors.New("Import timeout should be greater than 0")
}
importOptions = append(importOptions, gopilosa.OptImportTimeout(time.Duration(m.ImportTimeout)*time.Millisecond))

if m.ThreadCount < 1 {
return errors.New("Number of import workers should be greater than 0")
}
importOptions = append(importOptions, gopilosa.OptImportThreadCount(int(m.ThreadCount)))

switch m.ImportStrategy {
case "batch":
importOptions = append(importOptions, gopilosa.OptImportStrategy(gopilosa.BatchImport))
case "timeout":
importOptions = append(importOptions, gopilosa.OptImportStrategy(gopilosa.TimeoutImport))
default:
return errors.New("Import strategy should be one of: batch, timeout")
}

indexer, err := pdk.SetupPilosa(m.PilosaHosts, m.Index, []pdk.FrameSpec{}, importOptions...)
if err != nil {
return errors.Wrap(err, "setting up Pilosa")
}
Expand Down
5 changes: 4 additions & 1 deletion kafka/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package kafka
import (
"log"

gopilosa "github.com/pilosa/go-pilosa"
"github.com/pilosa/pdk"
"github.com/pkg/errors"
)
Expand Down Expand Up @@ -61,7 +62,9 @@ func (m *Main) Run() error {
mapper := pdk.NewCollapsingMapper()
mapper.Framer = &m.Framer

indexer, err := pdk.SetupPilosa(m.PilosaHosts, m.Index, []pdk.FrameSpec{}, m.BatchSize)
indexer, err := pdk.SetupPilosa(m.PilosaHosts, m.Index, []pdk.FrameSpec{},
gopilosa.OptImportStrategy(gopilosa.BatchImport),
gopilosa.OptImportBatchSize(int(m.BatchSize)))
if err != nil {
return errors.Wrap(err, "setting up Pilosa")
}
Expand Down
53 changes: 36 additions & 17 deletions pilosa.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,21 @@ import (
)

// Index bundles the Pilosa client with the per-frame import state that
// setupFrame populates.
//
// NOTE(review): client is listed twice and batchSize once because this listing
// seems to interleave the earlier and the later revision of the struct;
// batchSize is only read by the import calls that pass
// gopilosa.OptImportBatchSize(int(i.batchSize)).
type Index struct {
client *gopilosa.Client
batchSize uint
client *gopilosa.Client

// lock must be held (lock.Lock()) by callers of setupFrame unless they have
// exclusive access to the Index, as setupFrame's own comment states.
lock sync.RWMutex
index *gopilosa.Index
// importWG is incremented before each import goroutine is started by
// setupFrame and decremented when that goroutine's import call returns.
importWG sync.WaitGroup
// bitChans is keyed by frame name; setupFrame stores the iterator it hands
// to client.ImportFrame here.
bitChans map[string]chanBitIterator
// fieldChans is keyed by frame name; the inner map is presumably keyed by
// field name — TODO confirm against the hidden part of setupFrame.
fieldChans map[string]map[string]chanValIterator
frames map[string]struct{} // frames tracks which frames have been setup
}

// newIndex returns an Index whose bitChans, fieldChans and frames maps are
// allocated and empty; every other field is left at its zero value.
func newIndex() *Index {
	idx := new(Index)
	idx.bitChans = map[string]chanBitIterator{}
	idx.fieldChans = map[string]map[string]chanValIterator{}
	idx.frames = map[string]struct{}{}
	return idx
}

Expand Down Expand Up @@ -124,6 +125,8 @@ type FrameSpec struct {
InverseEnabled bool
TimeQuantum gopilosa.TimeQuantum
Fields []FieldSpec

importOptions []gopilosa.ImportOption
}

func (f FrameSpec) toOptions() *gopilosa.FrameOptions {
Expand All @@ -145,23 +148,27 @@ type FieldSpec struct {

// NewRankedFrameSpec returns a new FrameSpec with the cache type ranked and the
// given name and size. Any importOptions passed are stored unchanged on the
// returned FrameSpec.
//
// NOTE(review): two signature lines follow because this listing seems to
// interleave the earlier and the later revision of the function; the body
// reads importOptions, so it matches the variadic signature.
func NewRankedFrameSpec(name string, size int) FrameSpec {
func NewRankedFrameSpec(name string, size int, importOptions ...gopilosa.ImportOption) FrameSpec {
fs := FrameSpec{
Name: name,
CacheType: gopilosa.CacheTypeRanked,
// size is converted with uint(size); a negative size would wrap to a
// large unsigned value rather than being rejected.
CacheSize: uint(size),

importOptions: importOptions,
}
return fs
}

// NewFieldFrameSpec creates a frame which is dedicated to a single BSI field
// which will have the same name as the frame. The field's Min and Max are set
// from min and max, the cache type is the empty string and the cache size is
// 0. Any importOptions passed are stored unchanged on the returned FrameSpec.
//
// NOTE(review): two signature lines follow because this listing seems to
// interleave the earlier and the later revision of the function; the body
// reads importOptions, so it matches the variadic signature.
func NewFieldFrameSpec(name string, min int, max int) FrameSpec {
func NewFieldFrameSpec(name string, min int, max int, importOptions ...gopilosa.ImportOption) FrameSpec {
fs := FrameSpec{
Name: name,
CacheType: gopilosa.CacheType(""),
CacheSize: 0,
// The single field reuses the frame's name.
Fields: []FieldSpec{{Name: name, Min: min, Max: max}},

importOptions: importOptions,
}
return fs
}
Expand All @@ -171,6 +178,12 @@ func NewFieldFrameSpec(name string, min int, max int) FrameSpec {
// threadsafe - callers must hold i.lock.Lock() or guarantee that they have
// exclusive access to Index before calling.
func (i *Index) setupFrame(frame FrameSpec) error {
// If this frame has already been set up, don't set it up again.
if _, ok := i.frames[frame.Name]; ok {
return nil
}
i.frames[frame.Name] = struct{}{}

var fram *gopilosa.Frame
var err error
if _, ok := i.bitChans[frame.Name]; !ok {
Expand All @@ -182,19 +195,26 @@ func (i *Index) setupFrame(frame FrameSpec) error {
if err != nil {
return errors.Wrapf(err, "creating frame '%v'", frame)
}
i.bitChans[frame.Name] = newChanBitIterator()
i.importWG.Add(1)
go func(fram *gopilosa.Frame, cbi chanBitIterator) {
defer i.importWG.Done()
err := i.client.ImportFrame(fram, cbi, gopilosa.OptImportStrategy(gopilosa.BatchImport), gopilosa.OptImportBatchSize(int(i.batchSize)))
if err != nil {
log.Println(errors.Wrapf(err, "starting frame import for %v", frame.Name))
}
}(fram, i.bitChans[frame.Name])

// Don't handle bits for a frame with fields.
if len(frame.Fields) == 0 {
i.bitChans[frame.Name] = newChanBitIterator()
i.importWG.Add(1)
go func(fram *gopilosa.Frame, cbi chanBitIterator) {
defer i.importWG.Done()
err := i.client.ImportFrame(fram, cbi, frame.importOptions...)
if err != nil {
log.Println(errors.Wrapf(err, "starting frame import for %v", frame.Name))
}
}(fram, i.bitChans[frame.Name])
}
} else {
// TODO: Currently this is assuming that the frame already exists.
// Should options be passed just in case? It seems odd that there
// is effectively only a getOrCreateFrame in the client.
fram, err = i.index.Frame(frame.Name, nil)
if err != nil {
return errors.Wrap(err, "making frame: %v")
return errors.Wrap(err, "getting frame: %v")
}
}
if _, ok := i.fieldChans[frame.Name]; !ok {
Expand All @@ -213,7 +233,7 @@ func (i *Index) setupFrame(frame FrameSpec) error {
i.importWG.Add(1)
go func(fram *gopilosa.Frame, field FieldSpec, cvi chanValIterator) {
defer i.importWG.Done()
err := i.client.ImportValueFrame(fram, field.Name, cvi, gopilosa.OptImportStrategy(gopilosa.BatchImport), gopilosa.OptImportBatchSize(int(i.batchSize)))
err := i.client.ImportValueFrame(fram, field.Name, cvi, frame.importOptions...)
if err != nil {
log.Println(errors.Wrapf(err, "starting field import for %v", field))
}
Expand All @@ -231,9 +251,8 @@ func (i *Index) ensureField(frame *gopilosa.Frame, fieldSpec FieldSpec) error {
}

// SetupPilosa returns a new Indexer after creating the given frames and starting importers.
func SetupPilosa(hosts []string, index string, frames []FrameSpec, batchsize uint) (Indexer, error) {
func SetupPilosa(hosts []string, index string, frames []FrameSpec) (Indexer, error) {
indexer := newIndex()
indexer.batchSize = batchsize
client, err := gopilosa.NewClient(hosts,
gopilosa.SocketTimeout(time.Minute*60),
gopilosa.ConnectTimeout(time.Second*60))
Expand Down
4 changes: 3 additions & 1 deletion pilosa_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ func TestSetupPilosa(t *testing.T) {
},
}

indexer, err := pdk.SetupPilosa(hosts, "newindex", frames, 2)
indexer, err := pdk.SetupPilosa(hosts, "newindex", frames,
gopilosa.OptImportStrategy(gopilosa.BatchImport),
gopilosa.OptImportBatchSize(2))
if err != nil {
t.Fatalf("SetupPilosa: %v", err)
}
Expand Down
5 changes: 4 additions & 1 deletion usecase/ssb/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"sync"
"time"

gopilosa "github.com/pilosa/go-pilosa"
"github.com/pilosa/pdk"
"github.com/pkg/errors"
)
Expand Down Expand Up @@ -49,7 +50,9 @@ func (m *Main) Run() (err error) {
return errors.Wrap(err, "getting new translator")
}
log.Println("setting up pilosa")
m.index, err = pdk.SetupPilosa(m.Hosts, m.Index, frames, 1000000)
m.index, err = pdk.SetupPilosa(m.Hosts, m.Index, frames,
gopilosa.OptImportStrategy(gopilosa.BatchImport),
gopilosa.OptImportBatchSize(1000000))
if err != nil {
return errors.Wrap(err, "setting up Pilosa")
}
Expand Down
5 changes: 4 additions & 1 deletion usecase/taxi/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
// for profiling
_ "net/http/pprof"

gopilosa "github.com/pilosa/go-pilosa"
"github.com/pilosa/pdk"
"github.com/pkg/errors"
)
Expand Down Expand Up @@ -111,7 +112,9 @@ func (m *Main) Run() error {
pdk.NewRankedFrameSpec("pickup_elevation", 10000), pdk.NewRankedFrameSpec("drop_elevation", 10000),
}

m.indexer, err = pdk.SetupPilosa([]string{m.PilosaHost}, m.Index, frames, uint(m.BufferSize))
m.indexer, err = pdk.SetupPilosa([]string{m.PilosaHost}, m.Index, frames,
gopilosa.OptImportStrategy(gopilosa.BatchImport),
gopilosa.OptImportBatchSize(m.BufferSize))
if err != nil {
return errors.Wrap(err, "setting up indexer")
}
Expand Down
4 changes: 3 additions & 1 deletion usecase/weather/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,9 @@ func (m *Main) Run() (err error) {
pdk.NewRankedFrameSpec("pressure_i", 0),
pdk.NewRankedFrameSpec("humidity", 0),
}
m.importer, err = pdk.SetupPilosa([]string{m.PilosaHost}, m.Index, writeFrames, uint(m.BufferSize))
m.importer, err = pdk.SetupPilosa([]string{m.PilosaHost}, m.Index, writeFrames,
gopilosa.OptImportStrategy(gopilosa.BatchImport),
gopilosa.OptImportBatchSize(m.BufferSize))
if err != nil {
return errors.Wrap(err, "setting up pilosa")
}
Expand Down