[SciPy-User] stats, classes instead of functions for results MovStats

josef.pktd@gmai... josef.pktd@gmai...
Sun Nov 22 23:43:57 CST 2009


Following up on a question by Keith on the numpy list and his reminder
that covariance can be calculated by the cross-product minus the
product of the means, I redid and
enhanced my moving stats functions.

Suppose x and y are two time series, then the moving correlation
requires the calculation of the mean, variance and covariance for each
window. Currently in scipy stats intermediate results are usually
thrown away on return (while rpy/R returns all intermediate results
used for the calculation.

Using a decorator/descriptor of Fernando written for nitime, I tried
out to write the function as a class instead, so that any desired (
intermediate) calculations are only made on demand, but once they are
calculated they are attached to the class as attributes or properties.
This seems to be a useful "pattern".

Are there any opinion for using the pattern in scipy.stats ? MovStats
will currently go into statsmodels

Below is the class (with cutting part of init), a full script is the
attachment, including examples that test the class.

about MovStats:
y and x are tested for 2d, either (T,N) with axis=0 or (N,T) with
axis=1, should (but may not yet) work for nd arrays along any axis
(signal.correlate docstring)
nans are handled by dropping the corresponding observations from the
window, not adding any additional observations,
not tested if a window is empty because it contains only nans, nor if
variance is zero
(kern is intended for weighted statistics in the window but not tested
yet, I still need to decide on normalization requirements)
requires scipy.signal, all calculations done with signal.correlate, no loops
as often, functions are one-liners
all results are returned for valid observations only, initial
observations with incomplete window are cut
bonus: slope of moving regression of y on x, since it was trivial to add
still some cleaning and documentation to do

usage:
ms = MovStats(x, y, axis=1)
ms.yvar
ms.xmean
ms.yxcorr
ms.yxcov
...


Josef

class MovStats(object):
    def __init__(self, y, x=None, kern=5, axis=0):
        self.y = y
        self.x = x
        if np.isscalar(kern):
            ws = kern
<... snip>

    @OneTimeProperty
    def ymean(self):
        ys = signal.correlate(self.y, self.kern, mode='same')[self.sslice]
        ym = ys/self.n
        return ym

    @OneTimeProperty
    def yvar(self):
        ys2 = signal.correlate(self.y*self.y, self.kern,
mode='same')[self.sslice]
        yvar = ys2/self.n  - self.ymean**2
        return yvar

    @OneTimeProperty
    def xmean(self):
        if self.x is None:
            return None
        else:
            xs = signal.correlate(self.x, self.kern, mode='same')[self.sslice]
            xm = xs/self.n
            return xm

    @OneTimeProperty
    def xvar(self):
        if self.x is None:
            return None
        else:
            xs2 = signal.correlate(self.x*self.x, self.kern,
mode='same')[self.sslice]
            xvar = xs2/self.n  - self.xmean**2
            return xvar
    @OneTimeProperty
    def yxcov(self):
        xys = signal.correlate(self.x*self.y, self.kern,
mode='same')[self.sslice]
        return xys/self.n - self.ymean*self.xmean

    @OneTimeProperty
    def yxcorr(self):
        return self.yxcov/np.sqrt(self.yvar*self.xvar)

    @OneTimeProperty
    def yxslope(self):
        return self.yxcov/self.xvar
-------------- next part --------------
# -*- coding: utf-8 -*-
"""
Created on Sat Nov 21 14:22:29 2009

Author: josef-pktd
"""

import numpy as np
from scipy import signal

class OneTimeProperty(object):
   """A descriptor to make special properties that become normal attributes.

   This is meant to be used mostly by the auto_attr decorator in this module.
   Author: Fernando Perez, copied from nitime
   """
   def __init__(self,func):
       """Create a OneTimeProperty instance.

        Parameters
        ----------
          func : method
          
            The method that will be called the first time to compute a value.
            Afterwards, the method's name will be a standard attribute holding
            the value of this computation.
            """
       self.getter = func
       self.name = func.func_name

   def __get__(self,obj,type=None):
       """This will be called on attribute access on the class or instance. """

       if obj is None:
           # Being called on the class, return the original function. This way,
           # introspection works on the class.
           #return func
           print 'class access'
           return self.getter

       val = self.getter(obj)
       #print "** auto_attr - loading '%s'" % self.name  # dbg
       setattr(obj, self.name, val)
       return val


def moving_slope(x,y):
    '''estimate moving slope coefficient of regression of y on x
    filters along axis=1, returns valid observations
    Todo: axis and lag options
    idea by John D'Errico
    '''
    xx = np.column_stack((np.ones(x.shape), x))
    pinvxx = np.linalg.pinv(xx)[1:,:]
    windsize = len(x)
    lead = windsize//2 - 1
    return signal.correlate(y, pinvxx, 'full' )[:,windsize-lead:-(windsize+1*lead-2)]

def corrxy(x, y, ws):
    # based on example by Keith
    d = np.nan * np.ones_like(y)
    for i in range(y.shape[0]):
        yi = y[i,:]
        xi = x[i,:]
        for j in range(ws-1, y.shape[1]):
            yj = yi[j+1-ws:j+1]
            xj = xi[j+1-ws:j+1]
            d[i,j] = np.corrcoef(xj, yj, bias=1)[0,1]
    return d

    
x = np.sin(np.arange(20))[None,:] + np.random.randn(5, 20)
#x = y**2

def movstats(y, x=None, ws=5, kind='mvcr', axis=0):
    ''' return moving correlation between two timeseries
    handles 1d or 2d data
    '''
    kdim = [1]*y.ndim
    kdim[axis] = ws
    kern = np.ones(tuple(kdim))
    sslice = [slice(None)]*y.ndim
    sslice[axis] = slice(ws//2, -ws//2+1)
    ys = signal.correlate(y, kern, mode='same')[sslice]
    ys2 = signal.correlate(y*y, kern, mode='same')[sslice]
    xs = signal.correlate(x, kern, mode='same')[sslice]
    xs2 = signal.correlate(x*x, kern, mode='same')[sslice]
    xys = signal.correlate(x*y, kern, mode='same')[sslice]
    n = ws
    ym = ys/(1.*n)
    xm = xs/(1.*n)
    yvar = ys2/(1.*n) - ym**2
    xvar = xs2/(1.*n) - xm**2
    xycov = xys/(1.*n) - ym*xm
    xycorr = xycov/np.sqrt(yvar*xvar)
    return xycorr

class MovStats(object):
    def __init__(self, y, x=None, kern=5, axis=0):
        self.y = y
        self.x = x
        if np.isscalar(kern):
            ws = kern        
            kdim = [1]*self.y.ndim
            #print ws, kdim, self.y.ndim
            kdim[axis] = ws
            self.kern = np.ones(tuple(kdim))
        else:
            ws = y.shape[axis]
            if ((kern.ndim != self.y.ndim)
                or np.all([kern.shape(i) for i in self.y.ndim
                   if not i==axis])):
                raise ValueError('kern has incorrect shape')
            self.kern = kern

        sslice = [slice(None)]*y.ndim
        sslice[axis] = slice(ws//2, -ws//2+1)
        self.sslice = sslice
        #Todo: add nan handling
        
        if self.x is None:
            ynotnan = ~np.isnan(self.y)
        else:
            self.x = np.copy(self.x)
            ynotnan = (~np.isnan(self.y))*(~np.isnan(self.x))
            #ynotnan = ~np.logical_or(np.isnan(self.y), np.isnan(self.x))
            self.x[~ynotnan] = 0
        self.y = np.copy(self.y)
        self.y[~ynotnan] = 0
        if ynotnan.all():
            self.n = 1.0* ws
        else:
            self.n = signal.correlate(ynotnan, self.kern, mode='same')[self.sslice]

    @OneTimeProperty
    def ymean(self):
        ys = signal.correlate(self.y, self.kern, mode='same')[self.sslice]
        ym = ys/self.n
        return ym
    
    @OneTimeProperty
    def yvar(self):
        ys2 = signal.correlate(self.y*self.y, self.kern, mode='same')[self.sslice]
        yvar = ys2/self.n  - self.ymean**2
        return yvar
    
    @OneTimeProperty
    def xmean(self):
        if self.x is None:
            return None
        else:
            xs = signal.correlate(self.x, self.kern, mode='same')[self.sslice]
            xm = xs/self.n
            return xm
    @OneTimeProperty
    def xvar(self):
        if self.x is None:
            return None
        else:
            xs2 = signal.correlate(self.x*self.x, self.kern, mode='same')[self.sslice]
            xvar = xs2/self.n  - self.xmean**2
            return xvar
    @OneTimeProperty
    def yxcov(self):
        xys = signal.correlate(self.x*self.y, self.kern, mode='same')[self.sslice]
        return xys/self.n - self.ymean*self.xmean
            
    @OneTimeProperty
    def yxcorr(self):
        return self.yxcov/np.sqrt(self.yvar*self.xvar)
    
    @OneTimeProperty
    def yxslope(self):
        return self.yxcov/self.xvar
    


x = np.array([1.0, 2.0, 3.0, 4.0, 5.0])

x = np.sin(np.arange(20))[None,:] + np.random.randn(5, 20)
y = 1*np.arange(20)[None,:] + np.random.randn(5, 20)
ws=5

ms = MovStats(x, y, axis=1)
print dir(ms)
xyc = MovStats(x, y, axis=1).yxcorr
xyc_loop = corrxy(x, y, ws)[:,ws-1:]
#testing
#print xyc_loop
#print xyc
print np.corrcoef(y[0,:5],x[0,:5],bias=1)
print np.corrcoef(y[0,2:7],x[0,2:7],bias=1)
print np.corrcoef(y[1,:5],x[1,:5],bias=1)
print np.corrcoef(y[-1,-5:],x[-1,-5:],bias=1)
print 'maxabsdiff', np.max(np.abs(xyc_loop - xyc))

print 'test yxolsslope'
from scipy import stats
print stats.linregress(y[0,:5],x[0,:5])[0]
print stats.linregress(y[0,2:7],x[0,2:7])[0]
print stats.linregress(y[-1,-5:],x[-1,-5:])[0]
print ms.yxslope

print 'test axis=0'
xyc_loopT = corrxy(x.T, y.T, ws)[:,ws-1:].T
xycT = MovStats(x, y, axis=0).yxcorr
print 'maxabsdiff', np.max(np.abs(xyc_loopT - xycT))

print 'testnan'
xn = x.copy()
xn[:, 2::5] = np.nan

xync = MovStats(xn, y, axis=1).yxcorr
#print xync
xnr = xn[~np.isnan(xn)].reshape(5,-1)
ynr = y[~np.isnan(xn)].reshape(5,-1)
xync_loop = corrxy(xnr, ynr, 4)[:,4-1:]
xyncr = xync[~np.isnan(xn)[:,4:]].reshape(5,-1)
print 'maxabsdiff', np.max(np.abs(xync_loop - xyncr))



More information about the SciPy-User mailing list