Skip to content

Commit e647c94

Browse files
author
Timur Aitov
committed
Add BufferChannel for fsm incoming/outgoing channels.
Added the ability to select a type of fsm incoming/outgoing channels: 1. InfiniteChannel (old behavior, default) 2. BufferChannel BufferChannel allows to accumulate and combine paths in a separate thread, in the case when the reading side (for example, BgpServer.Serve) is busy. A map with NLRI key is also used to combine multiple changes of one path into a single update.
1 parent 9bc8e11 commit e647c94

File tree

20 files changed

+3414
-1626
lines changed

20 files changed

+3414
-1626
lines changed

api/gobgp.pb.go

Lines changed: 1324 additions & 972 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

api/gobgp.proto

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -622,6 +622,10 @@ message PeerConf {
622622
bool admin_down = 15;
623623
bool send_software_version = 16;
624624
bool allow_aspath_loop_local = 17;
625+
uint32 incoming_channel_timeout = 18;
626+
ChannelConfig incoming_channel = 19;
627+
uint32 outgoing_channel_timeout = 20;
628+
ChannelConfig outgoing_channel = 21;
625629
}
626630

627631
message PeerGroupConf {
@@ -635,6 +639,10 @@ message PeerGroupConf {
635639
bool route_flap_damping = 8;
636640
uint32 send_community = 9;
637641
bool send_software_version = 10;
642+
uint32 incoming_channel_timeout = 11;
643+
ChannelConfig incoming_channel = 12;
644+
uint32 outgoing_channel_timeout = 13;
645+
ChannelConfig outgoing_channel = 14;
638646
}
639647

640648
message PeerGroupState {
@@ -696,6 +704,10 @@ message PeerState {
696704
repeated google.protobuf.Any remote_cap = 18;
697705
repeated google.protobuf.Any local_cap = 19;
698706
string router_id = 20;
707+
uint64 incoming_channel_dropped = 21;
708+
ChannelState incoming_channel = 22;
709+
uint32 outgoing_channel_dropped = 23;
710+
ChannelState outgoing_channel = 24;
699711
}
700712

701713
message Messages {
@@ -1153,3 +1165,19 @@ message SetLogLevelRequest {
11531165
}
11541166
Level level = 1;
11551167
}
1168+
1169+
enum ChannelType { INFINITE = 0; BUFFER = 1; }
1170+
1171+
message ChannelConfig {
1172+
ChannelType type = 1;
1173+
uint64 size = 2;
1174+
}
1175+
1176+
message ChannelState {
1177+
uint64 in = 1;
1178+
uint64 notifications = 2;
1179+
uint64 collected = 3;
1180+
uint64 rewritten = 4;
1181+
uint64 retries = 5;
1182+
uint64 out = 6;
1183+
}

cmd/gobgp/neighbor.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -531,6 +531,41 @@ func showNeighbor(args []string) error {
531531
}
532532
}
533533
}
534+
535+
incomingChannelType := "Infinite"
536+
if p.Conf.IncomingChannel.Type == api.ChannelType_BUFFER {
537+
incomingChannelType = "Buffer"
538+
}
539+
fmt.Printf(" IncomingChannel: %s\n", incomingChannelType)
540+
if p.State != nil && p.State.IncomingChannel != nil {
541+
if incomingChannelType == "Buffer" {
542+
fmt.Printf(" In: %d\n", p.State.IncomingChannel.In)
543+
fmt.Printf(" Notifications: %d\n", p.State.IncomingChannel.Notifications)
544+
fmt.Printf(" Collected: %d\n", p.State.IncomingChannel.Collected)
545+
fmt.Printf(" Rewritten: %d\n", p.State.IncomingChannel.Rewritten)
546+
fmt.Printf(" Retries: %d\n", p.State.IncomingChannel.Retries)
547+
fmt.Printf(" Out: %d\n", p.State.IncomingChannel.Out)
548+
}
549+
}
550+
fmt.Printf(" Dropped: %d\n", p.State.IncomingChannelDropped)
551+
552+
outgoingChannelType := "Infinite"
553+
if p.Conf.OutgoingChannel.Type == api.ChannelType_BUFFER {
554+
outgoingChannelType = "Buffer"
555+
}
556+
fmt.Printf(" OutgoingChannel: %s\n", outgoingChannelType)
557+
if p.State != nil && p.State.OutgoingChannel != nil {
558+
if outgoingChannelType == "Buffer" {
559+
fmt.Printf(" In: %d\n", p.State.OutgoingChannel.In)
560+
fmt.Printf(" Notifications: %d\n", p.State.OutgoingChannel.Notifications)
561+
fmt.Printf(" Collected: %d\n", p.State.OutgoingChannel.Collected)
562+
fmt.Printf(" Rewritten: %d\n", p.State.OutgoingChannel.Rewritten)
563+
fmt.Printf(" Retries: %d\n", p.State.OutgoingChannel.Retries)
564+
fmt.Printf(" Out: %d\n", p.State.OutgoingChannel.Out)
565+
}
566+
}
567+
fmt.Printf(" Dropped: %d\n", p.State.OutgoingChannelDropped)
568+
534569
return nil
535570
}
536571

