Skip to content

Commit df64916

Browse files
committed
Merge pull request #10 from pakozm/devel
Added heap and improved efficiency of merge in reduce step
2 parents e11e662 + f780a4e commit df64916

File tree

4 files changed

+153
-56
lines changed

4 files changed

+153
-56
lines changed

.travis.yml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,9 @@ before_install:
66
- sudo apt-get install -y screen lua5.2 liblua5.2-dev libboost-filesystem-dev libboost-filesystem-dev libboost-thread-dev libssl-dev
77
- git clone https://github.com/mongodb/mongo-cxx-driver.git
88
- cd mongo-cxx-driver && sudo scons --prefix=/usr install-mongoclient && cd -
9-
- wget https://github.com/moai/luamongo/archive/v0.4.1.tar.gz
10-
- tar zxvf v0.4.1.tar.gz
11-
- cd luamongo-0.4.1 && make && sudo mkdir -p /usr/lib/lua/5.2 && sudo cp mongo.so /usr/lib/lua/5.2 && cd -
9+
- wget https://github.com/moai/luamongo/archive/v0.4.2.tar.gz
10+
- tar zxvf v0.4.2.tar.gz
11+
- cd luamongo-0.4.2 && make && sudo mkdir -p /usr/lib/lua/5.2 && sudo cp mongo.so /usr/lib/lua/5.2 && cd -
1212
- ssh-keygen -b 2048 -f /home/travis/.ssh/id_rsa -t rsa -q -N ""
1313
- ssh-keyscan -t rsa localhost > ~/.ssh/known_hosts
1414
- ssh-keyscan -t rsa $(hostname) >> ~/.ssh/known_hosts

mapreduce/heap.lua

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
--[[
2+
This file is part of Lua-MapReduce
3+
4+
Copyright 2014, Francisco Zamora-Martinez
5+
6+
The Lua-MapReduce toolkit is free software; you can redistribute it and/or modify it
7+
under the terms of the GNU General Public License version 3 as
8+
published by the Free Software Foundation
9+
10+
This library is distributed in the hope that it will be useful, but WITHOUT
11+
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12+
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
13+
for more details.
14+
15+
You should have received a copy of the GNU General Public License
16+
along with this library; if not, write to the Free Software Foundation,
17+
Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18+
]]
19+
20+
local heap = {
21+
_VERSION = "0.1",
22+
_NAME = "mapreduce.heap",
23+
}
24+
25+
-- Index of the parent of node `p` in the 1-based array layout of the heap
-- (node 1 is the root, so parent(1) yields 0, which is never a valid slot).
local function parent(p)
  local half = p / 2
  return math.floor(half)
end
26+
-- Index of the left child of node `p` in the 1-based array layout.
local function left(p)
  return p * 2
end
27+
-- Index of the right child of node `p` in the 1-based array layout.
local function right(p)
  return p * 2 + 1
end
28+
29+
-- Returns the element stored at the root of the heap (slot 1 of self.data)
-- without removing it. The result is nil when the heap holds no elements.
function heap:top()
  local data = self.data
  return data[1]
end
32+
33+
-- Removes the root element of the heap and restores the heap order.
--
-- The last element is detached and sifted down from the root: at every level
-- the child that orders first under self.cmp is lifted into the vacant slot
-- until the detached element can be placed.
--
-- Returns the removed root element, or nil when the heap was already empty
-- (in which case nothing is modified). Callers that ignore the result see the
-- same behaviour as before.
function heap:pop()
  local data, cmp = self.data, self.cmp
  local n = #data
  if n == 0 then return nil end
  local removed = data[1]
  -- detach the last element; it is re-inserted by the sift-down below
  local v = data[n]
  data[n] = nil
  n = n - 1
  if n == 0 then return removed end
  -- heapify(1)
  local pos = 1
  while true do
    local l = left(pos)
    -- children are located by index bounds, not by the truthiness of the
    -- stored value, so any non-nil element is handled uniformly
    if l > n then break end
    local r = right(pos)
    local child = l
    if r <= n and cmp(data[r], data[l]) then child = r end
    if not cmp(data[child], v) then break end
    data[pos] = data[child]
    pos = child
  end
  data[pos] = v
  return removed
