[IPython-dev] IPython Parallel Context Manager

David Hirschfeld dave.hirschfeld@gmail....
Fri Jan 11 09:19:11 CST 2013


I've created a context manger which can be used to instantiate and
shutdown an IPython cluster. I'm posting it here in the hope that
others may find it useful and also for any feedback on how it could be
implemented better, especially in regards to the ugly sleep calls
where I found no better way of waiting until the client had connected
to the instantiated engines.

Demo code below and context manager below that.

HTH,
Dave

```
In [4]: from IPython.utils.path import locate_profile
   ...: from IPython.parallel.apps.launcher import
LocalControllerLauncher, LocalEngineLauncher
   ...: from IPython.parallel import Client

In [5]: rc = Client() # No controller running, expect error
Traceback (most recent call last):

  File "<ipython-input-5-3adcc54b8411>", line 1, in <module>
    rc = Client()

  File "c:\dev\code\ipython\IPython\parallel\client\client.py", line
409, in __init__
    with open(url_file) as f:

IOError: [Errno 2] No such file or directory:
u'C:\\Users\\dhirschfeld\\.ipython\\profile_default\\security\\ipcontroller-client.json'


In [6]: with ParallelView(nengines=4, start_hub=True) as view:
   ...:     print view.targets
   ...: #
   ...:

Starting LocalControllerLauncher: <snip>
Process 'C:\\dev\\bin\\Python27\\python.exe' started: 7208
Starting LocalEngineLauncher: <snip>
Process 'C:\\dev\\bin\\Python27\\python.exe' started: 4292
Starting LocalEngineLauncher: <snip>
Process 'C:\\dev\\bin\\Python27\\python.exe' started: 6320
Starting LocalEngineLauncher: <snip>
Process 'C:\\dev\\bin\\Python27\\python.exe' started: 8804
Starting LocalEngineLauncher: <snip>
Process 'C:\\dev\\bin\\Python27\\python.exe' started: 5712
[0, 1, 2, 3]

In [7]: rc = Client() # Instantiated controller shutdown on exit, expect error
Traceback (most recent call last):

  File "<ipython-input-7-3adcc54b8411>", line 1, in <module>
    rc = Client()

  File "c:\dev\code\ipython\IPython\parallel\client\client.py", line
409, in __init__
    with open(url_file) as f:

IOError: [Errno 2] No such file or directory:
u'C:\\Users\\dhirschfeld\\.ipython\\profile_default\\security\\ipcontroller-client.json'


In [8]: controller = LocalControllerLauncher(profile_dir=locate_profile())
   ...: controller.start()

In [9]: with ParallelView(nengines=4) as view:
   ...:     print view.targets
   ...: #
   ...:

Starting LocalEngineLauncher: <snip>
Process 'C:\\dev\\bin\\Python27\\python.exe' started: 8580
Starting LocalEngineLauncher: <snip>
Process 'C:\\dev\\bin\\Python27\\python.exe' started: 7524
Starting LocalEngineLauncher: <snip>
Process 'C:\\dev\\bin\\Python27\\python.exe' started: 11064
Starting LocalEngineLauncher: <snip>
Process 'C:\\dev\\bin\\Python27\\python.exe' started: 8144
[0, 1, 2, 3]

In [10]: rc = Client() # Engines only shutdown on exit, expect empty list
    ...: rc.ids
Out[10]: []

In [11]: engine = LocalEngineLauncher(profile_dir=locate_profile())
    ...: engine.start()

In [12]: engine = LocalEngineLauncher(profile_dir=locate_profile())
    ...: engine.start()

In [13]: rc.ids
Out[13]: [4, 5]

In [14]: with ParallelView(nengines=4) as view: # Expect only
instantiated engines in targets
    ...:     print view.targets
    ...: #
    ...:

Starting LocalEngineLauncher: <snip>
Process 'C:\\dev\\bin\\Python27\\python.exe' started: 6344
Starting LocalEngineLauncher: <snip>
Process 'C:\\dev\\bin\\Python27\\python.exe' started: 7532
Starting LocalEngineLauncher: <snip>
Process 'C:\\dev\\bin\\Python27\\python.exe' started: 10032
Starting LocalEngineLauncher: <snip>
Process 'C:\\dev\\bin\\Python27\\python.exe' started: 10236
[8, 9, 6, 7]

In [15]: rc.ids # Instantiated engines shutdown, original engines remain
Out[15]: [4, 5]

In [16]:
```


class ParallelView(object):
    def __init__(self, nengines=1,
                       start_hub=False,
                       type='load_balanced',
                       profile='default',
                       logger=None,
                       delay=0.1):
        """Context mangager which will return a view [1]_ on an IPython
        cluster, optionally instantiating one if it is not already
        running.


        Parameters
        ----------
        nengines : int
            The number of engine instances to instantiate

        start_hub : bool
            Whether or not to instantiate the controller as well

        type : str
            Either 'load_balanced' or 'direct'

        profile : str
            The name of the IPython to use

        logger : a `logging.Logger` instance
            If None a default logger writing to `sys.stdout` will be used

        delay : float
           How long between engine instantiations to delay


        References
        ----------
        .. [1] http://ipython.org/ipython-doc/dev/api/generated/IPython.parallel.client.view.html#IPython.parallel.client.view.View

        """
        import time
        from IPython.utils.path import locate_profile
        from IPython.config import PyFileConfigLoader
        self.rc = None
        self.ids = None
        self.delay = delay
        self.nengines = int(nengines)
        self.engines = []
        self.type = str(type).lower()

        if self.type not in ('load_balanced','direct'):
            msg = "Expected `type` to be one of 'load_balanced' or
'direct' but got %s." % type
            raise ValueError(msg)

        if profile is None:
            self.profile_dir = locate_profile()
        else:
            self.profile_dir = locate_profile(profile=profile)
        self.profile = profile

        self.config = PyFileConfigLoader('ipython_config.py',
self.profile_dir).load_config()

        if logger is None:
            import logging
            import sys
            logger = logging.Logger('ipcluster')
            logger.setLevel(logging.DEBUG)
            logger.addHandler(logging.StreamHandler(sys.stdout))
        self.logger = logger

        if start_hub:
            from IPython.parallel.apps.launcher import LocalControllerLauncher
            controller = LocalControllerLauncher(config=self.config,
                                                 log=self.logger,
                                                 profile_dir=self.profile_dir)
            controller.start()
            time.sleep(2)
        self.start_hub = bool(start_hub)
    #

    def __enter__(self):
        from IPython.parallel.apps.launcher import LocalEngineLauncher
        from IPython.parallel import Client
        import time
        self.rc = Client(profile=self.profile)
        self.ids = set(self.rc.ids)
        for i in range(self.nengines):
            engine = LocalEngineLauncher(config=self.config,
                                         log=self.logger,
                                         profile_dir=self.profile_dir)
            self.engines.append(engine)
            engine.start()
            time.sleep(self.delay)
        MAX_LOOPS = 20
        while len(set(self.rc.ids).difference(self.ids)) <
sum(e.running for e in self.engines):
            time.sleep(10*self.delay)
            MAX_LOOPS -= 1
            if MAX_LOOPS < 0:
                break
        self.ids = list(set(self.rc.ids).difference(self.ids))
        if self.type == 'load_balanced':
            return self.rc.load_balanced_view(self.ids)
        return self.rc[self.ids]
    #

    def __exit__(self,  *exc_info):
        self.rc.shutdown(self.ids, hub=self.start_hub)
#


More information about the IPython-dev mailing list