Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 102 additions & 0 deletions gnmi_server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"time"
"unsafe"

"github.com/Azure/sonic-mgmt-common/translib/db"
"github.com/sonic-net/sonic-gnmi/common_utils"
spb "github.com/sonic-net/sonic-gnmi/proto"
sgpb "github.com/sonic-net/sonic-gnmi/proto/gnoi"
Expand Down Expand Up @@ -6230,6 +6231,107 @@ func TestGnoiAuthorization(t *testing.T) {
s.Stop()
}

func TestSubscriptionDeduplication(t *testing.T) {
// Create the server
s := createServer(t, 8081)
s.config.EnableTranslation = true
go runServer(t, s)
defer s.Stop()

rclient := db.RedisClient(db.ConfigDB)
defer db.CloseRedisClient(rclient)

// The server is ready - now a request is needed.
tlsConfig := &tls.Config{InsecureSkipVerify: true}
opts := []grpc.DialOption{grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))}

targetAddr := fmt.Sprintf("127.0.0.1:%d", s.config.Port)
conn, err := grpc.Dial(targetAddr, opts...)
if err != nil {
t.Fatalf("Dialing to %q failed: %v", targetAddr, err)
}
defer conn.Close()
gClient := pb.NewGNMIClient(conn)

samplePath, _ := xpath.ToGNMIPath("/interfaces/interface[name=Ethernet1/1/1]/config/description")
subReq := &pb.SubscribeRequest{
Request: &pb.SubscribeRequest_Subscribe{
Subscribe: &pb.SubscriptionList{
Prefix: &pb.Path{Origin: "openconfig", Target: "OC_YANG"},
Mode: pb.SubscriptionList_STREAM,
Encoding: pb.Encoding_PROTO,
Subscription: []*pb.Subscription{
{
Path: samplePath,
Mode: pb.SubscriptionMode_SAMPLE,
SampleInterval: 1000000000, // 1 second
},
},
},
},
}

updates := make([][]*pb.SubscribeResponse, 2)
wg := sync.WaitGroup{}

// Start a goroutine to change the description of the port every 100ms.
wg.Add(1)
go func() {
defer wg.Done()
end := time.Now().Add(5 * time.Second)
for time.Now().Before(end) {
if err := rclient.HSet(context.Background(), "PORT|Ethernet1/1/1", "description", time.Now().String()).Err(); err != nil {
t.Errorf("Failed to set description: %v", err)
return
}
time.Sleep(100 * time.Millisecond)
}
}()

// Start two Sample subscriptions and collect their responses.
for i := 0; i < 2; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
subUpdates := []*pb.SubscribeResponse{}
ctx, cancel := context.WithTimeout(context.Background(), 5500*time.Millisecond)
defer cancel()

stream, err := gClient.Subscribe(ctx, grpc.MaxCallRecvMsgSize(6000000))
if err != nil {
t.Error(err.Error())
return
}
if err = stream.Send(subReq); err != nil {
t.Errorf("Failed to send subscription: %v", err)
return
}

syncReceived := false
for {
resp, err := stream.Recv()
if err != nil {
break
}
if resp.GetSyncResponse() {
syncReceived = true
continue
}
if syncReceived {
subUpdates = append(subUpdates, resp)
}
}
updates[i] = subUpdates
}(i)
time.Sleep(300 * time.Millisecond)
}
wg.Wait()

if !reflect.DeepEqual(updates[0], updates[1]) {
t.Errorf("Updates received do not match for identical subscriptions!\nFirst Client's Updates:%v\nSeconds Client's Updates:%v", updates[0], updates[1])
}
}

