Skip to content

Commit d1d0a7f

Browse files
[azservicebus] Add a helper function to get a Message from a ReceivedMessage (Azure#21472)
Fixes Azure#21469
1 parent 5b55feb commit d1d0a7f

File tree

4 files changed

+135
-22
lines changed

4 files changed

+135
-22
lines changed

sdk/messaging/azservicebus/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
### Features Added
66

7+
- `ReceivedMessage` can be converted to a `Message` for easier re-sending, using `ReceivedMessage.Message()`. PR#21472
8+
79
### Breaking Changes
810

911
### Bugs Fixed

sdk/messaging/azservicebus/message.go

Lines changed: 20 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,26 @@ type ReceivedMessage struct {
132132
deferred bool
133133
}
134134

135+
// Message creates a shallow copy of the fields from this message to an instance of
136+
// [Message].
137+
func (rm *ReceivedMessage) Message() *Message {
138+
return &Message{
139+
ApplicationProperties: rm.ApplicationProperties,
140+
Body: rm.Body,
141+
ContentType: rm.ContentType,
142+
CorrelationID: rm.CorrelationID,
143+
MessageID: &rm.MessageID,
144+
PartitionKey: rm.PartitionKey,
145+
ReplyTo: rm.ReplyTo,
146+
ReplyToSessionID: rm.ReplyToSessionID,
147+
ScheduledEnqueueTime: rm.ScheduledEnqueueTime,
148+
SessionID: rm.SessionID,
149+
Subject: rm.Subject,
150+
TimeToLive: rm.TimeToLive,
151+
To: rm.To,
152+
}
153+
}
154+
135155
// MessageState represents the current state of a message (Active, Scheduled, Deferred).
136156
type MessageState int32
137157

@@ -284,28 +304,6 @@ func (m *Message) toAMQPMessage() *amqp.Message {
284304
amqpMsg.Annotations[scheduledEnqueuedTimeAnnotation] = *m.ScheduledEnqueueTime
285305
}
286306

287-
// TODO: These are 'received' message properties so I believe their inclusion here was just an artifact of only
288-
// having one message type.
289-
290-
// if m.SystemProperties != nil {
291-
// // Set the raw annotations first (they may be nil) and add the explicit
292-
// // system properties second to ensure they're set properly.
293-
// amqpMsg.Annotations = addMapToAnnotations(amqpMsg.Annotations, m.SystemProperties.Annotations)
294-
295-
// sysPropMap, err := encodeStructureToMap(m.SystemProperties)
296-
// if err != nil {
297-
// return nil, err
298-
// }
299-
// amqpMsg.Annotations = addMapToAnnotations(amqpMsg.Annotations, sysPropMap)
300-
// }
301-
302-
// if m.LockToken != nil {
303-
// if amqpMsg.DeliveryAnnotations == nil {
304-
// amqpMsg.DeliveryAnnotations = make(amqp.Annotations)
305-
// }
306-
// amqpMsg.DeliveryAnnotations[lockTokenName] = *m.LockToken
307-
// }
308-
309307
return amqpMsg
310308
}
311309

