--- a/src/pyams_scheduler/task.py Sat Mar 14 13:29:14 2015 +0100
+++ b/src/pyams_scheduler/task.py Sat Mar 14 13:29:42 2015 +0100
@@ -9,6 +9,7 @@
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
+from zope.lifecycleevent.interfaces import IObjectRemovedEvent, IObjectAddedEvent, IObjectModifiedEvent
__docformat__ = 'restructuredtext'
@@ -106,6 +107,8 @@
_history_duration = FieldProperty(ITask['history_duration'])
_history_length = FieldProperty(ITask['history_length'])
+ _internal_id = None
+
def __init__(self):
history = self.history = TaskHistoryContainer()
locate(history, self, '++history++')
@@ -125,7 +128,8 @@
mode = get_utility(ITaskSchedulingMode, name=value)
alsoProvides(self, mode.marker_interface)
mode.schema(self).active = False
- self.reset()
+ if self.__parent__ is not None:
+ self.reset()
@property
def history_duration(self):
@@ -148,11 +152,13 @@
@property
def internal_id(self):
- site = get_parent(self, ISite)
- sm = site.getSiteManager()
- intids = sm.queryUtility(IIntIds)
- if intids is not None:
- return intids.register(self)
+ if self._internal_id is None:
+ site = get_parent(self, ISite)
+ sm = site.getSiteManager()
+ intids = sm.queryUtility(IIntIds)
+ if intids is not None:
+ self._internal_id = intids.register(self)
+ return self._internal_id
def get_trigger(self, registry):
mode = registry.queryUtility(ITaskSchedulingMode, self.schedule_mode)
@@ -170,8 +176,10 @@
scheduler_util = query_utility(IScheduler)
if scheduler_util is not None:
request = check_request()
+ # get task internal ID before transaction ends!!!
transaction.get().addAfterCommitHook(self._reset_action, kws={'scheduler': scheduler_util,
- 'registry': request.registry})
+ 'registry': request.registry,
+ 'job_id': self.internal_id})
def _reset_action(self, status, *args, **kwargs):
if not status:
@@ -189,7 +197,7 @@
socket.connect('tcp://{0}'.format(handler))
zmq_settings = {'zeo': zeo.get_settings(),
'task_name': self.__name__,
- 'job_id': self.internal_id}
+ 'job_id': kwargs.get('job_id')}
logger.debug("Resetting task {0.name} with {1!r}".format(self, zmq_settings))
socket.send_json(['reset_task', zmq_settings])
socket.recv_json()
@@ -197,7 +205,9 @@
def launch(self):
scheduler_util = query_utility(IScheduler)
if scheduler_util is not None:
- transaction.get().addAfterCommitHook(self._launch_action, kws={'scheduler': scheduler_util})
+ # get task internal ID before transaction ends!!!
+ transaction.get().addAfterCommitHook(self._launch_action, kws={'scheduler': scheduler_util,
+ 'job_id': self.internal_id})
def _launch_action(self, status, *args, **kwargs):
if not status:
@@ -215,7 +225,7 @@
socket.connect('tcp://{0}'.format(handler))
zmq_settings = {'zeo': zeo.get_settings(),
'task_name': self.__name__,
- 'job_id': self.internal_id}
+ 'job_id': kwargs.get('job_id')}
logger.debug("Running task {0.name} with {1!r}".format(self, zmq_settings))
socket.send_json(['run_task', zmq_settings])
socket.recv_json()
@@ -334,7 +344,13 @@
mailer.send(message)
-@subscriber(ObjectModifiedEvent, context_selector=ITask)
+@subscriber(IObjectAddedEvent, context_selector=ITask)
+def handle_new_task(event):
+ """Handle new task"""
+ event.object.reset()
+
+
+@subscriber(IObjectModifiedEvent, context_selector=ITask)
def handle_modified_task(event):
"""Handle modified task"""
for changes in event.descriptions:
@@ -344,7 +360,7 @@
break
-@subscriber(ObjectRemovedEvent, context_selector=ITask)
+@subscriber(IObjectRemovedEvent, context_selector=ITask)
def handle_removed_task(event):
"""Handle removed task"""
request = check_request()