Skip to content

Commit 8711723

Browse files
authored
Merge pull request #11 from dist-sys/fix_issue_10
Fix issue #10.
2 parents 9a2afe6 + f1f006b commit 8711723

File tree

6 files changed

+138
-112
lines changed

6 files changed

+138
-112
lines changed

src/main/java/mqttloader/Loader.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,9 @@ public class Loader {
5151
private CommandLine cmd = null;
5252
private ArrayList<IClient> publishers = new ArrayList<>();
5353
private ArrayList<IClient> subscribers = new ArrayList<>();
54-
public static long startTime;
55-
public static long offset = 0;
56-
public static long lastRecvTime;
54+
public static volatile long startTime;
55+
public static volatile long offset = 0;
56+
public static volatile long lastRecvTime;
5757
public static CountDownLatch countDownLatch;
5858
public static Logger logger = Logger.getLogger(Loader.class.getName());
5959

@@ -202,10 +202,12 @@ private void startMeasurement() {
202202
logger.info("Offset is "+offset+" milliseconds.");
203203
}
204204

205-
startTime = Util.getTime();
205+
// delay: Give ScheduledExecutorService time to setup scheduling.
206+
long delay = publishers.size();
207+
startTime = Util.getTime() + delay;
206208
lastRecvTime = startTime;
207209
for(IClient pub: publishers){
208-
pub.start();
210+
pub.start(delay);
209211
}
210212
}
211213

src/main/java/mqttloader/client/IClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323

