# HG changeset patch # User Thierry Florac # Date 1426336182 -3600 # Node ID 570db7b9f0f6a836f43b2a6fc68d1d4f29ac2c35 # Parent 4a1bc9eb89ad727a2adbc294f2f2b24c7b49c891 Changed internal ID management diff -r 4a1bc9eb89ad -r 570db7b9f0f6 src/pyams_scheduler/task.py --- a/src/pyams_scheduler/task.py Sat Mar 14 13:29:14 2015 +0100 +++ b/src/pyams_scheduler/task.py Sat Mar 14 13:29:42 2015 +0100 @@ -9,6 +9,7 @@ # WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS # FOR A PARTICULAR PURPOSE. # +from zope.lifecycleevent.interfaces import IObjectRemovedEvent, IObjectAddedEvent, IObjectModifiedEvent __docformat__ = 'restructuredtext' @@ -106,6 +107,8 @@ _history_duration = FieldProperty(ITask['history_duration']) _history_length = FieldProperty(ITask['history_length']) + _internal_id = None + def __init__(self): history = self.history = TaskHistoryContainer() locate(history, self, '++history++') @@ -125,7 +128,8 @@ mode = get_utility(ITaskSchedulingMode, name=value) alsoProvides(self, mode.marker_interface) mode.schema(self).active = False - self.reset() + if self.__parent__ is not None: + self.reset() @property def history_duration(self): @@ -148,11 +152,13 @@ @property def internal_id(self): - site = get_parent(self, ISite) - sm = site.getSiteManager() - intids = sm.queryUtility(IIntIds) - if intids is not None: - return intids.register(self) + if self._internal_id is None: + site = get_parent(self, ISite) + sm = site.getSiteManager() + intids = sm.queryUtility(IIntIds) + if intids is not None: + self._internal_id = intids.register(self) + return self._internal_id def get_trigger(self, registry): mode = registry.queryUtility(ITaskSchedulingMode, self.schedule_mode) @@ -170,8 +176,10 @@ scheduler_util = query_utility(IScheduler) if scheduler_util is not None: request = check_request() + # get task internal ID before transaction ends!!! transaction.get().addAfterCommitHook(self._reset_action, kws={'scheduler': scheduler_util, - 'registry': request.registry}) + 'registry': request.registry, + 'job_id': self.internal_id}) def _reset_action(self, status, *args, **kwargs): if not status: @@ -189,7 +197,7 @@ socket.connect('tcp://{0}'.format(handler)) zmq_settings = {'zeo': zeo.get_settings(), 'task_name': self.__name__, - 'job_id': self.internal_id} + 'job_id': kwargs.get('job_id')} logger.debug("Resetting task {0.name} with {1!r}".format(self, zmq_settings)) socket.send_json(['reset_task', zmq_settings]) socket.recv_json() @@ -197,7 +205,9 @@ def launch(self): scheduler_util = query_utility(IScheduler) if scheduler_util is not None: - transaction.get().addAfterCommitHook(self._launch_action, kws={'scheduler': scheduler_util}) + # get task internal ID before transaction ends!!! + transaction.get().addAfterCommitHook(self._launch_action, kws={'scheduler': scheduler_util, + 'job_id': self.internal_id}) def _launch_action(self, status, *args, **kwargs): if not status: @@ -215,7 +225,7 @@ socket.connect('tcp://{0}'.format(handler)) zmq_settings = {'zeo': zeo.get_settings(), 'task_name': self.__name__, - 'job_id': self.internal_id} + 'job_id': kwargs.get('job_id')} logger.debug("Running task {0.name} with {1!r}".format(self, zmq_settings)) socket.send_json(['run_task', zmq_settings]) socket.recv_json() @@ -334,7 +344,13 @@ mailer.send(message) -@subscriber(ObjectModifiedEvent, context_selector=ITask) +@subscriber(IObjectAddedEvent, context_selector=ITask) +def handle_new_task(event): + """Handle new task""" + event.object.reset() + + +@subscriber(IObjectModifiedEvent, context_selector=ITask) def handle_modified_task(event): """Handle modified task""" for changes in event.descriptions: @@ -344,7 +360,7 @@ break -@subscriber(ObjectRemovedEvent, context_selector=ITask) +@subscriber(IObjectRemovedEvent, context_selector=ITask) def handle_removed_task(event): """Handle removed task""" request = check_request()