Skip to content

Commit ebfcdea

Browse files
committed
Add example of external sort using BigQueue
1 parent 02a3d44 commit ebfcdea

3 files changed

Lines changed: 364 additions & 0 deletions

File tree

examples/extsort/.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
extsort
2+
*.dat
3+
bq/*

examples/extsort/main.go

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
package main
2+
3+
import (
4+
"bufio"
5+
"flag"
6+
"fmt"
7+
"io"
8+
"log"
9+
"math/rand"
10+
"os"
11+
"strconv"
12+
"strings"
13+
"time"
14+
)
15+
16+
const (
17+
cMaxRandomNum = 10000
18+
)
19+
20+
func main() {
21+
var numDataElements, maxMemSortSize int
22+
var inputPath, tempPath, outputPath string
23+
flag.IntVar(&numDataElements, "num", 1000, "# elements to generate & sort")
24+
flag.IntVar(&maxMemSortSize, "nummem", 100, "# elements to sort in mem")
25+
flag.StringVar(&inputPath, "i", "input.dat", "input file (overwritten)")
26+
flag.StringVar(&tempPath, "t", "bq", "path to write intermediate data")
27+
flag.StringVar(&outputPath, "o", "output.dat", "output file (overwritten)")
28+
flag.Parse()
29+
30+
// generate random data for testing
31+
log.Println("generating random dataset")
32+
if err := generateData(numDataElements, cMaxRandomNum, inputPath); err != nil {
33+
panic(err)
34+
}
35+
36+
// sort the data
37+
if err := ExternalSort(inputPath, tempPath, outputPath, maxMemSortSize); err != nil {
38+
panic(err)
39+
}
40+
41+
// validate the sorted data
42+
log.Println("validating sort")
43+
if err := validateSort(outputPath); err != nil {
44+
panic(err)
45+
}
46+
}
47+
48+
// generateData generates random data and writes them to a file with newline delimiter
49+
func generateData(size, maxRandomNum int, dataFilePath string) error {
50+
// open file
51+
fd, err := os.Create(dataFilePath)
52+
if err != nil {
53+
return fmt.Errorf("unable to open file :: %v", err)
54+
}
55+
defer fd.Close()
56+
writer := bufio.NewWriter(fd)
57+
58+
// generate random numbers and write to file
59+
rand.Seed(time.Now().UnixNano())
60+
for i := 0; i < size; i++ {
61+
num := (rand.Int() % (2 * maxRandomNum)) - maxRandomNum
62+
str := strconv.Itoa(num) + "\n"
63+
writer.WriteString(str)
64+
}
65+
66+
return writer.Flush()
67+
}
68+
69+
func validateSort(outputPath string) error {
70+
// open input file
71+
fd, err := os.Open(outputPath)
72+
if err != nil {
73+
return fmt.Errorf("error in opening input file :: %v", err)
74+
}
75+
defer fd.Close()
76+
reader := bufio.NewReader(fd)
77+
78+
prevNum := 0
79+
firstIter := true
80+
for {
81+
// each line contains 1 element in the file
82+
str, err := reader.ReadString('\n')
83+
if err == io.EOF {
84+
break
85+
} else if err != nil {
86+
return fmt.Errorf("error in reading input file :: %v", err)
87+
}
88+
89+
// convert the element into integer
90+
str = strings.TrimSpace(str)
91+
num, err := strconv.Atoi(str)
92+
if err != nil {
93+
return fmt.Errorf("error in converting {%s} :: %v", str, err)
94+
}
95+
96+
// check the order of prev and cur element
97+
if firstIter {
98+
firstIter = false
99+
} else if prevNum > num {
100+
return fmt.Errorf("incorrect sort")
101+
}
102+
103+
prevNum = num
104+
}
105+
106+
return nil
107+
}

examples/extsort/sort.go

Lines changed: 254 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,254 @@
1+
package main
2+
3+
import (
4+
"bufio"
5+
"fmt"
6+
"io"
7+
"io/ioutil"
8+
"log"
9+
"os"
10+
"path"
11+
"sort"
12+
"strconv"
13+
"strings"
14+
15+
"github.com/grandecola/bigqueue"
16+
)
17+
18+
// TODO:
19+
// - release resources for bigqueue
20+
// - ensure that we do not cross the available memory
21+
// - use fewer queues
22+
23+
var (
24+
bqFileIndexCount = 0
25+
)
26+
27+
// ExternalSort perform external sort https://en.wikipedia.org/wiki/External_sorting
28+
// The inputPath should be a path to a file containing integers in each line
29+
// The outputPath is similarly formatted file with sorted integers
30+
// The tempPath is used to write intermediate files
31+
// maxMemSortSize is number of elements that can be sorted directly in memory
32+
func ExternalSort(inputPath, tempPath, outputPath string, maxMemSortSize int) error {
33+
files, err := ioutil.ReadDir(tempPath)
34+
if err != nil {
35+
return fmt.Errorf("unable to read temp directory :: %v", err)
36+
}
37+
if len(files) != 0 {
38+
return fmt.Errorf("non-empty temp directory")
39+
}
40+
41+
log.Println("starting divide step")
42+
iqs, err := divide(inputPath, tempPath, maxMemSortSize)
43+
if err != nil {
44+
return fmt.Errorf("error in divide step :: %v", err)
45+
}
46+
47+
log.Println("starting merge step")
48+
oq, err := merge(tempPath, iqs)
49+
if err != nil {
50+
return fmt.Errorf("error in merge step :: %v", err)
51+
}
52+
53+
if err := writeToFile(oq, outputPath); err != nil {
54+
return fmt.Errorf("error in writing output to file :: %v", err)
55+
}
56+
57+
return nil
58+
}
59+
60+
// divide step divides all the input data into sorted group of elements.
61+
// Each group is persisted to disk using bigqueue interface.
62+
func divide(inputPath, tempPath string, maxMemSortSize int) ([]*bigqueue.BigQueue, error) {
63+
log.Println("reading input file")
64+
queues := make([]*bigqueue.BigQueue, 0)
65+
66+
// open input file
67+
fd, err := os.Open(inputPath)
68+
if err != nil {
69+
return nil, fmt.Errorf("error in opening input file :: %v", err)
70+
}
71+
defer fd.Close()
72+
reader := bufio.NewReader(fd)
73+
74+
// read all the data from input file and divide it in multiple queues
75+
// such that each queue has data sorted and has maximum size of maxMemSortSize
76+
elemCount := 0
77+
data := make([]int, 0, maxMemSortSize)
78+
for {
79+
// each line contains 1 element in the file
80+
str, err := reader.ReadString('\n')
81+
if err == io.EOF {
82+
break
83+
} else if err != nil {
84+
return nil, fmt.Errorf("error in reading input file :: %v", err)
85+
}
86+
87+
// convert the element into integer
88+
str = strings.TrimSpace(str)
89+
num, err := strconv.Atoi(str)
90+
if err != nil {
91+
return nil, fmt.Errorf("error in converting {%s} :: %v", str, err)
92+
}
93+
elemCount++
94+
data = append(data, num)
95+
96+
// check whether we have enough element to perform in memory sort
97+
if elemCount < maxMemSortSize {
98+
continue
99+
}
100+
101+
// if yes, add the sorted elements into the queue
102+
sort.Ints(data)
103+
bq, err := buildBigQueue(tempPath, data)
104+
if err != nil {
105+
return nil, fmt.Errorf("error in building bigqueue :: %v", err)
106+
}
107+
108+
// add the queue in the list and truncate the slice that holds data in memory
109+
queues = append(queues, bq)
110+
elemCount = 0
111+
data = data[:0]
112+
}
113+
114+
// write the final list of elements to bigqueue
115+
if len(data) != 0 {
116+
sort.Ints(data)
117+
bq, err := buildBigQueue(tempPath, data)
118+
if err != nil {
119+
return nil, fmt.Errorf("error in building bigqueue :: %v", err)
120+
}
121+
122+
queues = append(queues, bq)
123+
}
124+
125+
return queues, nil
126+
}
127+
128+
// merge step merges the sorted group of elements stored in bigqueue using bigqueue
129+
func merge(tempPath string, queues []*bigqueue.BigQueue) (*bigqueue.BigQueue, error) {
130+
currentQueues := queues
131+
nextQueues := make([]*bigqueue.BigQueue, 0)
132+
for iteration := 0; len(currentQueues) != 1; iteration++ {
133+
log.Printf("iteration %d, # queues %d\n", iteration, len(currentQueues))
134+
135+
for i := 0; i < len(currentQueues); i += 2 {
136+
// if only one queue is left, just add this queue
137+
q1 := currentQueues[i]
138+
if i+1 >= len(currentQueues) {
139+
nextQueues = append(nextQueues, q1)
140+
continue
141+
}
142+
143+
// otherwise, merge the two queues
144+
q2 := currentQueues[i+1]
145+
mq, err := mergeQueues(q1, q2, tempPath)
146+
if err != nil {
147+
return nil, fmt.Errorf("error in merging two queues :: %v", err)
148+
}
149+
150+
nextQueues = append(nextQueues, mq)
151+
}
152+
153+
currentQueues = nextQueues
154+
nextQueues = make([]*bigqueue.BigQueue, 0)
155+
}
156+
157+
return currentQueues[0], nil
158+
}
159+
160+
func buildBigQueue(tempPath string, data []int) (*bigqueue.BigQueue, error) {
161+
bq, err := bigqueue.NewBigQueue(getTempDir(tempPath))
162+
if err != nil {
163+
return nil, fmt.Errorf("unable to init bigqueue :: %v", err)
164+
}
165+
166+
// write all the data to bigqueue
167+
for _, e := range data {
168+
if err := bq.Enqueue([]byte(strconv.Itoa(e))); err != nil {
169+
return nil, fmt.Errorf("unable to write to bigqueue :: %v", err)
170+
}
171+
}
172+
173+
return bq, nil
174+
}
175+
176+
func getTempDir(tempPath string) string {
177+
queueDir := "q" + strconv.Itoa(bqFileIndexCount)
178+
bqFileIndexCount++
179+
180+
queuePath := path.Join(tempPath, queueDir)
181+
if err := os.MkdirAll(queuePath, 0700); err != nil {
182+
panic(err)
183+
}
184+
185+
return queuePath
186+
}
187+
188+
func mergeQueues(q1, q2 *bigqueue.BigQueue, tempPath string) (*bigqueue.BigQueue, error) {
189+
mq, err := bigqueue.NewBigQueue(getTempDir(tempPath))
190+
if err != nil {
191+
return nil, fmt.Errorf("unable to create bigqueue :: %v", err)
192+
}
193+
194+
for !q1.IsEmpty() && !q2.IsEmpty() {
195+
e1, err1 := q1.Peek()
196+
e2, err2 := q2.Peek()
197+
if err1 != nil || err2 != nil {
198+
return nil, fmt.Errorf("unable to dequeue :: %v || %v", err1, err2)
199+
}
200+
201+
num1, err1 := strconv.Atoi(string(e1))
202+
num2, err2 := strconv.Atoi(string(e2))
203+
if err1 != nil || err2 != nil {
204+
return nil, fmt.Errorf("error in conversion :: %v || %v", err1, err2)
205+
}
206+
207+
if num1 < num2 {
208+
q1.Dequeue()
209+
mq.Enqueue([]byte(strconv.Itoa(num1)))
210+
} else {
211+
q2.Dequeue()
212+
mq.Enqueue([]byte(strconv.Itoa(num2)))
213+
}
214+
}
215+
216+
// add elements from the non-empty queue
217+
var lq *bigqueue.BigQueue
218+
if q1.IsEmpty() {
219+
lq = q1
220+
} else {
221+
lq = q2
222+
}
223+
for !lq.IsEmpty() {
224+
e1, err := lq.Dequeue()
225+
if err != nil {
226+
return nil, fmt.Errorf("unable to dequeue :: %v", err)
227+
}
228+
229+
mq.Enqueue(e1)
230+
}
231+
232+
return mq, nil
233+
}
234+
235+
func writeToFile(oq *bigqueue.BigQueue, outputPath string) error {
236+
// write the final output to file
237+
od, err := os.Create(outputPath)
238+
if err != nil {
239+
return fmt.Errorf("error in opening input file :: %v", err)
240+
}
241+
defer od.Close()
242+
243+
w := bufio.NewWriter(od)
244+
for !oq.IsEmpty() {
245+
v, err := oq.Dequeue()
246+
if err != nil {
247+
return fmt.Errorf("unable to dequeue from bigqueue :: %v", err)
248+
}
249+
250+
w.WriteString(string(v) + "\n")
251+
}
252+
253+
return w.Flush()
254+
}

0 commit comments

Comments
 (0)