func init() {
// Enable logs at UT setup
flag.Lookup("v").Value.Set("10")
Expand Down
203 changes: 203 additions & 0 deletions sonic_data_client/super_subscription.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
package client

import (
"fmt"
log "github.com/golang/glog"
"github.com/golang/protobuf/proto"
gnmipb "github.com/openconfig/gnmi/proto/gnmi"
spb "github.com/sonic-net/sonic-gnmi/proto"
"sync"
"sync/atomic"
"time"
)

var superSubs = superSubscriptions{
mu: &sync.Mutex{},
subs: map[*superSubscription]bool{},
}

type superSubscriptions struct {
mu *sync.Mutex
subs map[*superSubscription]bool
}

// superSubscription is used to deduplicate subscriptions. Stream Subscriptions
// become part of a superSubscription and whenever a Sample is processed, the
// response is sent to all clients that are part of the superSubscription.
type superSubscription struct {
mu *sync.RWMutex
clients map[*TranslClient]struct{}
request *gnmipb.SubscriptionList
primaryClient *TranslClient
tickers map[int]*time.Ticker // map of interval duration (nanoseconds) to ticker.
sharedUpdates atomic.Uint64
exclusiveUpdates atomic.Uint64
}

// ------------- Super Subscription Functions -------------
// createSuperSubscription takes a SubscriptionList and returns a new
// superSubscription for that SubscriptionList. This function expects the
// caller to already hold superSubs.mu before calling createSuperSubscription.
func createSuperSubscription(subscription *gnmipb.SubscriptionList) *superSubscription {
if subscription == nil {
return nil
}
newSuperSub := &superSubscription{
mu: &sync.RWMutex{},
clients: map[*TranslClient]struct{}{},
request: subscription,
primaryClient: nil,
tickers: map[int]*time.Ticker{},
sharedUpdates: atomic.Uint64{},
exclusiveUpdates: atomic.Uint64{},
}
if _, ok := superSubs.subs[newSuperSub]; ok {
// This should never happen.
log.V(0).Infof("Super Subscription (%p) for %v already exists but a new has been created!", newSuperSub, subscription)
}
superSubs.subs[newSuperSub] = true
return newSuperSub
}

// findSuperSubscription takes a SubscriptionList and tries to find an
// existing superSubscription for that SubscriptionList. If one is found,
// the superSubscription is returned. Else, nil is returned. This function
// expects the caller to already hold superSubs.mu before calling findSuperSubscription.
func findSuperSubscription(subscription *gnmipb.SubscriptionList) *superSubscription {
if subscription == nil {
return nil
}
for sub, _ := range superSubs.subs {
if sub.request == nil {
continue
}
if proto.Equal(sub.request, subscription) {
return sub
}
}
return nil
}

// deleteSuperSub removes superSub from the superSubs map.
// If the superSub is removed from the TranslClient, there
// should be no remaining references to the superSub. This
// function expects the caller to already hold superSubs.mu
// before calling deleteSuperSubscription.
func deleteSuperSubscription(superSub *superSubscription) {
if superSub == nil {
log.V(0).Info("deleteSuperSubscription called on a nil Super Subscription!")
return
}
tickerCleanup(superSub.tickers)
delete(superSubs.subs, superSub)
}

// ------------- Super Subscription Methods -------------
// sendNotifications takes a value and adds it to the notification
// queue for each subscription in the superSubscription.
func (ss *superSubscription) sendNotifications(v *spb.Value) {
if v == nil {
return
}
ss.mu.RLock()
defer ss.mu.RUnlock()
for client, _ := range ss.clients {
value := proto.Clone(v).(*spb.Value)
client.q.Put(Value{value})
}
}

// populateTickers populates the ticker_info objects in the intervalToTickerInfoMap with the
// shared tickers. If tickers don't exist yet, they are created.
func (ss *superSubscription) populateTickers(intervalToTickerInfoMap map[int][]*ticker_info) error {
if intervalToTickerInfoMap == nil {
return fmt.Errorf("Invalid intervalToTickerInfoMap passed in: %v", intervalToTickerInfoMap)
}
ss.mu.Lock()
defer ss.mu.Unlock()
if len(ss.tickers) == 0 {
// Create the tickers.
for interval, tInfos := range intervalToTickerInfoMap {
ticker := time.NewTicker(time.Duration(interval) * time.Nanosecond)
ss.tickers[interval] = ticker
for _, tInfo := range tInfos {
tInfo.t = ticker
}
}
return nil
}
// Use the existing tickers.
if len(ss.tickers) != len(intervalToTickerInfoMap) {
return fmt.Errorf("Length of intervalToTickerInfoMap does not match length of existing tickers for Super Subscription! existing tickers=%v, intervalToTickerInfoMap=%v", ss.tickers, intervalToTickerInfoMap)
}
for interval, tInfos := range intervalToTickerInfoMap {
ticker, ok := ss.tickers[interval]
if !ok {
return fmt.Errorf("Interval in intervalToTickerInfoMap not found in existing tickers for Super Subscription! interval=%v", interval)
}
for _, tInfo := range tInfos {
tInfo.t = ticker
}
}
return nil
}
func (ss *superSubscription) String() string {
return fmt.Sprintf("[{%p} NumClients=%d, SharedUpdates=%d, ExclusiveUpdates=%d, Request=%v]", ss, len(ss.clients), ss.sharedUpdates.Load(), ss.exclusiveUpdates.Load(), ss.request)
}

// ------------- TranslClient Methods -------------
// isPrimary returns true if the client is the primary client of its superSubscription.
func (c *TranslClient) isPrimary() bool {
if c == nil || c.superSub == nil {
return false
}
c.superSub.mu.RLock()
defer c.superSub.mu.RUnlock()
return c.superSub.primaryClient == c
}

// leaveSuperSubscription removes the client from the superSubscription.
// If there are no remaining clients in the superSubscription, it is deleted.
func (c *TranslClient) leaveSuperSubscription() {
if c == nil || c.superSub == nil {
return
}
superSubs.mu.Lock()
defer superSubs.mu.Unlock()
c.superSub.mu.Lock()
defer c.superSub.mu.Unlock()
delete(c.superSub.clients, c)
if len(c.superSub.clients) == 0 {
deleteSuperSubscription(c.superSub)
log.V(2).Infof("SuperSubscription (%s) closing!", c.superSub)
} else if c.superSub.primaryClient == c {
// Set a new primary client.
for client := range c.superSub.clients {
c.superSub.primaryClient = client
client.wakeChan <- true
break
}
log.V(2).Infof("SuperSubscription (%s): %p is now the primary client", c.superSub, c.superSub.primaryClient)
}
}

// addClientToSuperSubscription adds a client to a superSubscription.
func (c *TranslClient) addClientToSuperSubscription(subscription *gnmipb.SubscriptionList) {
if c == nil || subscription == nil {
return
}
superSubs.mu.Lock()
defer superSubs.mu.Unlock()
superSub := findSuperSubscription(subscription)
if superSub == nil {
superSub = createSuperSubscription(subscription)
}
superSub.mu.Lock()
defer superSub.mu.Unlock()
c.superSub = superSub
superSub.clients[c] = struct{}{}
if superSub.primaryClient == nil {
superSub.primaryClient = c
}
log.V(2).Infof("SuperSubscription (%s): added new client=%p", superSub, c)
}
Loading
Loading