2424
public interface IClient {
2525
String getClientId();
26-
void start();
26+
void start(long delay);
2727
void disconnect();
2828
ArrayList<Throughput> getThroughputs();
2929
ArrayList<Latency> getLatencies();

src/main/java/mqttloader/client/Publisher.java

Lines changed: 53 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,9 @@
1919
import static mqttloader.Constants.PUB_CLIENT_ID_PREFIX;
2020

2121
import java.util.ArrayList;
22+
import java.util.concurrent.Executors;
23+
import java.util.concurrent.ScheduledExecutorService;
2224
import java.util.concurrent.ScheduledFuture;
23-
import java.util.concurrent.ScheduledThreadPoolExecutor;
2425
import java.util.concurrent.TimeUnit;
2526

2627
import mqttloader.Loader;
@@ -35,26 +36,27 @@
3536
public class Publisher implements Runnable, IClient {
3637
private MqttClient client;
3738
private final String clientId;
38-
private String topic;
39-
private int payloadSize;
39+
private final String topic;
40+
private final int payloadSize;
4041
private int numMessage;
41-
private int pubInterval;
42+
private final int pubInterval;
4243
private MqttMessage message = new MqttMessage();
43-
private boolean hasInterval;
4444

45+
// If change publisher to be multi-threaded, throughputs (and others) should be thread-safe.
4546
private ArrayList<Throughput> throughputs = new ArrayList<>();
4647

47-
private ScheduledThreadPoolExecutor service;
48+
private ScheduledExecutorService service;
4849
private ScheduledFuture future;
4950

51+
private volatile boolean cancelled = false;
52+
5053
public Publisher(int clientNumber, String broker, int qos, boolean retain, String topic, int payloadSize, int numMessage, int pubInterval) {
5154
message.setQos(qos);
5255
message.setRetained(retain);
5356
this.topic = topic;
5457
this.payloadSize = payloadSize;
5558
this.numMessage = numMessage;
5659
this.pubInterval = pubInterval;
57-
hasInterval = pubInterval > 0;
5860

5961
clientId = PUB_CLIENT_ID_PREFIX + String.format("%06d", clientNumber);
6062
MqttConnectionOptions options = new MqttConnectionOptions();
@@ -68,20 +70,50 @@ public Publisher(int clientNumber, String broker, int qos, boolean retain, Strin
6870
}
6971

7072
@Override
71-
public void start() {
72-
service = new ScheduledThreadPoolExecutor(1);
73+
public void start(long delay) {
74+
service = Executors.newSingleThreadScheduledExecutor();
7375
if(pubInterval==0){
74-
future = service.schedule(this, 0, TimeUnit.MILLISECONDS);
76+
future = service.schedule(this, delay, TimeUnit.MILLISECONDS);
7577
}else{
76-
future = service.scheduleAtFixedRate(this, 0, pubInterval, TimeUnit.MILLISECONDS);
78+
future = service.scheduleAtFixedRate(this, delay, pubInterval, TimeUnit.MILLISECONDS);
7779
}
7880
}
7981

80-
public void terminate() {
81-
service.shutdown();
82+
@Override
83+
public void run() {
84+
if(!client.isConnected()) {
85+
Loader.countDownLatch.countDown();
86+
} else {
87+
if(pubInterval==0){
88+
continuousRun();
89+
}else{
90+
periodicalRun();
91+
}
92+
}
93+
}
94+
95+
public void continuousRun() {
96+
for(int i=0;i<numMessage;i++){
97+
if(cancelled) {
98+
break;
99+
}
100+
publish();
101+
}
102+
82103
Loader.countDownLatch.countDown();
83104
}
84105

106+
public void periodicalRun() {
107+
if(numMessage > 0) {
108+
publish();
109+
110+
numMessage--;
111+
if(numMessage==0){
112+
Loader.countDownLatch.countDown();
113+
}
114+
}
115+
}
116+
85117
public void publish() {
86118
message.setPayload(Util.genPayloads(payloadSize));
87119
try{
@@ -105,42 +137,18 @@ public void publish() {
105137
Loader.logger.fine("Published a message (" + topic + "): "+clientId);
106138
}
107139

108-
public void periodicalRun() {
109-
publish();
110-
111-
numMessage--;
112-
if(numMessage==0){
113-
terminate();
114-
}
115-
}
116-
117-
public void continuousRun() {
118-
for(int i=0;i<numMessage;i++){
119-
if(future.isCancelled()) break;
120-
publish();
121-
}
122-
123-
terminate();
124-
}
125-
126-
@Override
127-
public void run() {
128-
if(!client.isConnected()) {
129-
terminate();
130-
}
131-
132-
if(pubInterval==0){
133-
continuousRun();
134-
}else{
135-
periodicalRun();
136-
}
137-
}
138-
139140
@Override
140141
public void disconnect() {
141142
if(!future.isDone()) {
143+
cancelled = true;
142144
future.cancel(false);
143-
service.shutdown();
145+
}
146+
147+
service.shutdown();
148+
try {
149+
service.awaitTermination(1, TimeUnit.SECONDS);
150+
} catch (InterruptedException e) {
151+
e.printStackTrace();
144152
}
145153

146154
try {

src/main/java/mqttloader/client/PublisherV3.java

Lines changed: 53 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,9 @@
1919
import static mqttloader.Constants.PUB_CLIENT_ID_PREFIX;
2020

2121
import java.util.ArrayList;
22+
import java.util.concurrent.Executors;
23+
import java.util.concurrent.ScheduledExecutorService;
2224
import java.util.concurrent.ScheduledFuture;
23-
import java.util.concurrent.ScheduledThreadPoolExecutor;
2425
import java.util.concurrent.TimeUnit;
2526

2627
import mqttloader.Loader;
@@ -35,26 +36,27 @@
3536
public class PublisherV3 implements Runnable, IClient {
3637
private MqttClient client;
3738
private final String clientId;
38-
private String topic;
39-
private int payloadSize;
39+
private final String topic;
40+
private final int payloadSize;
4041
private int numMessage;
41-
private int pubInterval;
42+
private final int pubInterval;
4243
private MqttMessage message = new MqttMessage();
43-
private boolean hasInterval;
4444

45+
// If change publisher to be multi-threaded, throughputs (and others) should be thread-safe.
4546
private ArrayList<Throughput> throughputs = new ArrayList<>();
4647

47-
private ScheduledThreadPoolExecutor service;
48+
private ScheduledExecutorService service;
4849
private ScheduledFuture future;
4950

51+
private volatile boolean cancelled = false;
52+
5053
public PublisherV3(int clientNumber, String broker, int qos, boolean retain, String topic, int payloadSize, int numMessage, int pubInterval) {
5154
message.setQos(qos);
5255
message.setRetained(retain);
5356
this.topic = topic;
5457
this.payloadSize = payloadSize;
5558
this.numMessage = numMessage;
5659
this.pubInterval = pubInterval;
57-
hasInterval = pubInterval > 0;
5860

5961
clientId = PUB_CLIENT_ID_PREFIX + String.format("%06d", clientNumber);
6062
MqttConnectOptions options = new MqttConnectOptions();
@@ -69,20 +71,50 @@ public PublisherV3(int clientNumber, String broker, int qos, boolean retain, Str
6971
}
7072

7173
@Override
72-
public void start() {
73-
service = new ScheduledThreadPoolExecutor(1);
74+
public void start(long delay) {
75+
service = Executors.newSingleThreadScheduledExecutor();
7476
if(pubInterval==0){
75-
future = service.schedule(this, 0, TimeUnit.MILLISECONDS);
77+
future = service.schedule(this, delay, TimeUnit.MILLISECONDS);
7678
}else{
77-
future = service.scheduleAtFixedRate(this, 0, pubInterval, TimeUnit.MILLISECONDS);
79+
future = service.scheduleAtFixedRate(this, delay, pubInterval, TimeUnit.MILLISECONDS);
7880
}
7981
}
8082

81-
public void terminate() {
82-
service.shutdown();
83+
@Override
84+
public void run() {
85+
if(!client.isConnected()) {
86+
Loader.countDownLatch.countDown();
87+
} else {
88+
if(pubInterval==0){
89+
continuousRun();
90+
}else{
91+
periodicalRun();
92+
}
93+
}
94+
}
95+
96+
public void continuousRun() {
97+
for(int i=0;i<numMessage;i++){
98+
if(cancelled) {
99+
break;
100+
}
101+
publish();
102+
}
103+
83104
Loader.countDownLatch.countDown();
84105
}
85106

107+
public void periodicalRun() {
108+
if(numMessage > 0) {
109+
publish();
110+
111+
numMessage--;
112+
if(numMessage==0){
113+
Loader.countDownLatch.countDown();
114+
}
115+
}
116+
}
117+
86118
public void publish() {
87119
message.setPayload(Util.genPayloads(payloadSize));
88120
try {
@@ -106,42 +138,18 @@ public void publish() {
106138
Loader.logger.fine("Published a message (" + topic + "): "+clientId);
107139
}
108140

109-
public void periodicalRun() {
110-
publish();
111-
112-
numMessage--;
113-
if(numMessage==0){
114-
terminate();
115-
}
116-
}
117-
118-
public void continuousRun() {
119-
for(int i=0;i<numMessage;i++){
120-
if(future.isCancelled()) break;
121-
publish();
122-
}
123-
124-
terminate();
125-
}
126-
127-
@Override
128-
public void run() {
129-
if(!client.isConnected()) {
130-
terminate();
131-
}
132-
133-
if(pubInterval==0){
134-
continuousRun();
135-
}else{
136-
periodicalRun();
137-
}
138-
}
139-
140141
@Override
141142
public void disconnect() {
142143
if(!future.isDone()) {
144+
cancelled = true;
143145
future.cancel(false);
144-
service.shutdown();
146+
}
147+
148+
service.shutdown();
149+
try {
150+
service.awaitTermination(1, TimeUnit.SECONDS);
151+
} catch (InterruptedException e) {
152+
e.printStackTrace();
145153
}
146154

147155
try {

src/main/java/mqttloader/client/Subscriber.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ public Subscriber(int clientNumber, String broker, int qos, boolean shSub, Strin
6363
}
6464

6565
@Override
66-
public void start(){
66+
public void start(long delay){
6767
}
6868

6969
@Override
@@ -100,19 +100,23 @@ public void mqttErrorOccurred(MqttException exception) {}
100100
public void messageArrived(String topic, MqttMessage message) throws Exception {
101101
long time = Util.getTime();
102102
int slot = (int)((time-Loader.startTime)/1000);
103-
if(throughputs.size()>0){
104-
Throughput lastTh = throughputs.get(throughputs.size()-1);
105-
if(lastTh.getSlot() == slot) {
106-
lastTh.setCount(lastTh.getCount()+1);
103+
synchronized (throughputs) {
104+
if(throughputs.size()>0){
105+
Throughput lastTh = throughputs.get(throughputs.size()-1);
106+
if(lastTh.getSlot() == slot) {
107+
lastTh.setCount(lastTh.getCount()+1);
108+
}else{
109+
throughputs.add(new Throughput(slot, 1));
110+
}
107111
}else{
108112
throughputs.add(new Throughput(slot, 1));
109113
}
110-
}else{
111-
throughputs.add(new Throughput(slot, 1));
112114
}
113115

114116
long pubTime = ByteBuffer.wrap(message.getPayload()).getLong();
115-
latencies.add(new Latency(slot, (int)(time-pubTime)));
117+
synchronized (latencies) {
118+
latencies.add(new Latency(slot, (int)(time-pubTime)));
119+
}
116120

117121
Loader.lastRecvTime = time;
118122

0 commit comments

Comments
 (0)