Skip to content

Commit ca77044

Browse files
Parallelize GET and SUBSCRIBE processing
Signed-off-by: Niranjani Vivek <niranjaniv@google.com>
1 parent e980d11 commit ca77044

File tree

4 files changed

+454
-36
lines changed

4 files changed

+454
-36
lines changed

gnmi_server/server_test.go

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"time"
2424
"unsafe"
2525

26+
"github.com/Azure/sonic-mgmt-common/translib/db"
2627
"github.com/sonic-net/sonic-gnmi/common_utils"
2728
spb "github.com/sonic-net/sonic-gnmi/proto"
2829
sgpb "github.com/sonic-net/sonic-gnmi/proto/gnoi"
@@ -6229,6 +6230,145 @@ func TestGnoiAuthorization(t *testing.T) {
62296230

62306231
s.Stop()
62316232
}
6233+
func TestMultipleSubscribers(t *testing.T) {
6234+
tests := []struct {
6235+
name string
6236+
delay time.Duration // The delay between sending the subscriptions.
6237+
duration time.Duration // The duration of the subscriptions.
6238+
sampleUpdates uint // The number of Sample updates expected based on the duration and sample interval.
6239+
}{
6240+
{
6241+
name: "TestSubscribeWithNoDelay",
6242+
delay: 0 * time.Second,
6243+
duration: 5 * time.Second,
6244+
sampleUpdates: 4,
6245+
},
6246+
{
6247+
name: "TestSubscribeWithPartialDelay",
6248+
delay: 2 * time.Second,
6249+
duration: 5 * time.Second,
6250+
sampleUpdates: 4,
6251+
},
6252+
{
6253+
name: "TestSubscribeWithFullDelay",
6254+
delay: 5 * time.Second,
6255+
duration: 5 * time.Second,
6256+
sampleUpdates: 4,
6257+
},
6258+
}
6259+
6260+
// Create the server
6261+
s := createServer(t, 8081)
6262+
s.config.EnableTranslation = true
6263+
go runServer(t, s)
6264+
defer s.Stop()
6265+
6266+
// The server is ready - now a request is needed.
6267+
tlsConfig := &tls.Config{InsecureSkipVerify: true}
6268+
opts := []grpc.DialOption{grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))}
6269+
6270+
targetAddr := fmt.Sprintf("127.0.0.1:%d", s.config.Port)
6271+
conn, err := grpc.Dial(targetAddr, opts...)
6272+
if err != nil {
6273+
t.Fatalf("Dialing to %q failed: %v", targetAddr, err)
6274+
}
6275+
defer conn.Close()
6276+
gClient := pb.NewGNMIClient(conn)
6277+
6278+
samplePath, _ := xpath.ToGNMIPath("/interfaces/interface[name=Ethernet1/1/1]/state/name")
6279+
onChangePath, _ := xpath.ToGNMIPath("/system/state/config-meta-data")
6280+
subReq := &pb.SubscribeRequest{
6281+
Request: &pb.SubscribeRequest_Subscribe{
6282+
Subscribe: &pb.SubscriptionList{
6283+
Prefix: &pb.Path{Origin: "openconfig", Target: "OC_YANG"},
6284+
Mode: pb.SubscriptionList_STREAM,
6285+
Encoding: pb.Encoding_PROTO,
6286+
Subscription: []*pb.Subscription{
6287+
{
6288+
Path: samplePath,
6289+
Mode: pb.SubscriptionMode_SAMPLE,
6290+
SampleInterval: 1000000000, // 1 second
6291+
},
6292+
{
6293+
Path: onChangePath,
6294+
Mode: pb.SubscriptionMode_ON_CHANGE,
6295+
},
6296+
},
6297+
},
6298+
},
6299+
}
6300+
6301+
var subTest = func(t *testing.T, duration time.Duration, expectedSamples uint, wg *sync.WaitGroup) {
6302+
defer wg.Done()
6303+
t.Helper()
6304+
numSampleUpdates := uint(0)
6305+
numOnChangeUpdates := uint(0)
6306+
rclient := db.RedisClient(db.ConfigDB)
6307+
defer db.CloseRedisClient(rclient)
6308+
ctx, cancel := context.WithTimeout(context.Background(), duration)
6309+
defer cancel()
6310+
stream, err := gClient.Subscribe(ctx, grpc.MaxCallRecvMsgSize(6000000))
6311+
if err != nil {
6312+
t.Error(err.Error())
6313+
return
6314+
}
6315+
// Send the subscription and wait for sync response
6316+
if err = stream.Send(subReq); err != nil {
6317+
t.Errorf("Failed to send subscription: %v", err)
6318+
return
6319+
}
6320+
go func() {
6321+
time.Sleep(duration)
6322+
6323+
}()
6324+
syncReceived := false
6325+
for {
6326+
resp, err := stream.Recv()
6327+
if err != nil {
6328+
break
6329+
}
6330+
if resp.GetSyncResponse() {
6331+
syncReceived = true
6332+
6333+
// Trigger an OnChange update.
6334+
if err = rclient.HSet(context.Background(), "DEVICE_METADATA|localhost", "config-meta-data", time.Now().String()).Err(); err != nil {
6335+
t.Logf("Failed to set config-meta-data field: %v", err)
6336+
}
6337+
6338+
continue
6339+
}
6340+
if syncReceived {
6341+
if strings.Contains(resp.GetUpdate().GetUpdate()[0].GetPath().String(), "\"name\"") {
6342+
numSampleUpdates += 1
6343+
} else if strings.Contains(resp.GetUpdate().GetUpdate()[0].GetPath().String(), "\"config-meta-data\"") {
6344+
numOnChangeUpdates += 1
6345+
}
6346+
}
6347+
}
6348+
if numSampleUpdates < expectedSamples {
6349+
t.Errorf("Unexepcted number of Sample updates after sync response: got=%v, want at least %v", numSampleUpdates, expectedSamples)
6350+
}
6351+
if numOnChangeUpdates < 1 {
6352+
t.Errorf("Unexepcted number of OnChange updates after sync response: got=%v, want at least %v", numOnChangeUpdates, 1)
6353+
}
6354+
}
6355+
6356+
for _, test := range tests {
6357+
t.Run(test.name, func(t *testing.T) {
6358+
wg := &sync.WaitGroup{}
6359+
6360+
wg.Add(1)
6361+
go subTest(t, test.duration, test.sampleUpdates, wg)
6362+
6363+
time.Sleep(test.delay)
6364+
6365+
wg.Add(1)
6366+
go subTest(t, test.duration, test.sampleUpdates, wg)
6367+
6368+
wg.Wait()
6369+
})
6370+
}
6371+
}
62326372

62336373
func init() {
62346374
// Enable logs at UT setup

0 commit comments

Comments
 (0)