-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmister_test.py
144 lines (109 loc) · 3.53 KB
/
mister_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
# -*- coding: utf-8 -*-
from __future__ import unicode_literals, division, print_function, absolute_import
import os
import math
from collections import Counter
import re
import testdata
from testdata import TestCase
from mister import Mister, Miss
testdata.basic_logging()
class MrHelloWorld(Mister):
    """Smallest possible job: build one greeting per prepared work item.

    NOTE(review): presumably the Mister base class feeds every
    ``(args, kwargs)`` pair yielded by prepare() into map() and folds the
    map() results through reduce() -- confirm against the mister package.
    """

    def prepare(self, count, name):
        """Yield ``count`` work items, each carrying its index and ``name``.

        :param count: int, how many work items to generate
        :param name: str, the name that will be greeted
        :returns: generator of ``([index, name], {})`` tuples
        """
        for index in range(count):
            args = [index, name]
            kwargs = {}
            yield (args, kwargs)

    def map(self, x, name):
        """Return the greeting string for work item ``x``."""
        template = "Process {} says 'hello {}'"
        return template.format(x, name)

    def reduce(self, output, value):
        """Append ``value`` to the running list, creating it on first use.

        :param output: list|None, the greetings collected so far
        :param value: str, one greeting returned from map()
        :returns: list, the same list with ``value`` appended
        """
        greetings = [] if output is None else output
        greetings.append(value)
        return greetings
class MrWordCount(Mister):
    """Count whitespace-separated words of a file, one byte range per job.

    NOTE(review): presumably the Mister base class passes every ``kwargs``
    dict yielded by prepare() to map() and folds the returned Counters
    through reduce() -- confirm against the mister package.
    """

    def prepare(self, count, path):
        """Divide the file at ``path`` into ``count`` equal-sized byte ranges.

        :param count: int, how many ranges to yield
        :param path: str, the file whose words will be counted
        :returns: generator of ``((), kwargs)`` tuples, kwargs has the
            ``path``, ``start`` and ``length`` keys that map() takes
        """
        size = os.path.getsize(path)
        # true division (see the __future__ import) so ceil can round up
        length = int(math.ceil(size / count))
        start = 0
        for x in range(count):
            kwargs = {}
            kwargs["path"] = path
            kwargs["start"] = start
            kwargs["length"] = length
            start += length
            yield (), kwargs

    def map(self, path, start, length):
        """Count the words that *begin* inside ``[start, start + length)``.

        The file is read as bytes because ``start`` and ``length`` are byte
        offsets (they come from os.path.getsize); seeking a text-mode file
        to an arbitrary offset is undefined. A word that straddles a range
        boundary is counted once, by the range in which it starts: the
        leading fragment of a word that began earlier is skipped and the
        last word is completed by reading past the end of the range.

        :param path: str, the file to read
        :param start: int, byte offset where this range begins
        :param length: int, size of this range in bytes
        :returns: Counter, word -> occurrences (text decoded as UTF-8,
            undecodable bytes are replaced)
        """
        output = Counter()
        if length <= 0:
            return output

        with open(path, "rb") as fp:
            # peek at the byte right before the range so we can tell if the
            # range begins in the middle of a word
            lookbehind = 1 if start > 0 else 0
            fp.seek(start - lookbehind, 0)
            data = fp.read(length + lookbehind)

            if lookbehind:
                # a leading non-whitespace run that includes the lookbehind
                # byte belongs to a word that started in an earlier range
                data = data[re.match(br"\S*", data).end():]

            if not data:
                # past EOF, or the whole range sits inside someone else's word
                return output

            if not data[-1:].isspace():
                # the last word might continue past the range, finish it
                while True:
                    extra = fp.read(64)
                    if not extra:
                        break

                    boundary = re.search(br"\s", extra)
                    if boundary:
                        data += extra[:boundary.start()]
                        break

                    data += extra

        # ASCII whitespace never occurs inside a multi-byte UTF-8 sequence,
        # so the buffer only ever contains complete characters
        text = data.decode("utf-8", "replace")

        # split() without arguments never produces empty-string "words"
        output.update(text.split())
        return output

    def reduce(self, output, count):
        """Merge one map() result into the running total.

        :param output: Counter|None, the totals so far
        :param count: Counter, the word counts from one range
        :returns: Counter, the updated totals
        """
        if not output:
            output = Counter()
        output.update(count)
        return output
