Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
baeac70
Replace priority queue with limited queue to prevent OOM slow polling…
Sarah-Singh16 May 13, 2025
38fc2a1
Replace priority queue with limited queue to prevent OOM slow polling…
Sarah-Singh16 May 27, 2025
d0e0b34
Merge branch 'master' into add_limited_queue_for_oom_fix
Sarah-Singh16 Jul 21, 2025
b475b8e
Replace priority queue with limited queue to prevent OOM slow polling…
Sarah-Singh16 Jul 22, 2025
6914692
Merge branch 'master' into add_limited_queue_for_oom_fix
Sarah-Singh16 Jul 25, 2025
48ce988
Replace priority queue with limited queue to prevent OOM slow polling…
Sarah-Singh16 Jul 25, 2025
cec3dd0
Merge branch 'master' into add_limited_queue_for_oom_fix
Sarah-Singh16 Jul 29, 2025
3ef83cf
Merge branch 'master' into add_limited_queue_for_oom_fix
Sarah-Singh16 Jul 30, 2025
9637e88
Merge branch 'master' into add_limited_queue_for_oom_fix
Sarah-Singh16 Jul 31, 2025
56aa077
Merge branch 'master' into add_limited_queue_for_oom_fix
Sarah-Singh16 Aug 4, 2025
d1adc75
Merge branch 'master' into add_limited_queue_for_oom_fix
Sarah-Singh16 Aug 6, 2025
5a8f5f1
Replace priority queue with limited queue to prevent OOM slow polling…
Sarah-Singh16 Aug 6, 2025
e39b8cf
Merge branch 'master' into add_limited_queue_for_oom_fix
Sarah-Singh16 Aug 6, 2025
a649bd3
Replace priority queue with limited queue to prevent OOM slow polling…
Sarah-Singh16 Aug 6, 2025
180ac0a
Merge branch 'master' into add_limited_queue_for_oom_fix
Sarah-Singh16 Aug 13, 2025
34af4fd
Replace priority queue with limited queue to prevent OOM slow polling…
Sarah-Singh16 Aug 13, 2025
05215c6
Merge branch 'master' into add_limited_queue_for_oom_fix
Sarah-Singh16 Aug 18, 2025
65978d6
Replace priority queue with limited queue to prevent OOM slow polling…
Sarah-Singh16 Aug 18, 2025
eec20e3
Merge branch 'master' into add_limited_queue_for_oom_fix
Sarah-Singh16 Aug 19, 2025
86e8bbc
Replace priority queue with limited queue to prevent OOM slow polling…
Sarah-Singh16 Aug 19, 2025
fa484a4
Merge branch 'master' into add_limited_queue_for_oom_fix
Sarah-Singh16 Aug 26, 2025
7540535
not4review: add limit queue
Sarah-Singh16 Aug 26, 2025
7fc824a
Merge branch 'master' into noreview_add_limit_queue
Sarah-Singh16 Aug 29, 2025
66fb108
not4review: adding limited queue
Sarah-Singh16 Aug 29, 2025
29ebdd8
Merge branch 'master' into noreview_add_limit_queue
Sarah-Singh16 Sep 3, 2025
e6a03da
not4review: adding limited queue
Sarah-Singh16 Sep 4, 2025
e934cd4
Merge branch 'master' into noreview_add_limit_queue
Sarah-Singh16 Sep 9, 2025
a76bb2a
not4review: adding limited queue
Sarah-Singh16 Sep 9, 2025
1d64b50
Merge branch 'master' into noreview_add_limit_queue
Sarah-Singh16 Nov 20, 2025
33f7b97
oom fix
Sarah-Singh16 Nov 20, 2025
20c2776
Merge branch 'master' into noreview_add_limit_queue
Sarah-Singh16 Nov 21, 2025
3768c33
oom fix
Sarah-Singh16 Nov 22, 2025
4c86602
oom fix
Sarah-Singh16 Nov 22, 2025
e958a41
Merge branch 'master' into noreview_add_limit_queue
Sarah-Singh16 Nov 25, 2025
5cf439b
oom fix
Sarah-Singh16 Nov 25, 2025
7d215b6
Merge branch 'master' into noreview_add_limit_queue
Sarah-Singh16 Nov 25, 2025
8d75288
oom fix
Sarah-Singh16 Nov 25, 2025
f664e44
Merge branch 'master' into noreview_add_limit_queue
Sarah-Singh16 Dec 1, 2025
65c59b3
oom fix
Sarah-Singh16 Dec 1, 2025
d04b92f
Merge branch 'master' into noreview_add_limit_queue
Sarah-Singh16 Dec 11, 2025
ea2e62a
oom fix
Sarah-Singh16 Dec 11, 2025
7f28cc0
oom fix
Sarah-Singh16 Dec 12, 2025
1c06aac
oom fix
Sarah-Singh16 Dec 16, 2025
5c8d628
oom fix
Sarah-Singh16 Dec 17, 2025
1399578
Merge branch 'master' into noreview_add_limit_queue
Sarah-Singh16 Dec 29, 2025
3d9a3cf
oom fix
Sarah-Singh16 Dec 29, 2025
8b39e4f
Merge branch 'master' into noreview_add_limit_queue
Sarah-Singh16 Jan 23, 2026
887b563
oom fix
Sarah-Singh16 Jan 23, 2026
63d9242
oom fix
Sarah-Singh16 Jan 23, 2026
b366678
oom fix
Sarah-Singh16 Jan 24, 2026
af7c75b
oom fix
Sarah-Singh16 Jan 27, 2026
0a55689
Merge branch 'master' into noreview_add_limit_queue
Sarah-Singh16 Feb 2, 2026
a1e500f
oom fix
Sarah-Singh16 Feb 2, 2026
053f685
Merge branch 'master' into noreview_add_limit_queue
Sarah-Singh16 Feb 6, 2026
1968d4f
oom fix
Sarah-Singh16 Feb 6, 2026
1405c47
oom fix
Sarah-Singh16 Feb 6, 2026
72e3e8b
oom fix
Sarah-Singh16 Feb 6, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 15 additions & 24 deletions dialout/dialout_client/dialout_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
sdc "github.com/sonic-net/sonic-gnmi/sonic_data_client"
sdcfg "github.com/sonic-net/sonic-gnmi/sonic_db_config"

