--- a/src/pyams_scheduler/include.py Mon Dec 11 15:26:53 2017 +0100
+++ b/src/pyams_scheduler/include.py Thu Jan 11 17:07:28 2018 +0100
@@ -23,13 +23,12 @@
# import interfaces
from pyams_scheduler.interfaces import SCHEDULER_HANDLER_KEY, SCHEDULER_NAME
from pyams_utils.interfaces import PYAMS_APPLICATION_SETTINGS_KEY, PYAMS_APPLICATION_DEFAULT_NAME
-from pyams_utils.interfaces.zeo import IZEOConnection
from pyramid.interfaces import IApplicationCreated
from zope.interface.interfaces import ComponentLookupError
# import packages
from pyams_scheduler.process import SchedulerProcess, SchedulerMessageHandler
-from pyams_utils.registry import set_local_registry, query_utility
+from pyams_utils.registry import set_local_registry
from pyams_utils.zodb import get_connection_from_settings
from pyams_zmq.process import process_exit_func
from pyramid.events import subscriber
@@ -76,28 +75,27 @@
try:
scheduler_util = sm.get(SCHEDULER_NAME)
try:
- zeo_connection_name = scheduler_util.zeo_connection
+ zodb_name = scheduler_util.zodb_name
except ComponentLookupError:
pass
else:
- zeo_connection = query_utility(IZEOConnection, name=zeo_connection_name or '')
- if zeo_connection is not None:
- # create scheduler process
- process = SchedulerProcess(start_handler, SchedulerMessageHandler, registry)
- # load tasks
- for task in scheduler_util.values():
- trigger = task.get_trigger(registry)
- logger.debug("Adding scheduler job for task '{0.name}'".format(task))
- process.scheduler.add_job(task, trigger,
- id=str(task.internal_id),
- name=task.name,
- kwargs={'zeo_settings': zeo_connection.get_settings(),
- 'registry': registry})
- # start process
- logger.debug("Starting tasks scheduler {0!r}...".format(process))
- process.start()
- if process.is_alive():
- atexit.register(process_exit_func, process=process)
+ # create scheduler process
+ process = SchedulerProcess(start_handler, SchedulerMessageHandler, registry)
+ # load tasks
+ for task in scheduler_util.values():
+ trigger = task.get_trigger(registry)
+ logger.debug("Adding scheduler job for task '{0.name}'".format(task))
+ process.scheduler.add_job(task, trigger,
+ id=str(task.internal_id),
+ name=task.name,
+ kwargs={'zodb_name': zodb_name,
+ 'registry': registry})
+ # start process
+ logger.debug("Starting tasks scheduler {0!r}...".format(process))
+ process.start()
+ if process.is_alive():
+ atexit.register(process_exit_func, process=process)
+ logger.debug("Started tasks scheduler {0!r} with PID {1}...".format(process, process.pid))
finally:
if process and not process.is_alive():
process.terminate()
--- a/src/pyams_scheduler/interfaces/__init__.py Mon Dec 11 15:26:53 2017 +0100
+++ b/src/pyams_scheduler/interfaces/__init__.py Thu Jan 11 17:07:28 2018 +0100
@@ -100,10 +100,11 @@
contains('pyams_scheduler.interfaces.ITask')
- zeo_connection = Choice(title=_("ZEO connection name"),
- description=_("Name of ZEO connection utility defining scheduler connection"),
- required=True,
- vocabulary="PyAMS ZEO connections")
+ zodb_name = Choice(title=_("ZODB connection name"),
+ description=_("Name of ZODB defining scheduler connection"),
+ required=False,
+ default='',
+ vocabulary="PyAMS ZODB connections")
report_mailer = Choice(title=_("Reports mailer"),
description=_("Mail delivery utility used to send mails"),
--- a/src/pyams_scheduler/process.py Mon Dec 11 15:26:53 2017 +0100
+++ b/src/pyams_scheduler/process.py Thu Jan 11 17:07:28 2018 +0100
@@ -23,13 +23,12 @@
# import interfaces
from pyams_scheduler.interfaces import SCHEDULER_NAME
from pyams_utils.interfaces import PYAMS_APPLICATION_SETTINGS_KEY, PYAMS_APPLICATION_DEFAULT_NAME
-from pyams_utils.interfaces.zeo import IZEOConnection
# import packages
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.schedulers.background import BackgroundScheduler
from pyams_scheduler.task import ImmediateTaskTrigger
-from pyams_utils.zodb import ZEOConnection
+from pyams_utils.zodb import ZODBConnection
from pyams_zmq.handler import ZMQMessageHandler
from pyams_zmq.process import ZMQProcess
@@ -42,10 +41,8 @@
self.settings = settings
def _get_connection(self):
- zeo_settings = self.settings.get('zeo')
- connection = ZEOConnection()
- connection.update(zeo_settings)
- return connection
+ zodb_name = self.settings.get('zodb_name')
+ return ZODBConnection(name=zodb_name)
class TaskResettingThread(BaseTaskThread):
@@ -63,7 +60,7 @@
if job_id is None:
return
job_id = str(job_id)
- logger.debug("Loading ZEO connection...")
+ logger.debug("Loading ZODB connection...")
with self._get_connection() as root:
logger.debug("Loaded ZODB root {0!r}".format(root))
try:
@@ -86,12 +83,11 @@
if task is not None:
trigger = task.get_trigger(self.process.registry)
logger.debug("Getting task trigger {0!r}".format(trigger))
- zeo_connection = sm.getUtility(IZEOConnection, name=scheduler_util.zeo_connection)
logger.debug("Adding new job to scheduler {0!r}".format(scheduler))
scheduler.add_job(task, trigger,
id=str(task.internal_id),
name=task.name,
- kwargs={'zeo_settings': zeo_connection.get_settings(),
+ kwargs={'zodb_name': scheduler_util.zodb_name,
'registry': self.process.registry})
logger.debug("Added job")
except:
@@ -108,7 +104,7 @@
if job_id is None:
return
job_id = str(job_id)
- logger.debug("Loading ZEO connection...")
+ logger.debug("Loading ZODB connection...")
with self._get_connection() as root:
logger.debug("Loaded ZODB root {0!r}".format(root))
try:
@@ -139,7 +135,7 @@
job_id = settings.get('job_id')
if job_id is None:
return
- logger.debug("Loading ZEO connection...")
+ logger.debug("Loading ZODB connection...")
with self._get_connection() as root:
logger.debug("Loaded ZODB root {0!r}".format(root))
try:
@@ -157,13 +153,12 @@
if task is not None:
trigger = ImmediateTaskTrigger()
logger.debug("Getting task trigger {0!r}".format(trigger))
- zeo_connection = sm.getUtility(IZEOConnection, name=scheduler_util.zeo_connection)
logger.debug("Adding new job to scheduler {0!r}".format(scheduler))
scheduler.add_job(task, trigger,
id='{0.internal_id}::{1}'.format(task,
datetime.utcnow().isoformat()),
name=task.name,
- kwargs={'zeo_settings': zeo_connection.get_settings(),
+ kwargs={'zodb_name': scheduler_util.zodb_name,
'registry': self.process.registry,
'run_immediate': True})
logger.debug("Added job")
--- a/src/pyams_scheduler/scheduler.py Mon Dec 11 15:26:53 2017 +0100
+++ b/src/pyams_scheduler/scheduler.py Thu Jan 11 17:07:28 2018 +0100
@@ -41,7 +41,7 @@
class Scheduler(Folder):
"""Scheduler utility"""
- zeo_connection = FieldProperty(IScheduler['zeo_connection'])
+ zodb_name = FieldProperty(IScheduler['zodb_name'])
report_mailer = FieldProperty(IScheduler['report_mailer'])
report_source = FieldProperty(IScheduler['report_source'])
@@ -58,12 +58,12 @@
@property
def internal_id(self):
- intids = query_utility(IIntIds, context=self)
+ intids = query_utility(IIntIds)
if intids is not None:
return intids.register(self)
def get_task(self, task_id):
- intids = query_utility(IIntIds, context=self)
+ intids = query_utility(IIntIds)
if intids is not None:
return intids.queryObject(task_id)
--- a/src/pyams_scheduler/task.py Mon Dec 11 15:26:53 2017 +0100
+++ b/src/pyams_scheduler/task.py Thu Jan 11 17:07:28 2018 +0100
@@ -26,7 +26,6 @@
from pyams_scheduler.interfaces import ITaskHistory, ITask, ITaskHistoryContainer, ITaskSchedulingMode, IScheduler, \
SCHEDULER_HANDLER_KEY, AfterRunJobEvent, SCHEDULER_NAME, BeforeRunJobEvent, ITaskInfo
from pyams_utils.interfaces import PYAMS_APPLICATION_DEFAULT_NAME, PYAMS_APPLICATION_SETTINGS_KEY
-from pyams_utils.interfaces.zeo import IZEOConnection
from pyramid_mailer.interfaces import IMailer
from transaction.interfaces import ITransactionManager
from zope.component.interfaces import ISite
@@ -44,7 +43,7 @@
from pyams_utils.request import check_request
from pyams_utils.timezone import tztime
from pyams_utils.traversing import get_parent
-from pyams_utils.zodb import ZEOConnection
+from pyams_utils.zodb import ZODBConnection
from pyams_zmq.socket import zmq_socket, zmq_response
from pyramid.events import subscriber
from pyramid_mailer.message import Message
@@ -195,8 +194,7 @@
if request.registry:
handler = request.registry.settings.get(SCHEDULER_HANDLER_KEY, False)
if handler:
- zeo = get_utility(IZEOConnection, scheduler_util.zeo_connection)
- zmq_settings = {'zeo': zeo.get_settings(),
+ zmq_settings = {'zodb_name': scheduler_util.zodb_name,
'task_name': self.__name__,
'job_id': kwargs.get('job_id')}
logger.debug("Resetting task {0.name} with {1!r}".format(self, zmq_settings))
@@ -221,8 +219,7 @@
if request.registry:
handler = request.registry.settings.get(SCHEDULER_HANDLER_KEY, False)
if handler:
- zeo = get_utility(IZEOConnection, scheduler_util.zeo_connection)
- zmq_settings = {'zeo': zeo.get_settings(),
+ zmq_settings = {'zodb_name': scheduler_util.zodb_name,
'task_name': self.__name__,
'job_id': kwargs.get('job_id')}
logger.debug("Running task {0.name} with {1!r}".format(self, zmq_settings))
@@ -245,9 +242,8 @@
def _run(self, report, **kwargs):
"""Task execution wrapper"""
- zeo_connection = ZEOConnection()
- zeo_connection.update(kwargs.get('zeo_settings'))
- with zeo_connection as root:
+ zodb_connection = ZODBConnection(name=kwargs.get('zodb_name', ''))
+ with zodb_connection as root:
try:
registry = kwargs.get('registry')
request = check_request()
@@ -285,7 +281,9 @@
break
except:
self._log_exception(None, "Can't execute scheduled job {0}".format(self.name))
- ITransactionManager(self).abort()
+ tm = ITransactionManager(self, None)
+ if tm is not None:
+ tm.abort()
def run(self, report):
raise NotImplemented("The 'run' method must be implemented by Task subclasses!")
@@ -372,8 +370,7 @@
if handler:
task = event.object
scheduler_util = query_utility(IScheduler)
- zeo = get_utility(IZEOConnection, scheduler_util.zeo_connection)
- zmq_settings = {'zeo': zeo.get_settings(),
+ zmq_settings = {'zodb_name': scheduler_util.zodb_name,
'task_name': task.__name__,
'job_id': task.internal_id}
logger.debug("Removing task {0.name} with {1!r}".format(task, zmq_settings))