--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/src/pyams_sequence/reference.py Thu Jul 12 16:00:06 2018 +0200
@@ -0,0 +1,146 @@
+#
+# Copyright (c) 2008-2018 Thierry Florac <tflorac AT ulthar.net>
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+
+__docformat__ = 'restructuredtext'
+
+
+# import standard library
+
+# import interfaces
+from hypatia.interfaces import ICatalog
+from pyams_i18n.interfaces import II18n
+from pyams_sequence.interfaces import IInternalReference, ISequentialIdInfo, ISequentialIntIds
+from pyams_skin.layer import IPyAMSUserLayer
+from zope.lifecycleevent.interfaces import IObjectModifiedEvent
+
+try:
+ from pyams_workflow.interfaces import IWorkflow, IWorkflowVersions, IWorkflowVersion, \
+ IWorkflowState, IWorkflowManagedContent, IWorkflowPublicationInfo, IWorkflowTransitionEvent
+except ImportError:
+ handle_workflow = False
+else:
+ handle_workflow = True
+
+# import packages
+from hypatia.catalog import CatalogQuery
+from hypatia.query import Eq
+from pyams_catalog.query import CatalogResultSet
+from pyams_utils.registry import get_utility
+from pyams_utils.request import check_request
+from pyramid.events import subscriber
+
+
+@subscriber(IObjectModifiedEvent, context_selector=IInternalReference)
+def handle_modified_reference(event):
+ """Handle modified reference"""
+ for description in event.descriptions:
+ for attribute in description.attributes:
+ if attribute == 'reference':
+ del event.object.target
+ return
+
+
+if handle_workflow:
+ # Target is handled as a volatile property to keep references in memory
+ # Reference must be updated when a content is published or retired while being in version
+ # greater than 1
+ @subscriber(IWorkflowTransitionEvent)
+ def handle_workflow_transition(event):
+ """Handle workflow transition to update internal references"""
+ workflow_state = IWorkflowState(event.object, None)
+ if (workflow_state is None) or (workflow_state.version_id == 1):
+ return # don't update references on first version
+ sequence_info = ISequentialIdInfo(event.object, None)
+ if sequence_info is not None:
+ catalog = get_utility(ICatalog)
+ params = Eq(catalog['link_reference'], sequence_info.hex_oid)
+ for result in CatalogResultSet(CatalogQuery(catalog).query(params)):
+ del result.target
+
+
+def get_last_version(content):
+ """Check for last available version"""
+ if handle_workflow and (IWorkflowVersion.providedBy(content) or IWorkflowManagedContent.providedBy(content)):
+ content = IWorkflowVersions(content).get_last_versions()[0]
+ if ISequentialIdInfo(content, None) is not None:
+ return content
+ else:
+ return None
+
+
+def get_visible_version(content):
+ """Check for visible version"""
+ if handle_workflow:
+ if IWorkflowVersion.providedBy(content) or IWorkflowManagedContent.providedBy(content):
+ workflow = IWorkflow(content)
+ versions = IWorkflowVersions(content).get_versions(workflow.published_states, sort=True)
+ if versions:
+ return versions[-1]
+ publication_info = IWorkflowPublicationInfo(content, None)
+ if publication_info is not None:
+ if publication_info.is_visible():
+ return content
+ else:
+ return None
+ return content
+
+
+def get_version_in_state(content, state):
+ """Check for versions in given status"""
+ if handle_workflow and (IWorkflowVersion.providedBy(content) or IWorkflowManagedContent.providedBy(content)):
+ versions = IWorkflowVersions(content).get_versions(state)
+ if versions:
+ content = versions[0]
+ if ISequentialIdInfo(content, None) is not None:
+ return content
+ else:
+ return None
+
+
+def get_sequence_dict(version, attribute='title', request=None):
+ """Get OID and label matching given version"""
+ sequence = get_utility(ISequentialIntIds)
+ info = ISequentialIdInfo(version)
+ return {'id': info.hex_oid,
+ 'text': '{0} ({1})'.format(II18n(version).query_attribute(attribute, request=request),
+ sequence.get_short_oid(info.oid))}
+
+
+def get_sequence_target(oid, state):
+ """Get content matching given OID"""
+ sequence = get_utility(ISequentialIntIds)
+ content = sequence.query_object_from_oid(oid)
+ if handle_workflow and (IWorkflowVersion.providedBy(content) or IWorkflowManagedContent.providedBy(content)):
+ versions = IWorkflowVersions(content).get_versions(state, sort=True)
+ if versions:
+ content = versions[0]
+ return content
+
+
+def get_reference_target(reference, state=None, request=None):
+ """Get target of given reference OID"""
+ catalog = get_utility(ICatalog)
+ params = Eq(catalog['oid'], reference)
+ results = list(CatalogResultSet(CatalogQuery(catalog).query(params)))
+ if results:
+ if state:
+ results = list(filter(lambda x: get_version_in_state(x, state), results))
+ else:
+ if request is None:
+ request = check_request()
+ if IPyAMSUserLayer.providedBy(request):
+ getter = get_visible_version
+ else:
+ getter = get_last_version
+ results = list(map(getter, results))
+ if results:
+ return results[0]