-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmister.py
447 lines (351 loc) · 14.4 KB
/
mister.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
# -*- coding: utf-8 -*-
from __future__ import unicode_literals, division, print_function, absolute_import
import sys
import logging
import time
import math
import multiprocessing
from multiprocessing import queues
__version__ = "0.0.3"

# module level logger; attach a NullHandler so applications that import this
# package but never configure logging don't get "no handlers" warnings
logger = logging.getLogger(__name__)
if not logger.handlers:
    logger.addHandler(logging.NullHandler())
class Process(multiprocessing.Process):
    """multiprocessing.Process that knows how to log when it starts and how
    long it ran for"""
    def log_start(self):
        """Log that this process is starting

        :returns: float, the starting timestamp, hand this to log_stop()
        """
        begin = time.time()
        logger.debug("{} Starting".format(self.name))
        return begin

    def log_stop(self, start):
        """Log how long this process ran, pairs with log_start()

        :param start: float, the timestamp returned from log_start()
        """
        elapsed_ms = round(abs(time.time() - start) * 1000.0, 1)
        logger.debug("{} finished in {}".format(
            self.name, "{:.1f} ms".format(elapsed_ms)
        ))
class Queue(object):
    """Wrapper around a joinable multiprocessing queue used for interprocess
    communication between the prepare/map/reduce processes

    enqueue() blocks and retries while the queue is full, dequeue() converts
    repeated empty timeouts into a BufferError so consumers know to shut down
    """
    # seconds each put/get blocks before raising queues.Full/queues.Empty
    timeout = 5.0

    # consecutive empty timeouts dequeue() tolerates before raising BufferError
    empty_count = 1

    # NOTE: this must be a joinable queue because task_done() is part of this
    # class's interface; the previous multiprocessing.Queue has no task_done()
    # method, so calling self.task_done() raised AttributeError. Like Queue,
    # this is actually a bound factory function on the default context, not a
    # class
    queue_class = multiprocessing.JoinableQueue

    def __init__(self):
        self.queue = self.queue_class()

    def enqueue(self, value):
        """Put value onto the queue, retrying for as long as the queue reports
        it is full

        :param value: a pickleable value to place on the queue
        """
        enqueued = False
        enqueue_count = 1
        while not enqueued:
            try:
                # queue size taps out at 32767, booooo
                # http://stackoverflow.com/questions/5900985/multiprocessing-queue-maxsize-limit-is-32767
                self.queue.put(value, True, self.timeout)
                enqueued = True
                if enqueue_count > 1:
                    logger.debug("Enqueued after {} tries".format(enqueue_count))
            except queues.Full as e:
                # keep trying, a consumer should eventually drain the queue
                logger.debug("Queue full {}".format(enqueue_count))
                enqueue_count += 1

    def dequeue(self):
        """Get the next value off the queue

        :returns: the dequeued value
        :raises: BufferError if the queue stayed empty for empty_count
            consecutive timeout periods
        """
        count = 0
        while True:
            try:
                return self.queue.get(True, self.timeout)
            except queues.Empty as e:
                count += 1
                if count >= self.empty_count:
                    raise BufferError()

    def task_done(self):
        """signal that a previously dequeued value has been fully processed"""
        return self.queue.task_done()

    def empty(self):
        """True if the queue currently has no items"""
        return self.queue.empty()
