-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathExternalSort.java
More file actions
282 lines (243 loc) · 10.5 KB
/
ExternalSort.java
File metadata and controls
282 lines (243 loc) · 10.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
/**
* This operator will perform external sorting
**/
package qp.operators;
import qp.utils.*;
import java.io.*;
import java.util.*;
/**
 * Performs the multi-way merge sort.
 * The number of Batches read/written is equal to 2N*(1 + log(N/B)/log(B-1)).
 *
 * <p>All sorting work happens in {@link #open()}: the base operator is drained into
 * sorted runs stored in temporary {@code External_Sort_<sorter>_<run>.tbl} files, which
 * are then merged until at most one run remains. {@link #next()} streams that final run
 * back batch by batch, and {@link #close()} deletes it.
 */
public class ExternalSort extends Operator {
    private Operator base;                        // base operator
    private final List<OrderByClause> sort_cond;  // list of columns to sort by and whether it's asc or desc.
    private final int buffer_size;                // how many buffer pages for sorting
    private TupleWriter writer;                   // the writer to write out our output pages
    private ObjectInputStream reader;             // used for returning the result
    private ArrayDeque<String> run_loc;           // temporary files (names without ".tbl") holding sorted runs
    private Comparator<Tuple> comp;               // used for sorting runs
    private final int tuples_per_page;            // number of tuples in a page
    private static int sorter_count = 0;          // index number generator
    private final int sorter_index;               // index number for this sorter

    /**
     * Initialises operator for multi-way merge-sort.
     *
     * @param base        The underlying source of tuples
     * @param sort_cond   The list of which attributes to sort by, and whether it's asc or desc
     * @param buffer_size The number of buffer pages allocated for sorting. Must be at least 3.
     * @throws IllegalArgumentException if number of buffer pages is less than 3
     */
    public ExternalSort(Operator base, List<OrderByClause> sort_cond, int buffer_size) throws IllegalArgumentException {
        super(OpType.EXTERNAL_SORT);
        // check if sufficient buffer pages are allocated
        if (buffer_size < 3) throw new IllegalArgumentException("External sort needs at least 3 buffer pages!");
        this.buffer_size = buffer_size;
        this.base = base;
        this.sort_cond = sort_cond;
        this.run_loc = new ArrayDeque<>();
        this.tuples_per_page = Batch.getPageSize() / base.getSchema().getTupleSize();
        this.sorter_index = sorter_count++;
    }

    /**
     * Performs external sort so that each subsequent call to next() will
     * return sorted Batches
     *
     * @return boolean indicating success or failure of opening the base operator
     */
    @Override
    public boolean open() {
        // try opening the underlying operator
        if (!base.open()) return false;
        // close() discards the run queue; recreate it so the operator can be opened again
        if (run_loc == null) run_loc = new ArrayDeque<>();
        // phase 1 - generating sorted runs
        create_sorted_runs();
        // phase 2 - merging sorted runs
        merge_sorted_runs();
        return true;
    }

    public Operator getBase() {
        return base;
    }

    /**
     * Replaces the base operator and adopts its schema as this operator's schema.
     *
     * @param base the new underlying source of tuples
     */
    public void setBase(Operator base) {
        this.base = base;
        this.setSchema(base.getSchema());
    }

    /**
     * Extracts batches after sorting has finished.
     *
     * <p>The final sorted run is opened lazily on the first call. Any I/O or
     * deserialization failure terminates the JVM via {@code System.exit(1)}.
     *
     * @return the next batch of the final sorted run, or {@code null} when the run is
     *         exhausted, when the input produced no sorted run, or when the operator is closed
     */
    @Override
    public Batch next() {
        // an empty input produces no run at all; report end-of-stream instead of
        // trying to open a non-existent "null.tbl"
        if (run_loc == null || run_loc.isEmpty()) return null;
        String fileName = run_loc.peek() + ".tbl";
        // use an ObjectInputStream to read in the final sorted run
        if (reader == null) {
            try {
                reader = new ObjectInputStream(new FileInputStream(fileName));
            } catch (IOException io) {
                System.out.printf("%s:reading the temporary file error", fileName);
                System.exit(1);
            }
        }
        try {
            return (Batch) reader.readObject();
        } catch (EOFException e) {
            // No more batch in the file
            return null;
        } catch (ClassNotFoundException c) {
            System.out.printf("%s:Some error in deserialization\n", fileName);
            System.exit(1);
        } catch (IOException io) {
            System.out.printf("%s:temporary file reading error\n", fileName);
            System.exit(1);
        }
        return null;
    }

    /**
     * Closes the result reader and the writer, deletes the remaining temporary run
     * file(s) and closes the base operator.
     *
     * @return boolean indicating success or failure of closing the base operator
     */
    @Override
    public boolean close() {
        // close the result reader
        if (reader != null) {
            try {
                reader.close();
            } catch (IOException e) {
                System.out.println("Failed to close the input stream in ExternalSort");
            } finally {
                reader = null; // drop the reference even if closing failed
            }
        }
        // close the tuple writer
        if (writer != null) {
            writer.close();
            writer = null; // for garbage collector
        }
        // delete the file storing the completely sorted run; after a normal open()
        // there is at most one entry, and none when the input was empty
        if (run_loc != null) {
            String run_name;
            while ((run_name = run_loc.poll()) != null) {
                File f = new File(run_name + ".tbl");
                try {
                    if (!f.delete()) System.out.println("Could not delete the last sorted run: " + f.getPath());
                } catch (SecurityException e) {
                    System.out.println("Could not delete the last sorted run: " + f.getPath());
                }
            }
            run_loc = null;
        }
        return base.close();
    }

