How to parallelize a for-loop in Python? (matrix factorization)
Update 1.0 start
It seems that when I call

    for i, wi in enumerate(w.T):
        idx.append(i)
        result.append(pool.apply_async(als_y, (x, wi, q, lambda_, n_factors, i,)))
the arguments passed to the functions als_y/als_x are not references but copies. So when x or y are large matrices (in my case about 6000*40 or so), and the call happens inside a for-loop (let's assume the number of iterations is 50,000), this exceeds the memory limit. So I tried using global variables and passing just the indices as parameters to the functions:
    import multiprocessing
    import time
    import numpy as np

    def func(idx):
        global a
        a[idx] += 1

    if __name__ == "__main__":
        a = range(10)
        for j in xrange(2):
            pool = multiprocessing.Pool(processes=8)
            result = []
            for i in xrange(10):
                result.append(pool.apply_async(func, (i, )))
            pool.close()
            pool.join()
            print a
            print "Sub-process(es) done."
It outputs:

    [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    Sub-process(es) done.
    [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    Sub-process(es) done.
So that means `a` is still copied into each sub-process! Now I wonder if there is any way to handle this issue? I'd appreciate any help!
Update 1.0 end
The Python code below solves a matrix factorization problem, Q = XY. However, the code is not efficient, and I hope it can be converted into a parallel version (using a GPU would be best, but CPU is fine too). I have no experience with parallel programming, so can anyone give me some advice?
Below is the code to factorize the matrix using ALS (alternating least squares; details here):
    for ii in range(n_iterations):
        for u, wu in enumerate(w):
            x[u] = np.linalg.solve(np.dot(y, np.dot(np.diag(wu), y.T)) + lambda_ * np.eye(n_factors),
                                   np.dot(y, np.dot(np.diag(wu), q[u].T))).T  # x inner loop
        for i, wi in enumerate(w.T):
            y[:, i] = np.linalg.solve(np.dot(x.T, np.dot(np.diag(wi), x)) + lambda_ * np.eye(n_factors),  # y inner loop
                                      np.dot(x.T, np.dot(np.diag(wi), q[:, i])))  # y inner loop
        error = get_error(q, x, y, w)
        weighted_errors.append(error)
        print '{}th iteration completed'.format(ii)
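For reference, here is a self-contained serial version of that loop on a small random matrix (the sizes, seed, and the regularization check are my own additions for illustration). It is handy as a ground truth to compare any parallel rewrite against, and it makes the structure explicit: each row update of x reads only y, and each column update of y reads only x, which is exactly why the inner loops are parallelizable:

```python
import numpy as np

np.random.seed(0)
m, n, k, lam = 12, 10, 3, 0.1         # toy sizes, chosen arbitrarily
q = np.random.rand(m, n)              # matrix to factorize
w = (q > 0.3).astype(np.float64)      # weight mask of "observed" entries
x = 5 * np.random.rand(m, k)
y = 5 * np.random.rand(k, n)

def get_error(q, x, y, w):
    return np.sum((w * (q - np.dot(x, y))) ** 2)

objectives = []
for ii in range(5):
    # each row of x depends only on y, so these m solves are independent
    for u, wu in enumerate(w):
        x[u] = np.linalg.solve(
            np.dot(y, np.dot(np.diag(wu), y.T)) + lam * np.eye(k),
            np.dot(y, np.dot(np.diag(wu), q[u].T))).T
    # each column of y depends only on x, so these n solves are independent
    for i, wi in enumerate(w.T):
        y[:, i] = np.linalg.solve(
            np.dot(x.T, np.dot(np.diag(wi), x)) + lam * np.eye(k),
            np.dot(x.T, np.dot(np.diag(wi), q[:, i])))
    # the regularized objective that ALS minimizes block by block
    objectives.append(get_error(q, x, y, w) + lam * (np.sum(x**2) + np.sum(y**2)))
```

Because each solve exactly minimizes its own block of the regularized objective, that objective is non-increasing across sweeps, which gives a cheap sanity check for any parallel implementation.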
After using the multiprocessing lib, my code is now:
    def als_x(y, wu, q, lambda_, n_factors, u):
        return np.linalg.solve(np.dot(y, np.dot(np.diag(wu), y.T)) + lambda_ * np.eye(n_factors),
                               np.dot(y, np.dot(np.diag(wu), q[u].T))).T

    for ii in range(n_iterations):
        pool = multiprocessing.Pool(processes=12)  # create the pool
        result = []  # store each row of x
        idx = []     # store the row number
        for u, wu in enumerate(w):
            idx.append(u)
            result.append(pool.apply_async(als_x, (y, wu, q, lambda_, n_factors, u,)))
        pool.close()
        pool.join()
        for u, vector in zip(idx, result):
            x[u] = vector.get()  # assign the result to x
        ######################################
        pool = multiprocessing.Pool(processes=12)  # for y, similar to x
        result = []
        idx = []
        for i, wi in enumerate(w.T):
            idx.append(i)
            result.append(pool.apply_async(als_y, (x, wi, q, lambda_, n_factors, i,)))
        pool.close()
        pool.join()
        for i, vector in zip(idx, result):
            y[:, i] = vector.get()
        error = get_error(q, x, y, w)
        weighted_errors.append(error)
        print '{}th iteration completed'.format(ii), 'error: ', error
But to my misery, the program crashed silently...
Below is the whole bunch of code. It's a bit messy; just ignore load_data, get_error, and vec2str, since here I generate the matrix randomly:
    import pandas as pd
    import numpy as np
    import multiprocessing

    def vec2str(vec):
        res = ''
        for dim in range(len(vec)):
            res += str(vec[dim]) + ','
        return res

    def load_data(heads, filename, sep, header=None):
        data = pd.read_table(filename, sep=sep, header=header, names=heads)
        rp = data.pivot_table(columns=['sid'], index=['uid'], values=['rating'])  # not general...
        q = rp.fillna(0)
        q = q.values
        w = q > 0.5
        w[w == True] = 1
        w[w == False] = 0
        w = w.astype(np.float64, copy=False)
        return q, w, rp

    def get_error(q, x, y, w):
        return np.sum((w * (q - np.dot(x, y)))**2)

    '''
    x[u] = np.linalg.solve(np.dot(y, np.dot(np.diag(wu), y.T)) + lambda_ * np.eye(n_factors),
                           np.dot(y, np.dot(np.diag(wu), q[u].T))).T
    '''
    def als_x(y, wu, q, lambda_, n_factors, u):
        return np.linalg.solve(np.dot(y, np.dot(np.diag(wu), y.T)) + lambda_ * np.eye(n_factors),
                               np.dot(y, np.dot(np.diag(wu), q[u].T))).T

    '''
    y[:,i] = np.linalg.solve(np.dot(x.T, np.dot(np.diag(wi), x)) + lambda_ * np.eye(n_factors),
                             np.dot(x.T, np.dot(np.diag(wi), q[:, i])))
    '''
    def als_y(x, wi, q, lambda_, n_factors, i):
        return np.linalg.solve(np.dot(x.T, np.dot(np.diag(wi), x)) + lambda_ * np.eye(n_factors),
                               np.dot(x.T, np.dot(np.diag(wi), q[:, i])))

    if __name__ == "__main__":
        lambda_ = 0.1
        n_factors = 40
        filename = 'data_songid'
        n_iterations = 20
        #q, w, rp = load_data(['uid', 'sid', 'rating'], filename, ',')
        q = np.random.rand(1000, 1000)
        m, n = q.shape
        w = np.eye(1000)
        print 'loading data finished, ', 'size: ', q.shape
        print 'settings ', 'lambda = {}'.format(lambda_), 'n_factors = {}'.format(n_factors)
        x = 5 * np.random.rand(m, n_factors)
        y = 5 * np.random.rand(n_factors, n)
        errors = []
        for ii in range(n_iterations):
            x = np.linalg.solve(np.dot(y, y.T) + lambda_ * np.eye(n_factors),
                                np.dot(y, q.T)).T
            y = np.linalg.solve(np.dot(x.T, x) + lambda_ * np.eye(n_factors),
                                np.dot(x.T, q))
            if ii % 100 == 0:
                print('{}th iteration completed'.format(ii))
            errors.append(get_error(q, x, y, w))
        q_hat = np.dot(x, y)
        print('error of rated movies: {}'.format(get_error(q, x, y, w)))
        print errors
        ##### ALS start.... #####
        print '*' * 100
        weighted_errors = []
        for ii in range(n_iterations):
            pool = multiprocessing.Pool(processes=12)
            result = []
            idx = []
            for u, wu in enumerate(w):
                idx.append(u)
                result.append(pool.apply_async(als_x, (y, wu, q, lambda_, n_factors, u,)))
            pool.close()
            pool.join()
            for u, vector in zip(idx, result):
                x[u] = vector.get()
            ######################################
            pool = multiprocessing.Pool(processes=12)
            result = []
            idx = []
            for i, wi in enumerate(w.T):
                idx.append(i)
                result.append(pool.apply_async(als_y, (x, wi, q, lambda_, n_factors, i,)))
            pool.close()
            pool.join()
            for i, vector in zip(idx, result):
                y[:, i] = vector.get()
            error = get_error(q, x, y, w)
            weighted_errors.append(error)
            print '{}th iteration completed'.format(ii), 'error: ', error
        weighted_q_hat = np.dot(x, y)
        print weighted_errors
        x.tofile('x.bin')
        y.tofile('y.bin')
        latent_user_file = open('user_latent', 'w')
        for idx in range(len(rp.axes[0])):  # note: rp is only defined when load_data is used
            latent_user_file.write(str(rp.axes[0][idx]) + '\t' + vec2str(x[idx, :]) + '\n')
        latent_mid_file = open('mid_latent', 'w')
        for idx in range(len(rp.axes[1])):
            latent_mid_file.write(str(rp.axes[1][idx]) + '\t' + vec2str(y.T[idx, :]) + '\n')
Last year I encountered a desire for a "parallel loop" in Python, and hacked one up as part of my work on a physics paper. There are many modules that do what you want, but I found that pp was the only way to get it working for arbitrary functions.
If you want something that looks like this:
    resultlist = library_parallelloop.main(
        function = examplefunction,
        listofargsets = listofargsets,
        algorithm = 'pp',
        printextra = True
    )
then I'll point you to my GitHub instead of providing the entire source in this post, because the implementation I got working is painfully many lines, and involved deep copying Python functions, which apparently isn't pre-built into Python.
Finding primes example:
https://github.com/douglasquincyadams/main/blob/master/test_parallelloop.py
repo:
https://github.com/douglasquincyadams/main
If you download my repo into some dark corner of your computer, the working snippet should be:
    import library_parallelloop

    def do_the_thing_function(ii):
        for u, wu in enumerate(w):
            x[u] = np.linalg.solve(np.dot(y, np.dot(np.diag(wu), y.T)) + lambda_ * np.eye(n_factors),
                                   np.dot(y, np.dot(np.diag(wu), q[u].T))).T  # x inner loop
        for i, wi in enumerate(w.T):
            y[:, i] = np.linalg.solve(np.dot(x.T, np.dot(np.diag(wi), x)) + lambda_ * np.eye(n_factors),  # y inner loop
                                      np.dot(x.T, np.dot(np.diag(wi), q[:, i])))  # y inner loop
        error = get_error(q, x, y, w)
        weighted_errors.append(error)
        print '{}th iteration completed'.format(ii)
        return  # whatever the result is supposed to be... this code doesn't work on its own

    listofargsets = []
    for ii in range(n_iterations):
        listofargsets.append({"ii": ii})

    resultlist = library_parallelloop.main(
        function = do_the_thing_function,
        listofargsets = listofargsets,
        algorithm = 'pp',
        printextra = True
    )
If you ask me, a parallel loop like the one above should be nice, handy, and built into languages, but it seems to be something cryptically exampled by wizards in towers that doesn't quite work when you try it out on your crappy laptop. Anyways, I hope this helps.
An additional note: I suggest that if you want to solve an arbitrary large-scale parallelization problem (anything more than simple loops), you use MPI, because it has all sorts of bells and whistles that allow processes to talk to each other mid-run. MPI is what the science people use for their biggest simulations; the bigger clusters designed for handling large jobs (~10k+ cores) support MPI and are unlikely to support pp or the multiprocessing module. If you just want to make use of the cores in your PC (or a few PCs on a network), then choose whichever is easiest to get working.