src/pyams_scheduler/task.py
changeset 82 6473c2a27f9b
parent 54 9f88cbe85980
equal deleted inserted replaced
81:64b3a28afb23 82:6473c2a27f9b
    10 # FOR A PARTICULAR PURPOSE.
    10 # FOR A PARTICULAR PURPOSE.
    11 #
    11 #
    12 
    12 
    13 __docformat__ = 'restructuredtext'
    13 __docformat__ = 'restructuredtext'
    14 
    14 
    15 
       
    16 # import standard library
       
    17 import logging
    15 import logging
    18 logger = logging.getLogger('PyAMS (scheduler)')
       
    19 
       
    20 import traceback
    16 import traceback
    21 
       
    22 from datetime import datetime, timedelta
    17 from datetime import datetime, timedelta
    23 from io import StringIO
    18 from io import StringIO
    24 
    19 
    25 # import interfaces
    20 import transaction
    26 from pyams_scheduler.interfaces import IScheduler, ITask, ITaskInfo, ITaskHistory, ITaskHistoryContainer, \
    21 from apscheduler.triggers.base import BaseTrigger
    27     ITaskSchedulingMode, SCHEDULER_NAME, SCHEDULER_HANDLER_KEY, SCHEDULER_AUTH_KEY, AfterRunJobEvent, BeforeRunJobEvent
    22 from persistent import Persistent
    28 from pyams_utils.interfaces import PYAMS_APPLICATION_DEFAULT_NAME, PYAMS_APPLICATION_SETTINGS_KEY
    23 from pyramid.events import subscriber
       
    24 from pyramid.threadlocal import RequestContext
    29 from pyramid_mailer.interfaces import IMailer
    25 from pyramid_mailer.interfaces import IMailer
       
    26 from pyramid_mailer.message import Message
    30 from transaction.interfaces import ITransactionManager
    27 from transaction.interfaces import ITransactionManager
    31 from zope.component.interfaces import ISite
    28 from zope.component.interfaces import ISite
       
    29 from zope.container.contained import Contained
       
    30 from zope.container.folder import Folder
       
    31 from zope.interface import alsoProvides, implementer, noLongerProvides
    32 from zope.interface.interfaces import ComponentLookupError
    32 from zope.interface.interfaces import ComponentLookupError
    33 from zope.intid.interfaces import IIntIds
    33 from zope.intid.interfaces import IIntIds
    34 from zope.lifecycleevent.interfaces import IObjectRemovedEvent, IObjectAddedEvent, IObjectModifiedEvent
    34 from zope.lifecycleevent.interfaces import IObjectAddedEvent, IObjectModifiedEvent, IObjectRemovedEvent
    35 
    35 from zope.location import locate
    36 # import packages
    36 from zope.schema.fieldproperty import FieldProperty
    37 import transaction
    37 
    38 
    38 from pyams_scheduler.interfaces import AfterRunJobEvent, BeforeRunJobEvent, IScheduler, ITask, ITaskHistory, \
    39 from apscheduler.triggers.base import BaseTrigger
    39     ITaskHistoryContainer, ITaskInfo, ITaskSchedulingMode, SCHEDULER_AUTH_KEY, SCHEDULER_HANDLER_KEY, SCHEDULER_NAME
    40 from persistent import Persistent
       
    41 from pyams_utils.date import get_duration
    40 from pyams_utils.date import get_duration
    42 from pyams_utils.registry import query_utility, get_utility, set_local_registry, get_global_registry
    41 from pyams_utils.interfaces import PYAMS_APPLICATION_DEFAULT_NAME, PYAMS_APPLICATION_SETTINGS_KEY
       
    42 from pyams_utils.registry import get_utility, query_utility, set_local_registry
    43 from pyams_utils.request import check_request
    43 from pyams_utils.request import check_request
    44 from pyams_utils.timezone import tztime
    44 from pyams_utils.timezone import tztime
    45 from pyams_utils.traversing import get_parent
    45 from pyams_utils.traversing import get_parent
    46 from pyams_utils.zodb import ZODBConnection
    46 from pyams_utils.zodb import ZODBConnection
    47 from pyams_zmq.socket import zmq_socket, zmq_response
    47 from pyams_zmq.socket import zmq_response, zmq_socket
    48 from pyramid.events import subscriber
    48 
    49 from pyramid.threadlocal import RequestContext
    49 
    50 from pyramid_mailer.message import Message
    50 logger = logging.getLogger('PyAMS (scheduler)')
    51 from zope.container.contained import Contained
       
    52 from zope.container.folder import Folder
       
    53 from zope.interface import implementer, alsoProvides, noLongerProvides
       
    54 from zope.location import locate
       
    55 from zope.schema.fieldproperty import FieldProperty
       
    56 
    51 
    57 
    52 
    58 class ImmediateTaskTrigger(BaseTrigger):
    53 class ImmediateTaskTrigger(BaseTrigger):
    59     """Immediate-style task scheduler"""
    54     """Immediate-style task scheduler"""
    60 
    55 
   108     keep_empty_reports = FieldProperty(ITask['keep_empty_reports'])
   103     keep_empty_reports = FieldProperty(ITask['keep_empty_reports'])
   109     _history_duration = FieldProperty(ITask['history_duration'])
   104     _history_duration = FieldProperty(ITask['history_duration'])
   110     _history_length = FieldProperty(ITask['history_length'])
   105     _history_length = FieldProperty(ITask['history_length'])
   111 
   106 
   112     settings_view_name = FieldProperty(ITask['settings_view_name'])
   107     settings_view_name = FieldProperty(ITask['settings_view_name'])
       
   108     principal_id = None
   113 
   109 
   114     _internal_id = None
   110     _internal_id = None
   115 
   111 
   116     def __init__(self):
   112     def __init__(self):
   117         history = self.history = TaskHistoryContainer()
   113         history = self.history = TaskHistoryContainer()
   245         """Task execution wrapper"""
   241         """Task execution wrapper"""
   246         zodb_connection = ZODBConnection(name=kwargs.get('zodb_name', ''))
   242         zodb_connection = ZODBConnection(name=kwargs.get('zodb_name', ''))
   247         with zodb_connection as root:
   243         with zodb_connection as root:
   248             try:
   244             try:
   249                 registry = kwargs.get('registry')
   245                 registry = kwargs.get('registry')
   250                 request = check_request(registry=registry)
   246                 application_name = registry.settings.get(PYAMS_APPLICATION_SETTINGS_KEY,
   251                 with RequestContext(request):
   247                                                          PYAMS_APPLICATION_DEFAULT_NAME)
   252                     application_name = registry.settings.get(PYAMS_APPLICATION_SETTINGS_KEY,
   248                 sm = root.get(application_name).getSiteManager()
   253                                                              PYAMS_APPLICATION_DEFAULT_NAME)
   249                 scheduler_util = sm.get(SCHEDULER_NAME)
   254                     sm = root.get(application_name).getSiteManager()
   250                 task = scheduler_util.get(self.__name__)
   255                     scheduler_util = sm.get(SCHEDULER_NAME)
   251                 if task is not None:
   256                     task = scheduler_util.get(self.__name__)
   252                     set_local_registry(sm)
   257                     if task is not None:
   253                     request = check_request(registry=registry, principal_id=self.principal_id)
       
   254                     with RequestContext(request):
   258                         if not (kwargs.get('run_immediate') or task.is_runnable(registry)):
   255                         if not (kwargs.get('run_immediate') or task.is_runnable(registry)):
   259                             logger.debug("Skipping inactive task {0}".format(task.name))
   256                             logger.debug("Skipping inactive task {0}".format(task.name))
   260                             return
   257                             return
   261                         set_local_registry(sm)
       
   262                         tm = ITransactionManager(task)
   258                         tm = ITransactionManager(task)
   263                         for attempt in tm.attempts():
   259                         for attempt in tm.attempts():
   264                             with attempt as t:
   260                             with attempt as t:
   265                                 start = datetime.utcnow()
   261                                 start = datetime.utcnow()
   266                                 try:
   262                                 try:
   321         self.check_history()
   317         self.check_history()
   322 
   318 
   323     def send_report(self, report, status, registry):
   319     def send_report(self, report, status, registry):
   324         try:
   320         try:
   325             mailer_name = self.__parent__.report_mailer
   321             mailer_name = self.__parent__.report_mailer
   326         except (TypeError, ComponentLookupError):
   322         except (TypeError, AttributeError, ComponentLookupError):
   327             return
   323             return
   328         if ((status == 'Empty') and not self.send_empty_reports) or \
   324         if ((status == 'Empty') and not self.send_empty_reports) or \
   329            ((status == 'OK') and self.report_errors_only):
   325            ((status == 'OK') and self.report_errors_only):
   330             return
   326             return
   331         message_target = self.report_target
   327         message_target = self.report_target