Updated workflow tasks management so that planned publication and retiring are handled correctly even if server is stopped when the task should be executed
authorThierry Florac <tflorac@ulthar.net>
Fri, 11 Jan 2019 09:57:15 +0100
changeset 1224 c51128c007d6
parent 1223 99a4c33e2962
child 1225 900f3c39d29e
Updated workflow tasks management so that planned publication and retiring are handled correctly even if server is stopped when the task should be executed
src/pyams_content/workflow/__init__.py
src/pyams_content/workflow/basic.py
src/pyams_content/workflow/task.py
--- a/src/pyams_content/workflow/__init__.py	Thu Jan 10 17:40:28 2019 +0100
+++ b/src/pyams_content/workflow/__init__.py	Fri Jan 11 09:57:15 2019 +0100
@@ -12,7 +12,7 @@
 
 __docformat__ = 'restructuredtext'
 
-from datetime import datetime
+from datetime import datetime, timedelta
 
 from pyramid.threadlocal import get_current_registry
 from zope.copy import copy
@@ -21,12 +21,13 @@
 from zope.location import locate
 from zope.schema.vocabulary import SimpleTerm, SimpleVocabulary
 
+from pyams_content import _
 from pyams_content.interfaces import CREATE_VERSION_PERMISSION, MANAGE_CONTENT_PERMISSION, MANAGE_SITE_ROOT_PERMISSION, \
     PUBLISH_CONTENT_PERMISSION
 from pyams_content.interfaces import MANAGER_ROLE, OWNER_ROLE, PILOT_ROLE, READER_ROLE, WEBMASTER_ROLE
 from pyams_content.shared.common.interfaces import IManagerRestrictions, IWfSharedContentRoles
 from pyams_content.workflow.interfaces import IContentWorkflow
-from pyams_content.workflow.task import ContentPublishingTask, ContentArchivingTask
+from pyams_content.workflow.task import ContentArchivingTask, ContentPublishingTask
 from pyams_scheduler.interfaces import IDateTaskScheduling, IScheduler
 from pyams_security.interfaces import IRoleProtectedObject
 from pyams_sequence.interfaces import ISequentialIdInfo
@@ -34,11 +35,11 @@
 from pyams_utils.date import format_datetime
 from pyams_utils.registry import get_utility, query_utility, utility_config
 from pyams_utils.request import check_request
+from pyams_utils.timezone import gmtime
 from pyams_workflow.interfaces import AUTOMATIC, IWorkflow, IWorkflowInfo, IWorkflowPublicationInfo, IWorkflowState, \
     IWorkflowStateLabel, IWorkflowVersions, ObjectClonedEvent, SYSTEM
 from pyams_workflow.workflow import Transition, Workflow
 
-from pyams_content import _
 
 #
 # Workflow states
@@ -238,6 +239,17 @@
 # Workflow actions
 #
 
+def remove_scheduler_task(context):
+    """Remove any scheduler task for this context"""
+    scheduler = query_utility(IScheduler)
+    if scheduler is not None:
+        intids = get_utility(IIntIds)
+        context_id = intids.queryId(context)
+        task_id = 'workflow::{}'.format(context_id)
+        if task_id in scheduler:
+            del scheduler[task_id]
+
+
 def reset_publication_action(wf, context):
     """Refuse version publication"""
     IWorkflowPublicationInfo(context).reset(complete=True)
@@ -268,13 +280,7 @@
 
 def cancel_prepublish_action(wf, context):
     """Cancel pre-publication"""
-    scheduler = query_utility(IScheduler)
-    if scheduler is not None:
-        intids = get_utility(IIntIds)
-        context_id = intids.queryId(context)
-        task_id = 'workflow::{}'.format(context_id)
-        if task_id in scheduler:
-            del scheduler[task_id]
+    remove_scheduler_task(context)
 
 
 def publish_action(wf, context):
@@ -291,6 +297,9 @@
                                                           comment=translate(_("Published version {0}")).format(
                                                               version_id))
     # check expiration date and create auto-archiving task if needed
+    # we compare expiration date with current date to handle the case where content is
+    # published automatically at application startup, and we add a small amount of time
+    # to be sure that scheduler and indexer processes are started
     if publication_info.publication_expiration_date:
         scheduler = query_utility(IScheduler)
         if scheduler is not None:
@@ -303,12 +312,19 @@
             task.name = 'Planned archiving for {}'.format(ISequentialIdInfo(context).public_oid)
             task.schedule_mode = 'Date-style scheduling'
             pub_info = IWorkflowPublicationInfo(context)
+            now = gmtime(datetime.utcnow())
             schedule_info = IDateTaskScheduling(task)
             schedule_info.active = True
-            schedule_info.start_date = pub_info.publication_expiration_date
+            schedule_info.start_date = max(now + timedelta(seconds=10),
+                                           pub_info.publication_expiration_date)
             scheduler[task_id] = task
 
 
+def unpublish_action(wf, context):
+    """Remove automatic scheduler task when content is unpublished"""
+    remove_scheduler_task(context)
+
+
 def archive_action(wf, context):
     """Remove readers when a content is archived, and delete any scheduler task"""
     # remove readers
@@ -316,13 +332,7 @@
     if roles is not None:
         IRoleProtectedObject(context).revoke_role(READER_ROLE, roles.readers)
     # remove any scheduler task
-    scheduler = query_utility(IScheduler)
-    if scheduler is not None:
-        intids = get_utility(IIntIds)
-        context_id = intids.queryId(context)
-        task_id = 'workflow::{}'.format(context_id)
-        if task_id in scheduler:
-            del scheduler[task_id]
+    remove_scheduler_task(context)
 
 
 def clone_action(wf, context):
