[SciPy-user] shared memory machines

Gael Varoquaux gael.varoquaux@normalesup....
Wed Feb 11 08:24:22 CST 2009


On Wed, Feb 11, 2009 at 02:51:38PM +0100, Sturla Molden wrote:
> As long as the Heap object is destroyed, it will clean up. Try to put in 
> some printing to see if the buffer is marked for removal. Do not use a 
> Cython's print statement but something else (e.g. printf from stdio.h).

I have put in some more print statements. I have the fealing that it is
not cleaned up. I am attaching my test code, and the modified
sharedmemory_sysv.pyx for debug. The output is the following:

++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Array address in main program 47782588157952 (pid: 18024)
[ 0.  0.  0.  0.  0.  0.  0.  0.  0.  0.]
Array address in sub program 47782588157952 (pid: 18033)
Calling __dealloc__ on buffer at 47782588157952, in pid 18033
Checking for deallocating of memory at 47782588157952
Not deallocating: 8 attached segments
[ 1.  1.  1.  0.  0.  0.  0.  0.  0.  0.]
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

I do have the feeling that something is not getting garbage-collected.

Gaël
-------------- next part --------------
# Written by Sturla Molden, 2009
# Released under SciPy license
#


ctypedef int size_t


cdef extern from "errno.h":
    int EEXIST, errno
    int EACCES, errno
    int ENOMEM, errno
 
cdef extern from "string.h":
    void memset(void *addr, int val, size_t len)
    void memcpy(void *trg, void *src, size_t len)

cdef extern from "sys/types.h":
    ctypedef int key_t

cdef extern from "sys/shm.h":

    ctypedef unsigned int shmatt_t

    cdef struct shmid_ds:
        shmatt_t shm_nattch

    int shmget(key_t key, size_t size, int shmflg)
    void *shmat(int shmid, void *shmaddr, int shmflg)
    int shmdt(void *shmaddr)
    int shmctl(int shmid, int cmd, shmid_ds *buf) nogil

cdef extern from "stdio.h":
    void printf(char *str, ...)

cdef extern from "sys/ipc.h":

    key_t ftok(char *path, int id)

    int IPC_STAT, IPC_RMID, IPC_CREAT, IPC_EXCL, IPC_PRIVATE


cdef extern from "unistd.h":
    unsigned int sleep(unsigned int seconds) nogil



import uuid
import weakref
import numpy
import threading
import os


cdef object __mapped_addresses = dict()



cdef object __open_handles = weakref.WeakValueDictionary()

cdef class Handle:
    
    """ Automatic shared segment deattachment
        - without this object we would need to do reference 
        counting manually, as shmdt is global to the process.
        Do not instantiate this class, except from within 
        SharedMemoryBuffer.__init__.  
    """ 
    
    cdef int shmid
    cdef object name 
    cdef object __weakref__
    
    def __init__(Handle self, shmid, name):
        self.shmid = <int> shmid
        self.name = name
    
    def gethandle(Handle self):
        return int(self.shmid)
                 
    def __dealloc__(Handle self):
        self.dealloc()

    def dealloc(Handle self):
        cdef shmid_ds buf
        cdef int _shmid = <int> self.shmid
        cdef void *addr
        cdef int ierr
        try:
            ma, size = __mapped_addresses[ self.name ]
            addr = <void *>(<unsigned long> ma)
            ierr = shmdt(addr)
            if (ierr < 0): raise MemoryError, "shmdt failed."
            del __mapped_addresses[ self.name ]
            printf("Checking for deallocating of memory at %lu\n", 
                    <unsigned long> ma) #DBG
            
        except KeyError:
           print __mapped_addresses #DBG
           print self.name          #DBG
           print 'KeyError'         #DBG
           #pass

        if (shmctl(_shmid, IPC_STAT, &buf) == -1):
            raise OSError, \
                  "IPC_STAT failed, you could have a global memory leak!"

        if (buf.shm_nattch == 0):
            if( shmctl(_shmid, IPC_RMID, NULL) == -1 ):
                raise OSError, \
                        "IPC_RMID failed, you have a global memory leak!"
            else:
                printf("shared segment removed\n")
        else:
            printf('Not deallocating: %i attached segments\n',
                    buf.shm_nattch)

