Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/golangci-lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ jobs:


- name: golangci-lint
uses: golangci/golangci-lint-action@v6
uses: golangci/golangci-lint-action@v8
with:
# Optional: version of golangci-lint to use in form of v1.2 or v1.2.3 or `latest` to use the latest version
version: latest
Expand Down
5 changes: 5 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
version: "2"
linters:
settings:
staticcheck:
checks: [ "all", "-ST1000", "-ST1003", "-ST1006", "-ST1020", "-ST1021" ]
3 changes: 3 additions & 0 deletions channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ type Envelope interface {
// WithTimeout returns a TimeoutEnvelope with a context using the given timeout
WithTimeout(duration time.Duration) TimeoutEnvelope

// WithContext returns a TimeoutEnvelope which uses the given context for timeout/cancellation
WithContext(c context.Context) TimeoutEnvelope

// Send sends the envelope on the given Channel
Send(sender Sender) error

Expand Down
4 changes: 2 additions & 2 deletions classic_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,9 @@ func newClassicListener(identity *identity.TokenId, endpoint transport.Address,
closeNotify := make(chan struct{})

poolConfig := goroutines.PoolConfig{
QueueSize: uint32(config.ConnectOptions.MaxQueuedConnects),
QueueSize: uint32(config.MaxQueuedConnects),
MinWorkers: 1,
MaxWorkers: uint32(config.ConnectOptions.MaxOutstandingConnects),
MaxWorkers: uint32(config.MaxOutstandingConnects),
IdleTime: 10 * time.Second,
PanicHandler: func(err interface{}) {
pfxlog.Logger().
Expand Down
12 changes: 6 additions & 6 deletions datagram_underlay.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,14 +136,14 @@ func (self *DatagramUnderlay) SetWriteDeadline(deadline time.Time) error {
return self.peer.SetWriteDeadline(deadline)
}

func (impl *DatagramUnderlay) init(id string, connectionId string, headers Headers) {
impl.id = id
impl.connectionId = connectionId
impl.headers = headers
func (self *DatagramUnderlay) init(id string, connectionId string, headers Headers) {
self.id = id
self.connectionId = connectionId
self.headers = headers
}

func (impl *DatagramUnderlay) getPeer() transport.Conn {
return impl.peer
func (self *DatagramUnderlay) getPeer() transport.Conn {
return self.peer
}

func (self *DatagramUnderlay) rxHello() (*Message, error) {
Expand Down
23 changes: 21 additions & 2 deletions envelope.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,16 @@ func (self *priorityEnvelopeImpl) WithTimeout(duration time.Duration) TimeoutEnv
}
}

func (self *priorityEnvelopeImpl) WithContext(c context.Context) TimeoutEnvelope {
ctx, cancelF := context.WithCancel(c)
return &envelopeImpl{
msg: self.msg,
p: self.p,
context: ctx,
cancelF: cancelF,
}
}

type envelopeImpl struct {
msg *Message
p Priority
Expand Down Expand Up @@ -139,6 +149,11 @@ func (self *envelopeImpl) WithTimeout(duration time.Duration) TimeoutEnvelope {
return self
}

func (self *envelopeImpl) WithContext(c context.Context) TimeoutEnvelope {
self.context, self.cancelF = context.WithCancel(c)
return self
}

func (self *envelopeImpl) Send(sender Sender) error {
return sender.Send(self)
}
Expand Down Expand Up @@ -270,7 +285,7 @@ type errorEnvelope struct {
ctx context.Context
}

func (self *errorEnvelope) SetSequence(seq int32) {}
func (self *errorEnvelope) SetSequence(int32) {}

func (self *errorEnvelope) Sequence() int32 {
return 0
Expand All @@ -280,7 +295,7 @@ func (self *errorEnvelope) Msg() *Message {
return nil
}

func (self *errorEnvelope) ReplyTo(msg *Message) Envelope {
func (self *errorEnvelope) ReplyTo(*Message) Envelope {
return self
}

Expand Down Expand Up @@ -316,6 +331,10 @@ func (self *errorEnvelope) WithTimeout(time.Duration) TimeoutEnvelope {
return self
}

func (self *errorEnvelope) WithContext(context.Context) TimeoutEnvelope {
return self
}

func (self *errorEnvelope) Send(Sender) error {
return self.ctx.Err()
}
Expand Down
5 changes: 3 additions & 2 deletions existing_conn_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,11 @@ func newExistingImpl(peer net.Conn, version uint32) *existingConnImpl {
readF := ReadV2
marshalF := MarshalV2

if version == 2 {
switch version {
case 2:
readF = ReadV2
marshalF = MarshalV2
} else if version == 3 { // currently only used for testing fallback to a common protocol version
case 3:
readF = ReadV2
marshalF = marshalV3
}
Expand Down
4 changes: 2 additions & 2 deletions impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,8 +473,8 @@ func (channel *channelImpl) tx(sendable Sendable, writeTimeout time.Duration) er
return nil
}

func (ch *channelImpl) GetTimeSinceLastRead() time.Duration {
return time.Duration(info.NowInMilliseconds()-atomic.LoadInt64(&ch.lastRead)) * time.Millisecond
func (channel *channelImpl) GetTimeSinceLastRead() time.Duration {
return time.Duration(info.NowInMilliseconds()-atomic.LoadInt64(&channel.lastRead)) * time.Millisecond
}

type waiter struct {
Expand Down
58 changes: 29 additions & 29 deletions memory/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,37 +49,37 @@ type memoryImpl struct {
closed bool
}

func (impl *memoryImpl) GetLocalAddr() net.Addr {
return addr("local:" + impl.connectionId)
func (self *memoryImpl) GetLocalAddr() net.Addr {
return addr("local:" + self.connectionId)
}

func (impl *memoryImpl) GetRemoteAddr() net.Addr {
return addr("remote:" + impl.connectionId)
func (self *memoryImpl) GetRemoteAddr() net.Addr {
return addr("remote:" + self.connectionId)
}

func (impl *memoryImpl) SetWriteTimeout(time.Duration) error {
func (self *memoryImpl) SetWriteTimeout(time.Duration) error {
panic("SetWriteTimeout not implemented")
}

func (self *memoryImpl) SetWriteDeadline(deadline time.Time) error {
panic("SetWriteDeadline not implemented")
}

func (impl *memoryImpl) Rx() (*channel.Message, error) {
if impl.closed {
func (self *memoryImpl) Rx() (*channel.Message, error) {
if self.closed {
return nil, errors.New("underlay closed")
}

m := <-impl.rx
m := <-self.rx
if m == nil {
return nil, io.EOF
}

return m, nil
}

func (impl *memoryImpl) Tx(m *channel.Message) error {
if impl.closed {
func (self *memoryImpl) Tx(m *channel.Message) error {
if self.closed {
return errors.New("underlay closed")
}
defer func() {
Expand All @@ -88,48 +88,48 @@ func (impl *memoryImpl) Tx(m *channel.Message) error {
}
}()

impl.tx <- m
self.tx <- m

return nil
}

func (impl *memoryImpl) Id() string {
return impl.id.Token
func (self *memoryImpl) Id() string {
return self.id.Token
}

func (impl *memoryImpl) Headers() map[int32][]byte {
return impl.headers
func (self *memoryImpl) Headers() map[int32][]byte {
return self.headers
}

func (impl *memoryImpl) LogicalName() string {
func (self *memoryImpl) LogicalName() string {
return "memory"
}

func (impl *memoryImpl) ConnectionId() string {
return impl.connectionId
func (self *memoryImpl) ConnectionId() string {
return self.connectionId
}

func (impl *memoryImpl) Certificates() []*x509.Certificate {
func (self *memoryImpl) Certificates() []*x509.Certificate {
return nil
}

func (impl *memoryImpl) Label() string {
return fmt.Sprintf("u{%s}->i{%s}", impl.LogicalName(), impl.ConnectionId())
func (self *memoryImpl) Label() string {
return fmt.Sprintf("u{%s}->i{%s}", self.LogicalName(), self.ConnectionId())
}

func (impl *memoryImpl) Close() error {
impl.closeLock.Lock()
defer impl.closeLock.Unlock()
func (self *memoryImpl) Close() error {
self.closeLock.Lock()
defer self.closeLock.Unlock()

if !impl.closed {
impl.closed = true
close(impl.tx)
if !self.closed {
self.closed = true
close(self.tx)
}
return nil
}

func (impl *memoryImpl) IsClosed() bool {
return impl.closed
func (self *memoryImpl) IsClosed() bool {
return self.closed
}

func newMemoryImpl(tx, rx chan *channel.Message) *memoryImpl {
Expand Down
10 changes: 10 additions & 0 deletions message.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,16 @@ func (m *Message) WithTimeout(duration time.Duration) TimeoutEnvelope {
}
}

func (m *Message) WithContext(c context.Context) TimeoutEnvelope {
c, cancelF := context.WithCancel(c)
return &envelopeImpl{
msg: m,
p: Standard,
context: c,
cancelF: cancelF,
}
}

func (m *Message) Context() context.Context {
return context.Background()
}
Expand Down
4 changes: 2 additions & 2 deletions protobufs/protobufs.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ func (self TypedResponseImpl) Unmarshall(responseMsg *channel.Message, err error
if err != nil {
return err
}
if responseMsg.ContentType != self.TypedMessage.GetContentType() {
if responseMsg.ContentType != self.GetContentType() {
return errors.Errorf("unexpected response type %v. expected %v",
responseMsg.ContentType, self.TypedMessage.GetContentType())
responseMsg.ContentType, self.GetContentType())
}
if err = proto.Unmarshal(responseMsg.Body, self.TypedMessage); err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
4.1
4.2
8 changes: 4 additions & 4 deletions websockets/ws_underlay.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,12 @@ func NewUnderlayFactory(id *identity.TokenId, peer *websocket.Conn, certs []*x50
}
}

func (impl *Underlay) GetLocalAddr() net.Addr {
return impl.peer.LocalAddr()
func (self *Underlay) GetLocalAddr() net.Addr {
return self.peer.LocalAddr()
}

func (impl *Underlay) GetRemoteAddr() net.Addr {
return impl.peer.RemoteAddr()
func (self *Underlay) GetRemoteAddr() net.Addr {
return self.peer.RemoteAddr()
}
func (self *Underlay) Create(time.Duration) (channel.Underlay, error) {
return self, nil
Expand Down
Loading