matrix - How to parallelize a for-loop in Python?


Update 1.0 start

It seems that when I call

for i, wi in enumerate(w.T):
    idx.append(i)
    result.append(pool.apply_async(als_y, (x, wi, q, lambda_, n_factors, i,)))

the arguments passed to the functions als_y/als_x are not references but copied arguments. So when x or y is a large matrix (in my case it's 6000*40 or so, and since this is inside a for-loop, let's assume the number of iterations is 50,000...), it exceeds the memory limit.

I also tried using global variables, passing only indices as parameters to the functions:

import multiprocessing
import time
import numpy as np

def func(idx):
    global a
    a[idx] += 1

if __name__ == "__main__":
    a = range(10)
    for j in xrange(2):
        pool = multiprocessing.Pool(processes=8)
        result = []
        for i in xrange(10):
            result.append(pool.apply_async(func, (i, )))
        pool.close()
        pool.join()
        print a
        print "sub-process(es) done."

It outputs:

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
sub-process(es) done.
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
sub-process(es) done.

So this means a was still copied! Now I wonder: is there a way to handle this issue? Any help is appreciated!
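One way around the copying, as a minimal sketch: back the array with shared memory via multiprocessing.Array, so that workers mutate a single buffer instead of pickled copies. This sketch assumes a fork-based start method (Linux/OS X, where forked workers inherit module-level objects); the lock=False flag and the np.frombuffer view are choices of the sketch, not requirements.

import multiprocessing
import numpy as np

# A shared double array; forked pool workers inherit this object
# instead of receiving a pickled copy with every task.
a = multiprocessing.Array('d', 10, lock=False)

def func(idx):
    a[idx] += 1              # mutates the one shared buffer in place

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=8)
    pool.map(func, range(10))
    pool.close()
    pool.join()
    print np.frombuffer(a)   # all entries are now 1.0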

Update 1.0 end


The code below is written in Python to solve a matrix factorization problem, q = x·y. However, it is not efficient, and I hope it can be converted to a parallel version, using a GPU ideally, but a CPU would also be fine. I have no experience with parallel programming, so can anyone give me some advice?

The code below factorizes the matrix using ALS (alternating least squares; details here):

for ii in range(n_iterations):
    for u, wu in enumerate(w):
        x[u] = np.linalg.solve(np.dot(y, np.dot(np.diag(wu), y.T)) + lambda_ * np.eye(n_factors),
                               np.dot(y, np.dot(np.diag(wu), q[u].T))).T  # x inner loop
    for i, wi in enumerate(w.T):
        y[:,i] = np.linalg.solve(np.dot(x.T, np.dot(np.diag(wi), x)) + lambda_ * np.eye(n_factors),  # y inner loop
                                 np.dot(x.T, np.dot(np.diag(wi), q[:, i])))  # y inner loop
    error = get_error(q, x, y, w)
    weighted_errors.append(error)
    print '{}th iteration completed'.format(ii)
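To spell out what each inner step does: it solves the regularized, weighted normal equations for one row of x (or one column of y), i.e. it minimizes sum_i wu[i]*(qu[i] - xu.y[:,i])**2 + lambda_*||xu||**2 for that row. A small check of one x-row update (tiny random dimensions, chosen only for illustration) confirming that the gradient of that objective vanishes at the solved row:

import numpy as np

np.random.seed(0)
n_factors, n_items, lambda_ = 3, 5, 0.1
y = np.random.rand(n_factors, n_items)
wu = np.random.rand(n_items)     # weights w[u] for one user u
qu = np.random.rand(n_items)     # ratings q[u] for that user

# Same update as the x inner loop above:
xu = np.linalg.solve(np.dot(y, np.dot(np.diag(wu), y.T)) + lambda_ * np.eye(n_factors),
                     np.dot(y, np.dot(np.diag(wu), qu.T))).T

# Gradient of sum_i wu[i]*(qu[i] - xu.y[:,i])**2 + lambda_*||xu||**2
grad = -2 * np.dot(y, wu * (qu - np.dot(xu, y))) + 2 * lambda_ * xu
print np.allclose(grad, 0)       # True: xu minimizes the weighted objective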

After using the multiprocessing lib, my code is now:

def als_x(y, wu, q, lambda_, n_factors, u):
    return np.linalg.solve(np.dot(y, np.dot(np.diag(wu), y.T)) + lambda_ * np.eye(n_factors),
                           np.dot(y, np.dot(np.diag(wu), q[u].T))).T

for ii in range(n_iterations):
    pool = multiprocessing.Pool(processes=12)  # create the pool
    result = []  # store each row of x
    idx = []     # store the row number
    for u, wu in enumerate(w):
        idx.append(u)
        result.append(pool.apply_async(als_x, (y, wu, q, lambda_, n_factors, u,)))
    pool.close()
    pool.join()
    for u, vector in zip(idx, result):
        x[u] = vector.get()  # assign the result to x
    ######################################
    pool = multiprocessing.Pool(processes=12)  # for y, similar to x
    result = []
    idx = []
    for i, wi in enumerate(w.T):
        idx.append(i)
        result.append(pool.apply_async(als_y, (x, wi, q, lambda_, n_factors, i,)))
    pool.close()
    pool.join()
    for i, vector in zip(idx, result):
        y[:,i] = vector.get()
    error = get_error(q, x, y, w)
    weighted_errors.append(error)
    print '{}th iteration completed'.format(ii), 'error: ', error

But to my misery, the program crashed silently...
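One likely culprit with the version above is the sheer volume of pickling: every apply_async call ships y (or x) and q to a worker, and a fresh pool is built twice per iteration. Below is a sketch of a leaner variant, under the assumption of a fork-based start method (Linux/OS X), in which workers inherit the large arrays as module-level globals and only an integer index crosses the process boundary. Note that forked workers see a snapshot, so after x or y changes the pool has to be recreated to re-share them.

import multiprocessing
import numpy as np

lambda_, n_factors = 0.1, 40
x = y = q = w = None      # filled in below; forked workers inherit them

def als_y_by_index(i):
    # Only the integer i is pickled; x, q, w are read from the
    # globals the worker inherited when the pool was forked.
    wi = w.T[i]
    return np.linalg.solve(np.dot(x.T, np.dot(np.diag(wi), x)) + lambda_ * np.eye(n_factors),
                           np.dot(x.T, np.dot(np.diag(wi), q[:, i])))

if __name__ == "__main__":
    m = n = 200
    q = np.random.rand(m, n)
    w = (q > 0.5).astype(np.float64)
    x = 5 * np.random.rand(m, n_factors)
    y = 5 * np.random.rand(n_factors, n)
    pool = multiprocessing.Pool(processes=4)   # fork AFTER the arrays exist
    cols = pool.map(als_y_by_index, range(n))  # ordered results, one per column
    y = np.column_stack(cols)
    pool.close()
    pool.join()
    print 'updated y:', y.shape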

Below is the whole bunch of code. It's a bit messy; please ignore load_data, get_error, and vec2str, since here I generate the matrix randomly.

import pandas as pd
import numpy as np
import multiprocessing

def vec2str(vec):
    res = ''
    for dim in range(len(vec)):
        res += str(vec[dim]) + ','
    return res

def load_data(heads, filename, sep, header=None):
    data = pd.read_table(filename, sep=sep, header=header, names=heads)
    rp = data.pivot_table(columns=['sid'], index=['uid'], values=['rating'])  # not generally...
    q = rp.fillna(0)
    q = q.values
    w = q > 0.5
    w[w == True] = 1
    w[w == False] = 0
    w = w.astype(np.float64, copy=False)
    return q, w, rp

def get_error(q, x, y, w):
    return np.sum((w * (q - np.dot(x, y)))**2)

'''
x[u] = np.linalg.solve(np.dot(y, np.dot(np.diag(wu), y.T)) + lambda_ * np.eye(n_factors),
                       np.dot(y, np.dot(np.diag(wu), q[u].T))).T
'''
def als_x(y, wu, q, lambda_, n_factors, u):
    return np.linalg.solve(np.dot(y, np.dot(np.diag(wu), y.T)) + lambda_ * np.eye(n_factors),
                           np.dot(y, np.dot(np.diag(wu), q[u].T))).T

'''
y[:,i] = np.linalg.solve(np.dot(x.T, np.dot(np.diag(wi), x)) + lambda_ * np.eye(n_factors),
                         np.dot(x.T, np.dot(np.diag(wi), q[:, i])))
'''
def als_y(x, wi, q, lambda_, n_factors, i):
    return np.linalg.solve(np.dot(x.T, np.dot(np.diag(wi), x)) + lambda_ * np.eye(n_factors),
                           np.dot(x.T, np.dot(np.diag(wi), q[:, i])))

if __name__ == "__main__":
    lambda_ = 0.1
    n_factors = 40
    filename = 'data_songid'
    n_iterations = 20
    #q, w, rp = load_data(['uid', 'sid', 'rating'], filename, ',')
    q = np.random.rand(1000, 1000)
    m, n = q.shape
    w = np.eye(1000)
    print 'loading data finished, ', 'size: ', q.shape
    print 'settings ', 'lambda = {}'.format(lambda_), 'n_factors = {}'.format(n_factors)
    x = 5 * np.random.rand(m, n_factors)
    y = 5 * np.random.rand(n_factors, n)
    errors = []
    for ii in range(n_iterations):
        x = np.linalg.solve(np.dot(y, y.T) + lambda_ * np.eye(n_factors),
                            np.dot(y, q.T)).T
        y = np.linalg.solve(np.dot(x.T, x) + lambda_ * np.eye(n_factors),
                            np.dot(x.T, q))
        if ii % 100 == 0:
            print('{}th iteration completed'.format(ii))
        errors.append(get_error(q, x, y, w))
        q_hat = np.dot(x, y)
        print('error of rated movies: {}'.format(get_error(q, x, y, w)))
    print errors
    ##### ALS start... #####
    print '*' * 100
    weighted_errors = []
    for ii in range(n_iterations):
        pool = multiprocessing.Pool(processes=12)
        result = []
        idx = []
        for u, wu in enumerate(w):
            idx.append(u)
            result.append(pool.apply_async(als_x, (y, wu, q, lambda_, n_factors, u,)))
        pool.close()
        pool.join()
        for u, vector in zip(idx, result):
            x[u] = vector.get()
        ######################################
        pool = multiprocessing.Pool(processes=12)
        result = []
        idx = []
        for i, wi in enumerate(w.T):
            idx.append(i)
            result.append(pool.apply_async(als_y, (x, wi, q, lambda_, n_factors, i,)))
        pool.close()
        pool.join()
        for i, vector in zip(idx, result):
            y[:,i] = vector.get()
        error = get_error(q, x, y, w)
        weighted_errors.append(error)
        print '{}th iteration completed'.format(ii), 'error: ', error

    weighted_q_hat = np.dot(x, y)
    print weighted_errors
    x.tofile('x.bin')
    y.tofile('y.bin')
    latent_user_file = open('user_latent', 'w')
    for idx in range(len(rp.axes[0])):
        latent_user_file.write(str(rp.axes[0][idx]) + '\t' + vec2str(x[idx,:]) + '\n')
    latent_mid_file = open('mid_latent', 'w')
    for idx in range(len(rp.axes[1])):
        latent_mid_file.write(str(rp.axes[1][idx]) + '\t' + vec2str(y.T[idx,:]) + '\n')

Last year I encountered the same desire for a "parallel loop" in Python, and hacked one together as part of the work for a physics paper. There are many modules that want to do this, but I found that pp was the only one that worked the way I wanted for arbitrary functions.

If you want something that looks like this:

resultlist = library_parallelloop.main(
    function = examplefunction,
    listofargsets = listofargsets,
    algorithm = 'pp',
    printextra = True
    )

then I'll point you to my GitHub instead of providing the entire source in this post: the implementation I got working is painfully many lines, and involved deep-copying Python functions, which apparently wasn't pre-built into Python by anyone else.

Finding primes example:

https://github.com/douglasquincyadams/main/blob/master/test_parallelloop.py

Repo:

https://github.com/douglasquincyadams/main

If you download my repo into some dark corner of your computer, your working snippet should be:

import library_parallelloop

def do_the_thing_function(ii):
    for u, wu in enumerate(w):
        x[u] = np.linalg.solve(np.dot(y, np.dot(np.diag(wu), y.T)) + lambda_ * np.eye(n_factors),
                               np.dot(y, np.dot(np.diag(wu), q[u].T))).T  # x inner loop
    for i, wi in enumerate(w.T):
        y[:,i] = np.linalg.solve(np.dot(x.T, np.dot(np.diag(wi), x)) + lambda_ * np.eye(n_factors),  # y inner loop
                                 np.dot(x.T, np.dot(np.diag(wi), q[:, i])))  # y inner loop
    error = get_error(q, x, y, w)
    weighted_errors.append(error)
    print '{}th iteration completed'.format(ii)
    return  # whatever the result is supposed to be... this code doesn't work on its own

listofargsets = []
for ii in range(n_iterations):
    listofargsets.append({"ii": ii})

resultlist = library_parallelloop.main(
    function = do_the_thing_function,
    listofargsets = listofargsets,
    algorithm = 'pp',
    printextra = True
    )

If you ask me, a parallel loop like the one above should be nice, handy, and built into languages, but instead it seems to be something cryptically exampled by wizards in a tower, and it doesn't quite work when you try it out on your crappy laptop. Anyways - hope this helps.
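For comparison, a primes-style loop like the linked example can also be written with nothing but the stdlib; a minimal sketch (the function and the range are mine, not from the repo):

import multiprocessing

def is_prime(n):
    if n < 2:
        return False
    for d in xrange(2, int(n ** 0.5) + 1):
        if n % d == 0:
            return False
    return True

if __name__ == "__main__":
    pool = multiprocessing.Pool()              # defaults to one worker per core
    flags = pool.map(is_prime, range(2, 50))   # parallel map over the loop body
    print [n for n, f in zip(range(2, 50), flags) if f]
    pool.close()
    pool.join()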

Additional note - I suggest that if you want to solve an arbitrary large-scale parallelization problem (anything more than simple loops), you use MPI, because it has all sorts of bells and whistles that allow processes to talk to each other mid-run. MPI is what the science people use for their biggest simulations; the bigger clusters designed for handling large jobs (~10k+ cores) support MPI and are unlikely to support pp or the multiprocessing module. If you just want to make use of the cores in your PC (or a few PCs on a network), choose whichever is easiest to get working. A minimal flavor of MPI is sketched below.
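The sketch assumes mpi4py is installed and the script is launched under mpiexec; the strided work split and the u*u stand-in for a real per-row solve are choices of the sketch:

# Run with: mpiexec -n 4 python loop_mpi_sketch.py
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

n_rows = 16
my_rows = range(rank, n_rows, size)        # each rank takes a strided slice of the loop
partial = [(u, u * u) for u in my_rows]    # u*u stands in for one expensive solve

gathered = comm.gather(partial, root=0)    # collect every rank's (index, result) pairs
if rank == 0:
    results = dict(p for chunk in gathered for p in chunk)
    print [results[u] for u in range(n_rows)]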

