--- a/src/pyams_content_es/process.py Sun Feb 18 12:43:46 2018 +0100
+++ b/src/pyams_content_es/process.py Sun Feb 18 12:44:23 2018 +0100
@@ -10,7 +10,6 @@
# FOR A PARTICULAR PURPOSE.
#
-
__docformat__ = 'restructuredtext'
@@ -29,10 +28,11 @@
# import packages
from pyams_utils.registry import set_local_registry, get_utility, get_global_registry
+from pyams_utils.request import check_request
from pyams_utils.zodb import ZODBConnection
from pyams_zmq.handler import ZMQMessageHandler
from pyams_zmq.process import ZMQProcess
-from pyramid.threadlocal import manager as threadlocal_manager
+from pyramid.threadlocal import RequestContext
class BaseIndexerProcess(Process):
@@ -44,15 +44,6 @@
def run(self):
logger.debug("Starting indexer thread...")
- # Loading components registry
- registry = get_global_registry()
- threadlocal_manager.set({'request': None, 'registry': registry})
- logger.debug("Getting global registry: {0!r}".format(registry))
- # Get ES client
- es_client = getattr(registry, 'pyramid_es_client', None)
- if es_client is None:
- logger.debug("Missing ElasticSearch client in registry!")
- return
# Check settings
settings = self.settings
logger.debug("Checking index parameters: {0}".format(str(settings)))
@@ -60,39 +51,52 @@
if not document_id:
logger.warning('Bad indexer request: {0}'.format(str(settings)))
return
- # Open ZODB connection
- manager = None
- zodb_name = settings.get('zodb_name')
- logger.debug("Opening ZODB connection...")
- with ZODBConnection(name=zodb_name) as root:
- try:
- logger.debug("Getting connection root {0!r}".format(root))
- application_name = registry.settings.get(PYAMS_APPLICATION_SETTINGS_KEY, PYAMS_APPLICATION_DEFAULT_NAME)
- application = root.get(application_name)
- logger.debug("Loading application {0!r} named {1}".format(application, application_name))
- if application is not None:
- # set local registry
- sm = application.getSiteManager()
- set_local_registry(sm)
- logger.debug("Setting local registry {0!r}".format(sm))
- # find document
- intids = get_utility(IIntIds)
- document = intids.queryObject(document_id)
- if document is None:
- logger.warning("Can't find requested document {0}!".format(document_id))
- return
- # index document
- logger.debug("Starting indexing for {0!r}".format(document))
- manager = ITransactionManager(document)
- for attempt in manager.attempts():
- with attempt as t:
- self.update_index(es_client, document)
- if t.status == 'Committed':
- break
- finally:
- if manager is not None:
- manager.abort()
- threadlocal_manager.pop()
+ # Loading components registry
+ registry = get_global_registry()
+ logger.debug("Getting global registry {0!r}".format(registry))
+ # Get ES client
+ es_client = getattr(registry, 'pyramid_es_client', None)
+ if es_client is None:
+ logger.debug("Missing ElasticSearch client in registry!")
+ return
+ # Create new request
+ request = check_request()
+ request.registry = registry
+ logger.debug("Creating new request {0!r}".format(request))
+ with RequestContext(request):
+ # Open ZODB connection
+ zodb_name = settings.get('zodb_name')
+ logger.debug("Opening ZODB connection...")
+ with ZODBConnection(name=zodb_name) as root:
+ manager = None
+ try:
+ logger.debug("Getting connection root {0!r}".format(root))
+ application_name = registry.settings.get(PYAMS_APPLICATION_SETTINGS_KEY,
+ PYAMS_APPLICATION_DEFAULT_NAME)
+ application = root.get(application_name)
+ logger.debug("Loading application {0!r} named {1}".format(application, application_name))
+ if application is not None:
+ # set local registry
+ sm = application.getSiteManager()
+ set_local_registry(sm)
+ logger.debug("Setting local registry {0!r}".format(sm))
+ # find document
+ intids = get_utility(IIntIds)
+ document = intids.queryObject(document_id)
+ if document is None:
+ logger.warning("Can't find requested document {0}!".format(document_id))
+ return
+ # index document
+ logger.debug("Starting indexing for {0!r}".format(document))
+ manager = ITransactionManager(document)
+ for attempt in manager.attempts():
+ with attempt as t:
+ self.update_index(es_client, document)
+ if t.status == 'Committed':
+ break
+ finally:
+ if manager is not None:
+ manager.abort()
def update_index(self, client, document):
"""Update index"""