Skip to content

Add BufferChannel for fsm incoming/outgoing channels. #2869

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2,296 changes: 1,324 additions & 972 deletions api/gobgp.pb.go

Large diffs are not rendered by default.

28 changes: 28 additions & 0 deletions api/gobgp.proto
Original file line number Diff line number Diff line change
Expand Up @@ -622,6 +622,10 @@ message PeerConf {
bool admin_down = 15;
bool send_software_version = 16;
bool allow_aspath_loop_local = 17;
uint32 incoming_channel_timeout = 18;
ChannelConfig incoming_channel = 19;
uint32 outgoing_channel_timeout = 20;
ChannelConfig outgoing_channel = 21;
}

message PeerGroupConf {
Expand All @@ -635,6 +639,10 @@ message PeerGroupConf {
bool route_flap_damping = 8;
uint32 send_community = 9;
bool send_software_version = 10;
uint32 incoming_channel_timeout = 11;
ChannelConfig incoming_channel = 12;
uint32 outgoing_channel_timeout = 13;
ChannelConfig outgoing_channel = 14;
}

message PeerGroupState {
Expand Down Expand Up @@ -696,6 +704,10 @@ message PeerState {
repeated google.protobuf.Any remote_cap = 18;
repeated google.protobuf.Any local_cap = 19;
string router_id = 20;
uint64 incoming_channel_dropped = 21;
ChannelState incoming_channel = 22;
uint32 outgoing_channel_dropped = 23;
ChannelState outgoing_channel = 24;
}

message Messages {
Expand Down Expand Up @@ -1153,3 +1165,19 @@ message SetLogLevelRequest {
}
Level level = 1;
}

enum ChannelType { INFINITE = 0; BUFFER = 1; }

message ChannelConfig {
ChannelType type = 1;
uint64 size = 2;
}

message ChannelState {
uint64 in = 1;
uint64 notifications = 2;
uint64 collected = 3;
uint64 rewritten = 4;
uint64 retries = 5;
uint64 out = 6;
}
35 changes: 35 additions & 0 deletions cmd/gobgp/neighbor.go
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,41 @@ func showNeighbor(args []string) error {
}
}
}

incomingChannelType := "Infinite"
if p.Conf.IncomingChannel.Type == api.ChannelType_BUFFER {
incomingChannelType = "Buffer"
}
fmt.Printf(" IncomingChannel: %s\n", incomingChannelType)
if p.State != nil && p.State.IncomingChannel != nil {
if incomingChannelType == "Buffer" {
fmt.Printf(" In: %d\n", p.State.IncomingChannel.In)
fmt.Printf(" Notifications: %d\n", p.State.IncomingChannel.Notifications)
fmt.Printf(" Collected: %d\n", p.State.IncomingChannel.Collected)
fmt.Printf(" Rewritten: %d\n", p.State.IncomingChannel.Rewritten)
fmt.Printf(" Retries: %d\n", p.State.IncomingChannel.Retries)
fmt.Printf(" Out: %d\n", p.State.IncomingChannel.Out)
}
}
fmt.Printf(" Dropped: %d\n", p.State.IncomingChannelDropped)

outgoingChannelType := "Infinite"
if p.Conf.OutgoingChannel.Type == api.ChannelType_BUFFER {
outgoingChannelType = "Buffer"
}
fmt.Printf(" OutgoingChannel: %s\n", outgoingChannelType)
if p.State != nil && p.State.OutgoingChannel != nil {
if outgoingChannelType == "Buffer" {
fmt.Printf(" In: %d\n", p.State.OutgoingChannel.In)
fmt.Printf(" Notifications: %d\n", p.State.OutgoingChannel.Notifications)
fmt.Printf(" Collected: %d\n", p.State.OutgoingChannel.Collected)
fmt.Printf(" Rewritten: %d\n", p.State.OutgoingChannel.Rewritten)
fmt.Printf(" Retries: %d\n", p.State.OutgoingChannel.Retries)
fmt.Printf(" Out: %d\n", p.State.OutgoingChannel.Out)
}
}
fmt.Printf(" Dropped: %d\n", p.State.OutgoingChannelDropped)

return nil
}

Expand Down
235 changes: 235 additions & 0 deletions internal/pkg/channels/buffer_channel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,235 @@
package channels

import (
"sync/atomic"
"time"

api "github.com/osrg/gobgp/v3/api"
"github.com/osrg/gobgp/v3/internal/pkg/table"
"github.com/osrg/gobgp/v3/pkg/packet/bgp"
)

