# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
|
12 from zope.lifecycleevent.interfaces import IObjectRemovedEvent, IObjectAddedEvent, IObjectModifiedEvent |
12 |
13 |
13 __docformat__ = 'restructuredtext' |
14 __docformat__ = 'restructuredtext' |
14 |
15 |
15 |
16 |
16 # import standard library |
17 # import standard library |
# Schema-backed attributes, each bound to the matching ITask field.
send_empty_reports = FieldProperty(ITask['send_empty_reports'])
keep_empty_reports = FieldProperty(ITask['keep_empty_reports'])
# Stored under private names; the public ``history_duration`` accessor
# reads ``_history_duration``.
_history_duration = FieldProperty(ITask['history_duration'])
_history_length = FieldProperty(ITask['history_length'])

# Cached value returned by the ``internal_id`` property; None until the
# first successful registration with the IIntIds utility.
_internal_id = None
|
111 |
def __init__(self):
    """Create the task's history container and locate it under the task.

    The container is stored as ``self.history`` and located with this
    task as parent, under the name ``'++history++'``.
    """
    history = self.history = TaskHistoryContainer()
    locate(history, self, '++history++')
112 |
115 |
113 @property |
116 @property |
123 self._schedule_mode = value |
126 self._schedule_mode = value |
124 if value: |
127 if value: |
125 mode = get_utility(ITaskSchedulingMode, name=value) |
128 mode = get_utility(ITaskSchedulingMode, name=value) |
126 alsoProvides(self, mode.marker_interface) |
129 alsoProvides(self, mode.marker_interface) |
127 mode.schema(self).active = False |
130 mode.schema(self).active = False |
128 self.reset() |
131 if self.__parent__ is not None: |
|
132 self.reset() |
129 |
133 |
@property
def history_duration(self):
    """Return the value stored in the private ``_history_duration`` attribute."""
    return self._history_duration
133 |
137 |
def check_history(self):
    """Ask the history container to check itself against the task limits.

    Delegates to ``self.history.check_history``, passing the task's current
    ``history_duration`` and ``history_length`` values.
    """
    self.history.check_history(self.history_duration, self.history_length)
148 |
152 |
@property
def internal_id(self):
    """Return the task's internal ID, registering it on first access.

    While ``_internal_id`` is still None, the IIntIds utility is looked up
    through the site manager of the parent ISite, and the task is
    registered with it; the result is cached in ``_internal_id``.
    Later accesses return the cached value without any lookup.

    :return: the cached ID, or None when no IIntIds utility is registered
    """
    if self._internal_id is None:
        site = get_parent(self, ISite)
        sm = site.getSiteManager()
        intids = sm.queryUtility(IIntIds)
        if intids is not None:
            self._internal_id = intids.register(self)
    return self._internal_id
156 |
162 |
157 def get_trigger(self, registry): |
163 def get_trigger(self, registry): |
158 mode = registry.queryUtility(ITaskSchedulingMode, self.schedule_mode) |
164 mode = registry.queryUtility(ITaskSchedulingMode, self.schedule_mode) |
159 if mode is None: |
165 if mode is None: |
160 return None |
166 return None |
168 |
174 |
def reset(self):
    """Register an after-commit hook which resets this task.

    Nothing is done when no IScheduler utility is registered. Otherwise
    ``_reset_action`` is added as an after-commit hook on the current
    transaction; it receives the scheduler utility, the current request's
    registry and the task's internal ID as keyword arguments.
    """
    scheduler_util = query_utility(IScheduler)
    if scheduler_util is None:
        return
    request = check_request()
    # get task internal ID before transaction ends!!!
    # (it is resolved here and handed to the hook as 'job_id', so the hook
    # does not have to read it from the task)
    hook_kwargs = {'scheduler': scheduler_util,
                   'registry': request.registry,
                   'job_id': self.internal_id}
    transaction.get().addAfterCommitHook(self._reset_action, kws=hook_kwargs)
