--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/src/pyams_scheduler/task.py Wed Mar 11 11:52:59 2015 +0100
@@ -0,0 +1,365 @@
+#
+# Copyright (c) 2008-2015 Thierry Florac <tflorac AT ulthar.net>
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+
+__docformat__ = 'restructuredtext'
+
+
+# import standard library
+import logging
+logger = logging.getLogger('PyAMS (scheduler)')
+
+import traceback
+from datetime import datetime, timedelta
+from io import StringIO
+
+# import interfaces
+from pyams_scheduler.interfaces import ITaskHistory, ITask, ITaskHistoryContainer, ITaskSchedulingMode, IScheduler, \
+ SCHEDULER_HANDLER_KEY, AfterRunJobEvent, SCHEDULER_NAME, BeforeRunJobEvent, ITaskInfo
+from pyams_utils.interfaces import PYAMS_APPLICATION_DEFAULT_NAME, PYAMS_APPLICATION_SETTINGS_KEY
+from pyams_utils.interfaces.zeo import IZEOConnection
+from pyramid_mailer.interfaces import IMailer
+from transaction.interfaces import ITransactionManager
+from zope.component.interfaces import ISite
+from zope.intid.interfaces import IIntIds
+
+# import packages
+import transaction
+import zmq
+from apscheduler.triggers.base import BaseTrigger
+from persistent import Persistent
+from pyams_utils.date import get_duration
+from pyams_utils.registry import query_utility, get_utility
+from pyams_utils.request import check_request
+from pyams_utils.timezone import tztime
+from pyams_utils.traversing import get_parent
+from pyams_utils.zodb import ZEOConnection
+from pyramid.events import subscriber
+from pyramid_mailer.message import Message
+from zope.container.contained import Contained
+from zope.container.folder import Folder
+from zope.interface import implementer, alsoProvides, noLongerProvides
+from zope.lifecycleevent import ObjectRemovedEvent, ObjectModifiedEvent
+from zope.location import locate
+from zope.schema.fieldproperty import FieldProperty
+
+
+class ImmediateTaskTrigger(BaseTrigger):
+ """Immediate-style task scheduler"""
+
+ def get_next_fire_time(self, previous_fire_time, now):
+ if previous_fire_time:
+ return None
+ else:
+ return now + timedelta(seconds=5)
+
+
+@implementer(ITaskHistory)
+class TaskHistoryItem(Persistent, Contained):
+ """Task history item"""
+
+ date = FieldProperty(ITaskHistory['date'])
+ status = FieldProperty(ITaskHistory['status'])
+ report = FieldProperty(ITaskHistory['report'])
+
+ def __init__(self, **kwargs):
+ for key, value in kwargs.items():
+ setattr(self, key, value)
+
+
+@implementer(ITaskHistoryContainer)
+class TaskHistoryContainer(Folder):
+ """Task history container"""
+
+ def check_history(self, duration, length):
+ now = tztime(datetime.utcnow())
+ if duration:
+ for key in [k for k in self.keys()]:
+ if (now - self[key].date).days > duration:
+ del self[key]
+ if length and (len(self) > length):
+ keys = sorted(self.keys(), reverse=True)[:length]
+ for key in [k for k in self.keys()]:
+ if key not in keys:
+ del self[key]
+
+
+@implementer(ITask)
+class Task(Persistent, Contained):
+ """Task definition persistent class"""
+
+ name = FieldProperty(ITask['name'])
+ _schedule_mode = FieldProperty(ITask['schedule_mode'])
+ report_target = FieldProperty(ITask['report_target'])
+ errors_target = FieldProperty(ITask['errors_target'])
+ report_errors_only = FieldProperty(ITask['report_errors_only'])
+ send_empty_reports = FieldProperty(ITask['send_empty_reports'])
+ keep_empty_reports = FieldProperty(ITask['keep_empty_reports'])
+ _history_duration = FieldProperty(ITask['history_duration'])
+ _history_length = FieldProperty(ITask['history_length'])
+
+ def __init__(self):
+ history = self.history = TaskHistoryContainer()
+ locate(history, self, '++history++')
+
+ @property
+ def schedule_mode(self):
+ return self._schedule_mode
+
+ @schedule_mode.setter
+ def schedule_mode(self, value):
+ if self._schedule_mode is not None:
+ mode = query_utility(ITaskSchedulingMode, name=self._schedule_mode)
+ if (mode is not None) and mode.marker_interface.providedBy(self):
+ noLongerProvides(self, mode.marker_interface)
+ self._schedule_mode = value
+ if value:
+ mode = get_utility(ITaskSchedulingMode, name=value)
+ alsoProvides(self, mode.marker_interface)
+ mode.schema(self).active = False
+ self.reset()
+
+ @property
+ def history_duration(self):
+ return self._history_duration
+
+ @history_duration.setter
+ def history_duration(self, value):
+ self._history_duration = value
+
+ @property
+ def history_length(self):
+ return self._history_length
+
+ @history_length.setter
+ def history_length(self, value):
+ self._history_length = value
+
+ def check_history(self):
+ self.history.check_history(self.history_duration, self.history_length)
+
+ @property
+ def internal_id(self):
+ site = get_parent(self, ISite)
+ sm = site.getSiteManager()
+ intids = sm.queryUtility(IIntIds)
+ if intids is not None:
+ return intids.register(self)
+
+ def get_trigger(self, registry):
+ mode = registry.queryUtility(ITaskSchedulingMode, self.schedule_mode)
+ if mode is None:
+ return None
+ return mode.get_trigger(self)
+
+ def get_scheduling_info(self, registry):
+ mode = registry.queryUtility(ITaskSchedulingMode, self.schedule_mode)
+ if mode is None:
+ return None
+ return mode.schema(self, None)
+
+ def reset(self):
+ scheduler_util = query_utility(IScheduler)
+ if scheduler_util is not None:
+ request = check_request()
+ transaction.get().addAfterCommitHook(self._reset_action, kws={'scheduler': scheduler_util,
+ 'registry': request.registry})
+
+ def _reset_action(self, status, *args, **kwargs):
+ if not status:
+ return
+ scheduler_util = kwargs.get('scheduler')
+ if scheduler_util is None:
+ return
+ request = check_request()
+ if request.registry:
+ handler = request.registry.settings.get(SCHEDULER_HANDLER_KEY, False)
+ if handler:
+ zeo = get_utility(IZEOConnection, scheduler_util.zeo_connection)
+ context = zmq.Context()
+ socket = context.socket(zmq.REQ)
+ socket.connect('tcp://{0}'.format(handler))
+ zmq_settings = {'zeo': zeo.get_settings(),
+ 'task_name': self.__name__,
+ 'job_id': self.internal_id}
+ logger.debug("Resetting task {0.name} with {1!r}".format(self, zmq_settings))
+ socket.send_json(['reset_task', zmq_settings])
+ socket.recv_json()
+
+ def launch(self):
+ scheduler_util = query_utility(IScheduler)
+ if scheduler_util is not None:
+ transaction.get().addAfterCommitHook(self._launch_action, kws={'scheduler': scheduler_util})
+
+ def _launch_action(self, status, *args, **kwargs):
+ if not status:
+ return
+ scheduler_util = kwargs.get('scheduler')
+ if scheduler_util is None:
+ return
+ request = check_request()
+ if request.registry:
+ handler = request.registry.settings.get(SCHEDULER_HANDLER_KEY, False)
+ if handler:
+ zeo = get_utility(IZEOConnection, scheduler_util.zeo_connection)
+ context = zmq.Context()
+ socket = context.socket(zmq.REQ)
+ socket.connect('tcp://{0}'.format(handler))
+ zmq_settings = {'zeo': zeo.get_settings(),
+ 'task_name': self.__name__,
+ 'job_id': self.internal_id}
+ logger.debug("Running task {0.name} with {1!r}".format(self, zmq_settings))
+ socket.send_json(['run_task', zmq_settings])
+ socket.recv_json()
+
+ def __call__(self, *args, **kwargs):
+ report = StringIO()
+ self._run(report, **kwargs)
+
+ def is_runnable(self, registry):
+ mode = registry.queryUtility(ITaskSchedulingMode, self.schedule_mode)
+ if mode is None:
+ return False
+ info = mode.schema(self, None)
+ if info is None:
+ return False
+ return info.active
+
+ def _run(self, report, **kwargs):
+ """Task execution wrapper"""
+ zeo_connection = ZEOConnection()
+ zeo_connection.update(kwargs.get('zeo_settings'))
+ with zeo_connection as root:
+ try:
+ registry = kwargs.get('registry')
+ request = check_request()
+ request.registry = registry
+ application_name = registry.settings.get(PYAMS_APPLICATION_SETTINGS_KEY,
+ PYAMS_APPLICATION_DEFAULT_NAME)
+ sm = root.get(application_name).getSiteManager()
+ scheduler_util = sm.get(SCHEDULER_NAME)
+ task = scheduler_util.get(self.__name__)
+ if task is not None:
+ if not (kwargs.get('run_immediate') or task.is_runnable(registry)):
+ logger.debug("Skipping inactive task {0}".format(task.name))
+ return
+ tm = ITransactionManager(task)
+ for attempt in tm.attempts():
+ with attempt as t:
+ start = datetime.utcnow()
+ try:
+ registry.notify(BeforeRunJobEvent(task))
+ task.run(report)
+ if report.getvalue():
+ status = 'OK'
+ else:
+ status = 'Empty'
+ report.write('\n\nTask duration: {0}'.format(get_duration(start, request=request)))
+ except:
+ status = 'Error'
+ task._log_exception(report,
+ "An error occurred during execution of task '{0}'".format(task.name))
+ registry.notify(AfterRunJobEvent(task, status))
+ task.store_report(report, status)
+ task.send_report(report, status, registry)
+ if t.status == 'Committed':
+ break
+ except:
+ self._log_exception(None, "Can't execute scheduled job {0}".format(self.name))
+ ITransactionManager(self).abort()
+
+ def run(self, report):
+ raise NotImplemented("The 'run' method must be implemented by Task subclasses!")
+
+ @staticmethod
+ def _log_report(report, message, add_timestamp=True, level=logging.INFO):
+ if isinstance(message, bytes):
+ message = message.decode()
+ if add_timestamp:
+ message = '{0} - {1}'.format(tztime(datetime.utcnow()).strftime('%c'), message)
+ if report is not None:
+ report.write(message + '\n')
+ logger.log(level, message)
+
+ @staticmethod
+ def _log_exception(report, message=None):
+ if isinstance(message, bytes):
+ message = message.decode()
+ message = '{0} - {1}'.format(tztime(datetime.utcnow()).strftime('%c'), message or 'An error occurred') + '\n\n'
+ if report is not None:
+ report.write(message)
+ report.write(traceback.format_exc() + '\n')
+ logger.exception(message)
+
+ def store_report(self, report, status):
+ if (status == 'Empty') and not self.keep_empty_reports:
+ return
+ item = TaskHistoryItem(date=tztime(datetime.utcnow()),
+ status=status,
+ report=report.getvalue())
+ self.history[item.date.isoformat()] = item
+ self.check_history()
+
+ def send_report(self, report, status, registry):
+ if not self.__parent__.report_mailer:
+ return
+ if ((status == 'Empty') and not self.send_empty_reports) or \
+ ((status == 'OK') and self.report_errors_only):
+ return
+ message_target = self.report_target
+ if status in ('Error', 'Warning'):
+ message_target = self.errors_target or message_target
+ if not message_target:
+ return
+ mailer = registry.queryUtility(IMailer, self.__parent__.report_mailer)
+ if mailer is not None:
+ report_source = self.__parent__.report_source
+ if status == 'Error':
+ subject = "[SCHEDULER ERROR] {0}".format(self.name)
+ else:
+ subject = "[scheduler] {0}".format(self.name)
+ for target in message_target.split(';'):
+ message = Message(subject=subject,
+ sender=report_source,
+ recipients=(target,),
+ body=report.getvalue())
+ mailer.send(message)
+
+
+@subscriber(ObjectModifiedEvent, context_selector=ITask)
+def handle_modified_task(event):
+ """Handle modified task"""
+ for changes in event.descriptions:
+ if (changes.interface == ITaskInfo) and \
+ (('history_duration' in changes.attributes) or ('history_length' in changes.attributes)):
+ event.object.check_history()
+ break
+
+
+@subscriber(ObjectRemovedEvent, context_selector=ITask)
+def handle_removed_task(event):
+ """Handle removed task"""
+ request = check_request()
+ if request.registry:
+ handler = request.registry.settings.get(SCHEDULER_HANDLER_KEY, False)
+ if handler:
+ task = event.object
+ scheduler_util = query_utility(IScheduler)
+ zeo = get_utility(IZEOConnection, scheduler_util.zeo_connection)
+ context = zmq.Context()
+ socket = context.socket(zmq.REQ)
+ socket.connect('tcp://{0}'.format(handler))
+ zmq_settings = {'zeo': zeo.get_settings(),
+ 'task_name': task.__name__,
+ 'job_id': task.internal_id}
+ logger.debug("Removing task {0.name} with {1!r}".format(task, zmq_settings))
+ socket.send_json(['remove_task', zmq_settings])
+ socket.recv_json()