diff -r e13ddd7964aa -r 92e1d7b3022e src/pyams_content_es/process.py --- a/src/pyams_content_es/process.py Sun Feb 18 12:43:46 2018 +0100 +++ b/src/pyams_content_es/process.py Sun Feb 18 12:44:23 2018 +0100 @@ -10,7 +10,6 @@ # FOR A PARTICULAR PURPOSE. # - __docformat__ = 'restructuredtext' @@ -29,10 +28,11 @@ # import packages from pyams_utils.registry import set_local_registry, get_utility, get_global_registry +from pyams_utils.request import check_request from pyams_utils.zodb import ZODBConnection from pyams_zmq.handler import ZMQMessageHandler from pyams_zmq.process import ZMQProcess -from pyramid.threadlocal import manager as threadlocal_manager +from pyramid.threadlocal import RequestContext class BaseIndexerProcess(Process): @@ -44,15 +44,6 @@ def run(self): logger.debug("Starting indexer thread...") - # Loading components registry - registry = get_global_registry() - threadlocal_manager.set({'request': None, 'registry': registry}) - logger.debug("Getting global registry: {0!r}".format(registry)) - # Get ES client - es_client = getattr(registry, 'pyramid_es_client', None) - if es_client is None: - logger.debug("Missing ElasticSearch client in registry!") - return # Check settings settings = self.settings logger.debug("Checking index parameters: {0}".format(str(settings))) @@ -60,39 +51,52 @@ if not document_id: logger.warning('Bad indexer request: {0}'.format(str(settings))) return - # Open ZODB connection - manager = None - zodb_name = settings.get('zodb_name') - logger.debug("Opening ZODB connection...") - with ZODBConnection(name=zodb_name) as root: - try: - logger.debug("Getting connection root {0!r}".format(root)) - application_name = registry.settings.get(PYAMS_APPLICATION_SETTINGS_KEY, PYAMS_APPLICATION_DEFAULT_NAME) - application = root.get(application_name) - logger.debug("Loading application {0!r} named {1}".format(application, application_name)) - if application is not None: - # set local registry - sm = application.getSiteManager() - set_local_registry(sm) - logger.debug("Setting local registry {0!r}".format(sm)) - # find document - intids = get_utility(IIntIds) - document = intids.queryObject(document_id) - if document is None: - logger.warning("Can't find requested document {0}!".format(document_id)) - return - # index document - logger.debug("Starting indexing for {0!r}".format(document)) - manager = ITransactionManager(document) - for attempt in manager.attempts(): - with attempt as t: - self.update_index(es_client, document) - if t.status == 'Committed': - break - finally: - if manager is not None: - manager.abort() - threadlocal_manager.pop() + # Loading components registry + registry = get_global_registry() + logger.debug("Getting global registry {0!r}".format(registry)) + # Get ES client + es_client = getattr(registry, 'pyramid_es_client', None) + if es_client is None: + logger.debug("Missing ElasticSearch client in registry!") + return + # Create new request + request = check_request() + request.registry = registry + logger.debug("Creating new request {0!r}".format(request)) + with RequestContext(request): + # Open ZODB connection + zodb_name = settings.get('zodb_name') + logger.debug("Opening ZODB connection...") + with ZODBConnection(name=zodb_name) as root: + manager = None + try: + logger.debug("Getting connection root {0!r}".format(root)) + application_name = registry.settings.get(PYAMS_APPLICATION_SETTINGS_KEY, + PYAMS_APPLICATION_DEFAULT_NAME) + application = root.get(application_name) + logger.debug("Loading application {0!r} named {1}".format(application, application_name)) + if application is not None: + # set local registry + sm = application.getSiteManager() + set_local_registry(sm) + logger.debug("Setting local registry {0!r}".format(sm)) + # find document + intids = get_utility(IIntIds) + document = intids.queryObject(document_id) + if document is None: + logger.warning("Can't find requested document {0}!".format(document_id)) + return + # index document + logger.debug("Starting indexing for {0!r}".format(document)) + manager = ITransactionManager(document) + for attempt in manager.attempts(): + with attempt as t: + self.update_index(es_client, document) + if t.status == 'Committed': + break + finally: + if manager is not None: + manager.abort() def update_index(self, client, document): """Update index"""