[IPython-User] client.spin() increasing execution time, and MemoryError

Georges-Emmanuel RICHARD georges.emmanuel@gmail....
Thu Mar 15 12:49:52 CDT 2012

On 3/15/2012 1:09 AM, MinRK wrote:
> On Wed, Mar 14, 2012 at 08:22, RICHARD Georges-Emmanuel
> <perspective.electronic@gmail.com>  wrote:
>> Hi all, Hi Minrk,
>> I'm not sure if I should post on IPython list or PyZMQ list,
> IPython is the right place.
>> Brief: Usage of IPython parallel, I use DirectView object to "execute" code
>> on 32 engines and "pull" to retrieve results.
>> Those results are simply compared (means read only) to some range limit to
>> know which engine got error and which pass.
>> We repeat this, previous result are erase by new, so the memory footprint on
>> engines is constant.
>> from the IPython Client side, after a long run (it depends on user usage)
>> - currently it is not very intensive but as the system is never restarted
>> after several weeks it crashed,
>> - then it will be something like 1 execute every 1 or 3seconds followed by a
>> pull, I expect a crash within few days at this rhythm  :-( .
> Wow, that's rough.  I hope we can figure this out.
>> TestCase:
>> -open a shell execute:
>>      ipcontroller --ip='*' --nodb
>> -then open another shell to start 32 engines,  I use python to do it:
>> import time,os
>> for i in range(32):
>>      os.popen("ipengine&")
>>      time.sleep(1)
>> -execute the script (README_script_test.py attached to this mail):
>> When I started to debug and profile IPython internal method, I started to
>> insert some time.time() in each method involved in AsyncResult, finally I
>> pointed the client.py spin() method,
>> as I don't want you to modify your installation, in the script provided I
>> define a myClient class which inherit from IPython.parallel.Client and I
>> override spin method to add the time.time() that help me to figure out where
>> the time is spent.
>> Then it's a simple dview.execute to set a variable on all engines, followed
>> by a loop to retrieve the variable 1000 of times.
>> a boolean WRITECSVFILE, can be set to True to open profiling data in oocalc
>> or excel
>> a boolean TEST_MEMORY_ERROR, can be set to True, it just do more loops,
>> maybe need to be extended to reach the MemoryError but it's probably not
>> desired.
>> Issues:
>> 1) with a result of type List, while I retrieve it a growing delay appear
>> every N ( 4 to 5 in case of 200 elements in list to pull with 32 engines, no
>> matters if elements are float only or string only or mixed)
>> AsyncResult.get().
>>      I did some profiling, this is what the test script help to produce, I
>> also modified client.py
>>      http://i40.tinypic.com/1z6bpuh.png   or    http://dl.free.fr/bvYnPT3rR
>>      I managed to notice spin() method of the client.py in which the
>> _flush_results(self._mux_socket) seems to be where the time is spent, but
>> then I do see the pyzmq interface involved but I don't understand what's
>> wrong with it.
> I'll have to further investigate this one. I don't see anything
> particularly strange in my running of the script.  Can you do some
> profiling, to see what operations are actually taking the time?

I profiled deeper in method _flush_results()  (the one that become 
slower execution after execution of a pull('alistobject').r ),
  and point the line: handler(msg)
     def _flush_results(self, sock):
         """Flush task or queue results waiting in ZMQ queue."""
         idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
         while msg is not None:
             if self.debug:
             msg_type = msg['header']['msg_type']
             handler = self._queue_handlers.get(msg_type, None)
             if handler is None:
                 raise Exception("Unhandled message type: %s"%msg.msg_type)
                 handler(msg)    ####################################### 
Here is the incrinsing time, I continue to dig!
             idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)

Then I logged the 'msg' to stdout and pipe it to a file (something like 
430MB of log), it tells me that 'msg_type' is 'apply_reply', all 
'status' ar 'ok'.

I profiled method _handle_apply_reply(),  and point the 

     def _handle_apply_reply(self, msg):
         """Save the reply to an apply_request into our results."""
         parent = msg['parent_header']
         msg_id = parent['msg_id']
         if msg_id not in self.outstanding:
             if msg_id in self.history:
                 print ("got stale result: %s"%msg_id)
                 print self.results[msg_id]
                 print msg
                 print ("got unknown result: %s"%msg_id)
         content = msg['content']
         header = msg['header']

         # construct metadata:
         md = self.metadata[msg_id]
         md.update(self._extract_metadata(header, parent, content))
         # is this redundant?
         self.metadata[msg_id] = md

         e_outstanding = self._outstanding_dict[md['engine_uuid']]
         if msg_id in e_outstanding:

         # construct result:
         if content['status'] == 'ok':
             self.results[msg_id] = 
util.unserialize_object(msg['buffers'])[0]  ################### Here we 
are, this line is the one growing up in time.
         elif content['status'] == 'aborted':
             self.results[msg_id] = error.TaskAborted(msg_id)
         elif content['status'] == 'resubmitted':
             # TODO: handle resubmission
             self.results[msg_id] = self._unwrap_exception(content)

