Skip to content
This repository was archived by the owner on Oct 12, 2023. It is now read-only.

Commit f536d5c

Browse files
authored
Use amqp HandleMessage Func (#207)
* use amqp HandleMessage Func * add missing MIT license header * bump amqp
1 parent 9ac8a37 commit f536d5c

File tree

4 files changed

+139
-50
lines changed

4 files changed

+139
-50
lines changed

amqphandler.go

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
package servicebus
2+
3+
// MIT License
4+
//
5+
// Copyright (c) Microsoft Corporation. All rights reserved.
6+
//
7+
// Permission is hereby granted, free of charge, to any person obtaining a copy
8+
// of this software and associated documentation files (the "Software"), to deal
9+
// in the Software without restriction, including without limitation the rights
10+
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11+
// copies of the Software, and to permit persons to whom the Software is
12+
// furnished to do so, subject to the following conditions:
13+
//
14+
// The above copyright notice and this permission notice shall be included in all
15+
// copies or substantial portions of the Software.
16+
//
17+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18+
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22+
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
23+
// SOFTWARE
24+
25+
import (
26+
"context"
27+
28+
"github.com/Azure/go-amqp"
29+
"github.com/devigned/tab"
30+
)
31+
32+
type amqpHandler interface {
33+
Handle(ctx context.Context, msg *amqp.Message) error
34+
}
35+
36+
// amqpAdapterHandler is a middleware handler that translates amqp messages into servicebus messages
37+
type amqpAdapterHandler struct {
38+
next Handler
39+
receiver *Receiver
40+
}
41+
42+
func newAmqpAdapterHandler(receiver *Receiver, next Handler) *amqpAdapterHandler {
43+
return &amqpAdapterHandler{
44+
next: next,
45+
receiver: receiver,
46+
}
47+
}
48+
49+
func (h *amqpAdapterHandler) Handle(ctx context.Context, msg *amqp.Message) error {
50+
const optName = "sb.amqpHandler.Handle"
51+
52+
event, err := messageFromAMQPMessage(msg)
53+
if err != nil {
54+
_, span := h.receiver.startConsumerSpanFromContext(ctx, optName)
55+
span.Logger().Error(err)
56+
h.receiver.lastError = err
57+
if h.receiver.doneListening != nil {
58+
h.receiver.doneListening()
59+
}
60+
return err
61+
}
62+
63+
ctx, span := tab.StartSpanWithRemoteParent(ctx, optName, event)
64+
defer span.End()
65+
66+
id := messageID(msg)
67+
if idStr, ok := id.(string); ok {
68+
span.AddAttributes(tab.StringAttribute("amqp.message.id", idStr))
69+
}
70+
71+
if err := h.next.Handle(ctx, event); err != nil {
72+
// stop handling messages since the message consumer ran into an unexpected error
73+
h.receiver.lastError = err
74+
if h.receiver.doneListening != nil {
75+
h.receiver.doneListening()
76+
}
77+
return err
78+
}
79+
80+
// nothing more to be done. The message was settled when it was accepted by the Receiver
81+
if h.receiver.mode == ReceiveAndDeleteMode {
82+
return nil
83+
}
84+
85+
// nothing more to be done. The Receiver has no default disposition, so the handler is solely responsible for
86+
// disposition
87+
if h.receiver.DefaultDisposition == nil {
88+
return nil
89+
}
90+
91+
// default disposition is set, so try to send the disposition. If the message disposition has already been set, the
92+
// underlying AMQP library will ignore the second disposition respecting the disposition of the handler func.
93+
if err := h.receiver.DefaultDisposition(ctx); err != nil {
94+
// if an error is returned by the default disposition, then we must alert the message consumer as we can't
95+
// be sure the final message disposition.
96+
tab.For(ctx).Error(err)
97+
h.receiver.lastError = err
98+
if h.receiver.doneListening != nil {
99+
h.receiver.doneListening()
100+
}
101+
return nil
102+
}
103+
return nil
104+
}

go.mod

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,13 @@ module github.com/Azure/azure-service-bus-go
33
go 1.12
44

55
require (
6-
github.com/Azure/azure-amqp-common-go/v3 v3.0.1
7-
github.com/Azure/azure-sdk-for-go v37.1.0+incompatible
8-
github.com/Azure/go-amqp v0.13.1
9-
github.com/Azure/go-autorest/autorest v0.11.7
10-
github.com/Azure/go-autorest/autorest/adal v0.9.4
6+
github.com/Azure/azure-amqp-common-go/v3 v3.1.0
7+
github.com/Azure/azure-sdk-for-go v51.1.0+incompatible
8+
github.com/Azure/go-amqp v0.13.4
9+
github.com/Azure/go-autorest/autorest v0.11.18
10+
github.com/Azure/go-autorest/autorest/adal v0.9.13
1111
github.com/Azure/go-autorest/autorest/date v0.3.0
1212
github.com/Azure/go-autorest/autorest/to v0.4.0
13-
github.com/Azure/go-autorest/autorest/validation v0.3.0 // indirect
1413
github.com/devigned/tab v0.1.1
1514
github.com/joho/godotenv v1.3.0
1615
github.com/mitchellh/mapstructure v1.3.3

go.sum

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,33 @@
1-
github.com/Azure/azure-amqp-common-go/v3 v3.0.1 h1:mXh+eyOxGLBfqDtfmbtby0l7XfG/6b2NkuZ3B7i6zHA=
2-
github.com/Azure/azure-amqp-common-go/v3 v3.0.1/go.mod h1:PBIGdzcO1teYoufTKMcGibdKaYZv4avS+O6LNIp8bq0=
3-
github.com/Azure/azure-sdk-for-go v37.1.0+incompatible h1:aFlw3lP7ZHQi4m1kWCpcwYtczhDkGhDoRaMTaxcOf68=
4-
github.com/Azure/azure-sdk-for-go v37.1.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
1+
github.com/Azure/azure-amqp-common-go/v3 v3.1.0 h1:1N4YSkWYWffOpQHromYdOucBSQXhNRKzqtgICy6To8Q=
2+
github.com/Azure/azure-amqp-common-go/v3 v3.1.0/go.mod h1:PBIGdzcO1teYoufTKMcGibdKaYZv4avS+O6LNIp8bq0=
3+
github.com/Azure/azure-sdk-for-go v51.1.0+incompatible h1:7uk6GWtUqKg6weLv2dbKnzwb0ml1Qn70AdtRccZ543w=
4+
github.com/Azure/azure-sdk-for-go v51.1.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
55
github.com/Azure/go-amqp v0.13.0/go.mod h1:qj+o8xPCz9tMSbQ83Vp8boHahuRDl5mkNHyt1xlxUTs=
6-
github.com/Azure/go-amqp v0.13.1 h1:dXnEJ89Hf7wMkcBbLqvocZlM4a3uiX9uCxJIvU77+Oo=
7-
github.com/Azure/go-amqp v0.13.1/go.mod h1:qj+o8xPCz9tMSbQ83Vp8boHahuRDl5mkNHyt1xlxUTs=
6+
github.com/Azure/go-amqp v0.13.4 h1:PtnBdGcNBcuSOdbBPEov6EKROFOOCXMio7lxm7RerZg=
7+
github.com/Azure/go-amqp v0.13.4/go.mod h1:wbpCKA8tR5MLgRyIu+bb+S6ECdIDdYJ0NlpFE9xsBPI=
88
github.com/Azure/go-autorest v14.2.0+incompatible h1:V5VMDjClD3GiElqLWO7mz2MxNAK/vTfRHdAubSIPRgs=
99
github.com/Azure/go-autorest v14.2.0+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24=
1010
github.com/Azure/go-autorest/autorest v0.11.3 h1:fyYnmYujkIXUgv88D9/Wo2ybE4Zwd/TmQd5sSI5u2Ws=
1111
github.com/Azure/go-autorest/autorest v0.11.3/go.mod h1:JFgpikqFJ/MleTTxwepExTKnFUKKszPS8UavbQYUMuw=
12-
github.com/Azure/go-autorest/autorest v0.11.7 h1:lHnVA0bNUzUw2tYgGiwmOrlBi/VgmaTYfMbsww/7o2A=
13-
github.com/Azure/go-autorest/autorest v0.11.7/go.mod h1:V6p3pKZx1KKkJubbxnDWrzNhEIfOy/pTGasLqzHIPHs=
12+
github.com/Azure/go-autorest/autorest v0.11.18 h1:90Y4srNYrwOtAgVo3ndrQkTYn6kf1Eg/AjTFJ8Is2aM=
13+
github.com/Azure/go-autorest/autorest v0.11.18/go.mod h1:dSiJPy22c3u0OtOKDNttNgqpNFY/GeWa7GH/Pz56QRA=
1414
github.com/Azure/go-autorest/autorest/adal v0.9.0/go.mod h1:/c022QCutn2P7uY+/oQWWNcK9YU+MH96NgK+jErpbcg=
15-
github.com/Azure/go-autorest/autorest/adal v0.9.1 h1:xjPqigMQe2+0DAJ5A6MLUPp5D2r2Io8qHCuCMMI/yJU=
16-
github.com/Azure/go-autorest/autorest/adal v0.9.1/go.mod h1:/c022QCutn2P7uY+/oQWWNcK9YU+MH96NgK+jErpbcg=
17-
github.com/Azure/go-autorest/autorest/adal v0.9.4 h1:1/DtH4Szusk4psLBrJn/gocMRIf1ji30WAz3GfyULRQ=
18-
github.com/Azure/go-autorest/autorest/adal v0.9.4/go.mod h1:/3SMAM86bP6wC9Ev35peQDUeqFZBMH07vvUOmg4z/fE=
15+
github.com/Azure/go-autorest/autorest/adal v0.9.13 h1:Mp5hbtOePIzM8pJVRa3YLrWWmZtoxRXqUEzCfJt3+/Q=
16+
github.com/Azure/go-autorest/autorest/adal v0.9.13/go.mod h1:W/MM4U6nLxnIskrw4UwWzlHfGjwUS50aOsc/I3yuU8M=
1917
github.com/Azure/go-autorest/autorest/date v0.3.0 h1:7gUk1U5M/CQbp9WoqinNzJar+8KY+LPI6wiWrP/myHw=
2018
github.com/Azure/go-autorest/autorest/date v0.3.0/go.mod h1:BI0uouVdmngYNUzGWeSYnokU+TrmwEsOqdt8Y6sso74=
2119
github.com/Azure/go-autorest/autorest/mocks v0.4.0 h1:z20OWOSG5aCye0HEkDp6TPmP17ZcfeMxPi6HnSALa8c=
2220
github.com/Azure/go-autorest/autorest/mocks v0.4.0/go.mod h1:LTp+uSrOhSkaKrUy935gNZuuIPPVsHlr9DSOxSayd+k=
21+
github.com/Azure/go-autorest/autorest/mocks v0.4.1 h1:K0laFcLE6VLTOwNgSxaGbUcLPuGXlNkbVvq4cW4nIHk=
2322
github.com/Azure/go-autorest/autorest/mocks v0.4.1/go.mod h1:LTp+uSrOhSkaKrUy935gNZuuIPPVsHlr9DSOxSayd+k=
2423
github.com/Azure/go-autorest/autorest/to v0.4.0 h1:oXVqrxakqqV1UZdSazDOPOLvOIz+XA683u8EctwboHk=
2524
github.com/Azure/go-autorest/autorest/to v0.4.0/go.mod h1:fE8iZBn7LQR7zH/9XU2NcPR4o9jEImooCeWJcYV/zLE=
26-
github.com/Azure/go-autorest/autorest/validation v0.3.0 h1:3I9AAI63HfcLtphd9g39ruUwRI+Ca+z/f36KHPFRUss=
27-
github.com/Azure/go-autorest/autorest/validation v0.3.0/go.mod h1:yhLgjC0Wda5DYXl6JAsWyUe4KVNffhoDhG0zVzUMo3E=
25+
github.com/Azure/go-autorest/autorest/validation v0.3.1 h1:AgyqjAd94fwNAoTjl/WQXg4VvFeRFpO+UhNyRXqF1ac=
26+
github.com/Azure/go-autorest/autorest/validation v0.3.1/go.mod h1:yhLgjC0Wda5DYXl6JAsWyUe4KVNffhoDhG0zVzUMo3E=
2827
github.com/Azure/go-autorest/logger v0.2.0 h1:e4RVHVZKC5p6UANLJHkM4OfR1UKZPj8Wt8Pcx+3oqrE=
2928
github.com/Azure/go-autorest/logger v0.2.0/go.mod h1:T9E3cAhj2VqvPOtCYAvby9aBXkZmbF5NWuPV8+WeEW8=
29+
github.com/Azure/go-autorest/logger v0.2.1 h1:IG7i4p/mDa2Ce4TRyAO8IHnVhAVF3RFU+ZtXWSmf4Tg=
30+
github.com/Azure/go-autorest/logger v0.2.1/go.mod h1:T9E3cAhj2VqvPOtCYAvby9aBXkZmbF5NWuPV8+WeEW8=
3031
github.com/Azure/go-autorest/tracing v0.6.0 h1:TYi4+3m5t6K48TGI9AUdb+IzbnSxvnvUMfuitfgcfuo=
3132
github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBpUA79WCAKPPZVC2DeU=
3233
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@@ -36,6 +37,8 @@ github.com/devigned/tab v0.1.1 h1:3mD6Kb1mUOYeLpJvTVSDwSg5ZsfSxfvxGRTxRsJsITA=
3637
github.com/devigned/tab v0.1.1/go.mod h1:XG9mPq0dFghrYvoBF3xdRrJzSTX1b7IQrvaL9mzjeJY=
3738
github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM=
3839
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
40+
github.com/form3tech-oss/jwt-go v3.2.2+incompatible h1:TcekIExNqud5crz4xD2pavyTgWiPvpYe4Xau31I0PRk=
41+
github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k=
3942
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
4043
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
4144
github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE=
@@ -99,6 +102,8 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk
99102
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
100103
golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de h1:ikNHVSjEfnvz6sxdSPCaPt572qowuyMDMJLLm3Db3ig=
101104
golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
105+
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0 h1:hb9wdF1z5waM+dSIICn1l0DkLVDT3hqhhQsDNUmHPRE=
106+
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
102107
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
103108
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
104109
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=

receiver.go

Lines changed: 11 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -197,14 +197,12 @@ func (r *Receiver) ReceiveOne(ctx context.Context, handler Handler) error {
197197
ctx, span := r.startConsumerSpanFromContext(ctx, "sb.Receiver.ReceiveOne")
198198
defer span.End()
199199

200-
amqpMsg, err := r.listenForMessage(ctx)
200+
err := r.listenForMessage(ctx, newAmqpAdapterHandler(r, handler))
201201
if err != nil {
202202
tab.For(ctx).Error(err)
203203
return err
204204
}
205205

206-
r.handleMessage(ctx, amqpMsg, handler)
207-
208206
return nil
209207
}
210208

@@ -216,24 +214,14 @@ func (r *Receiver) Listen(ctx context.Context, handler Handler) *ListenerHandle
216214
ctx, span := r.startConsumerSpanFromContext(ctx, "sb.Receiver.Listen")
217215
defer span.End()
218216

219-
messages := make(chan *amqp.Message)
220-
go r.listenForMessages(ctx, messages)
221-
go r.handleMessages(ctx, messages, handler)
217+
go r.listenForMessages(ctx, newAmqpAdapterHandler(r, handler))
222218

223219
return &ListenerHandle{
224220
r: r,
225221
ctx: ctx,
226222
}
227223
}
228224

229-
func (r *Receiver) handleMessages(ctx context.Context, messages chan *amqp.Message, handler Handler) {
230-
ctx, span := r.startConsumerSpanFromContext(ctx, "sb.Receiver.handleMessages")
231-
defer span.End()
232-
for msg := range messages {
233-
r.handleMessage(ctx, msg, handler)
234-
}
235-
}
236-
237225
func (r *Receiver) handleMessage(ctx context.Context, msg *amqp.Message, handler Handler) {
238226
const optName = "sb.Receiver.handleMessage"
239227

@@ -290,21 +278,19 @@ func (r *Receiver) handleMessage(ctx context.Context, msg *amqp.Message, handler
290278
}
291279
}
292280

293-
func (r *Receiver) listenForMessages(ctx context.Context, msgChan chan *amqp.Message) {
281+
func (r *Receiver) listenForMessages(ctx context.Context, handler amqpHandler) {
294282
ctx, span := r.startConsumerSpanFromContext(ctx, "sb.Receiver.listenForMessages")
295283
defer span.End()
296284

297285
for {
298-
msg, err := r.listenForMessage(ctx)
286+
err := r.listenForMessage(ctx, handler)
299287
if err == nil {
300-
msgChan <- msg
301288
continue
302289
}
303290

304291
select {
305292
case <-ctx.Done():
306293
tab.For(ctx).Debug("context done")
307-
close(msgChan)
308294
return
309295
default:
310296
_, retryErr := common.Retry(10, 10*time.Second, func() (interface{}, error) {
@@ -332,7 +318,6 @@ func (r *Receiver) listenForMessages(ctx context.Context, msgChan chan *amqp.Mes
332318
if err := r.Close(ctx); err != nil {
333319
tab.For(ctx).Error(err)
334320
}
335-
close(msgChan)
336321
return
337322
}
338323
}
@@ -345,30 +330,26 @@ func (r *Receiver) setLastError(err error) {
345330
r.lastErrorMu.Unlock()
346331
}
347332

348-
func (r *Receiver) listenForMessage(ctx context.Context) (*amqp.Message, error) {
333+
func (r *Receiver) listenForMessage(ctx context.Context, handler amqpHandler) error {
349334
ctx, span := r.startConsumerSpanFromContext(ctx, "sb.Receiver.listenForMessage")
350335
defer span.End()
351336

352337
var receiver *amqp.Receiver
353338
r.clientMu.RLock()
354339
if r.receiver == nil {
355340
r.clientMu.RUnlock()
356-
return nil, r.connClosedError(ctx)
341+
return r.connClosedError(ctx)
357342
}
358343
receiver = r.receiver
359344
r.clientMu.RUnlock()
360-
msg, err := receiver.Receive(ctx)
345+
err := receiver.HandleMessage(ctx, func(message *amqp.Message) error {
346+
return handler.Handle(ctx, message)
347+
})
361348
if err != nil {
362349
tab.For(ctx).Debug(err.Error())
363-
return nil, err
364-
}
365-
366-
id := messageID(msg)
367-
if idStr, ok := id.(string); ok {
368-
span.AddAttributes(tab.StringAttribute("amqp.message.id", idStr))
350+
return err
369351
}
370-
371-
return msg, nil
352+
return nil
372353
}
373354

374355
func (r *Receiver) connClosedError(ctx context.Context) error {

0 commit comments

Comments
 (0)