24
24
from threading import Event , local
25
25
26
26
import pika
27
- from pika .exceptions import ChannelClosedByBroker
28
27
29
28
from ..broker import Broker , Consumer , MessageProxy
30
29
from ..common import current_millis , dq_name , q_name , xq_name
@@ -324,7 +323,7 @@ def enqueue(self, message, *, delay=None):
324
323
attempts = 1
325
324
while True :
326
325
try :
327
- self .declare_queue (queue_name , ensure = True )
326
+ self .declare_queue (message . queue_name , ensure = True )
328
327
self .logger .debug ("Enqueueing message %r on queue %r." , message .message_id , queue_name )
329
328
self .emit_before ("enqueue" , message , delay )
330
329
self .channel .basic_publish (
@@ -345,13 +344,6 @@ def enqueue(self, message, *, delay=None):
345
344
# next caller/attempt may initiate new ones of each.
346
345
del self .connection
347
346
348
- # When a queue has been deleted, attempt to get it
349
- # redeclared by removing it from the known set. The next
350
- # time a message is enqueued -- which could be when we
351
- # retry this block -- it will be redeclared.
352
- if isinstance (e , ChannelClosedByBroker ) and e .reply_code == 404 :
353
- self .queues .remove (q_name (queue_name ))
354
-
355
347
attempts += 1
356
348
if attempts > MAX_ENQUEUE_ATTEMPTS :
357
349
raise ConnectionClosed (e ) from None
0 commit comments