# HG changeset patch # User Thierry Florac # Date 1515686848 -3600 # Node ID 0b31d2492f15596207bfa88415fb19f3143a5ea6 # Parent eaf9751db04a416feee61e181d2f1cfb6e39487f Use ZODBConnection instead of ZEOConnection to correctly handle any ZODB storage diff -r eaf9751db04a -r 0b31d2492f15 src/pyams_scheduler/include.py --- a/src/pyams_scheduler/include.py Mon Dec 11 15:26:53 2017 +0100 +++ b/src/pyams_scheduler/include.py Thu Jan 11 17:07:28 2018 +0100 @@ -23,13 +23,12 @@ # import interfaces from pyams_scheduler.interfaces import SCHEDULER_HANDLER_KEY, SCHEDULER_NAME from pyams_utils.interfaces import PYAMS_APPLICATION_SETTINGS_KEY, PYAMS_APPLICATION_DEFAULT_NAME -from pyams_utils.interfaces.zeo import IZEOConnection from pyramid.interfaces import IApplicationCreated from zope.interface.interfaces import ComponentLookupError # import packages from pyams_scheduler.process import SchedulerProcess, SchedulerMessageHandler -from pyams_utils.registry import set_local_registry, query_utility +from pyams_utils.registry import set_local_registry from pyams_utils.zodb import get_connection_from_settings from pyams_zmq.process import process_exit_func from pyramid.events import subscriber @@ -76,28 +75,27 @@ try: scheduler_util = sm.get(SCHEDULER_NAME) try: - zeo_connection_name = scheduler_util.zeo_connection + zodb_name = scheduler_util.zodb_name except ComponentLookupError: pass else: - zeo_connection = query_utility(IZEOConnection, name=zeo_connection_name or '') - if zeo_connection is not None: - # create scheduler process - process = SchedulerProcess(start_handler, SchedulerMessageHandler, registry) - # load tasks - for task in scheduler_util.values(): - trigger = task.get_trigger(registry) - logger.debug("Adding scheduler job for task '{0.name}'".format(task)) - process.scheduler.add_job(task, trigger, - id=str(task.internal_id), - name=task.name, - kwargs={'zeo_settings': zeo_connection.get_settings(), - 'registry': registry}) - # start process - logger.debug("Starting tasks scheduler {0!r}...".format(process)) - process.start() - if process.is_alive(): - atexit.register(process_exit_func, process=process) + # create scheduler process + process = SchedulerProcess(start_handler, SchedulerMessageHandler, registry) + # load tasks + for task in scheduler_util.values(): + trigger = task.get_trigger(registry) + logger.debug("Adding scheduler job for task '{0.name}'".format(task)) + process.scheduler.add_job(task, trigger, + id=str(task.internal_id), + name=task.name, + kwargs={'zodb_name': zodb_name, + 'registry': registry}) + # start process + logger.debug("Starting tasks scheduler {0!r}...".format(process)) + process.start() + if process.is_alive(): + atexit.register(process_exit_func, process=process) + logger.debug("Started tasks scheduler {0!r} with PID {1}...".format(process, process.pid)) finally: if process and not process.is_alive(): process.terminate() diff -r eaf9751db04a -r 0b31d2492f15 src/pyams_scheduler/interfaces/__init__.py --- a/src/pyams_scheduler/interfaces/__init__.py Mon Dec 11 15:26:53 2017 +0100 +++ b/src/pyams_scheduler/interfaces/__init__.py Thu Jan 11 17:07:28 2018 +0100 @@ -100,10 +100,11 @@ contains('pyams_scheduler.interfaces.ITask') - zeo_connection = Choice(title=_("ZEO connection name"), - description=_("Name of ZEO connection utility defining scheduler connection"), - required=True, - vocabulary="PyAMS ZEO connections") + zodb_name = Choice(title=_("ZODB connection name"), + description=_("Name of ZODB defining scheduler connection"), + required=False, + default='', + vocabulary="PyAMS ZODB connections") report_mailer = Choice(title=_("Reports mailer"), description=_("Mail delivery utility used to send mails"), diff -r eaf9751db04a -r 0b31d2492f15 src/pyams_scheduler/process.py --- a/src/pyams_scheduler/process.py Mon Dec 11 15:26:53 2017 +0100 +++ b/src/pyams_scheduler/process.py Thu Jan 11 17:07:28 2018 +0100 @@ -23,13 +23,12 @@ # import interfaces from pyams_scheduler.interfaces import SCHEDULER_NAME from pyams_utils.interfaces import PYAMS_APPLICATION_SETTINGS_KEY, PYAMS_APPLICATION_DEFAULT_NAME -from pyams_utils.interfaces.zeo import IZEOConnection # import packages from apscheduler.jobstores.memory import MemoryJobStore from apscheduler.schedulers.background import BackgroundScheduler from pyams_scheduler.task import ImmediateTaskTrigger -from pyams_utils.zodb import ZEOConnection +from pyams_utils.zodb import ZODBConnection from pyams_zmq.handler import ZMQMessageHandler from pyams_zmq.process import ZMQProcess @@ -42,10 +41,8 @@ self.settings = settings def _get_connection(self): - zeo_settings = self.settings.get('zeo') - connection = ZEOConnection() - connection.update(zeo_settings) - return connection + zodb_name = self.settings.get('zodb_name') + return ZODBConnection(name=zodb_name) class TaskResettingThread(BaseTaskThread): @@ -63,7 +60,7 @@ if job_id is None: return job_id = str(job_id) - logger.debug("Loading ZEO connection...") + logger.debug("Loading ZODB connection...") with self._get_connection() as root: logger.debug("Loaded ZODB root {0!r}".format(root)) try: @@ -86,12 +83,11 @@ if task is not None: trigger = task.get_trigger(self.process.registry) logger.debug("Getting task trigger {0!r}".format(trigger)) - zeo_connection = sm.getUtility(IZEOConnection, name=scheduler_util.zeo_connection) logger.debug("Adding new job to scheduler {0!r}".format(scheduler)) scheduler.add_job(task, trigger, id=str(task.internal_id), name=task.name, - kwargs={'zeo_settings': zeo_connection.get_settings(), + kwargs={'zodb_name': scheduler_util.zodb_name, 'registry': self.process.registry}) logger.debug("Added job") except: @@ -108,7 +104,7 @@ if job_id is None: return job_id = str(job_id) - logger.debug("Loading ZEO connection...") + logger.debug("Loading ZODB connection...") with self._get_connection() as root: logger.debug("Loaded ZODB root {0!r}".format(root)) try: @@ -139,7 +135,7 @@ job_id = settings.get('job_id') if job_id is None: return - logger.debug("Loading ZEO connection...") + logger.debug("Loading ZODB connection...") with self._get_connection() as root: logger.debug("Loaded ZODB root {0!r}".format(root)) try: @@ -157,13 +153,12 @@ if task is not None: trigger = ImmediateTaskTrigger() logger.debug("Getting task trigger {0!r}".format(trigger)) - zeo_connection = sm.getUtility(IZEOConnection, name=scheduler_util.zeo_connection) logger.debug("Adding new job to scheduler {0!r}".format(scheduler)) scheduler.add_job(task, trigger, id='{0.internal_id}::{1}'.format(task, datetime.utcnow().isoformat()), name=task.name, - kwargs={'zeo_settings': zeo_connection.get_settings(), + kwargs={'zodb_name': scheduler_util.zodb_name, 'registry': self.process.registry, 'run_immediate': True}) logger.debug("Added job") diff -r eaf9751db04a -r 0b31d2492f15 src/pyams_scheduler/scheduler.py --- a/src/pyams_scheduler/scheduler.py Mon Dec 11 15:26:53 2017 +0100 +++ b/src/pyams_scheduler/scheduler.py Thu Jan 11 17:07:28 2018 +0100 @@ -41,7 +41,7 @@ class Scheduler(Folder): """Scheduler utility""" - zeo_connection = FieldProperty(IScheduler['zeo_connection']) + zodb_name = FieldProperty(IScheduler['zodb_name']) report_mailer = FieldProperty(IScheduler['report_mailer']) report_source = FieldProperty(IScheduler['report_source']) @@ -58,12 +58,12 @@ @property def internal_id(self): - intids = query_utility(IIntIds, context=self) + intids = query_utility(IIntIds) if intids is not None: return intids.register(self) def get_task(self, task_id): - intids = query_utility(IIntIds, context=self) + intids = query_utility(IIntIds) if intids is not None: return intids.queryObject(task_id) diff -r eaf9751db04a -r 0b31d2492f15 src/pyams_scheduler/task.py --- a/src/pyams_scheduler/task.py Mon Dec 11 15:26:53 2017 +0100 +++ b/src/pyams_scheduler/task.py Thu Jan 11 17:07:28 2018 +0100 @@ -26,7 +26,6 @@ from pyams_scheduler.interfaces import ITaskHistory, ITask, ITaskHistoryContainer, ITaskSchedulingMode, IScheduler, \ SCHEDULER_HANDLER_KEY, AfterRunJobEvent, SCHEDULER_NAME, BeforeRunJobEvent, ITaskInfo from pyams_utils.interfaces import PYAMS_APPLICATION_DEFAULT_NAME, PYAMS_APPLICATION_SETTINGS_KEY -from pyams_utils.interfaces.zeo import IZEOConnection from pyramid_mailer.interfaces import IMailer from transaction.interfaces import ITransactionManager from zope.component.interfaces import ISite @@ -44,7 +43,7 @@ from pyams_utils.request import check_request from pyams_utils.timezone import tztime from pyams_utils.traversing import get_parent -from pyams_utils.zodb import ZEOConnection +from pyams_utils.zodb import ZODBConnection from pyams_zmq.socket import zmq_socket, zmq_response from pyramid.events import subscriber from pyramid_mailer.message import Message @@ -195,8 +194,7 @@ if request.registry: handler = request.registry.settings.get(SCHEDULER_HANDLER_KEY, False) if handler: - zeo = get_utility(IZEOConnection, scheduler_util.zeo_connection) - zmq_settings = {'zeo': zeo.get_settings(), + zmq_settings = {'zodb_name': scheduler_util.zodb_name, 'task_name': self.__name__, 'job_id': kwargs.get('job_id')} logger.debug("Resetting task {0.name} with {1!r}".format(self, zmq_settings)) @@ -221,8 +219,7 @@ if request.registry: handler = request.registry.settings.get(SCHEDULER_HANDLER_KEY, False) if handler: - zeo = get_utility(IZEOConnection, scheduler_util.zeo_connection) - zmq_settings = {'zeo': zeo.get_settings(), + zmq_settings = {'zodb_name': scheduler_util.zodb_name, 'task_name': self.__name__, 'job_id': kwargs.get('job_id')} logger.debug("Running task {0.name} with {1!r}".format(self, zmq_settings)) @@ -245,9 +242,8 @@ def _run(self, report, **kwargs): """Task execution wrapper""" - zeo_connection = ZEOConnection() - zeo_connection.update(kwargs.get('zeo_settings')) - with zeo_connection as root: + zodb_connection = ZODBConnection(name=kwargs.get('zodb_name', '')) + with zodb_connection as root: try: registry = kwargs.get('registry') request = check_request() @@ -285,7 +281,9 @@ break except: self._log_exception(None, "Can't execute scheduled job {0}".format(self.name)) - ITransactionManager(self).abort() + tm = ITransactionManager(self, None) + if tm is not None: + tm.abort() def run(self, report): raise NotImplemented("The 'run' method must be implemented by Task subclasses!") @@ -372,8 +370,7 @@ if handler: task = event.object scheduler_util = query_utility(IScheduler) - zeo = get_utility(IZEOConnection, scheduler_util.zeo_connection) - zmq_settings = {'zeo': zeo.get_settings(), + zmq_settings = {'zodb_name': scheduler_util.zodb_name, 'task_name': task.__name__, 'job_id': task.internal_id} logger.debug("Removing task {0.name} with {1!r}".format(task, zmq_settings))