[IPython-User] Advice re parallelizing many quick calculations
Junkshops
junkshops@gmail....
Sun Jul 1 14:07:14 CDT 2012
Hmm, I guess this question was either too hard, too stupid, or too long.
In the hope it was the last of these, I'll try again.
Short version:
Is attempting to parallelize simple per-row calculations (e.g. a few
multiplications/additions) on a large 2D dataset via the notebook a
waste of time, due to the time required for data packaging and
transport? If so, is the solution to have the individual Python engines
read and write the data directly?
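To make the second question concrete, here is an untested sketch of what I mean by each engine reading its own slice of the file directly, so no rows travel through the controller. The file name, CSV format, and row-range bookkeeping are all assumptions; in practice each engine would run something like process_slice via dview.apply, but below it is just called locally to show the idea.

```python
import csv
import os
import tempfile

def scale_row(expt):
    # Same per-row transform as scaleArray below
    # (// to match Python 2 integer division).
    return [int(expt[0]) // 5 - 2,
            int(expt[1]) // 5 - 2,
            (float(expt[2]) - 0.82) * 50,
            (float(expt[3]) - 0.91) * 100,
            float(expt[4]) * 10,
            int(expt[5]),
            int(expt[6])]

def process_slice(path, start, stop):
    # Run on each engine: read only rows [start, stop) from disk,
    # scale them, and return (or write) the result locally.
    with open(path) as f:
        return [scale_row(row)
                for i, row in enumerate(csv.reader(f))
                if start <= i < stop]

# Local demonstration with a tiny file standing in for the real dataset.
rows = [['40', '20', '0.82', '0.99', '0.4', '7', '2569'],
        ['40', '25', '0.82', '0.99', '0.4', '7', '2569']]
fd, path = tempfile.mkstemp(suffix='.csv')
with os.fdopen(fd, 'w') as f:
    csv.writer(f).writerows(rows)
scaled = process_slice(path, 0, 2)
os.remove(path)
print(scaled[0][:2])  # first two scaled columns: [6, 2]
```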
Long version:
I have a dataset of a million rows with 7 numbers per row. I want to
apply this function to every row:
def scaleArray(expt):
    return [int(expt[0]) / 5 - 2,
            int(expt[1]) / 5 - 2,
            (float(expt[2]) - 0.82) * 50,
            (float(expt[3]) - 0.91) * 100,
            float(expt[4]) * 10,
            int(expt[5]),
            int(expt[6])
            ]
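As an aside, since each output column depends only on the matching input column, the whole transform can also be vectorized with NumPy, which sidesteps the per-row Python overhead entirely. A sketch, assuming the data has been loaded as an (n, 7) array of strings, and using floor division to match the Python 2 integer division above:

```python
import numpy as np

def scale_array_np(data):
    # data: (n, 7) array of strings, as parsed from the file.
    a = data.astype(float)
    out = np.empty_like(a)
    out[:, 0] = a[:, 0] // 5 - 2   # floor division, as int(x) / 5 in Py2
    out[:, 1] = a[:, 1] // 5 - 2
    out[:, 2] = (a[:, 2] - 0.82) * 50
    out[:, 3] = (a[:, 3] - 0.91) * 100
    out[:, 4] = a[:, 4] * 10
    out[:, 5] = a[:, 5]            # note: stored as floats, not ints
    out[:, 6] = a[:, 6]
    return out

sample = np.array([['40', '20', '0.82', '0.99', '0.4', '7', '2569'],
                   ['40', '15', '0.82', '0.99', '0.4', '9', '2569']])
print(scale_array_np(sample)[0][:2])  # first two columns scale to 6 and 2
```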
Every attempt I've made to parallelize this via the notebook has slowed
the processing down by at least an order of magnitude. I've been using
4 engines via direct and load-balanced views, and have tried synchronous
and asynchronous calls as well as splitting the data into 4 chunks and
calling each engine directly. Is there some way to do this, or do the
costs of packaging and transporting the data, relative to the
calculations, make this function unparallelizable via the notebook?
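One way to sanity-check that suspicion is to time the pickle round trip by itself and compare it against the computation. A rough sketch, with a stand-in dataset of repeated rows (real data with varied strings would pickle to a larger payload, so this is a lower bound):

```python
import pickle
import time

def scaleArray(expt):
    # // to match Python 2 integer division in the original.
    return [int(expt[0]) // 5 - 2,
            int(expt[1]) // 5 - 2,
            (float(expt[2]) - 0.82) * 50,
            (float(expt[3]) - 0.91) * 100,
            float(expt[4]) * 10,
            int(expt[5]),
            int(expt[6])]

# Stand-in for the real dataset: 100,000 rows of 7 short strings.
testdata = [['40', '20', '0.82', '0.99', '0.4', '7', '2569']
            for _ in range(100000)]

t = time.time()
scaled = list(map(scaleArray, testdata))
calc = time.time() - t

t = time.time()
blob = pickle.dumps(testdata)   # what the client does before sending
restored = pickle.loads(blob)   # what each engine does on receipt
ser = time.time() - t

print('calc %.3fs, pickle round trip %.3fs, payload %d bytes'
      % (calc, ser, len(blob)))
```

If the round trip is of the same order as the calculation, no engine-side speedup can win, and getting the data to the engines out of band (or having them read it themselves) seems like the only way forward.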
Extra super special long version is the original email, below.
Thanks and cheers, Gavin
On 6/28/2012 2:59 PM, Junkshops wrote:
> Hi all,
>
> Total noob to the IPython notebook and parallelization here, but so
> far it's incredibly cool. My only minor gripe at this point is that
> it's difficult to transfer notebook contents to other media formats,
> such as this email (unless I'm missing a feature somewhere or there's
> some clever way I haven't discovered). A download as txt or rtf
> feature would be handy.
>
> Anyway, I've been reading the docs and playing with parallelization of
> a simple routine, but so far I've only managed to make it much slower,
> so I'm hoping someone might be able to give me some advice. I'm
> worried that the calculations are so simple that, no matter how I
> parallelize them, the costs of un/packing and transporting the objects
> dwarf the calculations themselves.
>
> I'm running ipcluster and the notebook server on a VM with gobs of
> processors and memory and very few users, and connecting FF7 to the nb
> server via an ssh tunnel. Startup commands are thus:
>
> -bash-4.1$ ipcluster start -n 4 &
> -bash-4.1$ ipython notebook --pylab inline --no-browser &
>
> Here's what I've tried so far:
>
> In [1]: from IPython.parallel import Client
> import time
>
> In [2]: c = Client()
> lview = c.load_balanced_view()
> dview = c[:]
> print lview, dview
>
> <LoadBalancedView None> <DirectView [0, 1, 2, 3]>
>
> In [7]: data[-10:]
>
> Out[7]:
>
> [['40', '20', '0.82', '0.99', '0.4', '7', '2569'],
> ['40', '25', '0.82', '0.99', '0.4', '7', '2569'],
> ['40', '10', '0.82', '0.99', '0.4', '8', '2569'],
> ['40', '15', '0.82', '0.99', '0.4', '8', '2569'],
> ['40', '20', '0.82', '0.99', '0.4', '8', '2569'],
> ['40', '25', '0.82', '0.99', '0.4', '8', '2569'],
> ['40', '10', '0.82', '0.99', '0.4', '9', '2569'],
> ['40', '15', '0.82', '0.99', '0.4', '9', '2569'],
> ['40', '20', '0.82', '0.99', '0.4', '9', '2569'],
> ['40', '25', '0.82', '0.99', '0.4', '9', '2569']]
>
> In [8]: len(data)
>
> Out[8]: 1000000
>
> In [9]: testdata = data[:100000]
>
> For the purposes of testing, reduce the dataset size. Results on the
> smaller set more or less scale when the full set is processed.
>
> The following function could be improved for speed, but for the
> purposes of this test it doesn't matter.
>
> In [10]:
>
> def scaleArray(expt):
>     return [int(expt[0]) / 5 - 2,
>             int(expt[1]) / 5 - 2,
>             (float(expt[2]) - 0.82) * 50,
>             (float(expt[3]) - 0.91) * 100,
>             float(expt[4]) * 10,
>             int(expt[5]),
>             int(expt[6])
>             ]
>
> In [11]: t = time.clock()
> map(scaleArray, testdata)
> time.clock() - t
>
> Out[11]: 1.2399999999999993
>
> In [12]: t = time.clock()
> dview.map(scaleArray, testdata)
> time.clock() - t
>
> Out[12]: 19.82
>
> In [13]: t = time.clock()
> dview.map_sync(scaleArray, testdata)
> time.clock() - t
>
> Out[13]: 19.989999999999998
>
> In [14]: t = time.clock()
> asr = dview.map_async(scaleArray, testdata)
> asr.wait()
> time.clock() - t
>
> Out[14]: 21.949999999999996
>
> In [92]: t = time.clock()
> lview.map(scaleArray, data)
> time.clock() - t
>
> ---------------------------------------------------------------------------
>
> KeyboardInterrupt Traceback (most recent call
> last)
> *snip*
>
> The above command ran for ~9 hours with no results (vs. ~10 min for a
> direct view) on the full data set, so I didn't bother with the 10x
> smaller set.
>
> In [15]:
> def scaleArray2(data):
>     scaled = []
>     for expt in data:
>         scaled.append([int(expt[0]) / 5 - 2,
>                        int(expt[1]) / 5 - 2,
>                        (float(expt[2]) - 0.82) * 50,
>                        (float(expt[3]) - 0.91) * 100,
>                        float(expt[4]) * 10,
>                        int(expt[5]),
>                        int(expt[6])
>                        ])
>     return scaled
>
> def chunks(l, n):
>     return [l[i:i+n] for i in range(0, len(l), n)]
>
> In [16]: chsize = len(testdata) / len(c.ids)
>
> In [17]: t = time.clock()
> map(scaleArray2, chunks(testdata,chsize))
> time.clock() - t
>
> Out[17]: 1.8100000000000023
>
> In [18]: t = time.clock()
> dview.map(scaleArray2, chunks(testdata,chsize))
> time.clock() - t
>
> Out[18]: 16.810000000000002
>
> In [19]: t = time.clock()
> dview.map_sync(scaleArray2, chunks(testdata,chsize))
> time.clock() - t
>
> Out[19]: 17.060000000000002
>
> In [20]: t = time.clock()
> asr = dview.map_async(scaleArray2, chunks(testdata,chsize))
> asr.wait()
> time.clock() - t
>
> Out[20]: 19.099999999999994
>
> In [21]: t = time.clock()
> dc = chunks(testdata, chsize)
> for i in range(0, len(dc)):
>     print "Submitting job to engine", i, "at", time.clock()
>     c[i].apply(scaleArray2, dc[i])
> time.clock() - t
>
> Submitting job to engine 0 at 123.88
> Submitting job to engine 1 at 127.45
> Submitting job to engine 2 at 130.98
> Submitting job to engine 3 at 134.55
>
> Out[21]: 15.319999999999993
>
> In [25]: t = time.clock()
> dc = chunks(testdata, chsize)
> asrlist = []
> for i in range(0, len(dc)):
>     print "Submitting job to engine", i, "at", time.clock()
>     asrlist.append(c[i].apply_async(scaleArray2, dc[i]))
> for i in range(0, len(asrlist)):
>     asrlist[i].wait()
>     print "Job done on engine", i, "at", time.clock()
> time.clock() - t
>
> Submitting job to engine 0 at 222.45
> Submitting job to engine 1 at 226.43
> Submitting job to engine 2 at 230.41
> Submitting job to engine 3 at 234.38
> Job done on engine 0 at 249.41
> Job done on engine 1 at 249.41
> Job done on engine 2 at 249.41
> Job done on engine 3 at 249.41
>
> Out[25]: 26.969999999999999
>
> Either I'm doing something very wrong (probably) or this type of
> problem just isn't amenable to parallelization (which I don't *think*
> should be the case, especially when the data set is just being broken
> into four chunks and sent to four engines - but maybe the cost of
> pickling and transporting all that data dwarfs the cost of running the
> calculations).
>
> Anyway. If you haven't deleted this overly long email yet, I'd very
> much appreciate any advice you can give me.
>
> Cheers, Gavin
>