175 |
183 |
176 def _reset_action(self, status, *args, **kwargs): |
184 def _reset_action(self, status, *args, **kwargs): |
177 if not status: |
185 if not status: |
178 return |
186 return |
179 scheduler_util = kwargs.get('scheduler') |
187 scheduler_util = kwargs.get('scheduler') |
187 context = zmq.Context() |
195 context = zmq.Context() |
188 socket = context.socket(zmq.REQ) |
196 socket = context.socket(zmq.REQ) |
189 socket.connect('tcp://{0}'.format(handler)) |
197 socket.connect('tcp://{0}'.format(handler)) |
190 zmq_settings = {'zeo': zeo.get_settings(), |
198 zmq_settings = {'zeo': zeo.get_settings(), |
191 'task_name': self.__name__, |
199 'task_name': self.__name__, |
192 'job_id': self.internal_id} |
200 'job_id': kwargs.get('job_id')} |
193 logger.debug("Resetting task {0.name} with {1!r}".format(self, zmq_settings)) |
201 logger.debug("Resetting task {0.name} with {1!r}".format(self, zmq_settings)) |
194 socket.send_json(['reset_task', zmq_settings]) |
202 socket.send_json(['reset_task', zmq_settings]) |
195 socket.recv_json() |
203 socket.recv_json() |
196 |
204 |
def launch(self):
    """Register an after-commit hook which launches this task.

    Nothing is done when no IScheduler utility is registered. Otherwise
    ``_launch_action`` is added as an after-commit hook on the current
    transaction; it receives the scheduler utility and the task's internal
    ID as keyword arguments.
    """
    scheduler_util = query_utility(IScheduler)
    if scheduler_util is None:
        return
    # get task internal ID before transaction ends!!!
    # (it is resolved here and handed to the hook as 'job_id', so the hook
    # does not have to read it from the task)
    hook_kwargs = {'scheduler': scheduler_util,
                   'job_id': self.internal_id}
    transaction.get().addAfterCommitHook(self._launch_action, kws=hook_kwargs)
201 |
211 |
202 def _launch_action(self, status, *args, **kwargs): |
212 def _launch_action(self, status, *args, **kwargs): |
203 if not status: |
213 if not status: |
204 return |
214 return |
205 scheduler_util = kwargs.get('scheduler') |
215 scheduler_util = kwargs.get('scheduler') |
213 context = zmq.Context() |
223 context = zmq.Context() |
214 socket = context.socket(zmq.REQ) |
224 socket = context.socket(zmq.REQ) |
215 socket.connect('tcp://{0}'.format(handler)) |
225 socket.connect('tcp://{0}'.format(handler)) |
216 zmq_settings = {'zeo': zeo.get_settings(), |
226 zmq_settings = {'zeo': zeo.get_settings(), |
217 'task_name': self.__name__, |
227 'task_name': self.__name__, |
218 'job_id': self.internal_id} |
228 'job_id': kwargs.get('job_id')} |
219 logger.debug("Running task {0.name} with {1!r}".format(self, zmq_settings)) |
229 logger.debug("Running task {0.name} with {1!r}".format(self, zmq_settings)) |
220 socket.send_json(['run_task', zmq_settings]) |
230 socket.send_json(['run_task', zmq_settings]) |
221 socket.recv_json() |
231 socket.recv_json() |
222 |
232 |
223 def __call__(self, *args, **kwargs): |
233 def __call__(self, *args, **kwargs): |
332 recipients=(target,), |
342 recipients=(target,), |
333 body=report.getvalue()) |
343 body=report.getvalue()) |
334 mailer.send(message) |
344 mailer.send(message) |
335 |
345 |
336 |
346 |
@subscriber(IObjectAddedEvent, context_selector=ITask)
def handle_new_task(event):
    """Handle new task

    Calls ``reset()`` on the task carried by the event.

    :param event: the object-added event; its ``object`` attribute is the task
    """
    event.object.reset()
|
351 |
|
352 |
|
@subscriber(IObjectModifiedEvent, context_selector=ITask)
def handle_modified_task(event):
    """Handle modified task

    Scans the event's change descriptions; as soon as one of them concerns
    the ITaskInfo interface and lists ``history_duration`` or
    ``history_length`` among its attributes, the task's ``check_history()``
    is called once and the scan stops.

    :param event: the object-modified event; ``descriptions`` holds the
        change descriptions and ``object`` is the modified task
    """
    history_fields = ('history_duration', 'history_length')
    for changes in event.descriptions:
        if (changes.interface == ITaskInfo) and \
                any(name in changes.attributes for name in history_fields):
            event.object.check_history()
            # a single check is enough, whatever the number of matching changes
            break
345 |
361 |
346 |
362 |
347 @subscriber(ObjectRemovedEvent, context_selector=ITask) |
363 @subscriber(IObjectRemovedEvent, context_selector=ITask) |
348 def handle_removed_task(event): |
364 def handle_removed_task(event): |
349 """Handle removed task""" |
365 """Handle removed task""" |
350 request = check_request() |
366 request = check_request() |
351 if request.registry: |
367 if request.registry: |
352 handler = request.registry.settings.get(SCHEDULER_HANDLER_KEY, False) |
368 handler = request.registry.settings.get(SCHEDULER_HANDLER_KEY, False) |