Skip to content

Commit 43e097a

Browse files
committed
Refactor plugin publishing to use base_msg instead of mosquitto_message_v5
1 parent ac5d7ba commit 43e097a

File tree

9 files changed

+173
-62
lines changed

9 files changed

+173
-62
lines changed

src/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ add_executable(mosquitto
33
../plugins/acl-file/acl_check.c
44
../plugins/acl-file/acl_parse.c
55
../lib/alias_mosq.c ../lib/alias_mosq.h
6+
base_msg.c
67
bridge.c bridge_topic.c
78
broker_control.c
89
conf.c

src/Makefile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ endif
4646

4747
OBJS= mosquitto.o \
4848
acl_file.o \
49+
base_msg.o \
4950
bridge.o \
5051
bridge_topic.o \
5152
broker_control.o \

src/base_msg.c

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
Copyright (c) 2009-2021 Roger Light <roger@atchoo.org>
3+
4+
All rights reserved. This program and the accompanying materials
5+
are made available under the terms of the Eclipse Public License 2.0
6+
and Eclipse Distribution License v1.0 which accompany this distribution.
7+
8+
The Eclipse Public License is available at
9+
https://www.eclipse.org/legal/epl-2.0/
10+
and the Eclipse Distribution License is available at
11+
http://www.eclipse.org/org/documents/edl-v10.php.
12+
13+
SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
14+
15+
Contributors:
16+
Roger Light - initial implementation and documentation.
17+
*/
18+
19+
#include "mosquitto_broker_internal.h"
20+
21+
/* This is using the uthash internal hh.prev/hh.next pointers to make our own
22+
* double linked list. This function must not be used on a base_msg that is
23+
* stored in a hash table. */
24+
void base_msg__dl_append(struct mosquitto__base_msg **head, struct mosquitto__base_msg *add_msg)
25+
{
26+
if(*head){
27+
add_msg->hh.prev = db.plugin_msgs->hh.prev;
28+
((struct mosquitto__base_msg *)(*head)->hh.prev)->hh.next = add_msg;
29+
(*head)->hh.prev = add_msg;
30+
add_msg->hh.next = NULL;
31+
} else {
32+
(*head) = add_msg;
33+
(*head)->hh.prev = (*head);
34+
(*head)->hh.next = NULL;
35+
}
36+
}
37+
38+
/* This is using the uthash internal hh.prev/hh.next pointers to make our own
39+
* double linked list. This function must not be used on a base_msg that is
40+
* stored in a hash table. */
41+
void base_msg__dl_delete(struct mosquitto__base_msg **head, struct mosquitto__base_msg *del_msg)
42+
{
43+
if(del_msg->hh.prev == del_msg){
44+
*head = NULL;
45+
}else if(del_msg == *head){
46+
((struct mosquitto__base_msg *)del_msg->hh.next)->hh.prev = del_msg->hh.prev;
47+
*head = del_msg->hh.next;
48+
}else{
49+
((struct mosquitto__base_msg *)del_msg->hh.prev)->hh.next = del_msg->hh.next;
50+
if(del_msg->hh.next){
51+
((struct mosquitto__base_msg *)del_msg->hh.next)->hh.prev = del_msg->hh.prev;
52+
}else{
53+
(*head)->hh.prev = del_msg->hh.prev;
54+
}
55+
}
56+
}

src/loop.c

Lines changed: 23 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -66,44 +66,20 @@ static struct lws_sorted_usec_list sul;
6666
#endif
6767

6868

69-
static int single_publish(struct mosquitto *context, struct mosquitto__message_v5 *pub_msg, uint32_t message_expiry)
69+
static int single_publish(struct mosquitto *context, struct mosquitto__base_msg *base_msg, uint32_t message_expiry)
7070
{
71-
struct mosquitto__base_msg *base_msg;
7271
uint16_t mid;
7372

74-
base_msg = mosquitto_calloc(1, sizeof(struct mosquitto__base_msg));
75-
if(base_msg == NULL){
76-
return MOSQ_ERR_NOMEM;
77-
}
78-
79-
base_msg->data.topic = pub_msg->topic;
80-
pub_msg->topic = NULL;
81-
base_msg->data.retain = 0;
82-
base_msg->data.payloadlen = (uint32_t)pub_msg->payloadlen;
83-
base_msg->data.payload = mosquitto_malloc(base_msg->data.payloadlen+1);
84-
if(base_msg->data.payload == NULL){
85-
db__msg_store_free(base_msg);
86-
return MOSQ_ERR_NOMEM;
87-
}
88-
/* Ensure payload is always zero terminated, this is the reason for the extra byte above */
89-
((uint8_t *)base_msg->data.payload)[base_msg->data.payloadlen] = 0;
90-
memcpy(base_msg->data.payload, pub_msg->payload, base_msg->data.payloadlen);
91-
92-
if(pub_msg->properties){
93-
base_msg->data.properties = pub_msg->properties;
94-
pub_msg->properties = NULL;
95-
}
96-
9773
if(db__message_store(context, base_msg, &message_expiry, mosq_mo_broker)){
9874
return 1;
9975
}
10076

101-
if(pub_msg->qos){
77+
if(base_msg->data.qos){
10278
mid = mosquitto__mid_generate(context);
10379
}else{
10480
mid = 0;
10581
}
106-
return db__message_insert_outgoing(context, 0, mid, (uint8_t)pub_msg->qos, 0, base_msg, 0, true, true);
82+
return db__message_insert_outgoing(context, 0, mid, base_msg->data.qos, 0, base_msg, 0, true, true);
10783
}
10884

10985

@@ -138,28 +114,34 @@ static void read_message_expiry_interval(mosquitto_property **proplist, uint32_t
138114

139115
static void queue_plugin_msgs(void)
140116
{
141-
struct mosquitto__message_v5 *msg, *tmp;
117+
struct mosquitto__base_msg *base_msg, *base_tmp;
142118
struct mosquitto *context;
143119
uint32_t message_expiry;
144120

145-
DL_FOREACH_SAFE(db.plugin_msgs, msg, tmp){
146-
DL_DELETE(db.plugin_msgs, msg);
121+
for(base_msg = db.plugin_msgs; base_msg && (base_tmp = base_msg->hh.next, 1); base_msg = base_tmp){
122+
base_msg__dl_delete(&db.plugin_msgs, base_msg);
147123

148-
read_message_expiry_interval(&msg->properties, &message_expiry);
124+
read_message_expiry_interval(&base_msg->data.properties, &message_expiry);
149125

150-
if(msg->clientid){
151-
HASH_FIND(hh_id, db.contexts_by_id, msg->clientid, strlen(msg->clientid), context);
126+
if(base_msg->data.source_id){
127+
HASH_FIND(hh_id, db.contexts_by_id, base_msg->data.source_id, strlen(base_msg->data.source_id), context);
128+
mosquitto_FREE(base_msg->data.source_id);
152129
if(context){
153-
single_publish(context, msg, message_expiry);
130+
single_publish(context, base_msg, message_expiry);
131+
}else{
132+
db__msg_store_free(base_msg);
154133
}
155134
}else{
156-
db__messages_easy_queue(NULL, msg->topic, (uint8_t)msg->qos, (uint32_t)msg->payloadlen, msg->payload, msg->retain, message_expiry, &msg->properties);
135+
db__messages_easy_queue(NULL,
136+
base_msg->data.topic,
137+
base_msg->data.qos,
138+
base_msg->data.payloadlen,
139+
base_msg->data.payload,
140+
base_msg->data.retain,
141+
message_expiry,
142+
&base_msg->data.properties);
143+
db__msg_store_free(base_msg);
157144
}
158-
mosquitto_FREE(msg->topic);
159-
mosquitto_FREE(msg->payload);
160-
mosquitto_property_free_all(&msg->properties);
161-
mosquitto_FREE(msg->clientid);
162-
mosquitto_FREE(msg);
163145
}
164146
}
165147

@@ -197,7 +179,6 @@ int mosquitto_main_loop(struct mosquitto__listener_sock *listensock, int listens
197179

198180
while(g_run){
199181
retain__expiry_check();
200-
queue_plugin_msgs();
201182
context__free_disused();
202183

203184
db.next_event_ms = 86400000;
@@ -262,6 +243,7 @@ int mosquitto_main_loop(struct mosquitto__listener_sock *listensock, int listens
262243
}
263244
}
264245
#endif
246+
queue_plugin_msgs();
265247
}
266248

267249
return MOSQ_ERR_SUCCESS;

src/mosquitto_broker_internal.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -502,7 +502,7 @@ struct mosquitto_db {
502502
#ifdef WITH_KQUEUE
503503
int kqueuefd;
504504
#endif
505-
struct mosquitto__message_v5 *plugin_msgs;
505+
struct mosquitto__base_msg *plugin_msgs;
506506
#ifdef WITH_TLS
507507
/* tls_keylog can't be in the config struct because it is used
508508
before the config is allocated. Config probably
@@ -739,6 +739,8 @@ void db__msg_add_to_queued_stats(struct mosquitto_msg_data *msg_data, struct mos
739739
uint64_t db__new_msg_id(void);
740740
void db__expire_all_messages(struct mosquitto *context);
741741
void db__check_acl_of_all_messages(struct mosquitto *context);
742+
void base_msg__dl_append(struct mosquitto__base_msg **head, struct mosquitto__base_msg *add_msg);
743+
void base_msg__dl_delete(struct mosquitto__base_msg **head, struct mosquitto__base_msg *del_msg);
742744

743745
/* ============================================================
744746
* Subscription functions

src/plugin_public.c

Lines changed: 16 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,7 @@ BROKER_EXPORT int mosquitto_broker_publish(
219219
bool retain,
220220
mosquitto_property *properties)
221221
{
222-
struct mosquitto__message_v5 *msg;
222+
struct mosquitto__base_msg *base_msg;
223223

224224
if(topic == NULL
225225
|| payloadlen < 0
@@ -229,35 +229,31 @@ BROKER_EXPORT int mosquitto_broker_publish(
229229
return MOSQ_ERR_INVAL;
230230
}
231231

232-
msg = mosquitto_malloc(sizeof(struct mosquitto__message_v5));
233-
if(msg == NULL){
232+
base_msg = mosquitto_calloc(1, sizeof(struct mosquitto__base_msg));
233+
if(base_msg == NULL){
234234
return MOSQ_ERR_NOMEM;
235235
}
236236

237-
msg->next = NULL;
238-
msg->prev = NULL;
239237
if(clientid){
240-
msg->clientid = mosquitto_strdup(clientid);
241-
if(msg->clientid == NULL){
242-
mosquitto_FREE(msg);
238+
base_msg->data.source_id = mosquitto_strdup(clientid);
239+
if(base_msg->data.source_id == NULL){
240+
mosquitto_FREE(base_msg);
243241
return MOSQ_ERR_NOMEM;
244242
}
245-
}else{
246-
msg->clientid = NULL;
247243
}
248-
msg->topic = mosquitto_strdup(topic);
249-
if(msg->topic == NULL){
250-
mosquitto_FREE(msg->clientid);
251-
mosquitto_FREE(msg);
244+
base_msg->data.topic = mosquitto_strdup(topic);
245+
if(base_msg->data.topic == NULL){
246+
mosquitto_FREE(base_msg->data.source_id);
247+
mosquitto_FREE(base_msg);
252248
return MOSQ_ERR_NOMEM;
253249
}
254-
msg->payloadlen = payloadlen;
255-
msg->payload = payload;
256-
msg->qos = qos;
257-
msg->retain = retain;
258-
msg->properties = properties;
250+
base_msg->data.payloadlen = (uint32_t)payloadlen;
251+
base_msg->data.payload = payload;
252+
base_msg->data.qos = (uint8_t)qos;
253+
base_msg->data.retain = retain;
254+
base_msg->data.properties = properties;
259255

260-
DL_APPEND(db.plugin_msgs, msg);
256+
base_msg__dl_append(&db.plugin_msgs, base_msg);
261257

262258
loop__update_next_event(1);
263259
return MOSQ_ERR_SUCCESS;
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
#!/usr/bin/env python3
2+
3+
# Test whether a client subscribed to a topic receives its own message sent to that topic.
4+
5+
from mosq_test_helper import *
6+
7+
def write_config(filename, port):
8+
with open(filename, 'w') as f:
9+
f.write(f"listener {port}\n")
10+
f.write("allow_anonymous true\n")
11+
f.write("allow_duplicate_messages false\n")
12+
13+
def do_test(start_broker, proto_ver):
14+
rc = 1
15+
connect_packet = mosq_test.gen_connect("subpub-qos1-test", proto_ver=proto_ver)
16+
connack_packet = mosq_test.gen_connack(rc=0, proto_ver=proto_ver)
17+
18+
mid = 1
19+
subscribe1_packet = mosq_test.gen_subscribe(mid, "subpub/+/topic", 0, proto_ver=proto_ver)
20+
suback1_packet = mosq_test.gen_suback(mid, 0, proto_ver=proto_ver)
21+
22+
mid = 2
23+
subscribe2_packet = mosq_test.gen_subscribe(mid, "subpub/topic/+", 0, proto_ver=proto_ver)
24+
suback2_packet = mosq_test.gen_suback(mid, 0, proto_ver=proto_ver)
25+
26+
publish_packet = mosq_test.gen_publish("subpub/topic/topic", qos=0, payload="message", proto_ver=proto_ver)
27+
28+
port = mosq_test.get_port()
29+
conf_file = os.path.basename(__file__).replace('.py', '.conf')
30+
write_config(conf_file, port)
31+
32+
if start_broker:
33+
broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)
34+
35+
try:
36+
sock = mosq_test.do_client_connect(connect_packet, connack_packet, port=port)
37+
38+
mosq_test.do_send_receive(sock, subscribe1_packet, suback1_packet, "suback 1")
39+
mosq_test.do_send_receive(sock, subscribe2_packet, suback2_packet, "suback 2")
40+
41+
sock.send(publish_packet)
42+
mosq_test.expect_packet(sock, "publish 1", publish_packet)
43+
mosq_test.do_ping(sock)
44+
rc = 0
45+
46+
sock.close()
47+
except mosq_test.TestError:
48+
pass
49+
finally:
50+
if start_broker:
51+
broker.terminate()
52+
if mosq_test.wait_for_subprocess(broker):
53+
print("broker not terminated")
54+
if rc == 0: rc=1
55+
os.remove(conf_file)
56+
if rc:
57+
(stdo, stde) = broker.communicate()
58+
print(stde.decode('utf-8'))
59+
print("proto_ver=%d" % (proto_ver))
60+
exit(rc)
61+
else:
62+
return rc
63+
64+
65+
def all_tests(start_broker=False):
66+
rc = do_test(start_broker, proto_ver=4)
67+
if rc:
68+
return rc;
69+
70+
if __name__ == '__main__':
71+
all_tests(True)

test/broker/Makefile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ msg_sequence_test:
5151
./02-shared-qos0-v5.py
5252
./02-subhier-crash.py
5353
./02-subpub-b2c-topic-alias.py
54+
./02-subpub-duplicate-sub.py
5455
./02-subpub-qos0-long-topic.py
5556
./02-subpub-qos0-oversize-payload.py
5657
./02-subpub-qos0-queued-bytes.py

test/broker/test.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
(1, './02-shared-qos0-v5.py'),
3333
(1, './02-subhier-crash.py'),
3434
(1, './02-subpub-b2c-topic-alias.py'),
35+
(1, './02-subpub-duplicate-sub.py'),
3536
(1, './02-subpub-qos0-long-topic.py'),
3637
(1, './02-subpub-qos0-oversize-payload.py'),
3738
(1, './02-subpub-qos0-queued-bytes.py'),

0 commit comments

Comments
 (0)