class Count(int):
    """This wraps the process count value (how many subprocesses the job will have)
    and just makes it a bit easier to chunk up the data, an instance of this will
    be passed into reduce()
    """
    def chunksize(self, n):
        """Turns out I keep doing something like this, basically I have some value
        like 100 and I have count processes that are going to go through the data
        so I need to find out how many rows/pieces to pass to each map method, this
        is what this method does

        :param n: the total size of whatever you are dividing up between all the
            map callbacks
        :returns: int, basically the value of n / self rounded up
        """
        return int(math.ceil(n / self))

    def chunks(self, l):
        """If you have a list l this will divide it up into the appropriate chunks
        to pass to the map callback as *args, **kwargs

        :param l: list
        :returns: generator of up to self tuples of (args, kwargs), args is a one
            element tuple containing the list chunk
        """
        n = len(l)
        if n == 0:
            # nothing to chunk; also avoids a zero step in range() below
            return
        chunksize = self.chunksize(n)
        for i in range(0, n, chunksize):
            yield (l[i:i + chunksize],), {}

    def bounds(self, n, *args, **kwargs):
        """if you have a maximum size n this will chunk up n to self sections of
        start (offset), length (limit)

        :param n: int, the full size of your whatever your data is, this will be
            used to decide the size of the chunks
        :param *args: these will be passed through as the args part of the return
            tuple
        :param **kwargs: these will be copied and "start" and "length" will be added
            representing the start and stop (start + length) section of n
        :returns: generator of (args, kwargs) tuples ready to be passed to map()
        """
        if n <= 0:
            return
        length = self.chunksize(n)
        start = 0
        # iterate self times (one section per process), NOT n times; the old
        # `for x in range(n)` yielded n sections, which is wrong for any n != self
        for x in range(self):
            if start >= n:
                # n didn't divide evenly and earlier sections already cover it
                break
            kw = dict(kwargs)
            kw["start"] = start
            # clamp the final section so start + length never runs past n
            kw["length"] = min(length, n - start)
            start += length
            yield args, kw
class MisterMap(multiprocessing.Process):
    """This is a package internal class that handles the actual threading of the
    map method

    https://docs.python.org/3/library/multiprocessing.html
    """
    def __init__(self, target, name, queue, args, kwargs):
        """
        :param target: the map callback
        :param name: the name assigned to this process
        :param queue: multiprocessing.JoinableQueue, the queue used for interprocess
            communication
        :param args: the *args that will be passed to target
        :param kwargs: the **kwargs that will be passed to target
        """
        def run_map(target, queue, args, kwargs):
            # wraps the map callback so its return value lands on the queue
            debugging = logger.isEnabledFor(logging.DEBUG)
            if debugging:
                logger.debug("{} Starting".format(name))
            begin = time.time()
            result = target(*args, **kwargs)
            if result is not None:
                try:
                    # queue size taps out at 32767, booooo
                    # http://stackoverflow.com/questions/5900985/multiprocessing-queue-maxsize-limit-is-32767
                    queue.put(result, True, 1.0)
                except queues.Full as e:
                    logger.exception(e)
                    # we lose this result but without this call the process
                    # would just hang on a full queue
                    queue.cancel_join_thread()
            if debugging:
                elapsed_ms = round(abs(time.time() - begin) * 1000.0, 1)
                logger.debug("{} finished in {}".format(
                    name, "{:.1f} ms".format(elapsed_ms)
                ))

        super(MisterMap, self).__init__(
            target=run_map,
            name=name,
            kwargs={
                "target": target,
                "queue": queue,
                "args": args,
                "kwargs": kwargs,
            },
        )

    def log_start(self, name):
        """log that the process named name is starting

        :param name: str, the process name to put in the log message
        :returns: tuple of (start timestamp, whether debug logging is enabled)
        """
        debugging = logger.isEnabledFor(logging.DEBUG)
        begin = time.time()
        if debugging:
            logger.debug("{} Starting".format(name))
        return begin, debugging
