Use 'pyams_zmq' package utility functions for sockets to handle connection timeout
authorThierry Florac <thierry.florac@onf.fr>
Thu, 27 Aug 2015 11:41:56 +0200
changeset 19 687d616d7dfb
parent 18 4806161ea41a
child 20 5f69e8334fa3
Use 'pyams_zmq' package utility functions for sockets to handle connection timeout
src/pyams_scheduler/scheduler.py
src/pyams_scheduler/task.py
src/pyams_scheduler/zmi/scheduler.py
--- a/src/pyams_scheduler/scheduler.py	Thu Aug 27 11:41:00 2015 +0200
+++ b/src/pyams_scheduler/scheduler.py	Thu Aug 27 11:41:56 2015 +0200
@@ -9,6 +9,7 @@
 # WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
 # FOR A PARTICULAR PURPOSE.
 #
+from pyams_zmq.socket import zmq_socket, zmq_response
 
 __docformat__ = 'restructuredtext'
 
@@ -72,12 +73,11 @@
         registry = get_current_registry()
         handler = registry.settings.get(SCHEDULER_HANDLER_KEY, False)
         if handler:
-            context = zmq.Context()
-            socket = context.socket(zmq.REQ)
-            socket.connect('tcp://{0}'.format(handler))
-            return socket
+            return zmq_socket(handler)
 
     def get_jobs(self):
         socket = self._get_socket()
+        if socket is None:
+            return [501, "No socket handler defined in configuration file"]
         socket.send_json(['get_jobs', {}])
-        return socket.recv_json()
+        return zmq_response(socket)
--- a/src/pyams_scheduler/task.py	Thu Aug 27 11:41:00 2015 +0200
+++ b/src/pyams_scheduler/task.py	Thu Aug 27 11:41:56 2015 +0200
@@ -18,6 +18,7 @@
 logger = logging.getLogger('PyAMS (scheduler)')
 
 import traceback
+
 from datetime import datetime, timedelta
 from io import StringIO
 
@@ -35,7 +36,7 @@
 
 # import packages
 import transaction
-import zmq
+
 from apscheduler.triggers.base import BaseTrigger
 from persistent import Persistent
 from pyams_utils.date import get_duration
@@ -44,6 +45,7 @@
 from pyams_utils.timezone import tztime
 from pyams_utils.traversing import get_parent
 from pyams_utils.zodb import ZEOConnection
+from pyams_zmq.socket import zmq_socket, zmq_response
 from pyramid.events import subscriber
 from pyramid_mailer.message import Message
 from zope.container.contained import Contained
@@ -192,15 +194,13 @@
             handler = request.registry.settings.get(SCHEDULER_HANDLER_KEY, False)
             if handler:
                 zeo = get_utility(IZEOConnection, scheduler_util.zeo_connection)
-                context = zmq.Context()
-                socket = context.socket(zmq.REQ)
-                socket.connect('tcp://{0}'.format(handler))
                 zmq_settings = {'zeo': zeo.get_settings(),
                                 'task_name': self.__name__,
                                 'job_id': kwargs.get('job_id')}
                 logger.debug("Resetting task {0.name} with {1!r}".format(self, zmq_settings))
+                socket = zmq_socket(handler)
                 socket.send_json(['reset_task', zmq_settings])
-                socket.recv_json()
+                zmq_response(socket)
 
     def launch(self):
         scheduler_util = query_utility(IScheduler)
@@ -220,15 +220,13 @@
             handler = request.registry.settings.get(SCHEDULER_HANDLER_KEY, False)
             if handler:
                 zeo = get_utility(IZEOConnection, scheduler_util.zeo_connection)
-                context = zmq.Context()
-                socket = context.socket(zmq.REQ)
-                socket.connect('tcp://{0}'.format(handler))
                 zmq_settings = {'zeo': zeo.get_settings(),
                                 'task_name': self.__name__,
                                 'job_id': kwargs.get('job_id')}
                 logger.debug("Running task {0.name} with {1!r}".format(self, zmq_settings))
+                socket = zmq_socket(handler)
                 socket.send_json(['run_task', zmq_settings])
-                socket.recv_json()
+                zmq_response(socket)
 
     def __call__(self, *args, **kwargs):
         report = StringIO()
@@ -372,12 +370,10 @@
             task = event.object
             scheduler_util = query_utility(IScheduler)
             zeo = get_utility(IZEOConnection, scheduler_util.zeo_connection)
-            context = zmq.Context()
-            socket = context.socket(zmq.REQ)
-            socket.connect('tcp://{0}'.format(handler))
             zmq_settings = {'zeo': zeo.get_settings(),
                             'task_name': task.__name__,
                             'job_id': task.internal_id}
             logger.debug("Removing task {0.name} with {1!r}".format(task, zmq_settings))
+            socket = zmq_socket(handler)
             socket.send_json(['remove_task', zmq_settings])
-            socket.recv_json()
+            zmq_response(socket)
--- a/src/pyams_scheduler/zmi/scheduler.py	Thu Aug 27 11:41:00 2015 +0200
+++ b/src/pyams_scheduler/zmi/scheduler.py	Thu Aug 27 11:41:56 2015 +0200
@@ -32,7 +32,6 @@
 from zope.component.interfaces import ISite
 
 # import packages
-import zmq
 from pyams_form.form import AJAXEditForm
 from pyams_pagelet.pagelet import pagelet_config
 from pyams_skin.container import ContainerView
@@ -49,6 +48,7 @@
 from pyams_viewlet.viewlet import viewlet_config, Viewlet
 from pyams_zmi.form import AdminDialogEditForm, AdminDialogDisplayForm
 from pyams_zmi.view import AdminView
+from pyams_zmq.socket import zmq_socket, zmq_response
 from pyramid.url import resource_url
 from pyramid.view import view_config
 from z3c.form import field
@@ -79,7 +79,7 @@
     """Scheduler menu"""
 
     label = _("Tasks scheduler")
-    icon_class = 'fa fa-fw fa-clock-o'
+    icon_class = 'fa-clock-o'
     url = '#scheduler-tasks.html'
 
 
@@ -364,11 +364,13 @@
     def values(self):
         handler = self.request.registry.settings.get(SCHEDULER_HANDLER_KEY, False)
         if handler:
-            context = zmq.Context()
-            socket = context.socket(zmq.REQ)
-            socket.connect('tcp://{0}'.format(handler))
+            socket = zmq_socket(handler)
             socket.send_json(['get_jobs', {}])
-            return socket.recv_json()
+            status, response = zmq_response(socket)
+            if status == 200:
+                return response
+            else:  # error
+                return ()
         else:
             return ()