"github.com/Workiva/go-datastructures/queue"
log "github.com/golang/glog"
gpb "github.com/openconfig/gnmi/proto/gnmi"
"github.com/openconfig/ygot/ygot"
Expand Down Expand Up @@ -101,6 +100,7 @@ type ClientConfig struct {
Unidirectional bool // by default, no response from remote server
TLS *tls.Config // TLS config to use when connecting to target. Optional.
RedisConType string // "unix" or "tcp"
OutputQueueSz uint64
}

// clientSubscription is the container for config data,
Expand All @@ -116,12 +116,12 @@ type clientSubscription struct {

// Running time data
cMu sync.Mutex
client *Client // GNMIDialOutClient
dc sdc.Client // SONiC data client
stop chan struct{} // Inform publishRun routine to stop
q *queue.PriorityQueue // for data passing among go routine
w sync.WaitGroup // Wait for all sub go routine to finish
opened bool // whether there is opened instance for this client subscription
client *Client // GNMIDialOutClient
dc sdc.Client // SONiC data client
stop chan struct{} // Inform publishRun routine to stop
q *sdc.LimitedQueue // for data passing among go routine
w sync.WaitGroup // Wait for all sub go routine to finish
opened bool // whether there is opened instance for this client subscription
cancel context.CancelFunc

conTryCnt uint64 //Number of times trying to connect
Expand Down Expand Up @@ -156,9 +156,9 @@ func (cs *clientSubscription) Close() {
close(cs.stop) //Inform the clientSubscription publish service routine to stop
}

if cs.q != nil {
if !cs.q.Disposed() {
cs.q.Dispose()
if cs.q.Q != nil {
if !cs.q.Q.Disposed() {
cs.q.Q.Dispose()
}
}
if cs.client != nil {
Expand Down Expand Up @@ -212,28 +212,19 @@ func (cs *clientSubscription) NewInstance(ctx context.Context) error {
// send runs until process Queue returns an error.
func (cs *clientSubscription) send(stream spb.GNMIDialOut_PublishClient) error {
for {
items, err := cs.q.Get(1)
item, err := cs.q.DequeueItem()

if items == nil {
log.V(1).Infof("%v", err)
return err
}
if err != nil {
cs.errors++
log.V(1).Infof("%v", err)
return fmt.Errorf("unexpected queue Gext(1): %v", err)
}

var resp *gpb.SubscribeResponse
switch v := items[0].(type) {
case sdc.Value:
if resp, err = sdc.ValToResp(v); err != nil {
cs.errors++
return err
}
default:
log.V(1).Infof("Unknown data type %v for %s in queue", items[0], cs)

if resp, err = sdc.ValToResp(item); err != nil {
cs.errors++
return err
}

cs.sendMsg++
Expand Down Expand Up @@ -295,7 +286,7 @@ func publishRun(ctx context.Context, cs *clientSubscription, dests []Destination
restart: //Remote server might go down, in that case we restart with next destination in the group
cs.cMu.Lock()
cs.stop = make(chan struct{}, 1)
cs.q = queue.NewPriorityQueue(1, false)
cs.q = sdc.NewLimitedQueue(1, false, clientCfg.OutputQueueSz)
cs.opened = true
cs.client = nil
cs.cMu.Unlock()
Expand Down
1 change: 1 addition & 0 deletions dialout/dialout_client/dialout_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,7 @@ func TestGNMIDialOutPublish(t *testing.T) {
Encoding: pb.Encoding_JSON_IETF,
Unidirectional: true,
TLS: &tls.Config{InsecureSkipVerify: true},
OutputQueueSz: 10 * (1024 * 1024),
}
ctx, cancel := context.WithCancel(context.Background())

Expand Down
3 changes: 3 additions & 0 deletions dialout/dialout_client_cli/dialout_client_cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ var (
Encoding: gpb.Encoding_JSON_IETF,
Unidirectional: true,
TLS: &tls.Config{},
OutputQueueSz: 10 * (1024 * 1024),
}
outputQueueSz = flag.Uint64("output_queue_size", 10, "Output Queue Maximum Size per Subscribe Session (MB)")
)

func init() {
Expand All @@ -42,6 +44,7 @@ func main() {
<-c
cancel()
}()
clientCfg.OutputQueueSz = *outputQueueSz * uint64(1024*1024)
log.V(1).Infof("Starting telemetry publish client")
err := dc.DialOutRun(ctx, &clientCfg)
log.V(1).Infof("Exiting telemetry publish client: %v", err)
Expand Down
36 changes: 15 additions & 21 deletions gnmi_server/client_subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package gnmi

import (
"fmt"
"github.com/Workiva/go-datastructures/queue"
log "github.com/golang/glog"
gnmipb "github.com/openconfig/gnmi/proto/gnmi"
sdc "github.com/sonic-net/sonic-gnmi/sonic_data_client"
Expand All @@ -15,6 +14,8 @@ import (
"sync"
)

// OutputQueSize is the value NewClient passes as the third argument to
// sdc.NewLimitedQueue when it builds a client's output queue.
// NOTE(review): presumably a byte limit (tests assign 10 * 1024 * 1024) —
// confirm against the sdc.LimitedQueue implementation.
var OutputQueSize uint64

// Client contains information about a subscribe client that has connected to the server.
type Client struct {
addr net.Addr
Expand All @@ -25,7 +26,7 @@ type Client struct {
stop chan struct{}
once chan struct{}
mu sync.RWMutex
q *queue.PriorityQueue
q *sdc.LimitedQueue
subscribe *gnmipb.SubscriptionList
// Wait for all sub go routine to finish
w sync.WaitGroup
Expand All @@ -42,7 +43,7 @@ var connectionManager *ConnectionManager

// NewClient returns a new initialized client.
func NewClient(addr net.Addr) *Client {
pq := queue.NewPriorityQueue(1, false)
pq := sdc.NewLimitedQueue(1, false, OutputQueSize)
return &Client{
addr: addr,
q: pq,
Expand Down Expand Up @@ -228,7 +229,10 @@ func (c *Client) Run(stream gnmipb.GNMI_SubscribeServer, config *Config) (err er
c.Close()
// Wait until all child go routines exited
c.w.Wait()
return grpc.Errorf(codes.InvalidArgument, "%s", err)
if err != nil {
return grpc.Errorf(codes.InvalidArgument, "%s", err)
}
return err
}

// Closing of client queue is triggered upon end of stream receive or stream error
Expand All @@ -238,11 +242,11 @@ func (c *Client) Close() {
c.mu.Lock()
defer c.mu.Unlock()
log.V(1).Infof("Client %s Close, sendMsg %v recvMsg %v errors %v", c, c.sendMsg, c.recvMsg, c.errors)
if c.q != nil {
if c.q.Disposed() {
if c.q.Q != nil {
if c.q.Q.Disposed() {
return
}
c.q.Dispose()
c.q.Q.Dispose()
}
if c.stop != nil {
close(c.stop)
Expand Down Expand Up @@ -298,12 +302,8 @@ func (c *Client) recv(stream gnmipb.GNMI_SubscribeServer) {
func (c *Client) send(stream gnmipb.GNMI_SubscribeServer, dc sdc.Client) error {
for {
var val *sdc.Value
items, err := c.q.Get(1)
item, err := c.q.DequeueItem()

if items == nil {
log.V(1).Infof("%v", err)
return err
}
if err != nil {
c.errors++
log.V(1).Infof("%v", err)
Expand All @@ -312,18 +312,12 @@ func (c *Client) send(stream gnmipb.GNMI_SubscribeServer, dc sdc.Client) error {

var resp *gnmipb.SubscribeResponse

switch v := items[0].(type) {
case sdc.Value:
if resp, err = sdc.ValToResp(v); err != nil {
c.errors++
return err
}
val = &v
default:
log.V(1).Infof("Unknown data type %v for %s in queue", items[0], c)
if resp, err = sdc.ValToResp(item); err != nil {
c.errors++
return err
}

val = &item
c.sendMsg++
err = stream.Send(resp)
if err != nil {
Expand Down
16 changes: 13 additions & 3 deletions gnmi_server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ type Config struct {
ConfigTableName string
Vrf string
EnableCrl bool
MaxNumSubscribers uint64
// Path to the directory where image is stored.
ImgDir string
}
Expand Down Expand Up @@ -477,6 +478,12 @@ func (s *Server) Subscribe(stream gnmipb.GNMI_SubscribeServer) error {
c.setConnectionManager(s.config.Threshold)

s.cMu.Lock()
if uint64(len(s.clients)) >= s.config.MaxNumSubscribers {
log.V(2).Infof("Max clients reached. Rejecting new client %s", c)
c.Close()
s.cMu.Unlock()
return grpc.Errorf(codes.ResourceExhausted, "Maximum number of subscriptions reached")
}
if oc, ok := s.clients[c.String()]; ok {
log.V(2).Infof("Delete duplicate client %s", oc)
oc.Close()
Expand All @@ -485,10 +492,13 @@ func (s *Server) Subscribe(stream gnmipb.GNMI_SubscribeServer) error {
s.clients[c.String()] = c
s.cMu.Unlock()

defer func() {
s.cMu.Lock()
delete(s.clients, c.String())
s.cMu.Unlock()
}()

err := c.Run(stream, s.config)
s.cMu.Lock()
delete(s.clients, c.String())
s.cMu.Unlock()

log.Flush()
return err
Expand Down
63 changes: 62 additions & 1 deletion gnmi_server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ func createClient(t *testing.T, port int) *grpc.ClientConn {

func createServer(t *testing.T, port int64) *Server {
t.Helper()
OutputQueSize = 10 * uint64(1024*1024)
certificate, err := testcert.NewCert()
if err != nil {
t.Fatalf("could not load server key pair: %s", err)
Expand All @@ -136,6 +137,7 @@ func createServer(t *testing.T, port int64) *Server {
EnableTranslibWrite: true,
EnableNativeWrite: true,
Threshold: 100,
MaxNumSubscribers: 10,
ImgDir: "/tmp",
}
s, err := NewServer(cfg, opts)
Expand All @@ -159,6 +161,7 @@ func createReadServer(t *testing.T, port int64) *Server {
cfg := &Config{
Port: port,
EnableTranslibWrite: false,
MaxNumSubscribers: 10,
ImgDir: "/tmp",
}
s, err := NewServer(cfg, opts)
Expand All @@ -183,6 +186,7 @@ func createRejectServer(t *testing.T, port int64) *Server {
Port: port,
EnableTranslibWrite: true,
Threshold: 2,
MaxNumSubscribers: 10,
ImgDir: "/tmp",
}
s, err := NewServer(cfg, opts)
Expand All @@ -208,6 +212,7 @@ func createAuthServer(t *testing.T, port int64) *Server {
Port: port,
EnableTranslibWrite: true,
UserAuth: AuthTypes{"password": true, "cert": true, "jwt": true},
MaxNumSubscribers: 10,
ImgDir: "/tmp",
}
s, err := NewServer(cfg, opts)
Expand Down Expand Up @@ -235,6 +240,27 @@ func createInvalidServer(t *testing.T, port int64) *Server {
return s
}

// createMaxSubscribeServer builds a TLS-enabled gNMI server for tests with
// MaxNumSubscribers set to 0. Subscribe rejects a client whenever the number
// of registered clients is >= MaxNumSubscribers, so with a limit of 0 every
// subscription attempt is refused.
//
// It also sets the package-level OutputQueSize to 10 * 1024 * 1024 before the
// server is created. Any setup failure aborts the calling test.
func createMaxSubscribeServer(t *testing.T, port int64) *Server {
	t.Helper()
	OutputQueSize = 10 * uint64(1024*1024)
	certificate, err := testcert.NewCert()
	if err != nil {
		t.Fatalf("could not load server key pair: %s", err)
	}
	tlsCfg := &tls.Config{
		ClientAuth:   tls.RequestClientCert,
		Certificates: []tls.Certificate{certificate},
	}

	opts := []grpc.ServerOption{grpc.Creds(credentials.NewTLS(tlsCfg))}
	cfg := &Config{
		Port:                port,
		EnableTranslibWrite: true,
		EnableNativeWrite:   true,
		Threshold:           100,
		MaxNumSubscribers:   0,
	}
	s, err := NewServer(cfg, opts)
	if err != nil {
		// Stop here: the caller would otherwise receive a server value that
		// must not be used and start it in a goroutine.
		t.Fatalf("Failed to create gNMI server: %v", err)
	}
	return s
}

func createKeepAliveServer(t *testing.T, port int64) *Server {
t.Helper()
certificate, err := testcert.NewCert()
Expand All @@ -259,6 +285,7 @@ func createKeepAliveServer(t *testing.T, port int64) *Server {
EnableTranslibWrite: true,
EnableNativeWrite: true,
Threshold: 100,
MaxNumSubscribers: 10,
ImgDir: "/tmp",
}

Expand Down Expand Up @@ -5343,7 +5370,6 @@ func TestGNMINative(t *testing.T) {
ns, _ := sdcfg.GetDbDefaultNamespace()
initFullConfigDb(t, ns)
initFullCountersDb(t, ns)

path, _ := os.Getwd()
path = filepath.Dir(path)

Expand Down Expand Up @@ -5488,6 +5514,41 @@ func TestInvalidServer(t *testing.T) {
}
}

// TestMaxSubscribeServer starts a server created by createMaxSubscribeServer
// (MaxNumSubscribers: 0) and runs the pytest suite from the parent directory
// against it. The test expects the pytest run to FAIL; a successful run is
// reported as an error.
//
// Because a failing command is the passing condition, setup problems must
// abort the test rather than be ignored, otherwise they would be
// indistinguishable from the expected rejection.
func TestMaxSubscribeServer(t *testing.T) {
	// Replace dbus.SystemBus so it returns an empty connection.
	mock1 := gomonkey.ApplyFunc(dbus.SystemBus, func() (conn *dbus.Conn, err error) {
		return &dbus.Conn{}, nil
	})
	defer mock1.Reset()
	// Replace (*dbus.Object).Go so it immediately delivers a call with no
	// error and a first body element of int32(0) on the supplied channel.
	mock2 := gomonkey.ApplyMethod(reflect.TypeOf(&dbus.Object{}), "Go", func(obj *dbus.Object, method string, flags dbus.Flags, ch chan *dbus.Call, args ...interface{}) *dbus.Call {
		ret := &dbus.Call{}
		ret.Err = nil
		ret.Body = make([]interface{}, 2)
		ret.Body[0] = int32(0)
		ch <- ret
		return &dbus.Call{}
	})
	defer mock2.Reset()
	// Replace sdc.RunPyCode with a stub that always returns nil.
	mock3 := gomonkey.ApplyFunc(sdc.RunPyCode, func(text string) error { return nil })
	defer mock3.Reset()

	sdcfg.Init()
	s := createMaxSubscribeServer(t, 8080)
	go runServer(t, s)
	defer s.Stop()
	ns, _ := sdcfg.GetDbDefaultNamespace()
	initFullConfigDb(t, ns)
	initFullCountersDb(t, ns)

	// pytest is launched from the parent of the current working directory.
	// A wrong path would make the command fail for an unrelated reason and
	// the test would pass spuriously, so a Getwd failure is fatal.
	path, err := os.Getwd()
	if err != nil {
		t.Fatalf("could not determine working directory: %v", err)
	}
	path = filepath.Dir(path)

	cmd := exec.Command("bash", "-c", "cd "+path+" && "+"pytest -m 'not multidb and not multins'")
	out, err := cmd.Output()
	if err == nil {
		// Keep the captured output so an unexpected success can be diagnosed.
		t.Logf("pytest output:\n%s", out)
		t.Errorf("Supposed to receive: Maximum number of subscriptions reached")
	}
	s.Stop()
}

func TestParseOrigin(t *testing.T) {
var test_paths []*gnmipb.Path
var err error
Expand Down
Loading