src/pyams_scheduler/process.py
changeset 0 48483b0b26fa
child 5 0fe262326e0e
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/pyams_scheduler/process.py	Wed Mar 11 11:52:59 2015 +0100
@@ -0,0 +1,231 @@
+#
+# Copyright (c) 2008-2015 Thierry Florac <tflorac AT ulthar.net>
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+
+__docformat__ = 'restructuredtext'
+
+
+# import standard library
+import logging
+logger = logging.getLogger('PyAMS (scheduler)')
+
+from datetime import datetime
+from threading import Thread
+
+# import interfaces
+from pyams_scheduler.interfaces import SCHEDULER_NAME
+from pyams_utils.interfaces import PYAMS_APPLICATION_SETTINGS_KEY, PYAMS_APPLICATION_DEFAULT_NAME
+from pyams_utils.interfaces.zeo import IZEOConnection
+
+# import packages
+from apscheduler.jobstores.memory import MemoryJobStore
+from apscheduler.schedulers.background import BackgroundScheduler
+from pyams_scheduler.task import ImmediateTaskTrigger
+from pyams_utils.zodb import ZEOConnection
+from pyams_zmq.handler import ZMQMessageHandler
+from pyams_zmq.process import ZMQProcess
+
+
+class BaseTaskThread(Thread):
+
+    def __init__(self, process, settings):
+        Thread.__init__(self)
+        self.process = process
+        self.settings = settings
+
+    def _get_connection(self):
+        zeo_settings = self.settings.get('zeo')
+        connection = ZEOConnection()
+        connection.update(zeo_settings)
+        return connection
+
+
+class TaskResettingThread(BaseTaskThread):
+    """Task resetting thread
+
+    Task reset is run in another thread, so that:
+    - other transactions applied on updated tasks are visible
+    - ØMQ request returns immediately to calling process
+    """
+
+    def run(self):
+        logger.debug("Starting task resetting thread...")
+        settings = self.settings
+        job_id = settings.get('job_id')
+        if job_id is None:
+            return
+        job_id = str(job_id)
+        logger.debug("Loading ZEO connection...")
+        with self._get_connection() as root:
+            logger.debug("Loaded ZODB root {0!r}".format(root))
+            tm = None
+            try:
+                try:
+                    application_name = self.process.registry.settings.get(PYAMS_APPLICATION_SETTINGS_KEY,
+                                                                          PYAMS_APPLICATION_DEFAULT_NAME)
+                    application = root.get(application_name)
+                    logger.debug("Loaded application {0!r}".format(application))
+                    sm = application.getSiteManager()
+                    scheduler_util = sm.get(SCHEDULER_NAME)
+                    logger.debug("Loaded scheduler utility {0!r}".format(scheduler_util))
+                    scheduler = self.process.scheduler
+                    logger.debug("Removing job '{0}'".format(job_id))
+                    job = scheduler.get_job(job_id)
+                    if job is not None:
+                        logger.debug("Loaded job {0!r} ({0.id!r})".format(job))
+                        scheduler.remove_job(job.id)
+                    logger.debug("Loading scheduler task '{0}'".format(settings.get('task_name').lower()))
+                    task = scheduler_util.get(settings.get('task_name').lower())
+                    logger.debug("Loaded scheduler task {0!r}".format(task))
+                    if task is not None:
+                        trigger = task.get_trigger(self.process.registry)
+                        logger.debug("Getting task trigger {0!r}".format(trigger))
+                        zeo_connection = sm.getUtility(IZEOConnection, name=scheduler_util.zeo_connection)
+                        logger.debug("Adding new job to scheduler {0!r}".format(scheduler))
+                        scheduler.add_job(task, trigger,
+                                          id=str(task.internal_id),
+                                          name=task.name,
+                                          kwargs={'zeo_settings': zeo_connection.get_settings(),
+                                                  'registry': self.process.registry})
+                        logger.debug("Added job")
+                except:
+                    logger.exception("An exception occurred:")
+            finally:
+                if tm is not None:
+                    tm.abort()
+
+
+class TaskRemoverThread(BaseTaskThread):
+    """Task remover thread"""
+
+    def run(self):
+        logger.debug("Starting task remover thread...")
+        settings = self.settings
+        job_id = settings.get('job_id')
+        if job_id is None:
+            return
+        job_id = str(job_id)
+        logger.debug("Loading ZEO connection...")
+        with self._get_connection() as root:
+            logger.debug("Loaded ZODB root {0!r}".format(root))
+            tm = None
+            try:
+                try:
+                    application_name = self.process.registry.settings.get(PYAMS_APPLICATION_SETTINGS_KEY,
+                                                                          PYAMS_APPLICATION_DEFAULT_NAME)
+                    application = root.get(application_name)
+                    logger.debug("Loaded application {0!r}".format(application))
+                    sm = application.getSiteManager()
+                    scheduler_util = sm.get(SCHEDULER_NAME)
+                    logger.debug("Loaded scheduler utility {0!r}".format(scheduler_util))
+                    scheduler = self.process.scheduler
+                    logger.debug("Removing job '{0}'".format(job_id))
+                    job = scheduler.get_job(job_id)
+                    if job is not None:
+                        logger.debug("Loaded job {0!r} ({0.id!r})".format(job))
+                        scheduler.remove_job(job.id)
+                    logger.debug("Removed job")
+                except:
+                    logger.exception("An exception occurred:")
+            finally:
+                if tm is not None:
+                    tm.abort()
+
+
+class TaskRunnerThread(BaseTaskThread):
+    """Task immediate runner thread"""
+
+    def run(self):
+        logger.debug("Starting task runner thread...")
+        settings = self.settings
+        job_id = settings.get('job_id')
+        if job_id is None:
+            return
+        logger.debug("Loading ZEO connection...")
+        with self._get_connection() as root:
+            logger.debug("Loaded ZODB root {0!r}".format(root))
+            tm = None
+            try:
+                try:
+                    application_name = self.process.registry.settings.get(PYAMS_APPLICATION_SETTINGS_KEY,
+                                                                          PYAMS_APPLICATION_DEFAULT_NAME)
+                    application = root.get(application_name)
+                    logger.debug("Loaded application {0!r}".format(application))
+                    sm = application.getSiteManager()
+                    scheduler_util = sm.get(SCHEDULER_NAME)
+                    logger.debug("Loaded scheduler utility {0!r}".format(scheduler_util))
+                    scheduler = self.process.scheduler
+                    logger.debug("Loading scheduler task '{0}'".format(settings.get('task_name').lower()))
+                    task = scheduler_util.get(settings.get('task_name').lower())
+                    logger.debug("Loaded scheduler task {0!r}".format(task))
+                    if task is not None:
+                        trigger = ImmediateTaskTrigger()
+                        logger.debug("Getting task trigger {0!r}".format(trigger))
+                        zeo_connection = sm.getUtility(IZEOConnection, name=scheduler_util.zeo_connection)
+                        logger.debug("Adding new job to scheduler {0!r}".format(scheduler))
+                        scheduler.add_job(task, trigger,
+                                          id='{0.internal_id}::{1}'.format(task,
+                                                                           datetime.utcnow().isoformat()),
+                                          name=task.name,
+                                          kwargs={'zeo_settings': zeo_connection.get_settings(),
+                                                  'registry': self.process.registry,
+                                                  'run_immediate': True})
+                        logger.debug("Added job")
+                except:
+                    logger.exception("An exception occurred:")
+            finally:
+                if tm is not None:
+                    tm.abort()
+
+
+class SchedulerHandler(object):
+    """Scheduler handler"""
+
+    def get_jobs(self, settings):
+        scheduler = self.process.scheduler
+        return [{'id': job.id,
+                 'name': job.name,
+                 'trigger': '{0!s}'.format(job.trigger),
+                 'next_run': job.next_run_time.timestamp()} for job in scheduler.get_jobs()]
+
+    def reset_task(self, settings):
+        TaskResettingThread(self.process, settings).start()
+        return 'OK'
+
+    def remove_task(self, settings):
+        TaskRemoverThread(self.process, settings).start()
+        return 'OK'
+
+    def run_task(self, settings):
+        TaskRunnerThread(self.process, settings).start()
+        return 'OK'
+
+
+class SchedulerMessageHandler(ZMQMessageHandler):
+    """ØMQ scheduler messages handler"""
+
+    handler = SchedulerHandler
+
+
+class SchedulerProcess(ZMQProcess):
+    """ØMQ tasks scheduler process"""
+
+    def __init__(self, zmq_address, handler, registry):
+        ZMQProcess.__init__(self, zmq_address, handler)
+        self.registry = registry
+        self.scheduler = BackgroundScheduler()
+        self.jobstore = MemoryJobStore()
+
+    def run(self):
+        if self.scheduler is not None:
+            self.scheduler.add_jobstore(self.jobstore, 'default')
+            self.scheduler.start()
+        ZMQProcess.run(self)