src/pyams_scheduler/process.py
changeset 5 0fe262326e0e
parent 0 48483b0b26fa
child 18 4806161ea41a
equal deleted inserted replaced
4:03b9d3744d75 5:0fe262326e0e
    64             return
    64             return
    65         job_id = str(job_id)
    65         job_id = str(job_id)
    66         logger.debug("Loading ZEO connection...")
    66         logger.debug("Loading ZEO connection...")
    67         with self._get_connection() as root:
    67         with self._get_connection() as root:
    68             logger.debug("Loaded ZODB root {0!r}".format(root))
    68             logger.debug("Loaded ZODB root {0!r}".format(root))
    69             tm = None
       
    70             try:
    69             try:
    71                 try:
    70                 application_name = self.process.registry.settings.get(PYAMS_APPLICATION_SETTINGS_KEY,
    72                     application_name = self.process.registry.settings.get(PYAMS_APPLICATION_SETTINGS_KEY,
    71                                                                       PYAMS_APPLICATION_DEFAULT_NAME)
    73                                                                           PYAMS_APPLICATION_DEFAULT_NAME)
    72                 application = root.get(application_name)
    74                     application = root.get(application_name)
    73                 logger.debug("Loaded application {0!r}".format(application))
    75                     logger.debug("Loaded application {0!r}".format(application))
    74                 sm = application.getSiteManager()
    76                     sm = application.getSiteManager()
    75                 scheduler_util = sm.get(SCHEDULER_NAME)
    77                     scheduler_util = sm.get(SCHEDULER_NAME)
    76                 logger.debug("Loaded scheduler utility {0!r}".format(scheduler_util))
    78                     logger.debug("Loaded scheduler utility {0!r}".format(scheduler_util))
    77                 scheduler = self.process.scheduler
    79                     scheduler = self.process.scheduler
    78                 logger.debug("Removing job '{0}'".format(job_id))
    80                     logger.debug("Removing job '{0}'".format(job_id))
    79                 job = scheduler.get_job(job_id)
    81                     job = scheduler.get_job(job_id)
    80                 if job is not None:
    82                     if job is not None:
    81                     logger.debug("Loaded job {0!r} ({0.id!r})".format(job))
    83                         logger.debug("Loaded job {0!r} ({0.id!r})".format(job))
    82                     scheduler.remove_job(job.id)
    84                         scheduler.remove_job(job.id)
    83                 logger.debug("Loading scheduler task '{0}'".format(settings.get('task_name').lower()))
    85                     logger.debug("Loading scheduler task '{0}'".format(settings.get('task_name').lower()))
    84                 task = scheduler_util.get(settings.get('task_name').lower())
    86                     task = scheduler_util.get(settings.get('task_name').lower())
    85                 logger.debug("Loaded scheduler task {0!r}".format(task))
    87                     logger.debug("Loaded scheduler task {0!r}".format(task))
    86                 if task is not None:
    88                     if task is not None:
    87                     trigger = task.get_trigger(self.process.registry)
    89                         trigger = task.get_trigger(self.process.registry)
    88                     logger.debug("Getting task trigger {0!r}".format(trigger))
    90                         logger.debug("Getting task trigger {0!r}".format(trigger))
    89                     zeo_connection = sm.getUtility(IZEOConnection, name=scheduler_util.zeo_connection)
    91                         zeo_connection = sm.getUtility(IZEOConnection, name=scheduler_util.zeo_connection)
    90                     logger.debug("Adding new job to scheduler {0!r}".format(scheduler))
    92                         logger.debug("Adding new job to scheduler {0!r}".format(scheduler))
    91                     scheduler.add_job(task, trigger,
    93                         scheduler.add_job(task, trigger,
    92                                       id=str(task.internal_id),
    94                                           id=str(task.internal_id),
    93                                       name=task.name,
    95                                           name=task.name,
    94                                       kwargs={'zeo_settings': zeo_connection.get_settings(),
    96                                           kwargs={'zeo_settings': zeo_connection.get_settings(),
    95                                               'registry': self.process.registry})
    97                                                   'registry': self.process.registry})
    96                     logger.debug("Added job")
    98                         logger.debug("Added job")
    97             except:
    99                 except:
    98                 logger.exception("An exception occurred:")
   100                     logger.exception("An exception occurred:")
       
   101             finally:
       
   102                 if tm is not None:
       
   103                     tm.abort()
       
   104 
    99 
   105 
   100 
   106 class TaskRemoverThread(BaseTaskThread):
   101 class TaskRemoverThread(BaseTaskThread):
   107     """Task remover thread"""
   102     """Task remover thread"""
   108 
   103 
   114             return
   109             return
   115         job_id = str(job_id)
   110         job_id = str(job_id)
   116         logger.debug("Loading ZEO connection...")
   111         logger.debug("Loading ZEO connection...")
   117         with self._get_connection() as root:
   112         with self._get_connection() as root:
   118             logger.debug("Loaded ZODB root {0!r}".format(root))
   113             logger.debug("Loaded ZODB root {0!r}".format(root))
   119             tm = None
       
   120             try:
   114             try:
   121                 try:
   115                 application_name = self.process.registry.settings.get(PYAMS_APPLICATION_SETTINGS_KEY,
   122                     application_name = self.process.registry.settings.get(PYAMS_APPLICATION_SETTINGS_KEY,
   116                                                                       PYAMS_APPLICATION_DEFAULT_NAME)
   123                                                                           PYAMS_APPLICATION_DEFAULT_NAME)
   117                 application = root.get(application_name)
   124                     application = root.get(application_name)
   118                 logger.debug("Loaded application {0!r}".format(application))
   125                     logger.debug("Loaded application {0!r}".format(application))
   119                 sm = application.getSiteManager()
   126                     sm = application.getSiteManager()
   120                 scheduler_util = sm.get(SCHEDULER_NAME)
   127                     scheduler_util = sm.get(SCHEDULER_NAME)
   121                 logger.debug("Loaded scheduler utility {0!r}".format(scheduler_util))
   128                     logger.debug("Loaded scheduler utility {0!r}".format(scheduler_util))
   122                 scheduler = self.process.scheduler
   129                     scheduler = self.process.scheduler
   123                 logger.debug("Removing job '{0}'".format(job_id))
   130                     logger.debug("Removing job '{0}'".format(job_id))
   124                 job = scheduler.get_job(job_id)
   131                     job = scheduler.get_job(job_id)
   125                 if job is not None:
   132                     if job is not None:
   126                     logger.debug("Loaded job {0!r} ({0.id!r})".format(job))
   133                         logger.debug("Loaded job {0!r} ({0.id!r})".format(job))
   127                     scheduler.remove_job(job.id)
   134                         scheduler.remove_job(job.id)
   128                 logger.debug("Removed job")
   135                     logger.debug("Removed job")
   129             except:
   136                 except:
   130                 logger.exception("An exception occurred:")
   137                     logger.exception("An exception occurred:")
       
   138             finally:
       
   139                 if tm is not None:
       
   140                     tm.abort()
       
   141 
   131 
   142 
   132 
   143 class TaskRunnerThread(BaseTaskThread):
   133 class TaskRunnerThread(BaseTaskThread):
   144     """Task immediate runner thread"""
   134     """Task immediate runner thread"""
   145 
   135 
   150         if job_id is None:
   140         if job_id is None:
   151             return
   141             return
   152         logger.debug("Loading ZEO connection...")
   142         logger.debug("Loading ZEO connection...")
   153         with self._get_connection() as root:
   143         with self._get_connection() as root:
   154             logger.debug("Loaded ZODB root {0!r}".format(root))
   144             logger.debug("Loaded ZODB root {0!r}".format(root))
   155             tm = None
       
   156             try:
   145             try:
   157                 try:
   146                 application_name = self.process.registry.settings.get(PYAMS_APPLICATION_SETTINGS_KEY,
   158                     application_name = self.process.registry.settings.get(PYAMS_APPLICATION_SETTINGS_KEY,
   147                                                                       PYAMS_APPLICATION_DEFAULT_NAME)
   159                                                                           PYAMS_APPLICATION_DEFAULT_NAME)
   148                 application = root.get(application_name)
   160                     application = root.get(application_name)
   149                 logger.debug("Loaded application {0!r}".format(application))
   161                     logger.debug("Loaded application {0!r}".format(application))
   150                 sm = application.getSiteManager()
   162                     sm = application.getSiteManager()
   151                 scheduler_util = sm.get(SCHEDULER_NAME)
   163                     scheduler_util = sm.get(SCHEDULER_NAME)
   152                 logger.debug("Loaded scheduler utility {0!r}".format(scheduler_util))
   164                     logger.debug("Loaded scheduler utility {0!r}".format(scheduler_util))
   153                 scheduler = self.process.scheduler
   165                     scheduler = self.process.scheduler
   154                 logger.debug("Loading scheduler task '{0}'".format(settings.get('task_name').lower()))
   166                     logger.debug("Loading scheduler task '{0}'".format(settings.get('task_name').lower()))
   155                 task = scheduler_util.get(settings.get('task_name').lower())
   167                     task = scheduler_util.get(settings.get('task_name').lower())
   156                 logger.debug("Loaded scheduler task {0!r}".format(task))
   168                     logger.debug("Loaded scheduler task {0!r}".format(task))
   157                 if task is not None:
   169                     if task is not None:
   158                     trigger = ImmediateTaskTrigger()
   170                         trigger = ImmediateTaskTrigger()
   159                     logger.debug("Getting task trigger {0!r}".format(trigger))
   171                         logger.debug("Getting task trigger {0!r}".format(trigger))
   160                     zeo_connection = sm.getUtility(IZEOConnection, name=scheduler_util.zeo_connection)
   172                         zeo_connection = sm.getUtility(IZEOConnection, name=scheduler_util.zeo_connection)
   161                     logger.debug("Adding new job to scheduler {0!r}".format(scheduler))
   173                         logger.debug("Adding new job to scheduler {0!r}".format(scheduler))
   162                     scheduler.add_job(task, trigger,
   174                         scheduler.add_job(task, trigger,
   163                                       id='{0.internal_id}::{1}'.format(task,
   175                                           id='{0.internal_id}::{1}'.format(task,
   164                                                                        datetime.utcnow().isoformat()),
   176                                                                            datetime.utcnow().isoformat()),
   165                                       name=task.name,
   177                                           name=task.name,
   166                                       kwargs={'zeo_settings': zeo_connection.get_settings(),
   178                                           kwargs={'zeo_settings': zeo_connection.get_settings(),
   167                                               'registry': self.process.registry,
   179                                                   'registry': self.process.registry,
   168                                               'run_immediate': True})
   180                                                   'run_immediate': True})
   169                     logger.debug("Added job")
   181                         logger.debug("Added job")
   170             except:
   182                 except:
   171                 logger.exception("An exception occurred:")
   183                     logger.exception("An exception occurred:")
       
   184             finally:
       
   185                 if tm is not None:
       
   186                     tm.abort()
       
   187 
   172 
   188 
   173 
   189 class SchedulerHandler(object):
   174 class SchedulerHandler(object):
   190     """Scheduler handler"""
   175     """Scheduler handler"""
   191 
   176