[Numpy-discussion] Thoughts on persistence/object tracking in scientific code

Luis Pedro Coelho lpc@cmu....
Mon Dec 29 13:51:48 CST 2008


Hello,

I coincidently started my own implementation of a system to manage 
intermediate results last week, which I called jug. I wasn't planning to make 
such an alpha version public just now, but it seems to be on topic.

The main idea is to use hashes to map function arguments to paths on the 
filesystem, which store the result (nothing extraordinary here). I also added 
the capability of having tasks (the basic unit) take the results of other 
tasks and defining an implicit dependency DAG. A simple locking mechanism 
enables light-weight task-level parellization (this was the second of my 
goals: help me make my stuff parallel).

A trick that helps is that I don't really use the argument values to hash 
(which would be unwieldy for big arrays). I use the computation path (e.g., 
this is the value obtained from f(g('something'),2)). Since, at least in my 
problems, things tend to always map back into simple file-system paths, the 
hash computation doesn't even need to load the intermediate results.

I will make the git repository publicly available once I figure out how to do 
that.

I append the tutorial  I wrote, which explains the system.

HTH,
Luís Pedro Coelho
PhD Student in Computational Biology
Carnegie Mellon University

============
Jug Tutorial
============

What is jug?
------------

Jug is a simple way to write easily parallelisable programs in Python. It also 
handles intermediate results for you.

Example
-------

This is a simple worked-through example which illustrates what jug does.

Problem
~~~~~~~

Assume that I want to do the following to a collection of images:

    (1) for each image, compute some features
    (2) cluster these features using k-means. In order to find out the number 
of clusters, I try several values and pick the best result. For each value of 
k, because of the random initialisation, I run the clustering 10 times.

I could write the following simple code:

::

    imgs = glob('*.png')
    features = [computefeatures(img,parameter=2) for img in imgs]
    clusters = []
    bics = []
    for k in xrange(2,200):
        for repeat in xrange(10):
            clusters.append(kmeans(features,k=k,random_seed=repeat))
            bics.append(compute_bic(clusters[-1]))
    Nr_clusters = argmin(bics) // 10

Very simple and solves the problem. However, if I want to take advantage of 
the obvious parallelisation of the problem, then I need to write much more 
complicated code. My traditional approach is to break this down into smaller 
scripts. I'd have one to compute features for some images, I'd have another to 
merge all the results together and do some of the clustering, and, finally, one 
to merge all the results of the different clusterings. These would need to be 
called with different parameters to explore different areas of the parameter 
space, so I'd have a couple of scripts just for calling the main computation 
scripts. Intermediate results would be saved and loaded by the different 
processes.

This has several problems. The biggest are

    (1) The need to manage intermediate files. These are normally files with 
long names like *features_for_img_0_with_parameter_P.pp*.
    (2) The code gets much more complex.

There are minor issues with having to issue several jobs (and having the 
cluster be idle in the meanwhile), or deciding on how to partition the jobs so 
that they take roughly the same amount of time, but the two above are the main 
ones.

Jug solves all these problems!

Tasks
~~~~~

The main unit of jug is a Task. Any function can be used to generate a Task. A 
Task can depend on the results of other Tasks.

The original idea for jug was a Makefile-like environment for declaring Tasks. 
I have moved beyond that, but it might help you think about what Tasks are.

You create a Task by giving it a function which performs the work and its 
arguments. The arguments can be either literal values or other tasks (in which 
case, the function will be called with the *result* of those tasks!). Jug also 
understands lists of tasks (all standard Python containers will be supported 
in a later version). For example, the following code declares the necessary 
tasks for our problem:

::

    imgs = glob('*.png')
    feature_tasks = [Task(computefeatures,img,parameter=2) for img in imgs]
    cluster_tasks = []
    bic_tasks = []
    for k in xrange(2,200):
        for repeat in xrange(10):
            cluster_tasks.append(Task(kmeans,feature_tasks,k=k,random_seed=repeat))
            bic_tasks.append(Task(compute_bic,cluster_tasks[-1]))
    Nr_clusters = Task(argmin,bic_tasks)

Task Generators
~~~~~~~~~~~~~~~

In the code above, there is a lot of code of the form *Task(function,args)*, 
so maybe it should read *function(args)*.  A simple helper function aids this 
process:

