# RICHARD Georges-Emmanuel
# Minimal environment to reproduce the IPython slow-down behaviour with lists
# and the memory footprint growth.
#
# Open a shell and execute:
#   ipcontroller --ip='*' --nodb&
#
# Then open another shell to start 32 engines (an IPython shell is very
# convenient for this):
#   import time, os
#   for i in range(32):
#       os.popen("ipengine&")
#       time.sleep(1)
#
# Finally open a last shell and run ipython to work with the cluster.
#
# The interesting profiling data are:
#   testTimeDict['timespent']
#       wall time of each `dview.pull("MY_RES")` + `ares.r` round trip
#       (`ares.r` is equivalent to AsyncResult.get())
#   dview.client.testTimeBUG['2res']
#       time spent in `_flush_results(self._mux_socket)` inside spin()
################################
import csv
import sys
import time

from IPython.parallel import Client

# Politeness: files are only written if the user has read this code and set
# this flag to True.
WRITECSVFILE = False
# Still polite: this only adds more loops, which can make the computer swap
# and slow down before it finally reaches the MemoryError.
TEST_MEMORY_ERROR = False

# Keys of the per-step timing lists filled by myClient.spin(), in step order.
_SPIN_TIMING_KEYS = (
    '0timestp', '1noti', '2res', '3res', '4ctl', '5iopub', '6ignhub',
)


def _new_spin_timings():
    """Return a fresh dict mapping every spin timing key to an empty list."""
    return dict((key, []) for key in _SPIN_TIMING_KEYS)


class myClient(Client):
    """Client whose spin() can record how long each flush step takes.

    Subclassing (rather than patching) avoids modifying the IPython
    installation; only spin() is overridden, for profiling purposes.

    Attributes:
        DEBUG_WAIT: when True, spin() appends timings to ``testTimeBUG``.
        testTimeBUG: dict of lists, one entry appended per timed spin() call:
            '0timestp' is the time.time() value at the start of the call, the
            other keys hold the seconds elapsed across each successive step
            ('1noti' notifications, '2res' mux results, '3res' task results,
            '4ctl' control, '5iopub' iopub, '6ignhub' ignored hub replies).
    """

    def __init__(self, url_or_file=None, profile=None, profile_dir=None,
                 ipython_dir=None, context=None, debug=False, exec_key=None,
                 sshserver=None, sshkey=None, password=None, paramiko=None,
                 timeout=10, **extra_args):
        """Initialise the profiling state, then defer to Client.__init__.

        All parameters are forwarded unchanged to the base constructor.
        """
        # 2012-02-27, network bug investigation. Assigned before the base
        # constructor (as in the original) so that spin() finds these
        # attributes should it run during construction.
        self.DEBUG_WAIT = False
        self.testTimeBUG = _new_spin_timings()
        super(myClient, self).__init__(
            url_or_file, profile, profile_dir, ipython_dir, context, debug,
            exec_key, sshserver, sshkey, password, paramiko, timeout,
            **extra_args)

    def _record_lap(self, key, since):
        """Append the seconds elapsed since *since* under *key*; return now."""
        now = time.time()
        self.testTimeBUG[key].append(now - since)
        return now

    def spin(self):
        """Flush any registration notifications and execution results
        waiting in the ZMQ queue.

        When ``DEBUG_WAIT`` is True, the duration of every step is appended
        to ``testTimeBUG``. A lap is recorded even when the corresponding
        socket is unset, so all timing lists grow by one entry per call.
        """
        # Read the flag once so a single call is timed either fully or not
        # at all.
        timed = self.DEBUG_WAIT
        last = None
        if timed:
            last = time.time()
            self.testTimeBUG['0timestp'].append(last)

        if self._notification_socket:
            self._flush_notifications()
        if timed:
            last = self._record_lap('1noti', last)

        if self._mux_socket:
            self._flush_results(self._mux_socket)
        if timed:
            last = self._record_lap('2res', last)

        if self._task_socket:
            self._flush_results(self._task_socket)
        if timed:
            last = self._record_lap('3res', last)

        if self._control_socket:
            self._flush_control(self._control_socket)
        if timed:
            last = self._record_lap('4ctl', last)

        if self._iopub_socket:
            self._flush_iopub(self._iopub_socket)
        if timed:
            last = self._record_lap('5iopub', last)

        if self._query_socket:
            self._flush_ignored_hub_replies()
        if timed:
            self._record_lap('6ignhub', last)


def run_pull_benchmark(view, iterations=1000):
    """Pull ``MY_RES`` from the engines *iterations* times and time each pull.

    The client's ``testTimeBUG`` is reset first, and spin-level timing
    (``DEBUG_WAIT``) is enabled only while waiting on the result.

    Args:
        view: the view to pull through; its ``client`` must be a myClient.
        iterations: number of pulls to perform.

    Returns:
        A ``(timings, last_result)`` tuple. ``timings`` maps 'timestp' to the
        start time of every pull and 'timespent' to the seconds taken by the
        pull plus the wait for its result. ``last_result`` is the value of
        the final pull (None when *iterations* is 0).
    """
    timings = {'timestp': [], 'timespent': []}
    client = view.client
    client.testTimeBUG = _new_spin_timings()  # 2012-02-27, network bug
    last_result = None
    for _ in range(iterations):
        started = time.time()
        pending = view.pull("MY_RES")
        client.DEBUG_WAIT = True
        try:
            last_result = pending.r
        finally:
            # Never leave spin-level timing enabled if the pull fails.
            client.DEBUG_WAIT = False
        timings['timespent'].append(time.time() - started)
        timings['timestp'].append(started)
    return timings, last_result


def write_timings_csv(path, timings):
    """Write a dict of equally long lists to *path* as CSV, one column per key.

    Columns are ordered by sorted key; the first row holds the column
    titles. Rows stop at the shortest column.
    """
    names = sorted(timings)
    # The csv module wants a binary file on Python 2 and a text file opened
    # with newline='' on Python 3.
    if sys.version_info[0] < 3:
        handle = open(path, 'wb')
    else:
        handle = open(path, 'w', newline='')
    with handle:
        writer = csv.writer(handle, dialect='excel')
        writer.writerow(names)  # keep the column titles
        writer.writerows(zip(*[timings[name] for name in names]))


rc = myClient()
dview = rc[:]
# The list can hold floats, strings or a mix: every case shows the slow-down.
dview.execute("MY_RES=200*['abcdefgh']")
# Alternative payloads:
#   dview.execute("MY_RES=200*'abcdefgh'")  -> memory issue, fast
#   dview.execute("import numpy")
#   dview.execute("MY_RES= numpy.empty(2000,dtype='|S18')")  -> memory issue

testTimeDict, res = run_pull_benchmark(dview)

if WRITECSVFILE:
    # Format and export both timing tables to CSV.
    write_timings_csv('bug_test_applevel.csv', testTimeDict)
    write_timings_csv('bug_at_spinlevel.csv', dview.client.testTimeBUG)

if TEST_MEMORY_ERROR:
    # The range may be adjusted to reach a MemoryError; in any case the
    # growth of the memory footprint can be tracked with the shell command
    # 'top' or other process monitoring software. Open question: how can
    # this footprint be reduced or reset at runtime?
    for i in range(6):
        print(i)
        testTimeDict, res = run_pull_benchmark(dview)