--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/src/pyams_sequence/utility.py Wed Jun 17 09:59:43 2015 +0200
@@ -0,0 +1,144 @@
+#
+# Copyright (c) 2008-2015 Thierry Florac <tflorac AT ulthar.net>
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+
+__docformat__ = 'restructuredtext'
+
+
+# import standard library
+
+# import interfaces
+from pyams_i18n.interfaces import II18n
+from pyams_sequence.interfaces import ISequentialIntIds, ISequentialIdTarget, ISequentialIdInfo
+
+try:
+ from pyams_workflow.interfaces import IWorkflowVersions, IWorkflowVersion, IWorkflowManagedContent
+except ImportError:
+ handle_workflow = False
+else:
+ handle_workflow = True
+
+from zope.lifecycleevent.interfaces import IObjectAddedEvent, IObjectRemovedEvent
+
+# import packages
+from pyams_utils.registry import query_utility, get_utility
+from pyramid.events import subscriber
+from zope.interface import implementer, Invalid
+from zope.intid import IntIds
+from zope.schema.fieldproperty import FieldProperty
+
+from pyams_sequence import _
+
+
+def get_last_version(content):
+ """Check for last available version"""
+ if handle_workflow and (IWorkflowVersion.providedBy(content) or IWorkflowManagedContent.providedBy(content)):
+ content = IWorkflowVersions(content).get_last_versions()[0]
+ if ISequentialIdInfo(content, None) is not None:
+ return content
+ else:
+ return None
+
+
+def get_version_in_state(content, state):
+ """Check for versions in given status"""
+ if handle_workflow and (IWorkflowVersion.providedBy(content) or IWorkflowManagedContent.providedBy(content)):
+ versions = IWorkflowVersions(content).get_versions(state)
+ if versions:
+ content = versions[0]
+ if ISequentialIdInfo(content, None) is not None:
+ return content
+ else:
+ return None
+
+
+def get_sequence_dict(version, attribute='title', request=None):
+ """Get OID and label matching given version"""
+ sequence = get_utility(ISequentialIntIds)
+ info = ISequentialIdInfo(version)
+ return {'id': info.hex_oid,
+ 'text': '{0} ({1})'.format(II18n(version).query_attribute(attribute, request=request),
+ sequence.get_short_oid(info.oid))}
+
+
+@implementer(ISequentialIntIds)
+class SequentialIntIds(IntIds):
+ """Sequential IntIds utility"""
+
+ prefix = FieldProperty(ISequentialIntIds['prefix'])
+ hex_oid_length = FieldProperty(ISequentialIntIds['hex_oid_length'])
+ _last_oid = FieldProperty(ISequentialIntIds['last_oid'])
+
+ @property
+ def last_oid(self):
+ return self._last_oid
+
+ @last_oid.setter
+ def last_oid(self, value):
+ if value < self._last_oid:
+ raise Invalid(_("Can't set last OID to value lower than current one!"))
+ self._last_oid = value
+
+ def _generateId(self):
+ self._last_oid += 1
+ return self._last_oid
+
+ def register(self, ob):
+ if not ISequentialIdTarget.providedBy(ob):
+ return None
+ return super(SequentialIntIds, self).register(ob)
+
+ def query_hex_oid(self, obj):
+ oid = self.queryId(obj)
+ if oid is not None:
+ return '{{prefix}}{{obj_prefix}}{{hex_id:0{length}x}}' \
+ .format(length=self.hex_oid_length) \
+ .format(prefix=self.prefix or '',
+ obj_prefix=getattr(obj, 'sequence_prefix', ''),
+ hex_id=oid)
+
+ def get_full_oid(self, oid, obj_prefix=None):
+ if self.prefix and oid.startswith(self.prefix or ''):
+ return oid
+ if oid.startswith('+'):
+ oid = oid[1:]
+ return '{prefix}{obj_prefix}{zeros}{hex_id}'.format(prefix=self.prefix or '',
+ obj_prefix=obj_prefix or '',
+ zeros='0' * (self.hex_oid_length - len(oid)),
+ hex_id=oid)
+
+ def get_short_oid(self, oid, obj_prefix=None):
+ return '{prefix}{obj_prefix} {hex_id:x}'.format(prefix=self.prefix or '',
+ obj_prefix=obj_prefix or '',
+ hex_id=oid)
+
+
+@subscriber(IObjectAddedEvent, context_selector=ISequentialIdTarget)
+def handle_added_intid_target(event):
+ """Handle added sequential ID target"""
+ target = event.object
+ sequence = query_utility(ISequentialIntIds, name=getattr(target, 'sequence_name', ''))
+ if sequence is not None:
+ info = ISequentialIdInfo(target)
+ if not info.oid: # Objects cloned inside a workflow may share the same ID...
+ info.oid = sequence.register(target)
+ info.hex_oid = sequence.query_hex_oid(target)
+
+
+@subscriber(IObjectRemovedEvent, context_selector=ISequentialIdTarget)
+def handle_removed_intid_target(event):
+ """Handle removed sequential ID target"""
+ target = event.object
+ sequence = query_utility(ISequentialIntIds, name=getattr(target, 'sequence_name', ''))
+ if sequence is not None:
+ info = ISequentialIdInfo(target)
+ if info.oid:
+ sequence.unregister(target)