[IPython-User] Iterating async results?
Jon Olav Vik
jonovik@gmail....
Sun Jun 3 02:32:50 CDT 2012
Darren Govoni <darren <at> ontrenet.com> writes:
> Hi, I was looking at the docs for iterating over async results map. I couldn't
> understand the code provided in the example, because it didn't seem to iterate
> over async results (maybe a c&p error?).
It would help if you could refer to the code example (url?) and explain in more
detail what you expected and what happens instead.
> So my question is how to use load
> balanced view to apply async map and then asynchronously iterate over results
> (as they arrive or discover results that are complete) and get those results.
Hope this helps:
First, make sure an ipcluster is running (e.g. run "ipcluster start" from a
separate command window).
Then, paste the following into IPython. Example output is given below, with
comments.
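from __future__ import print_function  # print() works on Python 2 and 3
import time
import numpy as np
from IPython.parallel import Client  # moved to the ipyparallel package in IPython 4

c = Client()
dv = c[:]  # direct view (not used below)
lv = c.load_balanced_view()  # load balanced view

@lv.parallel(ordered=False)
def func(i, x):
    # The body runs on an engine, so modules are imported there.
    import os
    import time
    time.sleep(x)
    return i, os.getpid()

@lv.parallel()  # ordered=True by default
def func_ordered(i, x):
    import os
    import time
    time.sleep(x)
    return i, os.getpid()

ii = np.arange(5)
xx = np.r_[0.3, 0.1, 0.5, 0.4, 0.2]  # seconds each task sleeps

# Unordered: each result is yielded as soon as its task finishes.
t0 = time.time()
print("time i pid")
for i, pid in func.map(ii, xx):
    print("%.2f" % (time.time() - t0), i, pid)
print()

# Ordered: results are yielded in submission order.
t0 = time.time()
print("time i pid")
for i, pid in func_ordered.map(ii, xx):
    print("%.2f" % (time.time() - t0), i, pid)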
I think the func example above does what you want. Note how process 5400
completes task 1 (wait for 0.1 s), then proceeds with task 4 (wait for 0.2 s),
and finishes at about the same time as process 5676 is done with task 0 (wait
for 0.3 s).
time i pid
0.11 1 5400
0.31 0 5676
0.32 4 5400
0.42 3 7868
0.52 2 5212
By default, however, load balanced views use ordered=True, meaning they won't
return a result until all the previous ones are available. In the func_ordered
output below, tasks 0 and 1 both arrive after about 0.3 s, because task 1 has to
wait for task 0. The total time is the same, though, showing that execution is
still load balanced. So if you want to watch progress, or have post-processing
that can run in parallel with the remaining tasks, ordered=False is useful.
time i pid
0.31 0 5676
0.31 1 7868
0.51 2 5400
0.52 3 5212
0.52 4 7868
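For comparison, here is a rough stand-alone sketch of the same ordered vs. unordered distinction using the standard library's concurrent.futures instead of IPython.parallel (a swapped-in technique, so it runs without an ipcluster). Threads stand in for engines and the thread name stands in for the pid; run_unordered and run_ordered are hypothetical helper names. as_completed() yields tasks as they finish, much like ordered=False, while Executor.map() yields them in submission order, much like ordered=True.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def task(i, x):
    # Simulated work: sleep x seconds, then report the task id and worker name.
    time.sleep(x)
    return i, threading.current_thread().name

ii = [0, 1, 2, 3, 4]
xx = [0.3, 0.1, 0.5, 0.4, 0.2]  # seconds each task sleeps

def run_unordered(workers=4):
    """Yield (i, worker) as each task finishes (analogous to ordered=False)."""
    with ThreadPoolExecutor(max_workers=workers) as ex:
        futures = [ex.submit(task, i, x) for i, x in zip(ii, xx)]
        for fut in as_completed(futures):
            yield fut.result()

def run_ordered(workers=4):
    """Yield (i, worker) in submission order (analogous to ordered=True)."""
    with ThreadPoolExecutor(max_workers=workers) as ex:
        for result in ex.map(task, ii, xx):
            yield result

if __name__ == "__main__":
    for label, run in [("unordered", run_unordered), ("ordered", run_ordered)]:
        print(label)
        t0 = time.time()
        for i, worker in run():
            print("%.2f" % (time.time() - t0), i, worker)
```

With four workers, task 1 should come out first in the unordered run and the ordered run should always list 0 through 4, mirroring the two tables above.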
Note also how I used func.map(ii, xx) above, not func(ii, xx) directly. The
former will pass single items of ii and xx to the original function, whereas
the latter will pass sub-sequences of ii and xx.
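To make the difference concrete, here is a plain-Python sketch of what the wrapped function receives in each case. It assumes a simple block split into contiguous pieces, one per worker, with two hypothetical workers; block_split and show_args are illustrative helpers, not IPython.parallel functions, and the actual split depends on the view and the number of engines.

```python
def block_split(seq, n):
    """Split seq into n contiguous pieces of near-equal length
    (a hypothetical stand-in for a block distribution)."""
    q, r = divmod(len(seq), n)
    chunks, start = [], 0
    for k in range(n):
        size = q + (1 if k < r else 0)  # first r pieces get one extra item
        chunks.append(seq[start:start + size])
        start += size
    return chunks

def show_args(i, x):
    # Report exactly what the wrapped function receives.
    return (i, x)

ii = [0, 1, 2, 3, 4]
xx = [0.3, 0.1, 0.5, 0.4, 0.2]

# map-style: one call per pair of single items.
map_calls = [show_args(i, x) for i, x in zip(ii, xx)]

# direct-call style with 2 workers: one call per pair of sub-sequences.
direct_calls = [show_args(ci, cx)
                for ci, cx in zip(block_split(ii, 2), block_split(xx, 2))]

if __name__ == "__main__":
    print("map-style calls:   ", map_calls)
    print("direct-style calls:", direct_calls)
```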
> I also want metadata (e.g. timing) for those results.
Me too! Above I passed a task identifier (i) to keep track of what's happening,
but what I'd really want is some way to return (metadata, result) from the
iterator. Something like enumerate(iterator) might be a good syntax:
    for metadata, result in func.annotate(func.map(...)):
        ...
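One way such an annotating iterator could look, sketched with concurrent.futures rather than IPython.parallel: each task is wrapped so that it returns (metadata, result), and the iterator yields those pairs as tasks complete. annotate_map, timed_call and the metadata keys are hypothetical names chosen for illustration; this is not an existing IPython API.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def timed_call(func, *args):
    """Run func(*args) and return (metadata, result).
    The metadata keys are hypothetical, chosen only for illustration."""
    started = time.time()
    result = func(*args)
    completed = time.time()
    meta = {
        "worker": threading.current_thread().name,
        "started": started,
        "completed": completed,
        "elapsed": completed - started,
    }
    return meta, result

def annotate_map(executor, func, *iterables):
    """Hypothetical analogue of func.annotate(func.map(...)):
    yield (metadata, result) pairs in completion order."""
    futures = [executor.submit(timed_call, func, *args) for args in zip(*iterables)]
    for fut in as_completed(futures):
        yield fut.result()

def work(i, x):
    # Stand-in task: sleep x seconds and return the task id.
    time.sleep(x)
    return i

if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=4) as ex:
        for meta, i in annotate_map(ex, work, range(5), [0.3, 0.1, 0.5, 0.4, 0.2]):
            print(i, meta["worker"], "%.2f" % meta["elapsed"])
```

Wrapping on the worker side keeps the timing next to the computation, so the caller never has to look results up by id.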
Lots of metadata are collected:
http://ipython.org/ipython-doc/dev/parallel/parallel_db.html
though I think this is per-chunk, so if several workpieces are passed to an
engine at once (to reduce communication overhead), you won't have separate
statistics for each workpiece. I guess that's the way it must be.
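If results really are timed per chunk, per-item metadata can only be inherited from the chunk it came from. A small sketch of that effect, again with concurrent.futures standing in for the cluster: run_chunk, chunked and per_item are hypothetical helpers, and squaring numbers stands in for real work. Every item from the same chunk comes back paired with the same metadata dict.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def run_chunk(chunk):
    """Process a whole chunk in one call and time it as a single unit."""
    started = time.time()
    results = [x * x for x in chunk]  # stand-in workload
    meta = {"started": started, "completed": time.time(), "size": len(chunk)}
    return meta, results

def chunked(seq, size):
    """Split seq into consecutive pieces of at most `size` items."""
    return [seq[k:k + size] for k in range(0, len(seq), size)]

def per_item(executor, seq, chunksize):
    """Yield (chunk_metadata, item_result) pairs. All items from one chunk
    share a single metadata dict, so per-item timing is not recoverable."""
    futures = [executor.submit(run_chunk, c) for c in chunked(seq, chunksize)]
    for fut in as_completed(futures):
        meta, results = fut.result()
        for r in results:
            yield meta, r

if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=2) as ex:
        for meta, r in per_item(ex, list(range(6)), chunksize=3):
            print(r, "chunk size", meta["size"], "chunk started %.3f" % meta["started"])
```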
Such an annotating iterator would also be a nice occasion to purge metadata and results
from the hub and client. Currently, it seems that needs to be done manually, as
discussed here:
http://article.gmane.org/gmane.comp.python.ipython.user/8326
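A minimal sketch of the purge-as-you-go idea, with concurrent.futures standing in for the client: a dict mapping task id to Future plays the role of the client's result cache, and each entry is deleted as soon as its result has been taken. iter_and_purge is a hypothetical helper; purging the hub's database is not covered by this sketch.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def iter_and_purge(pending):
    """Yield (task_id, result) as tasks finish, deleting each entry from
    `pending` (a dict of task id -> Future standing in for a client-side
    result cache) before it is handed to the caller."""
    id_of = {fut: tid for tid, fut in pending.items()}
    for fut in as_completed(id_of):  # as_completed copies the futures up front
        tid = id_of.pop(fut)
        result = fut.result()
        del pending[tid]  # purge: drop the cached reference to this task
        yield tid, result

if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=4) as ex:
        pending = {tid: ex.submit(pow, tid, 2) for tid in range(5)}
        for tid, result in iter_and_purge(pending):
            print(tid, result, "still cached:", len(pending))
```

After the loop the cache is empty, so a long-running session does not keep accumulating finished results.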
Hope this helps,
Jon Olav