-
Notifications
You must be signed in to change notification settings - Fork 2.6k
Expand file tree
/
Copy pathctrl-broker.py
More file actions
executable file
·96 lines (76 loc) · 3.09 KB
/
ctrl-broker.py
File metadata and controls
executable file
·96 lines (76 loc) · 3.09 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
#!/usr/bin/env python3
# mosquitto_ctrl broker
from mosq_test_helper import *
import json
import shutil
def write_config(filename, ports):
with open(filename, 'w') as f:
f.write("enable_control_api true\n")
f.write(f"global_plugin {mosq_plugins.DYNSEC_PLUGIN_PATH}\n")
f.write(f"plugin_opt_config_file {Path(str(ports[0]), 'dynamic-security.json')}\n")
f.write("allow_anonymous false\n")
f.write(f"listener {ports[0]}\n")
f.write(f"listener {ports[1]}\n")
f.write(f"certfile {Path(ssl_dir, 'server.crt')}\n")
f.write(f"keyfile {Path(ssl_dir, 'server.key')}\n")
def ctrl_cmd(cmd, args, ports, response=None):
opts = ["-u", "admin",
"-P", "admin",
"-V", "5"]
if response is None:
opts += [
"-p", str(ports[0]),
"-q", "1"
]
capture_output = False
else:
opts += ["-p", str(ports[1])]
opts += ["--cafile", str(Path(ssl_dir, "all-ca.crt"))]
capture_output = True
proc = subprocess.run([mosquitto_ctrl_path]
+ opts + [cmd] + args,
env=env, capture_output=True, encoding='utf-8')
if response is not None:
if proc.stdout != response:
raise ValueError(proc.stdout)
if proc.returncode != 0:
raise ValueError(args)
rc = 0
ports = mosq_test.get_port(2)
conf_file = os.path.basename(__file__).replace('.py', '.conf')
write_config(conf_file, ports)
env = mosq_test.env_add_ld_library_path()
if not os.path.exists(str(ports[0])):
os.mkdir(str(ports[0]))
# Generate initial dynsec file
ctrl_cmd("dynsec", ["init", Path(str(ports[0]), "dynamic-security.json"), "admin", "admin"], ports)
ctrl_cmd("broker", ["help"], ports)
# Then start broker
broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=ports[0])
try:
ctrl_cmd("dynsec", ["addRoleACL", "admin", "publishClientSend", "$CONTROL/#", "allow"], ports)
ctrl_cmd("dynsec", ["addRoleACL", "admin", "publishClientReceive", "$CONTROL/#", "allow"], ports)
ctrl_cmd("dynsec", ["addRoleACL", "admin", "subscribePattern", "$CONTROL/#", "allow"], ports)
ctrl_cmd("broker", ["listListeners"], ports, response=f"Listener 1:\n Port: {ports[0]}\n Protocol: mqtt\n TLS: false\n\nListener 2:\n Port: {ports[1]}\n Protocol: mqtt\n TLS: true\n\n")
ctrl_cmd("broker", ["listPlugins"], ports, response="Plugin: dynamic-security\nControl endpoints: $CONTROL/dynamic-security/v1\n")
rc = 0
except mosq_test.TestError:
pass
except Exception as err:
print(err)
finally:
os.remove(conf_file)
try:
os.remove(Path(str(ports[0]), "dynamic-security.json"))
pass
except FileNotFoundError:
pass
shutil.rmtree(f"{ports[0]}")
mosq_test.terminate_broker(broker)
if mosq_test.wait_for_subprocess(broker):
print("broker not terminated")
if rc == 0: rc=1
(_, stde) = broker.communicate()
if rc:
print(stde.decode('utf-8'))
exit(rc)