--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/src/pyams_media/process.py Wed Sep 02 15:31:55 2015 +0200
@@ -0,0 +1,151 @@
+#
+# Copyright (c) 2008-2015 Thierry Florac <tflorac AT ulthar.net>
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+
+__docformat__ = 'restructuredtext'
+
+
+# import standard library
+import logging
+logger = logging.getLogger('PyAMS (media)')
+import os
+import shutil
+
+from multiprocessing import Process
+from threading import Thread
+
+# import interfaces
+from pyams_media.interfaces import IMediaConverter, IMediaConversions
+from pyams_utils.interfaces import PYAMS_APPLICATION_SETTINGS_KEY, PYAMS_APPLICATION_DEFAULT_NAME
+from transaction.interfaces import ITransactionManager
+from zope.intid.interfaces import IIntIds
+
+# import packages
+from pyams_utils.registry import set_local_registry, get_utility
+from pyams_utils.zodb import ZEOConnection
+from pyams_zmq.handler import ZMQMessageHandler
+from pyams_zmq.process import ZMQProcess
+from pyramid.threadlocal import manager as threadlocal_manager
+from zope.component.globalregistry import getGlobalSiteManager
+
+
+class ConversionProcess(Process):
+ """Media conversion process"""
+
+ def __init__(self, settings, group=None, target=None, name=None, *args, **kwargs):
+ Process.__init__(self, group, target, name, *args, **kwargs)
+ self.settings = settings
+
+ def run(self):
+ logger.debug("Starting conversion process...")
+ # Lower process nice...
+ os.nice(10)
+ # Loading components registry
+ registry = getGlobalSiteManager()
+ threadlocal_manager.set({'request': None, 'registry': registry})
+ logger.debug("Getting global registry: {0!r}".format(registry))
+ # Check settings
+ settings = self.settings
+ logger.debug("Checking conversion parameters: {0}".format(str(settings)))
+ zeo_settings = settings.get('zeo')
+ media_id = settings.get('media')
+ media_format = settings.get('format')
+ if not (zeo_settings and media_id and media_format):
+ logger.warning('Bad conversion request: {0}'.format(str(settings)))
+ return
+ converter = registry.queryUtility(IMediaConverter, media_format)
+ if converter is None:
+ logger.warning('Missing media converter: {0}'.format(media_format))
+ return
+ # Open ZEO connection
+ manager = None
+ connection_info = ZEOConnection()
+ connection_info.update(zeo_settings)
+ logger.debug("Opening ZEO connection...")
+ storage, db = connection_info.get_connection(get_storage=True)
+ try:
+ connection = db.open()
+ root = connection.root()
+ logger.debug("Getting connection root {0!r}".format(root))
+ application_name = registry.settings.get(PYAMS_APPLICATION_SETTINGS_KEY, PYAMS_APPLICATION_DEFAULT_NAME)
+ application = root.get(application_name)
+ logger.debug("Loading application {0!r} named {1}".format(application, application_name))
+ if application is not None:
+ # set local registry
+ sm = application.getSiteManager()
+ set_local_registry(sm)
+ logger.debug("Setting local registry {0!r}".format(sm))
+ # find media
+ intids = get_utility(IIntIds)
+ media = intids.queryObject(media_id)
+ if media is None:
+ logger.warning("Can't find requested media {0}!".format(media_id))
+ return
+ # extract converter output
+ logger.debug("Starting conversion process for {0!r} to {1}".format(media, media_format))
+ manager = ITransactionManager(media)
+ for loop, conversion in converter.convert(media):
+ logger.debug("Finished FFmpeg conversion process. {0} bytes output".format(len(conversion)))
+ # add conversion in a transaction attempts loop
+ for attempt in manager.attempts():
+ with attempt as t:
+ IMediaConversions(media).add_conversion(conversion, media_format, converter.format, loop)
+ if t.status == 'Committed':
+ break
+ finally:
+ if manager is not None:
+ manager.abort()
+ connection.close()
+ storage.close()
+ threadlocal_manager.pop()
+
+
+class ConversionThread(Thread):
+ """Media conversion thread"""
+
+ def __init__(self, process):
+ Thread.__init__(self)
+ self.process = process
+
+ def run(self):
+ self.process.start()
+ self.process.join()
+
+
+class MediaConversionHandler(object):
+ """Media conversion handler"""
+
+ def convert(self, settings):
+ ConversionThread(ConversionProcess(settings)).start()
+ return [200, 'Conversion process started']
+
+ def test(self, settings):
+ messages = ['Conversion process ready to handle requests.']
+ ffmpeg = shutil.which('ffmpeg')
+ if ffmpeg is None:
+ messages.append("WARNING: missing 'ffmpeg' command!")
+ else:
+ messages.append("'ffmpeg' command is available.")
+ return [200, '\n'.join(messages)]
+
+
+class MediaConversionMessageHandler(ZMQMessageHandler):
+ """Media conversion message handler"""
+
+ handler = MediaConversionHandler
+
+
+class MediaConversionProcess(ZMQProcess):
+ """Media conversion ZMQ process"""
+
+ def __init__(self, zmq_address, handler, registry):
+ ZMQProcess.__init__(self, zmq_address, handler)
+ self.registry = registry