    /**
     * Builds the tuple comparator from the sort conditions. The column index of every
     * sort key is resolved once here rather than on every comparison.
     *
     * @return a comparator ordering tuples by each sort key in turn, honouring DESC
     */
    private Comparator<Tuple> build_comparator() {
        final int num_keys = sort_cond.size();
        final int[] key_index = new int[num_keys];
        final boolean[] descending = new boolean[num_keys];
        int k = 0;
        for (OrderByClause clause : sort_cond) {
            key_index[k] = base.getSchema().indexOf(clause.getAttr());
            descending[k] = clause.getDirection() == OrderByClause.DESC;
            ++k;
        }
        return (x, y) -> {
            for (int i = 0; i < num_keys; ++i) {
                int result = Tuple.compareTuples(x, y, key_index[i]);
                // the first key on which the tuples differ decides the order
                if (result != 0) {
                    if (!descending[i]) return result;
                    // flip the sign without negating, so Integer.MIN_VALUE cannot overflow
                    return result < 0 ? 1 : -1;
                }
            }
            return 0;
        };
    }

    /**
     * Read in B pages each time, sort the records, and produce a B page sorted
     * run (except possibly for the last run). Number of sorted runs = Math.ceil(N / B)
     */
    private void create_sorted_runs() {
        // construct the comparator for tuple-sorting
        comp = build_comparator();
        boolean exhausted = false;
        // stop as soon as the base has returned null once; it is never polled again after that
        while (!exhausted) {
            Batch first_page = base.next();
            if (first_page == null || first_page.isEmpty()) break;
            // unpack up to buffer_size pages worth of tuples
            List<Tuple> tuples = new ArrayList<>();
            for (int i = 0; i < first_page.size(); ++i) tuples.add(first_page.get(i));
            for (int pages_read = 1; pages_read < buffer_size; ++pages_read) {
                Batch page = base.next();
                if (page == null) {
                    exhausted = true;
                    break;
                }
                for (int i = 0; i < page.size(); ++i) tuples.add(page.get(i));
            }
            // sort all tuples
            tuples.sort(comp);
            // create a temp file name for this sorted run
            String temp_file_name = "External_Sort_" + sorter_index + "_" + run_loc.size();
            run_loc.offer(temp_file_name); // External_Sort_0_0, External_Sort_0_1 etc without the .tbl extension
            // use TupleWriter to flush tuples to the temp file
            writer = new TupleWriter(temp_file_name + ".tbl", tuples_per_page);
            writer.open();
            for (Tuple t : tuples) writer.next(t);
            writer.close();
        }
    }

    /**
     * Use B-1 buffer pages for input and one buffer page for output
     * • Perform (B-1)-way merge iteratively until one sorted run is produced
     * • Each iteration requires 1 pass (read + write) over the file
     * • Requires log(N / B)/log(B-1) passes
     */
    private void merge_sorted_runs() {
        // used for indexing merged runs
        int run_index = run_loc.size(); // continue from the previous index
        // while you have at least 2 runs to merge, do a pass
        while (run_loc.size() > 1) {
            // check how many runs you need to merge in this pass
            int runs_to_merge = run_loc.size();
            // while you have at least 2 sorted runs; a single left-over run simply
            // stays in the queue and takes part in the next pass
            while (runs_to_merge > 1) {
                // figure out how many runs you are putting into the buffer
                int num_runs = Math.min(runs_to_merge, buffer_size - 1);
                // use a min-heap for k-way merge sort, ordered by each reader's next tuple
                PriorityQueue<TupleReader> min_heap = new PriorityQueue<>((x, y) -> comp.compare(x.peek(), y.peek()));
                for (int i = 0; i < num_runs; ++i) {
                    TupleReader run_reader = new TupleReader(run_loc.poll() + ".tbl", tuples_per_page);
                    run_reader.open(); // must open for the comparator to use
                    min_heap.offer(run_reader);
                }
                // create a TupleWriter for your merged run
                String temp_file_name = "External_Sort_" + sorter_index + "_" + run_index++;
                run_loc.offer(temp_file_name); // External_Sort_0_10, External_Sort_0_11 without the .tbl extension
                writer = new TupleWriter(temp_file_name + ".tbl", tuples_per_page);
                writer.open(); // create the outstream
                // flush tuples into the TupleWriter
                while (!min_heap.isEmpty()) {
                    TupleReader smallest = min_heap.poll();
                    writer.next(smallest.next());
                    if (smallest.peek() != null) {
                        // re-enqueue the reader if it still has tuples
                        min_heap.offer(smallest);
                    } else {
                        // release the file handle before deleting the run that is no longer needed
                        smallest.close();
                        File f = new File(smallest.getFileName());
                        if (!f.delete()) System.out.println("Could not delete the intermediate sorted run: " + f.getPath());
                    }
                }
                // update the number of runs you have left for this pass
                runs_to_merge -= num_runs;
                // flush this writer to file
                writer.close();
            }
        }
    }

    /**
     * Creates a copy of this operator over a clone of the base operator, with a copied
     * sort-condition list and a clone of the base's schema.
     *
     * @return the cloned ExternalSort operator
     */
    @Override
    public Object clone() {
        // clone the base
        Operator base_copy = (Operator) base.clone();
        // clone the sort conditions
        List<OrderByClause> sort_cond_copy = new ArrayList<>(sort_cond);
        // clone this operator
        ExternalSort this_clone = new ExternalSort(base_copy, sort_cond_copy, buffer_size);
        // set the schema
        this_clone.setSchema((Schema) base_copy.getSchema().clone());
        return this_clone;
    }
}