Skip to content

Commit 2d3b8a8

Browse files
committed
task parallel
1 parent d054922 commit 2d3b8a8

15 files changed

+705
-128
lines changed

src/config/ParallelConfig.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,10 @@ public class ParallelConfig {
2929
* Maximal number of tuples per batch during execution.
3030
*/
3131
public static int EXE_THREADS = 1;
32+
/**
33+
* The number of threads per executor.
34+
*/
35+
public static int EXE_EXECUTORS = 30;
3236
/**
3337
* Maximal number of tuples per batch during pre-processing.
3438
*/
@@ -73,6 +77,7 @@ public class ParallelConfig {
7377
*/
7478
public static int PARALLEL_SPEC = 8;
7579

80+
7681
public static final boolean HEURISTIC_SHARING = true;
7782
public static final boolean HEURISTIC_STOP = false;
7883
}

src/console/SkinnerCmd.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -425,7 +425,7 @@ else if (spec == 8) {
425425
output += "APS_" + ParallelConfig.EXE_THREADS + caseName + ".txt";
426426
}
427427
else if (spec == 9) {
428-
output += "PT_" + ParallelConfig.EXE_THREADS + caseName + ".txt";
428+
output += "SPT_" + ParallelConfig.EXE_THREADS + caseName + ".txt";
429429
}
430430
} else {
431431
output += "Seq_1.txt";

src/joining/parallel/join/FixJoin.java

Lines changed: 367 additions & 103 deletions
Large diffs are not rendered by default.

src/joining/parallel/join/JoinDoublePartitionWrapper.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,4 +96,52 @@ public int nrIndexed(int[] tupleIndices) {
9696
double priorVal = priorDoubleData.data[priorTuple];
9797
return nextDoubleIndex.nrIndexed(priorVal);
9898
}
99+
100+
@Override
101+
public int indexSize(int[] tupleIndices, int[] points) {
102+
int priorTuple = tupleIndices[priorTable];
103+
double priorVal = priorDoubleData.data[priorTuple];
104+
int prevTuple = tupleIndices[nextTable];
105+
if (nextDoubleIndex.unique) {
106+
int onlyRow = nextDoubleIndex.keyToPositions.getOrDefault(priorVal, -1);
107+
if (onlyRow > prevTuple) {
108+
points[0] = onlyRow;
109+
points[1] = onlyRow;
110+
return 1;
111+
}
112+
else {
113+
points[0] = 0;
114+
points[1] = -1;
115+
return 0;
116+
}
117+
}
118+
else {
119+
int firstPos = nextDoubleIndex.keyToPositions.getOrDefault(priorVal, -1);
120+
if (firstPos < 0) {
121+
points[0] = 0;
122+
points[1] = -1;
123+
return 0;
124+
}
125+
// Get number of indexed values
126+
int nrVals = nextDoubleIndex.positions[firstPos];
127+
int[] positions = nextDoubleIndex.positions;
128+
// Can we return first indexed value?
129+
int firstTuple = positions[firstPos + 1];
130+
if (firstTuple > prevTuple) {
131+
points[0] = firstPos + 1;
132+
points[1] = firstPos + nrVals;
133+
return nrVals;
134+
}
135+
int size = nrVals;
136+
for (int i = 2; i <= nrVals; i++) {
137+
if (positions[firstPos + i] > prevTuple) {
138+
points[0] = firstPos + i;
139+
points[1] = firstPos + nrVals;
140+
size = nrVals - i + 1;
141+
break;
142+
}
143+
}
144+
return size;
145+
}
146+
}
99147
}

