Use ZODBConnection instead of ZEOConnection to correctly handle any ZODB storage
authorThierry Florac <thierry.florac@onf.fr>
Thu, 11 Jan 2018 17:07:28 +0100
changeset 41 0b31d2492f15
parent 40 eaf9751db04a
child 42 8cacad2470d5
Use ZODBConnection instead of ZEOConnection to correctly handle any ZODB storage
src/pyams_scheduler/include.py
src/pyams_scheduler/interfaces/__init__.py
src/pyams_scheduler/process.py
src/pyams_scheduler/scheduler.py
src/pyams_scheduler/task.py
--- a/src/pyams_scheduler/include.py	Mon Dec 11 15:26:53 2017 +0100
+++ b/src/pyams_scheduler/include.py	Thu Jan 11 17:07:28 2018 +0100
@@ -23,13 +23,12 @@
 # import interfaces
 from pyams_scheduler.interfaces import SCHEDULER_HANDLER_KEY, SCHEDULER_NAME
 from pyams_utils.interfaces import PYAMS_APPLICATION_SETTINGS_KEY, PYAMS_APPLICATION_DEFAULT_NAME
-from pyams_utils.interfaces.zeo import IZEOConnection
 from pyramid.interfaces import IApplicationCreated
 from zope.interface.interfaces import ComponentLookupError
 
 # import packages
 from pyams_scheduler.process import SchedulerProcess, SchedulerMessageHandler
-from pyams_utils.registry import set_local_registry, query_utility
+from pyams_utils.registry import set_local_registry
 from pyams_utils.zodb import get_connection_from_settings
 from pyams_zmq.process import process_exit_func
 from pyramid.events import subscriber
@@ -76,28 +75,27 @@
             try:
                 scheduler_util = sm.get(SCHEDULER_NAME)
                 try:
-                    zeo_connection_name = scheduler_util.zeo_connection
+                    zodb_name = scheduler_util.zodb_name
                 except ComponentLookupError:
                     pass
                 else:
-                    zeo_connection = query_utility(IZEOConnection, name=zeo_connection_name or '')
-                    if zeo_connection is not None:
-                        # create scheduler process
-                        process = SchedulerProcess(start_handler, SchedulerMessageHandler, registry)
-                        # load tasks
-                        for task in scheduler_util.values():
-                            trigger = task.get_trigger(registry)
-                            logger.debug("Adding scheduler job for task '{0.name}'".format(task))
-                            process.scheduler.add_job(task, trigger,
-                                                      id=str(task.internal_id),
-                                                      name=task.name,
-                                                      kwargs={'zeo_settings': zeo_connection.get_settings(),
-                                                              'registry': registry})
-                        # start process
-                        logger.debug("Starting tasks scheduler {0!r}...".format(process))
-                        process.start()
-                        if process.is_alive():
-                            atexit.register(process_exit_func, process=process)
+                    # create scheduler process
+                    process = SchedulerProcess(start_handler, SchedulerMessageHandler, registry)
+                    # load tasks
+                    for task in scheduler_util.values():
+                        trigger = task.get_trigger(registry)
+                        logger.debug("Adding scheduler job for task '{0.name}'".format(task))
+                        process.scheduler.add_job(task, trigger,
+                                                  id=str(task.internal_id),
+                                                  name=task.name,
+                                                  kwargs={'zodb_name': zodb_name,
+                                                          'registry': registry})
+                    # start process
+                    logger.debug("Starting tasks scheduler {0!r}...".format(process))
+                    process.start()
+                    if process.is_alive():
+                        atexit.register(process_exit_func, process=process)
+                        logger.debug("Started tasks scheduler {0!r} with PID {1}...".format(process, process.pid))
             finally:
                 if process and not process.is_alive():
                     process.terminate()
--- a/src/pyams_scheduler/interfaces/__init__.py	Mon Dec 11 15:26:53 2017 +0100
+++ b/src/pyams_scheduler/interfaces/__init__.py	Thu Jan 11 17:07:28 2018 +0100
@@ -100,10 +100,11 @@
 
     contains('pyams_scheduler.interfaces.ITask')
 
