68 start_handler = asbool(settings.get(SCHEDULER_STARTER_KEY, False)) |
68 start_handler = asbool(settings.get(SCHEDULER_STARTER_KEY, False)) |
69 if start_handler: |
69 if start_handler: |
70 # check if port is available |
70 # check if port is available |
71 handler_address = settings.get(SCHEDULER_HANDLER_KEY, '127.0.0.1:5555') |
71 handler_address = settings.get(SCHEDULER_HANDLER_KEY, '127.0.0.1:5555') |
72 hostname, port = handler_address.split(':') |
72 hostname, port = handler_address.split(':') |
73 if not is_port_in_use(int(port), hostname): |
73 if is_port_in_use(int(port), hostname): |
74 # get database connection |
74 logger.info("Scheduler port already used, aborting...") |
75 connection = get_connection_from_settings(settings) |
75 return |
76 root = connection.root() |
76 # get database connection |
77 # get application |
77 connection = get_connection_from_settings(settings) |
78 application_name = settings.get(PYAMS_APPLICATION_SETTINGS_KEY, PYAMS_APPLICATION_DEFAULT_NAME) |
78 root = connection.root() |
79 application = root.get(application_name) |
79 # get application |
80 if application is not None: |
80 application_name = settings.get(PYAMS_APPLICATION_SETTINGS_KEY, PYAMS_APPLICATION_DEFAULT_NAME) |
81 sm = application.getSiteManager() |
81 application = root.get(application_name) |
82 set_local_registry(sm) |
82 if application is not None: |
83 process = None |
83 sm = application.getSiteManager() |
|
84 set_local_registry(sm) |
|
85 process = None |
|
86 try: |
|
87 scheduler_util = sm.get(SCHEDULER_NAME) |
84 try: |
88 try: |
85 scheduler_util = sm.get(SCHEDULER_NAME) |
89 zodb_name = scheduler_util.zodb_name |
86 try: |
90 except ComponentLookupError: |
87 zodb_name = scheduler_util.zodb_name |
91 pass |
88 except ComponentLookupError: |
92 else: |
89 pass |
93 # create scheduler process |
90 else: |
94 process = SchedulerProcess(handler_address, |
91 # create scheduler process |
95 SchedulerMessageHandler, |
92 process = SchedulerProcess(handler_address, |
96 settings.get(SCHEDULER_AUTH_KEY), |
93 SchedulerMessageHandler, |
97 settings.get(SCHEDULER_CLIENTS_KEY), |
94 settings.get(SCHEDULER_AUTH_KEY), |
98 registry) |
95 settings.get(SCHEDULER_CLIENTS_KEY), |
99 # load tasks |
96 registry) |
100 for task in scheduler_util.values(): |
97 # load tasks |
101 trigger = task.get_trigger(registry) |
98 for task in scheduler_util.values(): |
102 logger.debug("Adding scheduler job for task '{0.name}'".format(task)) |
99 trigger = task.get_trigger(registry) |
103 process.scheduler.add_job(task, trigger, |
100 logger.debug("Adding scheduler job for task '{0.name}'".format(task)) |
104 id=str(task.internal_id), |
101 process.scheduler.add_job(task, trigger, |
105 name=task.name, |
102 id=str(task.internal_id), |
106 kwargs={'zodb_name': zodb_name, |
103 name=task.name, |
107 'registry': registry}) |
104 kwargs={'zodb_name': zodb_name, |
108 # start process |
105 'registry': registry}) |
109 logger.info("Starting tasks scheduler {0!r}...".format(process)) |
106 # start process |
110 process.start() |
107 logger.info("Starting tasks scheduler {0!r}...".format(process)) |
111 if process.is_alive(): |
108 process.start() |
112 atexit.register(process_exit_func, process=process) |
109 if process.is_alive(): |
113 logger.info("Started tasks scheduler with PID {0}.".format(process.pid)) |
110 atexit.register(process_exit_func, process=process) |
114 finally: |
111 logger.info("Started tasks scheduler with PID {0}.".format(process.pid)) |
115 if process and not process.is_alive(): |
112 finally: |
116 process.terminate() |
113 if process and not process.is_alive(): |
117 process.join() |
114 process.terminate() |
118 set_local_registry(None) |
115 process.join() |
|
116 set_local_registry(None) |
|