src/pyams_scheduler/task.py
changeset 0 48483b0b26fa
child 7 570db7b9f0f6
equal deleted inserted replaced
-1:000000000000 0:48483b0b26fa
       
     1 #
       
     2 # Copyright (c) 2008-2015 Thierry Florac <tflorac AT ulthar.net>
       
     3 # All Rights Reserved.
       
     4 #
       
     5 # This software is subject to the provisions of the Zope Public License,
       
     6 # Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
       
     7 # THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
       
     8 # WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
       
     9 # WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
       
    10 # FOR A PARTICULAR PURPOSE.
       
    11 #
       
    12 
       
    13 __docformat__ = 'restructuredtext'
       
    14 
       
    15 
       
    16 # import standard library
       
    17 import logging
       
    18 logger = logging.getLogger('PyAMS (scheduler)')
       
    19 
       
    20 import traceback
       
    21 from datetime import datetime, timedelta
       
    22 from io import StringIO
       
    23 
       
    24 # import interfaces
       
    25 from pyams_scheduler.interfaces import ITaskHistory, ITask, ITaskHistoryContainer, ITaskSchedulingMode, IScheduler, \
       
    26     SCHEDULER_HANDLER_KEY, AfterRunJobEvent, SCHEDULER_NAME, BeforeRunJobEvent, ITaskInfo
       
    27 from pyams_utils.interfaces import PYAMS_APPLICATION_DEFAULT_NAME, PYAMS_APPLICATION_SETTINGS_KEY
       
    28 from pyams_utils.interfaces.zeo import IZEOConnection
       
    29 from pyramid_mailer.interfaces import IMailer
       
    30 from transaction.interfaces import ITransactionManager
       
    31 from zope.component.interfaces import ISite
       
    32 from zope.intid.interfaces import IIntIds
       
    33 
       
    34 # import packages
       
    35 import transaction
       
    36 import zmq
       
    37 from apscheduler.triggers.base import BaseTrigger
       
    38 from persistent import Persistent
       
    39 from pyams_utils.date import get_duration
       
    40 from pyams_utils.registry import query_utility, get_utility
       
    41 from pyams_utils.request import check_request
       
    42 from pyams_utils.timezone import tztime
       
    43 from pyams_utils.traversing import get_parent
       
    44 from pyams_utils.zodb import ZEOConnection
       
    45 from pyramid.events import subscriber
       
    46 from pyramid_mailer.message import Message
       
    47 from zope.container.contained import Contained
       
    48 from zope.container.folder import Folder
       
    49 from zope.interface import implementer, alsoProvides, noLongerProvides
       
    50 from zope.lifecycleevent import ObjectRemovedEvent, ObjectModifiedEvent
       
    51 from zope.location import locate
       
    52 from zope.schema.fieldproperty import FieldProperty
       
    53 
       
    54 
       
    55 class ImmediateTaskTrigger(BaseTrigger):
       
    56     """Immediate-style task scheduler"""
       
    57 
       
    58     def get_next_fire_time(self, previous_fire_time, now):
       
    59         if previous_fire_time:
       
    60             return None
       
    61         else:
       
    62             return now + timedelta(seconds=5)
       
    63 
       
    64 
       
    65 @implementer(ITaskHistory)
       
    66 class TaskHistoryItem(Persistent, Contained):
       
    67     """Task history item"""
       
    68 
       
    69     date = FieldProperty(ITaskHistory['date'])
       
    70     status = FieldProperty(ITaskHistory['status'])
       
    71     report = FieldProperty(ITaskHistory['report'])
       
    72 
       
    73     def __init__(self, **kwargs):
       
    74         for key, value in kwargs.items():
       
    75             setattr(self, key, value)
       
    76 
       
    77 
       
    78 @implementer(ITaskHistoryContainer)
       
    79 class TaskHistoryContainer(Folder):
       
    80     """Task history container"""
       
    81 
       
    82     def check_history(self, duration, length):
       
    83         now = tztime(datetime.utcnow())
       
    84         if duration:
       
    85             for key in [k for k in self.keys()]:
       
    86                 if (now - self[key].date).days > duration:
       
    87                     del self[key]
       
    88         if length and (len(self) > length):
       
    89             keys = sorted(self.keys(), reverse=True)[:length]
       
    90             for key in [k for k in self.keys()]:
       
    91                 if key not in keys:
       
    92                     del self[key]
       
    93 
       
    94 
       
    95 @implementer(ITask)
       
    96 class Task(Persistent, Contained):
       
    97     """Task definition persistent class"""
       
    98 
       
    99     name = FieldProperty(ITask['name'])
       
   100     _schedule_mode = FieldProperty(ITask['schedule_mode'])
       
   101     report_target = FieldProperty(ITask['report_target'])
       
   102     errors_target = FieldProperty(ITask['errors_target'])
       
   103     report_errors_only = FieldProperty(ITask['report_errors_only'])
       
   104     send_empty_reports = FieldProperty(ITask['send_empty_reports'])
       
   105     keep_empty_reports = FieldProperty(ITask['keep_empty_reports'])
       
   106     _history_duration = FieldProperty(ITask['history_duration'])
       
   107     _history_length = FieldProperty(ITask['history_length'])
       
   108 
       
   109     def __init__(self):
       
   110         history = self.history = TaskHistoryContainer()
       
   111         locate(history, self, '++history++')
       
   112 
       
   113     @property
       
   114     def schedule_mode(self):
       
   115         return self._schedule_mode
       
   116 
       
   117     @schedule_mode.setter
       
   118     def schedule_mode(self, value):
       
   119         if self._schedule_mode is not None:
       
   120             mode = query_utility(ITaskSchedulingMode, name=self._schedule_mode)
       
   121             if (mode is not None) and mode.marker_interface.providedBy(self):
       
   122                 noLongerProvides(self, mode.marker_interface)
       
   123         self._schedule_mode = value
       
   124         if value:
       
   125             mode = get_utility(ITaskSchedulingMode, name=value)
       
   126             alsoProvides(self, mode.marker_interface)
       
   127             mode.schema(self).active = False
       
   128             self.reset()
       
   129 
       
   130     @property
       
   131     def history_duration(self):
       
   132         return self._history_duration
       
   133 
       
   134     @history_duration.setter
       
   135     def history_duration(self, value):
       
   136         self._history_duration = value
       
   137 
       
   138     @property
       
   139     def history_length(self):
       
   140         return self._history_length
       
   141 
       
   142     @history_length.setter
       
   143     def history_length(self, value):
       
   144         self._history_length = value
       
   145 
       
   146     def check_history(self):
       
   147         self.history.check_history(self.history_duration, self.history_length)
       
   148 
       
   149     @property
       
   150     def internal_id(self):
       
   151         site = get_parent(self, ISite)
       
   152         sm = site.getSiteManager()
       
   153         intids = sm.queryUtility(IIntIds)
       
   154         if intids is not None:
       
   155             return intids.register(self)
       
   156 
       
   157     def get_trigger(self, registry):
       
   158         mode = registry.queryUtility(ITaskSchedulingMode, self.schedule_mode)
       
   159         if mode is None:
       
   160             return None
       
   161         return mode.get_trigger(self)
       
   162 
       
   163     def get_scheduling_info(self, registry):
       
   164         mode = registry.queryUtility(ITaskSchedulingMode, self.schedule_mode)
       
   165         if mode is None:
       
   166             return None
       
   167         return mode.schema(self, None)
       
   168 
       
   169     def reset(self):
       
   170         scheduler_util = query_utility(IScheduler)
       
   171         if scheduler_util is not None:
       
   172             request = check_request()
       
   173             transaction.get().addAfterCommitHook(self._reset_action, kws={'scheduler': scheduler_util,
       
   174                                                                           'registry': request.registry})
       
   175 
       
   176     def _reset_action(self, status, *args, **kwargs):
       
   177         if not status:
       
   178             return
       
   179         scheduler_util = kwargs.get('scheduler')
       
   180         if scheduler_util is None:
       
   181             return
       
   182         request = check_request()
       
   183         if request.registry:
       
   184             handler = request.registry.settings.get(SCHEDULER_HANDLER_KEY, False)
       
   185             if handler:
       
   186                 zeo = get_utility(IZEOConnection, scheduler_util.zeo_connection)
       
   187                 context = zmq.Context()
       
   188                 socket = context.socket(zmq.REQ)
       
   189                 socket.connect('tcp://{0}'.format(handler))
       
   190                 zmq_settings = {'zeo': zeo.get_settings(),
       
   191                                 'task_name': self.__name__,
       
   192                                 'job_id': self.internal_id}
       
   193                 logger.debug("Resetting task {0.name} with {1!r}".format(self, zmq_settings))
       
   194                 socket.send_json(['reset_task', zmq_settings])
       
   195                 socket.recv_json()
       
   196 
       
   197     def launch(self):
       
   198         scheduler_util = query_utility(IScheduler)
       
   199         if scheduler_util is not None:
       
   200             transaction.get().addAfterCommitHook(self._launch_action, kws={'scheduler': scheduler_util})
       
   201 
       
   202     def _launch_action(self, status, *args, **kwargs):
       
   203         if not status:
       
   204             return
       
   205         scheduler_util = kwargs.get('scheduler')
       
   206         if scheduler_util is None:
       
   207             return
       
   208         request = check_request()
       
   209         if request.registry:
       
   210             handler = request.registry.settings.get(SCHEDULER_HANDLER_KEY, False)
       
   211             if handler:
       
   212                 zeo = get_utility(IZEOConnection, scheduler_util.zeo_connection)
       
   213                 context = zmq.Context()
       
   214                 socket = context.socket(zmq.REQ)
       
   215                 socket.connect('tcp://{0}'.format(handler))
       
   216                 zmq_settings = {'zeo': zeo.get_settings(),
       
   217                                 'task_name': self.__name__,
       
   218                                 'job_id': self.internal_id}
       
   219                 logger.debug("Running task {0.name} with {1!r}".format(self, zmq_settings))
       
   220                 socket.send_json(['run_task', zmq_settings])
       
   221                 socket.recv_json()
       
   222 
       
   223     def __call__(self, *args, **kwargs):
       
   224         report = StringIO()
       
   225         self._run(report, **kwargs)
       
   226 
       
   227     def is_runnable(self, registry):
       
   228         mode = registry.queryUtility(ITaskSchedulingMode, self.schedule_mode)
       
   229         if mode is None:
       
   230             return False
       
   231         info = mode.schema(self, None)
       
   232         if info is None:
       
   233             return False
       
   234         return info.active
       
   235 
       
   236     def _run(self, report, **kwargs):
       
   237         """Task execution wrapper"""
       
   238         zeo_connection = ZEOConnection()
       
   239         zeo_connection.update(kwargs.get('zeo_settings'))
       
   240         with zeo_connection as root:
       
   241             try:
       
   242                 registry = kwargs.get('registry')
       
   243                 request = check_request()
       
   244                 request.registry = registry
       
   245                 application_name = registry.settings.get(PYAMS_APPLICATION_SETTINGS_KEY,
       
   246                                                          PYAMS_APPLICATION_DEFAULT_NAME)
       
   247                 sm = root.get(application_name).getSiteManager()
       
   248                 scheduler_util = sm.get(SCHEDULER_NAME)
       
   249                 task = scheduler_util.get(self.__name__)
       
   250                 if task is not None:
       
   251                     if not (kwargs.get('run_immediate') or task.is_runnable(registry)):
       
   252                         logger.debug("Skipping inactive task {0}".format(task.name))
       
   253                         return
       
   254                     tm = ITransactionManager(task)
       
   255                     for attempt in tm.attempts():
       
   256                         with attempt as t:
       
   257                             start = datetime.utcnow()
       
   258                             try:
       
   259                                 registry.notify(BeforeRunJobEvent(task))
       
   260                                 task.run(report)
       
   261                                 if report.getvalue():
       
   262                                     status = 'OK'
       
   263                                 else:
       
   264                                     status = 'Empty'
       
   265                                 report.write('\n\nTask duration: {0}'.format(get_duration(start, request=request)))
       
   266                             except:
       
   267                                 status = 'Error'
       
   268                                 task._log_exception(report,
       
   269                                                     "An error occurred during execution of task '{0}'".format(task.name))
       
   270                             registry.notify(AfterRunJobEvent(task, status))
       
   271                             task.store_report(report, status)
       
   272                             task.send_report(report, status, registry)
       
   273                         if t.status == 'Committed':
       
   274                             break
       
   275             except:
       
   276                 self._log_exception(None, "Can't execute scheduled job {0}".format(self.name))
       
   277         ITransactionManager(self).abort()
       
   278 
       
   279     def run(self, report):
       
   280         raise NotImplemented("The 'run' method must be implemented by Task subclasses!")
       
   281 
       
   282     @staticmethod
       
   283     def _log_report(report, message, add_timestamp=True, level=logging.INFO):
       
   284         if isinstance(message, bytes):
       
   285             message = message.decode()
       
   286         if add_timestamp:
       
   287             message = '{0} - {1}'.format(tztime(datetime.utcnow()).strftime('%c'), message)
       
   288         if report is not None:
       
   289             report.write(message + '\n')
       
   290         logger.log(level, message)
       
   291 
       
   292     @staticmethod
       
   293     def _log_exception(report, message=None):
       
   294         if isinstance(message, bytes):
       
   295             message = message.decode()
       
   296         message = '{0} - {1}'.format(tztime(datetime.utcnow()).strftime('%c'), message or 'An error occurred') + '\n\n'
       
   297         if report is not None:
       
   298             report.write(message)
       
   299             report.write(traceback.format_exc() + '\n')
       
   300         logger.exception(message)
       
   301 
       
   302     def store_report(self, report, status):
       
   303         if (status == 'Empty') and not self.keep_empty_reports:
       
   304             return
       
   305         item = TaskHistoryItem(date=tztime(datetime.utcnow()),
       
   306                                status=status,
       
   307                                report=report.getvalue())
       
   308         self.history[item.date.isoformat()] = item
       
   309         self.check_history()
       
   310 
       
   311     def send_report(self, report, status, registry):
       
   312         if not self.__parent__.report_mailer:
       
   313             return
       
   314         if ((status == 'Empty') and not self.send_empty_reports) or \
       
   315            ((status == 'OK') and self.report_errors_only):
       
   316             return
       
   317         message_target = self.report_target
       
   318         if status in ('Error', 'Warning'):
       
   319             message_target = self.errors_target or message_target
       
   320         if not message_target:
       
   321             return
       
   322         mailer = registry.queryUtility(IMailer, self.__parent__.report_mailer)
       
   323         if mailer is not None:
       
   324             report_source = self.__parent__.report_source
       
   325             if status == 'Error':
       
   326                 subject = "[SCHEDULER ERROR] {0}".format(self.name)
       
   327             else:
       
   328                 subject = "[scheduler] {0}".format(self.name)
       
   329             for target in message_target.split(';'):
       
   330                 message = Message(subject=subject,
       
   331                                   sender=report_source,
       
   332                                   recipients=(target,),
       
   333                                   body=report.getvalue())
       
   334                 mailer.send(message)
       
   335 
       
   336 
       
   337 @subscriber(ObjectModifiedEvent, context_selector=ITask)
       
   338 def handle_modified_task(event):
       
   339     """Handle modified task"""
       
   340     for changes in event.descriptions:
       
   341         if (changes.interface == ITaskInfo) and \
       
   342            (('history_duration' in changes.attributes) or ('history_length' in changes.attributes)):
       
   343             event.object.check_history()
       
   344             break
       
   345 
       
   346 
       
   347 @subscriber(ObjectRemovedEvent, context_selector=ITask)
       
   348 def handle_removed_task(event):
       
   349     """Handle removed task"""
       
   350     request = check_request()
       
   351     if request.registry:
       
   352         handler = request.registry.settings.get(SCHEDULER_HANDLER_KEY, False)
       
   353         if handler:
       
   354             task = event.object
       
   355             scheduler_util = query_utility(IScheduler)
       
   356             zeo = get_utility(IZEOConnection, scheduler_util.zeo_connection)
       
   357             context = zmq.Context()
       
   358             socket = context.socket(zmq.REQ)
       
   359             socket.connect('tcp://{0}'.format(handler))
       
   360             zmq_settings = {'zeo': zeo.get_settings(),
       
   361                             'task_name': task.__name__,
       
   362                             'job_id': task.internal_id}
       
   363             logger.debug("Removing task {0.name} with {1!r}".format(task, zmq_settings))
       
   364             socket.send_json(['remove_task', zmq_settings])
       
   365             socket.recv_json()