Package pycosmicstar :: Module parallellistcompreension
[hide private]
[frames | no frames]

Source Code for Module pycosmicstar.parallellistcompreension

  1  #!/usr/bin/env python 
  2  # *-* Coding: UTF-8 *-* 
  3   
  4  # Copyright (c) Mathieu Blondel 2009 
  5  # BSD license 
  6   
  7  # Based on ideas from Gael Varoquaux 
  8  # http://gael-varoquaux.info/blog/wp-content/uploads/2009/11/parallel_py 
  9   
 10  from __future__ import division, absolute_import 
 11   
 12  from functools import wraps 
def pool_map_seq(func, iterable, chunksize=None, njobs=None):
    """
    Apply the unary ``func`` to every element of ``iterable``, in order,
    in the current process, and return the results as a list.

    ``chunksize`` and ``njobs`` are accepted only so that the signature
    matches ``pool_map``; they are not used.
    """
    return [func(element) for element in iterable]
17
def pool_zipped_map_seq(func, iterable, chunksize=None, njobs=None):
    """
    Call ``func`` once per element of ``iterable``, in order, in the
    current process, and return the results as a list.

    An element that is a list or a tuple is unpacked into positional
    arguments; any other element is passed as the single argument.

    ``chunksize`` and ``njobs`` are accepted only so that the signature
    matches ``pool_zipped_map``; they are not used.
    """
    return [
        func(*(element if isinstance(element, (list, tuple)) else (element,)))
        for element in iterable
    ]
26
def pool_sequentialize(iterable, njobs=None):
    """
    Run a series of ``(func, args, kw)`` triples one after another in the
    current process and return the list of ``func(*args, **kw)`` results.

    ``njobs`` is accepted only so that the signature matches
    ``pool_parallelize``; it is not used.
    """
    return [
        call(*positional, **keywords)
        for call, positional, keywords in iterable
    ]
try:
    import multiprocessing

    def pool_map(func, iterable, chunksize=None, njobs=None):
        """
        Map the unary ``func`` over ``iterable`` using a pool of ``njobs``
        worker processes and return the list of results.

        func must be an unary function
        """
        pool = multiprocessing.Pool(njobs)
        try:
            return pool.map(func, iterable, chunksize)
        finally:
            # Shut the workers down explicitly instead of leaving the pool
            # to the garbage collector; by now every result has either been
            # received or an exception is propagating.
            pool.terminate()
            pool.join()

    def pool_zipped_map(func, iterable, chunksize=None, njobs=None):
        """
        Call ``func`` once per element of ``iterable`` using a pool of
        ``njobs`` worker processes and return the results in input order.

        func can be of variable arity and each element in iterable should
        be a tuple of the same length as func's arity
        """
        # FIXME: chunksize is currently ignored
        pool = multiprocessing.Pool(njobs)
        try:
            jobs = []
            for args in iterable:
                # A bare (non list/tuple) element is a single argument.
                if not isinstance(args, (list, tuple)):
                    args = (args, )
                jobs.append(pool.apply_async(func, args))

            # Results must be collected before the pool is shut down.
            return [job.get() for job in jobs]
        finally:
            pool.terminate()
            pool.join()

    def pool_parallelize(iterable, njobs=None):
        """
        Run a series of ``(func, args, kw)`` triples using a pool of
        ``njobs`` worker processes and return the list of
        ``func(*args, **kw)`` results in input order.
        """
        pool = multiprocessing.Pool(njobs)
        try:
            jobs = [pool.apply_async(func, args, kw)
                    for func, args, kw in iterable]

            # Results must be collected before the pool is shut down.
            return [job.get() for job in jobs]
        finally:
            pool.terminate()
            pool.join()

except ImportError:
    # multiprocessing is unavailable: fall back to the sequential versions.
    pool_map = pool_map_seq
    pool_zipped_map = pool_zipped_map_seq
    pool_parallelize = pool_sequentialize
def delayed(func):
    """
    Return a stand-in for ``func`` that does not call it: invoking the
    stand-in returns the ``(func, args, kw)`` triple describing the call,
    which is the element format consumed by ``pool_parallelize``.
    """
    def delayed_function(*positional, **keywords):
        return (func, positional, keywords)

    # Copy func's name, docstring, etc. onto the stand-in.
    return wraps(func)(delayed_function)
def parallelized(func):
    """
    Return a version of ``func`` that takes an iterable of argument
    tuples (plus an optional ``njobs``) and evaluates ``func`` on each of
    them through ``pool_zipped_map``, returning the list of results.
    """
    def wrapper(iterable, njobs=None):
        return pool_zipped_map(func, iterable, njobs=njobs)

    # Copy func's name, docstring, etc. onto the wrapper.
    return wraps(func)(wrapper)


if __name__ == "__main__":
    from math import sqrt

    squares = [1, 4, 9, 16]

    # Style 1: build (func, args, kw) triples and hand them to the pool.
    sqrtd = delayed(sqrt)
    powd = delayed(pow)
    print(pool_parallelize([sqrtd(value) for value in squares], njobs=4))
    print(pool_parallelize([powd(value, 0.5) for value in squares], njobs=4))

    # Style 2: wrap the function so that it maps over its arguments.
    sqrtp = parallelized(sqrt)
    powp = parallelized(pow)
    print(sqrtp(list(squares), njobs=4))
    print(powp([(value, 0.5) for value in squares], njobs=4))