Skip to content
This repository was archived by the owner on Sep 28, 2022. It is now read-only.

Use Go pilosa import options #78

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion aws/s3/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package s3
import (
"log"

gopilosa "github.com/pilosa/go-pilosa"
"github.com/pilosa/pdk"
"github.com/pkg/errors"
)
Expand Down Expand Up @@ -59,7 +60,9 @@ func (m *Main) Run() error {
mapper := pdk.NewCollapsingMapper()
mapper.Framer = &m.Framer

indexer, err := pdk.SetupPilosa(m.PilosaHosts, m.Index, []pdk.FrameSpec{}, m.BatchSize)
indexer, err := pdk.SetupPilosa(m.PilosaHosts, m.Index, []pdk.FrameSpec{},
gopilosa.OptImportStrategy(gopilosa.BatchImport),
gopilosa.OptImportBatchSize(int(m.BatchSize)))
if err != nil {
return errors.Wrap(err, "setting up Pilosa")
}
Expand Down
64 changes: 49 additions & 15 deletions http/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package http

import (
"log"
"time"

gopilosa "github.com/pilosa/go-pilosa"
"github.com/pilosa/pdk"
"github.com/pkg/errors"
)
Expand All @@ -14,25 +16,31 @@ type SubjecterOpts struct {

// Main holds the config for the http command.
type Main struct {
Bind string `help:"Listen for post requests on this address."`
PilosaHosts []string `help:"List of host:port pairs for Pilosa cluster."`
Index string `help:"Pilosa index to write to."`
BatchSize uint `help:"Batch size for Pilosa imports."`
Framer pdk.DashFrame
Subjecter SubjecterOpts
Proxy string `help:"Bind to this address to proxy and translate requests to Pilosa"`
Bind string `help:"Listen for post requests on this address."`
PilosaHosts []string `help:"List of host:port pairs for Pilosa cluster."`
Index string `help:"Pilosa index to write to."`
BatchSize uint `help:"Batch size for Pilosa imports. Default: 100000"`
ImportStrategy string `help:"Import strategy. One of 'batch' or 'timeout'. Default: batch"`
ThreadCount uint `help:"Number of import workers. Default: 1"`
ImportTimeout uint `help:"Timeout in milliseconds for the import strategy. Default: 200"`
Framer pdk.DashFrame
Subjecter SubjecterOpts
Proxy string `help:"Bind to this address to proxy and translate requests to Pilosa"`
}

// NewMain gets a new Main with default values.
func NewMain() *Main {
return &Main{
Bind: ":12121",
PilosaHosts: []string{"localhost:10101"},
Index: "jsonhttp",
BatchSize: 10,
Framer: pdk.DashFrame{},
Subjecter: SubjecterOpts{},
Proxy: ":13131",
Bind: ":12121",
PilosaHosts: []string{"localhost:10101"},
Index: "jsonhttp",
BatchSize: 100000,
ImportStrategy: "batch",
ThreadCount: 1,
ImportTimeout: 200,
Framer: pdk.DashFrame{},
Subjecter: SubjecterOpts{},
Proxy: ":13131",
}
}

Expand Down Expand Up @@ -79,7 +87,33 @@ func (m *Main) Run() error {
mapper := pdk.NewCollapsingMapper()
mapper.Framer = &m.Framer

indexer, err := pdk.SetupPilosa(m.PilosaHosts, m.Index, []pdk.FrameSpec{}, m.BatchSize)
importOptions := []gopilosa.ImportOption{}

if m.BatchSize < 1 {
return errors.New("Batch size should be greater than 0")
}
importOptions = append(importOptions, gopilosa.OptImportBatchSize(int(m.BatchSize)))

if m.ImportTimeout < 1 {
return errors.New("Import timeout should be greater than 0")
}
importOptions = append(importOptions, gopilosa.OptImportTimeout(time.Duration(m.ImportTimeout)*time.Millisecond))

if m.ThreadCount < 1 {
return errors.New("Number of import workers should be greater than 0")
}
importOptions = append(importOptions, gopilosa.OptImportThreadCount(int(m.ThreadCount)))

switch m.ImportStrategy {
case "batch":
importOptions = append(importOptions, gopilosa.OptImportStrategy(gopilosa.BatchImport))
case "timeout":
importOptions = append(importOptions, gopilosa.OptImportStrategy(gopilosa.TimeoutImport))
default:
return errors.New("Import strategy should be one of: batch, timeout")
}

indexer, err := pdk.SetupPilosa(m.PilosaHosts, m.Index, []pdk.FrameSpec{}, importOptions...)
if err != nil {
return errors.Wrap(err, "setting up Pilosa")
}
Expand Down
5 changes: 4 additions & 1 deletion kafka/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package kafka
import (
"log"

gopilosa "github.com/pilosa/go-pilosa"
"github.com/pilosa/pdk"
"github.com/pkg/errors"
)
Expand Down Expand Up @@ -61,7 +62,9 @@ func (m *Main) Run() error {
mapper := pdk.NewCollapsingMapper()
mapper.Framer = &m.Framer

indexer, err := pdk.SetupPilosa(m.PilosaHosts, m.Index, []pdk.FrameSpec{}, m.BatchSize)
indexer, err := pdk.SetupPilosa(m.PilosaHosts, m.Index, []pdk.FrameSpec{},
gopilosa.OptImportStrategy(gopilosa.BatchImport),
gopilosa.OptImportBatchSize(int(m.BatchSize)))
if err != nil {
return errors.Wrap(err, "setting up Pilosa")
}
Expand Down
12 changes: 6 additions & 6 deletions pilosa.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import (
)

type Index struct {
client *gopilosa.Client
batchSize uint
client *gopilosa.Client
importOptions []gopilosa.ImportOption

lock sync.RWMutex
index *gopilosa.Index
Expand Down Expand Up @@ -186,7 +186,7 @@ func (i *Index) setupFrame(frame FrameSpec) error {
i.importWG.Add(1)
go func(fram *gopilosa.Frame, cbi chanBitIterator) {
defer i.importWG.Done()
err := i.client.ImportFrame(fram, cbi, gopilosa.OptImportStrategy(gopilosa.BatchImport), gopilosa.OptImportBatchSize(int(i.batchSize)))
err := i.client.ImportFrame(fram, cbi, i.importOptions...)
if err != nil {
log.Println(errors.Wrapf(err, "starting frame import for %v", frame.Name))
}
Expand All @@ -213,7 +213,7 @@ func (i *Index) setupFrame(frame FrameSpec) error {
i.importWG.Add(1)
go func(fram *gopilosa.Frame, field FieldSpec, cvi chanValIterator) {
defer i.importWG.Done()
err := i.client.ImportValueFrame(fram, field.Name, cvi, gopilosa.OptImportStrategy(gopilosa.BatchImport), gopilosa.OptImportBatchSize(int(i.batchSize)))
err := i.client.ImportValueFrame(fram, field.Name, cvi, i.importOptions...)
if err != nil {
log.Println(errors.Wrapf(err, "starting field import for %v", field))
}
Expand All @@ -231,9 +231,9 @@ func (i *Index) ensureField(frame *gopilosa.Frame, fieldSpec FieldSpec) error {
}

// SetupPilosa returns a new Indexer after creating the given frames and starting importers.
func SetupPilosa(hosts []string, index string, frames []FrameSpec, batchsize uint) (Indexer, error) {
func SetupPilosa(hosts []string, index string, frames []FrameSpec, importOptions ...gopilosa.ImportOption) (Indexer, error) {
indexer := newIndex()
indexer.batchSize = batchsize
indexer.importOptions = importOptions
client, err := gopilosa.NewClient(hosts,
gopilosa.SocketTimeout(time.Minute*60),
gopilosa.ConnectTimeout(time.Second*60))
Expand Down
4 changes: 3 additions & 1 deletion pilosa_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ func TestSetupPilosa(t *testing.T) {
},
}

indexer, err := pdk.SetupPilosa(hosts, "newindex", frames, 2)
indexer, err := pdk.SetupPilosa(hosts, "newindex", frames,
gopilosa.OptImportStrategy(gopilosa.BatchImport),
gopilosa.OptImportBatchSize(2))
if err != nil {
t.Fatalf("SetupPilosa: %v", err)
}
Expand Down
5 changes: 4 additions & 1 deletion usecase/ssb/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"sync"
"time"

gopilosa "github.com/pilosa/go-pilosa"
"github.com/pilosa/pdk"
"github.com/pkg/errors"
)
Expand Down Expand Up @@ -49,7 +50,9 @@ func (m *Main) Run() (err error) {
return errors.Wrap(err, "getting new translator")
}
log.Println("setting up pilosa")
m.index, err = pdk.SetupPilosa(m.Hosts, m.Index, frames, 1000000)
m.index, err = pdk.SetupPilosa(m.Hosts, m.Index, frames,
gopilosa.OptImportStrategy(gopilosa.BatchImport),
gopilosa.OptImportBatchSize(1000000))
if err != nil {
return errors.Wrap(err, "setting up Pilosa")
}
Expand Down
5 changes: 4 additions & 1 deletion usecase/taxi/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
// for profiling
_ "net/http/pprof"

gopilosa "github.com/pilosa/go-pilosa"
"github.com/pilosa/pdk"
"github.com/pkg/errors"
)
Expand Down Expand Up @@ -111,7 +112,9 @@ func (m *Main) Run() error {
pdk.NewRankedFrameSpec("pickup_elevation", 10000), pdk.NewRankedFrameSpec("drop_elevation", 10000),
}

m.indexer, err = pdk.SetupPilosa([]string{m.PilosaHost}, m.Index, frames, uint(m.BufferSize))
m.indexer, err = pdk.SetupPilosa([]string{m.PilosaHost}, m.Index, frames,
gopilosa.OptImportStrategy(gopilosa.BatchImport),
gopilosa.OptImportBatchSize(m.BufferSize))
if err != nil {
return errors.Wrap(err, "setting up indexer")
}
Expand Down
4 changes: 3 additions & 1 deletion usecase/weather/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,9 @@ func (m *Main) Run() (err error) {
pdk.NewRankedFrameSpec("pressure_i", 0),
pdk.NewRankedFrameSpec("humidity", 0),
}
m.importer, err = pdk.SetupPilosa([]string{m.PilosaHost}, m.Index, writeFrames, uint(m.BufferSize))
m.importer, err = pdk.SetupPilosa([]string{m.PilosaHost}, m.Index, writeFrames,
gopilosa.OptImportStrategy(gopilosa.BatchImport),
gopilosa.OptImportBatchSize(m.BufferSize))
if err != nil {
return errors.Wrap(err, "setting up pilosa")
}
Expand Down