src/pyams_zmq/process.py
changeset 25 9352acb9366b
parent 22 05aaf0acf4fb
equal deleted inserted replaced
24:36b9e48666ce 25:9352acb9366b
     8 # WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
     8 # WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
     9 # WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
     9 # WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
    10 # FOR A PARTICULAR PURPOSE.
    10 # FOR A PARTICULAR PURPOSE.
    11 #
    11 #
    12 
    12 
    13 __docformat__ = 'restructuredtext'
    13 """PyAMS_zmq.process module
       
    14 
       
    15 A 0MQ process is a "classic" Python subprocess, which starts a 0MQ event loop on startup
       
    16 to be able to handle incoming connections.
       
    17 
       
    18 To each process is attached a message handler, which is responsible for the concrete
       
    19 message handling.
       
    20 
       
    21 Process initialization arguments allow defining a list of client IP addresses which are
       
    22 allowed to connect to this process, as well as login/password authentication tokens which
       
    23 must be provided to connect to this process.
       
    24 """
    14 
    25 
    15 import multiprocessing
    26 import multiprocessing
    16 import signal
    27 import signal
    17 import sys
    28 import sys
    18 
    29 
    19 import zmq
    30 import zmq
       
    31 from tornado import ioloop
    20 from zmq.auth.thread import ThreadAuthenticator
    32 from zmq.auth.thread import ThreadAuthenticator
    21 from zmq.eventloop import ioloop, zmqstream
    33 from zmq.eventloop import zmqstream
    22 from zope.interface import implementer
    34 from zope.interface import implementer
    23 
    35 
    24 from pyams_utils.registry import get_global_registry
    36 from pyams_utils.registry import get_global_registry
    25 from pyams_zmq.interfaces import IZMQProcess, ZMQProcessStartedEvent
    37 from pyams_zmq.interfaces import IZMQProcess, ZMQProcessStartedEvent
    26 
    38 
    27 
    39 
       
    40 __docformat__ = 'restructuredtext'
       
    41 
       
    42 
    28 @implementer(IZMQProcess)
    43 @implementer(IZMQProcess)
    29 class ZMQProcess(multiprocessing.Process):
    44 class ZMQProcess(multiprocessing.Process):
       
    45     # pylint: disable=too-many-instance-attributes
    30     """
    46     """
    31     This is the base for all processes and offers utility methods
    47     This is the base for all processes and offers utility methods
    32     for setup and creating new streams.
    48     for setup and creating new streams.
    33     """
    49     """
    34 
    50 
    35     socket_type = zmq.REP
    51     socket_type = zmq.REP  # pylint: disable=no-member
    36     auth_thread = None
    52     auth_thread = None
    37 
    53 
    38     def __init__(self, bind_addr, handler, auth=None, clients=None):
    54     def __init__(self, bind_addr, handler, auth=None, clients=None):
    39         super(ZMQProcess, self).__init__()
    55         super(ZMQProcess, self).__init__()
    40 
    56 
    50         self.passwords = dict([auth.split(':', 1)]) if auth else None
    66         self.passwords = dict([auth.split(':', 1)]) if auth else None
    51         self.clients = clients.split() if clients else None
    67         self.clients = clients.split() if clients else None
    52 
    68 
    53     def setup(self):
    69     def setup(self):
    54         """Creates a :attr:`context` and an event :attr:`loop` for the process."""
    70         """Creates a :attr:`context` and an event :attr:`loop` for the process."""
    55         ioloop.install()
       
    56         ctx = self.context = zmq.Context()
    71         ctx = self.context = zmq.Context()
    57         auth = self.auth_thread = ThreadAuthenticator(ctx)
    72         auth = self.auth_thread = ThreadAuthenticator(ctx)
    58         auth.start()
    73         auth.start()
    59         if self.clients:
    74         if self.clients:
    60             auth.allow(*self.clients)
    75             auth.allow(*self.clients)  # pylint: disable=not-an-iterable
    61         if self.passwords:
    76         if self.passwords:
    62             auth.configure_plain(domain='*', passwords=self.passwords)
    77             auth.configure_plain(domain='*', passwords=self.passwords)
    63         self.loop = ioloop.IOLoop.instance()
    78         self.loop = ioloop.IOLoop.current()
    64         self.rep_stream, _ = self.stream(self.socket_type, self.bind_addr, bind=True)
    79         self.rep_stream, _ = self.stream(self.socket_type, self.bind_addr, bind=True)
    65         self.initStream()
    80         self.init_stream()
    66 
    81 
    67     def initStream(self):
    82     def init_stream(self):
    68         """Initialize response stream"""
    83         """Initialize response stream"""
    69         self.rep_stream.on_recv(self.handler(self, self.rep_stream, self.stop))
    84         self.rep_stream.on_recv(self.handler(self, self.rep_stream, self.stop))
    70 
    85 
    71     def run(self):
    86     def run(self):
    72         """Sets up everything and starts the event loop."""
    87         """Sets up everything and starts the event loop on process startup"""
    73         signal.signal(signal.SIGTERM, self.exit)
    88         signal.signal(signal.SIGTERM, self.exit)
    74         self.setup()
    89         self.setup()
    75         registry = get_global_registry()
    90         registry = get_global_registry()
    76         registry.notify(ZMQProcessStartedEvent(self))
    91         registry.notify(ZMQProcessStartedEvent(self))  # pylint: disable=no-member
    77         self.loop.start()
    92         self.loop.start()
    78 
    93 
    79     def stop(self):
    94     def stop(self):
    80         """Stops the event loop."""
    95         """Stops the event loop."""
    81         if self.loop is not None:
    96         if self.loop is not None:
    82             self.loop.stop()
    97             self.loop.stop()
    83             self.loop = None
    98             self.loop = None
    84         self.auth_thread.stop()
    99         self.auth_thread.stop()
    85 
   100 
    86     def exit(self, num, frame):
   101     def exit(self, num, frame):  # pylint: disable=unused-argument
       
   102         """Process exit"""
    87         self.stop()
   103         self.stop()
    88         sys.exit()
   104         sys.exit()
    89 
   105 
    90     def stream(self, sock_type, addr, bind, callback=None, subscribe=b''):
   106     def stream(self, sock_type, addr, bind, callback=None, subscribe=b''):
    91         """
   107         # pylint: disable=too-many-arguments
    92         Creates a :class:`~zmq.eventloop.zmqstream.ZMQStream`.
   108         """Creates a :class:`~zmq.eventloop.zmqstream.ZMQStream`.
    93 
   109 
    94         :param sock_type: The ØMQ socket type (e.g. ``zmq.REQ``)
   110         :param sock_type: The ØMQ socket type (e.g. ``zmq.REQ``)
    95         :param addr: Address to bind or connect to formatted as *host:port*,
   111         :param addr: Address to bind or connect to formatted as *host:port*,
    96                 *(host, port)* or *host* (bind to random port).
   112             *(host, port)* or *host* (bind to random port).
    97                 If *bind* is ``True``, *host* may be:
   113             If *bind* is ``True``, *host* may be:
    98 
   114 
    99                 - the wild-card ``*``, meaning all available interfaces,
   115             - the wild-card ``*``, meaning all available interfaces,
   100                 - the primary IPv4 address assigned to the interface, in its
   116             - the primary IPv4 address assigned to the interface, in its
   101                 numeric representation or
   117             numeric representation or
   102                 - the interface name as defined by the operating system.
   118             - the interface name as defined by the operating system.
   103 
   119 
   104                 If *bind* is ``False``, *host* may be:
   120             If *bind* is ``False``, *host* may be:
   105 
   121 
   106                 - the DNS name of the peer or
   122             - the DNS name of the peer or
   107                 - the IPv4 address of the peer, in its numeric representation.
   123             - the IPv4 address of the peer, in its numeric representation.
   108 
   124 
   109                 If *addr* is just a host name without a port and *bind* is
   125             If *addr* is just a host name without a port and *bind* is
   110                 ``True``, the socket will be bound to a random port.
   126             ``True``, the socket will be bound to a random port.
   111         :param bind: Binds to *addr* if ``True`` or tries to connect to it
   127         :param bind: Binds to *addr* if ``True`` or tries to connect to it
   112                 otherwise.
   128             otherwise.
   113         :param callback: A callback for
   129         :param callback: A callback for
   114                 :meth:`~zmq.eventloop.zmqstream.ZMQStream.on_recv`, optional
   130             :meth:`~zmq.eventloop.zmqstream.ZMQStream.on_recv`, optional
   115         :param subscribe: Subscription pattern for *SUB* sockets, optional,
   131         :param subscribe: Subscription pattern for *SUB* sockets, optional,
   116                 defaults to ``b''``.
   132             defaults to ``b''``.
   117         :returns: A tuple containing the stream and the port number.
   133         :returns: A tuple containing the stream and the port number.
   118 
       
   119         """
   134         """
   120         sock = self.context.socket(sock_type)
   135         sock = self.context.socket(sock_type)
   121 
   136 
   122         # add server authenticator
   137         # add server authenticator
   123         if self.passwords:
   138         if self.passwords:
   136                 port = sock.bind_to_random_port('tcp://%s' % host)
   151                 port = sock.bind_to_random_port('tcp://%s' % host)
   137         else:
   152         else:
   138             sock.connect('tcp://%s:%s' % (host, port))
   153             sock.connect('tcp://%s:%s' % (host, port))
   139 
   154 
   140         # Add a default subscription for SUB sockets
   155         # Add a default subscription for SUB sockets
   141         if sock_type == zmq.SUB:
   156         if sock_type == zmq.SUB:  # pylint: disable=no-member
   142             sock.setsockopt(zmq.SUBSCRIBE, subscribe)
   157             sock.setsockopt(zmq.SUBSCRIBE, subscribe)  # pylint: disable=no-member
   143 
   158 
   144         # Create the stream and add the callback
   159         # Create the stream and add the callback
   145         stream = zmqstream.ZMQStream(sock, self.loop)
   160         stream = zmqstream.ZMQStream(sock, self.loop)
   146         if callback:
   161         if callback:
   147             stream.on_recv(callback)
   162             stream.on_recv(callback)
   148 
   163 
   149         return stream, int(port)
   164         return stream, int(port)
   150 
   165 
   151 
   166 
   152 def process_exit_func(process=None):
   167 def process_exit_func(process=None):
       
   168     """Process exit func is required to correctly end the child process"""
   153     if process is not None:
   169     if process is not None:
   154         if process.is_alive():
   170         if process.is_alive():
   155             process.terminate()
   171             process.terminate()
   156         process.join()
   172         process.join()