# [SciPy-User] using multiple processors for particle filtering

Andy Fraser afraser@lanl....
Mon Jun 7 11:18:10 CDT 2010

Thanks to all who offered advice.  I am posting a final follow up.

The weight calculations were taking more than 97% of the time in my
code.  By moving the loop into C++ and using pthreads for the most
time-consuming computations, I reduced the execution time by a factor
of 68.  I used the boost multi_array library for arrays and the
numpy_boost code to convert between numpy and boost arrays.

Lessons learned:

1. As Zack says, for speed don't loop over data in Python.

2. Don't use Python multiprocessing for this kind of problem.

3. Don't put calls to Python interface functions (eg,
PyObject_GetAttrString) in threaded C++ code.  Extract data (or at
least pointers) from Python objects before starting multiple
threads.

4. Do use numpy_boost (http://code.google.com/p/numpy-boost/) to
convert between numpy and boost arrays.
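Lesson 1 can be made concrete with the velocity perturbation from the
question quoted below: drawing noise for all particles in one vectorized
call consumes the generator stream the same way as a Python loop over
particles, so it gives identical numbers with far fewer interpreter
round trips (a sketch; the shapes, M particles by N velocity
components, are illustrative, not from the original code):

```
import numpy

M, N = 1000, 6  # illustrative: M particles, N velocity components each

# Python loop: one standard_normal call per particle.
rng = numpy.random.RandomState(0)
looped = numpy.zeros((M, N))
for i in range(M):
    looped[i] += rng.standard_normal((N,))

# Vectorized: a single call draws noise for every particle at once.
rng = numpy.random.RandomState(0)
vectorized = numpy.zeros((M, N)) + rng.standard_normal((M, N))
```

Both arrays come out equal because successive `(N,)` draws and one
`(M, N)` draw read the same values from the seeded generator in the
same order.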

Here are the key lines of C++.  First, the routine that runs in each
thread (its header was clipped in my cut-and-paste; the name
`weight_thread` below is assumed):

```
/* Code that runs in a separate thread.  The expensive calculation is
   done by t_d->c->weight().  The rest sets up data for that call
   from the single argument to this function. */
static void *weight_thread(void *thread_arg)
{
    weight_data *t_d = (struct weight_data *) thread_arg;
    array_d2_ref par_list = *t_d->par_list;
    array_d1_ref w = *t_d->w;
    for (int i = t_d->start; i < t_d->stop; i++) {
        array_d1_ref X = t_d->XQ->X[i];
        array_d1_ref q = t_d->XQ->q[i];
        for (int k = 0; k < 3; k++) {
            par_list[i][k+4] = X[k];
        }
        for (int k = 0; k < 4; k++) {
            par_list[i][k] = q[k];
        }
        /* the expensive part: per-particle weight */
        w[i] = t_d->c->weight(X, q, t_d->t);
    }
    return NULL;
}
```
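Each thread walks a contiguous [start, stop) slice of the particle
list, so no two threads ever touch the same rows of `w` or `par_list`
and no locking is needed.  The chunking itself is simple enough to
sketch in Python (the function name is illustrative, not from the C++
code):

```
def chunk_bounds(n_items, n_threads):
    """Split range(n_items) into n_threads contiguous (start, stop)
    slices that abut and together cover every index exactly once."""
    return [(t * n_items // n_threads, (t + 1) * n_items // n_threads)
            for t in range(n_threads)]

# Eight threads covering 1000 particles.
bounds = chunk_bounds(1000, 8)
```

Because each slice's stop equals the next slice's start, every particle
index lands in exactly one thread's range.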

Then the interface function.  Declarations and the
pthread_create/pthread_join loops were clipped from my cut-and-paste;
they are filled in below with assumed names (`N_THREADS`, `threads`,
`t_d`, `int_dummy`):

```
static PyObject * weights_list(PyObject *self, PyObject *args){
    /* This function fills the numpy arrays 'w' and 'par_list' with
       results of weight calculations for each plane in the list
       'others'.  Calls to the 'weight' method of a C++ camera 'c',
       built from 'Y', 'ref', 'data', 'mu' and 'Q', do the
       calculations. */
    PyArrayObject *Py_ref, *Py_data;
    PyObject *others, *dummy;
    int t, int_dummy;
    if (!PyArg_ParseTuple(args, "OiiOOOO&O&OO",
                          &others,
                          &t,
                          &int_dummy,
                          &dummy,
                          &dummy,
                          &dummy,
                          PyArray_Converter, &Py_ref,
                          PyArray_Converter, &Py_data,
                          &dummy,
                          &dummy
                          ))
        return NULL;

    Py_ssize_t i = 3; // Starting argument for conversions
    numpy_boost<double, 1> w(PySequence_GetItem(args, i++));
    numpy_boost<double, 2> par_list(PySequence_GetItem(args, i++));
    numpy_boost<double, 2> Y(PySequence_GetItem(args, i++));
    PyImage ref = PyImage(Py_ref);
    PyImage data = PyImage(Py_data);
    Py_DECREF(Py_ref);
    Py_DECREF(Py_data);
    i += 2; // skip ref and data, already converted above
    numpy_boost<double, 1> mu(PySequence_GetItem(args, i++));
    numpy_boost<double, 2> Q(PySequence_GetItem(args, i++));

    Xq XQ(others);
    int N = PyList_Size(others);
    Camera c(&Y, &ref, &data, &mu, &Q);
    pthread_t threads[N_THREADS];
    weight_data t_d[N_THREADS];
    for (i = 0; i < N_THREADS; i++) {
        t_d[i].start = (i * N) / N_THREADS;
        t_d[i].stop = ((i + 1) * N) / N_THREADS;
        t_d[i].t = t;
        t_d[i].XQ = &XQ;
        t_d[i].w = &w;
        t_d[i].par_list = &par_list;
        t_d[i].c = &c;
        int rc = pthread_create(&threads[i], NULL, weight_thread,
                                (void *) &t_d[i]);
        if (rc) {
            printf("ERROR; return code from pthread_create() is %d\n", rc);
            exit(-1);
        }
    }
    /* Free attribute and wait for the other threads */
    for (i = 0; i < N_THREADS; i++) {
        int rc = pthread_join(threads[i], NULL);
        if (rc) {
            printf("ERROR; return code from pthread_join() is %d\n", rc);
            exit(-1);
        }
    }
    return Py_BuildValue("");
}
```

>>>>> "A" == Andy Fraser <afraser@lanl.gov> writes:

A> I am using a particle filter to estimate the trajectory of a
A> camera based on a sequence of images taken by the camera.  The
A> code is slow, but I have 8 processors in my desktop machine.
A> I'd like to use them to get results 8 times faster.  I've been
A> looking at the "16.6. multiprocessing" section of
A> http://docs.python.org/library.  I don't have any experience
A> with multiprocessing and would appreciate advice.

A> Here is a bit of code that I want to modify:

A>     for i in xrange(len(self.particles)):
A>         self.particles[i] = self.particles[i].random_fork()

A> Each particle is a class instance that represents a possible
A> camera state (position, orientation, and velocities).
A> particle.random_fork() is a method that moves the position and
A> orientation based on current velocities and then uses
A> numpy.random.standard_normal((N,)) to perturb the velocities.
A> I handle the correlation structure of the noise by matrices
A> that are members of particle, and I do some of the calculations
A> in C++.

A> I would like to do something like:

A>     for i in xrange(len(self.particles)):
A>         nv = numpy.random.standard_normal((N,))
A>         launch_on_any_available_processor(
A>             self.particles[i] = self.particles[i].random_fork(nv))
A>     wait_for_completions()

A> But I don't see a command like
A> "launch_on_any_available_processor".  I would be grateful for
A> any suggestions.