Skip to content

Commit 37bd236

Browse files
committed
Make topic-sending recipes work
This was apparently never used or tested?
1 parent 75cf450 commit 37bd236

File tree

2 files changed

+5
-2
lines changed

2 files changed

+5
-2
lines changed

src/workflows/recipe/wrapper.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -265,9 +265,10 @@ def _send_to_destination(
265265
message = mangle_for_sending(message)
266266
self._retry_transport(
267267
broadcast,
268-
self.recipe[destination]["topic"],
268+
self.recipe[destination]["exchange"],
269269
message,
270270
headers=header,
271+
topic=self.recipe[destination].get("topic"),
271272
**dest_kwargs,
272273
)
273274

src/workflows/transport/pika_transport.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -543,6 +543,7 @@ def _broadcast(
543543
delay=None,
544544
expiration: int | None = None,
545545
transaction: int | None = None,
546+
topic: str | None = None,
546547
**kwargs,
547548
):
548549
"""Send a message to a fanout exchange.
@@ -554,6 +555,7 @@ def _broadcast(
554555
delay: Delay transport of message by this many seconds
555556
expiration: Optional TTL expiration time, in seconds, relative to sending time
556557
transaction: Transaction ID if message should be part of a transaction
558+
topic: The routing key, if posting to a topic exchange
557559
kwargs: Arbitrary arguments for other transports. Ignored.
558560
"""
559561
assert delay is None, "Delay Not implemented"
@@ -572,7 +574,7 @@ def _broadcast(
572574

573575
self._pika_thread.send(
574576
exchange=destination,
575-
routing_key="",
577+
routing_key=topic or "",
576578
body=message,
577579
properties=properties,
578580
mandatory=False,

0 commit comments

Comments
 (0)