--- a/src/pyams_zmq/__init__.py Fri Jan 11 13:52:29 2019 +0100
+++ b/src/pyams_zmq/__init__.py Thu Nov 28 19:19:25 2019 +0100
@@ -10,6 +10,15 @@
# FOR A PARTICULAR PURPOSE.
#
+"""PyAMS_zmq package
+
+This package provides features to enable local or remote inter-processes communication
+using the 0MQ protocol and the PyZMQ package.
+
+Several PyAMS packages rely on this package to provide asynchronous handling of long operations
+like medias conversions, Elasticsearch indexing or communication with the tasks scheduler.
+"""
+
__docformat__ = 'restructuredtext'
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/src/pyams_zmq/doctests/README.rst Thu Nov 28 19:19:25 2019 +0100
@@ -0,0 +1,106 @@
+=================
+pyams_zmq package
+=================
+
+PyAMS 'ZMQ' package can be used to build wrapper around ØMQ (or ZeroMQ) library to exchange
+messages following all ØMQ possible usages.
+
+At least two components are required to build a ØMQ based application:
+
+- a ØMQ server
+
+- a ØMQ client
+
+The way client and server communicate depends on used ØMQ protocol.
+
+We will take example on the medias conversion utility provided by 'pyams_media' package, which
+allows you to automatically convert medias files (videos...) asynchronously as soon as they are
+uploaded. The conversion process is a background process so doesn't return any result.
+
+The conversion process is a simple ØMQ process:
+
+ >>> from pyramid.testing import setUp, tearDown
+ >>> config = setUp()
+
+ >>> from multiprocessing import Process
+ >>> from pyams_zmq.process import ZMQProcess, process_exit_func
+
+ >>> converter_address = '127.0.0.1:25556'
+
+ >>> class MyConversionProcess(Process):
+ ... """Conversion manager process"""
+ ...
+ ... def __init__(self, settings, group=None, target=None, name=None, *args, **kwargs):
+ ... Process.__init__(self, group, target, name, args, kwargs)
+ ... self.settings = settings
+ ...
+ ... def run(self):
+ ... settings = self.settings
+ ... path = settings['path']
+ ... format = settings['format']
+ ... # you can virtually do anything you want with these settings
+
+To be sure to run asynchronously, this process is managed by a thread:
+
+ >>> import time
+ >>> from threading import Thread
+
+ >>> class ConversionThread(Thread):
+ ... """Conversion thread"""
+ ...
+ ... def __init__(self, process):
+ ... Thread.__init__(self)
+ ... self.process = process
+ ...
+ ... def run(self):
+ ... self.process.start()
+ ... self.process.join()
+
+The conversion handler is the simple class to which conversion is delegated:
+
+ >>> class ConversionHandler:
+ ... """Conversion handler"""
+ ... def convert(self, data):
+ ... ConversionThread(MyConversionProcess(data)).start()
+ ... return [200, 'OK']
+
+The message handler receives the message and handle it:
+
+ >>> from pyams_zmq.handler import ZMQMessageHandler
+ >>> class ConversionMessageHandler(ZMQMessageHandler):
+ ... handler = ConversionHandler
+
+ >>> class ConversionProcess(ZMQProcess):
+ ... """Medias conversion process"""
+
+The ØMQ process is generally started on application startup; following tests are commented
+because tests are stucked when running in test mode:
+
+ # >>> import atexit
+ # >>> process = ConversionProcess(converter_address, ConversionMessageHandler)
+ # >>> process.start()
+ # >>> time.sleep(2)
+ # >>> if process.is_alive():
+ # ... atexit.register(process_exit_func, process=process)
+ # <function process_exit_func at 0x...>
+
+Once all these elements are in place, you just have to create a ØMQ client context, open a
+connection and send a message.
+
+Messages are lists of two objects; the first one is the type os the message, which should match
+a method name of the message handler; the second object is the method arguments:
+
+ >>> import zmq
+ >>> from pyams_zmq.socket import zmq_socket, zmq_response
+
+ >>> settings = {'path': '/this/is/my/path',
+ ... 'format': 'JPEG'}
+ >>> message = ['convert', settings]
+
+ # >>> socket = zmq_socket('tcp://' + converter_address)
+ # >>> socket.send_json(message)
+ # >>> response = zmq_response(socket)
+ # >>> response
+ # [200, 'OK']
+
+ >>> tearDown()
--- a/src/pyams_zmq/doctests/README.txt Fri Jan 11 13:52:29 2019 +0100
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,100 +0,0 @@
-=================
-pyams_zmq package
-=================
-
-PyAMS 'ZMQ' package can be used to build wrapper around ØMQ (or ZeroMQ) library to exchange
-messages following all ØMQ possible usages.
-
-At least two components are required to build a ØMQ based application:
-
-- a ØMQ server
-
-- a ØMQ client
-
-The way client and server communicate depends on used ØMQ protocol.
-
-We will take example on the medias conversion utility provided by 'pyams_media' package, which allows you
-to automatically convert medias files (videos...) asynchronously as soon as they are uploaded. The conversion
-process is a background process so doesn't return any result.
-
-The conversion process is a simple ØMQ process:
-
- >>> converter_address = '127.0.0.1:5555'
-
- >>> from pyams_zmq.process import ZMQProcess, process_exit_func
- >>>
- >>> class MediasConversionProcess(ZMQProcess):
- ... """Medias conversion process"""
-
- >>> from multiprocessing import Process
- >>>
- >>> class ConversionProcess(Process):
- ... """Conversion manager process"""
- ...
- ... def __init__(self, settings, group=None, target=None, name=None, *args, **kwargs):
- ... Process.__init__(self, group=group, target=target, name=name, args=args, kwargs=kwargs)
- ... self.settings = settings
- ...
- ... def run(self):
- ... settings = self.settings
- ... path = settings['path']
- ... format = settings['format']
- ... # just image you're doing anything you want with these settings!
-
-To be sure to run asynchronously, this process is managed by a thread:
-
- >>> import time
- >>> from threading import Thread
- >>>
- >>> class ConversionThread(Thread):
- ... """Conversion thread"""
- ...
- ... def __init__(self, process):
- ... Thread.__init__(self)
- ... self.process = process
- ...
- ... def run(self):
- ... self.process.start()
- ... self.process.join()
-
-The conversion handler is the simple class to which conversion is delegated:
-
- >>> class ConversionHandler(object):
- ... """Conversion handler"""
- ...
- ... def convert(self, data):
- ... ConversionThread(ConversionProcess(data)).start()
- ... return [200, 'OK']
-
-The message handler receives the message and handle it:
-
- >>> from pyams_zmq.handler import ZMQMessageHandler
- >>>
- >>> class ConversionMessageHandler(ZMQMessageHandler):
- ...
- ... handler = ConversionHandler
-
-The ØMQ process is generally started on application startup:
-
- >>> import atexit
- >>>
- >>> process = MediasConversionProcess(converter_address, ConversionMessageHandler)
- >>> process.start()
- >>> time.sleep(2)
- >>> if process.is_alive():
- ... atexit.register(process_exit_func, process=process)
- <function process_exit_func at 0x...>
-
-Once all these elements are in place, you just have to create a ØMQ client context, open a connection and send a
-message:
-
- >>> import zmq
- >>> settings = {'path': '/this/is/my/path',
- ... 'format': 'JPEG'}
- >>> message = ['convert', settings]
- >>> context = zmq.Context()
- >>> socket = context.socket(zmq.REQ)
- >>> socket.connect('tcp://' + converter_address)
- >>> socket.send_json(message)
- >>> socket.recv_json()
- [200, 'OK']
--- a/src/pyams_zmq/handler.py Fri Jan 11 13:52:29 2019 +0100
+++ b/src/pyams_zmq/handler.py Thu Nov 28 19:19:25 2019 +0100
@@ -10,21 +10,26 @@
# FOR A PARTICULAR PURPOSE.
#
+"""PyAMS_zmq.handler module
+
+This module provides a default implementation of a 0MQ messages handler.
+
+These handlers are simple classes which delegate the real message handling
+to their "handler" attribute, which is a simple class which may have a method matching the
+message type.
+"""
+
+from zmq.utils import jsonapi
+from zope.interface import implementer
+
+from pyams_zmq.interfaces import IZMQMessageHandler
+
+
__docformat__ = 'restructuredtext'
-# import standard library
-from zmq.utils import jsonapi
-
-# import interfaces
-from pyams_zmq.interfaces import IZMQMessageHandler
-
-# import packages
-from zope.interface import implementer
-
-
@implementer(IZMQMessageHandler)
-class ZMQMessageHandler(object):
+class ZMQMessageHandler:
"""Base class for message handlers for a :class:`pyams_zmq.process.Process`.
Inheriting classes only need to implement a handler function for each
@@ -34,6 +39,7 @@
handler = None
def __init__(self, process, stream, stop, handler=None, json_load=-1):
+ # pylint: disable=too-many-arguments
# ZMQ parent process
self.process = process
self._json_load = json_load
@@ -41,14 +47,15 @@
self.rep_stream = stream
self._stop = stop
# Response handler
- self.rep_handler = handler or self.handler()
+ self.rep_handler = handler or self.handler() # pylint: disable=not-callable
self.rep_handler.process = process
def __call__(self, msg):
- """
- Gets called when a messages is received by the stream this handlers is
- registered at. *msg* is a list as returned by
- :meth:`zmq.core.socket.Socket.recv_multipart`.
+ """Gets called when a message is received by the stream this handler is
+ registered with.
+
+ :param msg: message content; it's a list as returned by
+ :meth:`zmq.core.socket.Socket.recv_multipart`.
"""
# Try to JSON-decode the index "self._json_load" of the message
i = self._json_load
--- a/src/pyams_zmq/interfaces.py Fri Jan 11 13:52:29 2019 +0100
+++ b/src/pyams_zmq/interfaces.py Thu Nov 28 19:19:25 2019 +0100
@@ -10,12 +10,18 @@
# FOR A PARTICULAR PURPOSE.
#
-__docformat__ = 'restructuredtext'
+"""PyAMS_zmq.interfaces module
+
+This module provides definition of 0MQ process and message handle.
+"""
from zope.interface import Attribute, Interface, implementer
from zope.interface.interfaces import IObjectEvent, ObjectEvent
+__docformat__ = 'restructuredtext'
+
+
class IZMQProcess(Interface):
"""ZeroMQ process interface"""
@@ -24,6 +30,7 @@
def setup(self):
"""Initialize process context and events loop and initialize stream"""
+ # pylint: disable=too-many-arguments
def stream(self, sock_type, addr, bind, callback=None, subscribe=b''):
"""Create ZMQStream"""
--- a/src/pyams_zmq/process.py Fri Jan 11 13:52:29 2019 +0100
+++ b/src/pyams_zmq/process.py Thu Nov 28 19:19:25 2019 +0100
@@ -10,29 +10,45 @@
# FOR A PARTICULAR PURPOSE.
#
-__docformat__ = 'restructuredtext'
+"""PyAMS_zmq.process module
+
+A 0MQ process is a "classic" Python subprocess, which is starting a 0MQ event loop on startup
+to be able to handle incoming connections.
+
+To each process is attached a messages handler, which is responsible of doing the concrete
+messages handling.
+
+Process initialization arguments allows to define a list of client IP addresses which are
+allowed to connect to this process, as well as a login/password authentication tokens which
+must be provided to connect to this process.
+"""
import multiprocessing
import signal
import sys
import zmq
+from tornado import ioloop
from zmq.auth.thread import ThreadAuthenticator
-from zmq.eventloop import ioloop, zmqstream
+from zmq.eventloop import zmqstream
from zope.interface import implementer
from pyams_utils.registry import get_global_registry
from pyams_zmq.interfaces import IZMQProcess, ZMQProcessStartedEvent
+__docformat__ = 'restructuredtext'
+
+
@implementer(IZMQProcess)
class ZMQProcess(multiprocessing.Process):
+ # pylint: disable=too-many-instance-attributes
"""
This is the base for all processes and offers utility methods
for setup and creating new streams.
"""
- socket_type = zmq.REP
+ socket_type = zmq.REP # pylint: disable=no-member
auth_thread = None
def __init__(self, bind_addr, handler, auth=None, clients=None):
@@ -52,28 +68,27 @@
def setup(self):
"""Creates a :attr:`context` and an event :attr:`loop` for the process."""
- ioloop.install()
ctx = self.context = zmq.Context()
auth = self.auth_thread = ThreadAuthenticator(ctx)
auth.start()
if self.clients:
- auth.allow(*self.clients)
+ auth.allow(*self.clients) # pylint: disable=not-an-iterable
if self.passwords:
auth.configure_plain(domain='*', passwords=self.passwords)
- self.loop = ioloop.IOLoop.instance()
+ self.loop = ioloop.IOLoop.current()
self.rep_stream, _ = self.stream(self.socket_type, self.bind_addr, bind=True)
- self.initStream()
+ self.init_stream()
- def initStream(self):
+ def init_stream(self):
"""Initialize response stream"""
self.rep_stream.on_recv(self.handler(self, self.rep_stream, self.stop))
def run(self):
- """Sets up everything and starts the event loop."""
+ """Sets up everything and starts the event loop on process startup"""
signal.signal(signal.SIGTERM, self.exit)
self.setup()
registry = get_global_registry()
- registry.notify(ZMQProcessStartedEvent(self))
+ registry.notify(ZMQProcessStartedEvent(self)) # pylint: disable=no-member
self.loop.start()
def stop(self):
@@ -83,39 +98,39 @@
self.loop = None
self.auth_thread.stop()
- def exit(self, num, frame):
+ def exit(self, num, frame): # pylint: disable=unused-argument
+ """Process exit"""
self.stop()
sys.exit()
def stream(self, sock_type, addr, bind, callback=None, subscribe=b''):
- """
- Creates a :class:`~zmq.eventloop.zmqstream.ZMQStream`.
+ # pylint: disable=too-many-arguments
+ """Creates a :class:`~zmq.eventloop.zmqstream.ZMQStream`.
:param sock_type: The ØMQ socket type (e.g. ``zmq.REQ``)
:param addr: Address to bind or connect to formatted as *host:port*,
- *(host, port)* or *host* (bind to random port).
- If *bind* is ``True``, *host* may be:
+ *(host, port)* or *host* (bind to random port).
+ If *bind* is ``True``, *host* may be:
- - the wild-card ``*``, meaning all available interfaces,
- - the primary IPv4 address assigned to the interface, in its
- numeric representation or
- - the interface name as defined by the operating system.
+ - the wild-card ``*``, meaning all available interfaces,
+ - the primary IPv4 address assigned to the interface, in its
+ numeric representation or
+ - the interface name as defined by the operating system.
- If *bind* is ``False``, *host* may be:
+ If *bind* is ``False``, *host* may be:
- - the DNS name of the peer or
- - the IPv4 address of the peer, in its numeric representation.
+ - the DNS name of the peer or
+ - the IPv4 address of the peer, in its numeric representation.
- If *addr* is just a host name without a port and *bind* is
- ``True``, the socket will be bound to a random port.
+ If *addr* is just a host name without a port and *bind* is
+ ``True``, the socket will be bound to a random port.
:param bind: Binds to *addr* if ``True`` or tries to connect to it
- otherwise.
+ otherwise.
:param callback: A callback for
- :meth:`~zmq.eventloop.zmqstream.ZMQStream.on_recv`, optional
+ :meth:`~zmq.eventloop.zmqstream.ZMQStream.on_recv`, optional
:param subscribe: Subscription pattern for *SUB* sockets, optional,
- defaults to ``b''``.
+ defaults to ``b''``.
:returns: A tuple containg the stream and the port number.
-
"""
sock = self.context.socket(sock_type)
@@ -138,8 +153,8 @@
sock.connect('tcp://%s:%s' % (host, port))
# Add a default subscription for SUB sockets
- if sock_type == zmq.SUB:
- sock.setsockopt(zmq.SUBSCRIBE, subscribe)
+ if sock_type == zmq.SUB: # pylint: disable=no-member
+ sock.setsockopt(zmq.SUBSCRIBE, subscribe) # pylint: disable=no-member
# Create the stream and add the callback
stream = zmqstream.ZMQStream(sock, self.loop)
@@ -150,6 +165,7 @@
def process_exit_func(process=None):
+ """Process exit func is required to correctly end the child process"""
if process is not None:
if process.is_alive():
process.terminate()
--- a/src/pyams_zmq/socket.py Fri Jan 11 13:52:29 2019 +0100
+++ b/src/pyams_zmq/socket.py Thu Nov 28 19:19:25 2019 +0100
@@ -10,18 +10,19 @@
# FOR A PARTICULAR PURPOSE.
#
+"""PyAMS_zmq.socket module
+
+This module provides a few helpers which can be used to open a 0MQ socket and handle response.
+"""
+
+import zmq
+
+
__docformat__ = 'restructuredtext'
-# import standard library
-
-# import interfaces
-
-# import packages
-import zmq
-
-
def zmq_socket(address, socket_type=zmq.REQ, linger=0, protocol='tcp', auth=None):
+ # pylint: disable=no-member
"""Get ØMQ socket
auth is given as unicode 'username:password' string and automatically converted to bytes.
@@ -41,5 +42,4 @@
poller.register(socket, flags)
if poller.poll(timeout * 1000):
return socket.recv_json()
- else:
- return [503, "Connection timeout"]
+ return [503, "Connection timeout"]
--- a/src/pyams_zmq/tests/__init__.py Fri Jan 11 13:52:29 2019 +0100
+++ b/src/pyams_zmq/tests/__init__.py Thu Nov 28 19:19:25 2019 +0100
@@ -1,1 +1,31 @@
+# -*- coding: utf-8 -*- ######################################################
+##############################################################################
+#
+# Copyright (c) 2008-2010 Thierry Florac <tflorac AT ulthar.net>
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""
+Generic Test case for pyams_zmq doctest
+"""
+__docformat__ = 'restructuredtext'
+
+import os
+import sys
+
+
+def get_package_dir(value):
+ """Get package directory"""
+
+ package_dir = os.path.split(value)[0]
+ if package_dir not in sys.path:
+ sys.path.append(package_dir)
+ return package_dir
--- a/src/pyams_zmq/tests/test_utilsdocs.py Fri Jan 11 13:52:29 2019 +0100
+++ b/src/pyams_zmq/tests/test_utilsdocs.py Thu Nov 28 19:19:25 2019 +0100
@@ -1,5 +1,7 @@
+# -*- coding: utf-8 -*- ######################################################
+##############################################################################
#
-# Copyright (c) 2008-2015 Thierry Florac <tflorac AT ulthar.net>
+# Copyright (c) 2008-2010 Thierry Florac <tflorac AT ulthar.net>
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
@@ -9,21 +11,25 @@
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
+##############################################################################
"""
-Generic Test case for pyams_zmq doctest
+Generic test case for pyams_zmq doctests
"""
+
__docformat__ = 'restructuredtext'
+import doctest
+import os
import unittest
-import doctest
-import sys
-import os
+
+from pyams_utils.tests import get_package_dir
-current_dir = os.path.dirname(__file__)
+CURRENT_DIR = os.path.abspath(os.path.dirname(__file__))
-def doc_suite(test_dir, setUp=None, tearDown=None, globs=None):
+
+def doc_suite(test_dir, setUp=None, tearDown=None, globs=None): # pylint: disable=invalid-name
"""Returns a test suite, based on doctests found in /doctest."""
suite = []
if globs is None:
@@ -32,15 +38,12 @@
flags = (doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE |
doctest.REPORT_ONLY_FIRST_FAILURE)
- package_dir = os.path.split(test_dir)[0]
- if package_dir not in sys.path:
- sys.path.append(package_dir)
-
+ package_dir = get_package_dir(test_dir)
doctest_dir = os.path.join(package_dir, 'doctests')
# filtering files on extension
docs = [os.path.join(doctest_dir, doc) for doc in
- os.listdir(doctest_dir) if doc.endswith('.txt')]
+ os.listdir(doctest_dir) if doc.endswith('.txt') or doc.endswith('.rst')]
for test in docs:
suite.append(doctest.DocFileSuite(test, optionflags=flags,
@@ -50,10 +53,11 @@
return unittest.TestSuite(suite)
+
def test_suite():
"""returns the test suite"""
- return doc_suite(current_dir)
+ return doc_suite(CURRENT_DIR)
+
if __name__ == '__main__':
unittest.main(defaultTest='test_suite')
-
--- a/src/pyams_zmq/tests/test_utilsdocstrings.py Fri Jan 11 13:52:29 2019 +0100
+++ b/src/pyams_zmq/tests/test_utilsdocstrings.py Thu Nov 28 19:19:25 2019 +0100
@@ -1,5 +1,7 @@
+# -*- coding: utf-8 -*- ######################################################
+##############################################################################
#
-# Copyright (c) 2008-2015 Thierry Florac <tflorac AT ulthar.net>
+# Copyright (c) 2008-2010 Thierry Florac <tflorac AT ulthar.net>
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
@@ -9,19 +11,23 @@
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
+##############################################################################
"""
-Generic Test case for pyams_zmq doc strings
+Generic test case for pyams_zmq doc strings
"""
+
__docformat__ = 'restructuredtext'
+import doctest
+import os
import unittest
-import doctest
-import sys
-import os
+
+from pyams_utils.tests import get_package_dir
-current_dir = os.path.abspath(os.path.dirname(__file__))
+CURRENT_DIR = os.path.abspath(os.path.dirname(__file__))
+
def doc_suite(test_dir, globs=None):
"""Returns a test suite, based on doc tests strings found in /*.py"""
@@ -32,9 +38,7 @@
flags = (doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE |
doctest.REPORT_ONLY_FIRST_FAILURE)
- package_dir = os.path.split(test_dir)[0]
- if package_dir not in sys.path:
- sys.path.append(package_dir)
+ package_dir = get_package_dir(test_dir)
# filtering files on extension
docs = [doc for doc in
@@ -42,7 +46,7 @@
docs = [doc for doc in docs if not doc.startswith('__')]
for test in docs:
- fd = open(os.path.join(package_dir, test))
+ fd = open(os.path.join(package_dir, test)) # pylint: disable=invalid-name
content = fd.read()
fd.close()
if '>>> ' not in content:
@@ -54,9 +58,11 @@
return unittest.TestSuite(suite)
+
def test_suite():
"""returns the test suite"""
- return doc_suite(current_dir)
+ return doc_suite(CURRENT_DIR)
+
if __name__ == '__main__':
unittest.main(defaultTest='test_suite')