class Mister(object):
    """If you want to subclass this is the class to use, anything you pass into __init__
    will be passed to your child's prepare() method

    https://en.wikipedia.org/wiki/MapReduce
    """
    # the Process subclass used to run each map() callback
    map_class = MisterMap

    def __init__(self, *args, **kwargs):
        """create an instance

        :param *args: passed to prepare()
        :param **kwargs: passed to prepare()
        :kwargs target_prepare: callback, see the .prepare method
        :kwargs target_map: callback, see the .map method
        :kwargs target_reduce: callback, see the .reduce method
        :kwargs count: int, how many processes you want
        """
        # NOTE: both spellings ("prepare" and "target_prepare") are popped so
        # neither leaks into the kwargs that eventually reach prepare(); the
        # short spelling wins when both are passed (same for map/reduce below)
        target_prepare = kwargs.pop("prepare", kwargs.pop("target_prepare", None))
        if target_prepare:
            self.prepare = target_prepare

        target_map = kwargs.pop("map", kwargs.pop("target_map", None))
        if target_map:
            self.map = target_map

        target_reduce = kwargs.pop("reduce", kwargs.pop("target_reduce", None))
        if target_reduce:
            self.reduce = target_reduce

        count = kwargs.pop("count", 0)
        if not count:
            count = multiprocessing.cpu_count()
            # we subtract one for the main process
            count = max(count - 1, 1)
            #count = count - 1 if count > 1 else 1
        self.count = count
        # everything left over is handed to prepare() in run()
        self.args = args
        self.kwargs = kwargs

    def prepare(self, count, *args, **kwargs):
        """Handle chunking the data for the map() method

        :param count: how many processes will work on the data, basically this is
            how many chunks you want the data to split into
        :param *args: the values passed into __init__
        :param **kwargs: the values passed into __init__
        :returns: count iter|list of tuples, basically you want to return count
            tuples in the form of ((), {}) (ie, args, kwargs), the tuple will be
            passed to .map() as *args, **kwargs
        """
        raise NotImplementedError()

    def map(self, *args, **kwargs):
        """this method will be called once for each tuple returned from prepare

        :param *args: The first value of the tuple returned from prepare()
        :param **kwargs: The second value of the tuple returned from prepare()
        :returns: mixed, you can return anything and it will be passed to reduce
        """
        raise NotImplementedError()

    def reduce(self, output, value):
        """This method brings it all together

        :param output: this is aggregate values of everything returned from map, the
            first time this method is called output=None so you will have to initialize
            it and then add value to it however you want to do that
        :param value: mixed, the return value from a call to map()
        :return: output, usually you return output updated however you want, the value
            returned from a call to reduce will be passed into the next call to reduce
            as the output value
        """
        raise NotImplementedError()

    def run(self):
        """run the map/reduce job, this is where all the magic happens

        :returns: mixed, the final output returned from the final call to reduce()
        """
        ret = None
        queue = multiprocessing.JoinableQueue()
        processes = []
        ident = 1
        count = Count(self.count)
        # start one map process for every (args, kwargs) tuple prepare() yields
        # (NOTE: if prepare() yields more than count tuples, more than count
        # processes are started)
        for args, kwargs in self.prepare(count, *self.args, **self.kwargs):
            name = "mister-map-{}".format(ident)
            logger.debug("{} = {}/{}".format(name, ident, count))
            t = self.map_class(
                target=self.map,
                name=name,
                queue=queue,
                args=args,
                kwargs=kwargs
            )
            t.start()
            processes.append(t)
            ident += 1

        # drain the queue, reducing results as they arrive, until every map
        # process has exited AND the queue is empty
        output = None
        while processes or not queue.empty():
            try:
                val = queue.get(True, 1.0)
                ret = self.reduce(output, val)
                # only replace the running output when reduce() returned a
                # value, so implementations that mutate output in place and
                # return None keep working
                if ret is not None:
                    output = ret
            except queues.Empty:
                pass
            else:
                # pair every successful get() with task_done() on the
                # joinable queue
                queue.task_done()
            # faster than using any((t.is_alive() for t in mts))
            processes = [t for t in processes if t.is_alive()]
        return output