const (
// default input channel size.
// specifies a buffer size for input channel.
defaultInSize = 64
)

type BufferMessageInterface interface {
PathList() []*table.Path
SetPathList(pathList []*table.Path)
}

type pathKey struct {
rf bgp.RouteFamily
nlri string
pathIdentifier uint32
}

type BufferChannel struct {
input chan any
output chan any

in atomic.Uint64
notifications atomic.Uint64
collected atomic.Uint64
rewritten atomic.Uint64
retries atomic.Uint64
out atomic.Uint64

pathIdxs map[pathKey]int
last BufferMessageInterface

consumerStop chan struct{}
}

func NewBufferChannel(inSize int) *BufferChannel {
bc := &BufferChannel{
output: make(chan any),
pathIdxs: make(map[pathKey]int),
consumerStop: make(chan struct{}),
}

if inSize == 0 {
// if inSize not set, use default value
inSize = defaultInSize
}
bc.input = make(chan any, inSize)

go bc.serve()
return bc
}

func (bc *BufferChannel) Push(m any, timeout time.Duration) bool {
var timerCh <-chan time.Time
if timeout != 0 {
timer := time.NewTimer(timeout)
defer timer.Stop()

timerCh = timer.C
}

select {
case bc.input <- m:
case <-bc.consumerStop:
case <-timerCh:
// message was dropped
return false
}

return true
}

func (bc *BufferChannel) Out() <-chan any {
return bc.output
}

func (bc *BufferChannel) Stats() *api.ChannelState {
return &api.ChannelState{
In: bc.in.Load(),
Notifications: bc.notifications.Load(),
Collected: bc.collected.Load(),
Rewritten: bc.rewritten.Load(),
Retries: bc.retries.Load(),
Out: bc.out.Load(),
}
}

func (bc *BufferChannel) Clean() {
bc.Close()
// drain all remaining items
for range bc.output {
}
}

func (bc *BufferChannel) Close() {
close(bc.input)
}

func (bc *BufferChannel) SetConsumerClosed() {
close(bc.consumerStop)
}

func (bc *BufferChannel) serve() {
for {
var out chan any
if bc.last != nil {
out = bc.output
}

select {
case elem, open := <-bc.input:
if !open {
close(bc.output)
return
}

bc.onInput(elem)
case out <- bc.last:
bc.out.Add(1)

clear(bc.pathIdxs)
bc.last = nil
}
}
}

func (bc *BufferChannel) onInput(anyElem any) {
bc.in.Add(1)

elem, ok := anyElem.(BufferMessageInterface)
if !ok || len(elem.PathList()) == 0 {
// pass not BufferChannel's element or notification to output with blocking channel
bc.notifications.Add(1)

if bc.last != nil {
bc.output <- bc.last
bc.out.Add(1)

clear(bc.pathIdxs)
bc.last = nil
}

bc.output <- anyElem
bc.out.Add(1)
return
}

if bc.last != nil {
bc.collect(elem)
return
}

select {
case bc.output <- elem:
// done
bc.out.Add(1)
default:
// try output later
bc.retries.Add(1)

bc.collect(elem)
}
}

func (bc *BufferChannel) collect(elem BufferMessageInterface) {
bc.collected.Add(1)

pathList := elem.PathList()

if bc.last == nil {
// first

for idx, path := range pathList {
if path == nil || path.IsEOR() {
continue
}

key := pathKey{
rf: path.GetRouteFamily(),
pathIdentifier: path.GetNlri().PathIdentifier(),
nlri: table.TableKey(path.GetNlri()),
}

bc.pathIdxs[key] = idx
}
} else {
// merge

nextPathsList := bc.last.PathList()

for _, path := range pathList {
if path == nil {
continue
}

if path.IsEOR() {
nextPathsList = append(nextPathsList, path)
continue
}

key := pathKey{
rf: path.GetRouteFamily(),
pathIdentifier: path.GetNlri().PathIdentifier(),
nlri: table.TableKey(path.GetNlri()),
}

idx, ok := bc.pathIdxs[key]
if !ok {
// new path

bc.pathIdxs[key] = len(nextPathsList)
nextPathsList = append(nextPathsList, path)
} else {
// rewrite path
bc.rewritten.Add(1)

nextPathsList[idx] = path
}
}

elem.SetPathList(nextPathsList)
}

bc.last = elem
}
Loading