src/pyams_scheduler/task.py
changeset 19 687d616d7dfb
parent 9 b83989cde81d
child 37 1eb0c2098d47
--- a/src/pyams_scheduler/task.py	Thu Aug 27 11:41:00 2015 +0200
+++ b/src/pyams_scheduler/task.py	Thu Aug 27 11:41:56 2015 +0200
@@ -18,6 +18,7 @@
 logger = logging.getLogger('PyAMS (scheduler)')
 
 import traceback
+
 from datetime import datetime, timedelta
 from io import StringIO
 
@@ -35,7 +36,7 @@
 
 # import packages
 import transaction
-import zmq
+
 from apscheduler.triggers.base import BaseTrigger
 from persistent import Persistent
 from pyams_utils.date import get_duration
@@ -44,6 +45,7 @@
 from pyams_utils.timezone import tztime
 from pyams_utils.traversing import get_parent
 from pyams_utils.zodb import ZEOConnection
+from pyams_zmq.socket import zmq_socket, zmq_response
 from pyramid.events import subscriber
 from pyramid_mailer.message import Message
 from zope.container.contained import Contained
@@ -192,15 +194,13 @@
             handler = request.registry.settings.get(SCHEDULER_HANDLER_KEY, False)
             if handler:
                 zeo = get_utility(IZEOConnection, scheduler_util.zeo_connection)
-                context = zmq.Context()
-                socket = context.socket(zmq.REQ)
-                socket.connect('tcp://{0}'.format(handler))
                 zmq_settings = {'zeo': zeo.get_settings(),
                                 'task_name': self.__name__,
                                 'job_id': kwargs.get('job_id')}
                 logger.debug("Resetting task {0.name} with {1!r}".format(self, zmq_settings))
+                socket = zmq_socket(handler)
                 socket.send_json(['reset_task', zmq_settings])
-                socket.recv_json()
+                zmq_response(socket)
 
     def launch(self):
         scheduler_util = query_utility(IScheduler)
@@ -220,15 +220,13 @@
             handler = request.registry.settings.get(SCHEDULER_HANDLER_KEY, False)
             if handler:
                 zeo = get_utility(IZEOConnection, scheduler_util.zeo_connection)
-                context = zmq.Context()
-                socket = context.socket(zmq.REQ)
-                socket.connect('tcp://{0}'.format(handler))
                 zmq_settings = {'zeo': zeo.get_settings(),
                                 'task_name': self.__name__,
                                 'job_id': kwargs.get('job_id')}
                 logger.debug("Running task {0.name} with {1!r}".format(self, zmq_settings))
+                socket = zmq_socket(handler)
                 socket.send_json(['run_task', zmq_settings])
-                socket.recv_json()
+                zmq_response(socket)
 
     def __call__(self, *args, **kwargs):
         report = StringIO()
@@ -372,12 +370,10 @@
             task = event.object
             scheduler_util = query_utility(IScheduler)
             zeo = get_utility(IZEOConnection, scheduler_util.zeo_connection)
-            context = zmq.Context()
-            socket = context.socket(zmq.REQ)
-            socket.connect('tcp://{0}'.format(handler))
             zmq_settings = {'zeo': zeo.get_settings(),
                             'task_name': task.__name__,
                             'job_id': task.internal_id}
             logger.debug("Removing task {0.name} with {1!r}".format(task, zmq_settings))
+            socket = zmq_socket(handler)
             socket.send_json(['remove_task', zmq_settings])
-            socket.recv_json()
+            zmq_response(socket)