src/pyams_scheduler/include.py
changeset 73 0f124b1be6a1
parent 72 b3c277ed52d4
child 75 acea456f532a
equal deleted inserted replaced
72:b3c277ed52d4 73:0f124b1be6a1
    68     start_handler = asbool(settings.get(SCHEDULER_STARTER_KEY, False))
    68     start_handler = asbool(settings.get(SCHEDULER_STARTER_KEY, False))
    69     if start_handler:
    69     if start_handler:
    70         # check if port is available
    70         # check if port is available
    71         handler_address = settings.get(SCHEDULER_HANDLER_KEY, '127.0.0.1:5555')
    71         handler_address = settings.get(SCHEDULER_HANDLER_KEY, '127.0.0.1:5555')
    72         hostname, port = handler_address.split(':')
    72         hostname, port = handler_address.split(':')
    73         if not is_port_in_use(int(port), hostname):
    73         if is_port_in_use(int(port), hostname):
    74             # get database connection
    74             logger.info("Scheduler port already used, aborting...")
    75             connection = get_connection_from_settings(settings)
    75             return
    76             root = connection.root()
    76         # get database connection
    77             # get application
    77         connection = get_connection_from_settings(settings)
    78             application_name = settings.get(PYAMS_APPLICATION_SETTINGS_KEY, PYAMS_APPLICATION_DEFAULT_NAME)
    78         root = connection.root()
    79             application = root.get(application_name)
    79         # get application
    80             if application is not None:
    80         application_name = settings.get(PYAMS_APPLICATION_SETTINGS_KEY, PYAMS_APPLICATION_DEFAULT_NAME)
    81                 sm = application.getSiteManager()
    81         application = root.get(application_name)
    82                 set_local_registry(sm)
    82         if application is not None:
    83                 process = None
    83             sm = application.getSiteManager()
       
    84             set_local_registry(sm)
       
    85             process = None
       
    86             try:
       
    87                 scheduler_util = sm.get(SCHEDULER_NAME)
    84                 try:
    88                 try:
    85                     scheduler_util = sm.get(SCHEDULER_NAME)
    89                     zodb_name = scheduler_util.zodb_name
    86                     try:
    90                 except ComponentLookupError:
    87                         zodb_name = scheduler_util.zodb_name
    91                     pass
    88                     except ComponentLookupError:
    92                 else:
    89                         pass
    93                     # create scheduler process
    90                     else:
    94                     process = SchedulerProcess(handler_address,
    91                         # create scheduler process
    95                                                SchedulerMessageHandler,
    92                         process = SchedulerProcess(handler_address,
    96                                                settings.get(SCHEDULER_AUTH_KEY),
    93                                                    SchedulerMessageHandler,
    97                                                settings.get(SCHEDULER_CLIENTS_KEY),
    94                                                    settings.get(SCHEDULER_AUTH_KEY),
    98                                                registry)
    95                                                    settings.get(SCHEDULER_CLIENTS_KEY),
    99                     # load tasks
    96                                                    registry)
   100                     for task in scheduler_util.values():
    97                         # load tasks
   101                         trigger = task.get_trigger(registry)
    98                         for task in scheduler_util.values():
   102                         logger.debug("Adding scheduler job for task '{0.name}'".format(task))
    99                             trigger = task.get_trigger(registry)
   103                         process.scheduler.add_job(task, trigger,
   100                             logger.debug("Adding scheduler job for task '{0.name}'".format(task))
   104                                                   id=str(task.internal_id),
   101                             process.scheduler.add_job(task, trigger,
   105                                                   name=task.name,
   102                                                       id=str(task.internal_id),
   106                                                   kwargs={'zodb_name': zodb_name,
   103                                                       name=task.name,
   107                                                           'registry': registry})
   104                                                       kwargs={'zodb_name': zodb_name,
   108                     # start process
   105                                                               'registry': registry})
   109                     logger.info("Starting tasks scheduler {0!r}...".format(process))
   106                         # start process
   110                     process.start()
   107                         logger.info("Starting tasks scheduler {0!r}...".format(process))
   111                     if process.is_alive():
   108                         process.start()
   112                         atexit.register(process_exit_func, process=process)
   109                         if process.is_alive():
   113                         logger.info("Started tasks scheduler with PID {0}.".format(process.pid))
   110                             atexit.register(process_exit_func, process=process)
   114             finally:
   111                             logger.info("Started tasks scheduler with PID {0}.".format(process.pid))
   115                 if process and not process.is_alive():
   112                 finally:
   116                     process.terminate()
   113                     if process and not process.is_alive():
   117                     process.join()
   114                         process.terminate()
   118                 set_local_registry(None)
   115                         process.join()
       
   116                     set_local_registry(None)