|
1 # |
|
2 # Copyright (c) 2008-2015 Thierry Florac <tflorac AT ulthar.net> |
|
3 # All Rights Reserved. |
|
4 # |
|
5 # This software is subject to the provisions of the Zope Public License, |
|
6 # Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution. |
|
7 # THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED |
|
8 # WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED |
|
9 # WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS |
|
10 # FOR A PARTICULAR PURPOSE. |
|
11 # |
|
12 |
|
13 __docformat__ = 'restructuredtext' |
|
14 |
|
15 |
|
16 # import standard library |
|
17 import logging |
|
18 logger = logging.getLogger('PyAMS (scheduler)') |
|
19 |
|
20 import traceback |
|
21 from datetime import datetime, timedelta |
|
22 from io import StringIO |
|
23 |
|
24 # import interfaces |
|
25 from pyams_scheduler.interfaces import ITaskHistory, ITask, ITaskHistoryContainer, ITaskSchedulingMode, IScheduler, \ |
|
26 SCHEDULER_HANDLER_KEY, AfterRunJobEvent, SCHEDULER_NAME, BeforeRunJobEvent, ITaskInfo |
|
27 from pyams_utils.interfaces import PYAMS_APPLICATION_DEFAULT_NAME, PYAMS_APPLICATION_SETTINGS_KEY |
|
28 from pyams_utils.interfaces.zeo import IZEOConnection |
|
29 from pyramid_mailer.interfaces import IMailer |
|
30 from transaction.interfaces import ITransactionManager |
|
31 from zope.component.interfaces import ISite |
|
32 from zope.intid.interfaces import IIntIds |
|
33 |
|
34 # import packages |
|
35 import transaction |
|
36 import zmq |
|
37 from apscheduler.triggers.base import BaseTrigger |
|
38 from persistent import Persistent |
|
39 from pyams_utils.date import get_duration |
|
40 from pyams_utils.registry import query_utility, get_utility |
|
41 from pyams_utils.request import check_request |
|
42 from pyams_utils.timezone import tztime |
|
43 from pyams_utils.traversing import get_parent |
|
44 from pyams_utils.zodb import ZEOConnection |
|
45 from pyramid.events import subscriber |
|
46 from pyramid_mailer.message import Message |
|
47 from zope.container.contained import Contained |
|
48 from zope.container.folder import Folder |
|
49 from zope.interface import implementer, alsoProvides, noLongerProvides |
|
50 from zope.lifecycleevent import ObjectRemovedEvent, ObjectModifiedEvent |
|
51 from zope.location import locate |
|
52 from zope.schema.fieldproperty import FieldProperty |
|
53 |
|
54 |
|
class ImmediateTaskTrigger(BaseTrigger):
    """Trigger firing a job a single time, shortly after it is scheduled"""

    def get_next_fire_time(self, previous_fire_time, now):
        """Compute the next run time of the job

        :param previous_fire_time: previous fire time of the job, or a falsy
            value when the job never fired
        :param now: current date and time, as provided by the caller
        :return: ``None`` once the job already fired, otherwise *now* shifted
            five seconds into the future
        """
        # a truthy previous fire time means the single run already happened
        return None if previous_fire_time else now + timedelta(seconds=5)
|
63 |
|
64 |
|
@implementer(ITaskHistory)
class TaskHistoryItem(Persistent, Contained):
    """Persistent record of a single task execution"""

    # attributes backed by the matching ITaskHistory schema fields
    date = FieldProperty(ITaskHistory['date'])
    status = FieldProperty(ITaskHistory['status'])
    report = FieldProperty(ITaskHistory['report'])

    def __init__(self, **kwargs):
        """Initialize the item, setting one attribute per keyword argument"""
        for attribute in kwargs:
            setattr(self, attribute, kwargs[attribute])
