[SciPy-user] FIR Filters

Bob.Cowdery at CGI-Europe.com Bob.Cowdery at CGI-Europe.com
Fri Feb 27 14:10:39 CST 2004


With your help I am making progress. I started off using numarray, then ran
into problems so I am now using Numeric. I only import scipy so I am using
whatever it imports which I believe is Numeric. I have tried to follow your
advice and move to complex types as I can see it makes sense. I don't think
I have done a complete job yet. The audio output has a pulse type modulation
on it which should not be there, however selecting different passbands is
definitely having the desired effect. I have copied all the code in below
for context. Yes the filtering is in the frequency domain which I am told
gives better stopband attenuation and good shape. The way the filter and
demodulation works is still a bit of a mystery to me as I didn't write the
original code. I would love to work out exactly what it does but I'm not
quite there yet. I can't really ask very targetted questions yet until I
understand more. The code cleary still has problems but I don't know why or
where. The good news is the processor is down to 50% now from 80%. If you do
have a few moments perhaps you could point out any more obvious errors.


import time
import threading
import fastaudio as f
from scipy import *

class dsp( threading.Thread ):

    # signal processing
    def sig( self, samples ) :
        # allocate the filter arrays
        global m_filterMagn
        global m_filterPhase
        # convert to a numeric array of type double
        x = asarray(samples, typecode=Float64)
        # get a complex array ordered from the I,Q source
        complexin = x[1::2] + 1j*x[::2]
        # convert to complex frequency domain
        cart_complex_in_fd = fft(complexin)
        # apply filter to magnitude and phase
        magn = abs(complexin)*m_filterMagn
        phase = angle(complexin)*m_filterPhase
        # polar to cartesian in frequency domain
        demodulated = magn * cos(phase) + 1j*(magn * sin(phase))
        # convert to time domain
        complexout = ifft(demodulated)
        # convert back to interleaved array
        x[1::2] = real(complexout)
        x[::2] = real(complexout)
        # convert back to an integer array
        x = x.astype('l')
        # return sequence of processed samples
        return x
    def firwin(self, N, cutoff, width=None, window='blackman'):
        FIR Filter Design using windowed ideal filter method.

          N      -- order of filter (number of taps)
          cutoff -- cutoff frequency of filter 
                      (normalized so that 1 corresponds to Nyquist or pi
radians / sample)
          width  -- if width is not None, then assume it is the approximate
width of
                      the transition region (normalized so that 1 corresonds
to pi)
                      for use in kaiser FIR filter design.
          window -- desired window to use.


          h      -- coefficients of length N fir filter.

        if isinstance(width,float):
            A = 2.285*N*width + 8
            if (A < 21): beta = 0.0
            elif (A <= 50): beta = 0.5842*(A-21)**0.4 + 0.07886*(A-21)
            else: beta = 0.1102*(A-8.7)

        win = signal.get_window(window,N,fftbins=1)
        alpha = N//2
        m = signal.Num.arange(0,N)
        return win*special.sinc(cutoff*(m-alpha))
    # calculate the complex filter
    def filter_calc( self, fLow, fHigh ):
        global m_filterMagn
        global m_filterPhase

        m_filterMagn = zeros(2048)
        m_filterPhase = zeros(2048)
        # create empty arrays to hold results
        Rh = zeros(2048)
        Ih = zeros(2048)
        # calculate the high/low cutoff freq as a fraction of the sample
        fhc = fHigh/44100.0
        flc = fLow/44100.0
        # calculate fft bin size
        bin_size = 44100.0/4096.0
        # calculate no of bins in filter width
        bins = fHigh/bin_size
        # calculate FIR lowpass filter coefficients
        Rh = self.firwin(2048.0,(fhc-flc)/2.0)
        # compute USB filter by modulating the lowpass filter up
        filterPhase = 0.0
        filterFreq = (flc+fhc)*pi
        phaseSin = 0
        phaseCos = 0

        # modulate to USB centre frequency
        i = 0
        while i< 2048 :
            phaseSin = sin(filterPhase)
            phaseCos = cos(filterPhase)
            Ih[i] = Rh[i]*phaseSin
            Rh[i] = Rh[i]*phaseCos
            filterPhase = filterPhase+filterFreq
            if filterPhase >= 6.28318530717959 :
                filterPhase = filterPhase - 6.28318530717959
            i = i+1
        # for LSB create conjugate of USB filter
        # comment out for LSB
        Ih = -Ih
        # compute complex frequency domain response of LSB or USB filter
        complexin = Rh + 1j*Ih
        complexfd = fft(complexin)
        # Cartesian to polar gives the magnitude and phase components of the
fft result
        m_filterMagn = abs(complexfd)
        m_filterPhase = angle(complexfd)
    # start streaming
    def run( self ):

        global running
        running = 1
        #s = f.stream(7500,       # sample rate
        #             2,          # number of channels
        #             'int16',    # sample format, choose from 'int8',
