src/pyams_media/process.py
changeset 0 fd39db613f8b
child 9 7c73df1106b4
equal deleted inserted replaced
-1:000000000000 0:fd39db613f8b
       
     1 #
       
     2 # Copyright (c) 2008-2015 Thierry Florac <tflorac AT ulthar.net>
       
     3 # All Rights Reserved.
       
     4 #
       
     5 # This software is subject to the provisions of the Zope Public License,
       
     6 # Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
       
     7 # THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
       
     8 # WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
       
     9 # WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
       
    10 # FOR A PARTICULAR PURPOSE.
       
    11 #
       
    12 
       
    13 __docformat__ = 'restructuredtext'
       
    14 
       
    15 
       
    16 # import standard library
       
    17 import logging
       
    18 logger = logging.getLogger('PyAMS (media)')
       
    19 import os
       
    20 import shutil
       
    21 
       
    22 from multiprocessing import Process
       
    23 from threading import Thread
       
    24 
       
    25 # import interfaces
       
    26 from pyams_media.interfaces import IMediaConverter, IMediaConversions
       
    27 from pyams_utils.interfaces import PYAMS_APPLICATION_SETTINGS_KEY, PYAMS_APPLICATION_DEFAULT_NAME
       
    28 from transaction.interfaces import ITransactionManager
       
    29 from zope.intid.interfaces import IIntIds
       
    30 
       
    31 # import packages
       
    32 from pyams_utils.registry import set_local_registry, get_utility
       
    33 from pyams_utils.zodb import ZEOConnection
       
    34 from pyams_zmq.handler import ZMQMessageHandler
       
    35 from pyams_zmq.process import ZMQProcess
       
    36 from pyramid.threadlocal import manager as threadlocal_manager
       
    37 from zope.component.globalregistry import getGlobalSiteManager
       
    38 
       
    39 
       
    40 class ConversionProcess(Process):
       
    41     """Media conversion process"""
       
    42 
       
    43     def __init__(self, settings, group=None, target=None, name=None, *args, **kwargs):
       
    44         Process.__init__(self, group, target, name, *args, **kwargs)
       
    45         self.settings = settings
       
    46 
       
    47     def run(self):
       
    48         logger.debug("Starting conversion process...")
       
    49         # Lower process nice...
       
    50         os.nice(10)
       
    51         # Loading components registry
       
    52         registry = getGlobalSiteManager()
       
    53         threadlocal_manager.set({'request': None, 'registry': registry})
       
    54         logger.debug("Getting global registry: {0!r}".format(registry))
       
    55         # Check settings
       
    56         settings = self.settings
       
    57         logger.debug("Checking conversion parameters: {0}".format(str(settings)))
       
    58         zeo_settings = settings.get('zeo')
       
    59         media_id = settings.get('media')
       
    60         media_format = settings.get('format')
       
    61         if not (zeo_settings and media_id and media_format):
       
    62             logger.warning('Bad conversion request: {0}'.format(str(settings)))
       
    63             return
       
    64         converter = registry.queryUtility(IMediaConverter, media_format)
       
    65         if converter is None:
       
    66             logger.warning('Missing media converter: {0}'.format(media_format))
       
    67             return
       
    68         # Open ZEO connection
       
    69         manager = None
       
    70         connection_info = ZEOConnection()
       
    71         connection_info.update(zeo_settings)
       
    72         logger.debug("Opening ZEO connection...")
       
    73         storage, db = connection_info.get_connection(get_storage=True)
       
    74         try:
       
    75             connection = db.open()
       
    76             root = connection.root()
       
    77             logger.debug("Getting connection root {0!r}".format(root))
       
    78             application_name = registry.settings.get(PYAMS_APPLICATION_SETTINGS_KEY, PYAMS_APPLICATION_DEFAULT_NAME)
       
    79             application = root.get(application_name)
       
    80             logger.debug("Loading application {0!r} named {1}".format(application, application_name))
       
    81             if application is not None:
       
    82                 # set local registry
       
    83                 sm = application.getSiteManager()
       
    84                 set_local_registry(sm)
       
    85                 logger.debug("Setting local registry {0!r}".format(sm))
       
    86                 # find media
       
    87                 intids = get_utility(IIntIds)
       
    88                 media = intids.queryObject(media_id)
       
    89                 if media is None:
       
    90                     logger.warning("Can't find requested media {0}!".format(media_id))
       
    91                     return
       
    92                 # extract converter output
       
    93                 logger.debug("Starting conversion process for {0!r} to {1}".format(media, media_format))
       
    94                 manager = ITransactionManager(media)
       
    95                 for loop, conversion in converter.convert(media):
       
    96                     logger.debug("Finished FFmpeg conversion process. {0} bytes output".format(len(conversion)))
       
    97                     # add conversion in a transaction attempts loop
       
    98                     for attempt in manager.attempts():
       
    99                         with attempt as t:
       
   100                             IMediaConversions(media).add_conversion(conversion, media_format, converter.format, loop)
       
   101                         if t.status == 'Committed':
       
   102                             break
       
   103         finally:
       
   104             if manager is not None:
       
   105                 manager.abort()
       
   106             connection.close()
       
   107             storage.close()
       
   108             threadlocal_manager.pop()
       
   109 
       
   110 
       
   111 class ConversionThread(Thread):
       
   112     """Media conversion thread"""
       
   113 
       
   114     def __init__(self, process):
       
   115         Thread.__init__(self)
       
   116         self.process = process
       
   117 
       
   118     def run(self):
       
   119         self.process.start()
       
   120         self.process.join()
       
   121 
       
   122 
       
   123 class MediaConversionHandler(object):
       
   124     """Media conversion handler"""
       
   125 
       
   126     def convert(self, settings):
       
   127         ConversionThread(ConversionProcess(settings)).start()
       
   128         return [200, 'Conversion process started']
       
   129 
       
   130     def test(self, settings):
       
   131         messages = ['Conversion process ready to handle requests.']
       
   132         ffmpeg = shutil.which('ffmpeg')
       
   133         if ffmpeg is None:
       
   134             messages.append("WARNING: missing 'ffmpeg' command!")
       
   135         else:
       
   136             messages.append("'ffmpeg' command is available.")
       
   137         return [200, '\n'.join(messages)]
       
   138 
       
   139 
       
   140 class MediaConversionMessageHandler(ZMQMessageHandler):
       
   141     """Media conversion message handler"""
       
   142 
       
   143     handler = MediaConversionHandler
       
   144 
       
   145 
       
   146 class MediaConversionProcess(ZMQProcess):
       
   147     """Media conversion ZMQ process"""
       
   148 
       
   149     def __init__(self, zmq_address, handler, registry):
       
   150         ZMQProcess.__init__(self, zmq_address, handler)
       
   151         self.registry = registry