Skip to content

Commit e5aaef6

Browse files
committed
check point
1 parent a5fe327 commit e5aaef6

38 files changed

+629
-293
lines changed

pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,5 +118,10 @@
118118
<artifactId>affinity</artifactId>
119119
<version>3.2.2</version>
120120
</dependency>
121+
<dependency>
122+
<groupId>org.slf4j</groupId>
123+
<artifactId>slf4j-simple</artifactId>
124+
<version>2.0.0-alpha1</version>
125+
</dependency>
121126
</dependencies>
122127
</project>

src/config/ParallelConfig.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ public class ParallelConfig {
2424
/**
2525
* Whether to collect statistics of all constraints.
2626
*/
27-
public final static boolean CONSTRAINTS = true;
27+
public final static boolean CONSTRAINTS = false;
2828
/**
2929
* Maximal number of tuples per batch during execution.
3030
*/
@@ -36,15 +36,23 @@ public class ParallelConfig {
3636
/**
3737
* The minimal size of sparse columns.
3838
*/
39-
public static int SPARSE_KEY_SIZE = 10000;
39+
public final static int SPARSE_KEY_SIZE = 10000;
4040
/**
4141
* The minimal size of sparse columns.
4242
*/
43-
public static int SPARSE_FILTER_SIZE = 100;
43+
public final static int SPARSE_FILTER_SIZE = 100;
4444
/**
4545
* The minimum size of partitioned table
4646
*/
47-
public static int PARTITION_SIZE = 50000;
47+
public final static int PARTITION_SIZE = 50000;
48+
/**
49+
* The maximum size of statistics
50+
*/
51+
public final static int STATISTICS_SIZE = 20;
52+
/**
53+
* Whether to assign constraint per thread.
54+
*/
55+
public final static boolean CONSTRAINT_PER_THREAD = true;
4856
/**
4957
* The base of round counts to assign a new best join order
5058
* to executor thread.

src/indexing/Index.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,6 @@ public abstract class Index {
3838
* Whether it is unique key.
3939
*/
4040
public boolean sorted = true;
41-
/**
42-
* Whether it is unique key.
43-
*/
44-
public boolean filterSorted = true;
4541
/**
4642
* Initialize for given cardinality of indexed table.
4743
*

src/joining/parallel/indexing/IntPartitionIndex.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,7 @@ else if (policy == IndexPolicy.Sparse) {
175175
}
176176
});
177177
}
178+
int nrPos = positions == null ? 0 : positions.length;
178179
}
179180

180181
/**

src/joining/parallel/join/DPJoin.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,10 @@ public abstract class DPJoin {
8686
* Last large table.
8787
*/
8888
public int largeTable;
89+
/**
90+
* Last large table.
91+
*/
92+
public int deepIndex;
8993
/**
9094
* Whether we have progress on the split table.
9195
*/

src/joining/parallel/join/JoinDoublePartitionWrapper.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,11 @@ public JoinDoublePartitionWrapper(ExpressionInfo equiPred, int[] order) {
2727
nextDoubleIndex = (DoublePartitionIndex)nextIndex;
2828
}
2929

30+
@Override
31+
public void reset(int[] tupleIndices) {
32+
33+
}
34+
3035
@Override
3136
public int nextIndex(int[] tupleIndices, int[] nextSize) {
3237
int priorTuple = tupleIndices[priorTable];
@@ -35,6 +40,16 @@ public int nextIndex(int[] tupleIndices, int[] nextSize) {
3540
return nextDoubleIndex.nextTuple(priorVal, curTuple, nextTable, nextSize);
3641
}
3742

43+
@Override
44+
public int nextIndexFromLast(int[] tupleIndices, int[] nextSize) {
45+
return 0;
46+
}
47+
48+
@Override
49+
public int nextIndexFromLast(int[] tupleIndices, int[] nextSize, int tid) {
50+
return 0;
51+
}
52+
3853
@Override
3954
public int nextIndexInScope(int[] tupleIndices, int tid, int[] nextSize) {
4055
int priorTuple = tupleIndices[priorTable];

src/joining/parallel/join/JoinIntPartitionWrapper.java

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,19 @@ public JoinIntPartitionWrapper(ExpressionInfo equiPred, int[] order) throws Exce
2727
nextIntIndex = (IntPartitionIndex)nextIndex;
2828
}
2929

30+
@Override
31+
public void reset(int[] tupleIndices) {
32+
int priorTuple = tupleIndices[priorTable];
33+
int priorVal = priorIntData.data[priorTuple];
34+
int prevTuple = tupleIndices[nextTable];
35+
if (!(lastValue == priorVal &&
36+
lastPositionsStart >= 0 &&
37+
nextIntIndex.positions[lastPositionsStart] <= prevTuple)) {
38+
lastPositionsStart = -1;
39+
lastPositionsEnd = -1;
40+
}
41+
}
42+
3043
@Override
3144
public int nextIndex(int[] tupleIndices, int[] nextSize) {
3245
int priorTuple = tupleIndices[priorTable];
@@ -35,6 +48,64 @@ public int nextIndex(int[] tupleIndices, int[] nextSize) {
3548
return nextIntIndex.nextTuple(priorVal, curTuple, nextTable, nextSize);
3649
}
3750

51+
@Override
52+
public int nextIndexFromLast(int[] tupleIndices, int[] nextSize) {
53+
return 0;
54+
}
55+
56+
@Override
57+
public int nextIndexFromLast(int[] tupleIndices, int[] nextSize, int tid) {
58+
int priorTuple = tupleIndices[priorTable];
59+
int priorVal = priorIntData.data[priorTuple];
60+
int prevTuple = tupleIndices[nextTable];
61+
int cardinality = nextIntIndex.cardinality;
62+
if (nextIntIndex.unique) {
63+
int onlyRow = nextIntIndex.keyToPositions.getOrDefault(priorVal, cardinality);
64+
return onlyRow > prevTuple ? onlyRow : cardinality;
65+
}
66+
else {
67+
int[] positions = nextIntIndex.positions;
68+
int nextIndex = -1;
69+
if (lastPositionsStart >= 0) {
70+
for (int i = lastPositionsStart + 1; i <= lastPositionsEnd; i++) {
71+
nextIndex = positions[i];
72+
if (nextIndex > prevTuple) {
73+
lastPositionsStart = i;
74+
return nextIndex;
75+
}
76+
}
77+
}
78+
else {
79+
int firstPos = lastValue == priorVal ?
80+
lastFirst : nextIntIndex.keyToPositions.getOrDefault(priorVal, -1);
81+
lastValue = priorVal;
82+
lastFirst = firstPos;
83+
if (firstPos < 0) {
84+
return cardinality;
85+
}
86+
// Get number of indexed values
87+
int nrVals = nextIntIndex.positions[firstPos];
88+
// Can we return first indexed value?
89+
int firstTuple = nextIntIndex.positions[firstPos + 1];
90+
if (firstTuple > prevTuple) {
91+
lastPositionsStart = firstPos + 1;
92+
lastPositionsEnd = firstPos + nrVals;
93+
return firstTuple;
94+
}
95+
for (int i = 2; i <= nrVals; i++) {
96+
nextIndex = positions[firstPos + i];
97+
if (nextIndex > prevTuple) {
98+
lastPositionsStart = firstPos + i;
99+
lastPositionsEnd = firstPos + nrVals;
100+
return nextIndex;
101+
}
102+
}
103+
}
104+
lastPositionsStart = -1;
105+
return cardinality;
106+
}
107+
}
108+
38109
@Override
39110
public int nextIndexInScope(int[] tupleIndices, int tid, int[] nextSize) {
40111
int priorTuple = tupleIndices[priorTable];

src/joining/parallel/join/JoinPartitionIndexWrapper.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,34 @@ public abstract class JoinPartitionIndexWrapper {
3232
* Reference to prior column data.
3333
*/
3434
final ColumnData priorData;
35+
/**
36+
* Reference to prior column data.
37+
*/
38+
final IntData priorTest;
39+
/**
40+
* Reference to prior column data.
41+
*/
42+
final IntData nextTest;
3543
/**
3644
* Index on join column to use.
3745
*/
3846
final Index nextIndex;
47+
/**
48+
* last prior value.
49+
*/
50+
int lastValue = -1;
51+
/**
52+
* Last first position according to the last value.
53+
*/
54+
int lastFirst = -1;
55+
/**
56+
* last start index of positions array.
57+
*/
58+
int lastPositionsStart = -1;
59+
/**
60+
* last end index of positions array.
61+
*/
62+
int lastPositionsEnd = -1;
3963
/**
4064
* Initialize index wrapper for
4165
* given query and join order.
@@ -55,10 +79,17 @@ public JoinPartitionIndexWrapper(ExpressionInfo equiPred, int[] order) {
5579
nextTable = pos1<pos2?table2:table1;
5680
// Get column data reference for prior table
5781
priorData = equiPred.dataMentioned.get(priorTable);
82+
priorTest = (IntData) priorData;
83+
nextTest = (IntData) equiPred.dataMentioned.get(nextTable);
5884
// Get index for next table
5985
nextIndex = equiPred.indexMentioned.get(nextTable);
6086
}
6187

88+
/**
89+
* Reset temporary variables at the beginning of the join episode.
90+
*/
91+
public abstract void reset(int[] tupleIndices);
92+
6293
/**
6394
* Extracts index of table in query column reference.
6495
*
@@ -97,6 +128,9 @@ int tablePos(int[] order, int table) {
97128
*/
98129
public abstract int nextIndex(int[] tupleIndices, int[] nextSize);
99130

131+
public abstract int nextIndexFromLast(int[] tupleIndices, int[] nextSize);
132+
public int nextIndexFromLast(int[] tupleIndices, int[] nextSize, int tid) {return 0;}
133+
100134
/**
101135
* Propose next index in next table that
102136
* satisfies equi-join condition in partitions with
@@ -108,7 +142,9 @@ int tablePos(int[] order, int table) {
108142
* @return next interesting tuple index or cardinality
109143
*/
110144
public abstract int nextIndexInScope(int[] tupleIndices, int tid, int[] nextSize);
145+
111146
public abstract int nextIndexInScope(int[] tupleIndices, int tid, int[] nextSize, IntSet finishedThreads);
147+
112148
/**
113149
* Propose next index in next table that
114150
* satisfies equi-join condition with
@@ -129,6 +165,7 @@ int tablePos(int[] order, int table) {
129165
* @return next interesting tuple index or cardinality
130166
*/
131167
public abstract boolean evaluateInScope(int[] tupleIndices, int tid);
168+
132169
public abstract boolean evaluateInScope(int[] tupleIndices, int tid, IntSet finishedThreads);
133170

134171
@Override

src/joining/parallel/join/ModJoin.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -151,12 +151,12 @@ public double execute(int[] order, int splitTable, int roundCtr) throws Exceptio
151151
// Lookup or generate left-deep query plan
152152
JoinOrder joinOrder = new JoinOrder(order);
153153
int joinHash = joinOrder.splitHashCode(-1);
154+
// long timer1 = System.currentTimeMillis();
154155
LeftDeepPartitionPlan plan = planCache.get(joinHash);
155156
if (plan == null) {
156157
plan = new LeftDeepPartitionPlan(query, predToEval, joinOrder);
157158
planCache.putIfAbsent(joinHash, plan);
158159
}
159-
// long timer1 = System.currentTimeMillis();
160160
int splitHash = nrThreads == 1 ? 0 : plan.splitStrategies[splitTable];
161161
// Execute from ing state, save progress, return progress
162162

@@ -661,16 +661,17 @@ private void executeWithBudget(LeftDeepPartitionPlan plan, int splitTable, State
661661
// Number of completed tuples added
662662
nrResultTuples = 0;
663663
Arrays.fill(this.nrVisits, 0);
664+
deepIndex = -1;
664665
// Execute join order until budget depleted or all input finished -
665666
// at each iteration start, tuple indices contain next tuple
666667
// combination to look at.
667668
while (remainingBudget > 0 && joinIndex >= 0) {
668-
669669
// ++statsInstance.nrIterations;
670670
//log("Offsets:\t" + Arrays.toString(offsets));
671671
//log("Indices:\t" + Arrays.toString(tupleIndices));
672672
// Get next table in join order
673673
int nextTable = plan.joinOrder.order[joinIndex];
674+
deepIndex = Math.max(deepIndex, joinIndex);
674675
// writeLog("Indices: " + Arrays.toString(tupleIndices));
675676
// Integrate table offset
676677
tupleIndices[nextTable] = Math.max(

0 commit comments

Comments
 (0)