src/pyams_zmq/process.py
changeset 25 9352acb9366b
parent 22 05aaf0acf4fb
--- a/src/pyams_zmq/process.py	Fri Jan 11 13:52:29 2019 +0100
+++ b/src/pyams_zmq/process.py	Thu Nov 28 19:19:25 2019 +0100
@@ -10,29 +10,45 @@
 # FOR A PARTICULAR PURPOSE.
 #
 
-__docformat__ = 'restructuredtext'
+"""PyAMS_zmq.process module
+
+A 0MQ process is a "classic" Python subprocess, which is starting a 0MQ event loop on startup
+to be able to handle incoming connections.
+
+To each process is attached a messages handler, which is responsible of doing the concrete
+messages handling.
+
+Process initialization arguments allows to define a list of client IP addresses which are
+allowed to connect to this process, as well as a login/password authentication tokens which
+must be provided to connect to this process.
+"""
 
 import multiprocessing
 import signal
 import sys
 
 import zmq
+from tornado import ioloop
 from zmq.auth.thread import ThreadAuthenticator
-from zmq.eventloop import ioloop, zmqstream
+from zmq.eventloop import zmqstream
 from zope.interface import implementer
 
 from pyams_utils.registry import get_global_registry
 from pyams_zmq.interfaces import IZMQProcess, ZMQProcessStartedEvent
 
 
+__docformat__ = 'restructuredtext'
+
+
 @implementer(IZMQProcess)
 class ZMQProcess(multiprocessing.Process):
+    # pylint: disable=too-many-instance-attributes
     """
     This is the base for all processes and offers utility methods
     for setup and creating new streams.
     """
 
-    socket_type = zmq.REP
+    socket_type = zmq.REP  # pylint: disable=no-member
     auth_thread = None
 
     def __init__(self, bind_addr, handler, auth=None, clients=None):
@@ -52,28 +68,27 @@
 
     def setup(self):
         """Creates a :attr:`context` and an event :attr:`loop` for the process."""
-        ioloop.install()
         ctx = self.context = zmq.Context()
         auth = self.auth_thread = ThreadAuthenticator(ctx)
         auth.start()
         if self.clients:
-            auth.allow(*self.clients)
+            auth.allow(*self.clients)  # pylint: disable=not-an-iterable
         if self.passwords:
             auth.configure_plain(domain='*', passwords=self.passwords)
-        self.loop = ioloop.IOLoop.instance()
+        self.loop = ioloop.IOLoop.current()
         self.rep_stream, _ = self.stream(self.socket_type, self.bind_addr, bind=True)
-        self.initStream()
+        self.init_stream()
 
-    def initStream(self):
+    def init_stream(self):
         """Initialize response stream"""
         self.rep_stream.on_recv(self.handler(self, self.rep_stream, self.stop))
 
     def run(self):
-        """Sets up everything and starts the event loop."""
+        """Sets up everything and starts the event loop on process startup"""
         signal.signal(signal.SIGTERM, self.exit)
         self.setup()
         registry = get_global_registry()
-        registry.notify(ZMQProcessStartedEvent(self))
+        registry.notify(ZMQProcessStartedEvent(self))  # pylint: disable=no-member
         self.loop.start()
 
     def stop(self):
@@ -83,39 +98,39 @@
             self.loop = None
         self.auth_thread.stop()
 
-    def exit(self, num, frame):
+    def exit(self, num, frame):  # pylint: disable=unused-argument
+        """Process exit"""
         self.stop()
         sys.exit()
 
     def stream(self, sock_type, addr, bind, callback=None, subscribe=b''):
-        """
-        Creates a :class:`~zmq.eventloop.zmqstream.ZMQStream`.
+        # pylint: disable=too-many-arguments
+        """Creates a :class:`~zmq.eventloop.zmqstream.ZMQStream`.
 
         :param sock_type: The ØMQ socket type (e.g. ``zmq.REQ``)
         :param addr: Address to bind or connect to formatted as *host:port*,
-                *(host, port)* or *host* (bind to random port).
-                If *bind* is ``True``, *host* may be:
+            *(host, port)* or *host* (bind to random port).
+            If *bind* is ``True``, *host* may be:
 
-                - the wild-card ``*``, meaning all available interfaces,
-                - the primary IPv4 address assigned to the interface, in its
-                numeric representation or
-                - the interface name as defined by the operating system.
+            - the wild-card ``*``, meaning all available interfaces,
+            - the primary IPv4 address assigned to the interface, in its
+            numeric representation or
+            - the interface name as defined by the operating system.
 
-                If *bind* is ``False``, *host* may be:
+            If *bind* is ``False``, *host* may be:
 
-                - the DNS name of the peer or
-                - the IPv4 address of the peer, in its numeric representation.
+            - the DNS name of the peer or
+            - the IPv4 address of the peer, in its numeric representation.
 
-                If *addr* is just a host name without a port and *bind* is
-                ``True``, the socket will be bound to a random port.
+            If *addr* is just a host name without a port and *bind* is
+            ``True``, the socket will be bound to a random port.
         :param bind: Binds to *addr* if ``True`` or tries to connect to it
-                otherwise.
+            otherwise.
         :param callback: A callback for
-                :meth:`~zmq.eventloop.zmqstream.ZMQStream.on_recv`, optional
+            :meth:`~zmq.eventloop.zmqstream.ZMQStream.on_recv`, optional
         :param subscribe: Subscription pattern for *SUB* sockets, optional,
-                defaults to ``b''``.
+            defaults to ``b''``.
         :returns: A tuple containg the stream and the port number.
-
         """
         sock = self.context.socket(sock_type)
 
@@ -138,8 +153,8 @@
             sock.connect('tcp://%s:%s' % (host, port))
 
         # Add a default subscription for SUB sockets
-        if sock_type == zmq.SUB:
-            sock.setsockopt(zmq.SUBSCRIBE, subscribe)
+        if sock_type == zmq.SUB:  # pylint: disable=no-member
+            sock.setsockopt(zmq.SUBSCRIBE, subscribe)  # pylint: disable=no-member
 
         # Create the stream and add the callback
         stream = zmqstream.ZMQStream(sock, self.loop)
@@ -150,6 +165,7 @@
 
 
 def process_exit_func(process=None):
+    """Process exit func is required to correctly end the child process"""
     if process is not None:
         if process.is_alive():
             process.terminate()