cdef class SharedMemoryBuffer:
    
    """ Windows API shared memory segment """
    
    cdef void *mapped_address
    cdef object name
    cdef object handle
    cdef int shmid
    cdef unsigned long size
        
    def __init__(SharedMemoryBuffer self, unsigned int buf_size,
                        name=None, unpickling=False):
        cdef void* mapped_address
        cdef long mode
        cdef int shmid
        cdef int ikey
        cdef key_t key
        
        lkey = 1 if IPC_PRIVATE < 0 else IPC_PRIVATE + 1

        if (name is None) and (unpickling):
            raise TypeError, "Cannot unpickle without a kernel object name."

        elif (name is None) and not unpickling:

            # create a brand new shared segment

            while 1:
                self.name = numpy.random.random_integers(lkey, int(2147483646))
                ikey = <int> self.name
                memset(<void *> &key, 0, sizeof(key_t))
                memcpy(<void *> &key, <void *> &ikey, sizeof(int)) # key_t is large enough to contain an int
                shmid = shmget(key, buf_size, IPC_CREAT|IPC_EXCL|0600)
                if (shmid < 0):
                    if (errno != EEXIST):
                        raise OSError, "Failed to open shared memory"
                else: # we have an open segment
                    break

            self.handle = Handle(int(shmid), self.name)
            __open_handles[ self.name ] = self.handle

            mapped_address = shmat(shmid, NULL, 0)
            if (mapped_address == <void *> -1):
                if errno == EACCES:
                    raise OSError, "Failed to attach shared memory: permission denied"
                elif errno == ENOMEM:
                    raise OSError, "Failed to attach shared memory: insufficient memory"
                else:
                    raise OSError, "Failed to attach shared memory"

            self.shmid = shmid 
            self.size = buf_size
            self.mapped_address = mapped_address    
            ma = int(<unsigned long> self.mapped_address)
            size = int(buf_size) 
            __mapped_addresses[ self.name ] = ma, size 

        else: # unpickling
          
            self.name = name
            try:

                # check if this process has an open handle to
                # this segment already

                self.handle = __open_handles[ self.name ]
                self.shmid = <int> self.handle.gethandle()
                ma, size = __mapped_addresses[ self.name ]
                self.mapped_address = <void *>(<unsigned long> ma) 
                self.size = <unsigned long> size

            except KeyError:

                # unpickle a segment created by another process
             
                ikey = <int> self.name
                memset(<void *> &key, 0, sizeof(key_t))
                memcpy(<void *> &key, <void *> &ikey, sizeof(int))
                shmid = shmget(key, buf_size, 0)
                if (shmid < 0):
                    raise OSError, "Failed to open shared memory"
                self.handle = Handle(int(shmid), name)
                __open_handles[ self.name ] = self.handle

                mapped_address = shmat(shmid, NULL, 0)
                if (mapped_address == <void *> -1):
                    raise OSError, "Failed to attach shared memory"
                self.shmid = shmid 
                self.size = buf_size
                self.mapped_address = mapped_address    
                ma = int(<unsigned long> self.mapped_address)
                size = int(buf_size) 
                __mapped_addresses[ self.name ] = ma, size  

    def __dealloc__(SharedMemoryBuffer self):
        printf('Calling __dealloc__ on buffer at %lu, in pid %i\n', #DBG
                <unsigned long> self.mapped_address, #DBG
                  <int> os.getpid()) #DBG
        self.handle.dealloc()

    # return base address and segment size
    # this will be used by the heap object
    def getbuffer(SharedMemoryBuffer self):
        return int(<unsigned long> self.mapped_address), int(self.size) 

    
    # pickle
    def __reduce__(SharedMemoryBuffer self):
        return (__unpickle_shm, (self.size, self.name))
                   
def __unpickle_shm(*args):
    s, n = args
    return SharedMemoryBuffer(s, name=n, unpickling=True)






-------------- next part --------------
import ndarray as shmem
import numpy as np
import sys
import os

#a = shmem.shared_zeros(10)
#print >>sys.stderr, 'Array created'
#print a.ctypes.data
#print a
#print >>sys.stderr, 'Array printed'

def modify_array(ary):
    ary[:3] = 1
    print >>sys.stderr, 'Array address in sub program %s (pid: %s)' \
                % (ary.ctypes.data, os.getpid())

from multiprocessing import Pool

def main():
    SIZE = 10
    a = shmem.shared_zeros(SIZE)
    #a = np.zeros(SIZE)

    p = Pool()

    print >>sys.stderr, 'Array address in main program %s (pid: %s)' \
            % (a.ctypes.data, os.getpid())
    print >>sys.stderr, a

    job = p.apply_async(modify_array, (a, ))
    p.close()
    p.join()

    print >>sys.stderr, a

if __name__ == '__main__':
    main()

import gc
gc.collect()



More information about the SciPy-user mailing list