8 # WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED |
8 # WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED |
9 # WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS |
9 # WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS |
10 # FOR A PARTICULAR PURPOSE. |
10 # FOR A PARTICULAR PURPOSE. |
11 # |
11 # |
12 |
12 |
13 __docformat__ = 'restructuredtext' |
"""PyAMS_zmq.process module

A 0MQ process is a "classic" Python subprocess, which starts a 0MQ event loop on startup
to be able to handle incoming connections.

A messages handler is attached to each process; it is responsible for doing the concrete
messages handling.

Process initialization arguments allow defining a list of client IP addresses which are
allowed to connect to this process, as well as login/password authentication tokens which
must be provided to connect to this process.
"""
14 |
25 |
15 import multiprocessing |
26 import multiprocessing |
16 import signal |
27 import signal |
17 import sys |
28 import sys |
18 |
29 |
19 import zmq |
30 import zmq |
|
31 from tornado import ioloop |
20 from zmq.auth.thread import ThreadAuthenticator |
32 from zmq.auth.thread import ThreadAuthenticator |
21 from zmq.eventloop import ioloop, zmqstream |
33 from zmq.eventloop import zmqstream |
22 from zope.interface import implementer |
34 from zope.interface import implementer |
23 |
35 |
24 from pyams_utils.registry import get_global_registry |
36 from pyams_utils.registry import get_global_registry |
25 from pyams_zmq.interfaces import IZMQProcess, ZMQProcessStartedEvent |
37 from pyams_zmq.interfaces import IZMQProcess, ZMQProcessStartedEvent |
26 |
38 |
27 |
39 |
|
40 __docformat__ = 'restructuredtext' |
|
41 |
|
42 |
28 @implementer(IZMQProcess) |
43 @implementer(IZMQProcess) |
29 class ZMQProcess(multiprocessing.Process): |
44 class ZMQProcess(multiprocessing.Process): |
|
45 # pylint: disable=too-many-instance-attributes |
30 """ |
46 """ |
31 This is the base for all processes and offers utility methods |
47 This is the base for all processes and offers utility methods |
32 for setup and creating new streams. |
48 for setup and creating new streams. |
33 """ |
49 """ |
34 |
50 |
35 socket_type = zmq.REP |
51 socket_type = zmq.REP # pylint: disable=no-member |
36 auth_thread = None |
52 auth_thread = None |
37 |
53 |
38 def __init__(self, bind_addr, handler, auth=None, clients=None): |
54 def __init__(self, bind_addr, handler, auth=None, clients=None): |
39 super(ZMQProcess, self).__init__() |
55 super(ZMQProcess, self).__init__() |
40 |
56 |
50 self.passwords = dict([auth.split(':', 1)]) if auth else None |
66 self.passwords = dict([auth.split(':', 1)]) if auth else None |
51 self.clients = clients.split() if clients else None |
67 self.clients = clients.split() if clients else None |
52 |
68 |
def setup(self):
    """Create the 0MQ :attr:`context` and the event :attr:`loop` for the process.

    Steps, in order:

    - create a new :class:`zmq.Context` and start a ``ThreadAuthenticator`` on it;
    - restrict connections to ``self.clients`` when that list is set;
    - enable PLAIN authentication with ``self.passwords`` when it is set;
    - get the current IOLoop, bind the reply stream to ``self.bind_addr`` and
      attach the messages handler through :meth:`init_stream`.
    """
    ctx = self.context = zmq.Context()

    # The authenticator must be running before any socket is bound
    auth = self.auth_thread = ThreadAuthenticator(ctx)
    auth.start()
    if self.clients:
        auth.allow(*self.clients)  # pylint: disable=not-an-iterable
    if self.passwords:
        auth.configure_plain(domain='*', passwords=self.passwords)

    # ioloop.install() is not called anymore; the current IOLoop is used as is
    self.loop = ioloop.IOLoop.current()
    self.rep_stream, _ = self.stream(self.socket_type, self.bind_addr, bind=True)
    self.init_stream()
66 |
81 |
def init_stream(self):
    """Initialize response stream

    Calls ``self.handler`` with the process, its reply stream and the
    :meth:`stop` callback, and registers the result as the ``on_recv``
    callback of the reply stream.
    """
    self.rep_stream.on_recv(self.handler(self, self.rep_stream, self.stop))
70 |
85 |
def run(self):
    """Sets up everything and starts the event loop on process startup

    A SIGTERM handler is installed first, so that terminating the process
    goes through :meth:`exit`. After :meth:`setup`, a
    ``ZMQProcessStartedEvent`` is notified on the global registry, then the
    event loop is started.
    """
    signal.signal(signal.SIGTERM, self.exit)
    self.setup()
    registry = get_global_registry()
    registry.notify(ZMQProcessStartedEvent(self))  # pylint: disable=no-member
    self.loop.start()
78 |
93 |
def stop(self):
    """Stops the event loop and the authenticator thread.

    Each resource is released only if it is currently set, and its reference
    is cleared afterwards, so the method can be called several times and also
    while ``auth_thread`` is still ``None`` (its class-level default).
    """
    if self.loop is not None:
        self.loop.stop()
        self.loop = None
    # auth_thread is only assigned by setup(); guard against the None default
    if self.auth_thread is not None:
        self.auth_thread.stop()
        self.auth_thread = None
