Skip to content
This repository was archived by the owner on Feb 21, 2024. It is now read-only.

Commit 0e4d0cb

Browse files
authored
Merge pull request #2127 from travisturner/backport-pr2084
Backport pr2084
2 parents a24482e + 5ebd7e8 commit 0e4d0cb

File tree

4 files changed

+170
-30
lines changed

4 files changed

+170
-30
lines changed

field.go

+30-26
Original file line numberDiff line numberDiff line change
@@ -1269,23 +1269,45 @@ func (f *Field) importValue(columnIDs []uint64, values []int64, options *ImportO
12691269
return errors.Wrap(ErrBSIGroupNotFound, f.name)
12701270
}
12711271

1272-
// Find the lowest/highest values.
1272+
// We want to determine the required bit depth, in case the field doesn't
1273+
// have as many bits currently as would be needed to represent these values,
1274+
// but only if the values are in-range for the field.
12731275
var min, max int64
1274-
for i, value := range values {
1275-
if i == 0 || value < min {
1276-
min = value
1276+
if len(values) > 0 {
1277+
min, max = values[0], values[0]
1278+
}
1279+
1280+
// Split import data by fragment.
1281+
dataByFragment := make(map[importKey]importValueData)
1282+
for i := range columnIDs {
1283+
columnID, value := columnIDs[i], values[i]
1284+
if value > bsig.Max {
1285+
return fmt.Errorf("%v, columnID=%v, value=%v", ErrBSIGroupValueTooHigh, columnID, value)
1286+
} else if value < bsig.Min {
1287+
return fmt.Errorf("%v, columnID=%v, value=%v", ErrBSIGroupValueTooLow, columnID, value)
12771288
}
1278-
if i == 0 || value > max {
1289+
if value > max {
12791290
max = value
12801291
}
1292+
if value < min {
1293+
min = value
1294+
}
1295+
1296+
// Attach value to each bsiGroup view.
1297+
for _, name := range []string{viewName} {
1298+
key := importKey{View: name, Shard: columnID / ShardWidth}
1299+
data := dataByFragment[key]
1300+
data.ColumnIDs = append(data.ColumnIDs, columnID)
1301+
data.Values = append(data.Values, value)
1302+
dataByFragment[key] = data
1303+
}
12811304
}
12821305

12831306
// Determine the highest bit depth required by the min & max.
12841307
requiredDepth := bitDepthInt64(min - bsig.Base)
12851308
if v := bitDepthInt64(max - bsig.Base); v > requiredDepth {
12861309
requiredDepth = v
12871310
}
1288-
12891311
// Increase bit depth if required.
12901312
if requiredDepth > bsig.BitDepth {
12911313
if err := func() error {
@@ -1297,26 +1319,8 @@ func (f *Field) importValue(columnIDs []uint64, values []int64, options *ImportO
12971319
}(); err != nil {
12981320
return errors.Wrap(err, "increasing bsi bit depth")
12991321
}
1300-
}
1301-
1302-
// Split import data by fragment.
1303-
dataByFragment := make(map[importKey]importValueData)
1304-
for i := range columnIDs {
1305-
columnID, value := columnIDs[i], values[i]
1306-
if value > bsig.Max {
1307-
return fmt.Errorf("%v, columnID=%v, value=%v", ErrBSIGroupValueTooHigh, columnID, value)
1308-
} else if value < bsig.Min {
1309-
return fmt.Errorf("%v, columnID=%v, value=%v", ErrBSIGroupValueTooLow, columnID, value)
1310-
}
1311-
1312-
// Attach value to each bsiGroup view.
1313-
for _, name := range []string{viewName} {
1314-
key := importKey{View: name, Shard: columnID / ShardWidth}
1315-
data := dataByFragment[key]
1316-
data.ColumnIDs = append(data.ColumnIDs, columnID)
1317-
data.Values = append(data.Values, value)
1318-
dataByFragment[key] = data
1319-
}
1322+
} else {
1323+
requiredDepth = bsig.BitDepth
13201324
}
13211325

13221326
// Import into each fragment.

fragment.go

+11-1
Original file line numberDiff line numberDiff line change
@@ -2191,7 +2191,14 @@ func (f *fragment) importValueSmallWrite(columnIDs []uint64, values []int64, bit
21912191
rowSet[uint64(i)] = struct{}{}
21922192
}
21932193
err := f.importPositions(toSet, toClear, rowSet)
2194-
return errors.Wrap(err, "importing positions")
2194+
if err != nil {
2195+
return errors.Wrap(err, "importing positions")
2196+
}
2197+
2198+
// Reset the rowCache.
2199+
f.rowCache = &simpleCache{make(map[uint64]*Row)}
2200+
2201+
return nil
21952202
}
21962203

21972204
// importValue bulk imports a set of range-encoded values.
@@ -2230,6 +2237,9 @@ func (f *fragment) importValue(columnIDs []uint64, values []int64, bitDepth uint
22302237
// We don't actually care, except we want our stats to be accurate.
22312238
f.incrementOpN(totalChanges)
22322239

2240+
// Reset the rowCache.
2241+
f.rowCache = &simpleCache{make(map[uint64]*Row)}
2242+
22332243
// in theory, this should probably have happened anyway, but if enough
22342244
// of the bits matched existing bits, we'll be under our opN estimate, and
22352245
// we want to ensure that the snapshot happens.

fragment_internal_test.go

+63-3
Original file line numberDiff line numberDiff line change
@@ -3363,15 +3363,15 @@ func TestImportMultipleValues(t *testing.T) {
33633363
cols []uint64
33643364
vals []int64
33653365
checkCols []uint64
3366-
checkVals []uint64
3366+
checkVals []int64
33673367
depth uint
33683368
}{
33693369
{
33703370
cols: []uint64{0, 0},
33713371
vals: []int64{97, 100},
33723372
depth: 7,
33733373
checkCols: []uint64{0},
3374-
checkVals: []uint64{100},
3374+
checkVals: []int64{100},
33753375
},
33763376
}
33773377

@@ -3395,7 +3395,7 @@ func TestImportMultipleValues(t *testing.T) {
33953395
if !exists {
33963396
t.Errorf("column %d should exist", cc)
33973397
}
3398-
if n != 100 {
3398+
if n != cv {
33993399
t.Errorf("wrong value: %d is not %d", n, cv)
34003400
}
34013401
}
@@ -3405,6 +3405,66 @@ func TestImportMultipleValues(t *testing.T) {
34053405
}
34063406
}
34073407

3408+
func TestImportValueRowCache(t *testing.T) {
3409+
type testCase struct {
3410+
cols []uint64
3411+
vals []int64
3412+
checkCols []uint64
3413+
depth uint
3414+
}
3415+
tests := []struct {
3416+
tc1 testCase
3417+
tc2 testCase
3418+
}{
3419+
{
3420+
tc1: testCase{
3421+
cols: []uint64{2},
3422+
vals: []int64{1},
3423+
depth: 1,
3424+
checkCols: []uint64{2},
3425+
},
3426+
tc2: testCase{
3427+
cols: []uint64{1000},
3428+
vals: []int64{1},
3429+
depth: 1,
3430+
checkCols: []uint64{2, 1000},
3431+
},
3432+
},
3433+
}
3434+
3435+
for i, test := range tests {
3436+
for _, maxOpN := range []int{1, 10000} {
3437+
t.Run(fmt.Sprintf("%dMaxOpN%d", i, maxOpN), func(t *testing.T) {
3438+
f := mustOpenBSIFragment("i", "f", viewBSIGroupPrefix+"foo", 0)
3439+
f.MaxOpN = maxOpN
3440+
defer f.Clean(t)
3441+
3442+
// First import (tc1)
3443+
if err := f.importValue(test.tc1.cols, test.tc1.vals, test.tc1.depth, false); err != nil {
3444+
t.Fatalf("importing values: %v", err)
3445+
}
3446+
3447+
if r, err := f.rangeOp(pql.GT, test.tc1.depth, 0); err != nil {
3448+
t.Error("getting range of values")
3449+
} else if !reflect.DeepEqual(r.Columns(), test.tc1.checkCols) {
3450+
t.Errorf("wrong column values. expected: %v, but got: %v", test.tc1.checkCols, r.Columns())
3451+
}
3452+
3453+
// Second import (tc2)
3454+
if err := f.importValue(test.tc2.cols, test.tc2.vals, test.tc2.depth, false); err != nil {
3455+
t.Fatalf("importing values: %v", err)
3456+
}
3457+
3458+
if r, err := f.rangeOp(pql.GT, test.tc2.depth, 0); err != nil {
3459+
t.Error("getting range of values")
3460+
} else if !reflect.DeepEqual(r.Columns(), test.tc2.checkCols) {
3461+
t.Errorf("wrong column values. expected: %v, but got: %v", test.tc2.checkCols, r.Columns())
3462+
}
3463+
})
3464+
}
3465+
}
3466+
}
3467+
34083468
func TestFragmentConcurrentReadWrite(t *testing.T) {
34093469
f := mustOpenFragment("i", "f", viewStandard, 0, CacheTypeRanked)
34103470
defer f.Clean(t)

http/client_test.go

+66
Original file line numberDiff line numberDiff line change
@@ -777,6 +777,72 @@ func TestClient_ImportKeys(t *testing.T) {
777777
})
778778
}
779779

780+
func TestClient_ImportIDs(t *testing.T) {
781+
// Ensure that running a query between two imports does
782+
// not affect the result set. It turns out, this is caused
783+
// by the fragment.rowCache failing to be cleared after an
784+
// importValue. This ensures that the rowCache is cleared
785+
// after an import.
786+
t.Run("ImportRangeImport", func(t *testing.T) {
787+
cluster := test.MustRunCluster(t, 1)
788+
defer cluster.Close()
789+
cmd := cluster[0]
790+
host := cmd.URL()
791+
holder := cmd.Server.Holder()
792+
hldr := test.Holder{Holder: holder}
793+
794+
idxName := "i"
795+
fldName := "f"
796+
797+
// Load bitmap into cache to ensure cache gets updated.
798+
index := hldr.MustCreateIndexIfNotExists(idxName, pilosa.IndexOptions{Keys: false})
799+
_, err := index.CreateFieldIfNotExists(fldName, pilosa.OptFieldTypeInt(-10000, 10000))
800+
if err != nil {
801+
t.Fatal(err)
802+
}
803+
804+
// Send import request.
805+
c := MustNewClient(host, http.GetHTTPClient(nil))
806+
if err := c.ImportValue(context.Background(), idxName, fldName, 0, []pilosa.FieldValue{
807+
{ColumnID: 2, Value: 1},
808+
}); err != nil {
809+
t.Fatal(err)
810+
}
811+
812+
// Verify range.
813+
queryRequest := &pilosa.QueryRequest{
814+
Query: fmt.Sprintf(`Row(%s>0)`, fldName),
815+
Remote: false,
816+
}
817+
818+
if result, err := c.Query(context.Background(), idxName, queryRequest); err != nil {
819+
t.Fatal(err)
820+
} else {
821+
res := result.Results[0].(*pilosa.Row).Columns()
822+
if !reflect.DeepEqual(res, []uint64{2}) {
823+
t.Fatalf("unexpected column ids: %v", res)
824+
}
825+
}
826+
827+
// Send import request.
828+
if err := c.ImportValue(context.Background(), idxName, fldName, 0, []pilosa.FieldValue{
829+
{ColumnID: 1000, Value: 1},
830+
}); err != nil {
831+
t.Fatal(err)
832+
}
833+
834+
// Verify range.
835+
if result, err := c.Query(context.Background(), idxName, queryRequest); err != nil {
836+
t.Fatal(err)
837+
} else {
838+
res := result.Results[0].(*pilosa.Row).Columns()
839+
if !reflect.DeepEqual(res, []uint64{2, 1000}) {
840+
t.Fatalf("unexpected column ids: %v", res)
841+
}
842+
}
843+
})
844+
}
845+
780846
// Ensure client can bulk import value data.
781847
func TestClient_ImportValue(t *testing.T) {
782848
cluster := test.MustRunCluster(t, 1)

0 commit comments

Comments (0)