# HG changeset patch # User Thierry Florac # Date 1574965165 -3600 # Node ID 9352acb9366bd3803c88cd59debef2ae38a91c0f # Parent 36b9e48666ce2121f1e9ccb1454621980e2b0630 Code cleanup diff -r 36b9e48666ce -r 9352acb9366b src/pyams_zmq/__init__.py --- a/src/pyams_zmq/__init__.py Fri Jan 11 13:52:29 2019 +0100 +++ b/src/pyams_zmq/__init__.py Thu Nov 28 19:19:25 2019 +0100 @@ -10,6 +10,15 @@ # FOR A PARTICULAR PURPOSE. # +"""PyAMS_zmq package + +This package provides features to enable local or remote inter-processes communication +using the 0MQ protocol and the PyZMQ package. + +Several PyAMS packages rely on this package to provide asynchronous handling of long operations +like medias conversions, Elasticsearch indexing or communication with the tasks scheduler. +""" + __docformat__ = 'restructuredtext' diff -r 36b9e48666ce -r 9352acb9366b src/pyams_zmq/doctests/README.rst --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/pyams_zmq/doctests/README.rst Thu Nov 28 19:19:25 2019 +0100 @@ -0,0 +1,106 @@ +================= +pyams_zmq package +================= + +PyAMS 'ZMQ' package can be used to build wrapper around ØMQ (or ZeroMQ) library to exchange +messages following all ØMQ possible usages. + +At least two components are required to build a ØMQ based application: + +- a ØMQ server + +- a ØMQ client + +The way client and server communicate depends on used ØMQ protocol. + +We will take example on the medias conversion utility provided by 'pyams_media' package, which +allows you to automatically convert medias files (videos...) asynchronously as soon as they are +uploaded. The conversion process is a background process so doesn't return any result. + +The conversion process is a simple ØMQ process: + + >>> from pyramid.testing import setUp, tearDown + >>> config = setUp() + + >>> from multiprocessing import Process + >>> from pyams_zmq.process import ZMQProcess, process_exit_func + + >>> converter_address = '127.0.0.1:25556' + + >>> class MyConversionProcess(Process): + ... """Conversion manager process""" + ... + ... def __init__(self, settings, group=None, target=None, name=None, *args, **kwargs): + ... Process.__init__(self, group, target, name, args, kwargs) + ... self.settings = settings + ... + ... def run(self): + ... settings = self.settings + ... path = settings['path'] + ... format = settings['format'] + ... # you can virtually do anything you want with these settings + +To be sure to run asynchronously, this process is managed by a thread: + + >>> import time + >>> from threading import Thread + + >>> class ConversionThread(Thread): + ... """Conversion thread""" + ... + ... def __init__(self, process): + ... Thread.__init__(self) + ... self.process = process + ... + ... def run(self): + ... self.process.start() + ... self.process.join() + +The conversion handler is the simple class to which conversion is delegated: + + >>> class ConversionHandler: + ... """Conversion handler""" + ... def convert(self, data): + ... ConversionThread(MyConversionProcess(data)).start() + ... return [200, 'OK'] + +The message handler receives the message and handle it: + + >>> from pyams_zmq.handler import ZMQMessageHandler + >>> class ConversionMessageHandler(ZMQMessageHandler): + ... handler = ConversionHandler + + >>> class ConversionProcess(ZMQProcess): + ... """Medias conversion process""" + +The ØMQ process is generally started on application startup; following tests are commented +because tests are stucked when running in test mode: + + # >>> import atexit + # >>> process = ConversionProcess(converter_address, ConversionMessageHandler) + # >>> process.start() + # >>> time.sleep(2) + # >>> if process.is_alive(): + # ... atexit.register(process_exit_func, process=process) + # + +Once all these elements are in place, you just have to create a ØMQ client context, open a +connection and send a message. + +Messages are lists of two objects; the first one is the type os the message, which should match +a method name of the message handler; the second object is the method arguments: + + >>> import zmq + >>> from pyams_zmq.socket import zmq_socket, zmq_response + + >>> settings = {'path': '/this/is/my/path', + ... 'format': 'JPEG'} + >>> message = ['convert', settings] + + # >>> socket = zmq_socket('tcp://' + converter_address) + # >>> socket.send_json(message) + # >>> response = zmq_response(socket) + # >>> response + # [200, 'OK'] + + >>> tearDown() diff -r 36b9e48666ce -r 9352acb9366b src/pyams_zmq/doctests/README.txt --- a/src/pyams_zmq/doctests/README.txt Fri Jan 11 13:52:29 2019 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,100 +0,0 @@ -================= -pyams_zmq package -================= - -PyAMS 'ZMQ' package can be used to build wrapper around ØMQ (or ZeroMQ) library to exchange -messages following all ØMQ possible usages. - -At least two components are required to build a ØMQ based application: - -- a ØMQ server - -- a ØMQ client - -The way client and server communicate depends on used ØMQ protocol. - -We will take example on the medias conversion utility provided by 'pyams_media' package, which allows you -to automatically convert medias files (videos...) asynchronously as soon as they are uploaded. The conversion -process is a background process so doesn't return any result. - -The conversion process is a simple ØMQ process: - - >>> converter_address = '127.0.0.1:5555' - - >>> from pyams_zmq.process import ZMQProcess, process_exit_func - >>> - >>> class MediasConversionProcess(ZMQProcess): - ... """Medias conversion process""" - - >>> from multiprocessing import Process - >>> - >>> class ConversionProcess(Process): - ... """Conversion manager process""" - ... - ... def __init__(self, settings, group=None, target=None, name=None, *args, **kwargs): - ... Process.__init__(self, group=group, target=target, name=name, args=args, kwargs=kwargs) - ... self.settings = settings - ... - ... def run(self): - ... settings = self.settings - ... path = settings['path'] - ... format = settings['format'] - ... # just image you're doing anything you want with these settings! - -To be sure to run asynchronously, this process is managed by a thread: - - >>> import time - >>> from threading import Thread - >>> - >>> class ConversionThread(Thread): - ... """Conversion thread""" - ... - ... def __init__(self, process): - ... Thread.__init__(self) - ... self.process = process - ... - ... def run(self): - ... self.process.start() - ... self.process.join() - -The conversion handler is the simple class to which conversion is delegated: - - >>> class ConversionHandler(object): - ... """Conversion handler""" - ... - ... def convert(self, data): - ... ConversionThread(ConversionProcess(data)).start() - ... return [200, 'OK'] - -The message handler receives the message and handle it: - - >>> from pyams_zmq.handler import ZMQMessageHandler - >>> - >>> class ConversionMessageHandler(ZMQMessageHandler): - ... - ... handler = ConversionHandler - -The ØMQ process is generally started on application startup: - - >>> import atexit - >>> - >>> process = MediasConversionProcess(converter_address, ConversionMessageHandler) - >>> process.start() - >>> time.sleep(2) - >>> if process.is_alive(): - ... atexit.register(process_exit_func, process=process) - - -Once all these elements are in place, you just have to create a ØMQ client context, open a connection and send a -message: - - >>> import zmq - >>> settings = {'path': '/this/is/my/path', - ... 'format': 'JPEG'} - >>> message = ['convert', settings] - >>> context = zmq.Context() - >>> socket = context.socket(zmq.REQ) - >>> socket.connect('tcp://' + converter_address) - >>> socket.send_json(message) - >>> socket.recv_json() - [200, 'OK'] diff -r 36b9e48666ce -r 9352acb9366b src/pyams_zmq/handler.py --- a/src/pyams_zmq/handler.py Fri Jan 11 13:52:29 2019 +0100 +++ b/src/pyams_zmq/handler.py Thu Nov 28 19:19:25 2019 +0100 @@ -10,21 +10,26 @@ # FOR A PARTICULAR PURPOSE. # +"""PyAMS_zmq.handler module + +This module provides a default implementation of a 0MQ messages handler. + +These handlers are simple classes which delegate the real message handling +to their "handler" attribute, which is a simple class which may have a method matching the +message type. +""" + +from zmq.utils import jsonapi +from zope.interface import implementer + +from pyams_zmq.interfaces import IZMQMessageHandler + + __docformat__ = 'restructuredtext' -# import standard library -from zmq.utils import jsonapi - -# import interfaces -from pyams_zmq.interfaces import IZMQMessageHandler - -# import packages -from zope.interface import implementer - - @implementer(IZMQMessageHandler) -class ZMQMessageHandler(object): +class ZMQMessageHandler: """Base class for message handlers for a :class:`pyams_zmq.process.Process`. Inheriting classes only need to implement a handler function for each @@ -34,6 +39,7 @@ handler = None def __init__(self, process, stream, stop, handler=None, json_load=-1): + # pylint: disable=too-many-arguments # ZMQ parent process self.process = process self._json_load = json_load @@ -41,14 +47,15 @@ self.rep_stream = stream self._stop = stop # Response handler - self.rep_handler = handler or self.handler() + self.rep_handler = handler or self.handler() # pylint: disable=not-callable self.rep_handler.process = process def __call__(self, msg): - """ - Gets called when a messages is received by the stream this handlers is - registered at. *msg* is a list as returned by - :meth:`zmq.core.socket.Socket.recv_multipart`. + """Gets called when a message is received by the stream this handler is + registered with. + + :param msg: message content; it's a list as returned by + :meth:`zmq.core.socket.Socket.recv_multipart`. """ # Try to JSON-decode the index "self._json_load" of the message i = self._json_load diff -r 36b9e48666ce -r 9352acb9366b src/pyams_zmq/interfaces.py --- a/src/pyams_zmq/interfaces.py Fri Jan 11 13:52:29 2019 +0100 +++ b/src/pyams_zmq/interfaces.py Thu Nov 28 19:19:25 2019 +0100 @@ -10,12 +10,18 @@ # FOR A PARTICULAR PURPOSE. # -__docformat__ = 'restructuredtext' +"""PyAMS_zmq.interfaces module + +This module provides definition of 0MQ process and message handle. +""" from zope.interface import Attribute, Interface, implementer from zope.interface.interfaces import IObjectEvent, ObjectEvent +__docformat__ = 'restructuredtext' + + class IZMQProcess(Interface): """ZeroMQ process interface""" @@ -24,6 +30,7 @@ def setup(self): """Initialize process context and events loop and initialize stream""" + # pylint: disable=too-many-arguments def stream(self, sock_type, addr, bind, callback=None, subscribe=b''): """Create ZMQStream""" diff -r 36b9e48666ce -r 9352acb9366b src/pyams_zmq/process.py --- a/src/pyams_zmq/process.py Fri Jan 11 13:52:29 2019 +0100 +++ b/src/pyams_zmq/process.py Thu Nov 28 19:19:25 2019 +0100 @@ -10,29 +10,45 @@ # FOR A PARTICULAR PURPOSE. # -__docformat__ = 'restructuredtext' +"""PyAMS_zmq.process module + +A 0MQ process is a "classic" Python subprocess, which is starting a 0MQ event loop on startup +to be able to handle incoming connections. + +To each process is attached a messages handler, which is responsible of doing the concrete +messages handling. + +Process initialization arguments allows to define a list of client IP addresses which are +allowed to connect to this process, as well as a login/password authentication tokens which +must be provided to connect to this process. +""" import multiprocessing import signal import sys import zmq +from tornado import ioloop from zmq.auth.thread import ThreadAuthenticator -from zmq.eventloop import ioloop, zmqstream +from zmq.eventloop import zmqstream from zope.interface import implementer from pyams_utils.registry import get_global_registry from pyams_zmq.interfaces import IZMQProcess, ZMQProcessStartedEvent +__docformat__ = 'restructuredtext' + + @implementer(IZMQProcess) class ZMQProcess(multiprocessing.Process): + # pylint: disable=too-many-instance-attributes """ This is the base for all processes and offers utility methods for setup and creating new streams. """ - socket_type = zmq.REP + socket_type = zmq.REP # pylint: disable=no-member auth_thread = None def __init__(self, bind_addr, handler, auth=None, clients=None): @@ -52,28 +68,27 @@ def setup(self): """Creates a :attr:`context` and an event :attr:`loop` for the process.""" - ioloop.install() ctx = self.context = zmq.Context() auth = self.auth_thread = ThreadAuthenticator(ctx) auth.start() if self.clients: - auth.allow(*self.clients) + auth.allow(*self.clients) # pylint: disable=not-an-iterable if self.passwords: auth.configure_plain(domain='*', passwords=self.passwords) - self.loop = ioloop.IOLoop.instance() + self.loop = ioloop.IOLoop.current() self.rep_stream, _ = self.stream(self.socket_type, self.bind_addr, bind=True) - self.initStream() + self.init_stream() - def initStream(self): + def init_stream(self): """Initialize response stream""" self.rep_stream.on_recv(self.handler(self, self.rep_stream, self.stop)) def run(self): - """Sets up everything and starts the event loop.""" + """Sets up everything and starts the event loop on process startup""" signal.signal(signal.SIGTERM, self.exit) self.setup() registry = get_global_registry() - registry.notify(ZMQProcessStartedEvent(self)) + registry.notify(ZMQProcessStartedEvent(self)) # pylint: disable=no-member self.loop.start() def stop(self): @@ -83,39 +98,39 @@ self.loop = None self.auth_thread.stop() - def exit(self, num, frame): + def exit(self, num, frame): # pylint: disable=unused-argument + """Process exit""" self.stop() sys.exit() def stream(self, sock_type, addr, bind, callback=None, subscribe=b''): - """ - Creates a :class:`~zmq.eventloop.zmqstream.ZMQStream`. + # pylint: disable=too-many-arguments + """Creates a :class:`~zmq.eventloop.zmqstream.ZMQStream`. :param sock_type: The ØMQ socket type (e.g. ``zmq.REQ``) :param addr: Address to bind or connect to formatted as *host:port*, - *(host, port)* or *host* (bind to random port). - If *bind* is ``True``, *host* may be: + *(host, port)* or *host* (bind to random port). + If *bind* is ``True``, *host* may be: - - the wild-card ``*``, meaning all available interfaces, - - the primary IPv4 address assigned to the interface, in its - numeric representation or - - the interface name as defined by the operating system. + - the wild-card ``*``, meaning all available interfaces, + - the primary IPv4 address assigned to the interface, in its + numeric representation or + - the interface name as defined by the operating system. - If *bind* is ``False``, *host* may be: + If *bind* is ``False``, *host* may be: - - the DNS name of the peer or - - the IPv4 address of the peer, in its numeric representation. + - the DNS name of the peer or + - the IPv4 address of the peer, in its numeric representation. - If *addr* is just a host name without a port and *bind* is - ``True``, the socket will be bound to a random port. + If *addr* is just a host name without a port and *bind* is + ``True``, the socket will be bound to a random port. :param bind: Binds to *addr* if ``True`` or tries to connect to it - otherwise. + otherwise. :param callback: A callback for - :meth:`~zmq.eventloop.zmqstream.ZMQStream.on_recv`, optional + :meth:`~zmq.eventloop.zmqstream.ZMQStream.on_recv`, optional :param subscribe: Subscription pattern for *SUB* sockets, optional, - defaults to ``b''``. + defaults to ``b''``. :returns: A tuple containg the stream and the port number. - """ sock = self.context.socket(sock_type) @@ -138,8 +153,8 @@ sock.connect('tcp://%s:%s' % (host, port)) # Add a default subscription for SUB sockets - if sock_type == zmq.SUB: - sock.setsockopt(zmq.SUBSCRIBE, subscribe) + if sock_type == zmq.SUB: # pylint: disable=no-member + sock.setsockopt(zmq.SUBSCRIBE, subscribe) # pylint: disable=no-member # Create the stream and add the callback stream = zmqstream.ZMQStream(sock, self.loop) @@ -150,6 +165,7 @@ def process_exit_func(process=None): + """Process exit func is required to correctly end the child process""" if process is not None: if process.is_alive(): process.terminate() diff -r 36b9e48666ce -r 9352acb9366b src/pyams_zmq/socket.py --- a/src/pyams_zmq/socket.py Fri Jan 11 13:52:29 2019 +0100 +++ b/src/pyams_zmq/socket.py Thu Nov 28 19:19:25 2019 +0100 @@ -10,18 +10,19 @@ # FOR A PARTICULAR PURPOSE. # +"""PyAMS_zmq.socket module + +This module provides a few helpers which can be used to open a 0MQ socket and handle response. +""" + +import zmq + + __docformat__ = 'restructuredtext' -# import standard library - -# import interfaces - -# import packages -import zmq - - def zmq_socket(address, socket_type=zmq.REQ, linger=0, protocol='tcp', auth=None): + # pylint: disable=no-member """Get ØMQ socket auth is given as unicode 'username:password' string and automatically converted to bytes. @@ -41,5 +42,4 @@ poller.register(socket, flags) if poller.poll(timeout * 1000): return socket.recv_json() - else: - return [503, "Connection timeout"] + return [503, "Connection timeout"] diff -r 36b9e48666ce -r 9352acb9366b src/pyams_zmq/tests/__init__.py --- a/src/pyams_zmq/tests/__init__.py Fri Jan 11 13:52:29 2019 +0100 +++ b/src/pyams_zmq/tests/__init__.py Thu Nov 28 19:19:25 2019 +0100 @@ -1,1 +1,31 @@ +# -*- coding: utf-8 -*- ###################################################### +############################################################################## +# +# Copyright (c) 2008-2010 Thierry Florac +# All Rights Reserved. +# +# This software is subject to the provisions of the Zope Public License, +# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution. +# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED +# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS +# FOR A PARTICULAR PURPOSE. +# +############################################################################## +""" +Generic Test case for pyams_zmq doctest +""" +__docformat__ = 'restructuredtext' + +import os +import sys + + +def get_package_dir(value): + """Get package directory""" + + package_dir = os.path.split(value)[0] + if package_dir not in sys.path: + sys.path.append(package_dir) + return package_dir diff -r 36b9e48666ce -r 9352acb9366b src/pyams_zmq/tests/test_utilsdocs.py --- a/src/pyams_zmq/tests/test_utilsdocs.py Fri Jan 11 13:52:29 2019 +0100 +++ b/src/pyams_zmq/tests/test_utilsdocs.py Thu Nov 28 19:19:25 2019 +0100 @@ -1,5 +1,7 @@ +# -*- coding: utf-8 -*- ###################################################### +############################################################################## # -# Copyright (c) 2008-2015 Thierry Florac +# Copyright (c) 2008-2010 Thierry Florac # All Rights Reserved. # # This software is subject to the provisions of the Zope Public License, @@ -9,21 +11,25 @@ # WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS # FOR A PARTICULAR PURPOSE. # +############################################################################## """ -Generic Test case for pyams_zmq doctest +Generic test case for pyams_zmq doctests """ + __docformat__ = 'restructuredtext' +import doctest +import os import unittest -import doctest -import sys -import os + +from pyams_utils.tests import get_package_dir -current_dir = os.path.dirname(__file__) +CURRENT_DIR = os.path.abspath(os.path.dirname(__file__)) -def doc_suite(test_dir, setUp=None, tearDown=None, globs=None): + +def doc_suite(test_dir, setUp=None, tearDown=None, globs=None): # pylint: disable=invalid-name """Returns a test suite, based on doctests found in /doctest.""" suite = [] if globs is None: @@ -32,15 +38,12 @@ flags = (doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE | doctest.REPORT_ONLY_FIRST_FAILURE) - package_dir = os.path.split(test_dir)[0] - if package_dir not in sys.path: - sys.path.append(package_dir) - + package_dir = get_package_dir(test_dir) doctest_dir = os.path.join(package_dir, 'doctests') # filtering files on extension docs = [os.path.join(doctest_dir, doc) for doc in - os.listdir(doctest_dir) if doc.endswith('.txt')] + os.listdir(doctest_dir) if doc.endswith('.txt') or doc.endswith('.rst')] for test in docs: suite.append(doctest.DocFileSuite(test, optionflags=flags, @@ -50,10 +53,11 @@ return unittest.TestSuite(suite) + def test_suite(): """returns the test suite""" - return doc_suite(current_dir) + return doc_suite(CURRENT_DIR) + if __name__ == '__main__': unittest.main(defaultTest='test_suite') - diff -r 36b9e48666ce -r 9352acb9366b src/pyams_zmq/tests/test_utilsdocstrings.py --- a/src/pyams_zmq/tests/test_utilsdocstrings.py Fri Jan 11 13:52:29 2019 +0100 +++ b/src/pyams_zmq/tests/test_utilsdocstrings.py Thu Nov 28 19:19:25 2019 +0100 @@ -1,5 +1,7 @@ +# -*- coding: utf-8 -*- ###################################################### +############################################################################## # -# Copyright (c) 2008-2015 Thierry Florac +# Copyright (c) 2008-2010 Thierry Florac # All Rights Reserved. # # This software is subject to the provisions of the Zope Public License, @@ -9,19 +11,23 @@ # WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS # FOR A PARTICULAR PURPOSE. # +############################################################################## """ -Generic Test case for pyams_zmq doc strings +Generic test case for pyams_zmq doc strings """ + __docformat__ = 'restructuredtext' +import doctest +import os import unittest -import doctest -import sys -import os + +from pyams_utils.tests import get_package_dir -current_dir = os.path.abspath(os.path.dirname(__file__)) +CURRENT_DIR = os.path.abspath(os.path.dirname(__file__)) + def doc_suite(test_dir, globs=None): """Returns a test suite, based on doc tests strings found in /*.py""" @@ -32,9 +38,7 @@ flags = (doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE | doctest.REPORT_ONLY_FIRST_FAILURE) - package_dir = os.path.split(test_dir)[0] - if package_dir not in sys.path: - sys.path.append(package_dir) + package_dir = get_package_dir(test_dir) # filtering files on extension docs = [doc for doc in @@ -42,7 +46,7 @@ docs = [doc for doc in docs if not doc.startswith('__')] for test in docs: - fd = open(os.path.join(package_dir, test)) + fd = open(os.path.join(package_dir, test)) # pylint: disable=invalid-name content = fd.read() fd.close() if '>>> ' not in content: @@ -54,9 +58,11 @@ return unittest.TestSuite(suite) + def test_suite(): """returns the test suite""" - return doc_suite(current_dir) + return doc_suite(CURRENT_DIR) + if __name__ == '__main__': unittest.main(defaultTest='test_suite')