Skip to content

Commit 1f9c5ee

Browse files
authored
Merge pull request #1 from edadeal/delivery_wrapper
wrap delivery methods with concurrent safe alternatives
2 parents 218cee7 + 3fcb882 commit 1f9c5ee

File tree

1 file changed

+54
-17
lines changed

1 file changed

+54
-17
lines changed

lepus.go

Lines changed: 54 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ func (c *Channel) ConsumeMessages(queue, consumer string, autoAck, exclusive, no
174174
go func() {
175175
for msg := range d {
176176
msg.Acknowledger = c
177-
wd <- Delivery{msg}
177+
wd <- Delivery{msg, 0}
178178
}
179179
}()
180180

@@ -184,12 +184,49 @@ func (c *Channel) ConsumeMessages(queue, consumer string, autoAck, exclusive, no
184184
// Delivery is a superset of amqp.Delivery
185185
type Delivery struct {
186186
amqp.Delivery
187+
acked int32
188+
}
189+
190+
// Nack is a concurrent safe wrapper around standart AMQP Nack
191+
func (d *Delivery) Nack(multiple, requeue bool) error {
192+
if atomic.CompareAndSwapInt32(&d.acked, 0, 1) {
193+
err := d.Delivery.Nack(multiple, requeue)
194+
if err != nil {
195+
atomic.StoreInt32(&d.acked, 0)
196+
}
197+
return err
198+
}
199+
return nil
200+
}
201+
202+
// Ack is a concurrent safe wrapper around standart AMQP Ack
203+
func (d *Delivery) Ack(multiple bool) error {
204+
if atomic.CompareAndSwapInt32(&d.acked, 0, 1) {
205+
err := d.Delivery.Ack(multiple)
206+
if err != nil {
207+
atomic.StoreInt32(&d.acked, 0)
208+
}
209+
return err
210+
}
211+
return nil
212+
}
213+
214+
// Reject is a concurrent safe wrapper around standart AMQP Reject
215+
func (d *Delivery) Reject(requeue bool) error {
216+
if atomic.CompareAndSwapInt32(&d.acked, 0, 1) {
217+
err := d.Delivery.Reject(requeue)
218+
if err != nil {
219+
atomic.StoreInt32(&d.acked, 0)
220+
}
221+
return err
222+
}
223+
return nil
187224
}
188225

189226
// NackDelayed nacks message without requeue and publishes it again
190227
// without modification back to tail of queue
191228
func (d *Delivery) NackDelayed(multiple, mandatory, immediate bool) (State, error) {
192-
ch, ok := d.Acknowledger.(*Channel)
229+
ch, ok := d.Delivery.Acknowledger.(*Channel)
193230
if !ok {
194231
return StateUnknown, errors.New("Acknowledger is not of type *lepus.Channel")
195232
}
@@ -199,20 +236,20 @@ func (d *Delivery) NackDelayed(multiple, mandatory, immediate bool) (State, erro
199236
return StateUnknown, err
200237
}
201238

202-
return ch.PublishAndWait(d.Exchange, d.RoutingKey, mandatory, immediate, amqp.Publishing{
203-
Headers: d.Headers,
204-
ContentType: d.ContentType,
205-
ContentEncoding: d.ContentEncoding,
206-
DeliveryMode: d.DeliveryMode,
207-
Priority: d.Priority,
208-
CorrelationId: d.CorrelationId,
209-
ReplyTo: d.ReplyTo,
210-
Expiration: d.Expiration,
211-
MessageId: d.MessageId,
212-
Timestamp: d.Timestamp,
213-
Type: d.Type,
214-
UserId: d.UserId,
215-
AppId: d.AppId,
216-
Body: d.Body,
239+
return ch.PublishAndWait(d.Delivery.Exchange, d.Delivery.RoutingKey, mandatory, immediate, amqp.Publishing{
240+
Headers: d.Delivery.Headers,
241+
ContentType: d.Delivery.ContentType,
242+
ContentEncoding: d.Delivery.ContentEncoding,
243+
DeliveryMode: d.Delivery.DeliveryMode,
244+
Priority: d.Delivery.Priority,
245+
CorrelationId: d.Delivery.CorrelationId,
246+
ReplyTo: d.Delivery.ReplyTo,
247+
Expiration: d.Delivery.Expiration,
248+
MessageId: d.Delivery.MessageId,
249+
Timestamp: d.Delivery.Timestamp,
250+
Type: d.Delivery.Type,
251+
UserId: d.Delivery.UserId,
252+
AppId: d.Delivery.AppId,
253+
Body: d.Delivery.Body,
217254
})
218255
}

0 commit comments

Comments
 (0)