diff -r 000000000000 -r fd39db613f8b src/pyams_media/process.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/pyams_media/process.py Wed Sep 02 15:31:55 2015 +0200 @@ -0,0 +1,151 @@ +# +# Copyright (c) 2008-2015 Thierry Florac +# All Rights Reserved. +# +# This software is subject to the provisions of the Zope Public License, +# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution. +# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED +# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS +# FOR A PARTICULAR PURPOSE. +# + +__docformat__ = 'restructuredtext' + + +# import standard library +import logging +logger = logging.getLogger('PyAMS (media)') +import os +import shutil + +from multiprocessing import Process +from threading import Thread + +# import interfaces +from pyams_media.interfaces import IMediaConverter, IMediaConversions +from pyams_utils.interfaces import PYAMS_APPLICATION_SETTINGS_KEY, PYAMS_APPLICATION_DEFAULT_NAME +from transaction.interfaces import ITransactionManager +from zope.intid.interfaces import IIntIds + +# import packages +from pyams_utils.registry import set_local_registry, get_utility +from pyams_utils.zodb import ZEOConnection +from pyams_zmq.handler import ZMQMessageHandler +from pyams_zmq.process import ZMQProcess +from pyramid.threadlocal import manager as threadlocal_manager +from zope.component.globalregistry import getGlobalSiteManager + + +class ConversionProcess(Process): + """Media conversion process""" + + def __init__(self, settings, group=None, target=None, name=None, *args, **kwargs): + Process.__init__(self, group, target, name, *args, **kwargs) + self.settings = settings + + def run(self): + logger.debug("Starting conversion process...") + # Lower process nice... + os.nice(10) + # Loading components registry + registry = getGlobalSiteManager() + threadlocal_manager.set({'request': None, 'registry': registry}) + logger.debug("Getting global registry: {0!r}".format(registry)) + # Check settings + settings = self.settings + logger.debug("Checking conversion parameters: {0}".format(str(settings))) + zeo_settings = settings.get('zeo') + media_id = settings.get('media') + media_format = settings.get('format') + if not (zeo_settings and media_id and media_format): + logger.warning('Bad conversion request: {0}'.format(str(settings))) + return + converter = registry.queryUtility(IMediaConverter, media_format) + if converter is None: + logger.warning('Missing media converter: {0}'.format(media_format)) + return + # Open ZEO connection + manager = None + connection_info = ZEOConnection() + connection_info.update(zeo_settings) + logger.debug("Opening ZEO connection...") + storage, db = connection_info.get_connection(get_storage=True) + try: + connection = db.open() + root = connection.root() + logger.debug("Getting connection root {0!r}".format(root)) + application_name = registry.settings.get(PYAMS_APPLICATION_SETTINGS_KEY, PYAMS_APPLICATION_DEFAULT_NAME) + application = root.get(application_name) + logger.debug("Loading application {0!r} named {1}".format(application, application_name)) + if application is not None: + # set local registry + sm = application.getSiteManager() + set_local_registry(sm) + logger.debug("Setting local registry {0!r}".format(sm)) + # find media + intids = get_utility(IIntIds) + media = intids.queryObject(media_id) + if media is None: + logger.warning("Can't find requested media {0}!".format(media_id)) + return + # extract converter output + logger.debug("Starting conversion process for {0!r} to {1}".format(media, media_format)) + manager = ITransactionManager(media) + for loop, conversion in converter.convert(media): + logger.debug("Finished FFmpeg conversion process. {0} bytes output".format(len(conversion))) + # add conversion in a transaction attempts loop + for attempt in manager.attempts(): + with attempt as t: + IMediaConversions(media).add_conversion(conversion, media_format, converter.format, loop) + if t.status == 'Committed': + break + finally: + if manager is not None: + manager.abort() + connection.close() + storage.close() + threadlocal_manager.pop() + + +class ConversionThread(Thread): + """Media conversion thread""" + + def __init__(self, process): + Thread.__init__(self) + self.process = process + + def run(self): + self.process.start() + self.process.join() + + +class MediaConversionHandler(object): + """Media conversion handler""" + + def convert(self, settings): + ConversionThread(ConversionProcess(settings)).start() + return [200, 'Conversion process started'] + + def test(self, settings): + messages = ['Conversion process ready to handle requests.'] + ffmpeg = shutil.which('ffmpeg') + if ffmpeg is None: + messages.append("WARNING: missing 'ffmpeg' command!") + else: + messages.append("'ffmpeg' command is available.") + return [200, '\n'.join(messages)] + + +class MediaConversionMessageHandler(ZMQMessageHandler): + """Media conversion message handler""" + + handler = MediaConversionHandler + + +class MediaConversionProcess(ZMQProcess): + """Media conversion ZMQ process""" + + def __init__(self, zmq_address, handler, registry): + ZMQProcess.__init__(self, zmq_address, handler) + self.registry = registry