Skip to content

Commit ca5cdf5

Browse files
weiliu1031claude
andauthored
fix: [2.6] set ValidData on user-provided $meta before partial update merge (milvus-io#48085)
## Summary Cherry-pick of milvus-io#48084 to 2.6 branch. When partial upsert includes user-provided dynamic fields and the batch mixes existing rows (update) with new rows (insert), the `$meta` field lacks `ValidData`, causing `FillWithDefaultValue` to fail with `the length of valid_data of field($meta) is wrong`. Fix: initialize all-true `ValidData` on user-provided `$meta` in `PreExecute` before `queryPreExecute` runs. pr: milvus-io#48084 issue: milvus-io#47957 --------- Signed-off-by: Wei Liu <wei.liu@zilliz.com> Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent fc6fe84 commit ca5cdf5

File tree

3 files changed

+433
-0
lines changed

3 files changed

+433
-0
lines changed

internal/proxy/task_upsert.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -306,6 +306,18 @@ func (it *upsertTask) queryPreExecute(ctx context.Context) error {
306306
fieldData.FieldId = fieldSchema.GetFieldID()
307307
fieldData.FieldName = fieldName
308308

309+
// Ensure dynamic field has ValidData before merge logic.
310+
// SDK doesn't set ValidData on $meta; without it, AppendFieldDataByColumn
311+
// won't propagate ValidData for insert rows, causing length mismatch.
312+
if fieldData.GetIsDynamic() && len(fieldData.GetValidData()) == 0 {
313+
nRows := int(it.upsertMsg.InsertMsg.NRows())
314+
validData := make([]bool, nRows)
315+
for i := range validData {
316+
validData[i] = true
317+
}
318+
fieldData.ValidData = validData
319+
}
320+
309321
// compatible with different nullable/default_value data format from sdk
310322
if len(fieldData.GetValidData()) != 0 {
311323
var err error

internal/proxy/task_upsert_test.go

Lines changed: 198 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ package proxy
1717

1818
import (
1919
"context"
20+
"encoding/json"
2021
"testing"
2122

2223
"github.com/bytedance/mockey"
@@ -35,6 +36,7 @@ import (
3536
"github.com/milvus-io/milvus/internal/proxy/shardclient"
3637
"github.com/milvus-io/milvus/internal/util/function/embedding"
3738
"github.com/milvus-io/milvus/internal/util/segcore"
39+
"github.com/milvus-io/milvus/pkg/v2/common"
3840
"github.com/milvus-io/milvus/pkg/v2/mq/msgstream"
3941
"github.com/milvus-io/milvus/pkg/v2/proto/planpb"
4042
"github.com/milvus-io/milvus/pkg/v2/proto/rootcoordpb"
@@ -2084,3 +2086,199 @@ func TestUpsertTask_queryPreExecute_DefaultValueError(t *testing.T) {
20842086
assert.Error(t, err)
20852087
assert.ErrorIs(t, err, merr.ErrParameterInvalid)
20862088
}
2089+
2090+
func TestUpsertTask_queryPreExecute_DynamicFieldValidData(t *testing.T) {
2091+
// Schema with dynamic field enabled, simulating a collection with id + value + $meta
2092+
schema := newSchemaInfo(&schemapb.CollectionSchema{
2093+
Name: "test_dynamic_validdata",
2094+
EnableDynamicField: true,
2095+
Fields: []*schemapb.FieldSchema{
2096+
{FieldID: 100, Name: "id", IsPrimaryKey: true, DataType: schemapb.DataType_Int64},
2097+
{FieldID: 101, Name: "value", DataType: schemapb.DataType_Int32},
2098+
{
2099+
FieldID: 102, Name: common.MetaFieldName, DataType: schemapb.DataType_JSON,
2100+
IsDynamic: true, Nullable: true,
2101+
DefaultValue: &schemapb.ValueField{
2102+
Data: &schemapb.ValueField_StringData{StringData: "{}"},
2103+
},
2104+
},
2105+
},
2106+
})
2107+
2108+
t.Run("dynamic field with ValidData merges correctly", func(t *testing.T) {
2109+
// Upsert 3 rows: IDs 1,2 (update), 3 (insert)
2110+
// User provides dynamic field $meta WITHOUT ValidData
2111+
// queryPreExecute will auto-fill ValidData with all-true before merge
2112+
meta1, _ := json.Marshal(map[string]interface{}{"color": "gold"})
2113+
meta2, _ := json.Marshal(map[string]interface{}{"color": "silver"})
2114+
meta3, _ := json.Marshal(map[string]interface{}{"color": "bronze"})
2115+
2116+
upsertData := []*schemapb.FieldData{
2117+
{
2118+
FieldName: "id", FieldId: 100, Type: schemapb.DataType_Int64,
2119+
Field: &schemapb.FieldData_Scalars{Scalars: &schemapb.ScalarField{Data: &schemapb.ScalarField_LongData{LongData: &schemapb.LongArray{Data: []int64{1, 2, 3}}}}},
2120+
},
2121+
{
2122+
FieldName: "value", FieldId: 101, Type: schemapb.DataType_Int32,
2123+
Field: &schemapb.FieldData_Scalars{Scalars: &schemapb.ScalarField{Data: &schemapb.ScalarField_IntData{IntData: &schemapb.IntArray{Data: []int32{100, 200, 300}}}}},
2124+
},
2125+
{
2126+
FieldName: common.MetaFieldName, FieldId: 102, Type: schemapb.DataType_JSON, IsDynamic: true,
2127+
Field: &schemapb.FieldData_Scalars{Scalars: &schemapb.ScalarField{Data: &schemapb.ScalarField_JsonData{
2128+
JsonData: &schemapb.JSONArray{Data: [][]byte{meta1, meta2, meta3}},
2129+
}}},
2130+
// No ValidData — queryPreExecute auto-fills with all-true
2131+
},
2132+
}
2133+
2134+
// Query result: existing PKs 1, 2
2135+
existMeta1, _ := json.Marshal(map[string]interface{}{"color": "red"})
2136+
existMeta2, _ := json.Marshal(map[string]interface{}{"color": "blue"})
2137+
mockQueryResult := &milvuspb.QueryResults{
2138+
Status: merr.Success(),
2139+
FieldsData: []*schemapb.FieldData{
2140+
{
2141+
FieldName: "id", FieldId: 100, Type: schemapb.DataType_Int64,
2142+
Field: &schemapb.FieldData_Scalars{Scalars: &schemapb.ScalarField{Data: &schemapb.ScalarField_LongData{LongData: &schemapb.LongArray{Data: []int64{1, 2}}}}},
2143+
},
2144+
{
2145+
FieldName: "value", FieldId: 101, Type: schemapb.DataType_Int32,
2146+
Field: &schemapb.FieldData_Scalars{Scalars: &schemapb.ScalarField{Data: &schemapb.ScalarField_IntData{IntData: &schemapb.IntArray{Data: []int32{10, 20}}}}},
2147+
},
2148+
{
2149+
FieldName: common.MetaFieldName, FieldId: 102, Type: schemapb.DataType_JSON, IsDynamic: true,
2150+
Field: &schemapb.FieldData_Scalars{Scalars: &schemapb.ScalarField{Data: &schemapb.ScalarField_JsonData{
2151+
JsonData: &schemapb.JSONArray{Data: [][]byte{existMeta1, existMeta2}},
2152+
}}},
2153+
},
2154+
},
2155+
}
2156+
2157+
task := &upsertTask{
2158+
ctx: context.Background(),
2159+
schema: schema,
2160+
req: &milvuspb.UpsertRequest{
2161+
FieldsData: upsertData,
2162+
NumRows: 3,
2163+
},
2164+
upsertMsg: &msgstream.UpsertMsg{
2165+
InsertMsg: &msgstream.InsertMsg{
2166+
InsertRequest: &msgpb.InsertRequest{
2167+
FieldsData: upsertData,
2168+
NumRows: 3,
2169+
Version: msgpb.InsertDataVersion_ColumnBased,
2170+
},
2171+
},
2172+
},
2173+
node: &Proxy{},
2174+
}
2175+
2176+
mockRetrieve := mockey.Mock(retrieveByPKs).Return(mockQueryResult, segcore.StorageCost{}, nil).Build()
2177+
defer mockRetrieve.UnPatch()
2178+
2179+
err := task.queryPreExecute(context.Background())
2180+
assert.NoError(t, err)
2181+
2182+
// Verify merged $meta has 3 entries with correct ValidData length
2183+
var metaField *schemapb.FieldData
2184+
for _, f := range task.insertFieldData {
2185+
if f.GetFieldName() == common.MetaFieldName {
2186+
metaField = f
2187+
break
2188+
}
2189+
}
2190+
assert.NotNil(t, metaField)
2191+
metaData := metaField.GetScalars().GetJsonData().GetData()
2192+
assert.Equal(t, 3, len(metaData), "merged $meta should have 3 rows")
2193+
// ValidData should also have 3 entries (2 from update + 1 from insert)
2194+
assert.Equal(t, 3, len(metaField.GetValidData()), "ValidData length should match row count")
2195+
})
2196+
2197+
t.Run("dynamic field without ValidData is auto-filled by queryPreExecute", func(t *testing.T) {
2198+
// This test verifies the fix: when $meta has NO ValidData (SDK behavior),
2199+
// queryPreExecute auto-fills it with all-true, so merge produces correct length
2200+
meta1, _ := json.Marshal(map[string]interface{}{"color": "gold"})
2201+
meta2, _ := json.Marshal(map[string]interface{}{"color": "silver"})
2202+
meta3, _ := json.Marshal(map[string]interface{}{"color": "bronze"})
2203+
2204+
upsertData := []*schemapb.FieldData{
2205+
{
2206+
FieldName: "id", FieldId: 100, Type: schemapb.DataType_Int64,
2207+
Field: &schemapb.FieldData_Scalars{Scalars: &schemapb.ScalarField{Data: &schemapb.ScalarField_LongData{LongData: &schemapb.LongArray{Data: []int64{1, 2, 3}}}}},
2208+
},
2209+
{
2210+
FieldName: "value", FieldId: 101, Type: schemapb.DataType_Int32,
2211+
Field: &schemapb.FieldData_Scalars{Scalars: &schemapb.ScalarField{Data: &schemapb.ScalarField_IntData{IntData: &schemapb.IntArray{Data: []int32{100, 200, 300}}}}},
2212+
},
2213+
{
2214+
FieldName: common.MetaFieldName, FieldId: 102, Type: schemapb.DataType_JSON, IsDynamic: true,
2215+
Field: &schemapb.FieldData_Scalars{Scalars: &schemapb.ScalarField{Data: &schemapb.ScalarField_JsonData{
2216+
JsonData: &schemapb.JSONArray{Data: [][]byte{meta1, meta2, meta3}},
2217+
}}},
2218+
// NO ValidData — queryPreExecute will auto-fill
2219+
},
2220+
}
2221+
2222+
existMeta1, _ := json.Marshal(map[string]interface{}{"color": "red"})
2223+
existMeta2, _ := json.Marshal(map[string]interface{}{"color": "blue"})
2224+
mockQueryResult := &milvuspb.QueryResults{
2225+
Status: merr.Success(),
2226+
FieldsData: []*schemapb.FieldData{
2227+
{
2228+
FieldName: "id", FieldId: 100, Type: schemapb.DataType_Int64,
2229+
Field: &schemapb.FieldData_Scalars{Scalars: &schemapb.ScalarField{Data: &schemapb.ScalarField_LongData{LongData: &schemapb.LongArray{Data: []int64{1, 2}}}}},
2230+
},
2231+
{
2232+
FieldName: "value", FieldId: 101, Type: schemapb.DataType_Int32,
2233+
Field: &schemapb.FieldData_Scalars{Scalars: &schemapb.ScalarField{Data: &schemapb.ScalarField_IntData{IntData: &schemapb.IntArray{Data: []int32{10, 20}}}}},
2234+
},
2235+
{
2236+
FieldName: common.MetaFieldName, FieldId: 102, Type: schemapb.DataType_JSON, IsDynamic: true,
2237+
Field: &schemapb.FieldData_Scalars{Scalars: &schemapb.ScalarField{Data: &schemapb.ScalarField_JsonData{
2238+
JsonData: &schemapb.JSONArray{Data: [][]byte{existMeta1, existMeta2}},
2239+
}}},
2240+
},
2241+
},
2242+
}
2243+
2244+
task := &upsertTask{
2245+
ctx: context.Background(),
2246+
schema: schema,
2247+
req: &milvuspb.UpsertRequest{
2248+
FieldsData: upsertData,
2249+
NumRows: 3,
2250+
},
2251+
upsertMsg: &msgstream.UpsertMsg{
2252+
InsertMsg: &msgstream.InsertMsg{
2253+
InsertRequest: &msgpb.InsertRequest{
2254+
FieldsData: upsertData,
2255+
NumRows: 3,
2256+
Version: msgpb.InsertDataVersion_ColumnBased,
2257+
},
2258+
},
2259+
},
2260+
node: &Proxy{},
2261+
}
2262+
2263+
mockRetrieve := mockey.Mock(retrieveByPKs).Return(mockQueryResult, segcore.StorageCost{}, nil).Build()
2264+
defer mockRetrieve.UnPatch()
2265+
2266+
err := task.queryPreExecute(context.Background())
2267+
assert.NoError(t, err)
2268+
2269+
// queryPreExecute auto-fills ValidData on $meta, so merge produces correct length 3
2270+
var metaField *schemapb.FieldData
2271+
for _, f := range task.insertFieldData {
2272+
if f.GetFieldName() == common.MetaFieldName {
2273+
metaField = f
2274+
break
2275+
}
2276+
}
2277+
assert.NotNil(t, metaField)
2278+
metaData := metaField.GetScalars().GetJsonData().GetData()
2279+
assert.Equal(t, 3, len(metaData), "merged $meta should have 3 rows")
2280+
validData := metaField.GetValidData()
2281+
assert.Equal(t, 3, len(validData),
2282+
"queryPreExecute auto-fills ValidData, merge produces correct length 3")
2283+
})
2284+
}

0 commit comments

Comments
 (0)