--- a/src/pyams_scheduler/task.py Thu Aug 27 11:41:00 2015 +0200
+++ b/src/pyams_scheduler/task.py Thu Aug 27 11:41:56 2015 +0200
@@ -18,6 +18,7 @@
logger = logging.getLogger('PyAMS (scheduler)')
import traceback
+
from datetime import datetime, timedelta
from io import StringIO
@@ -35,7 +36,7 @@
# import packages
import transaction
-import zmq
+
from apscheduler.triggers.base import BaseTrigger
from persistent import Persistent
from pyams_utils.date import get_duration
@@ -44,6 +45,7 @@
from pyams_utils.timezone import tztime
from pyams_utils.traversing import get_parent
from pyams_utils.zodb import ZEOConnection
+from pyams_zmq.socket import zmq_socket, zmq_response
from pyramid.events import subscriber
from pyramid_mailer.message import Message
from zope.container.contained import Contained
@@ -192,15 +194,13 @@
handler = request.registry.settings.get(SCHEDULER_HANDLER_KEY, False)
if handler:
zeo = get_utility(IZEOConnection, scheduler_util.zeo_connection)
- context = zmq.Context()
- socket = context.socket(zmq.REQ)
- socket.connect('tcp://{0}'.format(handler))
zmq_settings = {'zeo': zeo.get_settings(),
'task_name': self.__name__,
'job_id': kwargs.get('job_id')}
logger.debug("Resetting task {0.name} with {1!r}".format(self, zmq_settings))
+ socket = zmq_socket(handler)
socket.send_json(['reset_task', zmq_settings])
- socket.recv_json()
+ zmq_response(socket)
def launch(self):
scheduler_util = query_utility(IScheduler)
@@ -220,15 +220,13 @@
handler = request.registry.settings.get(SCHEDULER_HANDLER_KEY, False)
if handler:
zeo = get_utility(IZEOConnection, scheduler_util.zeo_connection)
- context = zmq.Context()
- socket = context.socket(zmq.REQ)
- socket.connect('tcp://{0}'.format(handler))
zmq_settings = {'zeo': zeo.get_settings(),
'task_name': self.__name__,
'job_id': kwargs.get('job_id')}
logger.debug("Running task {0.name} with {1!r}".format(self, zmq_settings))
+ socket = zmq_socket(handler)
socket.send_json(['run_task', zmq_settings])
- socket.recv_json()
+ zmq_response(socket)
def __call__(self, *args, **kwargs):
report = StringIO()
@@ -372,12 +370,10 @@
task = event.object
scheduler_util = query_utility(IScheduler)
zeo = get_utility(IZEOConnection, scheduler_util.zeo_connection)
- context = zmq.Context()
- socket = context.socket(zmq.REQ)
- socket.connect('tcp://{0}'.format(handler))
zmq_settings = {'zeo': zeo.get_settings(),
'task_name': task.__name__,
'job_id': task.internal_id}
logger.debug("Removing task {0.name} with {1!r}".format(task, zmq_settings))
+ socket = zmq_socket(handler)
socket.send_json(['remove_task', zmq_settings])
- socket.recv_json()
+ zmq_response(socket)