|
1 # |
|
2 # Copyright (c) 2008-2015 Thierry Florac <tflorac AT ulthar.net> |
|
3 # All Rights Reserved. |
|
4 # |
|
5 # This software is subject to the provisions of the Zope Public License, |
|
6 # Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution. |
|
7 # THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED |
|
8 # WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED |
|
9 # WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS |
|
10 # FOR A PARTICULAR PURPOSE. |
|
11 # |
|
12 |
|
13 __docformat__ = 'restructuredtext' |
|
14 |
|
15 |
|
16 # import standard library |
|
17 import logging |
|
18 logger = logging.getLogger('PyAMS (media)') |
|
19 import os |
|
20 import shutil |
|
21 |
|
22 from multiprocessing import Process |
|
23 from threading import Thread |
|
24 |
|
25 # import interfaces |
|
26 from pyams_media.interfaces import IMediaConverter, IMediaConversions |
|
27 from pyams_utils.interfaces import PYAMS_APPLICATION_SETTINGS_KEY, PYAMS_APPLICATION_DEFAULT_NAME |
|
28 from transaction.interfaces import ITransactionManager |
|
29 from zope.intid.interfaces import IIntIds |
|
30 |
|
31 # import packages |
|
32 from pyams_utils.registry import set_local_registry, get_utility |
|
33 from pyams_utils.zodb import ZEOConnection |
|
34 from pyams_zmq.handler import ZMQMessageHandler |
|
35 from pyams_zmq.process import ZMQProcess |
|
36 from pyramid.threadlocal import manager as threadlocal_manager |
|
37 from zope.component.globalregistry import getGlobalSiteManager |
|
38 |
|
39 |
|
40 class ConversionProcess(Process): |
|
41 """Media conversion process""" |
|
42 |
|
43 def __init__(self, settings, group=None, target=None, name=None, *args, **kwargs): |
|
44 Process.__init__(self, group, target, name, *args, **kwargs) |
|
45 self.settings = settings |
|
46 |
|
47 def run(self): |
|
48 logger.debug("Starting conversion process...") |
|
49 # Lower process nice... |
|
50 os.nice(10) |
|
51 # Loading components registry |
|
52 registry = getGlobalSiteManager() |
|
53 threadlocal_manager.set({'request': None, 'registry': registry}) |
|
54 logger.debug("Getting global registry: {0!r}".format(registry)) |
|
55 # Check settings |
|
56 settings = self.settings |
|
57 logger.debug("Checking conversion parameters: {0}".format(str(settings))) |
|
58 zeo_settings = settings.get('zeo') |
|
59 media_id = settings.get('media') |
|
60 media_format = settings.get('format') |
|
61 if not (zeo_settings and media_id and media_format): |
|
62 logger.warning('Bad conversion request: {0}'.format(str(settings))) |
|
63 return |
|
64 converter = registry.queryUtility(IMediaConverter, media_format) |
|
65 if converter is None: |
|
66 logger.warning('Missing media converter: {0}'.format(media_format)) |
|
67 return |
|
68 # Open ZEO connection |
|
69 manager = None |
|
70 connection_info = ZEOConnection() |
|
71 connection_info.update(zeo_settings) |
|
72 logger.debug("Opening ZEO connection...") |
|
73 storage, db = connection_info.get_connection(get_storage=True) |
|
74 try: |
|
75 connection = db.open() |
|
76 root = connection.root() |
|
77 logger.debug("Getting connection root {0!r}".format(root)) |
|
78 application_name = registry.settings.get(PYAMS_APPLICATION_SETTINGS_KEY, PYAMS_APPLICATION_DEFAULT_NAME) |
|
79 application = root.get(application_name) |
|
80 logger.debug("Loading application {0!r} named {1}".format(application, application_name)) |
|
81 if application is not None: |
|
82 # set local registry |
|
83 sm = application.getSiteManager() |
|
84 set_local_registry(sm) |
|
85 logger.debug("Setting local registry {0!r}".format(sm)) |
|
86 # find media |
|
87 intids = get_utility(IIntIds) |
|
88 media = intids.queryObject(media_id) |
|
89 if media is None: |
|
90 logger.warning("Can't find requested media {0}!".format(media_id)) |
|
91 return |
|
92 # extract converter output |
|
93 logger.debug("Starting conversion process for {0!r} to {1}".format(media, media_format)) |
|
94 manager = ITransactionManager(media) |
|
95 for loop, conversion in converter.convert(media): |
|
96 logger.debug("Finished FFmpeg conversion process. {0} bytes output".format(len(conversion))) |
|
97 # add conversion in a transaction attempts loop |
|
98 for attempt in manager.attempts(): |
|
99 with attempt as t: |
|
100 IMediaConversions(media).add_conversion(conversion, media_format, converter.format, loop) |
|
101 if t.status == 'Committed': |
|
102 break |
|
103 finally: |
|
104 if manager is not None: |
|
105 manager.abort() |
|
106 connection.close() |
|
107 storage.close() |
|
108 threadlocal_manager.pop() |
|
109 |
|
110 |
|
111 class ConversionThread(Thread): |
|
112 """Media conversion thread""" |
|
113 |
|
114 def __init__(self, process): |
|
115 Thread.__init__(self) |
|
116 self.process = process |
|
117 |
|
118 def run(self): |
|
119 self.process.start() |
|
120 self.process.join() |
|
121 |
|
122 |
|
123 class MediaConversionHandler(object): |
|
124 """Media conversion handler""" |
|
125 |
|
126 def convert(self, settings): |
|
127 ConversionThread(ConversionProcess(settings)).start() |
|
128 return [200, 'Conversion process started'] |
|
129 |
|
130 def test(self, settings): |
|
131 messages = ['Conversion process ready to handle requests.'] |
|
132 ffmpeg = shutil.which('ffmpeg') |
|
133 if ffmpeg is None: |
|
134 messages.append("WARNING: missing 'ffmpeg' command!") |
|
135 else: |
|
136 messages.append("'ffmpeg' command is available.") |
|
137 return [200, '\n'.join(messages)] |
|
138 |
|
139 |
|
140 class MediaConversionMessageHandler(ZMQMessageHandler): |
|
141 """Media conversion message handler""" |
|
142 |
|
143 handler = MediaConversionHandler |
|
144 |
|
145 |
|
146 class MediaConversionProcess(ZMQProcess): |
|
147 """Media conversion ZMQ process""" |
|
148 |
|
149 def __init__(self, zmq_address, handler, registry): |
|
150 ZMQProcess.__init__(self, zmq_address, handler) |
|
151 self.registry = registry |