--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/src/pyams_scheduler/process.py Wed Mar 11 11:52:59 2015 +0100
@@ -0,0 +1,231 @@
+#
+# Copyright (c) 2008-2015 Thierry Florac <tflorac AT ulthar.net>
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+
+__docformat__ = 'restructuredtext'
+
+
+# import standard library
+import logging
+logger = logging.getLogger('PyAMS (scheduler)')
+
+from datetime import datetime
+from threading import Thread
+
+# import interfaces
+from pyams_scheduler.interfaces import SCHEDULER_NAME
+from pyams_utils.interfaces import PYAMS_APPLICATION_SETTINGS_KEY, PYAMS_APPLICATION_DEFAULT_NAME
+from pyams_utils.interfaces.zeo import IZEOConnection
+
+# import packages
+from apscheduler.jobstores.memory import MemoryJobStore
+from apscheduler.schedulers.background import BackgroundScheduler
+from pyams_scheduler.task import ImmediateTaskTrigger
+from pyams_utils.zodb import ZEOConnection
+from pyams_zmq.handler import ZMQMessageHandler
+from pyams_zmq.process import ZMQProcess
+
+
+class BaseTaskThread(Thread):
+
+ def __init__(self, process, settings):
+ Thread.__init__(self)
+ self.process = process
+ self.settings = settings
+
+ def _get_connection(self):
+ zeo_settings = self.settings.get('zeo')
+ connection = ZEOConnection()
+ connection.update(zeo_settings)
+ return connection
+
+
+class TaskResettingThread(BaseTaskThread):
+ """Task resetting thread
+
+ Task reset is run in another thread, so that:
+ - other transactions applied on updated tasks are visible
+ - ØMQ request returns immediately to calling process
+ """
+
+ def run(self):
+ logger.debug("Starting task resetting thread...")
+ settings = self.settings
+ job_id = settings.get('job_id')
+ if job_id is None:
+ return
+ job_id = str(job_id)
+ logger.debug("Loading ZEO connection...")
+ with self._get_connection() as root:
+ logger.debug("Loaded ZODB root {0!r}".format(root))
+ tm = None
+ try:
+ try:
+ application_name = self.process.registry.settings.get(PYAMS_APPLICATION_SETTINGS_KEY,
+ PYAMS_APPLICATION_DEFAULT_NAME)
+ application = root.get(application_name)
+ logger.debug("Loaded application {0!r}".format(application))
+ sm = application.getSiteManager()
+ scheduler_util = sm.get(SCHEDULER_NAME)
+ logger.debug("Loaded scheduler utility {0!r}".format(scheduler_util))
+ scheduler = self.process.scheduler
+ logger.debug("Removing job '{0}'".format(job_id))
+ job = scheduler.get_job(job_id)
+ if job is not None:
+ logger.debug("Loaded job {0!r} ({0.id!r})".format(job))
+ scheduler.remove_job(job.id)
+ logger.debug("Loading scheduler task '{0}'".format(settings.get('task_name').lower()))
+ task = scheduler_util.get(settings.get('task_name').lower())
+ logger.debug("Loaded scheduler task {0!r}".format(task))
+ if task is not None:
+ trigger = task.get_trigger(self.process.registry)
+ logger.debug("Getting task trigger {0!r}".format(trigger))
+ zeo_connection = sm.getUtility(IZEOConnection, name=scheduler_util.zeo_connection)
+ logger.debug("Adding new job to scheduler {0!r}".format(scheduler))
+ scheduler.add_job(task, trigger,
+ id=str(task.internal_id),
+ name=task.name,
+ kwargs={'zeo_settings': zeo_connection.get_settings(),
+ 'registry': self.process.registry})
+ logger.debug("Added job")
+ except:
+ logger.exception("An exception occurred:")
+ finally:
+ if tm is not None:
+ tm.abort()
+
+
+class TaskRemoverThread(BaseTaskThread):
+ """Task remover thread"""
+
+ def run(self):
+ logger.debug("Starting task remover thread...")
+ settings = self.settings
+ job_id = settings.get('job_id')
+ if job_id is None:
+ return
+ job_id = str(job_id)
+ logger.debug("Loading ZEO connection...")
+ with self._get_connection() as root:
+ logger.debug("Loaded ZODB root {0!r}".format(root))
+ tm = None
+ try:
+ try:
+ application_name = self.process.registry.settings.get(PYAMS_APPLICATION_SETTINGS_KEY,
+ PYAMS_APPLICATION_DEFAULT_NAME)
+ application = root.get(application_name)
+ logger.debug("Loaded application {0!r}".format(application))
+ sm = application.getSiteManager()
+ scheduler_util = sm.get(SCHEDULER_NAME)
+ logger.debug("Loaded scheduler utility {0!r}".format(scheduler_util))
+ scheduler = self.process.scheduler
+ logger.debug("Removing job '{0}'".format(job_id))
+ job = scheduler.get_job(job_id)
+ if job is not None:
+ logger.debug("Loaded job {0!r} ({0.id!r})".format(job))
+ scheduler.remove_job(job.id)
+ logger.debug("Removed job")
+ except:
+ logger.exception("An exception occurred:")
+ finally:
+ if tm is not None:
+ tm.abort()
+
+
+class TaskRunnerThread(BaseTaskThread):
+ """Task immediate runner thread"""
+
+ def run(self):
+ logger.debug("Starting task runner thread...")
+ settings = self.settings
+ job_id = settings.get('job_id')
+ if job_id is None:
+ return
+ logger.debug("Loading ZEO connection...")
+ with self._get_connection() as root:
+ logger.debug("Loaded ZODB root {0!r}".format(root))
+ tm = None
+ try:
+ try:
+ application_name = self.process.registry.settings.get(PYAMS_APPLICATION_SETTINGS_KEY,
+ PYAMS_APPLICATION_DEFAULT_NAME)
+ application = root.get(application_name)
+ logger.debug("Loaded application {0!r}".format(application))
+ sm = application.getSiteManager()
+ scheduler_util = sm.get(SCHEDULER_NAME)
+ logger.debug("Loaded scheduler utility {0!r}".format(scheduler_util))
+ scheduler = self.process.scheduler
+ logger.debug("Loading scheduler task '{0}'".format(settings.get('task_name').lower()))
+ task = scheduler_util.get(settings.get('task_name').lower())
+ logger.debug("Loaded scheduler task {0!r}".format(task))
+ if task is not None:
+ trigger = ImmediateTaskTrigger()
+ logger.debug("Getting task trigger {0!r}".format(trigger))
+ zeo_connection = sm.getUtility(IZEOConnection, name=scheduler_util.zeo_connection)
+ logger.debug("Adding new job to scheduler {0!r}".format(scheduler))
+ scheduler.add_job(task, trigger,
+ id='{0.internal_id}::{1}'.format(task,
+ datetime.utcnow().isoformat()),
+ name=task.name,
+ kwargs={'zeo_settings': zeo_connection.get_settings(),
+ 'registry': self.process.registry,
+ 'run_immediate': True})
+ logger.debug("Added job")
+ except:
+ logger.exception("An exception occurred:")
+ finally:
+ if tm is not None:
+ tm.abort()
+
+
+class SchedulerHandler(object):
+ """Scheduler handler"""
+
+ def get_jobs(self, settings):
+ scheduler = self.process.scheduler
+ return [{'id': job.id,
+ 'name': job.name,
+ 'trigger': '{0!s}'.format(job.trigger),
+ 'next_run': job.next_run_time.timestamp()} for job in scheduler.get_jobs()]
+
+ def reset_task(self, settings):
+ TaskResettingThread(self.process, settings).start()
+ return 'OK'
+
+ def remove_task(self, settings):
+ TaskRemoverThread(self.process, settings).start()
+ return 'OK'
+
+ def run_task(self, settings):
+ TaskRunnerThread(self.process, settings).start()
+ return 'OK'
+
+
+class SchedulerMessageHandler(ZMQMessageHandler):
+ """ØMQ scheduler messages handler"""
+
+ handler = SchedulerHandler
+
+
+class SchedulerProcess(ZMQProcess):
+ """ØMQ tasks scheduler process"""
+
+ def __init__(self, zmq_address, handler, registry):
+ ZMQProcess.__init__(self, zmq_address, handler)
+ self.registry = registry
+ self.scheduler = BackgroundScheduler()
+ self.jobstore = MemoryJobStore()
+
+ def run(self):
+ if self.scheduler is not None:
+ self.scheduler.add_jobstore(self.jobstore, 'default')
+ self.scheduler.start()
+ ZMQProcess.run(self)