src/pyams_scheduler/process.py
changeset 0 48483b0b26fa
child 5 0fe262326e0e
equal deleted inserted replaced
-1:000000000000 0:48483b0b26fa
       
     1 #
       
     2 # Copyright (c) 2008-2015 Thierry Florac <tflorac AT ulthar.net>
       
     3 # All Rights Reserved.
       
     4 #
       
     5 # This software is subject to the provisions of the Zope Public License,
       
     6 # Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
       
     7 # THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
       
     8 # WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
       
     9 # WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
       
    10 # FOR A PARTICULAR PURPOSE.
       
    11 #
       
    12 
       
    13 __docformat__ = 'restructuredtext'
       
    14 
       
    15 
       
    16 # import standard library
       
    17 import logging
       
    18 logger = logging.getLogger('PyAMS (scheduler)')
       
    19 
       
    20 from datetime import datetime
       
    21 from threading import Thread
       
    22 
       
    23 # import interfaces
       
    24 from pyams_scheduler.interfaces import SCHEDULER_NAME
       
    25 from pyams_utils.interfaces import PYAMS_APPLICATION_SETTINGS_KEY, PYAMS_APPLICATION_DEFAULT_NAME
       
    26 from pyams_utils.interfaces.zeo import IZEOConnection
       
    27 
       
    28 # import packages
       
    29 from apscheduler.jobstores.memory import MemoryJobStore
       
    30 from apscheduler.schedulers.background import BackgroundScheduler
       
    31 from pyams_scheduler.task import ImmediateTaskTrigger
       
    32 from pyams_utils.zodb import ZEOConnection
       
    33 from pyams_zmq.handler import ZMQMessageHandler
       
    34 from pyams_zmq.process import ZMQProcess
       
    35 
       
    36 
       
    37 class BaseTaskThread(Thread):
       
    38 
       
    39     def __init__(self, process, settings):
       
    40         Thread.__init__(self)
       
    41         self.process = process
       
    42         self.settings = settings
       
    43 
       
    44     def _get_connection(self):
       
    45         zeo_settings = self.settings.get('zeo')
       
    46         connection = ZEOConnection()
       
    47         connection.update(zeo_settings)
       
    48         return connection
       
    49 
       
    50 
       
    51 class TaskResettingThread(BaseTaskThread):
       
    52     """Task resetting thread
       
    53 
       
    54     Task reset is run in another thread, so that:
       
    55     - other transactions applied on updated tasks are visible
       
    56     - ØMQ request returns immediately to calling process
       
    57     """
       
    58 
       
    59     def run(self):
       
    60         logger.debug("Starting task resetting thread...")
       
    61         settings = self.settings
       
    62         job_id = settings.get('job_id')
       
    63         if job_id is None:
       
    64             return
       
    65         job_id = str(job_id)
       
    66         logger.debug("Loading ZEO connection...")
       
    67         with self._get_connection() as root:
       
    68             logger.debug("Loaded ZODB root {0!r}".format(root))
       
    69             tm = None
       
    70             try:
       
    71                 try:
       
    72                     application_name = self.process.registry.settings.get(PYAMS_APPLICATION_SETTINGS_KEY,
       
    73                                                                           PYAMS_APPLICATION_DEFAULT_NAME)
       
    74                     application = root.get(application_name)
       
    75                     logger.debug("Loaded application {0!r}".format(application))
       
    76                     sm = application.getSiteManager()
       
    77                     scheduler_util = sm.get(SCHEDULER_NAME)
       
    78                     logger.debug("Loaded scheduler utility {0!r}".format(scheduler_util))
       
    79                     scheduler = self.process.scheduler
       
    80                     logger.debug("Removing job '{0}'".format(job_id))
       
    81                     job = scheduler.get_job(job_id)
       
    82                     if job is not None:
       
    83                         logger.debug("Loaded job {0!r} ({0.id!r})".format(job))
       
    84                         scheduler.remove_job(job.id)
       
    85                     logger.debug("Loading scheduler task '{0}'".format(settings.get('task_name').lower()))
       
    86                     task = scheduler_util.get(settings.get('task_name').lower())
       
    87                     logger.debug("Loaded scheduler task {0!r}".format(task))
       
    88                     if task is not None:
       
    89                         trigger = task.get_trigger(self.process.registry)
       
    90                         logger.debug("Getting task trigger {0!r}".format(trigger))
       
    91                         zeo_connection = sm.getUtility(IZEOConnection, name=scheduler_util.zeo_connection)
       
    92                         logger.debug("Adding new job to scheduler {0!r}".format(scheduler))
       
    93                         scheduler.add_job(task, trigger,
       
    94                                           id=str(task.internal_id),
       
    95                                           name=task.name,
       
    96                                           kwargs={'zeo_settings': zeo_connection.get_settings(),
       
    97                                                   'registry': self.process.registry})
       
    98                         logger.debug("Added job")
       
    99                 except:
       
   100                     logger.exception("An exception occurred:")
       
   101             finally:
       
   102                 if tm is not None:
       
   103                     tm.abort()
       
   104 
       
   105 
       
   106 class TaskRemoverThread(BaseTaskThread):
       
   107     """Task remover thread"""
       
   108 
       
   109     def run(self):
       
   110         logger.debug("Starting task remover thread...")
       
   111         settings = self.settings
       
   112         job_id = settings.get('job_id')
       
   113         if job_id is None:
       
   114             return
       
   115         job_id = str(job_id)
       
   116         logger.debug("Loading ZEO connection...")
       
   117         with self._get_connection() as root:
       
   118             logger.debug("Loaded ZODB root {0!r}".format(root))
       
   119             tm = None
       
   120             try:
       
   121                 try:
       
   122                     application_name = self.process.registry.settings.get(PYAMS_APPLICATION_SETTINGS_KEY,
       
   123                                                                           PYAMS_APPLICATION_DEFAULT_NAME)
       
   124                     application = root.get(application_name)
       
   125                     logger.debug("Loaded application {0!r}".format(application))
       
   126                     sm = application.getSiteManager()
       
   127                     scheduler_util = sm.get(SCHEDULER_NAME)
       
   128                     logger.debug("Loaded scheduler utility {0!r}".format(scheduler_util))
       
   129                     scheduler = self.process.scheduler
       
   130                     logger.debug("Removing job '{0}'".format(job_id))
       
   131                     job = scheduler.get_job(job_id)
       
   132                     if job is not None:
       
   133                         logger.debug("Loaded job {0!r} ({0.id!r})".format(job))
       
   134                         scheduler.remove_job(job.id)
       
   135                     logger.debug("Removed job")
       
   136                 except:
       
   137                     logger.exception("An exception occurred:")
       
   138             finally:
       
   139                 if tm is not None:
       
   140                     tm.abort()
       
   141 
       
   142 
       
   143 class TaskRunnerThread(BaseTaskThread):
       
   144     """Task immediate runner thread"""
       
   145 
       
   146     def run(self):
       
   147         logger.debug("Starting task runner thread...")
       
   148         settings = self.settings
       
   149         job_id = settings.get('job_id')
       
   150         if job_id is None:
       
   151             return
       
   152         logger.debug("Loading ZEO connection...")
       
   153         with self._get_connection() as root:
       
   154             logger.debug("Loaded ZODB root {0!r}".format(root))
       
   155             tm = None
       
   156             try:
       
   157                 try:
       
   158                     application_name = self.process.registry.settings.get(PYAMS_APPLICATION_SETTINGS_KEY,
       
   159                                                                           PYAMS_APPLICATION_DEFAULT_NAME)
       
   160                     application = root.get(application_name)
       
   161                     logger.debug("Loaded application {0!r}".format(application))
       
   162                     sm = application.getSiteManager()
       
   163                     scheduler_util = sm.get(SCHEDULER_NAME)
       
   164                     logger.debug("Loaded scheduler utility {0!r}".format(scheduler_util))
       
   165                     scheduler = self.process.scheduler
       
   166                     logger.debug("Loading scheduler task '{0}'".format(settings.get('task_name').lower()))
       
   167                     task = scheduler_util.get(settings.get('task_name').lower())
       
   168                     logger.debug("Loaded scheduler task {0!r}".format(task))
       
   169                     if task is not None:
       
   170                         trigger = ImmediateTaskTrigger()
       
   171                         logger.debug("Getting task trigger {0!r}".format(trigger))
       
   172                         zeo_connection = sm.getUtility(IZEOConnection, name=scheduler_util.zeo_connection)
       
   173                         logger.debug("Adding new job to scheduler {0!r}".format(scheduler))
       
   174                         scheduler.add_job(task, trigger,
       
   175                                           id='{0.internal_id}::{1}'.format(task,
       
   176                                                                            datetime.utcnow().isoformat()),
       
   177                                           name=task.name,
       
   178                                           kwargs={'zeo_settings': zeo_connection.get_settings(),
       
   179                                                   'registry': self.process.registry,
       
   180                                                   'run_immediate': True})
       
   181                         logger.debug("Added job")
       
   182                 except:
       
   183                     logger.exception("An exception occurred:")
       
   184             finally:
       
   185                 if tm is not None:
       
   186                     tm.abort()
       
   187 
       
   188 
       
   189 class SchedulerHandler(object):
       
   190     """Scheduler handler"""
       
   191 
       
   192     def get_jobs(self, settings):
       
   193         scheduler = self.process.scheduler
       
   194         return [{'id': job.id,
       
   195                  'name': job.name,
       
   196                  'trigger': '{0!s}'.format(job.trigger),
       
   197                  'next_run': job.next_run_time.timestamp()} for job in scheduler.get_jobs()]
       
   198 
       
   199     def reset_task(self, settings):
       
   200         TaskResettingThread(self.process, settings).start()
       
   201         return 'OK'
       
   202 
       
   203     def remove_task(self, settings):
       
   204         TaskRemoverThread(self.process, settings).start()
       
   205         return 'OK'
       
   206 
       
   207     def run_task(self, settings):
       
   208         TaskRunnerThread(self.process, settings).start()
       
   209         return 'OK'
       
   210 
       
   211 
       
   212 class SchedulerMessageHandler(ZMQMessageHandler):
       
   213     """ØMQ scheduler messages handler"""
       
   214 
       
   215     handler = SchedulerHandler
       
   216 
       
   217 
       
   218 class SchedulerProcess(ZMQProcess):
       
   219     """ØMQ tasks scheduler process"""
       
   220 
       
   221     def __init__(self, zmq_address, handler, registry):
       
   222         ZMQProcess.__init__(self, zmq_address, handler)
       
   223         self.registry = registry
       
   224         self.scheduler = BackgroundScheduler()
       
   225         self.jobstore = MemoryJobStore()
       
   226 
       
   227     def run(self):
       
   228         if self.scheduler is not None:
       
   229             self.scheduler.add_jobstore(self.jobstore, 'default')
       
   230             self.scheduler.start()
       
   231         ZMQProcess.run(self)