src/pyams_scheduler/task.py
changeset 19 687d616d7dfb
parent 9 b83989cde81d
child 37 1eb0c2098d47
equal deleted inserted replaced
18:4806161ea41a 19:687d616d7dfb
    16 # import standard library
    16 # import standard library
    17 import logging
    17 import logging
    18 logger = logging.getLogger('PyAMS (scheduler)')
    18 logger = logging.getLogger('PyAMS (scheduler)')
    19 
    19 
    20 import traceback
    20 import traceback
       
    21 
    21 from datetime import datetime, timedelta
    22 from datetime import datetime, timedelta
    22 from io import StringIO
    23 from io import StringIO
    23 
    24 
    24 # import interfaces
    25 # import interfaces
    25 from pyams_scheduler.interfaces import ITaskHistory, ITask, ITaskHistoryContainer, ITaskSchedulingMode, IScheduler, \
    26 from pyams_scheduler.interfaces import ITaskHistory, ITask, ITaskHistoryContainer, ITaskSchedulingMode, IScheduler, \
    33 from zope.intid.interfaces import IIntIds
    34 from zope.intid.interfaces import IIntIds
    34 from zope.lifecycleevent.interfaces import IObjectRemovedEvent, IObjectAddedEvent, IObjectModifiedEvent
    35 from zope.lifecycleevent.interfaces import IObjectRemovedEvent, IObjectAddedEvent, IObjectModifiedEvent
    35 
    36 
    36 # import packages
    37 # import packages
    37 import transaction
    38 import transaction
    38 import zmq
    39 
    39 from apscheduler.triggers.base import BaseTrigger
    40 from apscheduler.triggers.base import BaseTrigger
    40 from persistent import Persistent
    41 from persistent import Persistent
    41 from pyams_utils.date import get_duration
    42 from pyams_utils.date import get_duration
    42 from pyams_utils.registry import query_utility, get_utility
    43 from pyams_utils.registry import query_utility, get_utility
    43 from pyams_utils.request import check_request
    44 from pyams_utils.request import check_request
    44 from pyams_utils.timezone import tztime
    45 from pyams_utils.timezone import tztime
    45 from pyams_utils.traversing import get_parent
    46 from pyams_utils.traversing import get_parent
    46 from pyams_utils.zodb import ZEOConnection
    47 from pyams_utils.zodb import ZEOConnection
       
    48 from pyams_zmq.socket import zmq_socket, zmq_response
    47 from pyramid.events import subscriber
    49 from pyramid.events import subscriber
    48 from pyramid_mailer.message import Message
    50 from pyramid_mailer.message import Message
    49 from zope.container.contained import Contained
    51 from zope.container.contained import Contained
    50 from zope.container.folder import Folder
    52 from zope.container.folder import Folder
    51 from zope.interface import implementer, alsoProvides, noLongerProvides
    53 from zope.interface import implementer, alsoProvides, noLongerProvides
   190         request = check_request()
   192         request = check_request()
   191         if request.registry:
   193         if request.registry:
   192             handler = request.registry.settings.get(SCHEDULER_HANDLER_KEY, False)
   194             handler = request.registry.settings.get(SCHEDULER_HANDLER_KEY, False)
   193             if handler:
   195             if handler:
   194                 zeo = get_utility(IZEOConnection, scheduler_util.zeo_connection)
   196                 zeo = get_utility(IZEOConnection, scheduler_util.zeo_connection)
   195                 context = zmq.Context()
       
   196                 socket = context.socket(zmq.REQ)
       
   197                 socket.connect('tcp://{0}'.format(handler))
       
   198                 zmq_settings = {'zeo': zeo.get_settings(),
   197                 zmq_settings = {'zeo': zeo.get_settings(),
   199                                 'task_name': self.__name__,
   198                                 'task_name': self.__name__,
   200                                 'job_id': kwargs.get('job_id')}
   199                                 'job_id': kwargs.get('job_id')}
   201                 logger.debug("Resetting task {0.name} with {1!r}".format(self, zmq_settings))
   200                 logger.debug("Resetting task {0.name} with {1!r}".format(self, zmq_settings))
       
   201                 socket = zmq_socket(handler)
   202                 socket.send_json(['reset_task', zmq_settings])
   202                 socket.send_json(['reset_task', zmq_settings])
   203                 socket.recv_json()
   203                 zmq_response(socket)
   204 
   204 
   205     def launch(self):
   205     def launch(self):
   206         scheduler_util = query_utility(IScheduler)
   206         scheduler_util = query_utility(IScheduler)
   207         if scheduler_util is not None:
   207         if scheduler_util is not None:
   208             # get task internal ID before transaction ends!!!
   208             # get task internal ID before transaction ends!!!
   218         request = check_request()
   218         request = check_request()
   219         if request.registry:
   219         if request.registry:
   220             handler = request.registry.settings.get(SCHEDULER_HANDLER_KEY, False)
   220             handler = request.registry.settings.get(SCHEDULER_HANDLER_KEY, False)
   221             if handler:
   221             if handler:
   222                 zeo = get_utility(IZEOConnection, scheduler_util.zeo_connection)
   222                 zeo = get_utility(IZEOConnection, scheduler_util.zeo_connection)
   223                 context = zmq.Context()
       
   224                 socket = context.socket(zmq.REQ)
       
   225                 socket.connect('tcp://{0}'.format(handler))
       
   226                 zmq_settings = {'zeo': zeo.get_settings(),
   223                 zmq_settings = {'zeo': zeo.get_settings(),
   227                                 'task_name': self.__name__,
   224                                 'task_name': self.__name__,
   228                                 'job_id': kwargs.get('job_id')}
   225                                 'job_id': kwargs.get('job_id')}
   229                 logger.debug("Running task {0.name} with {1!r}".format(self, zmq_settings))
   226                 logger.debug("Running task {0.name} with {1!r}".format(self, zmq_settings))
       
   227                 socket = zmq_socket(handler)
   230                 socket.send_json(['run_task', zmq_settings])
   228                 socket.send_json(['run_task', zmq_settings])
   231                 socket.recv_json()
   229                 zmq_response(socket)
   232 
   230 
   233     def __call__(self, *args, **kwargs):
   231     def __call__(self, *args, **kwargs):
   234         report = StringIO()
   232         report = StringIO()
   235         self._run(report, **kwargs)
   233         self._run(report, **kwargs)
   236 
   234 
   370         handler = request.registry.settings.get(SCHEDULER_HANDLER_KEY, False)
   368         handler = request.registry.settings.get(SCHEDULER_HANDLER_KEY, False)
   371         if handler:
   369         if handler:
   372             task = event.object
   370             task = event.object
   373             scheduler_util = query_utility(IScheduler)
   371             scheduler_util = query_utility(IScheduler)
   374             zeo = get_utility(IZEOConnection, scheduler_util.zeo_connection)
   372             zeo = get_utility(IZEOConnection, scheduler_util.zeo_connection)
   375             context = zmq.Context()
       
   376             socket = context.socket(zmq.REQ)
       
   377             socket.connect('tcp://{0}'.format(handler))
       
   378             zmq_settings = {'zeo': zeo.get_settings(),
   373             zmq_settings = {'zeo': zeo.get_settings(),
   379                             'task_name': task.__name__,
   374                             'task_name': task.__name__,
   380                             'job_id': task.internal_id}
   375                             'job_id': task.internal_id}
   381             logger.debug("Removing task {0.name} with {1!r}".format(task, zmq_settings))
   376             logger.debug("Removing task {0.name} with {1!r}".format(task, zmq_settings))
       
   377             socket = zmq_socket(handler)
   382             socket.send_json(['remove_task', zmq_settings])
   378             socket.send_json(['remove_task', zmq_settings])
   383             socket.recv_json()
   379             zmq_response(socket)