src/pyams_media/process.py
changeset 0 fd39db613f8b
child 9 7c73df1106b4
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/pyams_media/process.py	Wed Sep 02 15:31:55 2015 +0200
@@ -0,0 +1,151 @@
+#
+# Copyright (c) 2008-2015 Thierry Florac <tflorac AT ulthar.net>
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+
+__docformat__ = 'restructuredtext'
+
+
+# import standard library
+import logging
+logger = logging.getLogger('PyAMS (media)')
+import os
+import shutil
+
+from multiprocessing import Process
+from threading import Thread
+
+# import interfaces
+from pyams_media.interfaces import IMediaConverter, IMediaConversions
+from pyams_utils.interfaces import PYAMS_APPLICATION_SETTINGS_KEY, PYAMS_APPLICATION_DEFAULT_NAME
+from transaction.interfaces import ITransactionManager
+from zope.intid.interfaces import IIntIds
+
+# import packages
+from pyams_utils.registry import set_local_registry, get_utility
+from pyams_utils.zodb import ZEOConnection
+from pyams_zmq.handler import ZMQMessageHandler
+from pyams_zmq.process import ZMQProcess
+from pyramid.threadlocal import manager as threadlocal_manager
+from zope.component.globalregistry import getGlobalSiteManager
+
+
+class ConversionProcess(Process):
+    """Media conversion process"""
+
+    def __init__(self, settings, group=None, target=None, name=None, *args, **kwargs):
+        Process.__init__(self, group, target, name, *args, **kwargs)
+        self.settings = settings
+
+    def run(self):
+        logger.debug("Starting conversion process...")
+        # Lower process nice...
+        os.nice(10)
+        # Loading components registry
+        registry = getGlobalSiteManager()
+        threadlocal_manager.set({'request': None, 'registry': registry})
+        logger.debug("Getting global registry: {0!r}".format(registry))
+        # Check settings
+        settings = self.settings
+        logger.debug("Checking conversion parameters: {0}".format(str(settings)))
+        zeo_settings = settings.get('zeo')
+        media_id = settings.get('media')
+        media_format = settings.get('format')
+        if not (zeo_settings and media_id and media_format):
+            logger.warning('Bad conversion request: {0}'.format(str(settings)))
+            return
+        converter = registry.queryUtility(IMediaConverter, media_format)
+        if converter is None:
+            logger.warning('Missing media converter: {0}'.format(media_format))
+            return
+        # Open ZEO connection
+        manager = None
+        connection_info = ZEOConnection()
+        connection_info.update(zeo_settings)
+        logger.debug("Opening ZEO connection...")
+        storage, db = connection_info.get_connection(get_storage=True)
+        try:
+            connection = db.open()
+            root = connection.root()
+            logger.debug("Getting connection root {0!r}".format(root))
+            application_name = registry.settings.get(PYAMS_APPLICATION_SETTINGS_KEY, PYAMS_APPLICATION_DEFAULT_NAME)
+            application = root.get(application_name)
+            logger.debug("Loading application {0!r} named {1}".format(application, application_name))
+            if application is not None:
+                # set local registry
+                sm = application.getSiteManager()
+                set_local_registry(sm)
+                logger.debug("Setting local registry {0!r}".format(sm))
+                # find media
+                intids = get_utility(IIntIds)
+                media = intids.queryObject(media_id)
+                if media is None:
+                    logger.warning("Can't find requested media {0}!".format(media_id))
+                    return
+                # extract converter output
+                logger.debug("Starting conversion process for {0!r} to {1}".format(media, media_format))
+                manager = ITransactionManager(media)
+                for loop, conversion in converter.convert(media):
+                    logger.debug("Finished FFmpeg conversion process. {0} bytes output".format(len(conversion)))
+                    # add conversion in a transaction attempts loop
+                    for attempt in manager.attempts():
+                        with attempt as t:
+                            IMediaConversions(media).add_conversion(conversion, media_format, converter.format, loop)
+                        if t.status == 'Committed':
+                            break
+        finally:
+            if manager is not None:
+                manager.abort()
+            connection.close()
+            storage.close()
+            threadlocal_manager.pop()
+
+
+class ConversionThread(Thread):
+    """Media conversion thread"""
+
+    def __init__(self, process):
+        Thread.__init__(self)
+        self.process = process
+
+    def run(self):
+        self.process.start()
+        self.process.join()
+
+
+class MediaConversionHandler(object):
+    """Media conversion handler"""
+
+    def convert(self, settings):
+        ConversionThread(ConversionProcess(settings)).start()
+        return [200, 'Conversion process started']
+
+    def test(self, settings):
+        messages = ['Conversion process ready to handle requests.']
+        ffmpeg = shutil.which('ffmpeg')
+        if ffmpeg is None:
+            messages.append("WARNING: missing 'ffmpeg' command!")
+        else:
+            messages.append("'ffmpeg' command is available.")
+        return [200, '\n'.join(messages)]
+
+
+class MediaConversionMessageHandler(ZMQMessageHandler):
+    """Media conversion message handler"""
+
+    handler = MediaConversionHandler
+
+
+class MediaConversionProcess(ZMQProcess):
+    """Media conversion ZMQ process"""
+
+    def __init__(self, zmq_address, handler, registry):
+        ZMQProcess.__init__(self, zmq_address, handler)
+        self.registry = registry