16 # import standard library |
16 # import standard library |
17 import logging |
17 import logging |
18 logger = logging.getLogger('PyAMS (scheduler)') |
18 logger = logging.getLogger('PyAMS (scheduler)') |
19 |
19 |
20 import traceback |
20 import traceback |
|
21 |
21 from datetime import datetime, timedelta |
22 from datetime import datetime, timedelta |
22 from io import StringIO |
23 from io import StringIO |
23 |
24 |
24 # import interfaces |
25 # import interfaces |
25 from pyams_scheduler.interfaces import ITaskHistory, ITask, ITaskHistoryContainer, ITaskSchedulingMode, IScheduler, \ |
26 from pyams_scheduler.interfaces import ITaskHistory, ITask, ITaskHistoryContainer, ITaskSchedulingMode, IScheduler, \ |
33 from zope.intid.interfaces import IIntIds |
34 from zope.intid.interfaces import IIntIds |
34 from zope.lifecycleevent.interfaces import IObjectRemovedEvent, IObjectAddedEvent, IObjectModifiedEvent |
35 from zope.lifecycleevent.interfaces import IObjectRemovedEvent, IObjectAddedEvent, IObjectModifiedEvent |
35 |
36 |
36 # import packages |
37 # import packages |
37 import transaction |
38 import transaction |
38 import zmq |
39 |
39 from apscheduler.triggers.base import BaseTrigger |
40 from apscheduler.triggers.base import BaseTrigger |
40 from persistent import Persistent |
41 from persistent import Persistent |
41 from pyams_utils.date import get_duration |
42 from pyams_utils.date import get_duration |
42 from pyams_utils.registry import query_utility, get_utility |
43 from pyams_utils.registry import query_utility, get_utility |
43 from pyams_utils.request import check_request |
44 from pyams_utils.request import check_request |
44 from pyams_utils.timezone import tztime |
45 from pyams_utils.timezone import tztime |
45 from pyams_utils.traversing import get_parent |
46 from pyams_utils.traversing import get_parent |
46 from pyams_utils.zodb import ZEOConnection |
47 from pyams_utils.zodb import ZEOConnection |
|
48 from pyams_zmq.socket import zmq_socket, zmq_response |
47 from pyramid.events import subscriber |
49 from pyramid.events import subscriber |
48 from pyramid_mailer.message import Message |
50 from pyramid_mailer.message import Message |
49 from zope.container.contained import Contained |
51 from zope.container.contained import Contained |
50 from zope.container.folder import Folder |
52 from zope.container.folder import Folder |
51 from zope.interface import implementer, alsoProvides, noLongerProvides |
53 from zope.interface import implementer, alsoProvides, noLongerProvides |
190 request = check_request() |
192 request = check_request() |
191 if request.registry: |
193 if request.registry: |
192 handler = request.registry.settings.get(SCHEDULER_HANDLER_KEY, False) |
194 handler = request.registry.settings.get(SCHEDULER_HANDLER_KEY, False) |
193 if handler: |
195 if handler: |
194 zeo = get_utility(IZEOConnection, scheduler_util.zeo_connection) |
196 zeo = get_utility(IZEOConnection, scheduler_util.zeo_connection) |
195 context = zmq.Context() |
|
196 socket = context.socket(zmq.REQ) |
|
197 socket.connect('tcp://{0}'.format(handler)) |
|
198 zmq_settings = {'zeo': zeo.get_settings(), |
197 zmq_settings = {'zeo': zeo.get_settings(), |
199 'task_name': self.__name__, |
198 'task_name': self.__name__, |
200 'job_id': kwargs.get('job_id')} |
199 'job_id': kwargs.get('job_id')} |
201 logger.debug("Resetting task {0.name} with {1!r}".format(self, zmq_settings)) |
200 logger.debug("Resetting task {0.name} with {1!r}".format(self, zmq_settings)) |
|
201 socket = zmq_socket(handler) |
202 socket.send_json(['reset_task', zmq_settings]) |
202 socket.send_json(['reset_task', zmq_settings]) |
203 socket.recv_json() |
203 zmq_response(socket) |
204 |
204 |
205 def launch(self): |
205 def launch(self): |
206 scheduler_util = query_utility(IScheduler) |
206 scheduler_util = query_utility(IScheduler) |
207 if scheduler_util is not None: |
207 if scheduler_util is not None: |
208 # get task internal ID before transaction ends!!! |
208 # get task internal ID before transaction ends!!! |
218 request = check_request() |
218 request = check_request() |
219 if request.registry: |
219 if request.registry: |
220 handler = request.registry.settings.get(SCHEDULER_HANDLER_KEY, False) |
220 handler = request.registry.settings.get(SCHEDULER_HANDLER_KEY, False) |
221 if handler: |
221 if handler: |
222 zeo = get_utility(IZEOConnection, scheduler_util.zeo_connection) |
222 zeo = get_utility(IZEOConnection, scheduler_util.zeo_connection) |
223 context = zmq.Context() |
|
224 socket = context.socket(zmq.REQ) |
|
225 socket.connect('tcp://{0}'.format(handler)) |
|
226 zmq_settings = {'zeo': zeo.get_settings(), |
223 zmq_settings = {'zeo': zeo.get_settings(), |
227 'task_name': self.__name__, |
224 'task_name': self.__name__, |
228 'job_id': kwargs.get('job_id')} |
225 'job_id': kwargs.get('job_id')} |
229 logger.debug("Running task {0.name} with {1!r}".format(self, zmq_settings)) |
226 logger.debug("Running task {0.name} with {1!r}".format(self, zmq_settings)) |
|
227 socket = zmq_socket(handler) |
230 socket.send_json(['run_task', zmq_settings]) |
228 socket.send_json(['run_task', zmq_settings]) |
231 socket.recv_json() |
229 zmq_response(socket) |
232 |
230 |
233 def __call__(self, *args, **kwargs): |
231 def __call__(self, *args, **kwargs): |
234 report = StringIO() |
232 report = StringIO() |
235 self._run(report, **kwargs) |
233 self._run(report, **kwargs) |
236 |
234 |
370 handler = request.registry.settings.get(SCHEDULER_HANDLER_KEY, False) |
368 handler = request.registry.settings.get(SCHEDULER_HANDLER_KEY, False) |
371 if handler: |
369 if handler: |
372 task = event.object |
370 task = event.object |
373 scheduler_util = query_utility(IScheduler) |
371 scheduler_util = query_utility(IScheduler) |
374 zeo = get_utility(IZEOConnection, scheduler_util.zeo_connection) |
372 zeo = get_utility(IZEOConnection, scheduler_util.zeo_connection) |
375 context = zmq.Context() |
|
376 socket = context.socket(zmq.REQ) |
|
377 socket.connect('tcp://{0}'.format(handler)) |
|
378 zmq_settings = {'zeo': zeo.get_settings(), |
373 zmq_settings = {'zeo': zeo.get_settings(), |
379 'task_name': task.__name__, |
374 'task_name': task.__name__, |
380 'job_id': task.internal_id} |
375 'job_id': task.internal_id} |
381 logger.debug("Removing task {0.name} with {1!r}".format(task, zmq_settings)) |
376 logger.debug("Removing task {0.name} with {1!r}".format(task, zmq_settings)) |
|
377 socket = zmq_socket(handler) |
382 socket.send_json(['remove_task', zmq_settings]) |
378 socket.send_json(['remove_task', zmq_settings]) |
383 socket.recv_json() |
379 zmq_response(socket) |