Skip to content
This repository was archived by the owner on Sep 28, 2022. It is now read-only.

Commit 5bdf7f8

Browse files
committed
test and fix clearValues bug
The test demonstrates how a nil integer value added to a Batch would clear an existing integer value instead of ignoring it. This fixes it by tracking which records have nil integer values for each field, and removing those ids/values before importing. To execute import requests concurrently while re-using the same slices for ids and values for each import, we expose encoding separately from importing in the Client, so that we can encode serially (reusing the same slices) and import the encoded data concurrently.
1 parent e40d84a commit 5bdf7f8

File tree

3 files changed

+131
-89
lines changed

3 files changed

+131
-89
lines changed

Diff for: client.go

+26-4
Original file line numberDiff line numberDiff line change
@@ -730,20 +730,42 @@ func (c *Client) importValues(field *Field,
730730
// index,field,shard on all nodes which should hold that shard. It
731731
// assumes that the ids have been translated from keys if necessary
732732
// and so tells Pilosa to ignore checking if the index uses column
733-
// keys.
733+
// keys. ImportValues wraps EncodeImportValues and DoImportValues —
734+
// these are broken out and exported so that performance conscious
735+
// users can re-use the same vals and ids byte buffers for local
736+
// encoding, while performing the imports concurrently.
734737
func (c *Client) ImportValues(index, field string, shard uint64, vals []int64, ids []uint64, clear bool) error {
738+
path, data, err := c.EncodeImportValues(index, field, shard, vals, ids, clear)
739+
if err != nil {
740+
return errors.Wrap(err, "encoding import-values request")
741+
}
742+
err = c.DoImportValues(index, shard, path, data)
743+
return errors.Wrap(err, "doing import values")
744+
}
745+
746+
// EncodeImportValues computes the HTTP path and payload for an
747+
// import-values request. It is typically followed by a call to
748+
// DoImportValues.
749+
func (c *Client) EncodeImportValues(index, field string, shard uint64, vals []int64, ids []uint64, clear bool) (path string, data []byte, err error) {
735750
msg := &pbuf.ImportValueRequest{
736751
Index: index,
737752
Field: field,
738753
Shard: shard,
739754
ColumnIDs: ids,
740755
Values: vals,
741756
}
742-
data, err := proto.Marshal(msg)
757+
data, err = proto.Marshal(msg)
743758
if err != nil {
744-
return errors.Wrap(err, "marshaling to protobuf")
759+
return "", nil, errors.Wrap(err, "marshaling to protobuf")
745760
}
746-
path := fmt.Sprintf("/index/%s/field/%s/import?clear=%s&ignoreKeyCheck=true", index, field, strconv.FormatBool(clear))
761+
path = fmt.Sprintf("/index/%s/field/%s/import?clear=%s&ignoreKeyCheck=true", index, field, strconv.FormatBool(clear))
762+
return path, data, nil
763+
}
764+
765+
// DoImportValues takes a path and data payload (normally from
766+
// EncodeImportValues), logs the import, finds all nodes which own
767+
// this shard, and concurrently imports to those nodes.
768+
func (c *Client) DoImportValues(index string, shard uint64, path string, data []byte) error {
747769
c.logImport(index, path, shard, false, data)
748770

749771
uris, err := c.getURIsForShard(index, shard)

Diff for: gpexp/importbatch.go

+43-83
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,8 @@ type Batch struct {
6161
// ids is a slice of length batchSize of record IDs
6262
ids []uint64
6363

64-
// rowIDs is a slice of length len(Batch.header) which contains slices of length batchSize
64+
// rowIDs is a map of field names to slices of length batchSize
65+
// which contain row IDs.
6566
rowIDs map[string][]uint64
6667

6768
// values holds the values for each record of an int field
@@ -70,17 +71,9 @@ type Batch struct {
7071
// times holds a time for each record. (if any of the fields are time fields)
7172
times []QuantizedTime
7273

73-
// clearValues holds a slice of indices into b.ids for each
74-
// integer field which has nil values. After translation, these
75-
// slices will be filled out with the actual column IDs those
76-
// indices pertain to so that they can be cleared.
77-
//
78-
// TODO: This is actually a problem — a nil value doesn't mean
79-
// "clear this value", it should mean "don't touch this value", so
80-
// there is no way currently to update a record with int values
81-
// without knowing all the int values, clearing them, or setting
82-
// them to something else in the process.
83-
clearValues map[string][]uint64
74+
// nullIndices holds a slice of indices into b.ids for each
75+
// integer field which has nil values.
76+
nullIndices map[string][]uint64
8477

8578
// TODO, support timestamps, set fields with more than one value per record, mutex, and bool.
8679

@@ -147,7 +140,7 @@ func NewBatch(client *pilosa.Client, size int, index *pilosa.Index, fields []*pi
147140
ids: make([]uint64, 0, size),
148141
rowIDs: rowIDs,
149142
values: values,
150-
clearValues: make(map[string][]uint64),
143+
nullIndices: make(map[string][]uint64),
151144
toTranslate: tt,
152145
toTranslateID: make(map[string][]int),
153146
transCache: NewMapTranslator(),
@@ -164,11 +157,7 @@ func NewBatch(client *pilosa.Client, size int, index *pilosa.Index, fields []*pi
164157
return b, nil
165158
}
166159

167-
// Row represents a single record which can be added to a RecordBatch.
168-
//
169-
// Note: it is not named "Record" because there is a conflict with
170-
// another type in this package. This may be rectified by deprecating
171-
// something or splitting packages in the future.
160+
// Row represents a single record which can be added to a Batch.
172161
type Row struct {
173162
ID interface{}
174163
Values []interface{}
@@ -316,12 +305,12 @@ func (b *Batch) Add(rec Row) error {
316305
case nil:
317306
if field.Opts().Type() == pilosa.FieldTypeInt {
318307
b.values[field.Name()] = append(b.values[field.Name()], 0)
319-
clearIndexes, ok := b.clearValues[field.Name()]
308+
nullIndices, ok := b.nullIndices[field.Name()]
320309
if !ok {
321-
clearIndexes = make([]uint64, 0)
310+
nullIndices = make([]uint64, 0)
322311
}
323-
clearIndexes = append(clearIndexes, uint64(len(b.ids)-1))
324-
b.clearValues[field.Name()] = clearIndexes
312+
nullIndices = append(nullIndices, uint64(len(b.ids)-1))
313+
b.nullIndices[field.Name()] = nullIndices
325314

326315
} else {
327316
b.rowIDs[field.Name()] = append(b.rowIDs[field.Name()], nilSentinel)
@@ -425,11 +414,6 @@ func (b *Batch) doTranslation() error {
425414
}
426415
}
427416

428-
for _, idIndexes := range b.clearValues {
429-
for i, index := range idIndexes {
430-
idIndexes[i] = b.ids[index]
431-
}
432-
}
433417
return nil
434418
}
435419

@@ -511,77 +495,53 @@ func (b *Batch) importValueData() error {
511495
if shardWidth == 0 {
512496
shardWidth = pilosa.DefaultShardWidth
513497
}
514-
515498
eg := errgroup.Group{}
516-
curShard := b.ids[0] / shardWidth
517-
startIdx := 0
518-
for i := 1; i <= len(b.ids); i++ {
519-
// when i==len(b.ids) we ensure that the import logic gets run
520-
// by making a fake shard once we're past the last ID
521-
recordID := (curShard + 2) * shardWidth
522-
if i < len(b.ids) {
523-
recordID = b.ids[i]
524-
}
525-
if recordID/shardWidth != curShard {
526-
endIdx := i
527-
ids := b.ids[startIdx:endIdx]
528-
for field, values := range b.values {
529-
field := field
530-
shard := curShard
531-
vslice := values[startIdx:endIdx]
532-
eg.Go(func() error {
533-
err := b.client.ImportValues(b.index.Name(), field, shard, vslice, ids, false)
534-
return errors.Wrapf(err, "importing values for %s", field)
535-
})
536-
}
537-
startIdx = i
538-
curShard = recordID / shardWidth
539-
}
540-
}
541-
542-
err := eg.Wait()
543-
if err != nil {
544-
return errors.Wrap(err, "importing value data")
545-
}
546499

547-
// Now we clear any values for which we got a nil.
548-
//
549-
// TODO we need an endpoint which lets us set and clear
550-
// transactionally... this is kind of a hack.
551-
maxLen := 0
552-
for _, ids := range b.clearValues {
553-
if len(ids) > maxLen {
554-
maxLen = len(ids)
500+
ids := make([]uint64, len(b.ids))
501+
for field, values := range b.values {
502+
// grow our temp ids slice to full length
503+
ids = ids[:len(b.ids)]
504+
// copy orig ids back in
505+
copy(ids, b.ids)
506+
507+
// trim out null values from ids and values.
508+
nullIndices := b.nullIndices[field]
509+
for i, nullIndex := range nullIndices {
510+
nullIndex -= uint64(i) // offset the index by the number of items removed so far
511+
ids = append(ids[:nullIndex], ids[nullIndex+1:]...)
512+
values = append(values[:nullIndex], values[nullIndex+1:]...)
555513
}
556-
}
557-
eg = errgroup.Group{}
558-
values := make([]int64, 0, maxLen)
559-
for field, ids := range b.clearValues {
560-
// TODO maybe sort ids here
561-
curShard := b.ids[0] / shardWidth
514+
515+
// now do imports by shard
516+
curShard := ids[0] / shardWidth
562517
startIdx := 0
563518
for i := 1; i <= len(ids); i++ {
564-
recordID := (curShard + 2) * shardWidth
519+
var recordID uint64
565520
if i < len(ids) {
566-
recordID = b.ids[i]
521+
recordID = ids[i]
522+
} else {
523+
recordID = (curShard + 2) * shardWidth
567524
}
525+
568526
if recordID/shardWidth != curShard {
569527
endIdx := i
570-
idSlice := ids[startIdx:endIdx]
571-
values := values[:len(idSlice)]
572-
field := field
573528
shard := curShard
529+
field := field
530+
path, data, err := b.client.EncodeImportValues(b.index.Name(), field, shard, values[startIdx:endIdx], ids[startIdx:endIdx], false)
531+
if err != nil {
532+
return errors.Wrap(err, "encoding import values")
533+
}
574534
eg.Go(func() error {
575-
err := b.client.ImportValues(b.index.Name(), field, shard, values, idSlice, true)
576-
return errors.Wrap(err, "clearing values")
535+
err := b.client.DoImportValues(b.index.Name(), shard, path, data)
536+
return errors.Wrapf(err, "importing values for %s", field)
577537
})
578538
startIdx = i
579539
curShard = recordID / shardWidth
580540
}
581541
}
582542
}
583-
584-
return errors.Wrap(eg.Wait(), "importing clear value data")
543+
err := eg.Wait()
544+
return errors.Wrap(err, "importing value data")
585545
}
586546

587547
// reset is called at the end of importing to ready the batch for the
@@ -602,8 +562,8 @@ func (b *Batch) reset() {
602562
for k := range b.values {
603563
delete(b.values, k) // TODO pool these slices
604564
}
605-
for k := range b.clearValues {
606-
delete(b.clearValues, k) // TODO pool these slices
565+
for k := range b.nullIndices {
566+
delete(b.nullIndices, k) // TODO pool these slices
607567
}
608568
}
609569

Diff for: gpexp/importbatch_test.go

+62-2
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,66 @@ import (
1212

1313
// TODO test against cluster
1414

15+
func TestImportBatchInts(t *testing.T) {
16+
client := pilosa.DefaultClient()
17+
schema := pilosa.NewSchema()
18+
idx := schema.Index("gopilosatest-blah")
19+
field := idx.Field("anint", pilosa.OptFieldTypeInt())
20+
err := client.SyncSchema(schema)
21+
if err != nil {
22+
t.Fatalf("syncing schema: %v", err)
23+
}
24+
25+
b, err := NewBatch(client, 3, idx, []*pilosa.Field{field})
26+
if err != nil {
27+
t.Fatalf("getting batch: %v", err)
28+
}
29+
30+
r := Row{Values: make([]interface{}, 1)}
31+
32+
for i := uint64(0); i < 3; i++ {
33+
r.ID = i
34+
r.Values[0] = int64(i)
35+
err := b.Add(r)
36+
if err != nil && err != ErrBatchNowFull {
37+
t.Fatalf("adding to batch: %v", err)
38+
}
39+
}
40+
err = b.Import()
41+
if err != nil {
42+
t.Fatalf("importing: %v", err)
43+
}
44+
45+
r.ID = uint64(0)
46+
r.Values[0] = nil
47+
err = b.Add(r)
48+
if err != nil {
49+
t.Fatalf("adding after import: %v", err)
50+
}
51+
r.ID = uint64(1)
52+
r.Values[0] = int64(7)
53+
err = b.Add(r)
54+
if err != nil {
55+
t.Fatalf("adding second after import: %v", err)
56+
}
57+
58+
err = b.Import()
59+
if err != nil {
60+
t.Fatalf("second import: %v", err)
61+
}
62+
63+
resp, err := client.Query(idx.BatchQuery(field.Equals(0), field.Equals(7), field.Equals(2)))
64+
if err != nil {
65+
t.Fatalf("querying: %v", err)
66+
}
67+
68+
for i, result := range resp.Results() {
69+
if !reflect.DeepEqual(result.Row().Columns, []uint64{uint64(i)}) {
70+
t.Errorf("expected %v for %d, but got %v", []uint64{uint64(i)}, i, result.Row().Columns)
71+
}
72+
}
73+
}
74+
1575
func TestBatches(t *testing.T) {
1676
client := pilosa.DefaultClient()
1777
schema := pilosa.NewSchema()
@@ -90,8 +150,8 @@ func TestBatches(t *testing.T) {
90150
if !reflect.DeepEqual(b.values["three"], []int64{99, -10, 99, -10, 99, -10, 99, -10, 0}) {
91151
t.Fatalf("unexpected values: %v", b.values["three"])
92152
}
93-
if !reflect.DeepEqual(b.clearValues["three"], []uint64{8}) {
94-
t.Fatalf("unexpected clearValues: %v", b.clearValues["three"])
153+
if !reflect.DeepEqual(b.nullIndices["three"], []uint64{8}) {
154+
t.Fatalf("unexpected nullIndices: %v", b.nullIndices["three"])
95155
}
96156

97157
if len(b.toTranslate["one"]) != 2 {

0 commit comments

Comments (0)