end
54+
55+
-- Inserts `v` into the heap.
--
-- The new element starts at the first free slot after the last element and
-- is bubbled up: while self.cmp(v, parent_value) holds, the parent is moved
-- down one level and the search continues from the parent's slot.
function heap:push(v)
  local items, less = self.data, self.cmp
  local idx = #items + 1
  while idx > 1 do
    local up = parent(idx)
    -- stop as soon as the parent does not have to be placed after v
    if not less(v, items[up]) then break end
    items[idx] = items[up]
    idx = up
  end
  items[idx] = v
end
71+
72+
-- Drops every stored element by installing a fresh, empty backing table.
-- The comparator (self.cmp) is left untouched.
function heap:clear()
  local fresh = {}
  self.data = fresh
end
75+
76+
-- Returns the number of elements currently stored in the heap.
function heap:size()
  local data = self.data
  return #data
end
79+
80+
-- Returns true when the heap holds no elements, false otherwise.
function heap:empty()
  local count = #self.data
  return count == 0
end
83+
84+
-- Constructor, reached by calling the table this metamethod is looked up on.
--
-- cmp: optional ordering function cmp(a, b); the element for which cmp
--      orders it before all the others ends at the top. When omitted,
--      `a < b` is used.
--
-- Returns a new object with its own `cmp` and empty `data` fields. Methods
-- are resolved through `self`, and the length operator on the object reports
-- the number of stored elements (via the __len metamethod).
function heap:__call(cmp)
  local less = cmp
  if not less then
    less = function(a,b) return a < b end
  end
  local instance = { cmp = less, data = {} }
  local mt = {
    __index = self,
    __len = function(h) return #h.data end,
  }
  return setmetatable(instance, mt)
end
93+
-- heap acts as its own metatable, so calling the module table, heap(cmp),
-- dispatches to heap:__call and builds a new heap object
setmetatable(heap, heap)
94+
95+
----------------------------------------------------------------------------
96+
------------------------------ UNIT TEST -----------------------------------
97+
----------------------------------------------------------------------------
98+
99+
-- Self-test of the heap module. Raises an assertion error at the first check
-- that fails and returns nothing when every check passes.
--
-- Covered: empty-heap queries, push/top/clear, ordered extraction with the
-- default comparator, duplicated values, pop on an empty heap and a custom
-- comparator.
heap.utest = function()
  -- pushes `values` into `h` and checks they come out in the order given by
  -- table.sort with the same comparator (nil selects the default `<`)
  local function check_drain(h, values, cmp)
    local expected = {}
    for i=1,#values do
      h:push(values[i])
      expected[i] = values[i]
    end
    assert(h:size() == #values)
    assert(not h:empty())
    table.sort(expected, cmp)
    for i=1,#expected do
      assert(h:top() == expected[i])
      h:pop()
      assert(h:size() == #expected - i)
    end
    assert(h:empty())
    assert(h:top() == nil)
  end
  --
  local h = heap()
  assert(h:empty())
  assert(h:top() == nil)
  assert(h:size() == 0)
  h:push(30)
  assert(h:top() == 30)
  assert(h:size() == 1)
  h:clear()
  assert(h:empty())
  assert(h:top() == nil)
  assert(h:size() == 0)
  -- default comparator: the smallest value is extracted first
  check_drain(h, { 20, 10, 15, 1 })
  -- equal values must all be kept and extracted
  check_drain(h, { 5, 1, 5, 3, 1, 5 })
  -- popping from an empty heap leaves it empty
  h:pop()
  assert(h:empty())
  assert(h:size() == 0)
  -- custom comparator: the largest value is extracted first
  local greater = function(a,b) return a > b end
  check_drain(heap(greater), { 20, 10, 15, 1, 10 }, greater)
end
119+
120+
return heap

mapreduce/test.lua

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ local task = require "mapreduce.task"
2424
local server = require "mapreduce.server"
2525
local worker = require "mapreduce.worker"
2626
local persistent_table = require "mapreduce.persistent_table"
27+
local heap = require "mapreduce.heap"
2728

2829
utils.utest()
2930
cnn.utest()
@@ -33,5 +34,6 @@ task.utest()
3334
server.utest()
3435
worker.utest()
3536
persistent_table.utest()
37+
heap.utest()
3638

3739
print("Ok")

mapreduce/utils.lua

Lines changed: 28 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
1818
]]
1919
local mongo = require "mongo"
20+
local heap = require "mapreduce.heap"
2021

