src/pyams_zmq/process.py
changeset 0 11f0f97d508f
child 4 d624312bfc2b
equal deleted inserted replaced
-1:000000000000 0:11f0f97d508f
       
     1 #
       
     2 # Copyright (c) 2008-2015 Thierry Florac <tflorac AT ulthar.net>
       
     3 # All Rights Reserved.
       
     4 #
       
     5 # This software is subject to the provisions of the Zope Public License,
       
     6 # Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
       
     7 # THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
       
     8 # WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
       
     9 # WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
       
    10 # FOR A PARTICULAR PURPOSE.
       
    11 #
       
    12 
       
    13 __docformat__ = 'restructuredtext'
       
    14 
       
    15 
       
    16 # import standard library
       
    17 import multiprocessing
       
    18 import signal
       
    19 import sys
       
    20 import zmq
       
    21 from zmq.eventloop import ioloop, zmqstream
       
    22 
       
    23 # import interfaces
       
    24 from pyams_zmq.interfaces import IZMQProcess
       
    25 
       
    26 # import packages
       
    27 from zope.interface import implementer
       
    28 
       
    29 
       
    30 @implementer(IZMQProcess)
       
    31 class ZMQProcess(multiprocessing.Process):
       
    32     """
       
    33     This is the base for all processes and offers utility methods
       
    34     for setup and creating new streams.
       
    35     """
       
    36 
       
    37     socket_type = zmq.REP
       
    38 
       
    39     def __init__(self, bind_addr, handler):
       
    40         super(ZMQProcess, self).__init__()
       
    41 
       
    42         self.context = None
       
    43         """The ØMQ :class:`~zmq.Context` instance."""
       
    44 
       
    45         self.loop = None
       
    46         """PyZMQ's event loop (:class:`~zmq.eventloop.ioloop.IOLoop`)."""
       
    47 
       
    48         self.bind_addr = bind_addr
       
    49         self.rep_stream = None
       
    50         self.handler = handler
       
    51 
       
    52     def setup(self):
       
    53         """Creates a :attr:`context` and an event :attr:`loop` for the process."""
       
    54         self.context = zmq.Context()
       
    55         self.loop = ioloop.IOLoop.instance()
       
    56         self.rep_stream, _ = self.stream(self.socket_type, self.bind_addr, bind=True)
       
    57         self.initStream()
       
    58 
       
    59     def initStream(self):
       
    60         """Initialize response stream"""
       
    61         self.rep_stream.on_recv(self.handler(self, self.rep_stream, self.stop))
       
    62 
       
    63     def run(self):
       
    64         """Sets up everything and starts the event loop."""
       
    65         signal.signal(signal.SIGTERM, self.exit)
       
    66         self.setup()
       
    67         self.loop.start()
       
    68 
       
    69     def stop(self):
       
    70         """Stops the event loop."""
       
    71         if self.loop is not None:
       
    72             self.loop.stop()
       
    73             self.loop = None
       
    74 
       
    75     def exit(self, num, frame):
       
    76         self.stop()
       
    77         sys.exit()
       
    78 
       
    79     def stream(self, sock_type, addr, bind, callback=None, subscribe=b''):
       
    80         """
       
    81         Creates a :class:`~zmq.eventloop.zmqstream.ZMQStream`.
       
    82 
       
    83         :param sock_type: The ØMQ socket type (e.g. ``zmq.REQ``)
       
    84         :param addr: Address to bind or connect to formatted as *host:port*,
       
    85                 *(host, port)* or *host* (bind to random port).
       
    86                 If *bind* is ``True``, *host* may be:
       
    87 
       
    88                 - the wild-card ``*``, meaning all available interfaces,
       
    89                 - the primary IPv4 address assigned to the interface, in its
       
    90                 numeric representation or
       
    91                 - the interface name as defined by the operating system.
       
    92 
       
    93                 If *bind* is ``False``, *host* may be:
       
    94 
       
    95                 - the DNS name of the peer or
       
    96                 - the IPv4 address of the peer, in its numeric representation.
       
    97 
       
    98                 If *addr* is just a host name without a port and *bind* is
       
    99                 ``True``, the socket will be bound to a random port.
       
   100         :param bind: Binds to *addr* if ``True`` or tries to connect to it
       
   101                 otherwise.
       
   102         :param callback: A callback for
       
   103                 :meth:`~zmq.eventloop.zmqstream.ZMQStream.on_recv`, optional
       
   104         :param subscribe: Subscription pattern for *SUB* sockets, optional,
       
   105                 defaults to ``b''``.
       
   106         :returns: A tuple containg the stream and the port number.
       
   107 
       
   108         """
       
   109         sock = self.context.socket(sock_type)
       
   110 
       
   111         # addr may be 'host:port' or ('host', port)
       
   112         if isinstance(addr, str):
       
   113             addr = addr.split(':')
       
   114         host, port = addr if len(addr) == 2 else (addr[0], None)
       
   115 
       
   116         # Bind/connect the socket
       
   117         if bind:
       
   118             if port:
       
   119                 sock.bind('tcp://%s:%s' % (host, port))
       
   120             else:
       
   121                 port = sock.bind_to_random_port('tcp://%s' % host)
       
   122         else:
       
   123             sock.connect('tcp://%s:%s' % (host, port))
       
   124 
       
   125         # Add a default subscription for SUB sockets
       
   126         if sock_type == zmq.SUB:
       
   127             sock.setsockopt(zmq.SUBSCRIBE, subscribe)
       
   128 
       
   129         # Create the stream and add the callback
       
   130         stream = zmqstream.ZMQStream(sock, self.loop)
       
   131         if callback:
       
   132             stream.on_recv(callback)
       
   133 
       
   134         return stream, int(port)
       
   135 
       
   136 
       
   137 def process_exit_func(process=None):
       
   138     if process is not None:
       
   139         if process.is_alive():
       
   140             process.terminate()
       
   141         process.join()