Skip to content
This repository was archived by the owner on Oct 12, 2023. It is now read-only.

Commit 80fa1bc

Browse files
authored
Fix panic when specifying a nil session ID (#232)
Propagate the session ID selected by the broker when a nil session ID is specified. This requires an updated go-amqp to work. Removed validation of EnqueuedSequenceNumber as it's not guaranteed to be returned.
1 parent 319bf88 commit 80fa1bc

File tree

5 files changed

+31
-15
lines changed

5 files changed

+31
-15
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ go 1.12
55
require (
66
github.com/Azure/azure-amqp-common-go/v3 v3.1.0
77
github.com/Azure/azure-sdk-for-go v51.1.0+incompatible
8-
github.com/Azure/go-amqp v0.13.7
8+
github.com/Azure/go-amqp v0.13.8
99
github.com/Azure/go-autorest/autorest v0.11.18
1010
github.com/Azure/go-autorest/autorest/adal v0.9.13
1111
github.com/Azure/go-autorest/autorest/date v0.3.0

go.sum

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,10 @@ github.com/Azure/azure-amqp-common-go/v3 v3.1.0/go.mod h1:PBIGdzcO1teYoufTKMcGib
33
github.com/Azure/azure-sdk-for-go v51.1.0+incompatible h1:7uk6GWtUqKg6weLv2dbKnzwb0ml1Qn70AdtRccZ543w=
44
github.com/Azure/azure-sdk-for-go v51.1.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
55
github.com/Azure/go-amqp v0.13.0/go.mod h1:qj+o8xPCz9tMSbQ83Vp8boHahuRDl5mkNHyt1xlxUTs=
6-
github.com/Azure/go-amqp v0.13.7 h1:ukcCtx138ZmOfHbdALuh9yoJhGtOY3+yaKApfzNvhSk=
7-
github.com/Azure/go-amqp v0.13.7/go.mod h1:wbpCKA8tR5MLgRyIu+bb+S6ECdIDdYJ0NlpFE9xsBPI=
6+
github.com/Azure/go-amqp v0.13.8 h1:EGDxD/Iyzs65DX0h5Tc8k9czVBjJ4lmZ16E8aQD/d7Y=
7+
github.com/Azure/go-amqp v0.13.8/go.mod h1:wbpCKA8tR5MLgRyIu+bb+S6ECdIDdYJ0NlpFE9xsBPI=
88
github.com/Azure/go-autorest v14.2.0+incompatible h1:V5VMDjClD3GiElqLWO7mz2MxNAK/vTfRHdAubSIPRgs=
99
github.com/Azure/go-autorest v14.2.0+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24=
10-
github.com/Azure/go-autorest/autorest v0.11.3 h1:fyYnmYujkIXUgv88D9/Wo2ybE4Zwd/TmQd5sSI5u2Ws=
1110
github.com/Azure/go-autorest/autorest v0.11.3/go.mod h1:JFgpikqFJ/MleTTxwepExTKnFUKKszPS8UavbQYUMuw=
1211
github.com/Azure/go-autorest/autorest v0.11.18 h1:90Y4srNYrwOtAgVo3ndrQkTYn6kf1Eg/AjTFJ8Is2aM=
1312
github.com/Azure/go-autorest/autorest v0.11.18/go.mod h1:dSiJPy22c3u0OtOKDNttNgqpNFY/GeWa7GH/Pz56QRA=
@@ -16,15 +15,13 @@ github.com/Azure/go-autorest/autorest/adal v0.9.13 h1:Mp5hbtOePIzM8pJVRa3YLrWWmZ
1615
github.com/Azure/go-autorest/autorest/adal v0.9.13/go.mod h1:W/MM4U6nLxnIskrw4UwWzlHfGjwUS50aOsc/I3yuU8M=
1716
github.com/Azure/go-autorest/autorest/date v0.3.0 h1:7gUk1U5M/CQbp9WoqinNzJar+8KY+LPI6wiWrP/myHw=
1817
github.com/Azure/go-autorest/autorest/date v0.3.0/go.mod h1:BI0uouVdmngYNUzGWeSYnokU+TrmwEsOqdt8Y6sso74=
19-
github.com/Azure/go-autorest/autorest/mocks v0.4.0 h1:z20OWOSG5aCye0HEkDp6TPmP17ZcfeMxPi6HnSALa8c=
2018
github.com/Azure/go-autorest/autorest/mocks v0.4.0/go.mod h1:LTp+uSrOhSkaKrUy935gNZuuIPPVsHlr9DSOxSayd+k=
2119
github.com/Azure/go-autorest/autorest/mocks v0.4.1 h1:K0laFcLE6VLTOwNgSxaGbUcLPuGXlNkbVvq4cW4nIHk=
2220
github.com/Azure/go-autorest/autorest/mocks v0.4.1/go.mod h1:LTp+uSrOhSkaKrUy935gNZuuIPPVsHlr9DSOxSayd+k=
2321
github.com/Azure/go-autorest/autorest/to v0.4.0 h1:oXVqrxakqqV1UZdSazDOPOLvOIz+XA683u8EctwboHk=
2422
github.com/Azure/go-autorest/autorest/to v0.4.0/go.mod h1:fE8iZBn7LQR7zH/9XU2NcPR4o9jEImooCeWJcYV/zLE=
2523
github.com/Azure/go-autorest/autorest/validation v0.3.1 h1:AgyqjAd94fwNAoTjl/WQXg4VvFeRFpO+UhNyRXqF1ac=
2624
github.com/Azure/go-autorest/autorest/validation v0.3.1/go.mod h1:yhLgjC0Wda5DYXl6JAsWyUe4KVNffhoDhG0zVzUMo3E=
27-
github.com/Azure/go-autorest/logger v0.2.0 h1:e4RVHVZKC5p6UANLJHkM4OfR1UKZPj8Wt8Pcx+3oqrE=
2825
github.com/Azure/go-autorest/logger v0.2.0/go.mod h1:T9E3cAhj2VqvPOtCYAvby9aBXkZmbF5NWuPV8+WeEW8=
2926
github.com/Azure/go-autorest/logger v0.2.1 h1:IG7i4p/mDa2Ce4TRyAO8IHnVhAVF3RFU+ZtXWSmf4Tg=
3027
github.com/Azure/go-autorest/logger v0.2.1/go.mod h1:T9E3cAhj2VqvPOtCYAvby9aBXkZmbF5NWuPV8+WeEW8=
@@ -35,7 +32,6 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
3532
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
3633
github.com/devigned/tab v0.1.1 h1:3mD6Kb1mUOYeLpJvTVSDwSg5ZsfSxfvxGRTxRsJsITA=
3734
github.com/devigned/tab v0.1.1/go.mod h1:XG9mPq0dFghrYvoBF3xdRrJzSTX1b7IQrvaL9mzjeJY=
38-
github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM=
3935
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
4036
github.com/form3tech-oss/jwt-go v3.2.2+incompatible h1:TcekIExNqud5crz4xD2pavyTgWiPvpYe4Xau31I0PRk=
4137
github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k=
@@ -100,7 +96,6 @@ github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs
10096
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
10197
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
10298
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
103-
golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de h1:ikNHVSjEfnvz6sxdSPCaPt572qowuyMDMJLLm3Db3ig=
10499
golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
105100
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0 h1:hb9wdF1z5waM+dSIICn1l0DkLVDT3hqhhQsDNUmHPRE=
106101
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=

queue_test.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -508,7 +508,6 @@ func testMessageProperties(ctx context.Context, t *testing.T, q *Queue) {
508508
HandlerFunc(func(ctx context.Context, msg *Message) error {
509509
sp := msg.SystemProperties
510510
assert.NotNil(t, sp.LockedUntil, "LockedUntil")
511-
assert.NotNil(t, sp.EnqueuedSequenceNumber, "EnqueuedSequenceNumber")
512511
assert.NotNil(t, sp.EnqueuedTime, "EnqueuedTime")
513512
assert.NotNil(t, sp.SequenceNumber, "SequenceNumber")
514513
assert.NotNil(t, sp.PartitionID, "PartitionID")

receiver.go

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ package servicebus
2424

2525
import (
2626
"context"
27+
"errors"
28+
"fmt"
2729
"sync"
2830
"time"
2931

@@ -33,6 +35,8 @@ import (
3335
"github.com/devigned/tab"
3436
)
3537

38+
const sessionFilterName = "com.microsoft:session-filter"
39+
3640
type (
3741
// Receiver provides connection, session and link handling for a receiving to an entity path
3842
Receiver struct {
@@ -408,33 +412,43 @@ func (r *Receiver) newSessionAndLink(ctx context.Context) error {
408412
opts = append(opts, amqp.LinkSenderSettle(amqp.ModeSettled))
409413
}
410414

411-
if opt, ok := r.getSessionFilterLinkOption(); ok {
412-
opts = append(opts, opt)
415+
sessionOpt, useSessionOpt := r.getSessionFilterLinkOption()
416+
if useSessionOpt {
417+
opts = append(opts, sessionOpt)
413418
}
414419

415420
amqpReceiver, err := amqpSession.NewReceiver(opts...)
416421
if err != nil {
417422
tab.For(ctx).Error(err)
418423
return err
419424
}
420-
421425
r.receiver = amqpReceiver
426+
if useSessionOpt {
427+
rawsid := r.receiver.LinkSourceFilterValue(sessionFilterName)
428+
if rawsid == nil && r.sessionID == nil {
429+
return errors.New("failed to create a receiver. no unlocked sessions available")
430+
} else if rawsid != nil && r.sessionID != nil && rawsid != *r.sessionID {
431+
return fmt.Errorf("failed to create a receiver for session %s, it may be locked by another receiver", rawsid)
432+
} else if r.sessionID == nil {
433+
sid := rawsid.(string)
434+
r.sessionID = &sid
435+
}
436+
}
422437
return nil
423438
}
424439

425440
func (r *Receiver) getSessionFilterLinkOption() (amqp.LinkOption, bool) {
426-
const name = "com.microsoft:session-filter"
427441
const code = uint64(0x00000137000000C)
428442

429443
if !r.useSessions {
430444
return nil, false
431445
}
432446

433447
if r.sessionID == nil {
434-
return amqp.LinkSourceFilter(name, code, nil), true
448+
return amqp.LinkSourceFilter(sessionFilterName, code, nil), true
435449
}
436450

437-
return amqp.LinkSourceFilter(name, code, r.sessionID), true
451+
return amqp.LinkSourceFilter(sessionFilterName, code, r.sessionID), true
438452
}
439453

440454
func messageID(msg *amqp.Message) interface{} {

session.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -336,6 +336,10 @@ func (qs *QueueSession) ensureReceiver(ctx context.Context) error {
336336
}
337337

338338
qs.receiver = r
339+
if qs.sessionID == nil {
340+
// propagate the acquired session ID from the receiver
341+
qs.sessionID = qs.receiver.sessionID
342+
}
339343
return nil
340344
}
341345

@@ -474,6 +478,10 @@ func (ss *SubscriptionSession) ensureReceiver(ctx context.Context) error {
474478
}
475479

476480
ss.receiver = r
481+
if ss.sessionID == nil {
482+
// propagate the acquired session ID from the receiver
483+
ss.sessionID = ss.receiver.sessionID
484+
}
477485
return nil
478486
}
479487

0 commit comments

Comments
 (0)