|
1 # |
|
2 # Copyright (c) 2008-2015 Thierry Florac <tflorac AT ulthar.net> |
|
3 # All Rights Reserved. |
|
4 # |
|
5 # This software is subject to the provisions of the Zope Public License, |
|
6 # Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution. |
|
7 # THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED |
|
8 # WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED |
|
9 # WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS |
|
10 # FOR A PARTICULAR PURPOSE. |
|
11 # |
|
12 |
|
13 __docformat__ = 'restructuredtext' |
|
14 |
|
15 |
|
16 # import standard library |
|
17 import multiprocessing |
|
18 import signal |
|
19 import sys |
|
20 import zmq |
|
21 from zmq.eventloop import ioloop, zmqstream |
|
22 |
|
23 # import interfaces |
|
24 from pyams_zmq.interfaces import IZMQProcess |
|
25 |
|
26 # import packages |
|
27 from zope.interface import implementer |
|
28 |
|
29 |
|
@implementer(IZMQProcess)
class ZMQProcess(multiprocessing.Process):
    """
    Base class for ØMQ-driven processes.

    The process owns a ØMQ context and a PyZMQ event loop, both created by
    :meth:`setup` when the process runs, and serves incoming messages on a
    single stream bound to :attr:`bind_addr`. :meth:`stream` is available
    as a utility to create further streams.
    """

    # Socket type used for the main stream created in :meth:`setup`.
    socket_type = zmq.REP

    def __init__(self, bind_addr, handler):
        """
        Store the process configuration; no ØMQ resource is created here.

        :param bind_addr: Address the main stream is bound to, in any form
            accepted by :meth:`stream`.
        :param handler: Callable invoked by :meth:`initStream` as
            ``handler(process, stream, stop)``; its return value is
            registered as the receive callback of the main stream.
        """
        super(ZMQProcess, self).__init__()

        self.context = None
        """The ØMQ :class:`~zmq.Context` instance."""

        self.loop = None
        """PyZMQ's event loop (:class:`~zmq.eventloop.ioloop.IOLoop`)."""

        self.bind_addr = bind_addr
        self.rep_stream = None
        self.handler = handler

    def setup(self):
        """
        Creates a :attr:`context` and an event :attr:`loop` for the process.

        Also creates the main stream, bound to :attr:`bind_addr` with a
        socket of type :attr:`socket_type`, then calls :meth:`initStream`.
        """
        self.context = zmq.Context()
        self.loop = ioloop.IOLoop.instance()
        self.rep_stream, _ = self.stream(self.socket_type, self.bind_addr, bind=True)
        self.initStream()

    def initStream(self):
        """
        Initialize the response stream.

        Calls :attr:`handler` with this process, the main stream and
        :meth:`stop`, and registers the result as the stream's receive
        callback.
        """
        self.rep_stream.on_recv(self.handler(self, self.rep_stream, self.stop))

    def run(self):
        """
        Sets up everything and starts the event loop.

        A ``SIGTERM`` handler (:meth:`exit`) is installed before the setup.
        """
        signal.signal(signal.SIGTERM, self.exit)
        self.setup()
        self.loop.start()

    def stop(self):
        """Stops the event loop, if any, and forgets it."""
        if self.loop is not None:
            self.loop.stop()
            self.loop = None

    def exit(self, num, frame):
        """
        Signal handler: stop the event loop and exit the process.

        :param num: Signal number (unused).
        :param frame: Interrupted stack frame (unused).
        """
        self.stop()
        sys.exit()

    def stream(self, sock_type, addr, bind, callback=None, subscribe=b''):
        """
        Creates a :class:`~zmq.eventloop.zmqstream.ZMQStream`.

        :param sock_type: The ØMQ socket type (e.g. ``zmq.REQ``)
        :param addr: Address to bind or connect to formatted as *host:port*,
                *(host, port)* or *host* (bind to random port).
                If *bind* is ``True``, *host* may be:

                - the wild-card ``*``, meaning all available interfaces,
                - the primary IPv4 address assigned to the interface, in its
                  numeric representation or
                - the interface name as defined by the operating system.

                If *bind* is ``False``, *host* may be:

                - the DNS name of the peer or
                - the IPv4 address of the peer, in its numeric representation.

                If *addr* is just a host name without a port and *bind* is
                ``True``, the socket will be bound to a random port. The
                port is not validated here: when *bind* is ``False`` it is
                formatted into the connection URL as given.
        :param bind: Binds to *addr* if ``True`` or tries to connect to it
                otherwise.
        :param callback: A callback for
                :meth:`~zmq.eventloop.zmqstream.ZMQStream.on_recv`, optional
        :param subscribe: Subscription pattern for *SUB* sockets, optional,
                defaults to ``b''``.
        :returns: A tuple containing the stream and the port number.

        If the socket cannot be bound, connected or configured, it is closed
        before the error is propagated, so that a failed call does not leave
        an open socket behind.
        """
        sock = self.context.socket(sock_type)

        try:
            # addr may be 'host:port' or ('host', port)
            if isinstance(addr, str):
                addr = addr.split(':')
            host, port = addr if len(addr) == 2 else (addr[0], None)

            # Bind/connect the socket
            if bind:
                if port:
                    sock.bind('tcp://%s:%s' % (host, port))
                else:
                    port = sock.bind_to_random_port('tcp://%s' % host)
            else:
                sock.connect('tcp://%s:%s' % (host, port))

            # Add a default subscription for SUB sockets
            if sock_type == zmq.SUB:
                sock.setsockopt(zmq.SUBSCRIBE, subscribe)
        except Exception:
            # Nothing else references the socket yet: release it here,
            # then let the caller see the original error unchanged.
            sock.close()
            raise

        # Create the stream and add the callback
        stream = zmqstream.ZMQStream(sock, self.loop)
        if callback:
            stream.on_recv(callback)

        return stream, int(port)
|
135 |
|
136 |
|
def process_exit_func(process=None):
    """
    Shut down *process* and wait for it to finish.

    Does nothing when no process is given. Otherwise the process is
    terminated if it is still alive, and then joined in every case.

    :param process: Object exposing ``is_alive()``, ``terminate()`` and
        ``join()`` (presumably a :class:`multiprocessing.Process` -- confirm
        against callers), or ``None``.
    """
    if process is None:
        return
    still_running = process.is_alive()
    if still_running:
        process.terminate()
    # Joined even when the process was already finished.
    process.join()