[IPython-User] Iterating async results?

Jon Olav Vik jonovik@gmail....
Sun Jun 3 02:32:50 CDT 2012


Darren Govoni <darren <at> ontrenet.com> writes:

> Hi, I was looking at the docs for iterating over async results map. I couldn't
> understand the code provided in the example, because it didn't seem to iterate
> over async results (maybe a c&p error?).

It would help if you could refer to the code example (url?) and explain in more 
detail what you expected and what happens instead.

> So my question is: how do I use a load-balanced view to apply an async map,
> then asynchronously iterate over the results (as they arrive, or discover
> which results are complete) and fetch them?

Hope this helps:

First, make sure an ipcluster is running (e.g. run "ipcluster start" from a 
separate command window).

Then, paste the following into IPython. Example output is given below, with 
comments.

import time
import numpy as np
from IPython.parallel import Client

c = Client()
dv = c[:]  # direct view (not used below)
lv = c.load_balanced_view()  # load-balanced view


# ordered=False: the map iterator yields each result as soon as it completes.
@lv.parallel(ordered=False)
def func(i, x):
    import os
    import time
    time.sleep(x)  # simulate work that takes x seconds
    return i, os.getpid()  # task id and engine pid, to track what ran where

# The default, ordered=True, yields results in submission order.
@lv.parallel()
def func_ordered(i, x):
    import os
    import time
    time.sleep(x)
    return i, os.getpid()

ii = np.arange(5)  # task identifiers
xx = np.r_[0.3, 0.1, 0.5, 0.4, 0.2]  # sleep durations in seconds

t0 = time.time()
print "time i  pid"
for i, pid in func.map(ii, xx):  # unordered: results arrive as they finish
    print "%.2f" % (time.time() - t0), i, pid

print

t0 = time.time()
print "time i  pid"
for i, pid in func_ordered.map(ii, xx):  # ordered: submission order
    print "%.2f" % (time.time() - t0), i, pid

## -- End pasted text --

I think the func example above does what you want. Note how process 5400 
completes task 1 (wait for 0.1 s), then proceeds with task 4 (wait for 0.2 s), 
and finishes at about the same time as process 5676 is done with task 0 (wait 
for 0.3 s).

time i  pid
0.11 1 5400
0.31 0 5676
0.32 4 5400
0.42 3 7868
0.52 2 5212

However, load-balanced views default to ordered=True, meaning the iterator 
won't yield a result until all previous ones are available. Here, tasks 0 and 1 
both arrive after about 0.3 s. The total time is the same, though, showing that 
execution is still load balanced. But if you want to watch progress, or have 
post-processing that can be done in parallel, ordered=False is useful.

time i  pid
0.31 0 5676
0.31 1 7868
0.51 2 5400
0.52 3 5212
0.52 4 7868
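
Incidentally, the decorator isn't required for unordered iteration. If your 
IPython version's load-balanced map accepts an ordered argument (an 
assumption; check yours), the same can be written with map_async:

# Sketch: unordered iteration via map_async instead of @lv.parallel.
# Assumes lv.map_async(..., ordered=False) is supported by your version.
def work(i, x):
    import os
    import time
    time.sleep(x)
    return i, os.getpid()

amr = lv.map_async(work, ii, xx, ordered=False)  # an AsyncMapResult
for i, pid in amr:  # yields each result as soon as it is ready
    print i, pid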

Note also how I used func.map(ii, xx) above, not func(ii, xx) directly. The 
former will pass single items of ii and xx to the original function, whereas 
the latter will pass sub-sequences of ii and xx.
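
A minimal sketch of the distinction (the exact sub-sequence boundaries depend 
on how many engines you have; two engines and block distribution are assumed 
here for illustration):

# Sketch only: how the arguments reach the decorated function.
#
# func.map(ii, xx) is element-wise, like the builtin map:
#     func(ii[0], xx[0]), func(ii[1], xx[1]), ..., func(ii[4], xx[4])
#
# func(ii, xx) is chunk-wise; each engine receives one sub-sequence:
#     func(ii[0:3], xx[0:3]), func(ii[3:5], xx[3:5])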

> I also want metadata (e.g. timing) for those results.

Me too! Above I passed a task identifier (i) to keep track of what's happening, 
but what I'd really want is some way to return (metadata, result) from the 
iterator. Something like enumerate(iterator) might be a good syntax:

for metadata, result in func.annotate(func.map(...)):
    ...
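
In the meantime, a rough workaround is to submit tasks one by one with 
apply_async, so each AsyncResult carries its own metadata dict. A sketch (the 
keys I read below, like 'engine_id', 'started' and 'completed', are the ones 
I'd expect from the task database schema; check your version):

from IPython.parallel import Client

c = Client()
lv = c.load_balanced_view()

def work(i, x):
    import os
    import time
    time.sleep(x)
    return i, os.getpid()

# One AsyncResult per task, so metadata is per-task rather than per-chunk.
ars = [lv.apply_async(work, i, x) for i, x in enumerate([0.3, 0.1, 0.2])]

pending = set(ars)
while pending:
    c.wait(list(pending), timeout=0.1)  # block briefly until something finishes
    done = set(ar for ar in pending if ar.ready())
    pending -= done
    for ar in done:
        md = ar.metadata  # per-task metadata dict
        print ar.get(), md['engine_id'], md['completed'] - md['started']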

Lots of metadata is collected:
http://ipython.org/ipython-doc/dev/parallel/parallel_db.html
though I think it is recorded per chunk, so if several workpieces are passed to 
an engine at once (to reduce communication overhead), you won't have separate 
statistics for each workpiece. I guess that's the way it must be.

Such an iterator would also be a nice occasion to purge metadata and results 
from the hub and client. Currently, it seems this has to be done manually, as 
discussed here:

http://article.gmane.org/gmane.comp.python.ipython.user/8326
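
For reference, the manual purge looks roughly like this (a sketch; 
purge_results and the client-side caches are what I'd expect, but check your 
version):

# Sketch: manually free results held by the client and the hub.
amr = func.map(ii, xx)  # reusing func from the example above
amr.get()               # make sure the results are in hand before purging

c.purge_results(jobs=amr.msg_ids)  # drop these entries from the hub's database
c.results.clear()                  # drop the client's local result cache
c.metadata.clear()                 # drop the client's local metadata cache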


Hope this helps,
Jon Olav