'int16', 'int32'.
        #             4096,       # frames per buffer
        #             16)         # maximum number of input buffers

        # open stream for input/output
        print "Create, open and start stream"
        global s
        s = f.stream(44100,2,'int16',2048,4)

        # open and start streaming

        print "Thread running..."
        # calculate the SSB filter
        while( running ):

            # read next block of data from the input buffer
            samples = s.read()

            if( len( samples ) > 0 ):
                demod = self.sig( samples )
                # now, write the data back
                s.write( demod )

            # sleep for 10 ms
            # a stero block of 4096 samples takes 46 ms so 10ms should avoid
dropped blocks
        print "Thread exiting..."

def start():
    global DSP
    DSP = dsp()
def stop():
    # stop the stream
    global running
    running = 0
    global s
    # and close it
def filter(id):
    # set the filter passband
    global DSP
    if id == 0:
    elif id == 1:
    elif id == 2:

-----Original Message-----
From: Travis Oliphant [mailto:oliphant at ee.byu.edu] 
Sent: 26 February 2004 22:38
To: SciPy Users List
Subject: Re: [SciPy-user] FIR Filters

Bob.Cowdery at CGI-Europe.com wrote:

> Travis
> Thanks very much for the responses. Don't confuse me with someone who
> knows what they are doing (but I am working on it!). To give you some 
> background I am converting a .NET application that uses Intel IPP to 
> Python. The application is for a software defined radio. At the moment 
> I have audio capture which contains my quadrature samples in blocks of 
> 4096 (2048 each for I&Q) with 44100 sampling rate (46ms per block). I 
> have a simple dsp process to test each signal processing call. At the 
> moment this should output exactly what it receives, i.e just the audio 
> input as the demodulator needs the FIR filter and some magic applied 
> at the point indicated in the code. When I have written the 
> demodulator I can test firwin() properly. At the moment I can see it 
> outputs valid data, and it's fast which is good. The code below is the 
> very simple path. I have had to write two functions for 
> polar/cartesian conversions as I didn't find any in the library. I 
> don't know if these work right as I can hear clicking sounds on the 
> audio not there when they are commented out. However, it could be 
> audio overrun as I am up at 75% processor. The functions are also 
> quite slow (adding 30% to
> my processor loading). Is there a more efficient way to do this?

All the best in your education...


Are you using numarray?  if so then that might cause a slower 
application than using Numeric (older but often faster arraymodule)

Comments on your code below:

>     def sigproc( self, samples ) :       
>         # convert to a numarray array of type double
>         x = asarray(samples, typecode=Float64)
            is this numarray.asarray or Numeric.asarray ?

>         # first to do is sort into real and image arrays from
> interleaved array
>         realin = x[1::2]  # every other element starting at index 1
>         imagin = x[::2]   # every other element starting at index 0 \
>         # convert to frequency domain
>         realfd = fft(realin)
>         imagfd = fft(imagin)

          Normally, real and imaginary parts of the data are handled 
together when a complex data type is available.    I would do
          in = x[1::2] + 1j*x[::2]
          fd = fft(in)

>         # cartesian to polar in frequency domain
>         magn,phase = self.cartesian_to_polar(realfd, imagfd)
          This seems strange to me, why are you filtering the magnitude 
and phase separately?  This is very non-standard.

>         # apply filter and demodulate
           I doubt you want a 2048 tap filter at this point.  Probably 
something a lot less.
           What kind of modulation are you demodulating?   Are you doing 
frequency-domain filtering then?

>         # polar to cartesian in frequency domain
>         realfd, imagfd = self.polar_to_cartesian(magn, phase)

>         # convert to time domain
>         realtd = ifft(realfd)
>         imagtd = ifft(imagfd)
>         # convert back to interleaved array
>         x[::2] = realtd.astype(Float64)
>         x[1::2] = imagtd.astype(Float64)
>         # convert back to an integer array
>         x = x.astype('l')
>         # return python list of processed samples
>         return x.tolist()
     Why are you going back to a python list?  A numeric array is a 
valid sequence.  Why does it need to be a list.  You have a lot of 
memory copying going on which is going to slow down everything.

>     # utility functions not in the library
>     def polar_to_cartesian(self, magn, phase):
>         return magn * cos(phase), magn * sin(phase)
>     def cartesian_to_polar(self, real, imag):
>         # convert the arrays real, imag into the polar system
>         # magnitude array
>         magn = sqrt(real**2 + imag**2)
>         # phase array
>         phase =
>         return magn, phase

If you have a complex-valued array, then abs(x) is the magnitude and 
angle(x) is the phase.

My colleagues down the hall do software radio and I have a lot of 
background in signal processing.  Let me know if you have more specific 


-Travis O.

SciPy-user mailing list
SciPy-user at scipy.net http://www.scipy.net/mailman/listinfo/scipy-user

*** Confidentiality Notice *** Proprietary/Confidential
Information belonging to CGI Group Inc. and its affiliates
may be contained in this message. If you are not a recipient
indicated or intended in this message (or responsible for
delivery of this message to such person), or you think for
any reason that this message may have been addressed to you
in error, you may not use or copy or deliver this message
to anyone else.  In such case, you should destroy this
message and are asked to notify the sender by reply email.
-------------- next part --------------
An HTML attachment was scrubbed...
URL: http://www.scipy.net/pipermail/scipy-user/attachments/20040227/d67b11e1/attachment-0001.htm

More information about the SciPy-user mailing list