Skip to content

Commit 908e2b3

Browse files
Deduplicate Subscriptions
Signed-off-by: Niranjani Vivek <niranjaniv@google.com>
1 parent e980d11 commit 908e2b3

File tree

4 files changed

+678
-29
lines changed

4 files changed

+678
-29
lines changed
Lines changed: 201 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,201 @@
1+
package client
2+
3+
import (
4+
"fmt"
5+
log "github.com/golang/glog"
6+
"github.com/golang/protobuf/proto"
7+
gnmipb "github.com/openconfig/gnmi/proto/gnmi"
8+
"sync"
9+
"sync/atomic"
10+
"time"
11+
)
12+
13+
var superSubs = superSubscriptions{
14+
mu: &sync.Mutex{},
15+
subs: map[*superSubscription]bool{},
16+
}
17+
18+
type superSubscriptions struct {
19+
mu *sync.Mutex
20+
subs map[*superSubscription]bool
21+
}
22+
23+
// superSubscription is used to deduplicate subscriptions. Stream Subscriptions
24+
// become part of a superSubscription and whenever a Sample is processed, the
25+
// response is sent to all clients that are part of the superSubscription.
26+
type superSubscription struct {
27+
mu *sync.RWMutex
28+
clients map[*TranslClient]struct{}
29+
request *gnmipb.SubscriptionList
30+
primaryClient *TranslClient
31+
tickers map[int]*time.Ticker // map of interval duration (nanoseconds) to ticker.
32+
sharedUpdates atomic.Uint64
33+
exclusiveUpdates atomic.Uint64
34+
}
35+
36+
// ------------- Super Subscription Functions -------------
37+
// createSuperSubscription takes a SubscriptionList and returns a new
38+
// superSubscription for that SubscriptionList. This function expects the
39+
// caller to already hold superSubs.mu before calling createSuperSubscription.
40+
func createSuperSubscription(subscription *gnmipb.SubscriptionList) *superSubscription {
41+
if subscription == nil {
42+
return nil
43+
}
44+
newSuperSub := &superSubscription{
45+
mu: &sync.RWMutex{},
46+
clients: map[*TranslClient]struct{}{},
47+
request: subscription,
48+
primaryClient: nil,
49+
tickers: map[int]*time.Ticker{},
50+
sharedUpdates: atomic.Uint64{},
51+
exclusiveUpdates: atomic.Uint64{},
52+
}
53+
if _, ok := superSubs.subs[newSuperSub]; ok {
54+
// This should never happen.
55+
log.V(0).Infof("Super Subscription (%p) for %v already exists but a new has been created!", newSuperSub, subscription)
56+
}
57+
superSubs.subs[newSuperSub] = true
58+
return newSuperSub
59+
}
60+
61+
// findSuperSubscription takes a SubscriptionList and tries to find an
62+
// existing superSubscription for that SubscriptionList. If one is found,
63+
// the superSubscription is returned. Else, nil is returned. This function
64+
// expects the caller to already hold superSubs.mu before calling findSuperSubscription.
65+
func findSuperSubscription(subscription *gnmipb.SubscriptionList) *superSubscription {
66+
if subscription == nil {
67+
return nil
68+
}
69+
for sub, _ := range superSubs.subs {
70+
if sub.request == nil {
71+
continue
72+
}
73+
if proto.Equal(sub.request, subscription) {
74+
return sub
75+
}
76+
}
77+
return nil
78+
}
79+
80+
// deleteSuperSub removes superSub from the superSubs map.
81+
// If the superSub is removed from the TranslClient, there
82+
// should be no remaining references to the superSub. This
83+
// function expects the caller to already hold superSubs.mu
84+
// before calling deleteSuperSubscription.
85+
func deleteSuperSubscription(superSub *superSubscription) {
86+
if superSub == nil {
87+
log.V(0).Info("deleteSuperSubscription called on a nil Super Subscription!")
88+
return
89+
}
90+
tickerCleanup(superSub.tickers)
91+
delete(superSubs.subs, superSub)
92+
}
93+
94+
// ------------- Super Subscription Methods -------------
95+
// sendNotifications takes a value and adds it to the notification
96+
// queue for each subscription in the superSubscription.
97+
func (ss *superSubscription) sendNotifications(v *Value) {
98+
if v == nil {
99+
return
100+
}
101+
ss.mu.RLock()
102+
defer ss.mu.RUnlock()
103+
for client, _ := range ss.clients {
104+
client.q.Put(*v)
105+
}
106+
}
107+
108+
// populateTickers populates the ticker_info objects in the intervalToTickerInfoMap with the
109+
// shared tickers. If tickers don't exist yet, they are created.
110+
func (ss *superSubscription) populateTickers(intervalToTickerInfoMap map[int][]*ticker_info) error {
111+
if intervalToTickerInfoMap == nil {
112+
return fmt.Errorf("Invalid intervalToTickerInfoMap passed in: %v", intervalToTickerInfoMap)
113+
}
114+
ss.mu.Lock()
115+
defer ss.mu.Unlock()
116+
if len(ss.tickers) == 0 {
117+
// Create the tickers.
118+
for interval, tInfos := range intervalToTickerInfoMap {
119+
ticker := time.NewTicker(time.Duration(interval) * time.Nanosecond)
120+
ss.tickers[interval] = ticker
121+
for _, tInfo := range tInfos {
122+
tInfo.t = ticker
123+
}
124+
}
125+
return nil
126+
}
127+
// Use the existing tickers.
128+
if len(ss.tickers) != len(intervalToTickerInfoMap) {
129+
return fmt.Errorf("Length of intervalToTickerInfoMap does not match length of existing tickers for Super Subscription! existing tickers=%v, intervalToTickerInfoMap=%v", ss.tickers, intervalToTickerInfoMap)
130+
}
131+
for interval, tInfos := range intervalToTickerInfoMap {
132+
ticker, ok := ss.tickers[interval]
133+
if !ok {
134+
return fmt.Errorf("Interval in intervalToTickerInfoMap not found in existing tickers for Super Subscription! interval=%v", interval)
135+
}
136+
for _, tInfo := range tInfos {
137+
tInfo.t = ticker
138+
}
139+
}
140+
return nil
141+
}
142+
func (ss *superSubscription) String() string {
143+
return fmt.Sprintf("[{%p} NumClients=%d, SharedUpdates=%d, ExclusiveUpdates=%d, Request=%v]", ss, len(ss.clients), ss.sharedUpdates.Load(), ss.exclusiveUpdates.Load(), ss.request)
144+
}
145+
146+
// ------------- TranslClient Methods -------------
147+
// isPrimary returns true if the client is the primary client of its superSubscription.
148+
func (c *TranslClient) isPrimary() bool {
149+
if c == nil || c.superSub == nil {
150+
return false
151+
}
152+
c.superSub.mu.RLock()
153+
defer c.superSub.mu.RUnlock()
154+
return c.superSub.primaryClient == c
155+
}
156+
157+
// leaveSuperSubscription removes the client from the superSubscription.
158+
// If there are no remaining clients in the superSubscription, it is deleted.
159+
func (c *TranslClient) leaveSuperSubscription() {
160+
if c == nil || c.superSub == nil {
161+
return
162+
}
163+
superSubs.mu.Lock()
164+
defer superSubs.mu.Unlock()
165+
c.superSub.mu.Lock()
166+
defer c.superSub.mu.Unlock()
167+
delete(c.superSub.clients, c)
168+
if len(c.superSub.clients) == 0 {
169+
deleteSuperSubscription(c.superSub)
170+
log.V(2).Infof("SuperSubscription (%s) closing!", c.superSub)
171+
} else if c.superSub.primaryClient == c {
172+
// Set a new primary client.
173+
for client := range c.superSub.clients {
174+
c.superSub.primaryClient = client
175+
client.wakeChan <- true
176+
break
177+
}
178+
log.V(2).Infof("SuperSubscription (%s): %p is now the primary client", c.superSub, c.superSub.primaryClient)
179+
}
180+
}
181+
182+
// addClientToSuperSubscription adds a client to a superSubscription.
183+
func (c *TranslClient) addClientToSuperSubscription(subscription *gnmipb.SubscriptionList) {
184+
if c == nil || subscription == nil {
185+
return
186+
}
187+
superSubs.mu.Lock()
188+
defer superSubs.mu.Unlock()
189+
superSub := findSuperSubscription(subscription)
190+
if superSub == nil {
191+
superSub = createSuperSubscription(subscription)
192+
}
193+
superSub.mu.Lock()
194+
defer superSub.mu.Unlock()
195+
c.superSub = superSub
196+
superSub.clients[c] = struct{}{}
197+
if superSub.primaryClient == nil {
198+
superSub.primaryClient = c
199+
}
200+
log.V(2).Infof("SuperSubscription (%s): added new client=%p", superSub, c)
201+
}

0 commit comments

Comments
 (0)