@@ -12,6 +12,7 @@ import (
1212 "github.com/go-kit/log"
1313
1414 "github.com/marselester/gopher-celery/goredis"
15+ "github.com/marselester/gopher-celery/rabbitmq"
1516 "github.com/marselester/gopher-celery/protocol"
1617)
1718
@@ -243,6 +244,50 @@ func TestGoredisProduceAndConsume100times(t *testing.T) {
243244 }
244245}
245246
247+ func TestRabbitmqProduceAndConsume100times (t * testing.T ) {
248+ app := NewApp (
249+ WithBroker (rabbitmq .NewBroker (rabbitmq .WithAmqpUri ("amqp://guest:guest@localhost:5672/" ))),
250+ WithLogger (log .NewJSONLogger (os .Stderr )),
251+ )
252+ for i := 0 ; i < 100 ; i ++ {
253+ err := app .Delay (
254+ "myproject.apps.myapp.tasks.mytask" ,
255+ "important" ,
256+ 2 ,
257+ 3 ,
258+ )
259+ if err != nil {
260+ t .Fatal (err )
261+ }
262+ }
263+
264+ // The test finishes either when ctx times out or all the tasks finish.
265+ ctx , cancel := context .WithTimeout (context .Background (), time .Second )
266+ t .Cleanup (cancel )
267+
268+ var sum int32
269+ app .Register (
270+ "myproject.apps.myapp.tasks.mytask" ,
271+ "important" ,
272+ func (ctx context.Context , p * TaskParam ) error {
273+ p .NameArgs ("a" , "b" )
274+ atomic .AddInt32 (
275+ & sum ,
276+ int32 (p .MustInt ("a" )+ p .MustInt ("b" )),
277+ )
278+ return nil
279+ },
280+ )
281+ if err := app .Run (ctx ); err != nil {
282+ t .Error (err )
283+ }
284+
285+ var want int32 = 500
286+ if want != sum {
287+ t .Errorf ("expected sum %d got %d" , want , sum )
288+ }
289+ }
290+
246291func TestConsumeSequentially (t * testing.T ) {
247292 app := NewApp (
248293 WithLogger (log .NewJSONLogger (os .Stderr )),
0 commit comments