class MrCount(Mister):
    """Sum a sequence of numbers by summing slices of it separately.

    NOTE(review): presumably the Mister base class passes every ``kwargs``
    dict yielded by prepare() to map() and folds the partial sums through
    reduce() -- confirm against the mister package.
    """

    def prepare(self, count, numbers):
        """Slice ``numbers`` into at most ``count`` consecutive chunks.

        :param count: int, the maximum number of chunks wanted
        :param numbers: sequence, supports len() and slicing
        :returns: generator of ``((), {"numbers": chunk})`` tuples, nothing
            is yielded when ``numbers`` is empty
        """
        # clamp to 1 because an empty sequence would make the step 0 and
        # range() raises ValueError on a zero step
        chunk = max(1, int(math.ceil(len(numbers) / count)))

        # this splits the numbers into chunk size chunks
        # https://stackoverflow.com/a/312464/5006
        for i in range(0, len(numbers), chunk):
            yield (), {"numbers": numbers[i:i + chunk]}

    def map(self, numbers):
        """Return the sum of one chunk."""
        return sum(numbers)

    def reduce(self, output, total):
        """Add one partial sum to the running total.

        :param output: int|None, the running total, falsey on first use
        :param total: int, the sum of one chunk
        :returns: int, the updated running total
        """
        if not output:
            output = 0
        output += total
        return output
class MrTest(TestCase):
    """Runs each Mister subclass of this module and checks the result."""

    def test_count(self):
        """MrCount.run() has to equal the builtin sum of the same numbers."""
        numbers = list(range(0, 1000000))
        expected = sum(numbers)

        actual = MrCount(numbers).run()

        self.assertEqual(expected, actual)

    def test_wordcount(self):
        """The ten most common words have to match a single-pass count."""
        path = testdata.get_contents("bible-kjv").path

        with open(path) as fp:
            text = fp.read()
        output_sync = Counter(re.split(r"\s+", text))

        output_async = MrWordCount(path).run()

        # only the ranking of the words is compared, not the counts: a chunk
        # boundary that falls inside a word (it happened with "and") can make
        # the exact numbers drift between the two approaches
        top_sync = [pair[0] for pair in output_sync.most_common(10)]
        top_async = [pair[0] for pair in output_async.most_common(10)]
        self.assertEqual(top_sync, top_async)

    def test_helloworld(self):
        """Smoke test, prints whatever MrHelloWorld.run() returns."""
        output = MrHelloWorld("Alice").run()
        print(output)
class MsCount(Miss):
    """Counts how many items were prepared by mapping each of them to 1.

    NOTE(review): presumably the Miss base class passes every value yielded
    by prepare() to map() and folds the results through reduce() -- confirm
    against the mister package.
    """

    def prepare(self, count, size):
        """Yield the integers ``0`` through ``size - 1``, one per item."""
        for number in range(size):
            yield number

    def map(self, n):
        """Every item counts as exactly one, ``n`` itself is ignored."""
        return 1

    def reduce(self, output, incr):
        """Add ``incr`` to the running total.

        :param output: int|None, the running total, falsey on first use
        :param incr: int, the value returned from one map() call
        :returns: int, ``incr`` alone when ``output`` is falsey, otherwise
            the sum of both
        """
        if output:
            return output + incr
        return incr

    def run(self):
        """Set ``timeout`` on ``self.queue_class`` to 0.1, then run the job.

        NOTE(review): the unit of the timeout is presumably seconds, confirm
        against the queue class.
        """
        self.queue_class.timeout = 0.1
        parent = super(MsCount, self)
        return parent.run()
class MsTest(TestCase):
    """Runs the Miss subclass of this module and checks the result."""

    def test_count(self):
        """MsCount.run() has to return the number of items it was given."""
        # other sizes that have been tried by hand: 10 and 100000
        expected = 1000

        actual = MsCount(expected).run()

        self.assertEqual(expected, actual)