1
2
3
4
5
6
7
8
9
10 from __future__ import division, absolute_import
11
12 from functools import wraps
13
14
15 -def pool_map_seq(func, iterable, chunksize=None, njobs=None):
16 return list(map(func, iterable))
17
20 ret = []
21 for args in iterable:
22 if not isinstance(args, list) and not isinstance(args, tuple):
23 args = (args, )
24 ret.append(func(*args))
25 return ret
26
29 ret = []
30 for func, args, kw in iterable:
31 ret.append(func(*args, **kw))
32 return ret
33
34 try:
35 import multiprocessing
36
37 - def pool_map(func, iterable, chunksize=None, njobs=None):
38 """
39 func must be an unary function
40 """
41 pool = multiprocessing.Pool(njobs)
42 return pool.map(func, iterable, chunksize)
43
45 """
46 func can be of variable arity and each element in iterable should
47 be a tuple of the same length as func's arity
48 """
49
50 pool = multiprocessing.Pool(njobs)
51
52 jobs = []
53 for args in iterable:
54 if not isinstance(args, list) and not isinstance(args, tuple):
55 args = (args, )
56 jobs.append(pool.apply_async(func, args))
57
58 return [job.get() for job in jobs]
59
61 pool = multiprocessing.Pool(njobs)
62
63 jobs = []
64 for func, args, kw in iterable:
65 jobs.append(pool.apply_async(func, args, kw))
66
67 return [job.get() for job in jobs]
68
69 except ImportError:
70 pool_map = pool_map_seq
71 pool_zipped_map = pool_zipped_map_seq
72 pool_parallelize = pool_sequentialize
76 @wraps(func)
77 def delayed_function(*args, **kw):
78 return func, args, kw
79 return delayed_function
80
86 return wrapper
87
88 if __name__ == "__main__":
89 from math import sqrt
90
91 sqrtd = delayed(sqrt)
92 powd = delayed(pow)
93
94 squares = [1, 4, 9, 16]
95 print((pool_parallelize([sqrtd(i) for i in squares], njobs=4)))
96 print((pool_parallelize([powd(i, 0.5) for i in squares], njobs=4)))
97
98 sqrtp = parallelized(sqrt)
99 powp = parallelized(pow)
100 print((sqrtp([i for i in squares], njobs=4)))
101 print((powp([(i, 0.5) for i in squares], njobs=4)))
102