From f74a66a958f690ecf06b165a80cae0fc63181b22 Mon Sep 17 00:00:00 2001 From: Ron Cemer Date: Mon, 30 Jun 2025 13:40:05 -0400 Subject: [PATCH 01/22] Update instructions for producer.py. --- README.md | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 4eb15ab..a7f9fe6 100644 --- a/README.md +++ b/README.md @@ -103,12 +103,16 @@ $ celery --app myproject worker --queues important --loglevel=debug --without-he Sending tasks from Python and receiving them on Go side. ```sh -$ python producer.py --protocol=1 +$ python producer.py $ go run ./consumer/ {"msg":"waiting for tasks..."} received a=fizz b=bazz ``` +To send a task with Celery Protocol version 1, run *producer.py* with the *--protocol=1* command-line argument: +```sh +$ python producer.py --protocol=1 +```
@@ -214,6 +218,10 @@ $ go run ./consumer/ received a=fizz b=bazz ``` +To send a task with Celery Protocol version 1, run *producer.py* with the *--protocol=1* command-line argument: +```sh +$ python producer.py --protocol=1 +```
## Testing From 303d9af524e2acbf53715170e764037eab5f7b20 Mon Sep 17 00:00:00 2001 From: Ron Cemer Date: Mon, 30 Jun 2025 13:40:40 -0400 Subject: [PATCH 02/22] Use proper channel.Consume() with a timeout. --- rabbitmq/broker.go | 102 +++++++++++++++++++++------------------------ 1 file changed, 48 insertions(+), 54 deletions(-) diff --git a/rabbitmq/broker.go b/rabbitmq/broker.go index 12a0bb0..1f586fc 100644 --- a/rabbitmq/broker.go +++ b/rabbitmq/broker.go @@ -175,70 +175,64 @@ func (br *Broker) Observe(queues []string) { // Receive fetches a Celery task message from a tail of one of the queues in RabbitMQ. // After a timeout it returns nil, nil. func (br *Broker) Receive() ([]byte, error) { - - const retryIntervalMs = 100 - - try_receive := func() (msg amqp.Delivery, ok bool, err error) { - queue := br.queues[0] - // Put the Celery queue name to the end of the slice for fair processing. - broker.Move2back(br.queues, queue) - my_msg, my_ok, my_err := br.channel.Get(queue, true) - if my_err != nil { - log.Printf("Failed to g a message: %s", my_err) - } - return my_msg, my_ok, my_err - } - - startTime := time.Now() - timeoutTime := startTime.Add(br.receiveTimeout) - msg, ok, err := try_receive() + queue := br.queues[0] + // Put the Celery queue name to the end of the slice for fair processing. + broker.Move2back(br.queues, queue) + + delivery, err := br.channel.Consume( + queue, // queue + "", // consumer + true, // autoAck + false, // exclusive + false, // noLocal (ignored) + false, // noWait + nil, // args + ) if err != nil { + err_str := fmt.Errorf("%w", err) + log.Printf("channel.Consume(): %s", err_str) return nil, nil } - for !ok { - if time.Now().After(timeoutTime) { - return nil, nil + + select { + case msg := <-delivery: + if br.rawMode { + return msg.Body, nil } - time.Sleep(retryIntervalMs * time.Millisecond) - msg, ok, err = try_receive() + // Marshal msg from RabbitMQ Celery format to internal Celery format. + + properties := make(map[string]interface{}) + properties["correlation_id"] = msg.CorrelationId + properties["reply_to"] = msg.ReplyTo + properties["delivery_mode"] = msg.DeliveryMode + delivery_info := make(map[string]interface{}) + properties["delivery_info"] = delivery_info + delivery_info["exchange"] = msg.Exchange + delivery_info["routing_key"] = msg.RoutingKey + properties["priority"] = msg.Priority + properties["body_encoding"] = "base64" + properties["delivery_tag"] = msg.DeliveryTag + + imsg := make(map[string]interface{}) + imsg["body"] = msg.Body + imsg["content-encoding"] = msg.ContentEncoding + imsg["content-type"] = msg.ContentType + imsg["headers"] = msg.Headers + imsg["properties"] = properties + + var result []byte + result, err = json.Marshal(imsg) if err != nil { + err_str := fmt.Errorf("%w", err) + log.Printf("json encode: %s", err_str) return nil, nil } - } - if br.rawMode { - return msg.Body, nil - } + return result, nil - // Marshal msg from RabbitMQ Celery format to internal Celery format. - - properties := make(map[string]interface{}) - properties["correlation_id"] = msg.CorrelationId - properties["reply_to"] = msg.ReplyTo - properties["delivery_mode"] = msg.DeliveryMode - delivery_info := make(map[string]interface{}) - properties["delivery_info"] = delivery_info - delivery_info["exchange"] = msg.Exchange - delivery_info["routing_key"] = msg.RoutingKey - properties["priority"] = msg.Priority - properties["body_encoding"] = "base64" - properties["delivery_tag"] = msg.DeliveryTag - - imsg := make(map[string]interface{}) - imsg["body"] = msg.Body - imsg["content-encoding"] = msg.ContentEncoding - imsg["content-type"] = msg.ContentType - imsg["headers"] = msg.Headers - imsg["properties"] = properties - - var result []byte - result, err = json.Marshal(imsg) - if err != nil { - err_str := fmt.Errorf("%w", err) - log.Printf("json encode: %s", err_str) + case <-time.After(br.receiveTimeout): + // Receive timeout return nil, nil } - - return result, nil } From b49ab3c68bdee585049a603254daa994710da827 Mon Sep 17 00:00:00 2001 From: Ron Cemer Date: Mon, 30 Jun 2025 13:43:45 -0400 Subject: [PATCH 03/22] Trigger action. --- x.txt | 0 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 x.txt diff --git a/x.txt b/x.txt new file mode 100644 index 0000000..e69de29 From 037fd6a2f416f740ad2fb5e630e0021f8466a59c Mon Sep 17 00:00:00 2001 From: Ron Cemer Date: Mon, 30 Jun 2025 13:45:01 -0400 Subject: [PATCH 04/22] Remove temp file. --- x.txt | 0 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 x.txt diff --git a/x.txt b/x.txt deleted file mode 100644 index e69de29..0000000 From 6d43b9d897c3397ce8a26c38f7792fc82206125e Mon Sep 17 00:00:00 2001 From: Ron Cemer Date: Mon, 30 Jun 2025 14:45:23 -0400 Subject: [PATCH 05/22] Only create one Delivery per queue; keep them in a map by queue name. --- rabbitmq/broker.go | 49 +++++++++++++++++++++++++--------------------- 1 file changed, 27 insertions(+), 22 deletions(-) diff --git a/rabbitmq/broker.go b/rabbitmq/broker.go index 1f586fc..4f465a5 100644 --- a/rabbitmq/broker.go +++ b/rabbitmq/broker.go @@ -33,6 +33,7 @@ type Broker struct { queues []string conn *amqp.Connection channel *amqp.Channel + delivery map[string]<-chan amqp.Delivery ctx context.Context } @@ -83,15 +84,15 @@ func NewBroker(options ...BrokerOption) *Broker { } } - if br.channel == nil { - channel, err := br.conn.Channel() - br.channel = channel - if err != nil { - log.Panicf("Failed to open a channel: %s", err) - return nil - } + channel, err := br.conn.Channel() + br.channel = channel + if err != nil { + log.Panicf("Failed to open a channel: %s", err) + return nil } + br.delivery = make(map[string]<-chan amqp.Delivery) + return &br } @@ -179,20 +180,24 @@ func (br *Broker) Receive() ([]byte, error) { // Put the Celery queue name to the end of the slice for fair processing. broker.Move2back(br.queues, queue) - delivery, err := br.channel.Consume( - queue, // queue - "", // consumer - true, // autoAck - false, // exclusive - false, // noLocal (ignored) - false, // noWait - nil, // args - ) - if err != nil { - err_str := fmt.Errorf("%w", err) - log.Printf("channel.Consume(): %s", err_str) - return nil, nil - } + delivery, ok := br.delivery[queue] + if !ok { + delivery, err := br.channel.Consume( + queue, // queue + "", // consumer + true, // autoAck + false, // exclusive + false, // noLocal (ignored) + false, // noWait + nil, // args + ) + if err != nil { + err_str := fmt.Errorf("%w", err) + log.Printf("channel.Consume(): %s", err_str) + return nil, nil + } + br.delivery[queue] = delivery + } select { case msg := <-delivery: @@ -222,7 +227,7 @@ func (br *Broker) Receive() ([]byte, error) { imsg["properties"] = properties var result []byte - result, err = json.Marshal(imsg) + result, err := json.Marshal(imsg) if err != nil { err_str := fmt.Errorf("%w", err) log.Printf("json encode: %s", err_str) From 5c4eacaf9ad0280d85d12c6fb678cc369225ba40 Mon Sep 17 00:00:00 2001 From: Ron Cemer Date: Mon, 30 Jun 2025 14:47:53 -0400 Subject: [PATCH 06/22] Only create one Delivery per queue; keep them in a map by queue name. --- rabbitmq/broker.go | 40 ++++++++++++++++++++-------------------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/rabbitmq/broker.go b/rabbitmq/broker.go index 4f465a5..3b0f0d6 100644 --- a/rabbitmq/broker.go +++ b/rabbitmq/broker.go @@ -33,7 +33,7 @@ type Broker struct { queues []string conn *amqp.Connection channel *amqp.Channel - delivery map[string]<-chan amqp.Delivery + delivery map[string]<-chan amqp.Delivery ctx context.Context } @@ -91,7 +91,7 @@ func NewBroker(options ...BrokerOption) *Broker { return nil } - br.delivery = make(map[string]<-chan amqp.Delivery) + br.delivery = make(map[string]<-chan amqp.Delivery) return &br } @@ -180,24 +180,24 @@ func (br *Broker) Receive() ([]byte, error) { // Put the Celery queue name to the end of the slice for fair processing. broker.Move2back(br.queues, queue) - delivery, ok := br.delivery[queue] - if !ok { - delivery, err := br.channel.Consume( - queue, // queue - "", // consumer - true, // autoAck - false, // exclusive - false, // noLocal (ignored) - false, // noWait - nil, // args - ) - if err != nil { - err_str := fmt.Errorf("%w", err) - log.Printf("channel.Consume(): %s", err_str) - return nil, nil - } - br.delivery[queue] = delivery - } + delivery, ok := br.delivery[queue] + if !ok { + delivery, err := br.channel.Consume( + queue, // queue + "", // consumer + true, // autoAck + false, // exclusive + false, // noLocal (ignored) + false, // noWait + nil, // args + ) + if err != nil { + err_str := fmt.Errorf("%w", err) + log.Printf("channel.Consume(): %s", err_str) + return nil, nil + } + br.delivery[queue] = delivery + } select { case msg := <-delivery: From e57cab1f7a8cff90aa6038dba3b140265b00fba6 Mon Sep 17 00:00:00 2001 From: Ron Cemer Date: Mon, 30 Jun 2025 15:28:43 -0400 Subject: [PATCH 07/22] Make delivery map when instantiating the struct. --- rabbitmq/broker.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/rabbitmq/broker.go b/rabbitmq/broker.go index 3b0f0d6..09edfd4 100644 --- a/rabbitmq/broker.go +++ b/rabbitmq/broker.go @@ -68,6 +68,7 @@ func NewBroker(options ...BrokerOption) *Broker { amqpUri: DefaultAmqpUri, receiveTimeout: DefaultReceiveTimeout * time.Second, rawMode: false, + delivery: make(map[string]<-chan amqp.Delivery), ctx: context.Background(), } for _, opt := range options { @@ -91,8 +92,6 @@ func NewBroker(options ...BrokerOption) *Broker { return nil } - br.delivery = make(map[string]<-chan amqp.Delivery) - return &br } From a9fc753a9701b34b0628e3b71f1f7635ab1588c9 Mon Sep 17 00:00:00 2001 From: Ron Cemer Date: Mon, 30 Jun 2025 15:39:16 -0400 Subject: [PATCH 08/22] Move channel.Consume() to the Observe() function. --- rabbitmq/broker.go | 37 +++++++++++++++++++------------------ 1 file changed, 19 insertions(+), 18 deletions(-) diff --git a/rabbitmq/broker.go b/rabbitmq/broker.go index 09edfd4..42e6971 100644 --- a/rabbitmq/broker.go +++ b/rabbitmq/broker.go @@ -168,6 +168,24 @@ func (br *Broker) Observe(queues []string) { ) if err != nil { log.Panicf("Failed to declare a queue: %s", err) + } else { + for _, queue := range queues { + delivery, err := br.channel.Consume( + queue, // queue + "", // consumer + true, // autoAck + false, // exclusive + false, // noLocal (ignored) + false, // noWait + nil, // args + ) + if err != nil { + err_str := fmt.Errorf("%w", err) + log.Panicf("channel.Consume() failed for queue %s: %s", queue, err_str) + } else { + br.delivery[queue] = delivery + } + } } } } @@ -179,24 +197,7 @@ func (br *Broker) Receive() ([]byte, error) { // Put the Celery queue name to the end of the slice for fair processing. broker.Move2back(br.queues, queue) - delivery, ok := br.delivery[queue] - if !ok { - delivery, err := br.channel.Consume( - queue, // queue - "", // consumer - true, // autoAck - false, // exclusive - false, // noLocal (ignored) - false, // noWait - nil, // args - ) - if err != nil { - err_str := fmt.Errorf("%w", err) - log.Printf("channel.Consume(): %s", err_str) - return nil, nil - } - br.delivery[queue] = delivery - } + delivery := br.delivery[queue] select { case msg := <-delivery: From 5e406f9ad1514d74837b9d1936c64bb932a1733d Mon Sep 17 00:00:00 2001 From: Ron Cemer Date: Mon, 30 Jun 2025 15:44:09 -0400 Subject: [PATCH 09/22] Re-order assignments. --- rabbitmq/broker.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/rabbitmq/broker.go b/rabbitmq/broker.go index 42e6971..0b03f07 100644 --- a/rabbitmq/broker.go +++ b/rabbitmq/broker.go @@ -76,21 +76,20 @@ func NewBroker(options ...BrokerOption) *Broker { } if br.conn == nil { - br.channel = nil conn, err := amqp.Dial(br.amqpUri) - br.conn = conn if err != nil { log.Panicf("Failed to connect to RabbitMQ: %s", err) return nil } + br.conn = conn } channel, err := br.conn.Channel() - br.channel = channel if err != nil { log.Panicf("Failed to open a channel: %s", err) return nil } + br.channel = channel return &br } From 8cba31ed073fd759f6fbba352b3b6bb122de8643 Mon Sep 17 00:00:00 2001 From: Ron Cemer Date: Mon, 30 Jun 2025 15:48:05 -0400 Subject: [PATCH 10/22] Trigger pipeline. --- x.txt | 0 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 x.txt diff --git a/x.txt b/x.txt new file mode 100644 index 0000000..e69de29 From a65d55ce89616a8ee3085f57542a24565234bda8 Mon Sep 17 00:00:00 2001 From: Ron Cemer Date: Mon, 30 Jun 2025 15:58:28 -0400 Subject: [PATCH 11/22] Remove temp file. --- x.txt | 0 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 x.txt diff --git a/x.txt b/x.txt deleted file mode 100644 index e69de29..0000000 From af330b29acf477fc1f3affa9622b93d0013613b4 Mon Sep 17 00:00:00 2001 From: Ron Cemer Date: Mon, 30 Jun 2025 16:01:27 -0400 Subject: [PATCH 12/22] Use newer rabbitmq image. --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 8464e25..58dca14 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -19,7 +19,7 @@ jobs: redis-version: 7 - uses: namoshek/rabbitmq-github-action@v1 with: - version: '3.8.9' + version: '4.1.1' ports: '5672:5672' - run: go test -count=1 -v ./... lint: From 9c09c889150cc0d5a5007b6248c7c1e681b8d527 Mon Sep 17 00:00:00 2001 From: Ron Cemer Date: Mon, 30 Jun 2025 16:05:53 -0400 Subject: [PATCH 13/22] Return any JSON encoding error. --- rabbitmq/broker.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rabbitmq/broker.go b/rabbitmq/broker.go index 0b03f07..3735504 100644 --- a/rabbitmq/broker.go +++ b/rabbitmq/broker.go @@ -230,7 +230,7 @@ func (br *Broker) Receive() ([]byte, error) { if err != nil { err_str := fmt.Errorf("%w", err) log.Printf("json encode: %s", err_str) - return nil, nil + return nil, err } return result, nil From a72e1806273bfe1239dae9c235ddfc2ed8a32da7 Mon Sep 17 00:00:00 2001 From: Ron Cemer Date: Tue, 1 Jul 2025 08:30:43 -0400 Subject: [PATCH 14/22] Copmletely initialize delivery_info before assigning to properties. --- rabbitmq/broker.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rabbitmq/broker.go b/rabbitmq/broker.go index 3735504..c397ee1 100644 --- a/rabbitmq/broker.go +++ b/rabbitmq/broker.go @@ -211,9 +211,9 @@ func (br *Broker) Receive() ([]byte, error) { properties["reply_to"] = msg.ReplyTo properties["delivery_mode"] = msg.DeliveryMode delivery_info := make(map[string]interface{}) - properties["delivery_info"] = delivery_info delivery_info["exchange"] = msg.Exchange delivery_info["routing_key"] = msg.RoutingKey + properties["delivery_info"] = delivery_info properties["priority"] = msg.Priority properties["body_encoding"] = "base64" properties["delivery_tag"] = msg.DeliveryTag From 0939bdfce4bc075e858158330faf78a2f138a47d Mon Sep 17 00:00:00 2001 From: Ron Cemer Date: Tue, 1 Jul 2025 09:10:08 -0400 Subject: [PATCH 15/22] Clean up delivery_info map. --- rabbitmq/broker.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/rabbitmq/broker.go b/rabbitmq/broker.go index c397ee1..3179f20 100644 --- a/rabbitmq/broker.go +++ b/rabbitmq/broker.go @@ -210,10 +210,10 @@ func (br *Broker) Receive() ([]byte, error) { properties["correlation_id"] = msg.CorrelationId properties["reply_to"] = msg.ReplyTo properties["delivery_mode"] = msg.DeliveryMode - delivery_info := make(map[string]interface{}) - delivery_info["exchange"] = msg.Exchange - delivery_info["routing_key"] = msg.RoutingKey - properties["delivery_info"] = delivery_info + properties["delivery_info"] = map[string]interface{}{ + "exchange": msg.Exchange, + "routing_key": msg.RoutingKey, + } properties["priority"] = msg.Priority properties["body_encoding"] = "base64" properties["delivery_tag"] = msg.DeliveryTag From f365f896daf2d0b227db4be345ecfcbc7a9af777 Mon Sep 17 00:00:00 2001 From: Ron Cemer Date: Tue, 1 Jul 2025 09:11:30 -0400 Subject: [PATCH 16/22] Add unit test for RabbitMQ broker. --- celery_test.go | 45 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/celery_test.go b/celery_test.go index adf987e..7ecf0f7 100644 --- a/celery_test.go +++ b/celery_test.go @@ -12,6 +12,7 @@ import ( "github.com/go-kit/log" "github.com/marselester/gopher-celery/goredis" + "github.com/marselester/gopher-celery/rabbitmq" "github.com/marselester/gopher-celery/protocol" ) @@ -243,6 +244,50 @@ func TestGoredisProduceAndConsume100times(t *testing.T) { } } +func TestRabbitmqProduceAndConsume100times(t *testing.T) { + app := NewApp( + WithBroker(rabbitmq.NewBroker(rabbitmq.WithAmqpUri("amqp://guest:guest@localhost:5672/"))), + WithLogger(log.NewJSONLogger(os.Stderr)), + ) + for i := 0; i < 100; i++ { + err := app.Delay( + "myproject.apps.myapp.tasks.mytask", + "important", + 2, + 3, + ) + if err != nil { + t.Fatal(err) + } + } + + // The test finishes either when ctx times out or all the tasks finish. + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + t.Cleanup(cancel) + + var sum int32 + app.Register( + "myproject.apps.myapp.tasks.mytask", + "important", + func(ctx context.Context, p *TaskParam) error { + p.NameArgs("a", "b") + atomic.AddInt32( + &sum, + int32(p.MustInt("a")+p.MustInt("b")), + ) + return nil + }, + ) + if err := app.Run(ctx); err != nil { + t.Error(err) + } + + var want int32 = 500 + if want != sum { + t.Errorf("expected sum %d got %d", want, sum) + } +} + func TestConsumeSequentially(t *testing.T) { app := NewApp( WithLogger(log.NewJSONLogger(os.Stderr)), From 2a47ae0a47b09cd6ea1fbf926f62e322d6b01d39 Mon Sep 17 00:00:00 2001 From: Ron Cemer Date: Tue, 1 Jul 2025 09:17:02 -0400 Subject: [PATCH 17/22] Increase test timeout. --- celery_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/celery_test.go b/celery_test.go index 7ecf0f7..7ff02cd 100644 --- a/celery_test.go +++ b/celery_test.go @@ -262,7 +262,7 @@ func TestRabbitmqProduceAndConsume100times(t *testing.T) { } // The test finishes either when ctx times out or all the tasks finish. - ctx, cancel := context.WithTimeout(context.Background(), time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 5 * time.Second) t.Cleanup(cancel) var sum int32 From fbdcb9e36bb776aef93980e521e388cc87675a6d Mon Sep 17 00:00:00 2001 From: Ron Cemer Date: Tue, 1 Jul 2025 12:54:25 -0400 Subject: [PATCH 18/22] Fix messages disappearing. --- celery_test.go | 10 +++--- rabbitmq/broker.go | 87 +++++++++++++++++++++++++++++----------------- 2 files changed, 62 insertions(+), 35 deletions(-) diff --git a/celery_test.go b/celery_test.go index 7ff02cd..9b2565e 100644 --- a/celery_test.go +++ b/celery_test.go @@ -245,10 +245,11 @@ func TestGoredisProduceAndConsume100times(t *testing.T) { } func TestRabbitmqProduceAndConsume100times(t *testing.T) { - app := NewApp( - WithBroker(rabbitmq.NewBroker(rabbitmq.WithAmqpUri("amqp://guest:guest@localhost:5672/"))), + app := NewApp( + WithBroker(rabbitmq.NewBroker(rabbitmq.WithAmqpUri("amqp://guest:guest@localhost:5672/"))), WithLogger(log.NewJSONLogger(os.Stderr)), - ) + ) + for i := 0; i < 100; i++ { err := app.Delay( "myproject.apps.myapp.tasks.mytask", @@ -262,7 +263,7 @@ func TestRabbitmqProduceAndConsume100times(t *testing.T) { } // The test finishes either when ctx times out or all the tasks finish. - ctx, cancel := context.WithTimeout(context.Background(), 5 * time.Second) + ctx, cancel := context.WithTimeout(context.Background(), time.Second) t.Cleanup(cancel) var sum int32 @@ -286,6 +287,7 @@ func TestRabbitmqProduceAndConsume100times(t *testing.T) { if want != sum { t.Errorf("expected sum %d got %d", want, sum) } + } func TestConsumeSequentially(t *testing.T) { diff --git a/rabbitmq/broker.go b/rabbitmq/broker.go index 3179f20..884668b 100644 --- a/rabbitmq/broker.go +++ b/rabbitmq/broker.go @@ -3,7 +3,6 @@ package rabbitmq import ( - "context" "encoding/base64" "encoding/json" "fmt" @@ -34,7 +33,6 @@ type Broker struct { conn *amqp.Connection channel *amqp.Channel delivery map[string]<-chan amqp.Delivery - ctx context.Context } // WithAmqpUri sets the AMQP connection URI to RabbitMQ. @@ -69,7 +67,6 @@ func NewBroker(options ...BrokerOption) *Broker { receiveTimeout: DefaultReceiveTimeout * time.Second, rawMode: false, delivery: make(map[string]<-chan amqp.Delivery), - ctx: context.Background(), } for _, opt := range options { opt(&br) @@ -134,8 +131,7 @@ func (br *Broker) Send(m []byte, q string) error { replyTo = properties_in["reply_to"].(string) } - err := br.channel.PublishWithContext( - br.ctx, + err := br.channel.Publish( "", // exchange q, // routing key false, // mandatory @@ -149,6 +145,7 @@ func (br *Broker) Send(m []byte, q string) error { ReplyTo: replyTo, Body: body, }) + return err } @@ -157,33 +154,43 @@ func (br *Broker) Send(m []byte, q string) error { func (br *Broker) Observe(queues []string) { br.queues = queues for _, queue := range queues { - _, err := br.channel.QueueDeclare( - queue, // name - true, // durable - false, // delete when unused - false, // exclusive - false, // no-wait - nil, // arguments + durable := true + autoDelete := false + exclusive := false + noWait := false + + // Check whether the queue exists. + _, err := br.channel.QueueDeclarePassive( + queue, + durable, + autoDelete, + exclusive, + noWait, + nil, ) + + // If the queue doesn't exist, attempt to create it. if err != nil { - log.Panicf("Failed to declare a queue: %s", err) - } else { - for _, queue := range queues { - delivery, err := br.channel.Consume( - queue, // queue - "", // consumer - true, // autoAck - false, // exclusive - false, // noLocal (ignored) - false, // noWait - nil, // args - ) + // QueueDeclarePassive() will close the channel if the queue does not exist, so we have to create a new channel when this happens. + if br.channel.IsClosed() { + channel, err := br.conn.Channel() if err != nil { - err_str := fmt.Errorf("%w", err) - log.Panicf("channel.Consume() failed for queue %s: %s", queue, err_str) - } else { - br.delivery[queue] = delivery + log.Panicf("Failed to open a channel: %s", err) } + br.channel = channel + } + + _, err := br.channel.QueueDeclare( + queue, + durable, + autoDelete, + exclusive, + noWait, + nil, + ) + + if err != nil { + log.Panicf("Failed to declare a queue: %s", err) } } } @@ -196,10 +203,29 @@ func (br *Broker) Receive() ([]byte, error) { // Put the Celery queue name to the end of the slice for fair processing. broker.Move2back(br.queues, queue) - delivery := br.delivery[queue] + var err error + + delivery, delivery_exists := br.delivery[queue] + if !delivery_exists { + delivery, err = br.channel.Consume( + queue, // queue + "", // consumer + true, // autoAck + false, // exclusive + false, // noLocal (ignored) + false, // noWait + nil, // args + ) + + if err != nil { + return nil, err + } + + br.delivery[queue] = delivery + } select { - case msg := <-delivery: + case msg := <-br.delivery[queue]: if br.rawMode { return msg.Body, nil } @@ -232,7 +258,6 @@ func (br *Broker) Receive() ([]byte, error) { log.Printf("json encode: %s", err_str) return nil, err } - return result, nil case <-time.After(br.receiveTimeout): From 6d7f4477894d11707eec95792e6c731ee3d4480c Mon Sep 17 00:00:00 2001 From: Ron Cemer Date: Tue, 1 Jul 2025 12:55:44 -0400 Subject: [PATCH 19/22] Fix lint. --- celery_test.go | 2 +- rabbitmq/broker.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/celery_test.go b/celery_test.go index 9b2565e..1e42941 100644 --- a/celery_test.go +++ b/celery_test.go @@ -12,8 +12,8 @@ import ( "github.com/go-kit/log" "github.com/marselester/gopher-celery/goredis" - "github.com/marselester/gopher-celery/rabbitmq" "github.com/marselester/gopher-celery/protocol" + "github.com/marselester/gopher-celery/rabbitmq" ) func TestExecuteTaskPanic(t *testing.T) { diff --git a/rabbitmq/broker.go b/rabbitmq/broker.go index 884668b..ae5a9c8 100644 --- a/rabbitmq/broker.go +++ b/rabbitmq/broker.go @@ -237,7 +237,7 @@ func (br *Broker) Receive() ([]byte, error) { properties["reply_to"] = msg.ReplyTo properties["delivery_mode"] = msg.DeliveryMode properties["delivery_info"] = map[string]interface{}{ - "exchange": msg.Exchange, + "exchange": msg.Exchange, "routing_key": msg.RoutingKey, } properties["priority"] = msg.Priority From 290c625e13e79898d39e388a60ce6af0460ab38e Mon Sep 17 00:00:00 2001 From: Ron Cemer Date: Tue, 1 Jul 2025 13:00:30 -0400 Subject: [PATCH 20/22] Fix lint. --- rabbitmq/broker.go | 1 + 1 file changed, 1 insertion(+) diff --git a/rabbitmq/broker.go b/rabbitmq/broker.go index ae5a9c8..75a9929 100644 --- a/rabbitmq/broker.go +++ b/rabbitmq/broker.go @@ -204,6 +204,7 @@ func (br *Broker) Receive() ([]byte, error) { broker.Move2back(br.queues, queue) var err error + var delivery <-chan amqp.Delivery delivery, delivery_exists := br.delivery[queue] if !delivery_exists { From 394cea98f160d3aa722568798d3f090b52c8c629 Mon Sep 17 00:00:00 2001 From: Ron Cemer Date: Tue, 1 Jul 2025 13:03:48 -0400 Subject: [PATCH 21/22] Fix lint. --- rabbitmq/broker.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/rabbitmq/broker.go b/rabbitmq/broker.go index 75a9929..a101e47 100644 --- a/rabbitmq/broker.go +++ b/rabbitmq/broker.go @@ -204,7 +204,6 @@ func (br *Broker) Receive() ([]byte, error) { broker.Move2back(br.queues, queue) var err error - var delivery <-chan amqp.Delivery delivery, delivery_exists := br.delivery[queue] if !delivery_exists { @@ -226,7 +225,7 @@ func (br *Broker) Receive() ([]byte, error) { } select { - case msg := <-br.delivery[queue]: + case msg := <-delivery: if br.rawMode { return msg.Body, nil } From 3d94e696914221b0641b4a9a0dd343135aff15b9 Mon Sep 17 00:00:00 2001 From: Ron Cemer Date: Tue, 1 Jul 2025 13:20:03 -0400 Subject: [PATCH 22/22] Fix unit tests. --- celery_test.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/celery_test.go b/celery_test.go index 1e42941..187fd1b 100644 --- a/celery_test.go +++ b/celery_test.go @@ -250,10 +250,15 @@ func TestRabbitmqProduceAndConsume100times(t *testing.T) { WithLogger(log.NewJSONLogger(os.Stderr)), ) + queue := "rabbitmq_broker_test" + + // Create the queue, if it doesn't exist. + app.conf.broker.Observe([]string{queue}) + for i := 0; i < 100; i++ { err := app.Delay( "myproject.apps.myapp.tasks.mytask", - "important", + queue, 2, 3, ) @@ -269,7 +274,7 @@ func TestRabbitmqProduceAndConsume100times(t *testing.T) { var sum int32 app.Register( "myproject.apps.myapp.tasks.mytask", - "important", + queue, func(ctx context.Context, p *TaskParam) error { p.NameArgs("a", "b") atomic.AddInt32(