# HG changeset patch # User Thierry Florac # Date 1440668516 -7200 # Node ID 687d616d7dfbf167bac2ea8579ce5bf8bce0e5df # Parent 4806161ea41aad2d1a14e7605a5a4ec4eb1d674c Use 'pyams_zmq' package utility functions for sockets to handle connection timeout diff -r 4806161ea41a -r 687d616d7dfb src/pyams_scheduler/scheduler.py --- a/src/pyams_scheduler/scheduler.py Thu Aug 27 11:41:00 2015 +0200 +++ b/src/pyams_scheduler/scheduler.py Thu Aug 27 11:41:56 2015 +0200 @@ -9,6 +9,7 @@ # WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS # FOR A PARTICULAR PURPOSE. # +from pyams_zmq.socket import zmq_socket, zmq_response __docformat__ = 'restructuredtext' @@ -72,12 +73,11 @@ registry = get_current_registry() handler = registry.settings.get(SCHEDULER_HANDLER_KEY, False) if handler: - context = zmq.Context() - socket = context.socket(zmq.REQ) - socket.connect('tcp://{0}'.format(handler)) - return socket + return zmq_socket(handler) def get_jobs(self): socket = self._get_socket() + if socket is None: + return [501, "No socket handler defined in configuration file"] socket.send_json(['get_jobs', {}]) - return socket.recv_json() + return zmq_response(socket) diff -r 4806161ea41a -r 687d616d7dfb src/pyams_scheduler/task.py --- a/src/pyams_scheduler/task.py Thu Aug 27 11:41:00 2015 +0200 +++ b/src/pyams_scheduler/task.py Thu Aug 27 11:41:56 2015 +0200 @@ -18,6 +18,7 @@ logger = logging.getLogger('PyAMS (scheduler)') import traceback + from datetime import datetime, timedelta from io import StringIO @@ -35,7 +36,7 @@ # import packages import transaction -import zmq + from apscheduler.triggers.base import BaseTrigger from persistent import Persistent from pyams_utils.date import get_duration @@ -44,6 +45,7 @@ from pyams_utils.timezone import tztime from pyams_utils.traversing import get_parent from pyams_utils.zodb import ZEOConnection +from pyams_zmq.socket import zmq_socket, zmq_response from pyramid.events import subscriber from pyramid_mailer.message import Message from zope.container.contained import Contained @@ -192,15 +194,13 @@ handler = request.registry.settings.get(SCHEDULER_HANDLER_KEY, False) if handler: zeo = get_utility(IZEOConnection, scheduler_util.zeo_connection) - context = zmq.Context() - socket = context.socket(zmq.REQ) - socket.connect('tcp://{0}'.format(handler)) zmq_settings = {'zeo': zeo.get_settings(), 'task_name': self.__name__, 'job_id': kwargs.get('job_id')} logger.debug("Resetting task {0.name} with {1!r}".format(self, zmq_settings)) + socket = zmq_socket(handler) socket.send_json(['reset_task', zmq_settings]) - socket.recv_json() + zmq_response(socket) def launch(self): scheduler_util = query_utility(IScheduler) @@ -220,15 +220,13 @@ handler = request.registry.settings.get(SCHEDULER_HANDLER_KEY, False) if handler: zeo = get_utility(IZEOConnection, scheduler_util.zeo_connection) - context = zmq.Context() - socket = context.socket(zmq.REQ) - socket.connect('tcp://{0}'.format(handler)) zmq_settings = {'zeo': zeo.get_settings(), 'task_name': self.__name__, 'job_id': kwargs.get('job_id')} logger.debug("Running task {0.name} with {1!r}".format(self, zmq_settings)) + socket = zmq_socket(handler) socket.send_json(['run_task', zmq_settings]) - socket.recv_json() + zmq_response(socket) def __call__(self, *args, **kwargs): report = StringIO() @@ -372,12 +370,10 @@ task = event.object scheduler_util = query_utility(IScheduler) zeo = get_utility(IZEOConnection, scheduler_util.zeo_connection) - context = zmq.Context() - socket = context.socket(zmq.REQ) - socket.connect('tcp://{0}'.format(handler)) zmq_settings = {'zeo': zeo.get_settings(), 'task_name': task.__name__, 'job_id': task.internal_id} logger.debug("Removing task {0.name} with {1!r}".format(task, zmq_settings)) + socket = zmq_socket(handler) socket.send_json(['remove_task', zmq_settings]) - socket.recv_json() + zmq_response(socket) diff -r 4806161ea41a -r 687d616d7dfb src/pyams_scheduler/zmi/scheduler.py --- a/src/pyams_scheduler/zmi/scheduler.py Thu Aug 27 11:41:00 2015 +0200 +++ b/src/pyams_scheduler/zmi/scheduler.py Thu Aug 27 11:41:56 2015 +0200 @@ -32,7 +32,6 @@ from zope.component.interfaces import ISite # import packages -import zmq from pyams_form.form import AJAXEditForm from pyams_pagelet.pagelet import pagelet_config from pyams_skin.container import ContainerView @@ -49,6 +48,7 @@ from pyams_viewlet.viewlet import viewlet_config, Viewlet from pyams_zmi.form import AdminDialogEditForm, AdminDialogDisplayForm from pyams_zmi.view import AdminView +from pyams_zmq.socket import zmq_socket, zmq_response from pyramid.url import resource_url from pyramid.view import view_config from z3c.form import field @@ -79,7 +79,7 @@ """Scheduler menu""" label = _("Tasks scheduler") - icon_class = 'fa fa-fw fa-clock-o' + icon_class = 'fa-clock-o' url = '#scheduler-tasks.html' @@ -364,11 +364,13 @@ def values(self): handler = self.request.registry.settings.get(SCHEDULER_HANDLER_KEY, False) if handler: - context = zmq.Context() - socket = context.socket(zmq.REQ) - socket.connect('tcp://{0}'.format(handler)) + socket = zmq_socket(handler) socket.send_json(['get_jobs', {}]) - return socket.recv_json() + status, response = zmq_response(socket) + if status == 200: + return response + else: # error + return () else: return ()