-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathHSMaster.java
200 lines (167 loc) · 5.71 KB
/
HSMaster.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Scanner;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
/*
* Team:
* Chandani Madnani (cxm152930)
* Jay Sharma (jms140730)
* Maitri Hitesh Jatakia (mhj150030)
*/
public class HSMaster {
private int pIdMaster, totalProcesses;
boolean isCompleted = false;
// All the processes write Message to this blocking queue to indicate that
// they are ready for the next round
private BlockingQueue<Message> masterBlockingQueue;
// Master writes to this Blocking Queue collection with the start of every
// round
private ArrayList<BlockingQueue<Message>> queueList = new ArrayList<BlockingQueue<Message>>();
private ArrayList<BlockingQueue<Message>> roundQueueList = new ArrayList<BlockingQueue<Message>>();
public HSMaster(int pIdMaster, ArrayList<Integer> processIds) {
this.pIdMaster = pIdMaster;
totalProcesses = processIds.size();
masterBlockingQueue = new ArrayBlockingQueue<>(totalProcesses);
Message readyMsg;
for (int i = 0; i < totalProcesses; i++) {
readyMsg = new Message(processIds.get(i), Message.Type.READY, Integer.MIN_VALUE, 'X');
masterBlockingQueue.add(readyMsg);
queueList.add(new ArrayBlockingQueue<>(2));
roundQueueList.add(new ArrayBlockingQueue<>(2));
}
}
// Getters
public BlockingQueue<Message> getMasterBlockingQueue() {
return masterBlockingQueue;
}
public ArrayList<BlockingQueue<Message>> getQueueList() {
return queueList;
}
public ArrayList<BlockingQueue<Message>> getRoundQueueList() {
return roundQueueList;
}
public boolean isCompleted() {
return isCompleted;
}
/**
* Validates if a new round should be started
*
* @return true if message type is 'Ready' else false
*/
public boolean shouldStartNewRound() {
int count = 0;
// If size of the queue is less than total Processes simply return false
if (masterBlockingQueue.size() < totalProcesses) {
return false;
}
Message message;
for (int i = 0; i < totalProcesses; i++) {
try {
message = masterBlockingQueue.take();
// If we already know the leader than simply mark the flag as
// completed and return false
if (message.getType() == Message.Type.LEADER) {
if (++count == totalProcesses) {
isCompleted = true;
return false;
}
}
// If message type is neither Ready nor Leader than return false
if (message.getType() != Message.Type.READY && message.getType() != Message.Type.LEADER) {
return false;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return true;
}
/**
* Starts the next round by adding all the processes to the message queue
*/
public void beginNextRound() {
Iterator<BlockingQueue<Message>> queueIterator = roundQueueList.iterator();
while (queueIterator.hasNext()) {
BlockingQueue<Message> messageQueue = queueIterator.next();
messageQueue.add(new Message(pIdMaster, Message.Type.NEXT, Integer.MIN_VALUE, 'X'));
}
}
public static void main(String[] args) {
int totalProcesses = 0;
int masterPId = 0;
ArrayList<Integer> processIds = new ArrayList<Integer>();
Scanner sc = null;
try {
sc = new Scanner(new File("inputData.txt"));
totalProcesses = Integer.parseInt(sc.nextLine());
// Read all the processes and split it by whitespace
String processes[] = sc.nextLine().split("\\s+");
if (processes.length != totalProcesses) {
System.err.println("Please specify " + totalProcesses + " process ids");
return;
}
for (int i = 0; i < processes.length; i++) {
processIds.add(Integer.parseInt(processes[i]));
}
} catch (FileNotFoundException e) {
System.err.println(
"We were not able to find your file. Please check if the file exists in the project folder");
return;
} finally {
sc.close();
}
// Create master controller and slave processes
HSMaster master = new HSMaster(masterPId, processIds);
HSSlaveThread[] processes = new HSSlaveThread[totalProcesses];
for (int i = 0; i < totalProcesses; i++) {
processes[i] = new HSSlaveThread(processIds.get(i));
}
for (int i = 0; i < totalProcesses; i++) {
// Add a incoming queue to the process
processes[i].setInputQueue(master.getQueueList().get(i));
if (i == 0) {
processes[i].setPredecessor(processes[(totalProcesses - 1) % totalProcesses]);
} else {
processes[i].setPredecessor(processes[(i - 1) % totalProcesses]);
}
processes[i].setSuccessor(processes[(i + 1) % totalProcesses]);
processes[i].getOutputMessageList().clear();
Message msg1 = new Message(processIds.get(i), Message.Type.OUT, 1, 'L');
Message msg2 = new Message(processIds.get(i), Message.Type.OUT, 1, 'R');
processes[i].getOutputMessageList().add(msg1);
processes[i].getOutputMessageList().add(msg2);
processes[i].setRoundQueue(master.getRoundQueueList().get(i));
}
// Start all threads
Thread[] slaveThreads = new Thread[totalProcesses];
for (int i = 0; i < totalProcesses; i++) {
processes[i].setMasterQueue(master.getMasterBlockingQueue());
slaveThreads[i] = new Thread(processes[i]);
slaveThreads[i].start();
}
// Validate if HS algorithm is completed, if not then start the next
// round
while (!master.isCompleted()) {
if (master.shouldStartNewRound()) {
master.beginNextRound();
}
}
for (int i = 0; i < totalProcesses; i++) {
slaveThreads[i].interrupt();
}
// Wait for child threads to get completed
for (int i = 0; i < slaveThreads.length; i++) {
try {
slaveThreads[i].join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("HS Algorithm finished");
}
}