Lines changed: 235 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,235 @@
1+
package channels
2+
3+
import (
4+
"sync/atomic"
5+
"time"
6+
7+
api "github.com/osrg/gobgp/v3/api"
8+
"github.com/osrg/gobgp/v3/internal/pkg/table"
9+
"github.com/osrg/gobgp/v3/pkg/packet/bgp"
10+
)
11+
12+
const (
13+
// default input channel size.
14+
// specifies a buffer size for input channel.
15+
defaultInSize = 64
16+
)
17+
18+
type BufferMessageInterface interface {
19+
PathList() []*table.Path
20+
SetPathList(pathList []*table.Path)
21+
}
22+
23+
type pathKey struct {
24+
rf bgp.RouteFamily
25+
nlri string
26+
pathIdentifier uint32
27+
}
28+
29+
type BufferChannel struct {
30+
input chan any
31+
output chan any
32+
33+
in atomic.Uint64
34+
notifications atomic.Uint64
35+
collected atomic.Uint64
36+
rewritten atomic.Uint64
37+
retries atomic.Uint64
38+
out atomic.Uint64
39+
40+
pathIdxs map[pathKey]int
41+
last BufferMessageInterface
42+
43+
consumerStop chan struct{}
44+
}
45+
46+
func NewBufferChannel(inSize int) *BufferChannel {
47+
bc := &BufferChannel{
48+
output: make(chan any),
49+
pathIdxs: make(map[pathKey]int),
50+
consumerStop: make(chan struct{}),
51+
}
52+
53+
if inSize == 0 {
54+
// if inSize not set, use default value
55+
inSize = defaultInSize
56+
}
57+
bc.input = make(chan any, inSize)
58+
59+
go bc.serve()
60+
return bc
61+
}
62+
63+
func (bc *BufferChannel) Push(m any, timeout time.Duration) bool {
64+
var timerCh <-chan time.Time
65+
if timeout != 0 {
66+
timer := time.NewTimer(timeout)
67+
defer timer.Stop()
68+
69+
timerCh = timer.C
70+
}
71+
72+
select {
73+
case bc.input <- m:
74+
case <-bc.consumerStop:
75+
case <-timerCh:
76+
// message was dropped
77+
return false
78+
}
79+
80+
return true
81+
}
82+
83+
func (bc *BufferChannel) Out() <-chan any {
84+
return bc.output
85+
}
86+
87+
func (bc *BufferChannel) Stats() *api.ChannelState {
88+
return &api.ChannelState{
89+
In: bc.in.Load(),
90+
Notifications: bc.notifications.Load(),
91+
Collected: bc.collected.Load(),
92+
Rewritten: bc.rewritten.Load(),
93+
Retries: bc.retries.Load(),
94+
Out: bc.out.Load(),
95+
}
96+
}
97+
98+
func (bc *BufferChannel) Clean() {
99+
bc.Close()
100+
// drain all remaining items
101+
for range bc.output {
102+
}
103+
}
104+
105+
func (bc *BufferChannel) Close() {
106+
close(bc.input)
107+
}
108+
109+
func (bc *BufferChannel) SetConsumerClosed() {
110+
close(bc.consumerStop)
111+
}
112+
113+
func (bc *BufferChannel) serve() {
114+
for {
115+
var out chan any
116+
if bc.last != nil {
117+
out = bc.output
118+
}
119+
120+
select {
121+
case elem, open := <-bc.input:
122+
if !open {
123+
close(bc.output)
124+
return
125+
}
126+
127+
bc.onInput(elem)
128+
case out <- bc.last:
129+
bc.out.Add(1)
130+
131+
clear(bc.pathIdxs)
132+
bc.last = nil
133+
}
134+
}
135+
}
136+
137+
func (bc *BufferChannel) onInput(anyElem any) {
138+
bc.in.Add(1)
139+
140+
elem, ok := anyElem.(BufferMessageInterface)
141+
if !ok || len(elem.PathList()) == 0 {
142+
// pass not BufferChannel's element or notification to output with blocking channel
143+
bc.notifications.Add(1)
144+
145+
if bc.last != nil {
146+
bc.output <- bc.last
147+
bc.out.Add(1)
148+
149+
clear(bc.pathIdxs)
150+
bc.last = nil
151+
}
152+
153+
bc.output <- anyElem
154+
bc.out.Add(1)
155+
return
156+
}
157+
158+
if bc.last != nil {
159+
bc.collect(elem)
160+
return
161+
}
162+
163+
select {
164+
case bc.output <- elem:
165+
// done
166+
bc.out.Add(1)
167+
default:
168+
// try output later
169+
bc.retries.Add(1)
170+
171+
bc.collect(elem)
172+
}
173+
}
174+
175+
func (bc *BufferChannel) collect(elem BufferMessageInterface) {
176+
bc.collected.Add(1)
177+
178+
pathList := elem.PathList()
179+
180+
if bc.last == nil {
181+
// first
182+
183+
for idx, path := range pathList {
184+
if path == nil || path.IsEOR() {
185+
continue
186+
}
187+
188+
key := pathKey{
189+
rf: path.GetRouteFamily(),
190+
pathIdentifier: path.GetNlri().PathIdentifier(),
191+
nlri: table.TableKey(path.GetNlri()),
192+
}
193+
194+
bc.pathIdxs[key] = idx
195+
}
196+
} else {
197+
// merge
198+
199+
nextPathsList := bc.last.PathList()
200+
201+
for _, path := range pathList {
202+
if path == nil {
203+
continue
204+
}
205+
206+
if path.IsEOR() {
207+
nextPathsList = append(nextPathsList, path)
208+
continue
209+
}
210+
211+
key := pathKey{
212+
rf: path.GetRouteFamily(),
213+
pathIdentifier: path.GetNlri().PathIdentifier(),
214+
nlri: table.TableKey(path.GetNlri()),
215+
}
216+
217+
idx, ok := bc.pathIdxs[key]
218+
if !ok {
219+
// new path
220+
221+
bc.pathIdxs[key] = len(nextPathsList)
222+
nextPathsList = append(nextPathsList, path)
223+
} else {
224+
// rewrite path
225+
bc.rewritten.Add(1)
226+
227+
nextPathsList[idx] = path
228+
}
229+
}
230+
231+
elem.SetPathList(nextPathsList)
232+
}
233+
234+
bc.last = elem
235+
}

0 commit comments

Comments
 (0)