Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (self CloseHandlerF) HandleClose(ch Channel) {
self(ch)
}

type MessageSourceF func() (Sendable, error)
type MessageSourceF func(notifer *CloseNotifier) (Sendable, error)

type UnderlayHandler interface {
// Start is called after the MultiChannel has been created with the first underlay
Expand Down
2 changes: 2 additions & 0 deletions impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"
"github.com/michaelquigley/pfxlog"
"github.com/openziti/foundation/v2/concurrenz"
"github.com/openziti/foundation/v2/debugz"
"github.com/openziti/foundation/v2/info"
"github.com/openziti/foundation/v2/sequence"
"github.com/pkg/errors"
Expand Down Expand Up @@ -108,6 +109,7 @@ func NewChannelWithUnderlay(logicalName string, underlay Underlay, bindHandler B
return nil, err
}

debugz.DumpLocalStack()
impl.startMultiplex()

return impl, nil
Expand Down
47 changes: 37 additions & 10 deletions multi.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,7 @@ func NewMultiChannel(config *MultiChannelConfig) (MultiChannel, error) {
return nil, err
}

go impl.Rxer(config.Underlay)
go impl.Txer(config.Underlay)
impl.startMultiplex(config.Underlay)
go impl.underlayHandler.Start(impl)

impl.underlayHandler.HandleUnderlayAccepted(impl, config.Underlay)
Expand All @@ -146,14 +145,19 @@ func (self *multiChannelImpl) AcceptUnderlay(underlay Underlay) bool {
self.headers.Store(underlay.Headers())
self.underlays.Append(underlay)

go self.Rxer(underlay)
go self.Txer(underlay)
self.startMultiplex(underlay)

self.underlayHandler.HandleUnderlayAccepted(self, underlay)

return true
}

func (self *multiChannelImpl) startMultiplex(underlay Underlay) {
notifier := NewCloseNotifier()
go self.Rxer(underlay, notifier)
go self.Txer(underlay, notifier)
}

func (self *multiChannelImpl) GetUnderlayCountsByType() map[string]int {
result := map[string]int{}
for _, u := range self.underlays.Value() {
Expand Down Expand Up @@ -402,12 +406,14 @@ func (self *multiChannelImpl) Tx(underlay Underlay, sendable Sendable, writeTime
return nil
}

func (self *multiChannelImpl) CloseUnderlay(underlay Underlay) {
func (self *multiChannelImpl) closeUnderlay(underlay Underlay, notifier *CloseNotifier) {
self.lock.Lock()
if err := underlay.Close(); err != nil {
pfxlog.Logger().WithField("context", self.Label()).WithError(err).Error("error closing underlay")
}

notifier.NotifyClosed()

underlayRemoved := false
self.underlays.DeleteIf(func(element Underlay) bool {
if underlay == element {
Expand All @@ -427,8 +433,8 @@ func (self *multiChannelImpl) GetTimeSinceLastRead() time.Duration {
return time.Duration(info.NowInMilliseconds()-atomic.LoadInt64(&self.lastRead)) * time.Millisecond
}

func (self *multiChannelImpl) Txer(underlay Underlay) {
defer self.CloseUnderlay(underlay)
func (self *multiChannelImpl) Txer(underlay Underlay, notifier *CloseNotifier) {
defer self.closeUnderlay(underlay, notifier)

log := pfxlog.ContextLogger(self.Label())

Expand All @@ -440,7 +446,7 @@ func (self *multiChannelImpl) Txer(underlay Underlay) {
messageSource := self.underlayHandler.GetMessageSource(underlay)

for {
sendable, err := messageSource()
sendable, err := messageSource(notifier)
if err != nil {
return
}
Expand All @@ -456,8 +462,8 @@ func (self *multiChannelImpl) Txer(underlay Underlay) {
}
}

func (self *multiChannelImpl) Rxer(underlay Underlay) {
defer self.CloseUnderlay(underlay)
func (self *multiChannelImpl) Rxer(underlay Underlay, notifier *CloseNotifier) {
defer self.closeUnderlay(underlay, notifier)

log := pfxlog.ContextLogger(self.Label())
log.Debug("started")
Expand Down Expand Up @@ -592,3 +598,24 @@ func (self *UnderlayConstraints) Apply(ch MultiChannel, factory GroupedUnderlayF
}
}
}

func NewCloseNotifier() *CloseNotifier {
return &CloseNotifier{
c: make(chan struct{}),
}
}

type CloseNotifier struct {
c chan struct{}
notified atomic.Bool
}

func (self *CloseNotifier) NotifyClosed() {
if self.notified.CompareAndSwap(false, true) {
close(self.c)
}
}

func (self *CloseNotifier) GetCloseNotify() <-chan struct{} {
return self.c
}
8 changes: 6 additions & 2 deletions multi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func (self *priorityChannelBase) GetPrioritySender() Sender {
return self.prioritySender
}

func (self *priorityChannelBase) GetNextMsgDefault() (Sendable, error) {
func (self *priorityChannelBase) GetNextMsgDefault(notifier *CloseNotifier) (Sendable, error) {
select {
case msg := <-self.defaultMsgChan:
return msg, nil
Expand All @@ -229,17 +229,21 @@ func (self *priorityChannelBase) GetNextMsgDefault() (Sendable, error) {
return msg, nil
case <-self.GetCloseNotify():
return nil, io.EOF
case <-notifier.GetCloseNotify():
return nil, io.EOF
}
}

func (self *priorityChannelBase) GetNextPriorityMsg() (Sendable, error) {
func (self *priorityChannelBase) GetNextPriorityMsg(notifier *CloseNotifier) (Sendable, error) {
select {
case msg := <-self.priorityMsgChan:
return msg, nil
case msg := <-self.retryMsgChan:
return msg, nil
case <-self.GetCloseNotify():
return nil, io.EOF
case <-notifier.GetCloseNotify():
return nil, io.EOF
}
}

Expand Down
Loading