85 |
100 |
def exit(self, num, frame):  # pylint: disable=unused-argument
    """Process exit

    Signal handler (registered for SIGTERM in :meth:`run`): stops the
    process resources through :meth:`stop`, then raises ``SystemExit``.

    :param num: signal number (unused)
    :param frame: current stack frame (unused)
    """
    self.stop()
    sys.exit()
89 |
105 |
90 def stream(self, sock_type, addr, bind, callback=None, subscribe=b''): |
106 def stream(self, sock_type, addr, bind, callback=None, subscribe=b''): |
91 """ |
107 # pylint: disable=too-many-arguments |
92 Creates a :class:`~zmq.eventloop.zmqstream.ZMQStream`. |
108 """Creates a :class:`~zmq.eventloop.zmqstream.ZMQStream`. |
93 |
109 |
94 :param sock_type: The ØMQ socket type (e.g. ``zmq.REQ``) |
110 :param sock_type: The ØMQ socket type (e.g. ``zmq.REQ``) |
95 :param addr: Address to bind or connect to formatted as *host:port*, |
111 :param addr: Address to bind or connect to formatted as *host:port*, |
96 *(host, port)* or *host* (bind to random port). |
112 *(host, port)* or *host* (bind to random port). |
97 If *bind* is ``True``, *host* may be: |
113 If *bind* is ``True``, *host* may be: |
98 |
114 |
99 - the wild-card ``*``, meaning all available interfaces, |
115 - the wild-card ``*``, meaning all available interfaces, |
100 - the primary IPv4 address assigned to the interface, in its |
116 - the primary IPv4 address assigned to the interface, in its |
101 numeric representation or |
117 numeric representation or |
102 - the interface name as defined by the operating system. |
118 - the interface name as defined by the operating system. |
103 |
119 |
104 If *bind* is ``False``, *host* may be: |
120 If *bind* is ``False``, *host* may be: |
105 |
121 |
106 - the DNS name of the peer or |
122 - the DNS name of the peer or |
107 - the IPv4 address of the peer, in its numeric representation. |
123 - the IPv4 address of the peer, in its numeric representation. |
108 |
124 |
109 If *addr* is just a host name without a port and *bind* is |
125 If *addr* is just a host name without a port and *bind* is |
110 ``True``, the socket will be bound to a random port. |
126 ``True``, the socket will be bound to a random port. |
111 :param bind: Binds to *addr* if ``True`` or tries to connect to it |
127 :param bind: Binds to *addr* if ``True`` or tries to connect to it |
112 otherwise. |
128 otherwise. |
113 :param callback: A callback for |
129 :param callback: A callback for |
114 :meth:`~zmq.eventloop.zmqstream.ZMQStream.on_recv`, optional |
130 :meth:`~zmq.eventloop.zmqstream.ZMQStream.on_recv`, optional |
115 :param subscribe: Subscription pattern for *SUB* sockets, optional, |
131 :param subscribe: Subscription pattern for *SUB* sockets, optional, |
116 defaults to ``b''``. |
132 defaults to ``b''``. |
117 :returns: A tuple containg the stream and the port number. |
133 :returns: A tuple containg the stream and the port number. |
118 |
|
119 """ |
134 """ |
120 sock = self.context.socket(sock_type) |
135 sock = self.context.socket(sock_type) |
121 |
136 |
122 # add server authenticator |
137 # add server authenticator |
123 if self.passwords: |
138 if self.passwords: |
136 port = sock.bind_to_random_port('tcp://%s' % host) |
151 port = sock.bind_to_random_port('tcp://%s' % host) |
137 else: |
152 else: |
138 sock.connect('tcp://%s:%s' % (host, port)) |
153 sock.connect('tcp://%s:%s' % (host, port)) |
139 |
154 |
140 # Add a default subscription for SUB sockets |
155 # Add a default subscription for SUB sockets |
141 if sock_type == zmq.SUB: |
156 if sock_type == zmq.SUB: # pylint: disable=no-member |
142 sock.setsockopt(zmq.SUBSCRIBE, subscribe) |
157 sock.setsockopt(zmq.SUBSCRIBE, subscribe) # pylint: disable=no-member |
143 |
158 |
144 # Create the stream and add the callback |
159 # Create the stream and add the callback |
145 stream = zmqstream.ZMQStream(sock, self.loop) |
160 stream = zmqstream.ZMQStream(sock, self.loop) |
146 if callback: |
161 if callback: |
147 stream.on_recv(callback) |
162 stream.on_recv(callback) |
148 |
163 |
149 return stream, int(port) |
164 return stream, int(port) |
150 |
165 |
151 |
166 |
def process_exit_func(process=None):
    """Process exit func is required to correctly end the child process

    Does nothing when *process* is ``None``. Otherwise the process is
    terminated if it is still alive, then joined.

    :param process: the child process to end, optional
    """
    if process is None:
        return
    if process.is_alive():
        process.terminate()
    process.join()