diff -r 36b9e48666ce -r 9352acb9366b src/pyams_zmq/process.py --- a/src/pyams_zmq/process.py Fri Jan 11 13:52:29 2019 +0100 +++ b/src/pyams_zmq/process.py Thu Nov 28 19:19:25 2019 +0100 @@ -10,29 +10,45 @@ # FOR A PARTICULAR PURPOSE. # -__docformat__ = 'restructuredtext' +"""PyAMS_zmq.process module + +A 0MQ process is a "classic" Python subprocess, which is starting a 0MQ event loop on startup +to be able to handle incoming connections. + +To each process is attached a messages handler, which is responsible of doing the concrete +messages handling. + +Process initialization arguments allows to define a list of client IP addresses which are +allowed to connect to this process, as well as a login/password authentication tokens which +must be provided to connect to this process. +""" import multiprocessing import signal import sys import zmq +from tornado import ioloop from zmq.auth.thread import ThreadAuthenticator -from zmq.eventloop import ioloop, zmqstream +from zmq.eventloop import zmqstream from zope.interface import implementer from pyams_utils.registry import get_global_registry from pyams_zmq.interfaces import IZMQProcess, ZMQProcessStartedEvent +__docformat__ = 'restructuredtext' + + @implementer(IZMQProcess) class ZMQProcess(multiprocessing.Process): + # pylint: disable=too-many-instance-attributes """ This is the base for all processes and offers utility methods for setup and creating new streams. """ - socket_type = zmq.REP + socket_type = zmq.REP # pylint: disable=no-member auth_thread = None def __init__(self, bind_addr, handler, auth=None, clients=None): @@ -52,28 +68,27 @@ def setup(self): """Creates a :attr:`context` and an event :attr:`loop` for the process.""" - ioloop.install() ctx = self.context = zmq.Context() auth = self.auth_thread = ThreadAuthenticator(ctx) auth.start() if self.clients: - auth.allow(*self.clients) + auth.allow(*self.clients) # pylint: disable=not-an-iterable if self.passwords: auth.configure_plain(domain='*', passwords=self.passwords) - self.loop = ioloop.IOLoop.instance() + self.loop = ioloop.IOLoop.current() self.rep_stream, _ = self.stream(self.socket_type, self.bind_addr, bind=True) - self.initStream() + self.init_stream() - def initStream(self): + def init_stream(self): """Initialize response stream""" self.rep_stream.on_recv(self.handler(self, self.rep_stream, self.stop)) def run(self): - """Sets up everything and starts the event loop.""" + """Sets up everything and starts the event loop on process startup""" signal.signal(signal.SIGTERM, self.exit) self.setup() registry = get_global_registry() - registry.notify(ZMQProcessStartedEvent(self)) + registry.notify(ZMQProcessStartedEvent(self)) # pylint: disable=no-member self.loop.start() def stop(self): @@ -83,39 +98,39 @@ self.loop = None self.auth_thread.stop() - def exit(self, num, frame): + def exit(self, num, frame): # pylint: disable=unused-argument + """Process exit""" self.stop() sys.exit() def stream(self, sock_type, addr, bind, callback=None, subscribe=b''): - """ - Creates a :class:`~zmq.eventloop.zmqstream.ZMQStream`. + # pylint: disable=too-many-arguments + """Creates a :class:`~zmq.eventloop.zmqstream.ZMQStream`. :param sock_type: The ØMQ socket type (e.g. ``zmq.REQ``) :param addr: Address to bind or connect to formatted as *host:port*, - *(host, port)* or *host* (bind to random port). - If *bind* is ``True``, *host* may be: + *(host, port)* or *host* (bind to random port). + If *bind* is ``True``, *host* may be: - - the wild-card ``*``, meaning all available interfaces, - - the primary IPv4 address assigned to the interface, in its - numeric representation or - - the interface name as defined by the operating system. + - the wild-card ``*``, meaning all available interfaces, + - the primary IPv4 address assigned to the interface, in its + numeric representation or + - the interface name as defined by the operating system. - If *bind* is ``False``, *host* may be: + If *bind* is ``False``, *host* may be: - - the DNS name of the peer or - - the IPv4 address of the peer, in its numeric representation. + - the DNS name of the peer or + - the IPv4 address of the peer, in its numeric representation. - If *addr* is just a host name without a port and *bind* is - ``True``, the socket will be bound to a random port. + If *addr* is just a host name without a port and *bind* is + ``True``, the socket will be bound to a random port. :param bind: Binds to *addr* if ``True`` or tries to connect to it - otherwise. + otherwise. :param callback: A callback for - :meth:`~zmq.eventloop.zmqstream.ZMQStream.on_recv`, optional + :meth:`~zmq.eventloop.zmqstream.ZMQStream.on_recv`, optional :param subscribe: Subscription pattern for *SUB* sockets, optional, - defaults to ``b''``. + defaults to ``b''``. :returns: A tuple containg the stream and the port number. - """ sock = self.context.socket(sock_type) @@ -138,8 +153,8 @@ sock.connect('tcp://%s:%s' % (host, port)) # Add a default subscription for SUB sockets - if sock_type == zmq.SUB: - sock.setsockopt(zmq.SUBSCRIBE, subscribe) + if sock_type == zmq.SUB: # pylint: disable=no-member + sock.setsockopt(zmq.SUBSCRIBE, subscribe) # pylint: disable=no-member # Create the stream and add the callback stream = zmqstream.ZMQStream(sock, self.loop) @@ -150,6 +165,7 @@ def process_exit_func(process=None): + """Process exit func is required to correctly end the child process""" if process is not None: if process.is_alive(): process.terminate()