[SciPy-User] using multiple processors for particle filtering

Andy Fraser afraser@lanl....
Mon Jun 7 11:18:10 CDT 2010


Thanks to all who offered advice.  I am posting a final follow up in
case anyone reads the thread looking for a solution.

The weight calculations were taking more than 97% of the time in my
code.  By putting the loop in c++ and using pthreads for the most time
consuming computations, I reduced the execution time by a factor of
68.  I used the boost multi_array library for arrays and the
numpy_boost code to do conversions from numpy to boost arrays.

Lessons learned:

1. As Zack says, for speed don't loop over data in python

2. Don't use python multiprocessing for this kind of problem

3. Don't put calls to python interface functions (eg,
   PyObject_GetAttrString) in threaded c++ code.  Extract data (or at
   least pointers) from python objects before starting multiple
   threads.

4. Do use numpy_boost (http://code.google.com/p/numpy-boost/) and
   pthreads.

Here are the key lines of c++:

void *weight_thread(void *thread_arg){
  /* Code that runs in separate thread.  The expensive calculations
     are done by t_d->c->weight().  The ugly code here, sets up data
     for that call using the single argument to this function.
   */
  weight_data *t_d = (struct weight_data *) thread_arg;
  array_d2_ref par_list = *t_d->par_list;
  array_d1_ref w = *t_d->w;
  for (int i=t_d->start; i<t_d->stop; i++){
    array_d1_ref X = t_d->XQ->X[i];
    array_d1_ref q = t_d->XQ->q[i];
    for (int k=0;k<3;k++){
      par_list[i][k+4] = X[k];
    }
    for (int k=0;k<4;k++){
      par_list[i][k] = q[k];
    }
    w[i] = t_d->c->weight(X, q, t_d->t);
  }
  pthread_exit(NULL);
}

static PyObject * weights_list(PyObject *self, PyObject *args){
  /* Python call: weights_list(others,t,N_threads,w,par_list,Y,ref,data,mu,Q)

     This function fills the numpy arrays 'w' and 'par_list' with
     results of weight calculations for each plane in the list
     'others'.  Calls to the 'weight' method of a c++ camera 'c' built
     from 'Y', 'ref', 'data', 'mu' and 'Q' does the calculations.
  */
  PyArrayObject *Py_ref, *Py_data;
  PyObject *others, *dummy;
  int t, N_threads;
  if (!PyArg_ParseTuple(args, "OiiOOOO&O&OO",
			&others,
			&t,
			&N_threads,
			&dummy,
			&dummy,
			&dummy,
			PyArray_Converter, &Py_ref,
			PyArray_Converter, &Py_data,
			&dummy,
			&dummy
			))
    return NULL;

  Py_ssize_t i = 3; // Starting argument for conversions
  numpy_boost<double, 1> w(PySequence_GetItem(args, i++));
  numpy_boost<double, 2> par_list(PySequence_GetItem(args, i++));
  numpy_boost<double, 2> Y(PySequence_GetItem(args, i++));
  PyImage ref = PyImage(Py_ref);
  PyImage data = PyImage(Py_data);
  Py_DECREF(Py_ref);
  Py_DECREF(Py_data);
  i += 2;
  numpy_boost<double, 1> mu(PySequence_GetItem(args, i++));
  numpy_boost<double, 2> Q(PySequence_GetItem(args, i++));

  Xq XQ(others);
  int N = PyList_Size(others);
  Camera c = Camera::Camera(&Y, &ref, &data, &mu, &Q);
  weight_data t_d[N_threads];
  pthread_t t_id[N_threads];
  pthread_attr_t attr;
  pthread_attr_init(&attr);
  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
  for (int i=0; i<N_threads; i++){
    t_d[i].t = t;
    t_d[i].start = (i*N)/N_threads;
    t_d[i].stop = ((i+1)*N)/N_threads;
    t_d[i].XQ = &XQ;
    t_d[i].w = &w;
    t_d[i].par_list = &par_list;
    t_d[i].c = &c;
    int rc = pthread_create(&t_id[i], &attr, weight_thread, (void *)&t_d[i]);
    if (rc) {
      printf("ERROR; return code from pthread_create() is %d\n", rc);
      exit(-1);
    }
  }
  /* Free attribute and wait for the other threads */
  pthread_attr_destroy(&attr);
  for(int i=0; i<N_threads; i++) {
    int rc = pthread_join(t_id[i], NULL);
    if (rc) {
      printf("ERROR; return code from pthread_join() is %d\n", rc);
      exit(-1);
    }
  }
  return Py_BuildValue("");
}

>>>>> "A" == Andy Fraser <afraser@lanl.gov> writes:

    A> I am using a particle filter to estimate the trajectory of a
    A> camera based on a sequence of images taken by the camera.  The
    A> code is slow, but I have 8 processors in my desktop machine.
    A> I'd like to use them to get results 8 times faster.  I've been
    A> looking at the following sections of
    A> http://docs.python.org/library: "16.6. multiprocessing" and
    A> "16.2. threading".  I've also read some discussion from 2006 on
    A> scipy-user@scipy.org about seeds for random numbers in threads.
    A> I don't have any experience with multiprocessing and would
    A> appreciate advice.

    A> Here is a bit of code that I want to modify:

    A>         for i in xrange(len(self.particles)): self.particles[i]
    A> = self.particles[i].random_fork()

    A> Each particle is a class instance that represents a possible
    A> camera state (position, orientation, and velocities).
    A> particle.random_fork() is a method that moves the position and
    A> orientation based on current velocities and then uses
    A> numpy.random.standard_normal((N,)) to perturb the velocities.
    A> I handle the correlation structure of the noise by matrices
    A> that are members of particle, and I do some of the calculations
    A> in c++.

    A> I would like to do something like:

    A>         for i in xrange(len(self.particles)): nv =
    A> numpy.random.standard_normal((N,))
    A> launch_on_any_available_processor( self.particles[i] =
    A> self.particles[i].random_fork(nv) ) wait_for_completions()

    A> But I don't see a command like
    A> "launch_on_any_available_processor".  I would be grateful for
    A> any advice.


More information about the SciPy-User mailing list