Skip to content
This repository was archived by the owner on Feb 21, 2024. It is now read-only.

Commit 5d71f04

Browse files
committed
reset fragment.rowCache after importValue
1 parent a24482e commit 5d71f04

File tree

3 files changed

+140
-4
lines changed

3 files changed

+140
-4
lines changed

fragment.go

+11-1
Original file line numberDiff line numberDiff line change
@@ -2191,7 +2191,14 @@ func (f *fragment) importValueSmallWrite(columnIDs []uint64, values []int64, bit
21912191
rowSet[uint64(i)] = struct{}{}
21922192
}
21932193
err := f.importPositions(toSet, toClear, rowSet)
2194-
return errors.Wrap(err, "importing positions")
2194+
if err != nil {
2195+
return errors.Wrap(err, "importing positions")
2196+
}
2197+
2198+
// Reset the rowCache.
2199+
f.rowCache = &simpleCache{make(map[uint64]*Row)}
2200+
2201+
return nil
21952202
}
21962203

21972204
// importValue bulk imports a set of range-encoded values.
@@ -2230,6 +2237,9 @@ func (f *fragment) importValue(columnIDs []uint64, values []int64, bitDepth uint
22302237
// We don't actually care, except we want our stats to be accurate.
22312238
f.incrementOpN(totalChanges)
22322239

2240+
// Reset the rowCache.
2241+
f.rowCache = &simpleCache{make(map[uint64]*Row)}
2242+
22332243
// in theory, this should probably have happened anyway, but if enough
22342244
// of the bits matched existing bits, we'll be under our opN estimate, and
22352245
// we want to ensure that the snapshot happens.

fragment_internal_test.go

+63-3
Original file line numberDiff line numberDiff line change
@@ -3363,15 +3363,15 @@ func TestImportMultipleValues(t *testing.T) {
33633363
cols []uint64
33643364
vals []int64
33653365
checkCols []uint64
3366-
checkVals []uint64
3366+
checkVals []int64
33673367
depth uint
33683368
}{
33693369
{
33703370
cols: []uint64{0, 0},
33713371
vals: []int64{97, 100},
33723372
depth: 7,
33733373
checkCols: []uint64{0},
3374-
checkVals: []uint64{100},
3374+
checkVals: []int64{100},
33753375
},
33763376
}
33773377

@@ -3395,7 +3395,7 @@ func TestImportMultipleValues(t *testing.T) {
33953395
if !exists {
33963396
t.Errorf("column %d should exist", cc)
33973397
}
3398-
if n != 100 {
3398+
if n != cv {
33993399
t.Errorf("wrong value: %d is not %d", n, cv)
34003400
}
34013401
}
@@ -3405,6 +3405,66 @@ func TestImportMultipleValues(t *testing.T) {
34053405
}
34063406
}
34073407

3408+
func TestImportValueRowCache(t *testing.T) {
3409+
type testCase struct {
3410+
cols []uint64
3411+
vals []int64
3412+
checkCols []uint64
3413+
depth uint
3414+
}
3415+
tests := []struct {
3416+
tc1 testCase
3417+
tc2 testCase
3418+
}{
3419+
{
3420+
tc1: testCase{
3421+
cols: []uint64{2},
3422+
vals: []int64{1},
3423+
depth: 1,
3424+
checkCols: []uint64{2},
3425+
},
3426+
tc2: testCase{
3427+
cols: []uint64{1000},
3428+
vals: []int64{1},
3429+
depth: 1,
3430+
checkCols: []uint64{2, 1000},
3431+
},
3432+
},
3433+
}
3434+
3435+
for i, test := range tests {
3436+
for _, maxOpN := range []int{1, 10000} {
3437+
t.Run(fmt.Sprintf("%dMaxOpN%d", i, maxOpN), func(t *testing.T) {
3438+
f := mustOpenBSIFragment("i", "f", viewBSIGroupPrefix+"foo", 0)
3439+
f.MaxOpN = maxOpN
3440+
defer f.Clean(t)
3441+
3442+
// First import (tc1)
3443+
if err := f.importValue(test.tc1.cols, test.tc1.vals, test.tc1.depth, false); err != nil {
3444+
t.Fatalf("importing values: %v", err)
3445+
}
3446+
3447+
if r, err := f.rangeOp(pql.GT, test.tc1.depth, 0); err != nil {
3448+
t.Error("getting range of values")
3449+
} else if !reflect.DeepEqual(r.Columns(), test.tc1.checkCols) {
3450+
t.Errorf("wrong column values. expected: %v, but got: %v", test.tc1.checkCols, r.Columns())
3451+
}
3452+
3453+
// Second import (tc2)
3454+
if err := f.importValue(test.tc2.cols, test.tc2.vals, test.tc2.depth, false); err != nil {
3455+
t.Fatalf("importing values: %v", err)
3456+
}
3457+
3458+
if r, err := f.rangeOp(pql.GT, test.tc2.depth, 0); err != nil {
3459+
t.Error("getting range of values")
3460+
} else if !reflect.DeepEqual(r.Columns(), test.tc2.checkCols) {
3461+
t.Errorf("wrong column values. expected: %v, but got: %v", test.tc2.checkCols, r.Columns())
3462+
}
3463+
})
3464+
}
3465+
}
3466+
}
3467+
34083468
func TestFragmentConcurrentReadWrite(t *testing.T) {
34093469
f := mustOpenFragment("i", "f", viewStandard, 0, CacheTypeRanked)
34103470
defer f.Clean(t)

http/client_test.go

+66
Original file line numberDiff line numberDiff line change
@@ -777,6 +777,72 @@ func TestClient_ImportKeys(t *testing.T) {
777777
})
778778
}
779779