2122
-- luamongo 0.4 or newer is required. Only the leading "major.minor" part of
-- the version is inspected, so a value with extra components (for instance
-- "0.4.2") is accepted as well. The version is parsed before comparing
-- because Lua raises an error when a string is compared with a number.
do
  local major, minor = tostring(mongo._VERSION):match("^(%d+)%.(%d+)")
  major, minor = tonumber(major), tonumber(minor)
  assert(major and (major > 0 or minor >= 4),
         "mapreduce requires luamongo version 0.4 or newer")
end
2223

@@ -203,48 +204,40 @@ local function merge_iterator(fs, filenames, make_lines_iterator)
203204
line_iterators[ #line_iterators+1 ] = make_lines_iterator(name)
204205
end
205206
local finished = false
206-
local data = {}
207+
-- heap for efficient merge operation
208+
local cmp = function(a,b) return a[1] < b[1] end
209+
local queue = heap(cmp)
210+
local data = {}
207211
-- take the next data of a given file number
208212
local take_next = function(which)
209213
if line_iterators[which] then
210214
local line = line_iterators[which]()
211215
if line then
212-
data[which] = data[which] or {}
213-
data[which][3],data[which][1],data[which][2] =
214-
line,assert(load(line),
215-
string.format("Impossible to load line '%s' from '%s'",
216-
line, filenames[which]))()
216+
local l,k,v = line,assert(load(line),
217+
string.format("Impossible to load line '%s' from '%s'",
218+
line, filenames[which]))()
219+
queue:push({ k, v, l, which })
217220
else
218-
data[which] = nil
219221
line_iterators[which] = nil
220222
end
221-
else
222-
data[which] = nil
223223
end
224224
end
225-
-- we finished when all the data is nil
226-
local finished = function()
227-
local ret = true
228-
for i=1,#filenames do
229-
if data[i] ~= nil then ret = false break end
225+
-- merge all the data which shares min key
226+
local merge_min_keys = function()
227+
local top = queue:top()
228+
queue:pop()
229+
local key = top[1] -- key
230+
local result = top[2] -- value
231+
take_next(top[4])
232+
while not queue:empty() and key == queue:top()[1] do
233+
local aux = queue:top()
234+
local v = aux[2]
235+
local which = aux[4]
236+
queue:pop()
237+
take_next(which)
238+
for j=1,#v do result[ #result+1 ] = v[j] end
230239
end
231-
return ret
232-
end
233-
-- look for all the data which has the min key
234-
local search_min = function()
235-
local key
236-
local list = {}
237-
for i=1,#filenames do
238-
if data[i] then
239-
local current = data[i][1]
240-
if not key or current <= key then
241-
if not key or current < key then list = {} end
242-
table.insert(list,i)
243-
key = current
244-
end
245-
end
246-
end
247-
return list
240+
return key,result
248241
end
249242
-- initialize data with first line over all files
250243
for i=1,#filenames do take_next(i) end
@@ -256,30 +249,12 @@ local function merge_iterator(fs, filenames, make_lines_iterator)
256249
local assert = assert
257250
local data = data
258251
local take_next = take_next
259-
local finished = finished
260-
-- merge all the files until finished
261-
while not finished() do
252+
local queue = queue
253+
-- merge all the files until finished (empty queue)
254+
while not queue:empty() do
262255
counter = counter + 1
263256
--
264-
local mins_list = search_min()
265-
assert(#mins_list > 0)
266-
local key = data[mins_list[1]][1]
267-
local result
268-
if #mins_list == 1 then
269-
-- only one sequence of values, nothing to merge
270-
result = data[mins_list[1]][2]
271-
take_next(mins_list[1])
272-
else -- if #mins_list == 1 then ... else
273-
result = {}
274-
for i=1,#mins_list do
275-
local which = mins_list[i]
276-
-- sanity check
277-
assert(data[which][1] == key)
278-
local v = data[which][2]
279-
for j=1,#v do result[ #result+1 ] = v[j] end
280-
take_next(which)
281-
end
282-
end -- if #mins_list == 1 then ... else ... end
257+
local key,result = merge_min_keys()
283258
-- verbose output
284259
if counter % MAX_IT_WO_CGARBAGE == 0 then
285260
collectgarbage("collect")

0 commit comments

Comments
 (0)