|
1 # |
|
2 # Copyright (c) 2008-2015 Thierry Florac <tflorac AT ulthar.net> |
|
3 # All Rights Reserved. |
|
4 # |
|
5 # This software is subject to the provisions of the Zope Public License, |
|
6 # Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution. |
|
7 # THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED |
|
8 # WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED |
|
9 # WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS |
|
10 # FOR A PARTICULAR PURPOSE. |
|
11 # |
|
12 |
|
13 __docformat__ = 'restructuredtext' |
|
14 |
|
15 |
|
16 # import standard library |
|
17 import logging |
|
18 logger = logging.getLogger('PyAMS (scheduler)') |
|
19 |
|
20 from datetime import datetime |
|
21 from threading import Thread |
|
22 |
|
23 # import interfaces |
|
24 from pyams_scheduler.interfaces import SCHEDULER_NAME |
|
25 from pyams_utils.interfaces import PYAMS_APPLICATION_SETTINGS_KEY, PYAMS_APPLICATION_DEFAULT_NAME |
|
26 from pyams_utils.interfaces.zeo import IZEOConnection |
|
27 |
|
28 # import packages |
|
29 from apscheduler.jobstores.memory import MemoryJobStore |
|
30 from apscheduler.schedulers.background import BackgroundScheduler |
|
31 from pyams_scheduler.task import ImmediateTaskTrigger |
|
32 from pyams_utils.zodb import ZEOConnection |
|
33 from pyams_zmq.handler import ZMQMessageHandler |
|
34 from pyams_zmq.process import ZMQProcess |
|
35 |
|
36 |
|
37 class BaseTaskThread(Thread): |
|
38 |
|
39 def __init__(self, process, settings): |
|
40 Thread.__init__(self) |
|
41 self.process = process |
|
42 self.settings = settings |
|
43 |
|
44 def _get_connection(self): |
|
45 zeo_settings = self.settings.get('zeo') |
|
46 connection = ZEOConnection() |
|
47 connection.update(zeo_settings) |
|
48 return connection |
|
49 |
|
50 |
|
51 class TaskResettingThread(BaseTaskThread): |
|
52 """Task resetting thread |
|
53 |
|
54 Task reset is run in another thread, so that: |
|
55 - other transactions applied on updated tasks are visible |
|
56 - ØMQ request returns immediately to calling process |
|
57 """ |
|
58 |
|
59 def run(self): |
|
60 logger.debug("Starting task resetting thread...") |
|
61 settings = self.settings |
|
62 job_id = settings.get('job_id') |
|
63 if job_id is None: |
|
64 return |
|
65 job_id = str(job_id) |
|
66 logger.debug("Loading ZEO connection...") |
|
67 with self._get_connection() as root: |
|
68 logger.debug("Loaded ZODB root {0!r}".format(root)) |
|
69 tm = None |
|
70 try: |
|
71 try: |
|
72 application_name = self.process.registry.settings.get(PYAMS_APPLICATION_SETTINGS_KEY, |
|
73 PYAMS_APPLICATION_DEFAULT_NAME) |
|
74 application = root.get(application_name) |
|
75 logger.debug("Loaded application {0!r}".format(application)) |
|
76 sm = application.getSiteManager() |
|
77 scheduler_util = sm.get(SCHEDULER_NAME) |
|
78 logger.debug("Loaded scheduler utility {0!r}".format(scheduler_util)) |
|
79 scheduler = self.process.scheduler |
|
80 logger.debug("Removing job '{0}'".format(job_id)) |
|
81 job = scheduler.get_job(job_id) |
|
82 if job is not None: |
|
83 logger.debug("Loaded job {0!r} ({0.id!r})".format(job)) |
|
84 scheduler.remove_job(job.id) |
|
85 logger.debug("Loading scheduler task '{0}'".format(settings.get('task_name').lower())) |
|
86 task = scheduler_util.get(settings.get('task_name').lower()) |
|
87 logger.debug("Loaded scheduler task {0!r}".format(task)) |
|
88 if task is not None: |
|
89 trigger = task.get_trigger(self.process.registry) |
|
90 logger.debug("Getting task trigger {0!r}".format(trigger)) |
|
91 zeo_connection = sm.getUtility(IZEOConnection, name=scheduler_util.zeo_connection) |
|
92 logger.debug("Adding new job to scheduler {0!r}".format(scheduler)) |
|
93 scheduler.add_job(task, trigger, |
|
94 id=str(task.internal_id), |
|
95 name=task.name, |
|
96 kwargs={'zeo_settings': zeo_connection.get_settings(), |
|
97 'registry': self.process.registry}) |
|
98 logger.debug("Added job") |
|
99 except: |
|
100 logger.exception("An exception occurred:") |
|
101 finally: |
|
102 if tm is not None: |
|
103 tm.abort() |
|
104 |
|
105 |
|
106 class TaskRemoverThread(BaseTaskThread): |
|
107 """Task remover thread""" |
|
108 |
|
109 def run(self): |
|
110 logger.debug("Starting task remover thread...") |
|
111 settings = self.settings |
|
112 job_id = settings.get('job_id') |
|
113 if job_id is None: |
|
114 return |
|
115 job_id = str(job_id) |
|
116 logger.debug("Loading ZEO connection...") |
|
117 with self._get_connection() as root: |
|
118 logger.debug("Loaded ZODB root {0!r}".format(root)) |
|
119 tm = None |
|
120 try: |
|
121 try: |
|
122 application_name = self.process.registry.settings.get(PYAMS_APPLICATION_SETTINGS_KEY, |
|
123 PYAMS_APPLICATION_DEFAULT_NAME) |
|
124 application = root.get(application_name) |
|
125 logger.debug("Loaded application {0!r}".format(application)) |
|
126 sm = application.getSiteManager() |
|
127 scheduler_util = sm.get(SCHEDULER_NAME) |
|
128 logger.debug("Loaded scheduler utility {0!r}".format(scheduler_util)) |
|
129 scheduler = self.process.scheduler |
|
130 logger.debug("Removing job '{0}'".format(job_id)) |
|
131 job = scheduler.get_job(job_id) |
|
132 if job is not None: |
|
133 logger.debug("Loaded job {0!r} ({0.id!r})".format(job)) |
|
134 scheduler.remove_job(job.id) |
|
135 logger.debug("Removed job") |
|
136 except: |
|
137 logger.exception("An exception occurred:") |
|
138 finally: |
|
139 if tm is not None: |
|
140 tm.abort() |
|
141 |
|
142 |
|
143 class TaskRunnerThread(BaseTaskThread): |
|
144 """Task immediate runner thread""" |
|
145 |
|
146 def run(self): |
|
147 logger.debug("Starting task runner thread...") |
|
148 settings = self.settings |
|
149 job_id = settings.get('job_id') |
|
150 if job_id is None: |
|
151 return |
|
152 logger.debug("Loading ZEO connection...") |
|
153 with self._get_connection() as root: |
|
154 logger.debug("Loaded ZODB root {0!r}".format(root)) |
|
155 tm = None |
|
156 try: |
|
157 try: |
|
158 application_name = self.process.registry.settings.get(PYAMS_APPLICATION_SETTINGS_KEY, |
|
159 PYAMS_APPLICATION_DEFAULT_NAME) |
|
160 application = root.get(application_name) |
|
161 logger.debug("Loaded application {0!r}".format(application)) |
|
162 sm = application.getSiteManager() |
|
163 scheduler_util = sm.get(SCHEDULER_NAME) |
|
164 logger.debug("Loaded scheduler utility {0!r}".format(scheduler_util)) |
|
165 scheduler = self.process.scheduler |
|
166 logger.debug("Loading scheduler task '{0}'".format(settings.get('task_name').lower())) |
|
167 task = scheduler_util.get(settings.get('task_name').lower()) |
|
168 logger.debug("Loaded scheduler task {0!r}".format(task)) |
|
169 if task is not None: |
|
170 trigger = ImmediateTaskTrigger() |
|
171 logger.debug("Getting task trigger {0!r}".format(trigger)) |
|
172 zeo_connection = sm.getUtility(IZEOConnection, name=scheduler_util.zeo_connection) |
|
173 logger.debug("Adding new job to scheduler {0!r}".format(scheduler)) |
|
174 scheduler.add_job(task, trigger, |
|
175 id='{0.internal_id}::{1}'.format(task, |
|
176 datetime.utcnow().isoformat()), |
|
177 name=task.name, |
|
178 kwargs={'zeo_settings': zeo_connection.get_settings(), |
|
179 'registry': self.process.registry, |
|
180 'run_immediate': True}) |
|
181 logger.debug("Added job") |
|
182 except: |
|
183 logger.exception("An exception occurred:") |
|
184 finally: |
|
185 if tm is not None: |
|
186 tm.abort() |
|
187 |
|
188 |
|
189 class SchedulerHandler(object): |
|
190 """Scheduler handler""" |
|
191 |
|
192 def get_jobs(self, settings): |
|
193 scheduler = self.process.scheduler |
|
194 return [{'id': job.id, |
|
195 'name': job.name, |
|
196 'trigger': '{0!s}'.format(job.trigger), |
|
197 'next_run': job.next_run_time.timestamp()} for job in scheduler.get_jobs()] |
|
198 |
|
199 def reset_task(self, settings): |
|
200 TaskResettingThread(self.process, settings).start() |
|
201 return 'OK' |
|
202 |
|
203 def remove_task(self, settings): |
|
204 TaskRemoverThread(self.process, settings).start() |
|
205 return 'OK' |
|
206 |
|
207 def run_task(self, settings): |
|
208 TaskRunnerThread(self.process, settings).start() |
|
209 return 'OK' |
|
210 |
|
211 |
|
212 class SchedulerMessageHandler(ZMQMessageHandler): |
|
213 """ØMQ scheduler messages handler""" |
|
214 |
|
215 handler = SchedulerHandler |
|
216 |
|
217 |
|
218 class SchedulerProcess(ZMQProcess): |
|
219 """ØMQ tasks scheduler process""" |
|
220 |
|
221 def __init__(self, zmq_address, handler, registry): |
|
222 ZMQProcess.__init__(self, zmq_address, handler) |
|
223 self.registry = registry |
|
224 self.scheduler = BackgroundScheduler() |
|
225 self.jobstore = MemoryJobStore() |
|
226 |
|
227 def run(self): |
|
228 if self.scheduler is not None: |
|
229 self.scheduler.add_jobstore(self.jobstore, 'default') |
|
230 self.scheduler.start() |
|
231 ZMQProcess.run(self) |