Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions amqtt/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -755,7 +755,7 @@ async def _handle_message_delivery(
f"[MQTT-3.3.2-2] - {client_session.client_id} invalid TOPIC sent in PUBLISH message, closing connection",
)
return False
if app_message.topic.startswith("$"):
if app_message.topic.startswith("$") and not self.config.get("allow_dollar_topics", False):
self.logger.warning(
f"[MQTT-4.7.2-1] - {client_session.client_id} cannot use a topic with a leading $ character."
)
Expand Down Expand Up @@ -1115,7 +1115,7 @@ async def _publish_retained_messages_for_subscription(self, subscription: tuple[
)

def _matches(self, topic: str, a_filter: str) -> bool:
if topic.startswith("$") and (a_filter.startswith(("+", "#"))):
if topic.startswith("$") and (a_filter.startswith(("+", "#"))) and not self.config.get("allow_dollar_topics", False):
self.logger.debug("[MQTT-4.7.2-1] - ignoring broadcasting $ topic to subscriptions starting with + or #")
return False

Expand Down
2 changes: 2 additions & 0 deletions amqtt/contexts.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,8 @@ class BrokerConfig(Dictable):
[`AnonymousAuthPlugin`](../plugins/packaged_plugins.md#anonymous-auth-plugin) and
[`FileAuthPlugin`](../plugins/packaged_plugins.md#password-file-auth-plugin) for recommended configuration.*"""
topic_check: dict[str, Any] | None = None
"""Controls whether the broker accepts client-sent topics starting with a dollar sign (`$`)."""
allow_dollar_topics: bool | None = False
"""*Deprecated field used to config EntryPoint-loaded plugins. See
[`TopicTabooPlugin`](../plugins/packaged_plugins.md#taboo-topic-plugin) and
[`TopicACLPlugin`](../plugins/packaged_plugins.md#acl-topic-plugin) for recommended configuration method.*"""
Expand Down
106 changes: 106 additions & 0 deletions tests/test_dollar_topics.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,40 @@ async def test_publish_to_dollar_sign_topics():
await asyncio.sleep(0.1)
await b.shutdown()


@pytest.mark.asyncio
async def test_publish_to_dollar_sign_topics_if_allowed():
"""Applications can use a topic with a leading $ character for their own purposes if it is allowed."""

cfg = {
'listeners': {'default': {'type': 'tcp', 'bind': '127.0.0.1'}},
'allow_dollar_topics': True,
'plugins': {'amqtt.plugins.authentication.AnonymousAuthPlugin': {"allow_anonymous": True}},
}

b = Broker(config=cfg)
await b.start()
await asyncio.sleep(0.1)
c = MQTTClient(config={'auto_reconnect': False})
await c.connect()
await asyncio.sleep(0.1)
await c.subscribe(
[('$#', QOS_0),
('#', QOS_0)]
)
await asyncio.sleep(0.1)
await c.publish('$MY', b'message should not be blocked')
await asyncio.sleep(0.1)

msg = await c.deliver_message()
assert msg.topic == '$MY'
assert msg.data == b'message should not be blocked'

await c.disconnect()
await asyncio.sleep(0.1)
await b.shutdown()


@pytest.mark.asyncio
async def test_hash_will_not_receive_dollar():
"""A subscription to “#” will not receive any messages published to a topic beginning with a $ [MQTT-4.7.2-1]."""
Expand Down Expand Up @@ -73,6 +107,39 @@ async def test_hash_will_not_receive_dollar():
await b.shutdown()


@pytest.mark.asyncio
async def test_hash_will_receive_dollar_if_allowed():
"""A subscription to “#” will not receive messages published to a topic beginning with a $ if it is allowed."""

cfg = {
'listeners': {'default': {'type': 'tcp', 'bind': '127.0.0.1'}},
'allow_dollar_topics': True,
'plugins': {
'amqtt.plugins.authentication.AnonymousAuthPlugin': {"allow_anonymous": True},
'amqtt.plugins.sys.broker.BrokerSysPlugin': {"sys_interval": 2}
}
}

b = Broker(config=cfg)
await b.start()
await asyncio.sleep(0.1)
c = MQTTClient(config={'auto_reconnect': False})
await c.connect()
await asyncio.sleep(0.1)
await c.subscribe(
[('#', QOS_0)]
)
await asyncio.sleep(0.1)

msg = await c.deliver_message()
assert msg.topic == '$SYS/broker/version'
assert b"aMQTT" in msg.data

await c.disconnect()
await asyncio.sleep(0.1)
await b.shutdown()


@pytest.mark.asyncio
async def test_plus_will_not_receive_dollar():
"""A subscription to “+/monitor/Clients” will not receive any messages published to “$SYS/monitor/Clients [MQTT-4.7.2-1]"""
Expand Down Expand Up @@ -108,3 +175,42 @@ async def test_plus_will_not_receive_dollar():
await c.disconnect()
await asyncio.sleep(0.1)
await b.shutdown()


@pytest.mark.asyncio
async def test_plus_will_receive_dollar_if_allowed():
"""A subscription to “+/monitor/Clients” will receive any messages published to “$SYS/monitor/Clients if it is allowed."""
# BrokerSysPlugin doesn't use $SYS/monitor/Clients, so this is an equivalent test with $SYS/broker topics

cfg = {
'listeners': {'default': {'type': 'tcp', 'bind': '127.0.0.1'}},
'allow_dollar_topics': True,
'plugins': {
'amqtt.plugins.authentication.AnonymousAuthPlugin': {"allow_anonymous": True},
'amqtt.plugins.sys.broker.BrokerSysPlugin': {"sys_interval": 2}
}
}

b = Broker(config=cfg)
await b.start()
await asyncio.sleep(0.1)
c = MQTTClient(config={'auto_reconnect': False})
await c.connect()
await asyncio.sleep(0.1)
await c.subscribe(
[('+/broker/#', QOS_0),
('+/broker/time', QOS_0),
('+/broker/clients/#', QOS_0),
('+/broker/+/maximum', QOS_0)
]
)
await asyncio.sleep(0.1)


msg = await c.deliver_message()
assert msg.topic == '$SYS/broker/version'
assert b"aMQTT" in msg.data

await c.disconnect()
await asyncio.sleep(0.1)
await b.shutdown()