diff -r 000000000000 -r 11f0f97d508f src/pyams_zmq/process.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/pyams_zmq/process.py Wed Mar 04 22:40:07 2015 +0100 @@ -0,0 +1,141 @@ +# +# Copyright (c) 2008-2015 Thierry Florac +# All Rights Reserved. +# +# This software is subject to the provisions of the Zope Public License, +# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution. +# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED +# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS +# FOR A PARTICULAR PURPOSE. +# + +__docformat__ = 'restructuredtext' + + +# import standard library +import multiprocessing +import signal +import sys +import zmq +from zmq.eventloop import ioloop, zmqstream + +# import interfaces +from pyams_zmq.interfaces import IZMQProcess + +# import packages +from zope.interface import implementer + + +@implementer(IZMQProcess) +class ZMQProcess(multiprocessing.Process): + """ + This is the base for all processes and offers utility methods + for setup and creating new streams. + """ + + socket_type = zmq.REP + + def __init__(self, bind_addr, handler): + super(ZMQProcess, self).__init__() + + self.context = None + """The ØMQ :class:`~zmq.Context` instance.""" + + self.loop = None + """PyZMQ's event loop (:class:`~zmq.eventloop.ioloop.IOLoop`).""" + + self.bind_addr = bind_addr + self.rep_stream = None + self.handler = handler + + def setup(self): + """Creates a :attr:`context` and an event :attr:`loop` for the process.""" + self.context = zmq.Context() + self.loop = ioloop.IOLoop.instance() + self.rep_stream, _ = self.stream(self.socket_type, self.bind_addr, bind=True) + self.initStream() + + def initStream(self): + """Initialize response stream""" + self.rep_stream.on_recv(self.handler(self, self.rep_stream, self.stop)) + + def run(self): + """Sets up everything and starts the event loop.""" + signal.signal(signal.SIGTERM, self.exit) + self.setup() + self.loop.start() + + def stop(self): + """Stops the event loop.""" + if self.loop is not None: + self.loop.stop() + self.loop = None + + def exit(self, num, frame): + self.stop() + sys.exit() + + def stream(self, sock_type, addr, bind, callback=None, subscribe=b''): + """ + Creates a :class:`~zmq.eventloop.zmqstream.ZMQStream`. + + :param sock_type: The ØMQ socket type (e.g. ``zmq.REQ``) + :param addr: Address to bind or connect to formatted as *host:port*, + *(host, port)* or *host* (bind to random port). + If *bind* is ``True``, *host* may be: + + - the wild-card ``*``, meaning all available interfaces, + - the primary IPv4 address assigned to the interface, in its + numeric representation or + - the interface name as defined by the operating system. + + If *bind* is ``False``, *host* may be: + + - the DNS name of the peer or + - the IPv4 address of the peer, in its numeric representation. + + If *addr* is just a host name without a port and *bind* is + ``True``, the socket will be bound to a random port. + :param bind: Binds to *addr* if ``True`` or tries to connect to it + otherwise. + :param callback: A callback for + :meth:`~zmq.eventloop.zmqstream.ZMQStream.on_recv`, optional + :param subscribe: Subscription pattern for *SUB* sockets, optional, + defaults to ``b''``. + :returns: A tuple containg the stream and the port number. + + """ + sock = self.context.socket(sock_type) + + # addr may be 'host:port' or ('host', port) + if isinstance(addr, str): + addr = addr.split(':') + host, port = addr if len(addr) == 2 else (addr[0], None) + + # Bind/connect the socket + if bind: + if port: + sock.bind('tcp://%s:%s' % (host, port)) + else: + port = sock.bind_to_random_port('tcp://%s' % host) + else: + sock.connect('tcp://%s:%s' % (host, port)) + + # Add a default subscription for SUB sockets + if sock_type == zmq.SUB: + sock.setsockopt(zmq.SUBSCRIBE, subscribe) + + # Create the stream and add the callback + stream = zmqstream.ZMQStream(sock, self.loop) + if callback: + stream.on_recv(callback) + + return stream, int(port) + + +def process_exit_func(process=None): + if process is not None: + if process.is_alive(): + process.terminate() + process.join()