This repository was archived by the owner on Sep 28, 2022. It is now read-only.

Batch csv #130

Open: wants to merge 41 commits into base: master

Commits (41)
7007f23
move picsv command over from go-pilosa batch ingest branch
jaffee Aug 28, 2019
55d019f
unfortunate move+refactor of picsv batch stuff
jaffee Aug 28, 2019
1979e4f
set up for concurrent/sharded id allocation
jaffee Aug 29, 2019
29a794e
add taxi data test, support timestamp parsing in picsv
jaffee Sep 1, 2019
3fa0161
update deps
jaffee Sep 1, 2019
8faaad4
fixes and test cases for csv taxi test
jaffee Sep 4, 2019
4b588ab
open files or urls
jaffee Sep 5, 2019
b80e433
add option to pass urls/files in as a file
jaffee Sep 5, 2019
9818944
change go version... not sure how this works
jaffee Sep 6, 2019
513aa24
update circleCI go versions
jaffee Sep 6, 2019
39d8f0d
fixup csv/batch tests and check syncschema errors
jaffee Sep 25, 2019
2f29e91
initial mostly working v2 Kafka source
jaffee Sep 25, 2019
5e795d0
more complete testing of Source.Record
jaffee Sep 27, 2019
1038c73
v2 kafka source unit tests looking good
jaffee Sep 27, 2019
fc3ddb2
full kafka source integration test working
jaffee Sep 30, 2019
7102a1d
split out cmd and source, implement cmd, tests
jaffee Oct 4, 2019
425107a
v2 kafka consumer mostly working except for string array
jaffee Oct 7, 2019
018238e
fix some bugs, basic integration test passing
jaffee Oct 8, 2019
6da03e2
fixup tests
jaffee Oct 8, 2019
083e17d
refactor v2/kafka stuff
jaffee Oct 9, 2019
0c25e39
mock v2 kafka consumer
jaffee Oct 10, 2019
38648f0
convert kafka consumer to pflag and env var support
jaffee Oct 10, 2019
0ce6315
dedup kafka test and improve kafka timeout between tests
jaffee Oct 10, 2019
02fbfec
update go-pilosa dep to master (batch-ingest merged)
jaffee Oct 11, 2019
d64466c
update go-pilosa dep
jaffee Oct 11, 2019
f0c735e
scaled floats working
jaffee Oct 13, 2019
25979e8
TLS support to v2 ingest/kafka
jaffee Oct 16, 2019
ba411f5
TLS working for Pilosa, implemented for Kafka and schema registry
jaffee Oct 16, 2019
aa1efcb
add help strings for TLS config
jaffee Oct 16, 2019
5c54509
add profiling and some client options
jaffee Oct 22, 2019
95cf3f5
remove unnecessary OptFieldTypeBool()s
travisturner Oct 22, 2019
1675078
Merge pull request #3 from travisturner/bool-logic-fix
jaffee Oct 23, 2019
127c236
fewer retries
jaffee Oct 23, 2019
b1c2a3e
add locally autogenned IDs
jaffee Oct 23, 2019
12f86f6
pass registry url through main to source, add MaxMsgs
jaffee Oct 28, 2019
0d64d4d
fix ineffassign in test
jaffee Oct 28, 2019
9fbf304
fix consumer bug where a final partial batch would not be imported
jaffee Oct 31, 2019
7af2c67
change primary key encoding to be pipe separated strings
jaffee Nov 1, 2019
82b6637
fix bug where a separator could be inserted initially in a PK
jaffee Nov 1, 2019
012049a
upgrade commandeer to fix duplicated primary keys
jaffee Nov 1, 2019
c24dcb3
update go-pilosa dep to latest
jaffee Nov 8, 2019
12 changes: 6 additions & 6 deletions .circleci/config.yml
@@ -2,7 +2,7 @@ version: 2
defaults: &defaults
working_directory: /go/src/github.com/pilosa/pdk
docker:
- image: circleci/golang:1.11
- image: circleci/golang:1.12
environment:
GO111MODULE: "on"
fast-checkout: &fast-checkout
@@ -30,15 +30,15 @@ jobs:
- *fast-checkout
- run: make install-gometalinter
- run: make gometalinter
test-golang-1.11: &base-test
test-golang-1.12: &base-test
<<: *defaults
steps:
- *fast-checkout
- run: make test
test-golang-1.12-rc:
test-golang-1.13:
<<: *base-test
docker:
- image: circleci/golang:1.12-rc
- image: circleci/golang:1.13
workflows:
version: 2
test:
@@ -47,9 +47,9 @@ workflows:
- linter:
requires:
- build
- test-golang-1.11:
- test-golang-1.12:
requires:
- build
- test-golang-1.12-rc:
- test-golang-1.13:
requires:
- build
2 changes: 2 additions & 0 deletions .gitignore
@@ -27,3 +27,5 @@ vendor

.terraform
terraform.tfstate*

build
6 changes: 6 additions & 0 deletions Makefile
@@ -32,6 +32,7 @@ crossbuild:

install:
go install $(LDFLAGS) $(FLAGS) $(CLONE_URL)/cmd/pdk
go install $(LDFLAGS) $(FLAGS) $(CLONE_URL)/cmd/picsv

gometalinter: vendor
GO111MODULE=off gometalinter --vendor --disable-all \
@@ -54,3 +55,8 @@ install-gometalinter:
GO111MODULE=off go get -u github.com/alecthomas/gometalinter
GO111MODULE=off gometalinter --install
GO111MODULE=off go get github.com/remyoudompheng/go-misc/deadcode

