equal
deleted
inserted
replaced
21 |
21 |
22 # import interfaces |
22 # import interfaces |
23 from pyams_zmq.interfaces import IZMQProcess |
23 from pyams_zmq.interfaces import IZMQProcess |
24 |
24 |
25 # import packages |
25 # import packages |
|
26 from zmq.auth.thread import ThreadAuthenticator |
26 from zmq.eventloop import ioloop, zmqstream |
27 from zmq.eventloop import ioloop, zmqstream |
27 from zope.interface import implementer |
28 from zope.interface import implementer |
28 |
29 |
29 |
30 |
30 @implementer(IZMQProcess) |
31 @implementer(IZMQProcess) |
33 This is the base for all processes and offers utility methods |
34 This is the base for all processes and offers utility methods |
34 for setup and creating new streams. |
35 for setup and creating new streams. |
35 """ |
36 """ |
36 |
37 |
37 socket_type = zmq.REP |
38 socket_type = zmq.REP |
|
39 auth_thread = None |
38 |
40 |
39 def __init__(self, bind_addr, handler): |
41 def __init__(self, bind_addr, handler, auth=None, clients=None): |
40 super(ZMQProcess, self).__init__() |
42 super(ZMQProcess, self).__init__() |
41 |
43 |
42 self.context = None |
44 self.context = None |
43 """The ØMQ :class:`~zmq.Context` instance.""" |
45 """The ØMQ :class:`~zmq.Context` instance.""" |
44 |
46 |
46 """PyZMQ's event loop (:class:`~zmq.eventloop.ioloop.IOLoop`).""" |
48 """PyZMQ's event loop (:class:`~zmq.eventloop.ioloop.IOLoop`).""" |
47 |
49 |
48 self.bind_addr = bind_addr |
50 self.bind_addr = bind_addr |
49 self.rep_stream = None |
51 self.rep_stream = None |
50 self.handler = handler |
52 self.handler = handler |
|
53 self.passwords = dict([auth.split(':', 1)]) if auth else None |
|
54 self.clients = clients.split() if clients else None |
51 |
55 |
52 def setup(self): |
56 def setup(self): |
53 """Creates a :attr:`context` and an event :attr:`loop` for the process.""" |
57 """Creates a :attr:`context` and an event :attr:`loop` for the process.""" |
54 self.context = zmq.Context() |
58 ctx = self.context = zmq.Context() |
|
59 auth = self.auth_thread = ThreadAuthenticator(ctx) |
|
60 auth.start() |
|
61 if self.clients: |
|
62 auth.allow(*self.clients) |
|
63 if self.passwords: |
|
64 auth.configure_plain(domain='*', passwords=self.passwords) |
55 self.loop = ioloop.IOLoop.instance() |
65 self.loop = ioloop.IOLoop.instance() |
56 self.rep_stream, _ = self.stream(self.socket_type, self.bind_addr, bind=True) |
66 self.rep_stream, _ = self.stream(self.socket_type, self.bind_addr, bind=True) |
57 self.initStream() |
67 self.initStream() |
58 |
68 |
59 def initStream(self): |
69 def initStream(self): |
69 def stop(self): |
79 def stop(self): |
70 """Stops the event loop.""" |
80 """Stops the event loop.""" |
71 if self.loop is not None: |
81 if self.loop is not None: |
72 self.loop.stop() |
82 self.loop.stop() |
73 self.loop = None |
83 self.loop = None |
|
84 self.auth_thread.stop() |
74 |
85 |
75 def exit(self, num, frame): |
86 def exit(self, num, frame): |
76 self.stop() |
87 self.stop() |
77 sys.exit() |
88 sys.exit() |
78 |
89 |
106 :returns: A tuple containg the stream and the port number. |
117 :returns: A tuple containg the stream and the port number. |
107 |
118 |
108 """ |
119 """ |
109 sock = self.context.socket(sock_type) |
120 sock = self.context.socket(sock_type) |
110 |
121 |
|
122 # add server authenticator |
|
123 if self.passwords: |
|
124 sock.plain_server = True |
|
125 |
111 # addr may be 'host:port' or ('host', port) |
126 # addr may be 'host:port' or ('host', port) |
112 if isinstance(addr, str): |
127 if isinstance(addr, str): |
113 addr = addr.split(':') |
128 addr = addr.split(':') |
114 host, port = addr if len(addr) == 2 else (addr[0], None) |
129 host, port = addr if len(addr) == 2 else (addr[0], None) |
115 |
130 |