So I didn't feel hopeless, and I tried to understand where the time is 
def unserialize_object(bufs):
     """reconstruct an object serialized by serialize_object from data 
     bufs = list(bufs)
     sobj = pickle.loads(bufs.pop(0))  
#################################### Here is the HOT line, :'(
     if isinstance(sobj, (list, tuple)):
         for s in sobj:
             if s.data is None:
                 s.data = bufs.pop(0)
         return uncanSequence(map(unserialize, sobj)), bufs
     elif isinstance(sobj, dict):
         newobj = {}
         for k in sorted(sobj.iterkeys()):
             s = sobj[k]
             if s.data is None:
                 s.data = bufs.pop(0)
             newobj[k] = uncan(unserialize(s))
         return newobj, bufs
         if sobj.data is None:
             sobj.data = bufs.pop(0)
         return uncan(unserialize(sobj)), bufs

As I don't understand why this line    pickle.loads(bufs.pop(0))  get 
slower and slower, after many execution, I decided to bench only 
I copied only 1 'buffer' of a msg from the previous log, and unserialize 
it 10000 times, absolutely no increase of time. Just work as it should.

So my only chance,  is to dig the log to find which 'buffer' make the 
pickle.loads slow down.
I did check the lenght of the bufs.pop(0), I splitted the line:
tmp_pop = bufs.pop(0) # I checked len(tmp_pop), Ok, I am hopless.
sobj = pickle.loads(tmp_pop)
there is 30 first call with a buffer lenght of 115, then all other call 
are with a buffer lenght of 6154.
I am really confused, as I think pickle.loads is very stable, and I 
cannot imagined a weird buffer can make it change behavior and slow down.
I may give a try to a 'msg' reader, to reinject the whole log I got, it 
should make the issues appear, like it does with the Client.

But I should maybe give a try with another python version first, like 2.7.

>> 2) memory usage keep growing, and finaly reach a MemoryError.
>>      no matter the type of data retrieved, numpy array, simple string, float
>> or a list.
> This one's easy: https://github.com/ipython/ipython/issues/1131
> It's the very top of my IPython todo list, but getting it right in
> general is a bit tricky with AsyncResults, etc., so I don't have an
> implementation yet.  Maybe later today...
> The Client caches all results in its `results` dictionary.  These are
> not deduplicated with hashes or anything.  Just one object per Result
> (note that each pull you are doing produces *32* results).  So by the
> end of your script, you have at least 32000 copies of the list you are
> pulling.
> To clear this dictionary, simply call `client.results.clear()`.
> For a full client-side purge:
> # clear caches of results and metadata
> client.results.clear()
> client.metadata.clear()
> view.results.clear()
> # Also (optionally) clear the history of msg_ids
> assert not client.outstanding, "don't clear when tasks are outstanding"
> client.history = []
> dview.history = []
> Using this, I have run large amounts data-moving tasks (1 MB/s on
> average) for a couple of days (totaling 100s of GB) without
> encountering memory issues.

Good news, I didn't give a try yet for this, that's my next step.

>>   comments:
>>         concerning 1)  I understand the serialization following the type of
>> results, it takes more or less time to retrieve, BUT if the time is rising
>> pull after pull on the same result (it seems to be only with List) something
>> else is involved, and I don't catch it.
>>         concerning 2)  I though the Hub Database could have been involved, so
>> when I start the ipcontroller I added --nodb, but I still got the
>> MemoryError, moreover this option concerns only the ipcontroller app,
>> also I tried    rc.purge_results('all'), but didn't see any impact.
> purge_results is strictly a Hub DB operation.  Working out exactly how
>> are    rc.history,    dview.results,    rc.session.digest_history or other
>> internal object should be cleared regularly?
>> I read this:
>> http://ipython.org/ipython-doc/stable/parallel/parallel_details.html#non-copying-sends-and-numpy-arrays
>> What is sendable (...)
>> (unless the data is very small)      ----->  I'am not sure what that means,
>> but if a results is pulled twice, how to do to make the 2nd erase the 1st
>> one.
>> Question:
>> any kind people who want give a try to the script I attached to confirm the
>> behaviour, any hint to avoid my IPython client process to slow down, and
>> reach a MemoryError, or do I miss a page in the manual about clear the
>> memory in the user application?
> This should certainly be in the docs, but it probably hasn't been
> because I have considered it a bug I should fix, rather than behavior
> to be documented.
> -MinRK

Great work, that's on the good way, another thumb up for the notebook.

>> in advance Thanks, keep the good works!
>> Environment:
>> Linux RHEL 5.5, python 2.6.6
>> ipython 0.12, then upgrade to the latest git 0.13dev
>> zmq+pyzmq 2.1.7, then upgrade to the latest zmq+pyzmq 2.1.11
>> jsonlib2     1.3.10
>> Cheers,
>>         Joe
>> --
>> RICHARD Georges-Emmanuel
>> _______________________________________________
>> IPython-User mailing list
>> IPython-User@scipy.org
>> http://mail.scipy.org/mailman/listinfo/ipython-user
> _______________________________________________
> IPython-User mailing list
> IPython-User@scipy.org
> http://mail.scipy.org/mailman/listinfo/ipython-user

Skype Me™! <skype:joe_aie?call>
Get Skype <http://www.skype.com/go/download> and call me for free.

-------------- next part --------------
An HTML attachment was scrubbed...
URL: http://mail.scipy.org/pipermail/ipython-user/attachments/20120316/ff4dd3c3/attachment-0001.html 
-------------- next part --------------
A non-text attachment was scrubbed...
Name: call_blue_white_124x52.png
Type: image/png
Size: 2538 bytes
Desc: not available
Url : http://mail.scipy.org/pipermail/ipython-user/attachments/20120316/ff4dd3c3/attachment-0001.png 

More information about the IPython-User mailing list