class MissMap(Process):
    """This is a package internal class that handles the actual threading of the
    map method

    https://docs.python.org/3/library/multiprocessing.html
    """
    queue_timeout_count = 1

    def __init__(self, target, name, map_queue, reduce_queue):
        """
        :param target: the map callback
        :param name: the name assigned to this process
        :param map_queue: multiprocessing.JoinableQueue, the values yielded from the
            prepare() callback will end up here and will be dequeued and sent to
            the map() callback
        :param reduce_queue: multiprocessing.JoinableQueue, whatever the map() callback
            returns will be placed in this queue and passed to the reduce() callback
        """
        super(MissMap, self).__init__(target=self.target, name=name, kwargs={
            "target": target,
            "map_queue": map_queue,
            "reduce_queue": reduce_queue,
        })

    def target(self, target, map_queue, reduce_queue):
        """consume map_queue, run each value through the map callback, and push
        any non-None result onto reduce_queue, until map_queue dries up"""
        begin = self.log_start()
        consuming = True
        while consuming:
            try:
                result = target(map_queue.dequeue())
            except BufferError:
                # map_queue stayed empty long enough, nothing left to map
                consuming = False
            else:
                if result is not None:
                    reduce_queue.enqueue(result)
        self.log_stop(begin)
class MissPrepare(Process):
    """Runs the prepare() callback in its own process, feeding the map queue"""
    def __init__(self, target, name, count, map_queue, args, kwargs):
        """
        :param target: the prepare callback
        :param name: the name assigned to this process
        :param count: how many map processes will be consuming map_queue
        :param map_queue: the queue the values yielded from prepare() go into
        :param args: the *args that will be passed to the prepare callback
        :param kwargs: the **kwargs that will be passed to the prepare callback
        """
        super(MissPrepare, self).__init__(
            target=self.target,
            name=name,
            kwargs={
                "target": target,
                "count": count,
                "map_queue": map_queue,
                "args": args,
                "kwargs": kwargs,
            },
        )

    def target(self, target, count, map_queue, args, kwargs):
        """invoke the prepare callback and enqueue everything it yields"""
        begin = self.log_start()
        # now we populate the queue all our map processes are going to read from
        for value in target(count, *args, **kwargs):
            map_queue.enqueue(value)
        self.log_stop(begin)
class Miss(Mister):
    """Mister variant that streams prepare() output through queues instead of
    starting one process per prepare() chunk, see Mister for the interface

    One process runs prepare() and feeds a map queue, the remaining processes
    consume it with map() and push results onto a reduce queue, and the main
    process reduces
    """
    map_class = MissMap
    prepare_class = MissPrepare
    queue_class = Queue

    def run(self):
        """run the map/reduce job, this is where all the magic happens

        :returns: mixed, the final output returned from the final call to reduce()
        """
        map_queue = self.queue_class()
        reduce_queue = self.queue_class()
        processes = []

        # one process is reserved for prepare(), the rest run map(), but always
        # start at least one mapper (previously count == 1 produced zero map
        # processes, so nothing ever consumed the map queue)
        map_count = Count(max(self.count - 1, 1))
        prepare_count = Count(1)

        # first we start all our mapping processes
        for ident in range(1, map_count + 1):
            name = "miss-map-{}".format(ident)
            logger.debug("{} = {}/{}".format(name, ident, map_count))
            t = self.map_class(
                target=self.map,
                name=name,
                map_queue=map_queue,
                reduce_queue=reduce_queue,
            )
            t.start()
            processes.append(t)

        for ident in range(1, prepare_count + 1):
            name = "miss-prepare-{}".format(ident)
            t = self.prepare_class(
                target=self.prepare,
                name=name,
                count=map_count,
                map_queue=map_queue,
                args=self.args,
                kwargs=self.kwargs,
            )
            t.start()
            # track the prepare process too; previously it was never appended,
            # so the reduce loop below could exit while prepare() was still
            # feeding the map queue, silently dropping data
            processes.append(t)

        # now we can reduce everything if we need to
        output = None
        while processes or not reduce_queue.empty():
            try:
                val = reduce_queue.dequeue()
                ret = self.reduce(output, val)
                # keep the previous output when reduce() mutates it in place
                # and returns None
                if ret is not None:
                    output = ret
            except BufferError:
                # reduce queue was empty for a while, just re-check liveness
                pass
            processes = [t for t in processes if t.is_alive()]
        return output