src/pyams_scheduler/include.py
changeset 72 b3c277ed52d4
parent 55 8ba86a8b057b
child 73 0f124b1be6a1
equal deleted inserted replaced
71:19c49f85449e 72:b3c277ed52d4
     7 # THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
     7 # THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
     8 # WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
     8 # WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
     9 # WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
     9 # WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
    10 # FOR A PARTICULAR PURPOSE.
    10 # FOR A PARTICULAR PURPOSE.
    11 #
    11 #
       
    12 from pyams_utils.protocol.tcp import is_port_in_use
       
    13 
    12 
    14 
    13 __docformat__ = 'restructuredtext'
    15 __docformat__ = 'restructuredtext'
    14 
    16 
    15 
    17 
    16 # import standard library
    18 # import standard library
    63 
    65 
    64     registry = get_global_registry()
    66     registry = get_global_registry()
    65     settings = registry.settings
    67     settings = registry.settings
    66     start_handler = asbool(settings.get(SCHEDULER_STARTER_KEY, False))
    68     start_handler = asbool(settings.get(SCHEDULER_STARTER_KEY, False))
    67     if start_handler:
    69     if start_handler:
    68         # get database connection
    70         # check if port is available
    69         connection = get_connection_from_settings(settings)
    71         handler_address = settings.get(SCHEDULER_HANDLER_KEY, '127.0.0.1:5555')
    70         root = connection.root()
    72         hostname, port = handler_address.split(':')
    71         # get application
    73         if not is_port_in_use(int(port), hostname):
    72         application_name = settings.get(PYAMS_APPLICATION_SETTINGS_KEY, PYAMS_APPLICATION_DEFAULT_NAME)
    74             # get database connection
    73         application = root.get(application_name)
    75             connection = get_connection_from_settings(settings)
    74         if application is not None:
    76             root = connection.root()
    75             sm = application.getSiteManager()
    77             # get application
    76             set_local_registry(sm)
    78             application_name = settings.get(PYAMS_APPLICATION_SETTINGS_KEY, PYAMS_APPLICATION_DEFAULT_NAME)
    77             process = None
    79             application = root.get(application_name)
    78             try:
    80             if application is not None:
    79                 scheduler_util = sm.get(SCHEDULER_NAME)
    81                 sm = application.getSiteManager()
       
    82                 set_local_registry(sm)
       
    83                 process = None
    80                 try:
    84                 try:
    81                     zodb_name = scheduler_util.zodb_name
    85                     scheduler_util = sm.get(SCHEDULER_NAME)
    82                 except ComponentLookupError:
    86                     try:
    83                     pass
    87                         zodb_name = scheduler_util.zodb_name
    84                 else:
    88                     except ComponentLookupError:
    85                     # create scheduler process
    89                         pass
    86                     process = SchedulerProcess(settings.get(SCHEDULER_HANDLER_KEY, '127.0.0.1:5555'),
    90                     else:
    87                                                SchedulerMessageHandler,
    91                         # create scheduler process
    88                                                settings.get(SCHEDULER_AUTH_KEY),
    92                         process = SchedulerProcess(handler_address,
    89                                                settings.get(SCHEDULER_CLIENTS_KEY),
    93                                                    SchedulerMessageHandler,
    90                                                registry)
    94                                                    settings.get(SCHEDULER_AUTH_KEY),
    91                     # load tasks
    95                                                    settings.get(SCHEDULER_CLIENTS_KEY),
    92                     for task in scheduler_util.values():
    96                                                    registry)
    93                         trigger = task.get_trigger(registry)
    97                         # load tasks
    94                         logger.debug("Adding scheduler job for task '{0.name}'".format(task))
    98                         for task in scheduler_util.values():
    95                         process.scheduler.add_job(task, trigger,
    99                             trigger = task.get_trigger(registry)
    96                                                   id=str(task.internal_id),
   100                             logger.debug("Adding scheduler job for task '{0.name}'".format(task))
    97                                                   name=task.name,
   101                             process.scheduler.add_job(task, trigger,
    98                                                   kwargs={'zodb_name': zodb_name,
   102                                                       id=str(task.internal_id),
    99                                                           'registry': registry})
   103                                                       name=task.name,
   100                     # start process
   104                                                       kwargs={'zodb_name': zodb_name,
   101                     logger.info("Starting tasks scheduler {0!r}...".format(process))
   105                                                               'registry': registry})
   102                     process.start()
   106                         # start process
   103                     if process.is_alive():
   107                         logger.info("Starting tasks scheduler {0!r}...".format(process))
   104                         atexit.register(process_exit_func, process=process)
   108                         process.start()
   105                         logger.info("Started tasks scheduler with PID {0}.".format(process.pid))
   109                         if process.is_alive():
   106             finally:
   110                             atexit.register(process_exit_func, process=process)
   107                 if process and not process.is_alive():
   111                             logger.info("Started tasks scheduler with PID {0}.".format(process.pid))
   108                     process.terminate()
   112                 finally:
   109                     process.join()
   113                     if process and not process.is_alive():
   110                 set_local_registry(None)
   114                         process.terminate()
       
   115                         process.join()
       
   116                     set_local_registry(None)