64 return |
64 return |
65 job_id = str(job_id) |
65 job_id = str(job_id) |
66 logger.debug("Loading ZEO connection...") |
66 logger.debug("Loading ZEO connection...") |
67 with self._get_connection() as root: |
67 with self._get_connection() as root: |
68 logger.debug("Loaded ZODB root {0!r}".format(root)) |
68 logger.debug("Loaded ZODB root {0!r}".format(root)) |
69 tm = None |
|
70 try: |
69 try: |
71 try: |
70 application_name = self.process.registry.settings.get(PYAMS_APPLICATION_SETTINGS_KEY, |
72 application_name = self.process.registry.settings.get(PYAMS_APPLICATION_SETTINGS_KEY, |
71 PYAMS_APPLICATION_DEFAULT_NAME) |
73 PYAMS_APPLICATION_DEFAULT_NAME) |
72 application = root.get(application_name) |
74 application = root.get(application_name) |
73 logger.debug("Loaded application {0!r}".format(application)) |
75 logger.debug("Loaded application {0!r}".format(application)) |
74 sm = application.getSiteManager() |
76 sm = application.getSiteManager() |
75 scheduler_util = sm.get(SCHEDULER_NAME) |
77 scheduler_util = sm.get(SCHEDULER_NAME) |
76 logger.debug("Loaded scheduler utility {0!r}".format(scheduler_util)) |
78 logger.debug("Loaded scheduler utility {0!r}".format(scheduler_util)) |
77 scheduler = self.process.scheduler |
79 scheduler = self.process.scheduler |
78 logger.debug("Removing job '{0}'".format(job_id)) |
80 logger.debug("Removing job '{0}'".format(job_id)) |
79 job = scheduler.get_job(job_id) |
81 job = scheduler.get_job(job_id) |
80 if job is not None: |
82 if job is not None: |
81 logger.debug("Loaded job {0!r} ({0.id!r})".format(job)) |
83 logger.debug("Loaded job {0!r} ({0.id!r})".format(job)) |
82 scheduler.remove_job(job.id) |
84 scheduler.remove_job(job.id) |
83 logger.debug("Loading scheduler task '{0}'".format(settings.get('task_name').lower())) |
85 logger.debug("Loading scheduler task '{0}'".format(settings.get('task_name').lower())) |
84 task = scheduler_util.get(settings.get('task_name').lower()) |
86 task = scheduler_util.get(settings.get('task_name').lower()) |
85 logger.debug("Loaded scheduler task {0!r}".format(task)) |
87 logger.debug("Loaded scheduler task {0!r}".format(task)) |
86 if task is not None: |
88 if task is not None: |
87 trigger = task.get_trigger(self.process.registry) |
89 trigger = task.get_trigger(self.process.registry) |
88 logger.debug("Getting task trigger {0!r}".format(trigger)) |
90 logger.debug("Getting task trigger {0!r}".format(trigger)) |
89 zeo_connection = sm.getUtility(IZEOConnection, name=scheduler_util.zeo_connection) |
91 zeo_connection = sm.getUtility(IZEOConnection, name=scheduler_util.zeo_connection) |
90 logger.debug("Adding new job to scheduler {0!r}".format(scheduler)) |
92 logger.debug("Adding new job to scheduler {0!r}".format(scheduler)) |
91 scheduler.add_job(task, trigger, |
93 scheduler.add_job(task, trigger, |
92 id=str(task.internal_id), |
94 id=str(task.internal_id), |
93 name=task.name, |
95 name=task.name, |
94 kwargs={'zeo_settings': zeo_connection.get_settings(), |
96 kwargs={'zeo_settings': zeo_connection.get_settings(), |
95 'registry': self.process.registry}) |
97 'registry': self.process.registry}) |
96 logger.debug("Added job") |
98 logger.debug("Added job") |
97 except: |
99 except: |
98 logger.exception("An exception occurred:") |
100 logger.exception("An exception occurred:") |
|
101 finally: |
|
102 if tm is not None: |
|
103 tm.abort() |
|
104 |
99 |
105 |
100 |
106 class TaskRemoverThread(BaseTaskThread): |
101 class TaskRemoverThread(BaseTaskThread): |
107 """Task remover thread""" |
102 """Task remover thread""" |
108 |
103 |
114 return |
109 return |
115 job_id = str(job_id) |
110 job_id = str(job_id) |
116 logger.debug("Loading ZEO connection...") |
111 logger.debug("Loading ZEO connection...") |
117 with self._get_connection() as root: |
112 with self._get_connection() as root: |
118 logger.debug("Loaded ZODB root {0!r}".format(root)) |
113 logger.debug("Loaded ZODB root {0!r}".format(root)) |
119 tm = None |
|
120 try: |
114 try: |
121 try: |
115 application_name = self.process.registry.settings.get(PYAMS_APPLICATION_SETTINGS_KEY, |
122 application_name = self.process.registry.settings.get(PYAMS_APPLICATION_SETTINGS_KEY, |
116 PYAMS_APPLICATION_DEFAULT_NAME) |
123 PYAMS_APPLICATION_DEFAULT_NAME) |
117 application = root.get(application_name) |
124 application = root.get(application_name) |
118 logger.debug("Loaded application {0!r}".format(application)) |
125 logger.debug("Loaded application {0!r}".format(application)) |
119 sm = application.getSiteManager() |
126 sm = application.getSiteManager() |
120 scheduler_util = sm.get(SCHEDULER_NAME) |
127 scheduler_util = sm.get(SCHEDULER_NAME) |
121 logger.debug("Loaded scheduler utility {0!r}".format(scheduler_util)) |
128 logger.debug("Loaded scheduler utility {0!r}".format(scheduler_util)) |
122 scheduler = self.process.scheduler |
129 scheduler = self.process.scheduler |
123 logger.debug("Removing job '{0}'".format(job_id)) |
130 logger.debug("Removing job '{0}'".format(job_id)) |
124 job = scheduler.get_job(job_id) |
131 job = scheduler.get_job(job_id) |
125 if job is not None: |
132 if job is not None: |
126 logger.debug("Loaded job {0!r} ({0.id!r})".format(job)) |
133 logger.debug("Loaded job {0!r} ({0.id!r})".format(job)) |
127 scheduler.remove_job(job.id) |
134 scheduler.remove_job(job.id) |
128 logger.debug("Removed job") |
135 logger.debug("Removed job") |
129 except: |
136 except: |
130 logger.exception("An exception occurred:") |
137 logger.exception("An exception occurred:") |
|
138 finally: |
|
139 if tm is not None: |
|
140 tm.abort() |
|
141 |
131 |
142 |
132 |
143 class TaskRunnerThread(BaseTaskThread): |
133 class TaskRunnerThread(BaseTaskThread): |
144 """Task immediate runner thread""" |
134 """Task immediate runner thread""" |
145 |
135 |
150 if job_id is None: |
140 if job_id is None: |
151 return |
141 return |
152 logger.debug("Loading ZEO connection...") |
142 logger.debug("Loading ZEO connection...") |
153 with self._get_connection() as root: |
143 with self._get_connection() as root: |
154 logger.debug("Loaded ZODB root {0!r}".format(root)) |
144 logger.debug("Loaded ZODB root {0!r}".format(root)) |
155 tm = None |
|
156 try: |
145 try: |
157 try: |
146 application_name = self.process.registry.settings.get(PYAMS_APPLICATION_SETTINGS_KEY, |
158 application_name = self.process.registry.settings.get(PYAMS_APPLICATION_SETTINGS_KEY, |
147 PYAMS_APPLICATION_DEFAULT_NAME) |
159 PYAMS_APPLICATION_DEFAULT_NAME) |
148 application = root.get(application_name) |
160 application = root.get(application_name) |
149 logger.debug("Loaded application {0!r}".format(application)) |
161 logger.debug("Loaded application {0!r}".format(application)) |
150 sm = application.getSiteManager() |
162 sm = application.getSiteManager() |
151 scheduler_util = sm.get(SCHEDULER_NAME) |
163 scheduler_util = sm.get(SCHEDULER_NAME) |
152 logger.debug("Loaded scheduler utility {0!r}".format(scheduler_util)) |
164 logger.debug("Loaded scheduler utility {0!r}".format(scheduler_util)) |
153 scheduler = self.process.scheduler |
165 scheduler = self.process.scheduler |
154 logger.debug("Loading scheduler task '{0}'".format(settings.get('task_name').lower())) |
166 logger.debug("Loading scheduler task '{0}'".format(settings.get('task_name').lower())) |
155 task = scheduler_util.get(settings.get('task_name').lower()) |
167 task = scheduler_util.get(settings.get('task_name').lower()) |
156 logger.debug("Loaded scheduler task {0!r}".format(task)) |
168 logger.debug("Loaded scheduler task {0!r}".format(task)) |
157 if task is not None: |
169 if task is not None: |
158 trigger = ImmediateTaskTrigger() |
170 trigger = ImmediateTaskTrigger() |
159 logger.debug("Getting task trigger {0!r}".format(trigger)) |
171 logger.debug("Getting task trigger {0!r}".format(trigger)) |
160 zeo_connection = sm.getUtility(IZEOConnection, name=scheduler_util.zeo_connection) |
172 zeo_connection = sm.getUtility(IZEOConnection, name=scheduler_util.zeo_connection) |
161 logger.debug("Adding new job to scheduler {0!r}".format(scheduler)) |
173 logger.debug("Adding new job to scheduler {0!r}".format(scheduler)) |
162 scheduler.add_job(task, trigger, |
174 scheduler.add_job(task, trigger, |
163 id='{0.internal_id}::{1}'.format(task, |
175 id='{0.internal_id}::{1}'.format(task, |
164 datetime.utcnow().isoformat()), |
176 datetime.utcnow().isoformat()), |
165 name=task.name, |
177 name=task.name, |
166 kwargs={'zeo_settings': zeo_connection.get_settings(), |
178 kwargs={'zeo_settings': zeo_connection.get_settings(), |
167 'registry': self.process.registry, |
179 'registry': self.process.registry, |
168 'run_immediate': True}) |
180 'run_immediate': True}) |
169 logger.debug("Added job") |
181 logger.debug("Added job") |
170 except: |
182 except: |
171 logger.exception("An exception occurred:") |
183 logger.exception("An exception occurred:") |
|
184 finally: |
|
185 if tm is not None: |
|
186 tm.abort() |
|
187 |
172 |
188 |
173 |
189 class SchedulerHandler(object): |
174 class SchedulerHandler(object): |
190 """Scheduler handler""" |
175 """Scheduler handler""" |
191 |
176 |