src/pyams_sequence/utility.py
changeset 0 980ffaa51a75
child 10 aa2472483e00
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/pyams_sequence/utility.py	Wed Jun 17 09:59:43 2015 +0200
@@ -0,0 +1,144 @@
+#
+# Copyright (c) 2008-2015 Thierry Florac <tflorac AT ulthar.net>
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+
+__docformat__ = 'restructuredtext'
+
+
+# import standard library
+
+# import interfaces
+from pyams_i18n.interfaces import II18n
+from pyams_sequence.interfaces import ISequentialIntIds, ISequentialIdTarget, ISequentialIdInfo
+
+try:
+    from pyams_workflow.interfaces import IWorkflowVersions, IWorkflowVersion, IWorkflowManagedContent
+except ImportError:
+    handle_workflow = False
+else:
+    handle_workflow = True
+
+from zope.lifecycleevent.interfaces import IObjectAddedEvent, IObjectRemovedEvent
+
+# import packages
+from pyams_utils.registry import query_utility, get_utility
+from pyramid.events import subscriber
+from zope.interface import implementer, Invalid
+from zope.intid import IntIds
+from zope.schema.fieldproperty import FieldProperty
+
+from pyams_sequence import _
+
+
+def get_last_version(content):
+    """Check for last available version"""
+    if handle_workflow and (IWorkflowVersion.providedBy(content) or IWorkflowManagedContent.providedBy(content)):
+        content = IWorkflowVersions(content).get_last_versions()[0]
+    if ISequentialIdInfo(content, None) is not None:
+        return content
+    else:
+        return None
+
+
+def get_version_in_state(content, state):
+    """Check for versions in given status"""
+    if handle_workflow and (IWorkflowVersion.providedBy(content) or IWorkflowManagedContent.providedBy(content)):
+        versions = IWorkflowVersions(content).get_versions(state)
+        if versions:
+            content = versions[0]
+    if ISequentialIdInfo(content, None) is not None:
+        return content
+    else:
+        return None
+
+
+def get_sequence_dict(version, attribute='title', request=None):
+    """Get OID and label matching given version"""
+    sequence = get_utility(ISequentialIntIds)
+    info = ISequentialIdInfo(version)
+    return {'id': info.hex_oid,
+            'text': '{0} ({1})'.format(II18n(version).query_attribute(attribute, request=request),
+                                       sequence.get_short_oid(info.oid))}
+
+
+@implementer(ISequentialIntIds)
+class SequentialIntIds(IntIds):
+    """Sequential IntIds utility"""
+
+    prefix = FieldProperty(ISequentialIntIds['prefix'])
+    hex_oid_length = FieldProperty(ISequentialIntIds['hex_oid_length'])
+    _last_oid = FieldProperty(ISequentialIntIds['last_oid'])
+
+    @property
+    def last_oid(self):
+        return self._last_oid
+
+    @last_oid.setter
+    def last_oid(self, value):
+        if value < self._last_oid:
+            raise Invalid(_("Can't set last OID to value lower than current one!"))
+        self._last_oid = value
+
+    def _generateId(self):
+        self._last_oid += 1
+        return self._last_oid
+
+    def register(self, ob):
+        if not ISequentialIdTarget.providedBy(ob):
+            return None
+        return super(SequentialIntIds, self).register(ob)
+
+    def query_hex_oid(self, obj):
+        oid = self.queryId(obj)
+        if oid is not None:
+            return '{{prefix}}{{obj_prefix}}{{hex_id:0{length}x}}' \
+                .format(length=self.hex_oid_length) \
+                .format(prefix=self.prefix or '',
+                        obj_prefix=getattr(obj, 'sequence_prefix', ''),
+                        hex_id=oid)
+
+    def get_full_oid(self, oid, obj_prefix=None):
+        if self.prefix and oid.startswith(self.prefix or ''):
+            return oid
+        if oid.startswith('+'):
+            oid = oid[1:]
+        return '{prefix}{obj_prefix}{zeros}{hex_id}'.format(prefix=self.prefix or '',
+                                                            obj_prefix=obj_prefix or '',
+                                                            zeros='0' * (self.hex_oid_length - len(oid)),
+                                                            hex_id=oid)
+
+    def get_short_oid(self, oid, obj_prefix=None):
+        return '{prefix}{obj_prefix} {hex_id:x}'.format(prefix=self.prefix or '',
+                                                        obj_prefix=obj_prefix or '',
+                                                        hex_id=oid)
+
+
+@subscriber(IObjectAddedEvent, context_selector=ISequentialIdTarget)
+def handle_added_intid_target(event):
+    """Handle added sequential ID target"""
+    target = event.object
+    sequence = query_utility(ISequentialIntIds, name=getattr(target, 'sequence_name', ''))
+    if sequence is not None:
+        info = ISequentialIdInfo(target)
+        if not info.oid:  # Objects cloned inside a workflow may share the same ID...
+            info.oid = sequence.register(target)
+            info.hex_oid = sequence.query_hex_oid(target)
+
+
+@subscriber(IObjectRemovedEvent, context_selector=ISequentialIdTarget)
+def handle_removed_intid_target(event):
+    """Handle removed sequential ID target"""
+    target = event.object
+    sequence = query_utility(ISequentialIntIds, name=getattr(target, 'sequence_name', ''))
+    if sequence is not None:
+        info = ISequentialIdInfo(target)
+        if info.oid:
+            sequence.unregister(target)