src/pyams_scheduler/task.py
changeset 7 570db7b9f0f6
parent 0 48483b0b26fa
child 9 b83989cde81d
equal deleted inserted replaced
6:4a1bc9eb89ad 7:570db7b9f0f6
     7 # THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
     7 # THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
     8 # WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
     8 # WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
     9 # WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
     9 # WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
    10 # FOR A PARTICULAR PURPOSE.
    10 # FOR A PARTICULAR PURPOSE.
    11 #
    11 #
       
    12 from zope.lifecycleevent.interfaces import IObjectRemovedEvent, IObjectAddedEvent, IObjectModifiedEvent
    12 
    13 
    13 __docformat__ = 'restructuredtext'
    14 __docformat__ = 'restructuredtext'
    14 
    15 
    15 
    16 
    16 # import standard library
    17 # import standard library
   104     send_empty_reports = FieldProperty(ITask['send_empty_reports'])
   105     send_empty_reports = FieldProperty(ITask['send_empty_reports'])
   105     keep_empty_reports = FieldProperty(ITask['keep_empty_reports'])
   106     keep_empty_reports = FieldProperty(ITask['keep_empty_reports'])
   106     _history_duration = FieldProperty(ITask['history_duration'])
   107     _history_duration = FieldProperty(ITask['history_duration'])
   107     _history_length = FieldProperty(ITask['history_length'])
   108     _history_length = FieldProperty(ITask['history_length'])
   108 
   109 
       
   110     _internal_id = None
       
   111 
   109     def __init__(self):
   112     def __init__(self):
   110         history = self.history = TaskHistoryContainer()
   113         history = self.history = TaskHistoryContainer()
   111         locate(history, self, '++history++')
   114         locate(history, self, '++history++')
   112 
   115 
   113     @property
   116     @property
   123         self._schedule_mode = value
   126         self._schedule_mode = value
   124         if value:
   127         if value:
   125             mode = get_utility(ITaskSchedulingMode, name=value)
   128             mode = get_utility(ITaskSchedulingMode, name=value)
   126             alsoProvides(self, mode.marker_interface)
   129             alsoProvides(self, mode.marker_interface)
   127             mode.schema(self).active = False
   130             mode.schema(self).active = False
   128             self.reset()
   131             if self.__parent__ is not None:
       
   132                 self.reset()
   129 
   133 
   130     @property
   134     @property
   131     def history_duration(self):
   135     def history_duration(self):
   132         return self._history_duration
   136         return self._history_duration
   133 
   137 
   146     def check_history(self):
   150     def check_history(self):
   147         self.history.check_history(self.history_duration, self.history_length)
   151         self.history.check_history(self.history_duration, self.history_length)
   148 
   152 
   149     @property
   153     @property
   150     def internal_id(self):
   154     def internal_id(self):
   151         site = get_parent(self, ISite)
   155         if self._internal_id is None:
   152         sm = site.getSiteManager()
   156             site = get_parent(self, ISite)
   153         intids = sm.queryUtility(IIntIds)
   157             sm = site.getSiteManager()
   154         if intids is not None:
   158             intids = sm.queryUtility(IIntIds)
   155             return intids.register(self)
   159             if intids is not None:
       
   160                 self._internal_id = intids.register(self)
       
   161         return self._internal_id
   156 
   162 
   157     def get_trigger(self, registry):
   163     def get_trigger(self, registry):
   158         mode = registry.queryUtility(ITaskSchedulingMode, self.schedule_mode)
   164         mode = registry.queryUtility(ITaskSchedulingMode, self.schedule_mode)
   159         if mode is None:
   165         if mode is None:
   160             return None
   166             return None
   168 
   174 
   169     def reset(self):
   175     def reset(self):
   170         scheduler_util = query_utility(IScheduler)
   176         scheduler_util = query_utility(IScheduler)
   171         if scheduler_util is not None:
   177         if scheduler_util is not None:
   172             request = check_request()
   178             request = check_request()
       
   179             # get task internal ID before transaction ends!!!
   173             transaction.get().addAfterCommitHook(self._reset_action, kws={'scheduler': scheduler_util,
   180             transaction.get().addAfterCommitHook(self._reset_action, kws={'scheduler': scheduler_util,
   174                                                                           'registry': request.registry})
   181                                                                           'registry': request.registry,
       
   182                                                                           'job_id': self.internal_id})
   175 
   183 
   176     def _reset_action(self, status, *args, **kwargs):
   184     def _reset_action(self, status, *args, **kwargs):
   177         if not status:
   185         if not status:
   178             return
   186             return
   179         scheduler_util = kwargs.get('scheduler')
   187         scheduler_util = kwargs.get('scheduler')
   187                 context = zmq.Context()
   195                 context = zmq.Context()
   188                 socket = context.socket(zmq.REQ)
   196                 socket = context.socket(zmq.REQ)
   189                 socket.connect('tcp://{0}'.format(handler))
   197                 socket.connect('tcp://{0}'.format(handler))
   190                 zmq_settings = {'zeo': zeo.get_settings(),
   198                 zmq_settings = {'zeo': zeo.get_settings(),
   191                                 'task_name': self.__name__,
   199                                 'task_name': self.__name__,
   192                                 'job_id': self.internal_id}
   200                                 'job_id': kwargs.get('job_id')}
   193                 logger.debug("Resetting task {0.name} with {1!r}".format(self, zmq_settings))
   201                 logger.debug("Resetting task {0.name} with {1!r}".format(self, zmq_settings))
   194                 socket.send_json(['reset_task', zmq_settings])
   202                 socket.send_json(['reset_task', zmq_settings])
   195                 socket.recv_json()
   203                 socket.recv_json()
   196 
   204 
   197     def launch(self):
   205     def launch(self):
   198         scheduler_util = query_utility(IScheduler)
   206         scheduler_util = query_utility(IScheduler)
   199         if scheduler_util is not None:
   207         if scheduler_util is not None:
   200             transaction.get().addAfterCommitHook(self._launch_action, kws={'scheduler': scheduler_util})
   208             # get task internal ID before transaction ends!!!
       
   209             transaction.get().addAfterCommitHook(self._launch_action, kws={'scheduler': scheduler_util,
       
   210                                                                            'job_id': self.internal_id})
   201 
   211 
   202     def _launch_action(self, status, *args, **kwargs):
   212     def _launch_action(self, status, *args, **kwargs):
   203         if not status:
   213         if not status:
   204             return
   214             return
   205         scheduler_util = kwargs.get('scheduler')
   215         scheduler_util = kwargs.get('scheduler')
   213                 context = zmq.Context()
   223                 context = zmq.Context()
   214                 socket = context.socket(zmq.REQ)
   224                 socket = context.socket(zmq.REQ)
   215                 socket.connect('tcp://{0}'.format(handler))
   225                 socket.connect('tcp://{0}'.format(handler))
   216                 zmq_settings = {'zeo': zeo.get_settings(),
   226                 zmq_settings = {'zeo': zeo.get_settings(),
   217                                 'task_name': self.__name__,
   227                                 'task_name': self.__name__,
   218                                 'job_id': self.internal_id}
   228                                 'job_id': kwargs.get('job_id')}
   219                 logger.debug("Running task {0.name} with {1!r}".format(self, zmq_settings))
   229                 logger.debug("Running task {0.name} with {1!r}".format(self, zmq_settings))
   220                 socket.send_json(['run_task', zmq_settings])
   230                 socket.send_json(['run_task', zmq_settings])
   221                 socket.recv_json()
   231                 socket.recv_json()
   222 
   232 
   223     def __call__(self, *args, **kwargs):
   233     def __call__(self, *args, **kwargs):
   332                                   recipients=(target,),
   342                                   recipients=(target,),
   333                                   body=report.getvalue())
   343                                   body=report.getvalue())
   334                 mailer.send(message)
   344                 mailer.send(message)
   335 
   345 
   336 
   346 
   337 @subscriber(ObjectModifiedEvent, context_selector=ITask)
   347 @subscriber(IObjectAddedEvent, context_selector=ITask)
       
   348 def handle_new_task(event):
       
   349     """Handle new task"""
       
   350     event.object.reset()
       
   351 
       
   352 
       
   353 @subscriber(IObjectModifiedEvent, context_selector=ITask)
   338 def handle_modified_task(event):
   354 def handle_modified_task(event):
   339     """Handle modified task"""
   355     """Handle modified task"""
   340     for changes in event.descriptions:
   356     for changes in event.descriptions:
   341         if (changes.interface == ITaskInfo) and \
   357         if (changes.interface == ITaskInfo) and \
   342            (('history_duration' in changes.attributes) or ('history_length' in changes.attributes)):
   358            (('history_duration' in changes.attributes) or ('history_length' in changes.attributes)):
   343             event.object.check_history()
   359             event.object.check_history()
   344             break
   360             break
   345 
   361 
   346 
   362 
   347 @subscriber(ObjectRemovedEvent, context_selector=ITask)
   363 @subscriber(IObjectRemovedEvent, context_selector=ITask)
   348 def handle_removed_task(event):
   364 def handle_removed_task(event):
   349     """Handle removed task"""
   365     """Handle removed task"""
   350     request = check_request()
   366     request = check_request()
   351     if request.registry:
   367     if request.registry:
   352         handler = request.registry.settings.get(SCHEDULER_HANDLER_KEY, False)
   368         handler = request.registry.settings.get(SCHEDULER_HANDLER_KEY, False)