[IPython-User] Assigning a worker node

MinRK benjaminrk@gmail....
Sat Jan 21 18:38:01 CST 2012

On Fri, Jan 20, 2012 at 14:37, Caius Howcroft <caius.howcroft@gmail.com> wrote:
> Hi
> In a lot of our code we have things like this (pseudo code):
> def wibble(cfg):
>   n = cfg["n"]
>   f = openDataset("In%d.txt" %n)
>   fo = openDataset("Out%d.txt" %n)
>   for i in f:
>       fo.write(blah(i))
>   fo.close()
> lview = LoadBalancedView()
> ns = range(100)
> cfgs = []
> for i in ns:
>     c = {"n": i,
>          ...
>          }
>     cfgs.append(c)
> lview.map(wibble, cfgs)
> Now that the input has got rather large I want to make sure the same
> machine is always assigned for the same N (perhaps even a specific
> machine) so I can store things locally. What would be the best way of
> doing this? Writing my own chooser function (like lru, twobin,
> weighted) won't work as those functions are only passed the machine
> loads and I need to know what args are being passed to the function
> being mapped. Any suggestions?

Well, you can use the `follow` dependency to force a load-balanced task
to run on the same engine as an earlier task.  This doesn't work well
with map, so you would have to split up the tasks into individual
`apply_async` requests.

That is, instead of:

amr = view.map(f, inputfiles)

you would need to do a variation of:

# run your first job
amr = view.map_async(first_f, inputfiles)

# get the msg_ids of the first map, which we can use with `follow` later on:
anchors = amr.msg_ids

ars = []
for anchor, inputfile in zip(anchors, inputfiles):
    # any tasks submitted in this block will execute on the same engine as the anchor
    with view.temp_flags(follow=anchor):
        ars.append(view.apply_async(f, inputfile))

This would mean that each task for a given input file will always end
up on the same engine that got the input file in the first map.
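To make the `follow` mechanism concrete, here is a toy, purely local model of it. `ToyScheduler`, its `submit` method and the `msg-N` id format are all hypothetical; this is not IPython's scheduler, which also tracks engine load, failures and other kinds of dependencies. It only shows the routing rule: an unconstrained task may land anywhere, and a task that follows an earlier msg_id is pinned to the engine that ran it.

```python
import random


class ToyScheduler:
    """Hypothetical stand-in for a load-balancing scheduler (not IPython's code).

    Tasks without `follow` go to a randomly chosen engine; a task submitted
    with follow=<msg_id> is sent to the engine that ran that earlier task.
    """

    def __init__(self, engine_ids, seed=0):
        self.engine_ids = list(engine_ids)
        self.rng = random.Random(seed)
        self.ran_on = {}   # msg_id -> engine id that executed the task
        self.counter = 0

    def submit(self, func, arg, follow=None):
        msg_id = "msg-%d" % self.counter   # hypothetical msg_id format
        self.counter += 1
        if follow is None:
            engine = self.rng.choice(self.engine_ids)   # free placement
        else:
            engine = self.ran_on[follow]                # pinned to the anchor's engine
        self.ran_on[msg_id] = engine
        func(arg)   # "run" the task locally; the result is ignored in this toy
        return msg_id


inputfiles = ["In%d.txt" % n for n in range(8)]
sched = ToyScheduler(engine_ids=range(4))

# First round: plain load balancing; keep the msg_ids as anchors.
anchors = [sched.submit(len, name) for name in inputfiles]

# Later round: every task follows the anchor for its own input file.
later = [sched.submit(len, name, follow=anchor)
         for anchor, name in zip(anchors, inputfiles)]
```

After this runs, `sched.ran_on[anchors[k]]` and `sched.ran_on[later[k]]` are the same engine for every `k`, which is the property the real `follow` flag gives you.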

At this point, you would have a list of AsyncResult objects pointing
to each element in what would have been a map, which you can work with
in a very similar way to one big AsyncMapResult.

Now, what might actually be easier would be to use DirectView.map,
which always splits a sequence of a given length across the engines in
the same way (as long as the set of engines doesn't change).  Sometimes
it's easier to let IPython decide where things should go
(LoadBalancedView), and sometimes it's easier to define the routing
yourself, and break it up with DirectViews.
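Why a DirectView map is repeatable: the split depends only on the length of the sequence and the number of engines. The sketch below assumes a block (contiguous) distribution where chunk sizes differ by at most one and the first `len(seq) % p` chunks get the extra item; `partition` is a hypothetical helper written for illustration, not IPython's source.

```python
def partition(seq, p, q):
    """Return the q-th of p contiguous chunks of seq (0 <= q < p).

    Chunk sizes differ by at most one; the first len(seq) % p chunks
    get one extra element. The result depends only on len(seq), p and q,
    so the same input always lands in the same chunk (i.e. on the same engine).
    """
    n = len(seq)
    base, extra = divmod(n, p)
    lo = q * base + min(q, extra)
    hi = lo + base + (1 if q < extra else 0)
    return seq[lo:hi]


inputfiles = ["In%d.txt" % n for n in range(10)]
# One chunk per engine, in engine order (3 engines assumed here).
chunks = [partition(inputfiles, 3, q) for q in range(3)]
```

With 10 files and 3 engines the chunks have sizes 4, 3 and 3, and `In0.txt` through `In3.txt` always go to the first engine.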

You can also have some mix of this by creating LoadBalancedViews with
a specified `targets` attribute.  These will only load-balance across
that subset of the total engines, allowing you to load-balance among
engines restricted to a single multicore machine.

You can get a mapping of engines to machines with a clever use of
`socket.gethostname`:

from IPython.parallel import Client
import socket

rc = Client()
dv = rc[:]
ar = dv.apply_async(socket.gethostname)
# dict of hostnames keyed by engine id:
engine_to_host = ar.get_dict()
# reverse the dict, so we have engine ids keyed by hostname:
machines = {}
for engine_id, host in engine_to_host.items():
    if host not in machines:
        machines[host] = []
    machines[host].append(engine_id)

# now we have a dict whose keys are hostnames (machines)
# and whose values are the list of engine ids running on each machine

# and we can use this to create a load-balanced view for each machine
# (sorted by hostname, so the order of the views is the same on every run):
views = [ rc.load_balanced_view(machines[host]) for host in sorted(machines) ]

# view.targets is the list of engine_ids, so if there is a nonuniform
# distribution, you may make use of len(view.targets) for distributing
# work evenly.

# Now, instead of using a single view.map, you would use one for each view:
stride = len(views)
maps = [ view.map(f, inputfiles[i::stride]) for i,view in enumerate(views) ]
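The grouping and striding steps can be checked without a cluster. In this sketch the `engine_to_host` dict and the host names are made up (standing in for what `ar.get_dict()` would return), and `group_by_host` / `assign_by_stride` are hypothetical helpers. Sorting the host names fixes the order of the machines, so a given input file always lands on the same machine as long as the same set of hosts is present.

```python
from collections import defaultdict

# Made-up stand-in for dv.apply_async(socket.gethostname).get_dict()
engine_to_host = {0: "nodeA", 1: "nodeA", 2: "nodeB", 3: "nodeB", 4: "nodeB", 5: "nodeC"}


def group_by_host(engine_to_host):
    """Invert {engine_id: host} into {host: sorted list of engine ids}."""
    machines = defaultdict(list)
    for engine_id, host in engine_to_host.items():
        machines[host].append(engine_id)
    return {host: sorted(ids) for host, ids in machines.items()}


def assign_by_stride(inputfiles, hosts):
    """Give the i-th host inputfiles[i::stride], as in the per-view map above."""
    stride = len(hosts)
    return {host: inputfiles[i::stride] for i, host in enumerate(hosts)}


machines = group_by_host(engine_to_host)
hosts = sorted(machines)   # fixed order -> stable file-to-machine assignment
plan = assign_by_stride(["In%d.txt" % n for n in range(7)], hosts)
```

Here `nodeA` gets `In0.txt`, `In3.txt` and `In6.txt` on every run. Note that the stride split ignores how many engines each machine has; weighting by `len(machines[host])` would be one way to balance uneven machines.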

Sorry if that was a bit elaborate and long-winded, but hopefully it
helps point you in the right direction.  Feel free to ask for any
further clarifications.


> Thanks in advance
> Caius
> _______________________________________________
> IPython-User mailing list
> IPython-User@scipy.org
> http://mail.scipy.org/mailman/listinfo/ipython-user