sdk/messaging/azservicebus/message_test.go

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,3 +216,67 @@ func TestMessageWithIncorrectBody(t *testing.T) {
216216
}, "receiving_link")
217217
require.Nil(t, message.Body)
218218
}
219+
220+
func TestReceivedMessageToMessage(t *testing.T) {
221+
rm := &ReceivedMessage{
222+
ApplicationProperties: map[string]any{
223+
"hello": "world",
224+
},
225+
Body: []byte("body content"),
226+
ContentType: to.Ptr("content type"),
227+
CorrelationID: to.Ptr("correlation ID"),
228+
DeadLetterErrorDescription: to.Ptr("dead letter error description"),
229+
DeadLetterReason: to.Ptr("dead letter reason"),
230+
DeadLetterSource: to.Ptr("dead letter source"),
231+
DeliveryCount: 9,
232+
EnqueuedSequenceNumber: to.Ptr[int64](101),
233+
EnqueuedTime: mustParseTime("2023-01-01T01:02:03Z"),
234+
ExpiresAt: mustParseTime("2023-01-02T01:02:03Z"),
235+
LockedUntil: mustParseTime("2023-01-03T01:02:03Z"),
236+
LockToken: [16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16},
237+
MessageID: "message ID",
238+
PartitionKey: to.Ptr("partition key"),
239+
ReplyTo: to.Ptr("reply to"),
240+
ReplyToSessionID: to.Ptr("reply to session id"),
241+
ScheduledEnqueueTime: mustParseTime("2023-01-04T01:02:03Z"),
242+
SequenceNumber: to.Ptr[int64](102),
243+
SessionID: to.Ptr("session id"),
244+
State: 10,
245+
Subject: to.Ptr("subject"),
246+
TimeToLive: to.Ptr(time.Second),
247+
To: to.Ptr("to"),
248+
RawAMQPMessage: &AMQPAnnotatedMessage{}, // doesn't exist on `Message`, ignored.
249+
}
250+
251+
msg := rm.Message()
252+
253+
expectedMsg := &Message{
254+
ApplicationProperties: map[string]any{
255+
"hello": "world",
256+
},
257+
Body: []byte("body content"),
258+
ContentType: to.Ptr("content type"),
259+
CorrelationID: to.Ptr("correlation ID"),
260+
MessageID: to.Ptr("message ID"),
261+
PartitionKey: to.Ptr("partition key"),
262+
ReplyTo: to.Ptr("reply to"),
263+
ReplyToSessionID: to.Ptr("reply to session id"),
264+
ScheduledEnqueueTime: mustParseTime("2023-01-04T01:02:03Z"),
265+
SessionID: to.Ptr("session id"),
266+
Subject: to.Ptr("subject"),
267+
TimeToLive: to.Ptr(time.Second),
268+
To: to.Ptr("to"),
269+
}
270+
271+
require.Equal(t, msg, expectedMsg)
272+
}
273+
274+
func mustParseTime(str string) *time.Time {
275+
tm, err := time.Parse(time.RFC3339, str)
276+
277+
if err != nil {
278+
panic(err)
279+
}
280+
281+
return &tm
282+
}

sdk/messaging/azservicebus/receiver_test.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -892,6 +892,55 @@ func TestReceiverUnauthorizedCreds(t *testing.T) {
892892
})
893893
}
894894

895+
func TestReceiveAndSendAndReceive(t *testing.T) {
896+
serviceBusClient, cleanup, queueName := setupLiveTest(t, nil)
897+
defer cleanup()
898+
899+
sender, err := serviceBusClient.NewSender(queueName, nil)
900+
require.NoError(t, err)
901+
defer sender.Close(context.Background())
902+
903+
scheduledEnqueuedTime := time.Now()
904+
905+
err = sender.SendMessage(context.Background(), &Message{
906+
Body: []byte("body text"),
907+
ApplicationProperties: map[string]any{
908+
"hello": "world",
909+
},
910+
ContentType: to.Ptr("application/text"),
911+
CorrelationID: to.Ptr("correlation ID"),
912+
MessageID: to.Ptr("message id"),
913+
PartitionKey: to.Ptr("session id"),
914+
ReplyTo: to.Ptr("reply to"),
915+
ReplyToSessionID: to.Ptr("reply to session id"),
916+
ScheduledEnqueueTime: &scheduledEnqueuedTime,
917+
SessionID: to.Ptr("session id"),
918+
Subject: to.Ptr("subject"),
919+
TimeToLive: to.Ptr(time.Minute),
920+
To: to.Ptr("to"),
921+
}, nil)
922+
require.NoError(t, err)
923+
924+
receiver, err := serviceBusClient.NewReceiverForQueue(queueName, &ReceiverOptions{
925+
ReceiveMode: ReceiveModeReceiveAndDelete,
926+
})
927+
require.NoError(t, err)
928+
929+
msgs, err := receiver.ReceiveMessages(context.Background(), 1, nil)
930+
require.NoError(t, err)
931+
require.Equal(t, "body text", string(msgs[0].Body))
932+
933+
// re-send
934+
err = sender.SendMessage(context.Background(), msgs[0].Message(), nil)
935+
require.NoError(t, err)
936+
937+
// re-receive
938+
rereceivedMsgs, err := receiver.ReceiveMessages(context.Background(), 1, nil)
939+
require.NoError(t, err)
940+
941+
require.Equal(t, msgs[0].Message(), rereceivedMsgs[0].Message(), "all sendable fields are preserved when resending")
942+
}
943+
895944
type receivedMessageSlice []*ReceivedMessage
896945

897946
func (messages receivedMessageSlice) Len() int {

0 commit comments

Comments
 (0)