src/pyams_scheduler/task.py
changeset 0 48483b0b26fa
child 7 570db7b9f0f6
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/pyams_scheduler/task.py	Wed Mar 11 11:52:59 2015 +0100
@@ -0,0 +1,365 @@
+#
+# Copyright (c) 2008-2015 Thierry Florac <tflorac AT ulthar.net>
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+
+__docformat__ = 'restructuredtext'
+
+
+# import standard library
+import logging
+logger = logging.getLogger('PyAMS (scheduler)')
+
+import traceback
+from datetime import datetime, timedelta
+from io import StringIO
+
+# import interfaces
+from pyams_scheduler.interfaces import ITaskHistory, ITask, ITaskHistoryContainer, ITaskSchedulingMode, IScheduler, \
+    SCHEDULER_HANDLER_KEY, AfterRunJobEvent, SCHEDULER_NAME, BeforeRunJobEvent, ITaskInfo
+from pyams_utils.interfaces import PYAMS_APPLICATION_DEFAULT_NAME, PYAMS_APPLICATION_SETTINGS_KEY
+from pyams_utils.interfaces.zeo import IZEOConnection
+from pyramid_mailer.interfaces import IMailer
+from transaction.interfaces import ITransactionManager
+from zope.component.interfaces import ISite
+from zope.intid.interfaces import IIntIds
+
+# import packages
+import transaction
+import zmq
+from apscheduler.triggers.base import BaseTrigger
+from persistent import Persistent
+from pyams_utils.date import get_duration
+from pyams_utils.registry import query_utility, get_utility
+from pyams_utils.request import check_request
+from pyams_utils.timezone import tztime
+from pyams_utils.traversing import get_parent
+from pyams_utils.zodb import ZEOConnection
+from pyramid.events import subscriber
+from pyramid_mailer.message import Message
+from zope.container.contained import Contained
+from zope.container.folder import Folder
+from zope.interface import implementer, alsoProvides, noLongerProvides
+from zope.lifecycleevent import ObjectRemovedEvent, ObjectModifiedEvent
+from zope.location import locate
+from zope.schema.fieldproperty import FieldProperty
+
+
+class ImmediateTaskTrigger(BaseTrigger):
+    """Immediate-style task scheduler"""
+
+    def get_next_fire_time(self, previous_fire_time, now):
+        if previous_fire_time:
+            return None
+        else:
+            return now + timedelta(seconds=5)
+
+
+@implementer(ITaskHistory)
+class TaskHistoryItem(Persistent, Contained):
+    """Task history item"""
+
+    date = FieldProperty(ITaskHistory['date'])
+    status = FieldProperty(ITaskHistory['status'])
+    report = FieldProperty(ITaskHistory['report'])
+
+    def __init__(self, **kwargs):
+        for key, value in kwargs.items():
+            setattr(self, key, value)
+
+
+@implementer(ITaskHistoryContainer)
+class TaskHistoryContainer(Folder):
+    """Task history container"""
+
+    def check_history(self, duration, length):
+        now = tztime(datetime.utcnow())
+        if duration:
+            for key in [k for k in self.keys()]:
+                if (now - self[key].date).days > duration:
+                    del self[key]
+        if length and (len(self) > length):
+            keys = sorted(self.keys(), reverse=True)[:length]
+            for key in [k for k in self.keys()]:
+                if key not in keys:
+                    del self[key]
+
+
+@implementer(ITask)
+class Task(Persistent, Contained):
+    """Task definition persistent class"""
+
+    name = FieldProperty(ITask['name'])
+    _schedule_mode = FieldProperty(ITask['schedule_mode'])
+    report_target = FieldProperty(ITask['report_target'])
+    errors_target = FieldProperty(ITask['errors_target'])
+    report_errors_only = FieldProperty(ITask['report_errors_only'])
+    send_empty_reports = FieldProperty(ITask['send_empty_reports'])
+    keep_empty_reports = FieldProperty(ITask['keep_empty_reports'])
+    _history_duration = FieldProperty(ITask['history_duration'])
+    _history_length = FieldProperty(ITask['history_length'])
+
+    def __init__(self):
+        history = self.history = TaskHistoryContainer()
+        locate(history, self, '++history++')
+
+    @property
+    def schedule_mode(self):
+        return self._schedule_mode
+
+    @schedule_mode.setter
+    def schedule_mode(self, value):
+        if self._schedule_mode is not None:
+            mode = query_utility(ITaskSchedulingMode, name=self._schedule_mode)
+            if (mode is not None) and mode.marker_interface.providedBy(self):
+                noLongerProvides(self, mode.marker_interface)
+        self._schedule_mode = value
+        if value:
+            mode = get_utility(ITaskSchedulingMode, name=value)
+            alsoProvides(self, mode.marker_interface)
+            mode.schema(self).active = False
+            self.reset()
+
+    @property
+    def history_duration(self):
+        return self._history_duration
+
+    @history_duration.setter
+    def history_duration(self, value):
+        self._history_duration = value
+
+    @property
+    def history_length(self):
+        return self._history_length
+
+    @history_length.setter
+    def history_length(self, value):
+        self._history_length = value
+
+    def check_history(self):
+        self.history.check_history(self.history_duration, self.history_length)
+
+    @property
+    def internal_id(self):
+        site = get_parent(self, ISite)
+        sm = site.getSiteManager()
+        intids = sm.queryUtility(IIntIds)
+        if intids is not None:
+            return intids.register(self)
+
+    def get_trigger(self, registry):
+        mode = registry.queryUtility(ITaskSchedulingMode, self.schedule_mode)
+        if mode is None:
+            return None
+        return mode.get_trigger(self)
+
+    def get_scheduling_info(self, registry):
+        mode = registry.queryUtility(ITaskSchedulingMode, self.schedule_mode)
+        if mode is None:
+            return None
+        return mode.schema(self, None)
+
+    def reset(self):
+        scheduler_util = query_utility(IScheduler)
+        if scheduler_util is not None:
+            request = check_request()
+            transaction.get().addAfterCommitHook(self._reset_action, kws={'scheduler': scheduler_util,
+                                                                          'registry': request.registry})
+
+    def _reset_action(self, status, *args, **kwargs):
+        if not status:
+            return
+        scheduler_util = kwargs.get('scheduler')
+        if scheduler_util is None:
+            return
+        request = check_request()
+        if request.registry:
+            handler = request.registry.settings.get(SCHEDULER_HANDLER_KEY, False)
+            if handler:
+                zeo = get_utility(IZEOConnection, scheduler_util.zeo_connection)
+                context = zmq.Context()
+                socket = context.socket(zmq.REQ)
+                socket.connect('tcp://{0}'.format(handler))
+                zmq_settings = {'zeo': zeo.get_settings(),
+                                'task_name': self.__name__,
+                                'job_id': self.internal_id}
+                logger.debug("Resetting task {0.name} with {1!r}".format(self, zmq_settings))
+                socket.send_json(['reset_task', zmq_settings])
+                socket.recv_json()
+
+    def launch(self):
+        scheduler_util = query_utility(IScheduler)
+        if scheduler_util is not None:
+            transaction.get().addAfterCommitHook(self._launch_action, kws={'scheduler': scheduler_util})
+
+    def _launch_action(self, status, *args, **kwargs):
+        if not status:
+            return
+        scheduler_util = kwargs.get('scheduler')
+        if scheduler_util is None:
+            return
+        request = check_request()
+        if request.registry:
+            handler = request.registry.settings.get(SCHEDULER_HANDLER_KEY, False)
+            if handler:
+                zeo = get_utility(IZEOConnection, scheduler_util.zeo_connection)
+                context = zmq.Context()
+                socket = context.socket(zmq.REQ)
+                socket.connect('tcp://{0}'.format(handler))
+                zmq_settings = {'zeo': zeo.get_settings(),
+                                'task_name': self.__name__,
+                                'job_id': self.internal_id}
+                logger.debug("Running task {0.name} with {1!r}".format(self, zmq_settings))
+                socket.send_json(['run_task', zmq_settings])
+                socket.recv_json()
+
+    def __call__(self, *args, **kwargs):
+        report = StringIO()
+        self._run(report, **kwargs)
+
+    def is_runnable(self, registry):
+        mode = registry.queryUtility(ITaskSchedulingMode, self.schedule_mode)
+        if mode is None:
+            return False
+        info = mode.schema(self, None)
+        if info is None:
+            return False
+        return info.active
+
+    def _run(self, report, **kwargs):
+        """Task execution wrapper"""
+        zeo_connection = ZEOConnection()
+        zeo_connection.update(kwargs.get('zeo_settings'))
+        with zeo_connection as root:
+            try:
+                registry = kwargs.get('registry')
+                request = check_request()
+                request.registry = registry
+                application_name = registry.settings.get(PYAMS_APPLICATION_SETTINGS_KEY,
+                                                         PYAMS_APPLICATION_DEFAULT_NAME)
+                sm = root.get(application_name).getSiteManager()
+                scheduler_util = sm.get(SCHEDULER_NAME)
+                task = scheduler_util.get(self.__name__)
+                if task is not None:
+                    if not (kwargs.get('run_immediate') or task.is_runnable(registry)):
+                        logger.debug("Skipping inactive task {0}".format(task.name))
+                        return
+                    tm = ITransactionManager(task)
+                    for attempt in tm.attempts():
+                        with attempt as t:
+                            start = datetime.utcnow()
+                            try:
+                                registry.notify(BeforeRunJobEvent(task))
+                                task.run(report)
+                                if report.getvalue():
+                                    status = 'OK'
+                                else:
+                                    status = 'Empty'
+                                report.write('\n\nTask duration: {0}'.format(get_duration(start, request=request)))
+                            except:
+                                status = 'Error'
+                                task._log_exception(report,
+                                                    "An error occurred during execution of task '{0}'".format(task.name))
+                            registry.notify(AfterRunJobEvent(task, status))
+                            task.store_report(report, status)
+                            task.send_report(report, status, registry)
+                        if t.status == 'Committed':
+                            break
+            except:
+                self._log_exception(None, "Can't execute scheduled job {0}".format(self.name))
+        ITransactionManager(self).abort()
+
+    def run(self, report):
+        raise NotImplemented("The 'run' method must be implemented by Task subclasses!")
+
+    @staticmethod
+    def _log_report(report, message, add_timestamp=True, level=logging.INFO):
+        if isinstance(message, bytes):
+            message = message.decode()
+        if add_timestamp:
+            message = '{0} - {1}'.format(tztime(datetime.utcnow()).strftime('%c'), message)
+        if report is not None:
+            report.write(message + '\n')
+        logger.log(level, message)
+
+    @staticmethod
+    def _log_exception(report, message=None):
+        if isinstance(message, bytes):
+            message = message.decode()
+        message = '{0} - {1}'.format(tztime(datetime.utcnow()).strftime('%c'), message or 'An error occurred') + '\n\n'
+        if report is not None:
+            report.write(message)
+            report.write(traceback.format_exc() + '\n')
+        logger.exception(message)
+
+    def store_report(self, report, status):
+        if (status == 'Empty') and not self.keep_empty_reports:
+            return
+        item = TaskHistoryItem(date=tztime(datetime.utcnow()),
+                               status=status,
+                               report=report.getvalue())
+        self.history[item.date.isoformat()] = item
+        self.check_history()
+
+    def send_report(self, report, status, registry):
+        if not self.__parent__.report_mailer:
+            return
+        if ((status == 'Empty') and not self.send_empty_reports) or \
+           ((status == 'OK') and self.report_errors_only):
+            return
+        message_target = self.report_target
+        if status in ('Error', 'Warning'):
+            message_target = self.errors_target or message_target
+        if not message_target:
+            return
+        mailer = registry.queryUtility(IMailer, self.__parent__.report_mailer)
+        if mailer is not None:
+            report_source = self.__parent__.report_source
+            if status == 'Error':
+                subject = "[SCHEDULER ERROR] {0}".format(self.name)
+            else:
+                subject = "[scheduler] {0}".format(self.name)
+            for target in message_target.split(';'):
+                message = Message(subject=subject,
+                                  sender=report_source,
+                                  recipients=(target,),
+                                  body=report.getvalue())
+                mailer.send(message)
+
+
+@subscriber(ObjectModifiedEvent, context_selector=ITask)
+def handle_modified_task(event):
+    """Handle modified task"""
+    for changes in event.descriptions:
+        if (changes.interface == ITaskInfo) and \
+           (('history_duration' in changes.attributes) or ('history_length' in changes.attributes)):
+            event.object.check_history()
+            break
+
+
+@subscriber(ObjectRemovedEvent, context_selector=ITask)
+def handle_removed_task(event):
+    """Handle removed task"""
+    request = check_request()
+    if request.registry:
+        handler = request.registry.settings.get(SCHEDULER_HANDLER_KEY, False)
+        if handler:
+            task = event.object
+            scheduler_util = query_utility(IScheduler)
+            zeo = get_utility(IZEOConnection, scheduler_util.zeo_connection)
+            context = zmq.Context()
+            socket = context.socket(zmq.REQ)
+            socket.connect('tcp://{0}'.format(handler))
+            zmq_settings = {'zeo': zeo.get_settings(),
+                            'task_name': task.__name__,
+                            'job_id': task.internal_id}
+            logger.debug("Removing task {0.name} with {1!r}".format(task, zmq_settings))
+            socket.send_json(['remove_task', zmq_settings])
+            socket.recv_json()