-    zeo_connection = Choice(title=_("ZEO connection name"),
-                            description=_("Name of ZEO connection utility defining scheduler connection"),
-                            required=True,
-                            vocabulary="PyAMS ZEO connections")
+    zodb_name = Choice(title=_("ZODB connection name"),
+                       description=_("Name of ZODB defining scheduler connection"),
+                       required=False,
+                       default='',
+                       vocabulary="PyAMS ZODB connections")
 
     report_mailer = Choice(title=_("Reports mailer"),
                            description=_("Mail delivery utility used to send mails"),
--- a/src/pyams_scheduler/process.py	Mon Dec 11 15:26:53 2017 +0100
+++ b/src/pyams_scheduler/process.py	Thu Jan 11 17:07:28 2018 +0100
@@ -23,13 +23,12 @@
 # import interfaces
 from pyams_scheduler.interfaces import SCHEDULER_NAME
 from pyams_utils.interfaces import PYAMS_APPLICATION_SETTINGS_KEY, PYAMS_APPLICATION_DEFAULT_NAME
-from pyams_utils.interfaces.zeo import IZEOConnection
 
 # import packages
 from apscheduler.jobstores.memory import MemoryJobStore
 from apscheduler.schedulers.background import BackgroundScheduler
 from pyams_scheduler.task import ImmediateTaskTrigger
-from pyams_utils.zodb import ZEOConnection
+from pyams_utils.zodb import ZODBConnection
 from pyams_zmq.handler import ZMQMessageHandler
 from pyams_zmq.process import ZMQProcess
 
@@ -42,10 +41,8 @@
         self.settings = settings
 
     def _get_connection(self):
-        zeo_settings = self.settings.get('zeo')
-        connection = ZEOConnection()
-        connection.update(zeo_settings)
-        return connection
+        zodb_name = self.settings.get('zodb_name')
+        return ZODBConnection(name=zodb_name)
 
 
 class TaskResettingThread(BaseTaskThread):
@@ -63,7 +60,7 @@
         if job_id is None:
             return
         job_id = str(job_id)
-        logger.debug("Loading ZEO connection...")
+        logger.debug("Loading ZODB connection...")
         with self._get_connection() as root:
             logger.debug("Loaded ZODB root {0!r}".format(root))
             try:
@@ -86,12 +83,11 @@
                 if task is not None:
                     trigger = task.get_trigger(self.process.registry)
                     logger.debug("Getting task trigger {0!r}".format(trigger))
-                    zeo_connection = sm.getUtility(IZEOConnection, name=scheduler_util.zeo_connection)
                     logger.debug("Adding new job to scheduler {0!r}".format(scheduler))
                     scheduler.add_job(task, trigger,
                                       id=str(task.internal_id),
                                       name=task.name,
-                                      kwargs={'zeo_settings': zeo_connection.get_settings(),
+                                      kwargs={'zodb_name': scheduler_util.zodb_name,
                                               'registry': self.process.registry})
                     logger.debug("Added job")
             except:
@@ -108,7 +104,7 @@
         if job_id is None:
             return
         job_id = str(job_id)
-        logger.debug("Loading ZEO connection...")
+        logger.debug("Loading ZODB connection...")
         with self._get_connection() as root:
             logger.debug("Loaded ZODB root {0!r}".format(root))
             try:
@@ -139,7 +135,7 @@
         job_id = settings.get('job_id')
         if job_id is None:
             return
-        logger.debug("Loading ZEO connection...")
+        logger.debug("Loading ZODB connection...")
         with self._get_connection() as root:
             logger.debug("Loaded ZODB root {0!r}".format(root))
             try:
@@ -157,13 +153,12 @@
                 if task is not None:
                     trigger = ImmediateTaskTrigger()
                     logger.debug("Getting task trigger {0!r}".format(trigger))
-                    zeo_connection = sm.getUtility(IZEOConnection, name=scheduler_util.zeo_connection)
                     logger.debug("Adding new job to scheduler {0!r}".format(scheduler))
                     scheduler.add_job(task, trigger,
                                       id='{0.internal_id}::{1}'.format(task,
                                                                        datetime.utcnow().isoformat()),
                                       name=task.name,
-                                      kwargs={'zeo_settings': zeo_connection.get_settings(),
+                                      kwargs={'zodb_name': scheduler_util.zodb_name,
                                               'registry': self.process.registry,
                                               'run_immediate': True})
                     logger.debug("Added job")
--- a/src/pyams_scheduler/scheduler.py	Mon Dec 11 15:26:53 2017 +0100
+++ b/src/pyams_scheduler/scheduler.py	Thu Jan 11 17:07:28 2018 +0100
@@ -41,7 +41,7 @@
 class Scheduler(Folder):
     """Scheduler utility"""
 
-    zeo_connection = FieldProperty(IScheduler['zeo_connection'])
+    zodb_name = FieldProperty(IScheduler['zodb_name'])
     report_mailer = FieldProperty(IScheduler['report_mailer'])
     report_source = FieldProperty(IScheduler['report_source'])
 
@@ -58,12 +58,12 @@
 
     @property
     def internal_id(self):
-        intids = query_utility(IIntIds, context=self)
+        intids = query_utility(IIntIds)
         if intids is not None:
             return intids.register(self)
 
     def get_task(self, task_id):
-        intids = query_utility(IIntIds, context=self)
+        intids = query_utility(IIntIds)
         if intids is not None:
             return intids.queryObject(task_id)
 
--- a/src/pyams_scheduler/task.py	Mon Dec 11 15:26:53 2017 +0100
+++ b/src/pyams_scheduler/task.py	Thu Jan 11 17:07:28 2018 +0100
@@ -26,7 +26,6 @@
 from pyams_scheduler.interfaces import ITaskHistory, ITask, ITaskHistoryContainer, ITaskSchedulingMode, IScheduler, \
     SCHEDULER_HANDLER_KEY, AfterRunJobEvent, SCHEDULER_NAME, BeforeRunJobEvent, ITaskInfo
 from pyams_utils.interfaces import PYAMS_APPLICATION_DEFAULT_NAME, PYAMS_APPLICATION_SETTINGS_KEY
-from pyams_utils.interfaces.zeo import IZEOConnection
 from pyramid_mailer.interfaces import IMailer
 from transaction.interfaces import ITransactionManager
 from zope.component.interfaces import ISite
@@ -44,7 +43,7 @@
 from pyams_utils.request import check_request
 from pyams_utils.timezone import tztime
 from pyams_utils.traversing import get_parent
-from pyams_utils.zodb import ZEOConnection
+from pyams_utils.zodb import ZODBConnection
 from pyams_zmq.socket import zmq_socket, zmq_response
 from pyramid.events import subscriber
 from pyramid_mailer.message import Message
@@ -195,8 +194,7 @@
         if request.registry:
             handler = request.registry.settings.get(SCHEDULER_HANDLER_KEY, False)
             if handler:
-                zeo = get_utility(IZEOConnection, scheduler_util.zeo_connection)
-                zmq_settings = {'zeo': zeo.get_settings(),
+                zmq_settings = {'zodb_name': scheduler_util.zodb_name,
                                 'task_name': self.__name__,
                                 'job_id': kwargs.get('job_id')}
                 logger.debug("Resetting task {0.name} with {1!r}".format(self, zmq_settings))
@@ -221,8 +219,7 @@
         if request.registry:
             handler = request.registry.settings.get(SCHEDULER_HANDLER_KEY, False)
             if handler:
-                zeo = get_utility(IZEOConnection, scheduler_util.zeo_connection)
-                zmq_settings = {'zeo': zeo.get_settings(),
+                zmq_settings = {'zodb_name': scheduler_util.zodb_name,
                                 'task_name': self.__name__,
                                 'job_id': kwargs.get('job_id')}
                 logger.debug("Running task {0.name} with {1!r}".format(self, zmq_settings))
@@ -245,9 +242,8 @@
 
     def _run(self, report, **kwargs):
         """Task execution wrapper"""
-        zeo_connection = ZEOConnection()
-        zeo_connection.update(kwargs.get('zeo_settings'))
-        with zeo_connection as root:
+        zodb_connection = ZODBConnection(name=kwargs.get('zodb_name', ''))
+        with zodb_connection as root:
             try:
                 registry = kwargs.get('registry')
                 request = check_request()
@@ -285,7 +281,9 @@
                             break
             except:
                 self._log_exception(None, "Can't execute scheduled job {0}".format(self.name))
-        ITransactionManager(self).abort()
+            tm = ITransactionManager(self, None)
+            if tm is not None:
+                tm.abort()
 
     def run(self, report):
         raise NotImplemented("The 'run' method must be implemented by Task subclasses!")
@@ -372,8 +370,7 @@
         if handler:
             task = event.object
             scheduler_util = query_utility(IScheduler)
-            zeo = get_utility(IZEOConnection, scheduler_util.zeo_connection)
-            zmq_settings = {'zeo': zeo.get_settings(),
+            zmq_settings = {'zodb_name': scheduler_util.zodb_name,
                             'task_name': task.__name__,
                             'job_id': task.internal_id}
             logger.debug("Removing task {0.name} with {1!r}".format(task, zmq_settings))