780+
func TestClient_ImportIDs(t *testing.T) {
781+
// Ensure that running a query between two imports does
782+
// not affect the result set. It turns out, this is caused
783+
// by the fragment.rowCache failing to be cleared after an
784+
// importValue. This ensures that the rowCache is cleared
785+
// after an import.
786+
t.Run("ImportRangeImport", func(t *testing.T) {
787+
cluster := test.MustRunCluster(t, 1)
788+
defer cluster.Close()
789+
cmd := cluster[0]
790+
host := cmd.URL()
791+
holder := cmd.Server.Holder()
792+
hldr := test.Holder{Holder: holder}
793+
794+
idxName := "i"
795+
fldName := "f"
796+
797+
// Load bitmap into cache to ensure cache gets updated.
798+
index := hldr.MustCreateIndexIfNotExists(idxName, pilosa.IndexOptions{Keys: false})
799+
_, err := index.CreateFieldIfNotExists(fldName, pilosa.OptFieldTypeInt(-10000, 10000))
800+
if err != nil {
801+
t.Fatal(err)
802+
}
803+
804+
// Send import request.
805+
c := MustNewClient(host, http.GetHTTPClient(nil))
806+
if err := c.ImportValue(context.Background(), idxName, fldName, 0, []pilosa.FieldValue{
807+
{ColumnID: 2, Value: 1},
808+
}); err != nil {
809+
t.Fatal(err)
810+
}
811+
812+
// Verify range.
813+
queryRequest := &pilosa.QueryRequest{
814+
Query: fmt.Sprintf(`Row(%s>0)`, fldName),
815+
Remote: false,
816+
}
817+
818+
if result, err := c.Query(context.Background(), idxName, queryRequest); err != nil {
819+
t.Fatal(err)
820+
} else {
821+
res := result.Results[0].(*pilosa.Row).Columns()
822+
if !reflect.DeepEqual(res, []uint64{2}) {
823+
t.Fatalf("unexpected column ids: %v", res)
824+
}
825+
}
826+
827+
// Send import request.
828+
if err := c.ImportValue(context.Background(), idxName, fldName, 0, []pilosa.FieldValue{
829+
{ColumnID: 1000, Value: 1},
830+
}); err != nil {
831+
t.Fatal(err)
832+
}
833+
834+
// Verify range.
835+
if result, err := c.Query(context.Background(), idxName, queryRequest); err != nil {
836+
t.Fatal(err)
837+
} else {
838+
res := result.Results[0].(*pilosa.Row).Columns()
839+
if !reflect.DeepEqual(res, []uint64{2, 1000}) {
840+
t.Fatalf("unexpected column ids: %v", res)
841+
}
842+
}
843+
})
844+
}
845+
780846
// Ensure client can bulk import value data.
781847
func TestClient_ImportValue(t *testing.T) {
782848
cluster := test.MustRunCluster(t, 1)

0 commit comments

Comments
 (0)