Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 22 additions & 26 deletions gnmi_server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1119,9 +1119,10 @@ func prepareDbTranslib(t *testing.T) {

// subscriptionQuery represent the input to create an gnmi.Subscription instance.
type subscriptionQuery struct {
Query []string
SubMode pb.SubscriptionMode
SampleInterval uint64
Query []string
SubMode pb.SubscriptionMode
SampleInterval uint64
SuppressRedundant bool
}

func pathToString(q client.Path) string {
Expand All @@ -1136,28 +1137,26 @@ func pathToString(q client.Path) string {
}

// createQuery creates a client.Query with the given args. It assigns query.SubReq.
func createQuery(subListMode pb.SubscriptionList_Mode, target string, queries []subscriptionQuery, updatesOnly bool) (*client.Query, error) {
func createQuery(subListMode pb.SubscriptionList_Mode, target string, queries []subscriptionQuery) (*client.Query, error) {
s := &pb.SubscribeRequest_Subscribe{
Subscribe: &pb.SubscriptionList{
Mode: subListMode,
Prefix: &pb.Path{Target: target},
},
}
if updatesOnly {
s.Subscribe.UpdatesOnly = true
}

for _, qq := range queries {
pp, err := ygot.StringToPath(pathToString(qq.Query), ygot.StructuredPath, ygot.StringSlicePath)
if err != nil {
return nil, fmt.Errorf("invalid query path %q: %v", qq, err)
return nil, fmt.Errorf("invalid query path %q: %v", qq.Query, err)
}
s.Subscribe.Subscription = append(
s.Subscribe.Subscription,
&pb.Subscription{
Path: pp,
Mode: qq.SubMode,
SampleInterval: qq.SampleInterval,
Path: pp,
Mode: qq.SubMode,
SampleInterval: qq.SampleInterval,
SuppressRedundant: qq.SuppressRedundant,
})
}

Expand All @@ -1168,8 +1167,8 @@ func createQuery(subListMode pb.SubscriptionList_Mode, target string, queries []
}

// createQueryOrFail creates a query, in case of a failure it fails the test.
func createQueryOrFail(t *testing.T, subListMode pb.SubscriptionList_Mode, target string, queries []subscriptionQuery, updatesOnly bool) client.Query {
q, err := createQuery(subListMode, target, queries, updatesOnly)
func createQueryOrFail(t *testing.T, subListMode pb.SubscriptionList_Mode, target string, queries []subscriptionQuery) client.Query {
q, err := createQuery(subListMode, target, queries)
if err != nil {
t.Fatalf("failed to create query: %v", err)
}
Expand All @@ -1187,8 +1186,7 @@ func createEventsQuery(t *testing.T, paths ...string) client.Query {
Query: paths,
SubMode: pb.SubscriptionMode_ON_CHANGE,
},
},
false)
})
}

func createStateDbQueryOnChangeMode(t *testing.T, paths ...string) client.Query {
Expand All @@ -1200,8 +1198,7 @@ func createStateDbQueryOnChangeMode(t *testing.T, paths ...string) client.Query
Query: paths,
SubMode: pb.SubscriptionMode_ON_CHANGE,
},
},
false)
})
}

// createCountersDbQueryOnChangeMode creates a query with ON_CHANGE mode.
Expand All @@ -1214,8 +1211,7 @@ func createCountersDbQueryOnChangeMode(t *testing.T, paths ...string) client.Que
Query: paths,
SubMode: pb.SubscriptionMode_ON_CHANGE,
},
},
false)
})
}

// createRatesTableSetUpdate creates a HSET request on the RATES table.
Expand All @@ -1231,18 +1227,18 @@ func createRatesTableSetUpdate(tableKey string, fieldName string, fieldValue str
}