src/joining/parallel/join/JoinIntPartitionWrapper.java

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,4 +152,53 @@ public int nrIndexed(int[] tupleIndices) {
152152
int priorVal = priorIntData.data[priorTuple];
153153
return nextIntIndex.nrIndexed(priorVal);
154154
}
155+
156+
@Override
157+
public int indexSize(int[] tupleIndices, int[] points) {
158+
int priorTuple = tupleIndices[priorTable];
159+
int priorVal = priorIntData.data[priorTuple];
160+
int prevTuple = tupleIndices[nextTable];
161+
int diff = prevTuple == 0 ? 0 : 1;
162+
if (nextIntIndex.unique) {
163+
int onlyRow = nextIntIndex.keyToPositions.getOrDefault(priorVal, -1);
164+
if (onlyRow - prevTuple >= diff) {
165+
points[0] = onlyRow;
166+
points[1] = onlyRow;
167+
return 1;
168+
}
169+
else {
170+
points[0] = 0;
171+
points[1] = -1;
172+
return 0;
173+
}
174+
}
175+
else {
176+
int firstPos = nextIntIndex.keyToPositions.getOrDefault(priorVal, -1);
177+
if (firstPos < 0 || nextIntIndex.positions[firstPos] == 0) {
178+
points[0] = 0;
179+
points[1] = -1;
180+
return 0;
181+
}
182+
// Get number of indexed values
183+
int nrVals = nextIntIndex.positions[firstPos];
184+
int[] positions = nextIntIndex.positions;
185+
// Can we return first indexed value?
186+
int firstTuple = positions[firstPos + 1];
187+
if (firstTuple - prevTuple >= diff) {
188+
points[0] = firstPos + 1;
189+
points[1] = firstPos + nrVals;
190+
return nrVals;
191+
}
192+
int size = nrVals;
193+
for (int i = 2; i <= nrVals; i++) {
194+
if (positions[firstPos + i] - prevTuple >=diff) {
195+
points[0] = firstPos + i;
196+
points[1] = firstPos + nrVals;
197+
size = nrVals - i + 1;
198+
break;
199+
}
200+
}
201+
return size;
202+
}
203+
}
155204
}

src/joining/parallel/join/JoinPartitionIndexWrapper.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,10 @@
88
import query.ColumnRef;
99
import query.QueryInfo;
1010

11+
import java.util.Comparator;
1112
import java.util.Iterator;
13+
import java.util.List;
14+
import java.util.stream.Collectors;
1215

1316
/**
1417
* Uses index on join column to identify next
@@ -181,4 +184,5 @@ public String toString() {
181184
* @return number of indexed values
182185
*/
183186
public abstract int nrIndexed(int[] tupleIndices);
187+
public abstract int indexSize(int[] tupleIndices, int[] points);
184188
}

src/joining/parallel/parallelization/task/ExecutorTask.java

Lines changed: 33 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import query.QueryInfo;
1515
import statistics.QueryStats;
1616

