[SciPy-User] using multiple processors for particle filtering
Andy Fraser
afraser@lanl....
Mon Jun 7 11:18:10 CDT 2010
Thanks to all who offered advice.  I am posting a final follow-up in
case anyone reads the thread looking for a solution.

The weight calculations were taking more than 97% of the time in my
code.  By putting the loop in c++ and using pthreads for the most
time-consuming computations, I reduced the execution time by a factor
of 68.  I used the boost multi_array library for arrays and the
numpy_boost code to do conversions from numpy to boost arrays.
Lessons learned:
1. As Zack says, for speed don't loop over data in python.
2. Don't use python multiprocessing for this kind of problem.
3. Don't put calls to python interface functions (e.g.,
PyObject_GetAttrString) in threaded c++ code; such calls are only
safe while holding the GIL.  Extract data (or at least
pointers) from python objects before starting multiple
threads.
4. Do use numpy_boost (http://code.google.com/p/numpy-boost/) and
pthreads.
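As a toy illustration of lesson 1 (the function names and the
Gaussian-like weight are invented for the example, not taken from my
filter), the same per-particle arithmetic written as a python loop and
as one vectorized numpy expression gives identical numbers, but the
loop pays an interpreter round trip per particle:

```python
import numpy as np

def weights_loop(particles, mu, inv_var):
    # One interpreter iteration per particle: slow for large N.
    w = np.empty(len(particles))
    for i in range(len(particles)):
        d = particles[i] - mu
        w[i] = np.exp(-0.5 * inv_var * d * d)
    return w

def weights_vectorized(particles, mu, inv_var):
    # Same arithmetic, one pass through compiled numpy code.
    d = particles - mu
    return np.exp(-0.5 * inv_var * d * d)

particles = np.linspace(-2.0, 2.0, 1000)
w1 = weights_loop(particles, 0.0, 1.0)
w2 = weights_vectorized(particles, 0.0, 1.0)
assert np.allclose(w1, w2)
```

Vectorizing alone was not enough for my problem, which is why the real
loop went to c++, but it is the first thing to try.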
Here are the key lines of c++:
void *weight_thread(void *thread_arg){
  /* Code that runs in a separate thread.  The expensive calculations
     are done by t_d->c->weight().  The ugly code here sets up data
     for that call using the single argument to this function.
  */
  weight_data *t_d = (struct weight_data *) thread_arg;
  array_d2_ref par_list = *t_d->par_list;
  array_d1_ref w = *t_d->w;
  for (int i=t_d->start; i<t_d->stop; i++){
    array_d1_ref X = t_d->XQ->X[i];
    array_d1_ref q = t_d->XQ->q[i];
    for (int k=0; k<3; k++){
      par_list[i][k+4] = X[k];
    }
    for (int k=0; k<4; k++){
      par_list[i][k] = q[k];
    }
    w[i] = t_d->c->weight(X, q, t_d->t);
  }
  pthread_exit(NULL);
}
static PyObject * weights_list(PyObject *self, PyObject *args){
  /* Python call: weights_list(others,t,N_threads,w,par_list,Y,ref,data,mu,Q)

     This function fills the numpy arrays 'w' and 'par_list' with the
     results of weight calculations for each plane in the list
     'others'.  Calls to the 'weight' method of a c++ camera 'c' built
     from 'Y', 'ref', 'data', 'mu' and 'Q' do the calculations.
  */
  PyArrayObject *Py_ref, *Py_data;
  PyObject *others, *dummy;
  int t, N_threads;
  if (!PyArg_ParseTuple(args, "OiiOOOO&O&OO",
                        &others,
                        &t,
                        &N_threads,
                        &dummy,
                        &dummy,
                        &dummy,
                        PyArray_Converter, &Py_ref,
                        PyArray_Converter, &Py_data,
                        &dummy,
                        &dummy
                        ))
    return NULL;
  Py_ssize_t i = 3; // Starting argument for conversions
  numpy_boost<double, 1> w(PySequence_GetItem(args, i++));
  numpy_boost<double, 2> par_list(PySequence_GetItem(args, i++));
  numpy_boost<double, 2> Y(PySequence_GetItem(args, i++));
  PyImage ref = PyImage(Py_ref);
  PyImage data = PyImage(Py_data);
  Py_DECREF(Py_ref);
  Py_DECREF(Py_data);
  i += 2; // Skip 'ref' and 'data', converted just above
  numpy_boost<double, 1> mu(PySequence_GetItem(args, i++));
  numpy_boost<double, 2> Q(PySequence_GetItem(args, i++));
  Xq XQ(others);
  int N = PyList_Size(others);
  Camera c(&Y, &ref, &data, &mu, &Q);
  weight_data t_d[N_threads];
  pthread_t t_id[N_threads];
  pthread_attr_t attr;
  pthread_attr_init(&attr);
  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
  for (int i=0; i<N_threads; i++){
    t_d[i].t = t;
    t_d[i].start = (i*N)/N_threads;
    t_d[i].stop = ((i+1)*N)/N_threads;
    t_d[i].XQ = &XQ;
    t_d[i].w = &w;
    t_d[i].par_list = &par_list;
    t_d[i].c = &c;
    int rc = pthread_create(&t_id[i], &attr, weight_thread, (void *)&t_d[i]);
    if (rc) {
      printf("ERROR; return code from pthread_create() is %d\n", rc);
      exit(-1);
    }
  }
  /* Free the attribute and wait for the other threads */
  pthread_attr_destroy(&attr);
  for (int i=0; i<N_threads; i++) {
    int rc = pthread_join(t_id[i], NULL);
    if (rc) {
      printf("ERROR; return code from pthread_join() is %d\n", rc);
      exit(-1);
    }
  }
  return Py_BuildValue("");
}
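The start/stop arithmetic above splits the N particles into contiguous
per-thread chunks that cover 0..N-1 exactly, with no gaps or overlaps,
even when N_threads does not divide N.  A quick check of that
invariant in python (integer division matching the C int arithmetic):

```python
def chunks(N, N_threads):
    # Same arithmetic as t_d[i].start / t_d[i].stop in weights_list.
    return [((i * N) // N_threads, ((i + 1) * N) // N_threads)
            for i in range(N_threads)]

for N in (7, 8, 100):
    for n_thr in (1, 3, 8):
        bounds = chunks(N, n_thr)
        covered = [j for start, stop in bounds for j in range(start, stop)]
        assert covered == list(range(N))  # no gaps, no overlap
```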
>>>>> "A" == Andy Fraser <afraser@lanl.gov> writes:
A> I am using a particle filter to estimate the trajectory of a
A> camera based on a sequence of images taken by the camera. The
A> code is slow, but I have 8 processors in my desktop machine.
A> I'd like to use them to get results 8 times faster. I've been
A> looking at the following sections of
A> http://docs.python.org/library: "16.6. multiprocessing" and
A> "16.2. threading". I've also read some discussion from 2006 on
A> scipy-user@scipy.org about seeds for random numbers in threads.
A> I don't have any experience with multiprocessing and would
A> appreciate advice.
A> Here is a bit of code that I want to modify:
A>     for i in xrange(len(self.particles)):
A>         self.particles[i] = self.particles[i].random_fork()
A> Each particle is a class instance that represents a possible
A> camera state (position, orientation, and velocities).
A> particle.random_fork() is a method that moves the position and
A> orientation based on current velocities and then uses
A> numpy.random.standard_normal((N,)) to perturb the velocities.
A> I handle the correlation structure of the noise by matrices
A> that are members of particle, and I do some of the calculations
A> in c++.
A> I would like to do something like:
A>     for i in xrange(len(self.particles)):
A>         nv = numpy.random.standard_normal((N,))
A>         launch_on_any_available_processor(
A>             self.particles[i] = self.particles[i].random_fork(nv))
A>     wait_for_completions()
A> But I don't see a command like
A> "launch_on_any_available_processor". I would be grateful for
A> any advice.
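For completeness: the closest stdlib analogue of the hypothetical
launch_on_any_available_processor above is multiprocessing.Pool.map.
A toy sketch (Particle and its random_fork are invented stand-ins for
the real camera-state class), drawing the noise in the parent process
so each fork gets an independent perturbation regardless of how the
workers are seeded:

```python
import multiprocessing
import numpy as np

class Particle(object):
    # Toy stand-in for the real camera-state particle.
    def __init__(self, velocity):
        self.velocity = velocity
    def random_fork(self, nv):
        # Return a new particle with the velocity perturbed by nv.
        return Particle(self.velocity + nv)

def fork_one(args):
    # Top-level function so the pool can pickle it.
    particle, nv = args
    return particle.random_fork(nv)

if __name__ == '__main__':
    particles = [Particle(float(i)) for i in range(8)]
    noise = [np.random.standard_normal() for _ in particles]
    pool = multiprocessing.Pool()  # one worker per core by default
    particles = pool.map(fork_one, zip(particles, noise))
    pool.close()
    pool.join()
```

As lesson 2 above says, for my problem the cost of pickling every
particle to and from the worker processes swamped the gains, which is
why I moved the loop into threaded c++ instead.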