Skip to content
This repository was archived by the owner on Oct 12, 2023. It is now read-only.

Commit e882857

Browse files
Use link caching for $management operations (renew locks) (#248)
Make it so we use a single management link (most of the time) when doing dispositions or lock renewal (or other management related operations). Also, added in a simple stress test. It's a bit manual at the moment, but it initiates 1000 concurrent renew lock calls at once, so it's a decent test of our response routing.
1 parent 47f4640 commit e882857

File tree

8 files changed

+370
-39
lines changed

8 files changed

+370
-39
lines changed

changelog.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
# Change Log
22

3+
## `v0.11.0`
4+
5+
- Use a singleton instance of the management link, avoiding creating a link per management
6+
link operations like dispositions or lock renewals.
7+
[PR#248](https://github.com/Azure/azure-service-bus-go/pull/248)
8+
39
## `v0.10.16`
410

511
- fixed an issue where links weren't being closed when retrying

entity.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ type (
2222
}
2323

2424
receivingEntity struct {
25-
renewMessageLockMutex sync.Mutex
2625
*entity
2726
}
2827

@@ -192,9 +191,6 @@ func (re *receivingEntity) RenewLocks(ctx context.Context, messages ...*Message)
192191
ctx, span := re.startSpanFromContext(ctx, "sb.receivingEntity.RenewLocks")
193192
defer span.End()
194193

195-
re.renewMessageLockMutex.Lock()
196-
defer re.renewMessageLockMutex.Unlock()
197-
198194
client, err := re.entity.GetRPCClient(ctx)
199195
if err != nil {
200196
tab.For(ctx).Error(err)

go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@ module github.com/Azure/azure-service-bus-go
33
go 1.12
44

55
require (
6-
github.com/Azure/azure-amqp-common-go/v3 v3.1.0
6+
github.com/Azure/azure-amqp-common-go/v3 v3.1.2
77
github.com/Azure/azure-sdk-for-go v51.1.0+incompatible
8-
github.com/Azure/go-amqp v0.13.11
8+
github.com/Azure/go-amqp v0.13.13
99
github.com/Azure/go-autorest/autorest v0.11.18
1010
github.com/Azure/go-autorest/autorest/adal v0.9.13
1111
github.com/Azure/go-autorest/autorest/date v0.3.0

go.sum

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,23 @@
1-
github.com/Azure/azure-amqp-common-go/v3 v3.1.0 h1:1N4YSkWYWffOpQHromYdOucBSQXhNRKzqtgICy6To8Q=
2-
github.com/Azure/azure-amqp-common-go/v3 v3.1.0/go.mod h1:PBIGdzcO1teYoufTKMcGibdKaYZv4avS+O6LNIp8bq0=
1+
github.com/Azure/azure-amqp-common-go/v3 v3.1.2 h1:Xu9nboejX9a+7uOGiwEUlpD8/oN4A6f7ZmHTMmJpPkw=
2+
github.com/Azure/azure-amqp-common-go/v3 v3.1.2/go.mod h1:zN7QL/vfCsq3XQxQaTkg4ScO786CA2rQnZ1LXX7QryE=
33
github.com/Azure/azure-sdk-for-go v51.1.0+incompatible h1:7uk6GWtUqKg6weLv2dbKnzwb0ml1Qn70AdtRccZ543w=
44
github.com/Azure/azure-sdk-for-go v51.1.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
5-
github.com/Azure/go-amqp v0.13.0/go.mod h1:qj+o8xPCz9tMSbQ83Vp8boHahuRDl5mkNHyt1xlxUTs=
6-
github.com/Azure/go-amqp v0.13.11 h1:E28zKoWuzO4+D80iUD88BUorI5PqvIZ/S/77md3hIvA=
7-
github.com/Azure/go-amqp v0.13.11/go.mod h1:D5ZrjQqB1dyp1A+G73xeL/kNn7D5qHJIIsNNps7YNmk=
5+
github.com/Azure/go-amqp v0.13.13 h1:OBPwCO50EzniOyZR0M4VbGJYDxceIy3SFOnKVMJktdY=
6+
github.com/Azure/go-amqp v0.13.13/go.mod h1:D5ZrjQqB1dyp1A+G73xeL/kNn7D5qHJIIsNNps7YNmk=
87
github.com/Azure/go-autorest v14.2.0+incompatible h1:V5VMDjClD3GiElqLWO7mz2MxNAK/vTfRHdAubSIPRgs=
98
github.com/Azure/go-autorest v14.2.0+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24=
10-
github.com/Azure/go-autorest/autorest v0.11.3/go.mod h1:JFgpikqFJ/MleTTxwepExTKnFUKKszPS8UavbQYUMuw=
119
github.com/Azure/go-autorest/autorest v0.11.18 h1:90Y4srNYrwOtAgVo3ndrQkTYn6kf1Eg/AjTFJ8Is2aM=
1210
github.com/Azure/go-autorest/autorest v0.11.18/go.mod h1:dSiJPy22c3u0OtOKDNttNgqpNFY/GeWa7GH/Pz56QRA=
13-
github.com/Azure/go-autorest/autorest/adal v0.9.0/go.mod h1:/c022QCutn2P7uY+/oQWWNcK9YU+MH96NgK+jErpbcg=
1411
github.com/Azure/go-autorest/autorest/adal v0.9.13 h1:Mp5hbtOePIzM8pJVRa3YLrWWmZtoxRXqUEzCfJt3+/Q=
1512
github.com/Azure/go-autorest/autorest/adal v0.9.13/go.mod h1:W/MM4U6nLxnIskrw4UwWzlHfGjwUS50aOsc/I3yuU8M=
1613
github.com/Azure/go-autorest/autorest/date v0.3.0 h1:7gUk1U5M/CQbp9WoqinNzJar+8KY+LPI6wiWrP/myHw=
1714
github.com/Azure/go-autorest/autorest/date v0.3.0/go.mod h1:BI0uouVdmngYNUzGWeSYnokU+TrmwEsOqdt8Y6sso74=
18-
github.com/Azure/go-autorest/autorest/mocks v0.4.0/go.mod h1:LTp+uSrOhSkaKrUy935gNZuuIPPVsHlr9DSOxSayd+k=
1915
github.com/Azure/go-autorest/autorest/mocks v0.4.1 h1:K0laFcLE6VLTOwNgSxaGbUcLPuGXlNkbVvq4cW4nIHk=
2016
github.com/Azure/go-autorest/autorest/mocks v0.4.1/go.mod h1:LTp+uSrOhSkaKrUy935gNZuuIPPVsHlr9DSOxSayd+k=
2117
github.com/Azure/go-autorest/autorest/to v0.4.0 h1:oXVqrxakqqV1UZdSazDOPOLvOIz+XA683u8EctwboHk=
2218
github.com/Azure/go-autorest/autorest/to v0.4.0/go.mod h1:fE8iZBn7LQR7zH/9XU2NcPR4o9jEImooCeWJcYV/zLE=
2319
github.com/Azure/go-autorest/autorest/validation v0.3.1 h1:AgyqjAd94fwNAoTjl/WQXg4VvFeRFpO+UhNyRXqF1ac=
2420
github.com/Azure/go-autorest/autorest/validation v0.3.1/go.mod h1:yhLgjC0Wda5DYXl6JAsWyUe4KVNffhoDhG0zVzUMo3E=
25-
github.com/Azure/go-autorest/logger v0.2.0/go.mod h1:T9E3cAhj2VqvPOtCYAvby9aBXkZmbF5NWuPV8+WeEW8=
2621
github.com/Azure/go-autorest/logger v0.2.1 h1:IG7i4p/mDa2Ce4TRyAO8IHnVhAVF3RFU+ZtXWSmf4Tg=
2722
github.com/Azure/go-autorest/logger v0.2.1/go.mod h1:T9E3cAhj2VqvPOtCYAvby9aBXkZmbF5NWuPV8+WeEW8=
2823
github.com/Azure/go-autorest/tracing v0.6.0 h1:TYi4+3m5t6K48TGI9AUdb+IzbnSxvnvUMfuitfgcfuo=
@@ -32,7 +27,6 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
3227
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
3328
github.com/devigned/tab v0.1.1 h1:3mD6Kb1mUOYeLpJvTVSDwSg5ZsfSxfvxGRTxRsJsITA=
3429
github.com/devigned/tab v0.1.1/go.mod h1:XG9mPq0dFghrYvoBF3xdRrJzSTX1b7IQrvaL9mzjeJY=
35-
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
3630
github.com/form3tech-oss/jwt-go v3.2.2+incompatible h1:TcekIExNqud5crz4xD2pavyTgWiPvpYe4Xau31I0PRk=
3731
github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k=
3832
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
@@ -95,8 +89,6 @@ github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVM
9589
github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs=
9690
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
9791
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
98-
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
99-
golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
10092
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0 h1:hb9wdF1z5waM+dSIICn1l0DkLVDT3hqhhQsDNUmHPRE=
10193
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
10294
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"log"
7+
"os"
8+
"sync"
9+
"sync/atomic"
10+
"time"
11+
12+
servicebus "github.com/Azure/azure-service-bus-go"
13+
"github.com/joho/godotenv"
14+
)
15+
16+
const queuePrefetch = 1000
17+
const renewalsPerMessage = 2
18+
19+
func main() {
20+
ctx := context.Background()
21+
22+
godotenv.Load(".env")
23+
cs := os.Getenv("SERVICEBUS_CONNECTION_STRING")
24+
queueName := "samples"
25+
26+
ns, err := servicebus.NewNamespace(servicebus.NamespaceWithConnectionString(cs))
27+
28+
if err != nil {
29+
log.Fatalf("Failed to create namespace client: %s", err.Error())
30+
}
31+
32+
sender, err := ns.NewSender(ctx, queueName)
33+
34+
if err != nil {
35+
log.Fatalf("Failed to create sender: %s", err.Error())
36+
}
37+
38+
ch := make(chan bool, 100)
39+
40+
for i := 0; i < queuePrefetch; i++ {
41+
ch <- true
42+
go func(i int) {
43+
defer func() { <-ch }()
44+
if err = sender.Send(ctx, &servicebus.Message{Data: []byte(fmt.Sprintf("hello world %d", i))}); err != nil {
45+
log.Fatalf("Failed to send message: %s", err.Error())
46+
}
47+
}(i)
48+
}
49+
50+
queue, err := ns.NewQueue(queueName, servicebus.QueueWithPrefetchCount(queuePrefetch))
51+
52+
if err != nil {
53+
log.Fatalf("Failed to create receiver: %s", err.Error())
54+
}
55+
56+
renewals := int32(0)
57+
outstandingRenewals := int32(0)
58+
failedRenewals := int32(0)
59+
lastRenewalWasFailure := int32(0)
60+
61+
wg := &sync.WaitGroup{}
62+
63+
go func() {
64+
ticker := time.NewTicker(time.Second * 5)
65+
66+
for {
67+
select {
68+
case <-ticker.C:
69+
log.Printf("Messages: [total: %d, outstanding: %d, failed: %d]", atomic.LoadInt32(&renewals), atomic.LoadInt32(&outstandingRenewals), atomic.LoadInt32(&failedRenewals))
70+
}
71+
}
72+
}()
73+
74+
err = queue.Receive(ctx, servicebus.HandlerFunc(func(c context.Context, m *servicebus.Message) error {
75+
wg.Add(1)
76+
77+
go func() {
78+
worked := false
79+
80+
for i := 0; i < renewalsPerMessage; i++ {
81+
atomic.AddInt32(&outstandingRenewals, 1)
82+
if err := queue.RenewLocks(ctx, m); err != nil {
83+
worked = false
84+
atomic.AddInt32(&failedRenewals, 1)
85+
log.Printf("ERROR renewing: %+v", err)
86+
} else {
87+
worked = true
88+
}
89+
90+
atomic.AddInt32(&outstandingRenewals, -1)
91+
atomic.AddInt32(&renewals, 1)
92+
}
93+
94+
m.Complete(ctx)
95+
96+
if !worked {
97+
log.Printf("Last renewal was a failure here.")
98+
atomic.AddInt32(&lastRenewalWasFailure, 1)
99+
}
100+
101+
wg.Done()
102+
}()
103+
104+
return nil
105+
}))
106+
107+
log.Printf("Last Renewal was failure: %d", lastRenewalWasFailure)
108+
109+
if err != nil {
110+
log.Fatalf("Failed to receive messages")
111+
}
112+
113+
wg.Wait()
114+
}

namespace.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,11 @@ type (
7575
initRefresh sync.Once
7676
// populated with the result from auto-refresh, to be called elsewhere
7777
cancelRefresh func() <-chan struct{}
78+
79+
// for testing
80+
81+
// alias for 'amqp.Dial'
82+
amqpDial func(addr string, opts ...amqp.ConnOption) (*amqp.Client, error)
7883
}
7984

8085
// NamespaceOption provides structure for configuring a new Service Bus namespace
@@ -189,6 +194,7 @@ func NamespaceWithTokenProvider(provider auth.TokenProvider) NamespaceOption {
189194
func NewNamespace(opts ...NamespaceOption) (*Namespace, error) {
190195
ns := &Namespace{
191196
Environment: azure.PublicCloud,
197+
amqpDial: amqp.Dial,
192198
}
193199

194200
for _, opt := range opts {
@@ -235,7 +241,7 @@ func (ns *Namespace) newClient(ctx context.Context) (*amqp.Client, error) {
235241
return amqp.New(nConn, append(defaultConnOptions, amqp.ConnServerHostname(ns.getHostname()))...)
236242
}
237243

238-
return amqp.Dial(ns.getAMQPHostURI(), defaultConnOptions...)
244+
return ns.amqpDial(ns.getAMQPHostURI(), defaultConnOptions...)
239245
}
240246

241247
// negotiateClaim performs initial authentication and starts periodic refresh of credentials.

0 commit comments

Comments
 (0)