[SciPy-user] Circular Buffer

Sturla Molden sturla@molden...
Thu Feb 26 06:43:48 CST 2009


On 2/26/2009 12:17 PM, Samuel GARCIA wrote:

> -is there something more clerver in line 27 to avoid concatenate and so 
> a duplication of the array

You can make the allocated memory twice the size you need. When the 
buffer is full, you copy the latter half to the start. Let N be the 
capacity of the ringuffer, you could then have an append method like 
this (only appending to the tail is how most ringbuffers are used):

def append(self, a):
     try:
         self.array[self.tail] = a
         self.tail += 1
     except IndexError:
         N = self.array.shape[0] // 2
         self.array[:N] = self.array[N:]
         self.array[N] = a
         self.tail = N + 1

The IndexError exception triggers an O(N) operation for every Nth call 
to append, which makes appends having amortized O(1) complexity. That 
is, they become O(1) on average. You avoid the concatenation in 
__getitem__ because the items are always stored contiguously.


Sturla Molden










> - does someones already implement this for ND ?
> 
> Thanks
> 
> Samuel
> 
> # -*- coding: utf-8 -*-
> 
> 
> 
> 
> import numpy
> 
> 
> class CircularBuffer1d():
>     def __init__(self , shape , dtype = 'f'):
>         self.shape = shape
>         self.array = numpy.zeros(shape , dtype = dtype)
>        
>     def __getitem__(self , sl):
>         if type(sl) == int:
>             return self.array[sl%self.shape[0]]

And ndarray already has bounds-checks, to this is








>         elif type(sl) == slice :
>             if sl.start is None :
>                 start = 0
>             else :
>                 start = sl.start % self.shape[0]
>             stop = sl.stop % self.shape[0]
>             if stop>start :
>                 return self.array[start:stop]
>             else :
>                 return numpy.concatenate( ( self.array[start:], 
> self.array[: stop ]) , axis = 0)
>    
>     def __setitem__(self , sl, a):
>         if type(sl) == int:
>             self.array[sl%self.shape[0]] = a
>         elif type(sl) == slice :
>             if sl.start is None :
>                 start = 0
>             else :
>                 start = sl.start % self.shape[0]
>            
>             stop = sl.stop % self.shape[0]
>            
>             if stop>start :
>                 self.array[start:stop] = a
>             else :
>                 self.array[start:] = a[:self.shape[0] - start]
>                 self.array[: a.shape[0]-(self.shape[0] - start) ] = 
> a[self.shape[0] - start:]
> 
> 
> c  = CircularBuffer1d( (30,) , dtype = 'f')
> print c.array
> c[14:17] = numpy.ones((3))
> print c.array
> c[58:63] = numpy.arange(5)+1
> print c.array
> c[14] = 14
> c[15] = 15
> print c.array
> print c[15:15]
> 
> c[15:15] = numpy.arange(30)
> print c[0:30]
> 
> 
> 
> 
> 



More information about the SciPy-user mailing list