src/pyams_scheduler/task.py
changeset 54 9f88cbe85980
parent 51 2eea97d70f10
child 82 6473c2a27f9b
equal deleted inserted replaced
53:cd55139c5bae 54:9f88cbe85980
     7 # THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
     7 # THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
     8 # WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
     8 # WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
     9 # WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
     9 # WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
    10 # FOR A PARTICULAR PURPOSE.
    10 # FOR A PARTICULAR PURPOSE.
    11 #
    11 #
    12 from pyramid.threadlocal import RequestContext
       
    13 
    12 
    14 __docformat__ = 'restructuredtext'
    13 __docformat__ = 'restructuredtext'
    15 
    14 
    16 
    15 
    17 # import standard library
    16 # import standard library
    22 
    21 
    23 from datetime import datetime, timedelta
    22 from datetime import datetime, timedelta
    24 from io import StringIO
    23 from io import StringIO
    25 
    24 
    26 # import interfaces
    25 # import interfaces
    27 from pyams_scheduler.interfaces import ITaskHistory, ITask, ITaskHistoryContainer, ITaskSchedulingMode, IScheduler, \
    26 from pyams_scheduler.interfaces import IScheduler, ITask, ITaskInfo, ITaskHistory, ITaskHistoryContainer, \
    28     SCHEDULER_HANDLER_KEY, AfterRunJobEvent, SCHEDULER_NAME, BeforeRunJobEvent, ITaskInfo
    27     ITaskSchedulingMode, SCHEDULER_NAME, SCHEDULER_HANDLER_KEY, SCHEDULER_AUTH_KEY, AfterRunJobEvent, BeforeRunJobEvent
    29 from pyams_utils.interfaces import PYAMS_APPLICATION_DEFAULT_NAME, PYAMS_APPLICATION_SETTINGS_KEY
    28 from pyams_utils.interfaces import PYAMS_APPLICATION_DEFAULT_NAME, PYAMS_APPLICATION_SETTINGS_KEY
    30 from pyramid_mailer.interfaces import IMailer
    29 from pyramid_mailer.interfaces import IMailer
    31 from transaction.interfaces import ITransactionManager
    30 from transaction.interfaces import ITransactionManager
    32 from zope.component.interfaces import ISite
    31 from zope.component.interfaces import ISite
    33 from zope.interface.interfaces import ComponentLookupError
    32 from zope.interface.interfaces import ComponentLookupError
    45 from pyams_utils.timezone import tztime
    44 from pyams_utils.timezone import tztime
    46 from pyams_utils.traversing import get_parent
    45 from pyams_utils.traversing import get_parent
    47 from pyams_utils.zodb import ZODBConnection
    46 from pyams_utils.zodb import ZODBConnection
    48 from pyams_zmq.socket import zmq_socket, zmq_response
    47 from pyams_zmq.socket import zmq_socket, zmq_response
    49 from pyramid.events import subscriber
    48 from pyramid.events import subscriber
       
    49 from pyramid.threadlocal import RequestContext
    50 from pyramid_mailer.message import Message
    50 from pyramid_mailer.message import Message
    51 from zope.container.contained import Contained
    51 from zope.container.contained import Contained
    52 from zope.container.folder import Folder
    52 from zope.container.folder import Folder
    53 from zope.interface import implementer, alsoProvides, noLongerProvides
    53 from zope.interface import implementer, alsoProvides, noLongerProvides
    54 from zope.location import locate
    54 from zope.location import locate
   197             if handler:
   197             if handler:
   198                 zmq_settings = {'zodb_name': scheduler_util.zodb_name,
   198                 zmq_settings = {'zodb_name': scheduler_util.zodb_name,
   199                                 'task_name': self.__name__,
   199                                 'task_name': self.__name__,
   200                                 'job_id': kwargs.get('job_id')}
   200                                 'job_id': kwargs.get('job_id')}
   201                 logger.debug("Resetting task {0.name} with {1!r}".format(self, zmq_settings))
   201                 logger.debug("Resetting task {0.name} with {1!r}".format(self, zmq_settings))
   202                 socket = zmq_socket(handler)
   202                 socket = zmq_socket(handler, auth=request.registry.settings.get(SCHEDULER_AUTH_KEY))
   203                 socket.send_json(['reset_task', zmq_settings])
   203                 socket.send_json(['reset_task', zmq_settings])
   204                 zmq_response(socket)
   204                 zmq_response(socket)
   205 
   205 
   206     def launch(self):
   206     def launch(self):
   207         scheduler_util = query_utility(IScheduler)
   207         scheduler_util = query_utility(IScheduler)
   222             if handler:
   222             if handler:
   223                 zmq_settings = {'zodb_name': scheduler_util.zodb_name,
   223                 zmq_settings = {'zodb_name': scheduler_util.zodb_name,
   224                                 'task_name': self.__name__,
   224                                 'task_name': self.__name__,
   225                                 'job_id': kwargs.get('job_id')}
   225                                 'job_id': kwargs.get('job_id')}
   226                 logger.debug("Running task {0.name} with {1!r}".format(self, zmq_settings))
   226                 logger.debug("Running task {0.name} with {1!r}".format(self, zmq_settings))
   227                 socket = zmq_socket(handler)
   227                 socket = zmq_socket(handler, auth=request.registry.settings.get(SCHEDULER_AUTH_KEY))
   228                 socket.send_json(['run_task', zmq_settings])
   228                 socket.send_json(['run_task', zmq_settings])
   229                 zmq_response(socket)
   229                 zmq_response(socket)
   230 
   230 
   231     def __call__(self, *args, **kwargs):
   231     def __call__(self, *args, **kwargs):
   232         report = StringIO()
   232         report = StringIO()
   375             scheduler_util = query_utility(IScheduler)
   375             scheduler_util = query_utility(IScheduler)
   376             zmq_settings = {'zodb_name': scheduler_util.zodb_name,
   376             zmq_settings = {'zodb_name': scheduler_util.zodb_name,
   377                             'task_name': task.__name__,
   377                             'task_name': task.__name__,
   378                             'job_id': task.internal_id}
   378                             'job_id': task.internal_id}
   379             logger.debug("Removing task {0.name} with {1!r}".format(task, zmq_settings))
   379             logger.debug("Removing task {0.name} with {1!r}".format(task, zmq_settings))
   380             socket = zmq_socket(handler)
   380             socket = zmq_socket(handler, auth=request.registry.settings.get(SCHEDULER_AUTH_KEY))
   381             socket.send_json(['remove_task', zmq_settings])
   381             socket.send_json(['remove_task', zmq_settings])
   382             zmq_response(socket)
   382             zmq_response(socket)