build-consumers:
mkdir -p build
go build -o build/consumer-mac-`git log | head -1 | cut -d' ' -f2 | head -c 7` ./v2/cmd/kafka
GOOS=linux go build -o build/consumer-linux-`git log | head -1 | cut -d' ' -f2 | head -c 7` ./v2/cmd/kafka
106 changes: 106 additions & 0 deletions cmd/picsv/README.md
@@ -0,0 +1,106 @@
I've been noodling on ingest to Pilosa for quite some
time. Historically, it's either been slow, difficult, or, if you were
particularly unlucky in the tools or documentation you stumbled upon,
both. The options have been:

- Calling "SetBit" via PQL, which is insanely slow, even when you
  batch multiple calls in the same request.
- Using the `pilosa import` tool, which requires one to massage their
data one field at a time into CSV files of a particular format
before importing.
- Using Pilosa's import endpoints. There are a few variants of these
(import-value for integers, import-roaring for sets and time, and
import for everything else). They are fast, but not well-documented,
still one field at a time, and quite complex to use.
- Using the import functionality in the client libraries. These use
Pilosa's import endpoints under the hood, but they are still
per-field, and you pay a significant performance penalty for the
simpler interface they give you.
- Using PDK tools. These give a nice interface and can, in some
  cases, hide all the gory details, allowing you to ingest data
  straight from Kafka or CSV files without writing any code at
  all. They use go-pilosa's import functionality underneath and add an
  even larger performance hit on top of it, so unfortunately, we're
  back in "fairly slow" territory.

The latest turn of this wheel has brought us yet another tool, one
which I'm quite sure is fast, and I hope will prove easier to use. The
basic workflow is this:

1. Using a client library, create your schema as usual.
2. Create a Batch object, passing it an ordered list of Pilosa fields
you are going to be using.
3. Call `Batch.Add` with `Row` objects. A row is an ID (Pilosa
column), and a list of values which correspond to the list of
fields you passed in when creating the Batch.
4. When the batch is full, `Add` will return `ErrBatchNowFull`, and
then it's time to call `Batch.Import` to ingest the data to
Pilosa. `Import` does any necessary key translation and then
efficiently (and concurrently) imports all the data to Pilosa.
5. Repeat 3 and 4 for as long as you have records to ingest.

Let's walk through an example of ingesting some tabular data in a CSV
file.

```
ID,Size,Color,Age
1,small,green,42
2,large,red,99
3,small,green,NA
4,small,,31
```

First, you open the file and read in the header. Create a field in
Pilosa for each item in the header (you do need to know what type each
is at this point). If one of the fields represents the "ID" of that
row, don't create a field for that one. Now, create a Batch object,
passing in the list of Fields you just made which matches up with the
CSV header. Create a `Row` object with a list of `Values` of equal
length to the list of fields. So for our example, we'll have a list of
fields like `["Size", "Color", "Age"]`, and our `Row` object will have
an empty value list of length 3.

Now, read in each line of the CSV file and parse each field as needed,
then set each value in the `Values` slice to the parsed value. Set
`Row.ID` to the ID from the first field and call `Batch.Add` with the
`Row` object. For the first line in our example file, the `Row` object
will look like:

`{ID: 1, Values: {"small", "green", 42}}`

Currently, there is an implementation of this in [a branch of
go-pilosa](https://github.com/jaffee/go-pilosa/tree/batch-ingest/gpexp)
that has a couple of neat properties. The routine calling `Batch.Add`
can reuse the same `Row` object each time it makes the call. This
reduces memory allocations, which decreases garbage collection and
improves cache usage. `Row.Values` is a `[]interface{}`, which in Go
means it's a list of values that can have any type. The `Batch`
implementation does type checking and supports values of various types
in various ways.

- A `uint64` will be treated directly as a Pilosa row ID.
- A `string` will be translated to a row ID (the corresponding field
must have keys enabled).
- An `int64` will be ingested as an integer — the corresponding field
must be an int field.
- A `nil` will be ignored.

`Row.ID` can be a `string` or `uint64` depending on whether you want
to use column key translation on the index.

Caveats:

The current batch implementation does not support Pilosa time fields,
or boolean or mutex fields, though that is in the works. It probably
won't be a good interface for workloads with lots of fields (hundreds
or thousands) where many of them are often nil for any given record.

If you want to see example usage of the Batch interface, check out the
code right [here](../../csv/batch.go) in the PDK's CSV tooling. The
`picsv` tool takes in CSV files and does its best to ingest them to
Pilosa performantly with minimal supervision. It does, however, have
an optional configuration which allows one to do basic things like
specify which fields are ints vs strings, and how the CSV field names
map onto Pilosa fields. There are some examples of this in the
[tests](./batch_test.go), and be on the lookout for a more complete
writeup with documentation, examples, and benchmarks soon!
14 changes: 14 additions & 0 deletions cmd/picsv/main.go
@@ -0,0 +1,14 @@
package main

import (
"log"

"github.com/jaffee/commandeer"
"github.com/pilosa/pdk/csv"
)

func main() {
if err := commandeer.Run(csv.NewMain()); err != nil {
log.Fatal(err)
}
}
3 changes: 3 additions & 0 deletions csv/.gitignore
@@ -0,0 +1,3 @@
marketing-*.csv
config.json
taxiconfig.json
3 changes: 3 additions & 0 deletions csv/Makefile
@@ -0,0 +1,3 @@

bench:
GO111MODULE=on go test -bench=. -run=ZZZ -benchtime=3x