|
76 |
|
77 |
|
@implementer(ITaskHistoryContainer)
class TaskHistoryContainer(Folder):
    """Task history container"""

    def check_history(self, duration, length):
        """Prune history entries according to the given retention settings

        :param duration: maximum age, in days; entries whose ``date`` is older
            than that are removed. A falsy value disables the age check.
        :param length: maximum number of entries; when the container holds
            more, only the ``length`` entries whose keys sort highest are
            kept. A falsy value disables the size check.
        """
        now = tztime(datetime.utcnow())
        if duration:
            # iterate over a snapshot of the keys, as entries are deleted
            # while looping
            for key in list(self.keys()):
                if (now - self[key].date).days > duration:
                    del self[key]
        if length and (len(self) > length):
            # keys are sorted in descending order, so everything located
            # after the first ``length`` keys is surplus
            for key in sorted(self.keys(), reverse=True)[length:]:
                del self[key]
|
93 |
|
94 |
|
@implementer(ITask)
class Task(Persistent, Contained):
    """Task definition persistent class

    Base class of scheduled tasks: subclasses provide the actual work by
    overriding :py:meth:`run`, while this class handles scheduling mode
    markers, notifications sent to the scheduler process, execution history
    and report mailing.
    """

    name = FieldProperty(ITask['name'])
    _schedule_mode = FieldProperty(ITask['schedule_mode'])
    report_target = FieldProperty(ITask['report_target'])
    errors_target = FieldProperty(ITask['errors_target'])
    report_errors_only = FieldProperty(ITask['report_errors_only'])
    send_empty_reports = FieldProperty(ITask['send_empty_reports'])
    keep_empty_reports = FieldProperty(ITask['keep_empty_reports'])
    _history_duration = FieldProperty(ITask['history_duration'])
    _history_length = FieldProperty(ITask['history_length'])

    def __init__(self):
        """Create the task together with its located history container"""
        history = self.history = TaskHistoryContainer()
        locate(history, self, '++history++')

    @property
    def schedule_mode(self):
        """Name of the scheduling mode utility currently used by the task"""
        return self._schedule_mode

    @schedule_mode.setter
    def schedule_mode(self, value):
        """Switch scheduling mode and update marker interfaces accordingly"""
        if self._schedule_mode is not None:
            # drop the marker interface of the previous mode, if still provided
            mode = query_utility(ITaskSchedulingMode, name=self._schedule_mode)
            if (mode is not None) and mode.marker_interface.providedBy(self):
                noLongerProvides(self, mode.marker_interface)
        self._schedule_mode = value
        if value:
            mode = get_utility(ITaskSchedulingMode, name=value)
            alsoProvides(self, mode.marker_interface)
            # a task switched to a new mode starts deactivated
            mode.schema(self).active = False
            self.reset()

    @property
    def history_duration(self):
        """Value of the ``history_duration`` schema field"""
        return self._history_duration

    @history_duration.setter
    def history_duration(self, value):
        self._history_duration = value

    @property
    def history_length(self):
        """Value of the ``history_length`` schema field"""
        return self._history_length

    @history_length.setter
    def history_length(self, value):
        self._history_length = value

    def check_history(self):
        """Prune the history container using the task retention settings"""
        self.history.check_history(self.history_duration, self.history_length)

    @property
    def internal_id(self):
        """ID of the task in the ``IIntIds`` utility of its parent site

        The task is registered on access; ``None`` is returned when the site
        manager provides no ``IIntIds`` utility.
        """
        site = get_parent(self, ISite)
        sm = site.getSiteManager()
        intids = sm.queryUtility(IIntIds)
        if intids is not None:
            return intids.register(self)
        return None

    def get_trigger(self, registry):
        """Get the trigger built by the task scheduling mode

        :param registry: components registry used to look for the mode utility
        :return: the result of the mode's ``get_trigger``, or ``None`` when
            the scheduling mode utility can't be found
        """
        mode = registry.queryUtility(ITaskSchedulingMode, self.schedule_mode)
        if mode is None:
            return None
        return mode.get_trigger(self)

    def get_scheduling_info(self, registry):
        """Get the task adapted to the schema of its scheduling mode

        :param registry: components registry used to look for the mode utility
        :return: ``mode.schema(self, None)``, or ``None`` when the scheduling
            mode utility can't be found
        """
        mode = registry.queryUtility(ITaskSchedulingMode, self.schedule_mode)
        if mode is None:
            return None
        return mode.schema(self, None)

    def _notify_scheduler(self, scheduler_util, command, action):
        """Send a command concerning this task to the scheduler handler

        Nothing is sent when the current request has no registry or when no
        handler address is configured under ``SCHEDULER_HANDLER_KEY``.

        :param scheduler_util: scheduler utility providing ``zeo_connection``
        :param command: name of the command placed first in the JSON message
        :param action: verb used to start the debug log message
        """
        request = check_request()
        if not request.registry:
            return
        handler = request.registry.settings.get(SCHEDULER_HANDLER_KEY, False)
        if not handler:
            return
        zeo = get_utility(IZEOConnection, scheduler_util.zeo_connection)
        context = zmq.Context()
        try:
            socket = context.socket(zmq.REQ)
            try:
                socket.connect('tcp://{0}'.format(handler))
                zmq_settings = {'zeo': zeo.get_settings(),
                                'task_name': self.__name__,
                                'job_id': self.internal_id}
                logger.debug("{0} task {1.name} with {2!r}".format(action, self, zmq_settings))
                socket.send_json([command, zmq_settings])
                # REQ sockets must read the reply; its content is not used
                socket.recv_json()
            finally:
                # linger=0: don't let an undelivered message block context
                # termination when the exchange failed
                socket.close(linger=0)
        finally:
            context.term()

    def reset(self):
        """Ask the scheduler to reset this task once the transaction commits

        Nothing happens when no ``IScheduler`` utility is registered.
        """
        scheduler_util = query_utility(IScheduler)
        if scheduler_util is not None:
            request = check_request()
            transaction.get().addAfterCommitHook(self._reset_action, kws={'scheduler': scheduler_util,
                                                                          'registry': request.registry})

    def _reset_action(self, status, *args, **kwargs):
        """After-commit hook sending the ``reset_task`` command

        :param status: commit status given by the transaction; nothing is
            sent when it is falsy
        :param kwargs: the ``scheduler`` key holds the scheduler utility
        """
        if not status:
            return
        scheduler_util = kwargs.get('scheduler')
        if scheduler_util is None:
            return
        self._notify_scheduler(scheduler_util, 'reset_task', 'Resetting')

    def launch(self):
        """Ask the scheduler to run this task once the transaction commits

        Nothing happens when no ``IScheduler`` utility is registered.
        """
        scheduler_util = query_utility(IScheduler)
        if scheduler_util is not None:
            transaction.get().addAfterCommitHook(self._launch_action, kws={'scheduler': scheduler_util})

    def _launch_action(self, status, *args, **kwargs):
        """After-commit hook sending the ``run_task`` command

        :param status: commit status given by the transaction; nothing is
            sent when it is falsy
        :param kwargs: the ``scheduler`` key holds the scheduler utility
        """
        if not status:
            return
        scheduler_util = kwargs.get('scheduler')
        if scheduler_util is None:
            return
        self._notify_scheduler(scheduler_util, 'run_task', 'Running')

    def __call__(self, *args, **kwargs):
        """Execute the task with a fresh in-memory report

        Positional arguments are ignored; keyword arguments are forwarded to
        :py:meth:`_run`.
        """
        report = StringIO()
        self._run(report, **kwargs)

    def is_runnable(self, registry):
        """Check if the task is active in its scheduling mode

        :param registry: components registry used to look for the mode utility
        :return: ``False`` when the mode utility or the scheduling info can't
            be found, otherwise the ``active`` attribute of the scheduling info
        """
        mode = registry.queryUtility(ITaskSchedulingMode, self.schedule_mode)
        if mode is None:
            return False
        info = mode.schema(self, None)
        if info is None:
            return False
        return info.active

    def _run(self, report, **kwargs):
        """Task execution wrapper

        Opens a dedicated ZEO connection, loads the task registered under
        this task's name in the scheduler, then runs it through the retry
        loop of its transaction manager before storing and sending the report.

        :param report: text buffer receiving the execution report
        :param kwargs: ``zeo_settings`` (settings of the ZEO connection),
            ``registry`` (components registry) and ``run_immediate`` (truthy
            to run the task even if it is not active) are read
        """
        zeo_connection = ZEOConnection()
        zeo_connection.update(kwargs.get('zeo_settings'))
        with zeo_connection as root:
            try:
                registry = kwargs.get('registry')
                request = check_request()
                request.registry = registry
                application_name = registry.settings.get(PYAMS_APPLICATION_SETTINGS_KEY,
                                                         PYAMS_APPLICATION_DEFAULT_NAME)
                sm = root.get(application_name).getSiteManager()
                scheduler_util = sm.get(SCHEDULER_NAME)
                task = scheduler_util.get(self.__name__)
                if task is not None:
                    if not (kwargs.get('run_immediate') or task.is_runnable(registry)):
                        logger.debug("Skipping inactive task {0}".format(task.name))
                        return
                    tm = ITransactionManager(task)
                    for attempt in tm.attempts():
                        with attempt as t:
                            start = datetime.utcnow()
                            try:
                                registry.notify(BeforeRunJobEvent(task))
                                task.run(report)
                                if report.getvalue():
                                    status = 'OK'
                                else:
                                    status = 'Empty'
                                report.write('\n\nTask duration: {0}'.format(get_duration(start, request=request)))
                            except Exception:
                                # interpreter-exit exceptions (SystemExit,
                                # KeyboardInterrupt) are left to propagate
                                status = 'Error'
                                task._log_exception(report,
                                                    "An error occurred during execution of task '{0}'".format(task.name))
                            registry.notify(AfterRunJobEvent(task, status))
                            task.store_report(report, status)
                            task.send_report(report, status, registry)
                        # the attempt status is only known after leaving the
                        # context manager; stop retrying once committed
                        if t.status == 'Committed':
                            break
            except Exception:
                self._log_exception(None, "Can't execute scheduled job {0}".format(self.name))
            # NOTE(review): source indentation was lost, so this abort may
            # originally have been part of the error handler only; running it
            # after a successful commit should be harmless - confirm
            ITransactionManager(self).abort()

    def run(self, report):
        """Do the actual work of the task; must be overridden by subclasses

        :param report: text buffer in which the task writes its report
        :raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError("The 'run' method must be implemented by Task subclasses!")

    @staticmethod
    def _log_report(report, message, add_timestamp=True, level=logging.INFO):
        """Write a message into the report and into the module logger

        :param report: text buffer receiving the message; may be ``None``
        :param message: message to log; bytes are decoded first
        :param add_timestamp: if true, prefix the message with current time
        :param level: logging level used for the module logger
        """
        if isinstance(message, bytes):
            message = message.decode()
        if add_timestamp:
            message = '{0} - {1}'.format(tztime(datetime.utcnow()).strftime('%c'), message)
        if report is not None:
            report.write(message + '\n')
        logger.log(level, message)

    @staticmethod
    def _log_exception(report, message=None):
        """Write the current exception into the report and the module logger

        Must be called while an exception is being handled, as the traceback
        is read from the current exception context.

        :param report: text buffer receiving message and traceback; may be
            ``None``
        :param message: message to log; bytes are decoded first and a default
            text is used when empty
        """
        if isinstance(message, bytes):
            message = message.decode()
        message = '{0} - {1}'.format(tztime(datetime.utcnow()).strftime('%c'), message or 'An error occurred') + '\n\n'
        if report is not None:
            report.write(message)
            report.write(traceback.format_exc() + '\n')
        logger.exception(message)

    def store_report(self, report, status):
        """Store the execution report into the task history

        :param report: text buffer containing the execution report
        :param status: execution status; ``'Empty'`` reports are only stored
            when ``keep_empty_reports`` is set
        """
        if (status == 'Empty') and not self.keep_empty_reports:
            return
        item = TaskHistoryItem(date=tztime(datetime.utcnow()),
                               status=status,
                               report=report.getvalue())
        # history items are keyed by their ISO-formatted date
        self.history[item.date.isoformat()] = item
        self.check_history()

    def send_report(self, report, status, registry):
        """Send the execution report by mail

        Nothing is sent when the parent defines no ``report_mailer``, when
        the status is filtered out by ``send_empty_reports`` or
        ``report_errors_only``, when no target address is defined or when the
        mailer utility can't be found.

        :param report: text buffer containing the execution report
        :param status: execution status
        :param registry: components registry used to look for the mailer
        """
        if not self.__parent__.report_mailer:
            return
        if ((status == 'Empty') and not self.send_empty_reports) or \
           ((status == 'OK') and self.report_errors_only):
            return
        message_target = self.report_target
        if status in ('Error', 'Warning'):
            message_target = self.errors_target or message_target
        if not message_target:
            return
        mailer = registry.queryUtility(IMailer, self.__parent__.report_mailer)
        if mailer is not None:
            report_source = self.__parent__.report_source
            if status == 'Error':
                subject = "[SCHEDULER ERROR] {0}".format(self.name)
            else:
                subject = "[scheduler] {0}".format(self.name)
            for target in message_target.split(';'):
                # tolerate spaces around separators and trailing semicolons
                target = target.strip()
                if not target:
                    continue
                message = Message(subject=subject,
                                  sender=report_source,
                                  recipients=(target,),
                                  body=report.getvalue())
                mailer.send(message)
|
335 |
|
336 |
|
@subscriber(ObjectModifiedEvent, context_selector=ITask)
def handle_modified_task(event):
    """Prune task history when its retention settings are modified"""
    retention_fields = ('history_duration', 'history_length')
    for description in event.descriptions:
        # only look at attributes of descriptions bound to ITaskInfo
        if not (description.interface == ITaskInfo):
            continue
        if any(field in description.attributes for field in retention_fields):
            event.object.check_history()
            break
|
345 |
|
346 |
|
@subscriber(ObjectRemovedEvent, context_selector=ITask)
def handle_removed_task(event):
    """Handle removed task

    Sends a ``remove_task`` command to the scheduler handler. Nothing is sent
    when the current request has no registry, when no handler address is
    configured under ``SCHEDULER_HANDLER_KEY`` or when no ``IScheduler``
    utility is registered.
    """
    request = check_request()
    if not request.registry:
        return
    handler = request.registry.settings.get(SCHEDULER_HANDLER_KEY, False)
    if not handler:
        return
    scheduler_util = query_utility(IScheduler)
    if scheduler_util is None:
        # without scheduler utility, the ZEO connection name is unknown
        return
    task = event.object
    zeo = get_utility(IZEOConnection, scheduler_util.zeo_connection)
    context = zmq.Context()
    try:
        socket = context.socket(zmq.REQ)
        try:
            socket.connect('tcp://{0}'.format(handler))
            zmq_settings = {'zeo': zeo.get_settings(),
                            'task_name': task.__name__,
                            'job_id': task.internal_id}
            logger.debug("Removing task {0.name} with {1!r}".format(task, zmq_settings))
            socket.send_json(['remove_task', zmq_settings])
            # REQ sockets must read the reply; its content is not used
            socket.recv_json()
        finally:
            # linger=0: don't let an undelivered message block context
            # termination when the exchange failed
            socket.close(linger=0)
    finally:
        context.term()