src/pyams_zmq/process.py
changeset 13 839b61e1531a
parent 4 d624312bfc2b
child 18 cd5a88ba2223
equal deleted inserted replaced
12:6c0dceb71fb1 13:839b61e1531a
    21 
    21 
    22 # import interfaces
    22 # import interfaces
    23 from pyams_zmq.interfaces import IZMQProcess
    23 from pyams_zmq.interfaces import IZMQProcess
    24 
    24 
    25 # import packages
    25 # import packages
       
    26 from zmq.auth.thread import ThreadAuthenticator
    26 from zmq.eventloop import ioloop, zmqstream
    27 from zmq.eventloop import ioloop, zmqstream
    27 from zope.interface import implementer
    28 from zope.interface import implementer
    28 
    29 
    29 
    30 
    30 @implementer(IZMQProcess)
    31 @implementer(IZMQProcess)
    33     This is the base for all processes and offers utility methods
    34     This is the base for all processes and offers utility methods
    34     for setup and creating new streams.
    35     for setup and creating new streams.
    35     """
    36     """
    36 
    37 
    37     socket_type = zmq.REP
    38     socket_type = zmq.REP
       
    39     auth_thread = None
    38 
    40 
    39     def __init__(self, bind_addr, handler):
    41     def __init__(self, bind_addr, handler, auth=None, clients=None):
    40         super(ZMQProcess, self).__init__()
    42         super(ZMQProcess, self).__init__()
    41 
    43 
    42         self.context = None
    44         self.context = None
    43         """The ØMQ :class:`~zmq.Context` instance."""
    45         """The ØMQ :class:`~zmq.Context` instance."""
    44 
    46 
    46         """PyZMQ's event loop (:class:`~zmq.eventloop.ioloop.IOLoop`)."""
    48         """PyZMQ's event loop (:class:`~zmq.eventloop.ioloop.IOLoop`)."""
    47 
    49 
    48         self.bind_addr = bind_addr
    50         self.bind_addr = bind_addr
    49         self.rep_stream = None
    51         self.rep_stream = None
    50         self.handler = handler
    52         self.handler = handler
       
    53         self.passwords = dict([auth.split(':', 1)]) if auth else None
       
    54         self.clients = clients.split() if clients else None
    51 
    55 
    52     def setup(self):
    56     def setup(self):
    53         """Creates a :attr:`context` and an event :attr:`loop` for the process."""
    57         """Creates a :attr:`context` and an event :attr:`loop` for the process."""
    54         self.context = zmq.Context()
    58         ctx = self.context = zmq.Context()
       
    59         auth = self.auth_thread = ThreadAuthenticator(ctx)
       
    60         auth.start()
       
    61         if self.clients:
       
    62             auth.allow(*self.clients)
       
    63         if self.passwords:
       
    64             auth.configure_plain(domain='*', passwords=self.passwords)
    55         self.loop = ioloop.IOLoop.instance()
    65         self.loop = ioloop.IOLoop.instance()
    56         self.rep_stream, _ = self.stream(self.socket_type, self.bind_addr, bind=True)
    66         self.rep_stream, _ = self.stream(self.socket_type, self.bind_addr, bind=True)
    57         self.initStream()
    67         self.initStream()
    58 
    68 
    59     def initStream(self):
    69     def initStream(self):
    69     def stop(self):
    79     def stop(self):
    70         """Stops the event loop."""
    80         """Stops the event loop."""
    71         if self.loop is not None:
    81         if self.loop is not None:
    72             self.loop.stop()
    82             self.loop.stop()
    73             self.loop = None
    83             self.loop = None
       
    84         self.auth_thread.stop()
    74 
    85 
    75     def exit(self, num, frame):
    86     def exit(self, num, frame):
    76         self.stop()
    87         self.stop()
    77         sys.exit()
    88         sys.exit()
    78 
    89 
   106         :returns: A tuple containg the stream and the port number.
   117         :returns: A tuple containg the stream and the port number.
   107 
   118 
   108         """
   119         """
   109         sock = self.context.socket(sock_type)
   120         sock = self.context.socket(sock_type)
   110 
   121 
       
   122         # add server authenticator
       
   123         if self.passwords:
       
   124             sock.plain_server = True
       
   125 
   111         # addr may be 'host:port' or ('host', port)
   126         # addr may be 'host:port' or ('host', port)
   112         if isinstance(addr, str):
   127         if isinstance(addr, str):
   113             addr = addr.split(':')
   128             addr = addr.split(':')
   114         host, port = addr if len(addr) == 2 else (addr[0], None)
   129         host, port = addr if len(addr) == 2 else (addr[0], None)
   115 
   130