@@ -527,6 +537,7 @@
                                  destination=RETIRED,
                                  permission=PUBLISH_CONTENT_PERMISSION,
                                  condition=can_manage_content,
+                                 action=unpublish_action,
                                  menu_css_class='fa fa-fw fa-stop',
                                  view_name='wf-retire.html',
                                  history_label=_("Content retired"),
--- a/src/pyams_content/workflow/basic.py	Thu Jan 10 17:40:28 2019 +0100
+++ b/src/pyams_content/workflow/basic.py	Fri Jan 11 09:57:15 2019 +0100
@@ -12,7 +12,7 @@
 
 __docformat__ = 'restructuredtext'
 
-from datetime import datetime
+from datetime import datetime, timedelta
 
 from zope.copy import copy
 from zope.interface import implementer
@@ -20,6 +20,7 @@
 from zope.location import locate
 from zope.schema.vocabulary import SimpleTerm, SimpleVocabulary
 
+from pyams_content import _
 from pyams_content.interfaces import CREATE_VERSION_PERMISSION, MANAGER_ROLE, \
     MANAGE_CONTENT_PERMISSION, MANAGE_SITE_ROOT_PERMISSION, OWNER_ROLE, PILOT_ROLE, PUBLISH_CONTENT_PERMISSION, \
     READER_ROLE, WEBMASTER_ROLE
@@ -34,12 +35,11 @@
 from pyams_utils.date import format_datetime
 from pyams_utils.registry import get_current_registry, get_utility, query_utility, utility_config
 from pyams_utils.request import check_request
+from pyams_utils.timezone import gmtime
 from pyams_workflow.interfaces import IWorkflow, IWorkflowInfo, IWorkflowPublicationInfo, IWorkflowState, \
     IWorkflowStateLabel, IWorkflowVersions, ObjectClonedEvent, SYSTEM
 from pyams_workflow.workflow import Transition, Workflow
 
-from pyams_content import _
-
 
 DRAFT = 'draft'
 PRE_PUBLISHED = 'pre-published'
@@ -157,6 +157,17 @@
 # Workflow actions
 #
 
+def remove_scheduler_task(context):
+    """Remove any scheduler task for this context"""
+    scheduler = query_utility(IScheduler)
+    if scheduler is not None:
+        intids = get_utility(IIntIds)
+        context_id = intids.queryId(context)
+        task_id = 'workflow::{}'.format(context_id)
+        if task_id in scheduler:
+            del scheduler[task_id]
+
+
 def prepublish_action(wf, context):
     """Publish content with a future effective publication date
 
@@ -182,13 +193,7 @@
 
 def cancel_prepublish_action(wf, context):
     """Cancel pre-publication"""
-    scheduler = query_utility(IScheduler)
-    if scheduler is not None:
-        intids = get_utility(IIntIds)
-        context_id = intids.queryId(context)
-        task_id = 'workflow::{}'.format(context_id)
-        if task_id in scheduler:
-            del scheduler[task_id]
+    remove_scheduler_task(context)
 
 
 def publish_action(wf, context):
@@ -205,6 +210,9 @@
                                                           comment=translate(_("Published version {0}")).format(
                                                               version_id))
     # check expiration date and create auto-archiving task if needed
+    # we compare expiration date with current date to handle the case where content is
+    # published automatically at application startup, and we add a small amount of time
+    # to be sure that scheduler and indexer processes are started
     if publication_info.publication_expiration_date:
         scheduler = query_utility(IScheduler)
         if scheduler is not None:
@@ -217,9 +225,11 @@
             task.name = 'Planned archiving for {}'.format(ISequentialIdInfo(context).public_oid)
             task.schedule_mode = 'Date-style scheduling'
             pub_info = IWorkflowPublicationInfo(context)
+            now = gmtime(datetime.utcnow())
             schedule_info = IDateTaskScheduling(task)
             schedule_info.active = True
-            schedule_info.start_date = pub_info.publication_expiration_date
+            schedule_info.start_date = max(now + timedelta(seconds=10),
+                                           pub_info.publication_expiration_date)
             scheduler[task_id] = task
 
 
@@ -230,13 +240,7 @@
     if roles is not None:
         IRoleProtectedObject(context).revoke_role(READER_ROLE, roles.readers)
     # remove any scheduler task
-    scheduler = query_utility(IScheduler)
-    if scheduler is not None:
-        intids = get_utility(IIntIds)
-        context_id = intids.queryId(context)
-        task_id = 'workflow::{}'.format(context_id)
-        if task_id in scheduler:
-            del scheduler[task_id]
+    remove_scheduler_task(context)
 
 
 def clone_action(wf, context):
--- a/src/pyams_content/workflow/task.py	Thu Jan 10 17:40:28 2019 +0100
+++ b/src/pyams_content/workflow/task.py	Fri Jan 11 09:57:15 2019 +0100
@@ -118,9 +118,11 @@
                 continue
             now = gmtime(datetime.utcnow())
             if schedule_info.active and (schedule_info.start_date < now):
-                logger.debug(" - resetting task « {} »".format(task.name))
+                # we add a small amount of time to be sure that scheduler and indexer
+                # processes are started...
                 schedule_info.start_date = now + timedelta(seconds=10)
-                # commit task update for reset thread!!
+                # commit update for reset thread to get updated data!!
                 ITransactionManager(task).commit()
                 # start task resetting thread
+                logger.debug(" - restarting task « {} »".format(task.name))
                 TaskResettingThread(event.object, task).start()