Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 18 additions & 21 deletions multi.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,10 @@ type multiChannelImpl struct {
// https://github.com/golang/go/issues/36606
lastRead int64

ownerId string
channelId string
logicalName string
certs concurrenz.AtomicValue[[]*x509.Certificate]
headers concurrenz.AtomicValue[map[int32][]byte]
ownerId string
channelId string
logicalName string
fallbackUnderlay atomic.Pointer[Underlay]

options *Options
waiters waiterMap
Expand Down Expand Up @@ -115,8 +114,7 @@ func NewMultiChannel(config *MultiChannelConfig) (MultiChannel, error) {
impl.flags.Set(flagInjectUnderlayType, config.InjectUnderlayTypeIntoMessages)

impl.ownerId = config.Underlay.Id()
impl.certs.Store(config.Underlay.Certificates())
impl.headers.Store(config.Underlay.Headers())
impl.fallbackUnderlay.Store(&config.Underlay)
impl.underlays.Append(config.Underlay)

groupSecret := config.Underlay.Headers()[GroupSecretHeader]
Expand Down Expand Up @@ -164,8 +162,7 @@ func (self *multiChannelImpl) AcceptUnderlay(underlay Underlay) error {
return fmt.Errorf("new underlay for '%s' not accepted: multi-channel is closed", self.ConnectionId())
}

self.certs.Store(underlay.Certificates())
self.headers.Store(underlay.Headers())
self.fallbackUnderlay.Store(&underlay)
self.underlays.Append(underlay)

self.startMultiplex(underlay)
Expand Down Expand Up @@ -223,19 +220,15 @@ func (self *multiChannelImpl) ConnectionId() string {
}

func (self *multiChannelImpl) Certificates() []*x509.Certificate {
return self.certs.Load()
return self.Underlay().Certificates()
}

func (self *multiChannelImpl) Headers() map[int32][]byte {
return self.headers.Load()
return self.Underlay().Headers()
}

func (self *multiChannelImpl) Label() string {
if u := self.Underlay(); u != nil {
return fmt.Sprintf("ch{%s}->%s", self.LogicalName(), u.Label())
} else {
return fmt.Sprintf("ch{%s}->{}", self.LogicalName())
}
return fmt.Sprintf("ch{%s}->%s", self.LogicalName(), self.Underlay().Label())
}

func (self *multiChannelImpl) GetOptions() *Options {
Expand Down Expand Up @@ -329,10 +322,7 @@ func (self *multiChannelImpl) IsClosed() bool {
}

func (self *multiChannelImpl) Underlay() Underlay {
if underlays := self.underlays.Value(); len(underlays) > 0 {
return underlays[0]
}
return nil
return *self.fallbackUnderlay.Load()
}

func (self *multiChannelImpl) Rx(m *Message) {
Expand Down Expand Up @@ -453,6 +443,13 @@ func (self *multiChannelImpl) closeUnderlay(underlay Underlay, notifier *CloseNo
}
return false
})
if *self.fallbackUnderlay.Load() == underlay {
underlays := self.underlays.Value()
if len(underlays) > 0 {
lastUnderlay := underlays[len(underlays)-1]
self.fallbackUnderlay.Store(&lastUnderlay)
}
}
self.lock.Unlock()

if underlayRemoved {
Expand Down Expand Up @@ -617,7 +614,7 @@ func (self *UnderlayConstraints) countsShowValidState(ch MultiChannel, counts ma
WithField("underlays", counts).
Info("not enough total open underlays, closing multi-underlay channel")
if err := ch.Close(); err != nil {
pfxlog.Logger().WithError(err).Error("error closing underlay")
pfxlog.Logger().WithError(err).Error("error closing channel")
}
}
return false
Expand Down