17+
import java.util.ArrayList;
1718
import java.util.Arrays;
1819
import java.util.List;
1920
import java.util.Set;
@@ -40,12 +41,18 @@ public class ExecutorTask implements Callable<TaskResult> {
4041
* each executor threads by the searching thread.
4142
*/
4243
private int[][] bestJoinOrder;
44+
/**
45+
* Multiple join operators for threads
46+
*/
47+
private final List<FixJoin> fixJoins;
4348

44-
public ExecutorTask(QueryInfo query, FixJoin spJoin, AtomicBoolean finish, int[][] bestJoinOrder) {
49+
public ExecutorTask(QueryInfo query, FixJoin spJoin, AtomicBoolean finish, int[][] bestJoinOrder,
50+
List<FixJoin> fixJoins) {
4551
this.query = query;
4652
this.spJoin = spJoin;
4753
this.finish = finish;
4854
this.bestJoinOrder = bestJoinOrder;
55+
this.fixJoins = fixJoins;
4956
}
5057

5158
@Override
@@ -68,7 +75,7 @@ public TaskResult call() throws Exception {
6875
int lastCount = 0;
6976
int nextPeriod = 1;
7077
double nextNum = 1;
71-
double base = Math.pow(ParallelConfig.C, 1.0 / nrThreads);
78+
double base = Math.pow(ParallelConfig.C, 1.0 / (nrThreads-1));
7279
SPNode root = new SPNode(0, query, true, 1);
7380
while (!finish.get()) {
7481
++roundCtr;
@@ -82,6 +89,9 @@ public TaskResult call() throws Exception {
8289
if (finish.compareAndSet(false, true)) {
8390
System.out.println("Finish id: " + tid + "\t" + Arrays.toString(joinOrder) + "\t" + roundCtr);
8491
spJoin.roundCtr = roundCtr;
92+
for (FixJoin fixJoin: fixJoins) {
93+
fixJoin.terminate.set(true);
94+
}
8595
}
8696
break;
8797
}
@@ -90,13 +100,28 @@ public TaskResult call() throws Exception {
90100
int[] best = new int[nrTables];
91101
root.maxJoinOrder(best, 0);
92102
System.arraycopy(best, 0, bestJoinOrder[nextThread], 0, nrTables);
93-
bestJoinOrder[nextThread][nrTables] = nextThread == nrThreads - 1 ? 2 : 1;
103+
// if (roundCtr >= fixRound) {
104+
// bestJoinOrder[nextThread][nrTables] = 2;
105+
// fixNum++;
106+
// fixRound = Math.pow(fixRound, fixNum + 1);
107+
// }
108+
// else {
109+
// bestJoinOrder[nextThread][nrTables] = 1;
110+
// }
111+
bestJoinOrder[nextThread][nrTables] = 2;
112+
fixJoins.get(nextThread).terminate.set(true);
94113
System.out.println("Assign " + Arrays.toString(best)
95114
+ " to Thread " + nextThread + " at round " + roundCtr);
115+
116+
// if (fixNum < nrThreads - 1) {
117+
// nextThread = (nextThread + 1) % nrThreads;
118+
// while (bestJoinOrder[nextThread][nrTables] == 2 || nextThread == 0) {
119+
// nextThread = (nextThread + 1) % nrThreads;
120+
// }
121+
// }
96122
nextThread = (nextThread + 1) % nrThreads;
97123
if (nextThread == 0) {
98-
nextThread++;
99-
nrThreads--;
124+
nextThread = (nextThread + 1) % nrThreads;
100125
}
101126
lastCount = (int) roundCtr;
102127
nextNum = nextNum * base;
@@ -109,16 +134,6 @@ public TaskResult call() throws Exception {
109134
root = new SPNode(0, query, true, 1);
110135
nextForget *= 10;
111136
}
112-
113-
// if (roundCtr == 100000) {
114-
// List<String>[] logs = new List[1];
115-
// for (int i = 0; i < 1; i++) {
116-
// logs[i] = spJoin.logs;
117-
// }
118-
// LogUtils.writeLogs(logs, "verbose/task/" + QueryStats.queryName);
119-
// System.out.println("Write to logs!");
120-
// System.exit(0);
121-
// }
122137
}
123138
}
124139
else {
@@ -138,6 +153,9 @@ public TaskResult call() throws Exception {
138153
System.out.println("Finish id: " + tid + "\t" +
139154
Arrays.toString(joinOrder) + "\t" + roundCtr);
140155
spJoin.roundCtr = roundCtr;
156+
for (FixJoin fixJoin: fixJoins) {
157+
fixJoin.terminate.set(true);
158+
}
141159
}
142160
break;
143161
}
@@ -149,7 +167,6 @@ public TaskResult call() throws Exception {
149167
else if (order[nrTables] == 2) {
150168
System.arraycopy(order, 0, joinOrder, 0, nrTables);
151169
spJoin.isFixed = true;
152-
System.out.println("Thread " + tid + " uses fixed join order: " + Arrays.toString(order));
153170
}
154171
}
155172
}

0 commit comments

Comments
 (0)