// createCountersDbQuerySampleMode creates a query with SAMPLE mode.
func createCountersDbQuerySampleMode(t *testing.T, interval time.Duration, updateOnly bool, paths ...string) client.Query {
func createCountersDbQuerySampleMode(t *testing.T, interval time.Duration, suppressRedundant bool, paths ...string) client.Query {
return createQueryOrFail(t,
pb.SubscriptionList_STREAM,
"COUNTERS_DB",
[]subscriptionQuery{
{
Query: paths,
SubMode: pb.SubscriptionMode_SAMPLE,
SampleInterval: uint64(interval.Nanoseconds()),
Query: paths,
SubMode: pb.SubscriptionMode_SAMPLE,
SampleInterval: uint64(interval.Nanoseconds()),
SuppressRedundant: suppressRedundant,
},
},
updateOnly)
})
}

// createCountersTableSetUpdate creates a HSET request on the COUNTERS table.
Expand Down Expand Up @@ -3610,7 +3606,7 @@ func runTestSubscribe(t *testing.T, namespace string) {
},
},
{
desc: "(update only) sample stream query for table key Ethernet*/Pfcwd with field value update",
desc: "(suppress_redundant) sample stream query for table key Ethernet*/Pfcwd with field value update",
generateIntervals: true,
q: createCountersDbQuerySampleMode(t, 0, true, "COUNTERS", "Ethernet*", "Pfcwd"),
updates: []tablePathValue{
Expand Down
22 changes: 11 additions & 11 deletions sonic_data_client/db_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ func (c *DbClient) StreamRun(q *queue.PriorityQueue, stop chan struct{}, w *sync
if subMode == gnmipb.SubscriptionMode_SAMPLE {
c.w.Add(1) // wait group to indicate the streaming session is complete.
c.synced.Add(1) // wait group to indicate whether sync_response is sent.
go streamSampleSubscription(c, sub, subscribe.GetUpdatesOnly())
go streamSampleSubscription(c, sub)
} else if subMode == gnmipb.SubscriptionMode_ON_CHANGE {
c.w.Add(1)
c.synced.Add(1)
Expand Down Expand Up @@ -269,13 +269,13 @@ func streamOnChangeSubscription(c *DbClient, gnmiPath *gnmipb.Path) {
go dbFieldSubscribe(c, gnmiPath, true, time.Millisecond*200)
}
} else {
// sample interval and update only parameters are not applicable
// sample interval and suppress redundant parameters are not applicable
go dbTableKeySubscribe(c, gnmiPath, 0, true)
}
}

// streamSampleSubscription implements Subscription "SAMPLE STREAM" mode
func streamSampleSubscription(c *DbClient, sub *gnmipb.Subscription, updateOnly bool) {
func streamSampleSubscription(c *DbClient, sub *gnmipb.Subscription) {
samplingInterval, err := validateSampleInterval(sub)
if err != nil {
enqueueFatalMsg(c, err.Error())
Expand All @@ -289,12 +289,12 @@ func streamSampleSubscription(c *DbClient, sub *gnmipb.Subscription, updateOnly
log.V(2).Infof("streamSampleSubscription gnmiPath: %v", gnmiPath)
if tblPaths[0].field != "" {
if len(tblPaths) > 1 {
dbFieldMultiSubscribe(c, gnmiPath, false, samplingInterval, updateOnly)
dbFieldMultiSubscribe(c, gnmiPath, false, samplingInterval, sub.GetSuppressRedundant())
} else {
dbFieldSubscribe(c, gnmiPath, false, samplingInterval)
}
} else {
dbTableKeySubscribe(c, gnmiPath, samplingInterval, updateOnly)
dbTableKeySubscribe(c, gnmiPath, samplingInterval, sub.GetSuppressRedundant())
}
}

Expand Down Expand Up @@ -1129,9 +1129,9 @@ func putFatalMsg(q *queue.PriorityQueue, msg string) {
// dbFieldMultiSubscribe would read a field from multiple tables and put to output queue.
// It handles queries like "COUNTERS/Ethernet*/xyz" where the path translates to a field in multiple tables.
// For SAMPLE mode, it would send periodically regardless of change.
// However, if `updateOnly` is true, the payload would include only the changed fields.
// However, if `suppressRedundant` is true, the payload would include only the changed fields.
// For ON_CHANGE mode, it would send only if the value has changed since the last update.
func dbFieldMultiSubscribe(c *DbClient, gnmiPath *gnmipb.Path, onChange bool, interval time.Duration, updateOnly bool) {
func dbFieldMultiSubscribe(c *DbClient, gnmiPath *gnmipb.Path, onChange bool, interval time.Duration, suppressRedundant bool) {
defer c.w.Done()

tblPaths := c.pathG2S[gnmiPath]
Expand Down Expand Up @@ -1165,7 +1165,7 @@ func dbFieldMultiSubscribe(c *DbClient, gnmiPath *gnmipb.Path, onChange bool, in

// This value was saved before and it hasn't changed since then
_, valueMapped := path2ValueMap[tblPath]
if (onChange || updateOnly) && valueMapped && val == path2ValueMap[tblPath] {
if (onChange || suppressRedundant) && valueMapped && val == path2ValueMap[tblPath] {
continue
}

Expand Down Expand Up @@ -1405,7 +1405,7 @@ func dbSingleTableKeySubscribe(c *DbClient, rsd redisSubData, updateChannel chan
// dbTableKeySubscribe subscribes to tables using a table keys.
// Handles queries like "COUNTERS/Ethernet0" or "COUNTERS/Ethernet*"
// This function handles both ON_CHANGE and SAMPLE modes. "interval" being 0 is interpreted as ON_CHANGE mode.
func dbTableKeySubscribe(c *DbClient, gnmiPath *gnmipb.Path, interval time.Duration, updateOnly bool) {
func dbTableKeySubscribe(c *DbClient, gnmiPath *gnmipb.Path, interval time.Duration, suppressRedundant bool) {
defer c.w.Done()

tblPaths := c.pathG2S[gnmiPath]
Expand Down Expand Up @@ -1505,7 +1505,7 @@ func dbTableKeySubscribe(c *DbClient, gnmiPath *gnmipb.Path, interval time.Durat
signalSync()

// Clear the payload so that next time it will send only updates
if updateOnly {
if suppressRedundant {
msiAll = make(map[string]interface{})
}

Expand Down Expand Up @@ -1547,7 +1547,7 @@ func dbTableKeySubscribe(c *DbClient, gnmiPath *gnmipb.Path, interval time.Durat
}

// Clear the payload so that next time it will send only updates
if updateOnly {
if suppressRedundant {
msiAll = make(map[string]interface{})
log.V(6).Infof("msiAll cleared: %v", len(msiAll))
}
Expand Down
6 changes: 3 additions & 3 deletions sonic_data_client/mixed_db_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1989,7 +1989,7 @@ func (c *MixedDbClient) dbSingleTableKeySubscribe(rsd redisSubData, updateChanne
// dbTableKeySubscribe subscribes to tables using a table keys.
// Handles queries like "COUNTERS/Ethernet0" or "COUNTERS/Ethernet*"
// This function handles both ON_CHANGE and SAMPLE modes. "interval" being 0 is interpreted as ON_CHANGE mode.
func (c *MixedDbClient) dbTableKeySubscribe(gnmiPath *gnmipb.Path, interval time.Duration, updateOnly bool) {
func (c *MixedDbClient) dbTableKeySubscribe(gnmiPath *gnmipb.Path, interval time.Duration, suppressRedundant bool) {
defer c.w.Done()

msiAll := make(map[string]interface{})
Expand Down Expand Up @@ -2106,7 +2106,7 @@ func (c *MixedDbClient) dbTableKeySubscribe(gnmiPath *gnmipb.Path, interval time
signalSync()

// Clear the payload so that next time it will send only updates
if updateOnly {
if suppressRedundant {
msiAll = make(map[string]interface{})
}

Expand Down Expand Up @@ -2148,7 +2148,7 @@ func (c *MixedDbClient) dbTableKeySubscribe(gnmiPath *gnmipb.Path, interval time
}

// Clear the payload so that next time it will send only updates
if updateOnly {
if suppressRedundant {
msiAll = make(map[string]interface{})
log.V(6).Infof("msiAll cleared: %v", len(msiAll))
}
Expand Down
Loading