Skip to content

Commit 830ac8f

Browse files
roncemerRon Cemer
andauthored
Use proper channel.Consume() with a timeout, instead of timed retries. (#17)
* Update instructions for producer.py. * Use proper channel.Consume() with a timeout. * Trigger action. * Remove temp file. * Only create one Delivery per queue; keep them in a map by queue name. * Only create one Delivery per queue; keep them in a map by queue name. * Make delivery map when instantiating the struct. * Move channel.Consume() to the Observe() function. * Re-order assignments. * Trigger pipeline. * Remove temp file. * Use newer rabbitmq image. * Return any JSON encoding error. * Copmletely initialize delivery_info before assigning to properties. * Clean up delivery_info map. * Add unit test for RabbitMQ broker. * Increase test timeout. * Fix messages disappearing. * Fix lint. * Fix unit tests. --------- Co-authored-by: Ron Cemer <ron.cemer@lancium.com>
1 parent 358caea commit 830ac8f

File tree

4 files changed

+161
-78
lines changed

4 files changed

+161
-78
lines changed

.github/workflows/ci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ jobs:
1919
redis-version: 7
2020
- uses: namoshek/rabbitmq-github-action@v1
2121
with:
22-
version: '3.8.9'
22+
version: '4.1.1'
2323
ports: '5672:5672'
2424
- run: go test -count=1 -v ./...
2525
lint:

README.md

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,12 +103,16 @@ $ celery --app myproject worker --queues important --loglevel=debug --without-he
103103
<summary>Sending tasks from Python and receiving them on Go side.</summary>
104104

105105
```sh
106-
$ python producer.py --protocol=1
106+
$ python producer.py
107107
$ go run ./consumer/
108108
{"msg":"waiting for tasks..."}
109109
received a=fizz b=bazz
110110
```
111111

112+
To send a task with Celery Protocol version 1, run *producer.py* with the *--protocol=1* command-line argument:
113+
```sh
114+
$ python producer.py --protocol=1
115+
```
112116
</details>
113117

114118
<details>
@@ -214,6 +218,10 @@ $ go run ./consumer/
214218
received a=fizz b=bazz
215219
```
216220

221+
To send a task with Celery Protocol version 1, run *producer.py* with the *--protocol=1* command-line argument:
222+
```sh
223+
$ python producer.py --protocol=1
224+
```
217225
</details>
218226

219227
## Testing

celery_test.go

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313

1414
"github.com/marselester/gopher-celery/goredis"
1515
"github.com/marselester/gopher-celery/protocol"
16+
"github.com/marselester/gopher-celery/rabbitmq"
1617
)
1718

1819
func TestExecuteTaskPanic(t *testing.T) {
@@ -243,6 +244,57 @@ func TestGoredisProduceAndConsume100times(t *testing.T) {
243244
}
244245
}
245246

247+
func TestRabbitmqProduceAndConsume100times(t *testing.T) {
248+
app := NewApp(
249+
WithBroker(rabbitmq.NewBroker(rabbitmq.WithAmqpUri("amqp://guest:guest@localhost:5672/"))),
250+
WithLogger(log.NewJSONLogger(os.Stderr)),
251+
)
252+
253+
queue := "rabbitmq_broker_test"
254+
255+
// Create the queue, if it doesn't exist.
256+
app.conf.broker.Observe([]string{queue})
257+
258+
for i := 0; i < 100; i++ {
259+
err := app.Delay(
260+
"myproject.apps.myapp.tasks.mytask",
261+
queue,
262+
2,
263+
3,
264+
)
265+
if err != nil {
266+
t.Fatal(err)
267+
}
268+
}
269+
270+
// The test finishes either when ctx times out or all the tasks finish.
271+
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
272+
t.Cleanup(cancel)
273+
274+
var sum int32
275+
app.Register(
276+
"myproject.apps.myapp.tasks.mytask",
277+
queue,
278+
func(ctx context.Context, p *TaskParam) error {
279+
p.NameArgs("a", "b")
280+
atomic.AddInt32(
281+
&sum,
282+
int32(p.MustInt("a")+p.MustInt("b")),
283+
)
284+
return nil
285+
},
286+
)
287+
if err := app.Run(ctx); err != nil {
288+
t.Error(err)
289+
}
290+
291+
var want int32 = 500
292+
if want != sum {
293+
t.Errorf("expected sum %d got %d", want, sum)
294+
}
295+
296+
}
297+
246298
func TestConsumeSequentially(t *testing.T) {
247299
app := NewApp(
248300
WithLogger(log.NewJSONLogger(os.Stderr)),

rabbitmq/broker.go

Lines changed: 99 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
package rabbitmq
44

55
import (
6-
"context"
76
"encoding/base64"
87
"encoding/json"
98
"fmt"
@@ -33,7 +32,7 @@ type Broker struct {
3332
queues []string
3433
conn *amqp.Connection
3534
channel *amqp.Channel
36-
ctx context.Context
35+
delivery map[string]<-chan amqp.Delivery
3736
}
3837

3938
// WithAmqpUri sets the AMQP connection URI to RabbitMQ.
@@ -67,30 +66,27 @@ func NewBroker(options ...BrokerOption) *Broker {
6766
amqpUri: DefaultAmqpUri,
6867
receiveTimeout: DefaultReceiveTimeout * time.Second,
6968
rawMode: false,
70-
ctx: context.Background(),
69+
delivery: make(map[string]<-chan amqp.Delivery),
7170
}
7271
for _, opt := range options {
7372
opt(&br)
7473
}
7574

7675
if br.conn == nil {
77-
br.channel = nil
7876
conn, err := amqp.Dial(br.amqpUri)
79-
br.conn = conn
8077
if err != nil {
8178
log.Panicf("Failed to connect to RabbitMQ: %s", err)
8279
return nil
8380
}
81+
br.conn = conn
8482
}
8583

86-
if br.channel == nil {
87-
channel, err := br.conn.Channel()
88-
br.channel = channel
89-
if err != nil {
90-
log.Panicf("Failed to open a channel: %s", err)
91-
return nil
92-
}
84+
channel, err := br.conn.Channel()
85+
if err != nil {
86+
log.Panicf("Failed to open a channel: %s", err)
87+
return nil
9388
}
89+
br.channel = channel
9490

9591
return &br
9692
}
@@ -135,8 +131,7 @@ func (br *Broker) Send(m []byte, q string) error {
135131
replyTo = properties_in["reply_to"].(string)
136132
}
137133

138-
err := br.channel.PublishWithContext(
139-
br.ctx,
134+
err := br.channel.Publish(
140135
"", // exchange
141136
q, // routing key
142137
false, // mandatory
@@ -150,6 +145,7 @@ func (br *Broker) Send(m []byte, q string) error {
150145
ReplyTo: replyTo,
151146
Body: body,
152147
})
148+
153149
return err
154150
}
155151

@@ -158,87 +154,114 @@ func (br *Broker) Send(m []byte, q string) error {
158154
func (br *Broker) Observe(queues []string) {
159155
br.queues = queues
160156
for _, queue := range queues {
161-
_, err := br.channel.QueueDeclare(
162-
queue, // name
163-
true, // durable
164-
false, // delete when unused
165-
false, // exclusive
166-
false, // no-wait
167-
nil, // arguments
157+
durable := true
158+
autoDelete := false
159+
exclusive := false
160+
noWait := false
161+
162+
// Check whether the queue exists.
163+
_, err := br.channel.QueueDeclarePassive(
164+
queue,
165+
durable,
166+
autoDelete,
167+
exclusive,
168+
noWait,
169+
nil,
168170
)
171+
172+
// If the queue doesn't exist, attempt to create it.
169173
if err != nil {
170-
log.Panicf("Failed to declare a queue: %s", err)
174+
// QueueDeclarePassive() will close the channel if the queue does not exist, so we have to create a new channel when this happens.
175+
if br.channel.IsClosed() {
176+
channel, err := br.conn.Channel()
177+
if err != nil {
178+
log.Panicf("Failed to open a channel: %s", err)
179+
}
180+
br.channel = channel
181+
}
182+
183+
_, err := br.channel.QueueDeclare(
184+
queue,
185+
durable,
186+
autoDelete,
187+
exclusive,
188+
noWait,
189+
nil,
190+
)
191+
192+
if err != nil {
193+
log.Panicf("Failed to declare a queue: %s", err)
194+
}
171195
}
172196
}
173197
}
174198

175199
// Receive fetches a Celery task message from a tail of one of the queues in RabbitMQ.
176200
// After a timeout it returns nil, nil.
177201
func (br *Broker) Receive() ([]byte, error) {
202+
queue := br.queues[0]
203+
// Put the Celery queue name to the end of the slice for fair processing.
204+
broker.Move2back(br.queues, queue)
178205

179-
const retryIntervalMs = 100
206+
var err error
207+
208+
delivery, delivery_exists := br.delivery[queue]
209+
if !delivery_exists {
210+
delivery, err = br.channel.Consume(
211+
queue, // queue
212+
"", // consumer
213+
true, // autoAck
214+
false, // exclusive
215+
false, // noLocal (ignored)
216+
false, // noWait
217+
nil, // args
218+
)
180219

181-
try_receive := func() (msg amqp.Delivery, ok bool, err error) {
182-
queue := br.queues[0]
183-
// Put the Celery queue name to the end of the slice for fair processing.
184-
broker.Move2back(br.queues, queue)
185-
my_msg, my_ok, my_err := br.channel.Get(queue, true)
186-
if my_err != nil {
187-
log.Printf("Failed to g a message: %s", my_err)
220+
if err != nil {
221+
return nil, err
188222
}
189-
return my_msg, my_ok, my_err
190-
}
191223

192-
startTime := time.Now()
193-
timeoutTime := startTime.Add(br.receiveTimeout)
194-
msg, ok, err := try_receive()
195-
if err != nil {
196-
return nil, nil
224+
br.delivery[queue] = delivery
197225
}
198-
for !ok {
199-
if time.Now().After(timeoutTime) {
200-
return nil, nil
226+
227+
select {
228+
case msg := <-delivery:
229+
if br.rawMode {
230+
return msg.Body, nil
201231
}
202-
time.Sleep(retryIntervalMs * time.Millisecond)
203232

204-
msg, ok, err = try_receive()
205-
if err != nil {
206-
return nil, nil
233+
// Marshal msg from RabbitMQ Celery format to internal Celery format.
234+
235+
properties := make(map[string]interface{})
236+
properties["correlation_id"] = msg.CorrelationId
237+
properties["reply_to"] = msg.ReplyTo
238+
properties["delivery_mode"] = msg.DeliveryMode
239+
properties["delivery_info"] = map[string]interface{}{
240+
"exchange": msg.Exchange,
241+
"routing_key": msg.RoutingKey,
207242
}
208-
}
243+
properties["priority"] = msg.Priority
244+
properties["body_encoding"] = "base64"
245+
properties["delivery_tag"] = msg.DeliveryTag
209246

210-
if br.rawMode {
211-
return msg.Body, nil
212-
}
247+
imsg := make(map[string]interface{})
248+
imsg["body"] = msg.Body
249+
imsg["content-encoding"] = msg.ContentEncoding
250+
imsg["content-type"] = msg.ContentType
251+
imsg["headers"] = msg.Headers
252+
imsg["properties"] = properties
213253

214-
// Marshal msg from RabbitMQ Celery format to internal Celery format.
215-
216-
properties := make(map[string]interface{})
217-
properties["correlation_id"] = msg.CorrelationId
218-
properties["reply_to"] = msg.ReplyTo
219-
properties["delivery_mode"] = msg.DeliveryMode
220-
delivery_info := make(map[string]interface{})
221-
properties["delivery_info"] = delivery_info
222-
delivery_info["exchange"] = msg.Exchange
223-
delivery_info["routing_key"] = msg.RoutingKey
224-
properties["priority"] = msg.Priority
225-
properties["body_encoding"] = "base64"
226-
properties["delivery_tag"] = msg.DeliveryTag
227-
228-
imsg := make(map[string]interface{})
229-
imsg["body"] = msg.Body
230-
imsg["content-encoding"] = msg.ContentEncoding
231-
imsg["content-type"] = msg.ContentType
232-
imsg["headers"] = msg.Headers
233-
imsg["properties"] = properties
234-
235-
var result []byte
236-
result, err = json.Marshal(imsg)
237-
if err != nil {
238-
err_str := fmt.Errorf("%w", err)
239-
log.Printf("json encode: %s", err_str)
254+
var result []byte
255+
result, err := json.Marshal(imsg)
256+
if err != nil {
257+
err_str := fmt.Errorf("%w", err)
258+
log.Printf("json encode: %s", err_str)
259+
return nil, err
260+
}
261+
return result, nil
262+
263+
case <-time.After(br.receiveTimeout):
264+
// Receive timeout
240265
return nil, nil
241266
}
242-
243-
return result, nil
244267
}

0 commit comments

Comments
 (0)