diff -r 4806161ea41a -r 687d616d7dfb src/pyams_scheduler/task.py --- a/src/pyams_scheduler/task.py Thu Aug 27 11:41:00 2015 +0200 +++ b/src/pyams_scheduler/task.py Thu Aug 27 11:41:56 2015 +0200 @@ -18,6 +18,7 @@ logger = logging.getLogger('PyAMS (scheduler)') import traceback + from datetime import datetime, timedelta from io import StringIO @@ -35,7 +36,7 @@ # import packages import transaction -import zmq + from apscheduler.triggers.base import BaseTrigger from persistent import Persistent from pyams_utils.date import get_duration @@ -44,6 +45,7 @@ from pyams_utils.timezone import tztime from pyams_utils.traversing import get_parent from pyams_utils.zodb import ZEOConnection +from pyams_zmq.socket import zmq_socket, zmq_response from pyramid.events import subscriber from pyramid_mailer.message import Message from zope.container.contained import Contained @@ -192,15 +194,13 @@ handler = request.registry.settings.get(SCHEDULER_HANDLER_KEY, False) if handler: zeo = get_utility(IZEOConnection, scheduler_util.zeo_connection) - context = zmq.Context() - socket = context.socket(zmq.REQ) - socket.connect('tcp://{0}'.format(handler)) zmq_settings = {'zeo': zeo.get_settings(), 'task_name': self.__name__, 'job_id': kwargs.get('job_id')} logger.debug("Resetting task {0.name} with {1!r}".format(self, zmq_settings)) + socket = zmq_socket(handler) socket.send_json(['reset_task', zmq_settings]) - socket.recv_json() + zmq_response(socket) def launch(self): scheduler_util = query_utility(IScheduler) @@ -220,15 +220,13 @@ handler = request.registry.settings.get(SCHEDULER_HANDLER_KEY, False) if handler: zeo = get_utility(IZEOConnection, scheduler_util.zeo_connection) - context = zmq.Context() - socket = context.socket(zmq.REQ) - socket.connect('tcp://{0}'.format(handler)) zmq_settings = {'zeo': zeo.get_settings(), 'task_name': self.__name__, 'job_id': kwargs.get('job_id')} logger.debug("Running task {0.name} with {1!r}".format(self, zmq_settings)) + socket = zmq_socket(handler) socket.send_json(['run_task', zmq_settings]) - socket.recv_json() + zmq_response(socket) def __call__(self, *args, **kwargs): report = StringIO() @@ -372,12 +370,10 @@ task = event.object scheduler_util = query_utility(IScheduler) zeo = get_utility(IZEOConnection, scheduler_util.zeo_connection) - context = zmq.Context() - socket = context.socket(zmq.REQ) - socket.connect('tcp://{0}'.format(handler)) zmq_settings = {'zeo': zeo.get_settings(), 'task_name': task.__name__, 'job_id': task.internal_id} logger.debug("Removing task {0.name} with {1!r}".format(task, zmq_settings)) + socket = zmq_socket(handler) socket.send_json(['remove_task', zmq_settings]) - socket.recv_json() + zmq_response(socket)