--- a/src/pyams_zmq/process.py Fri Jan 11 13:52:29 2019 +0100
+++ b/src/pyams_zmq/process.py Thu Nov 28 19:19:25 2019 +0100
@@ -10,29 +10,45 @@
# FOR A PARTICULAR PURPOSE.
#
-__docformat__ = 'restructuredtext'
+"""PyAMS_zmq.process module
+
+A 0MQ process is a "classic" Python subprocess, which is starting a 0MQ event loop on startup
+to be able to handle incoming connections.
+
+To each process is attached a messages handler, which is responsible of doing the concrete
+messages handling.
+
+Process initialization arguments allows to define a list of client IP addresses which are
+allowed to connect to this process, as well as a login/password authentication tokens which
+must be provided to connect to this process.
+"""
import multiprocessing
import signal
import sys
import zmq
+from tornado import ioloop
from zmq.auth.thread import ThreadAuthenticator
-from zmq.eventloop import ioloop, zmqstream
+from zmq.eventloop import zmqstream
from zope.interface import implementer
from pyams_utils.registry import get_global_registry
from pyams_zmq.interfaces import IZMQProcess, ZMQProcessStartedEvent
+__docformat__ = 'restructuredtext'
+
+
@implementer(IZMQProcess)
class ZMQProcess(multiprocessing.Process):
+ # pylint: disable=too-many-instance-attributes
"""
This is the base for all processes and offers utility methods
for setup and creating new streams.
"""
- socket_type = zmq.REP
+ socket_type = zmq.REP # pylint: disable=no-member
auth_thread = None
def __init__(self, bind_addr, handler, auth=None, clients=None):
@@ -52,28 +68,27 @@
def setup(self):
"""Creates a :attr:`context` and an event :attr:`loop` for the process."""
- ioloop.install()
ctx = self.context = zmq.Context()
auth = self.auth_thread = ThreadAuthenticator(ctx)
auth.start()
if self.clients:
- auth.allow(*self.clients)
+ auth.allow(*self.clients) # pylint: disable=not-an-iterable
if self.passwords:
auth.configure_plain(domain='*', passwords=self.passwords)
- self.loop = ioloop.IOLoop.instance()
+ self.loop = ioloop.IOLoop.current()
self.rep_stream, _ = self.stream(self.socket_type, self.bind_addr, bind=True)
- self.initStream()
+ self.init_stream()
- def initStream(self):
+ def init_stream(self):
"""Initialize response stream"""
self.rep_stream.on_recv(self.handler(self, self.rep_stream, self.stop))
def run(self):
- """Sets up everything and starts the event loop."""
+ """Sets up everything and starts the event loop on process startup"""
signal.signal(signal.SIGTERM, self.exit)
self.setup()
registry = get_global_registry()
- registry.notify(ZMQProcessStartedEvent(self))
+ registry.notify(ZMQProcessStartedEvent(self)) # pylint: disable=no-member
self.loop.start()
def stop(self):
@@ -83,39 +98,39 @@
self.loop = None
self.auth_thread.stop()
- def exit(self, num, frame):
+ def exit(self, num, frame): # pylint: disable=unused-argument
+ """Process exit"""
self.stop()
sys.exit()
def stream(self, sock_type, addr, bind, callback=None, subscribe=b''):
- """
- Creates a :class:`~zmq.eventloop.zmqstream.ZMQStream`.
+ # pylint: disable=too-many-arguments
+ """Creates a :class:`~zmq.eventloop.zmqstream.ZMQStream`.
:param sock_type: The ØMQ socket type (e.g. ``zmq.REQ``)
:param addr: Address to bind or connect to formatted as *host:port*,
- *(host, port)* or *host* (bind to random port).
- If *bind* is ``True``, *host* may be:
+ *(host, port)* or *host* (bind to random port).
+ If *bind* is ``True``, *host* may be:
- - the wild-card ``*``, meaning all available interfaces,
- - the primary IPv4 address assigned to the interface, in its
- numeric representation or
- - the interface name as defined by the operating system.
+ - the wild-card ``*``, meaning all available interfaces,
+ - the primary IPv4 address assigned to the interface, in its
+ numeric representation or
+ - the interface name as defined by the operating system.
- If *bind* is ``False``, *host* may be:
+ If *bind* is ``False``, *host* may be:
- - the DNS name of the peer or
- - the IPv4 address of the peer, in its numeric representation.
+ - the DNS name of the peer or
+ - the IPv4 address of the peer, in its numeric representation.
- If *addr* is just a host name without a port and *bind* is
- ``True``, the socket will be bound to a random port.
+ If *addr* is just a host name without a port and *bind* is
+ ``True``, the socket will be bound to a random port.
:param bind: Binds to *addr* if ``True`` or tries to connect to it
- otherwise.
+ otherwise.
:param callback: A callback for
- :meth:`~zmq.eventloop.zmqstream.ZMQStream.on_recv`, optional
+ :meth:`~zmq.eventloop.zmqstream.ZMQStream.on_recv`, optional
:param subscribe: Subscription pattern for *SUB* sockets, optional,
- defaults to ``b''``.
+ defaults to ``b''``.
:returns: A tuple containg the stream and the port number.
-
"""
sock = self.context.socket(sock_type)
@@ -138,8 +153,8 @@
sock.connect('tcp://%s:%s' % (host, port))
# Add a default subscription for SUB sockets
- if sock_type == zmq.SUB:
- sock.setsockopt(zmq.SUBSCRIBE, subscribe)
+ if sock_type == zmq.SUB: # pylint: disable=no-member
+ sock.setsockopt(zmq.SUBSCRIBE, subscribe) # pylint: disable=no-member
# Create the stream and add the callback
stream = zmqstream.ZMQStream(sock, self.loop)
@@ -150,6 +165,7 @@
def process_exit_func(process=None):
+ """Process exit func is required to correctly end the child process"""
if process is not None:
if process.is_alive():
process.terminate()