[IPython-User] Advice re parallelizing many quick calculations

MinRK benjaminrk@gmail....
Sun Jul 1 15:28:18 CDT 2012


On Sun, Jul 1, 2012 at 12:07 PM, Junkshops <junkshops@gmail.com> wrote:
> Hmm, I guess this question was either too hard, too stupid, or too long.
> In the hopes it was the latter problem I'll try again.

Sorry - we were blitzing on the 0.13 release.

>
> Short version:
>
> Is attempting to parallelize simple per-row calculations (e.g. a few
> multiplications/additions/etc.) on a large 2D dataset a waste of time
> when routing via the notebook, due to the time required for data
> packaging and transport? If so, is the solution to have the
> individual Python engines read and write the data directly?
>
> Long version:
>
> I have a dataset of a million rows with 7 numbers per row. I want to
> apply this function to every row:
>
> def scaleArray(expt):
>      return [int(expt[0]) / 5 - 2,
>              int(expt[1]) / 5 - 2,
>              (float(expt[2]) - 0.82) * 50,
>              (float(expt[3]) - 0.91) * 100,
>              float(expt[4]) * 10,
>              int(expt[5]),
>              int(expt[6])
>              ]
>
> Every attempt I've made to parallelize this via the notebook has
> slowed down the processing by at least an order of magnitude. I've
> been using 4 engines via direct and load-balanced views, and tried
> synchronous and asynchronous calls as well as splitting the data into
> 4 chunks and calling each engine directly. Is there some way to do
> this, or does the cost of packaging and transporting the data,
> compared to the cost of the calculations, make this function
> unparallelizable via the notebook?

I seriously doubt this code will ever be faster if you parallelize it
(at least with remote tools like IPython.parallel). Moving data around
is expensive, so parallelizing only pays off when the code is
computationally or IO-intensive, and yours is neither.
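
For a rough sense of just the packaging cost, here is a quick sketch
(not exactly what IPython.parallel does on the wire, but the right
ballpark for list-of-lists data like yours; the sample rows are made
up and the numbers will vary by machine):

import pickle, time

# a million rows of 7 short strings, similar in shape to your data
rows = [['40', '20', '0.82', '0.99', '0.4', '7', '2569']
        for _ in xrange(1000000)]

tic = time.time()
buf = pickle.dumps(rows, pickle.HIGHEST_PROTOCOL)
print "serialize:   %.2f s (%.1f MB)" % (time.time() - tic, len(buf) / 1e6)

tic = time.time()
pickle.loads(buf)
print "deserialize: %.2f s" % (time.time() - tic)

That cost is paid in both directions (arguments out, results back),
before any network transport or the actual arithmetic happens.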

I think your best bet is a simple vectorized version with numpy:

def scaleArray2(expt):
    """vectorized scaleArray"""
    import numpy as np
    x = np.empty_like(expt)
    # only cast the int columns, which assumes expt is a float array;
    # if the casting is not important, simply remove the astype(int)
    x[:,0] = expt[:,0].astype(int) / 5 - 2
    x[:,1] = expt[:,1].astype(int) / 5 - 2
    x[:,2] = (expt[:,2] - 0.82) * 50
    x[:,3] = (expt[:,3] - 0.91) * 100
    x[:,4] = expt[:,4] * 10
    x[:,5:] = expt[:,5:].astype(int)
    return x

This version scales a million-row array in 400ms, which is likely not
slow enough to justify trying to parallelize.

This, in turn, assumes that you load the data into a numpy array
rather than nested Python lists.  Not knowing your storage format, I
expect the numpy.loadtxt function will be useful to you.
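
For example (a minimal sketch; the filename is hypothetical, and I'm
assuming a plain whitespace-delimited text file):

import numpy as np

# loadtxt parses a whitespace-delimited text file into a float64 array
expt = np.loadtxt("experiments.txt")     # shape (N, 7)

# or, if the rows are already in memory as lists of strings:
# expt = np.asarray(data, dtype=float)

scaled = scaleArray2(expt)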

For some timing info, here's an example:

# assumes: import sys; import numpy as np; rc = Client()
e0 = rc[0]  # a DirectView on the first engine
for p in range(3, 7):
    n = 10**p
    A = np.random.random((n,7))
    # sanity check that both versions agree before timing them
    assert (np.array(map(scaleArray, A)) == scaleArray2(A)).all()
    print "%i elements" % n
    print "           scaleArray",
    %timeit np.array(map(scaleArray, A))
    sys.stdout.flush()
    print "vectorized scaleArray",
    %timeit scaleArray2(A)
    sys.stdout.flush()
    print "                 echo",
    %timeit e0.apply_sync(lambda x: x, A)
    sys.stdout.flush()

which gives:

1000 elements
           scaleArray 100 loops, best of 3: 16 ms per loop
vectorized scaleArray 1000 loops, best of 3: 225 us per loop
                 echo 100 loops, best of 3: 6.15 ms per loop
10000 elements
           scaleArray 10 loops, best of 3: 158 ms per loop
vectorized scaleArray 1000 loops, best of 3: 1.32 ms per loop
                 echo 100 loops, best of 3: 12.6 ms per loop
100000 elements
           scaleArray 1 loops, best of 3: 1.58 s per loop
vectorized scaleArray 10 loops, best of 3: 35.2 ms per loop
                 echo 10 loops, best of 3: 73.9 ms per loop
1000000 elements
           scaleArray 1 loops, best of 3: 16 s per loop
vectorized scaleArray 1 loops, best of 3: 367 ms per loop
                 echo 1 loops, best of 3: 667 ms per loop

This shows that up to one million elements (and probably beyond),
simply echoing the array to an engine and back, which is the minimum
overhead incurred by using IPython.parallel, already takes longer than
the vectorized scaleArray itself, so parallelizing this calculation
can only slow it down.

-MinRK

>
> Extra super special long version is the original email, below.
>
> Thanks and cheers, Gavin
>
> On 6/28/2012 2:59 PM, Junkshops wrote:
>> Hi all,
>>
>> Total noob to the IPython notebook and parallelization here, but so
>> far it's incredibly cool. My only minor gripe at this point is that
>> it's difficult to transfer notebook contents to other media formats,
>> such as this email (unless I'm missing a feature somewhere or there's
>> some clever way I haven't discovered). A download as txt or rtf
>> feature would be handy.
>>
>> Anyway, I've been reading the docs and playing with parallelization of
>> a simple routine, but so far I've only managed to make it much slower,
>> so I'm hoping someone might be able to give me some advice. I'm
>> worried that perhaps the calculations are so simple, no matter how I
>> parallelize it the costs of un/packing and transporting the objects
>> might dwarf the calcs.
>>
>> I'm running ipcluster and the notebook server on a VM with gobs of
>> processors and memory and very few users, and connecting FF7 to the
>> notebook server via an SSH tunnel. Startup commands are thus:
>>
>> -bash-4.1$ ipcluster start -n 4 &
>> -bash-4.1$ ipython notebook --pylab inline --no-browser &
>>
>> Here's what I've tried so far:
>>
>> In [1]: from IPython.parallel import Client
>>             import time
>>
>> In [2]: c = Client()
>>             lview = c.load_balanced_view()
>>             dview = c[:]
>>             print lview, dview
>>
>> <LoadBalancedView None> <DirectView [0, 1, 2, 3]>
>>
>> In [7]: data[-10:]
>>
>> Out[7]:
>>
>> [['40', '20', '0.82', '0.99', '0.4', '7', '2569'],
>>  ['40', '25', '0.82', '0.99', '0.4', '7', '2569'],
>>  ['40', '10', '0.82', '0.99', '0.4', '8', '2569'],
>>  ['40', '15', '0.82', '0.99', '0.4', '8', '2569'],
>>  ['40', '20', '0.82', '0.99', '0.4', '8', '2569'],
>>  ['40', '25', '0.82', '0.99', '0.4', '8', '2569'],
>>  ['40', '10', '0.82', '0.99', '0.4', '9', '2569'],
>>  ['40', '15', '0.82', '0.99', '0.4', '9', '2569'],
>>  ['40', '20', '0.82', '0.99', '0.4', '9', '2569'],
>>  ['40', '25', '0.82', '0.99', '0.4', '9', '2569']]
>>
>> In [8]: len(data)
>>
>> Out[8]: 1000000
>>
>> In [9]: testdata = data[:100000]
>>
>> For the purposes of testing I reduced the dataset size. Results on the
>> smaller set more or less scale when the full set is processed.
>>
>> The following function could be improved for speed, but for the
>> purposes of this test it doesn't matter.
>>
>> In [10]:
>>
>> def scaleArray(expt):
>>     return [int(expt[0]) / 5 - 2,
>>             int(expt[1]) / 5 - 2,
>>             (float(expt[2]) - 0.82) * 50,
>>             (float(expt[3]) - 0.91) * 100,
>>             float(expt[4]) * 10,
>>             int(expt[5]),
>>             int(expt[6])
>>             ]
>>
>> In [11]: t = time.clock()
>>             map(scaleArray, testdata)
>>             time.clock() - t
>>
>> Out[11]: 1.2399999999999993
>>
>> In [12]: t = time.clock()
>>             dview.map(scaleArray, testdata)
>>             time.clock() - t
>>
>> Out[12]: 19.82
>>
>> In [13]: t = time.clock()
>>             dview.map_sync(scaleArray, testdata)
>>             time.clock() - t
>>
>> Out[13]: 19.989999999999998
>>
>> In [14]: t = time.clock()
>>             asr = dview.map_async(scaleArray, testdata)
>>             asr.wait()
>>             time.clock() - t
>>
>> Out[14]: 21.949999999999996
>>
>> In [92]: t = time.clock()
>>               lview.map(scaleArray, data)
>>               time.clock() - t
>>
>> ---------------------------------------------------------------------------
>>
>> KeyboardInterrupt                         Traceback (most recent call
>> last)
>> *snip*
>>
>> The above command ran for ~9 hours with no results (vs ~10 min for a
>> direct view) on the full data set, so I'm not bothering with the 10x
>> smaller set.
>>
>> In [15]:
>> def scaleArray2(data):
>>     scaled = []
>>     for expt in data:
>>         scaled.append([int(expt[0]) / 5 - 2,
>>                        int(expt[1]) / 5 - 2,
>>                        (float(expt[2]) - 0.82) * 50,
>>                        (float(expt[3]) - 0.91) * 100,
>>                        float(expt[4]) * 10,
>>                        int(expt[5]),
>>                        int(expt[6])
>>                        ])
>>     return scaled
>>
>> def chunks(l, n):
>>     return [l[i:i+n] for i in range(0, len(l), n)]
>>
>> In [16]: chsize = len(testdata) / len(c.ids)
>>
>> In [17]: t = time.clock()
>>             map(scaleArray2, chunks(testdata,chsize))
>>             time.clock() - t
>>
>> Out[17]: 1.8100000000000023
>>
>> In [18]: t = time.clock()
>>             dview.map(scaleArray2, chunks(testdata,chsize))
>>             time.clock() - t
>>
>> Out[18]: 16.810000000000002
>>
>> In [19]: t = time.clock()
>>             dview.map_sync(scaleArray2, chunks(testdata,chsize))
>>             time.clock() - t
>>
>> Out[19]: 17.060000000000002
>>
>> In [20]: t = time.clock()
>>             asr = dview.map_async(scaleArray2, chunks(testdata,chsize))
>>             asr.wait()
>>             time.clock() - t
>>
>> Out[20]: 19.099999999999994
>>
>> In [21]: t = time.clock()
>>             dc = chunks(testdata, chsize)
>>             for i in range(0, len(dc)):
>>                 print "Submitting job to engine", i, "at", time.clock()
>>                 c[i].apply(scaleArray2, dc[i])
>>             time.clock() - t
>>
>> Submitting job to engine 0 at 123.88
>> Submitting job to engine 1 at 127.45
>> Submitting job to engine 2 at 130.98
>> Submitting job to engine 3 at 134.55
>>
>> Out[21]: 15.319999999999993
>>
>> In [25]: t = time.clock()
>>             dc = chunks(testdata, chsize)
>>             asrlist = []
>>             for i in range(0, len(dc)):
>>                 print "Submitting job to engine", i, "at", time.clock()
>>                 asrlist.append(c[i].apply_async(scaleArray2, dc[i]))
>>             for i in range(0, len(asrlist)):
>>                 asrlist[i].wait()
>>                 print "Job done on engine", i, "at", time.clock()
>>             time.clock() - t
>>
>> Submitting job to engine 0 at 222.45
>> Submitting job to engine 1 at 226.43
>> Submitting job to engine 2 at 230.41
>> Submitting job to engine 3 at 234.38
>> Job done on engine 0 at 249.41
>> Job done on engine 1 at 249.41
>> Job done on engine 2 at 249.41
>> Job done on engine 3 at 249.41
>>
>> Out[25]: 26.969999999999999
>>
>> Either I'm doing something very wrong (probably) or this type of
>> problem just isn't amenable to parallelization (which I don't *think*
>> should be the case, especially when the data set is just being broken
>> into four chunks and sent to four engines - but maybe the cost of
>> pickling and transporting all that data dwarfs the cost of running the
>> calculations).
>>
>> Anyway. If you haven't deleted this overly long email yet, I'd very
>> much appreciate any advice you can give me.
>>
>> Cheers, Gavin
>>
> _______________________________________________
> IPython-User mailing list
> IPython-User@scipy.org
> http://mail.scipy.org/mailman/listinfo/ipython-user