::

    from jug.task import Task

    def TaskGenerator(function):
        def gen(*args,**kwargs):
            return Task(function,*args,**kwargs)

        return gen

    computefeatures = TaskGenerator(computefeatures)
    kmeans = TaskGenerator(kmeans)
    compute_bic = TaskGenerator(compute_bic)

    @TaskGenerator
    def Nr_Clusters(bics):
        return argmin(bics) // 10

    imgs = glob('*.png')
    features = [computefeatures(img,parameter=2) for img in imgs]
    clusters = []
    bics = []
    for k in xrange(2,200):
        for repeat in xrange(10):
            clusters.append(kmeans(features,k=k,random_seed=repeat))
            bics.append(compute_bic(clusters[-1]))
    Nr_clusters(bics)

You can see that this code is almost identical to our original sequential 
code, except for the declarations at the top and the fact that *Nr_clusters* 
is now a function (actually a TaskGenerator, look at the use of a declarator).

This file is called the jugfile (you should name it *jugfile.py* on the 
filesystem) and specifies your problem. Of course, *TaskManager* is already a 
part of jug and those first few lines could have read

::

    from jug.task import TaskGenerator

Jug
~~~

So far, we have achieved seemingly little. We have turned a simple piece of 
sequential code into something that generates Task objects, but does not 
actually perform any work. The final piece is jug. Jug takes these Task objects 
and runs them. It's main loop is basically

::

    while len(tasks) > 0:
        for t in tasks:
            if can_run(t): # ensures that all dependencies have been run
                if need_to_run(t) and not is_running(t):
                    t.run()
                tasks.remove(t)

If you run jug on the script above, you will simply have reproduced the 
original code with the added benefit of having all the intermediate results 
saved.

The interesting is what happens when you run several instances of jug at the 
same time. They will start running Tasks, but each instance will run its own 
tasks. This allows you to take advantage of multiple processors in a way that 
keeps the processors all occupied as long as there is work to be done, handles 
the implicit dependencies, and passes functions the right values. Note also 
that, unlike more traditional parallel processing frameworks (like MPI), jug 
has no problems with the number of participating processors varying throughout 
the job.

Behind the scenes, jug is using the filesystem to both save intermediate 
results (which get passed around) and to lock running tasks so that each task 
is only run once (the actual main loop is thus a bit more complex than shown 
above).

Intermediate and Final Results
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

You can obtain the final results of your computation by setting up a task that 
saves them to disk and loading them from there. If the results of your 
computation are simple enough, this might be the simplest way.

Another way, which is also the way to access the intermediate results if you 
want them, is to run the jug script and then call the *load()* method on 
Tasks. For example,

::

    img = glob('*.png')
    features = [computefeatures(img,parameter=2) for img in imgs]
    ...
    
    feature_values = [feat.load() for feat in features]

If the values are not accessible, this raises an exception.

Advantages
----------

jug is an attempt to get something that works in the setting that I have found 
myself in: code that is *embarissingly parallel* with a couple of points where 
all the results of previous processing are merged, often in a simple way.  It 
is also a way for me to manage either the explosion of temporary files that 
plagued my code and the brittleness of making sure that all results from 
separate processors are merged correctly in my *ad hoc* scripts.

Limitations
-----------

This is not an attempt to replace MPI in any way. For code that has more merge 
points, this won't do. It also won't do if the individual tasks are so small 
that the over-head of managing them swamps out the performance gains of 
parallelisation. In my code, most of the times, each task takes 20 seconds to 
a few minutes. Just enough to make the managing time irrelevant, but fast 
enough that the main job can be broken into thousands of tiny pieces.

The system makes it too easy to save all intermediate results and run out of 
disk space.

This is still Python, not a true parallel programming language. The 
abstraction will sometimes leak through, for example, if you try to pass a 
Task to a function which expects a real value. Recall how we had to re-write 
the line *Nr_clusters = argmin(bics) // 10* above.

Planned Capabilities
--------------------

Here are a couple of simple improvements I plan to make at some point:

    * jug.py cleanup: removes left-over locks, temporary files, and unsused 
results.
    * Stop & re-start. Currently, jug processes will exit if they can't make 
any progress for a while. In the future, I'd like them to be unblockable by 
other jug processes.
    * No result tasks. Task-like objects that don't save intermediate results.
    * Have tasks be passed inside *sets* and *dictionaries*. Maybe even 
*numpy* arrays! This will make jug even more like a real parallel programming 
language.
    * If the original arguments are files on disk, then jug should check their 
modification date and invalidate subsequent results.



More information about the Numpy-discussion mailing list