--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/src/pyams_zmq/process.py Wed Mar 04 22:40:07 2015 +0100
@@ -0,0 +1,141 @@
+#
+# Copyright (c) 2008-2015 Thierry Florac <tflorac AT ulthar.net>
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+
+__docformat__ = 'restructuredtext'
+
+
+# import standard library
+import multiprocessing
+import signal
+import sys
+import zmq
+from zmq.eventloop import ioloop, zmqstream
+
+# import interfaces
+from pyams_zmq.interfaces import IZMQProcess
+
+# import packages
+from zope.interface import implementer
+
+
+@implementer(IZMQProcess)
+class ZMQProcess(multiprocessing.Process):
+ """
+ This is the base for all processes and offers utility methods
+ for setup and creating new streams.
+ """
+
+ socket_type = zmq.REP
+
+ def __init__(self, bind_addr, handler):
+ super(ZMQProcess, self).__init__()
+
+ self.context = None
+ """The ØMQ :class:`~zmq.Context` instance."""
+
+ self.loop = None
+ """PyZMQ's event loop (:class:`~zmq.eventloop.ioloop.IOLoop`)."""
+
+ self.bind_addr = bind_addr
+ self.rep_stream = None
+ self.handler = handler
+
+ def setup(self):
+ """Creates a :attr:`context` and an event :attr:`loop` for the process."""
+ self.context = zmq.Context()
+ self.loop = ioloop.IOLoop.instance()
+ self.rep_stream, _ = self.stream(self.socket_type, self.bind_addr, bind=True)
+ self.initStream()
+
+ def initStream(self):
+ """Initialize response stream"""
+ self.rep_stream.on_recv(self.handler(self, self.rep_stream, self.stop))
+
+ def run(self):
+ """Sets up everything and starts the event loop."""
+ signal.signal(signal.SIGTERM, self.exit)
+ self.setup()
+ self.loop.start()
+
+ def stop(self):
+ """Stops the event loop."""
+ if self.loop is not None:
+ self.loop.stop()
+ self.loop = None
+
+ def exit(self, num, frame):
+ self.stop()
+ sys.exit()
+
+ def stream(self, sock_type, addr, bind, callback=None, subscribe=b''):
+ """
+ Creates a :class:`~zmq.eventloop.zmqstream.ZMQStream`.
+
+ :param sock_type: The ØMQ socket type (e.g. ``zmq.REQ``)
+ :param addr: Address to bind or connect to formatted as *host:port*,
+ *(host, port)* or *host* (bind to random port).
+ If *bind* is ``True``, *host* may be:
+
+ - the wild-card ``*``, meaning all available interfaces,
+ - the primary IPv4 address assigned to the interface, in its
+ numeric representation or
+ - the interface name as defined by the operating system.
+
+ If *bind* is ``False``, *host* may be:
+
+ - the DNS name of the peer or
+ - the IPv4 address of the peer, in its numeric representation.
+
+ If *addr* is just a host name without a port and *bind* is
+ ``True``, the socket will be bound to a random port.
+ :param bind: Binds to *addr* if ``True`` or tries to connect to it
+ otherwise.
+ :param callback: A callback for
+ :meth:`~zmq.eventloop.zmqstream.ZMQStream.on_recv`, optional
+ :param subscribe: Subscription pattern for *SUB* sockets, optional,
+ defaults to ``b''``.
+ :returns: A tuple containg the stream and the port number.
+
+ """
+ sock = self.context.socket(sock_type)
+
+ # addr may be 'host:port' or ('host', port)
+ if isinstance(addr, str):
+ addr = addr.split(':')
+ host, port = addr if len(addr) == 2 else (addr[0], None)
+
+ # Bind/connect the socket
+ if bind:
+ if port:
+ sock.bind('tcp://%s:%s' % (host, port))
+ else:
+ port = sock.bind_to_random_port('tcp://%s' % host)
+ else:
+ sock.connect('tcp://%s:%s' % (host, port))
+
+ # Add a default subscription for SUB sockets
+ if sock_type == zmq.SUB:
+ sock.setsockopt(zmq.SUBSCRIBE, subscribe)
+
+ # Create the stream and add the callback
+ stream = zmqstream.ZMQStream(sock, self.loop)
+ if callback:
+ stream.on_recv(callback)
+
+ return stream, int(port)
+
+
+def process_exit_func(process=None):
+ if process is not None:
+ if process.is_alive():
+ process.terminate()
+ process.join()