Skip to content

Commit ff5f767

Browse files
authored
Merge pull request #12 from dist-sys/refactor_v0.6.2
Refactor v0.6.2
2 parents 8711723 + e516ac2 commit ff5f767

File tree

11 files changed

+165
-48
lines changed

11 files changed

+165
-48
lines changed

build.gradle

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ plugins {
44
}
55

66
mainClassName = 'mqttloader.Loader'
7-
applicationDefaultJvmArgs = ['-Djava.util.logging.config.file=logging.properties', '-Dfile.encoding=UTF-8']
7+
applicationDefaultJvmArgs = ['-Djava.util.logging.config.file=src/dist/logging.properties', '-Dfile.encoding=UTF-8', '--illegal-access=deny']
88

99
repositories {
1010
mavenCentral()
@@ -20,3 +20,13 @@ dependencies {
2020
run {
2121
args '-h'.split('\\s+')
2222
}
23+
24+
CreateStartScripts startScripts = project.startScripts
25+
startScripts.with {
26+
doLast {
27+
unixScript.text = unixScript.text.replaceFirst('-Djava.util.logging.config.file=src/dist/logging.properties',
28+
'-Djava.util.logging.config.file=\\$APP_HOME/logging.properties')
29+
windowsScript.text = windowsScript.text.replaceFirst('-Djava.util.logging.config.file=src/dist/logging.properties',
30+
'-Djava.util.logging.config.file=%~dp0../logging.properties')
31+
}
32+
}

doc/usage_en.md

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,10 @@ For example, the following command uses a public MQTT broker provided by HiveMQ.
4747
`$ ./mqttloader -b tcp://broker.hivemq.com:1883 -p 1 -s 1 -m 10`
4848

4949
### Run on multiple machines
50-
You can run MQTTLoader on multiple machines, e.g., run publishers and subscribers separately on different machines.
50+
You can run MQTTLoader on multiple machines.
51+
Running both publishers and subscribers on a single machine may cause mutual influence, e.g., the subscribers' receiving load lowers the publishers' throughput.
52+
By running publishers and subscribers separately on different machines, you can avoid such mutual influence.
53+
5154
For example, on a host A, you can run MQTTLoader as follows:
5255

5356
`$ ./mqttloader -b tcp://<IP>:<PORT> -p 0 -s 1 -st 20 -n <NTP-SERVER>`
@@ -98,6 +101,9 @@ MQTTLoader starts to terminate when all of the following conditions are met.
98101
MQTTLoader also starts to terminate when the time specified by the parameter `-et` elapses, even if there are in-flight messages.
99102
Thus, `-et` should be long sufficiently.
100103

104+
If you want to do measurement with fixed time period, you can set the measurement time by the parameter `-et`.
105+
Note that you need to set sufficiently large value to the parameter `-m`.
106+
101107
By setting the parameter `-n`, MQTTLoader obtains the offset time from the specified NTP server and reflects it to calculate throughput and latency.
102108
It might be useful for running multiple MQTTLoader on different machines.
103109

@@ -121,6 +127,8 @@ Maximum latency[ms]: 81
121127
Average latency[ms]: 42.23691
122128
```
123129
For each publisher, MQTTLoader counts the number of messages sent for each second.
130+
If QoS level is set to 1 or 2, counting is done when receiving PUBACK or PUBCOMP respectively.
131+
124132
After completion, MQTTLoader collects the counted numbers from all publishers and calculates the maximum throughput, the average throughput, and the number of published messages.
125133
`Throughput[msg/s]` is the list of throughputs, which are the sum of each second for all publishers.
126134
Note that these calculation exclude the beginning and trailing seconds that have 0 messages.
@@ -159,6 +167,8 @@ Thus, when running multiple MQTTLoader on different machines (e.g., publishers o
159167
By specifying the file name with `-tf` parameter, you can obtain throughput data like the following.
160168

161169
```
170+
Measurement start time: 2020-09-01 18:33:38.122 JST
171+
Measurement end time: 2020-09-01 18:33:54.104 JST
162172
SLOT, mqttloaderclient-pub000000, mqttloaderclient-sub000000
163173
0, 11955, 11218
164174
1, 16427, 16414
@@ -173,6 +183,8 @@ The data that used to calculate the summary data in the standard output is writt
173183
By specifying the file name with `-lf` parameter, you can obtain latency data like the following.
174184

175185
```
186+
Measurement start time: 2020-09-01 18:33:38.122 JST
187+
Measurement end time: 2020-09-01 18:33:54.104 JST
176188
mqttloaderclient-sub000000, mqttloaderclient-sub000001
177189
7, 7
178190
4, 4

doc/usage_jp.md

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,10 @@ MQTTLoaderの動作を確認するだけなら、パブリックブローカを
4848
`$ ./mqttloader -b tcp://broker.hivemq.com:1883 -p 1 -s 1 -m 10`
4949

5050
### 複数台での実行
51-
publisherとsubscriberを別マシンで動かす、等、複数台のマシン上でMQTTLoaderを動かすこともできます。
51+
複数台のマシン上でMQTTLoaderを動かすこともできます。
52+
1台のマシン上でpublisherとsubscriberを動かした場合、subscriberの受信負荷によってpublisherの送信スループットが低下する等の可能性があります。
53+
publisherとsubscriberを別マシンで動かすことで、負荷が相互に影響することを避けることができます。
54+
5255
例えば、ホストA上で以下のように実行します。
5356

5457
`$ ./mqttloader -b tcp://<IP>:<PORT> -p 0 -s 1 -st 20 -n <NTP-SERVER>`
@@ -99,6 +102,8 @@ MQTTLoaderは、以下の条件をすべて満たすと、クライアントを
99102
また、MQTTLoaderは、パラメータ`-et`によって指定される時間が経過すると、メッセージ送受信中であっても、終了します。
100103
送受信を中断したくない場合は、`-et`は長めに設定しておくと良いでしょう。
101104

105+
一定時間の測定を行いたい場合には、`-et`を用いて測定時間を設定し、`-m`で十分に大きな値を設定します。
106+
102107
パラメータ`-n`を設定すると、MQTTLoaderは指定されたNTPサーバから時刻のオフセット情報(NTPサーバ時刻からのずれ)を取得し、スループットやレイテンシの計算にそれを反映します。
103108
複数のMQTTLoaderを異なるマシン上で実行する場合に、利用を検討してください。
104109

@@ -123,6 +128,8 @@ Average latency[ms]: 42.23691
123128
```
124129

125130
MQTTLoaderは、各publisherごとに、毎秒の送信メッセージ数をカウントします。
131+
QoSレベルが1または2の場合は、それぞれ、PUBACKおよびPUBCOMPを受信したタイミングでカウントされます。
132+
126133
全てのメッセージ送信が完了したら、MQTTLoaderは全publisherからカウントしたメッセージ数の情報を集めて集計し、最大スループット、平均スループット、送信メッセージ数を計算します。
127134
`Throughput[msg/s]`の項は、スループット値の列挙です。列挙されているそれぞれの値は、各秒における全publisherの送信メッセージ数を足し合わせたものです。
128135
なお、測定開始時および終了時に送信メッセージ数が0の期間がある場合は、スループットの計算からは除外されます。
@@ -162,6 +169,8 @@ subscriberに関しても、上記と同様にして、受信メッセージの
162169
パラメータ`-tf`でファイル名を指定することで、以下のようなスループットの詳細データをファイルに書き出すことができます。
163170

164171
```
172+
Measurement start time: 2020-09-01 18:33:38.122 JST
173+
Measurement end time: 2020-09-01 18:33:44.104 JST
165174
SLOT, mqttloaderclient-pub000000, mqttloaderclient-sub000000
166175
0, 11955, 11218
167176
1, 16427, 16414
@@ -176,6 +185,8 @@ SLOT, mqttloaderclient-pub000000, mqttloaderclient-sub000000
176185
パラメータ`-lf`でファイル名を指定することで、以下のようなレイテンシの詳細データをファイルに書き出すことができます。
177186

178187
```
188+
Measurement start time: 2020-09-01 18:33:38.122 JST
189+
Measurement end time: 2020-09-01 18:33:44.104 JST
179190
mqttloaderclient-sub000000, mqttloaderclient-sub000001
180191
7, 7
181192
4, 4

src/main/java/mqttloader/Loader.java

Lines changed: 58 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@
2222
import java.net.InetAddress;
2323
import java.net.SocketException;
2424
import java.net.UnknownHostException;
25+
import java.text.SimpleDateFormat;
2526
import java.util.ArrayList;
27+
import java.util.Date;
2628
import java.util.Iterator;
2729
import java.util.Timer;
2830
import java.util.TreeMap;
@@ -52,10 +54,12 @@ public class Loader {
5254
private ArrayList<IClient> publishers = new ArrayList<>();
5355
private ArrayList<IClient> subscribers = new ArrayList<>();
5456
public static volatile long startTime;
57+
private long endTime;
5558
public static volatile long offset = 0;
5659
public static volatile long lastRecvTime;
5760
public static CountDownLatch countDownLatch;
5861
public static Logger logger = Logger.getLogger(Loader.class.getName());
62+
private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS z");
5963

6064
public Loader(String[] args) {
6165
setOptions(args);
@@ -91,17 +95,27 @@ public Loader(String[] args) {
9195
}
9296

9397
int execTime = Integer.valueOf(cmd.getOptionValue(Opt.EXEC_TIME.getName(), Opt.EXEC_TIME.getDefaultValue()));
98+
long holdTime = startTime - Util.getTime();
99+
if(holdTime > 0) execTime += (int)holdTime;
94100
try {
95101
countDownLatch.await(execTime, TimeUnit.SECONDS);
96102
} catch (InterruptedException e) {
97103
e.printStackTrace();
98104
}
99105

106+
if(countDownLatch.getCount()>0) {
107+
logger.info("Measurement timed out.");
108+
} else {
109+
logger.info("Measurement completed.");
110+
}
111+
100112
timer.cancel();
101113

102114
logger.info("Terminating clients.");
103115
disconnectClients();
104116

117+
endTime = Util.getTime();
118+
105119
logger.info("Printing results.");
106120
dataCleansing();
107121

@@ -164,13 +178,22 @@ private void prepareClients() {
164178
int pubInterval = Integer.valueOf(cmd.getOptionValue(Opt.INTERVAL.getName(), Opt.INTERVAL.getDefaultValue()));
165179

166180
for(int i=0;i<numPub;i++){
181+
if(i == 0) {
182+
logger.info("Publishers start to connect.");
183+
}
184+
167185
if(version==5){
168186
publishers.add(new Publisher(i, broker, pubQos, retain, topic, payloadSize, numMessage, pubInterval));
169187
}else{
170188
publishers.add(new PublisherV3(i, broker, pubQos, retain, topic, payloadSize, numMessage, pubInterval));
171189
}
172190
}
191+
173192
for(int i=0;i<numSub;i++){
193+
if(i == 0) {
194+
logger.info("Subscribers start to connect.");
195+
}
196+
174197
if(version==5){
175198
subscribers.add(new Subscriber(i, broker, subQos, shSub, topic));
176199
}else{
@@ -179,11 +202,15 @@ private void prepareClients() {
179202
}
180203
}
181204

205+
/**
206+
* Start measurement by running publishers.
207+
*/
182208
private void startMeasurement() {
183209
String ntpServer = cmd.getOptionValue(Opt.NTP.getName(), Opt.NTP.getDefaultValue());
184210
if(ntpServer != null) {
185211
logger.info("Getting time information from NTP server.");
186212
NTPUDPClient client = new NTPUDPClient();
213+
client.setDefaultTimeout(5000);
187214
InetAddress address = null;
188215
TimeInfo ti = null;
189216
try {
@@ -197,9 +224,14 @@ private void startMeasurement() {
197224
} catch (IOException e) {
198225
e.printStackTrace();
199226
}
200-
ti.computeDetails();
201-
offset = ti.getOffset();
202-
logger.info("Offset is "+offset+" milliseconds.");
227+
228+
if(ti != null) {
229+
ti.computeDetails();
230+
offset = ti.getOffset();
231+
logger.info("Offset is "+offset+" milliseconds.");
232+
} else {
233+
logger.warning("Failed to get time information from NTP server.");
234+
}
203235
}
204236

205237
// delay: Give ScheduledExecutorService time to setup scheduling.
@@ -212,11 +244,20 @@ private void startMeasurement() {
212244
}
213245

214246
private void disconnectClients() {
215-
for(IClient pub: publishers){
216-
pub.disconnect();
247+
for(int i=0;i<publishers.size();i++){
248+
if(i == 0) {
249+
logger.info("Publishers start to disconnect.");
250+
}
251+
252+
publishers.get(i).disconnect();
217253
}
218-
for(IClient sub: subscribers){
219-
sub.disconnect();
254+
255+
for(int i=0;i<subscribers.size();i++){
256+
if(i == 0) {
257+
logger.info("Subscribers start to disconnect.");
258+
}
259+
260+
subscribers.get(i).disconnect();
220261
}
221262
}
222263

@@ -366,6 +407,11 @@ private void printLatency() {
366407
private void thToFile(){
367408
StringBuilder sb = new StringBuilder();
368409

410+
String sTime = sdf.format(new Date(startTime));
411+
String eTime = sdf.format(new Date(endTime));
412+
sb.append("Measurement start time: "+sTime+"\n");
413+
sb.append("Measurement end time: "+eTime+"\n");
414+
369415
sb.append("SLOT");
370416
for(int i=0;i<publishers.size();i++){
371417
sb.append(", "+publishers.get(i).getClientId());
@@ -425,6 +471,11 @@ private void thToFile(){
425471
private void ltToFile(){
426472
StringBuilder sb = new StringBuilder();
427473

474+
String sTime = sdf.format(new Date(startTime));
475+
String eTime = sdf.format(new Date(endTime));
476+
sb.append("Measurement start time: "+sTime+"\n");
477+
sb.append("Measurement end time: "+eTime+"\n");
478+
428479
for(int i=0;i<subscribers.size();i++){
429480
if(i>0) sb.append(", ");
430481
sb.append(subscribers.get(i).getClientId());

src/main/java/mqttloader/LogFormatter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ public String format(LogRecord record) {
2727
return String.format(
2828
"[%s] %s %s#%s %s%n",
2929
record.getLevel().getName(),
30-
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(record.getMillis()),
30+
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS z").format(record.getMillis()),
3131
record.getSourceClassName(),
3232
record.getSourceMethodName(),
3333
record.getMessage()

src/main/java/mqttloader/RecvTimeoutTask.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ public RecvTimeoutTask(Timer timer, int subTimeout) {
3535
public void run() {
3636
long remainingTime = subTimeout*1000 - (Util.getTime() - lastRecvTime); // <timeout> - <elapsed time>
3737
if (remainingTime <= 0) {
38+
Loader.logger.info("Receiving messages on subscribers timed out.");
3839
countDownLatch.countDown();
3940
} else {
4041
timer.schedule(new RecvTimeoutTask(timer, subTimeout), remainingTime);

src/main/java/mqttloader/client/Publisher.java

Lines changed: 26 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ public Publisher(int clientNumber, String broker, int qos, boolean retain, Strin
6565
client.connect(options);
6666
Loader.logger.info("Publisher client is connected: "+clientId);
6767
} catch (MqttException e) {
68+
Loader.logger.warning("Publisher client fails to connect: "+clientId);
6869
e.printStackTrace();
6970
}
7071
}
@@ -81,34 +82,41 @@ public void start(long delay) {
8182

8283
@Override
8384
public void run() {
84-
if(!client.isConnected()) {
85-
Loader.countDownLatch.countDown();
86-
} else {
87-
if(pubInterval==0){
88-
continuousRun();
89-
}else{
90-
periodicalRun();
91-
}
85+
if(pubInterval==0){
86+
continuousRun();
87+
}else{
88+
periodicalRun();
9289
}
9390
}
9491

9592
public void continuousRun() {
9693
for(int i=0;i<numMessage;i++){
9794
if(cancelled) {
95+
Loader.logger.info("Publish task is cancelled: "+clientId);
9896
break;
9997
}
100-
publish();
98+
if(client.isConnected()) {
99+
publish();
100+
} else {
101+
Loader.logger.warning("On sending publish, client was not connected: "+clientId);
102+
}
101103
}
102104

105+
Loader.logger.info("Publisher finishes to send publish: "+clientId);
103106
Loader.countDownLatch.countDown();
104107
}
105108

106109
public void periodicalRun() {
107110
if(numMessage > 0) {
108-
publish();
111+
if(client.isConnected()) {
112+
publish();
113+
} else {
114+
Loader.logger.warning("On sending publish, client was not connected: "+clientId);
115+
}
109116

110117
numMessage--;
111118
if(numMessage==0){
119+
Loader.logger.info("Publisher finishes to send publish: "+clientId);
112120
Loader.countDownLatch.countDown();
113121
}
114122
}
@@ -119,6 +127,7 @@ public void publish() {
119127
try{
120128
client.publish(topic, message);
121129
} catch(MqttException me) {
130+
Loader.logger.warning("On sending publish, MqttException occurred: "+clientId);
122131
me.printStackTrace();
123132
}
124133

@@ -151,10 +160,13 @@ public void disconnect() {
151160
e.printStackTrace();
152161
}
153162

154-
try {
155-
client.disconnect();
156-
} catch (MqttException e) {
157-
e.printStackTrace();
163+
if (client.isConnected()) {
164+
try {
165+
client.disconnect();
166+
Loader.logger.info("Publisher client is disconnected: "+clientId);
167+
} catch (MqttException e) {
168+
e.printStackTrace();
169+
}
158170
}
159171
}
160172

0 commit comments

Comments
 (0)