14 |
14 |
15 |
15 |
16 # import standard library |
16 # import standard library |
17 |
17 |
18 # import interfaces |
18 # import interfaces |
19 from hypatia.interfaces import ICatalog |
|
20 from pyams_i18n.interfaces import II18n |
|
21 from pyams_sequence.interfaces import ISequentialIntIds, ISequentialIdTarget, ISequentialIdInfo |
19 from pyams_sequence.interfaces import ISequentialIntIds, ISequentialIdTarget, ISequentialIdInfo |
22 from pyams_skin.layer import IPyAMSUserLayer |
|
23 |
|
24 try: |
|
25 from pyams_workflow.interfaces import IWorkflowVersions, IWorkflowVersion, IWorkflowManagedContent, IWorkflow, \ |
|
26 IWorkflowPublicationInfo |
|
27 except ImportError: |
|
28 handle_workflow = False |
|
29 else: |
|
30 handle_workflow = True |
|
31 |
|
32 from zope.lifecycleevent.interfaces import IObjectAddedEvent, IObjectRemovedEvent |
20 from zope.lifecycleevent.interfaces import IObjectAddedEvent, IObjectRemovedEvent |
33 |
21 |
34 # import packages |
22 # import packages |
35 from hypatia.catalog import CatalogQuery |
23 from pyams_utils.registry import query_utility |
36 from hypatia.query import Eq, Any |
|
37 from pyams_catalog.query import CatalogResultSet |
|
38 from pyams_utils.registry import query_utility, get_utility |
|
39 from pyams_utils.request import check_request |
|
40 from pyramid.events import subscriber |
24 from pyramid.events import subscriber |
41 from zope.interface import implementer, Invalid |
25 from zope.interface import implementer, Invalid |
42 from zope.intid import IntIds |
26 from zope.intid import IntIds |
43 from zope.schema.fieldproperty import FieldProperty |
27 from zope.schema.fieldproperty import FieldProperty |
44 |
28 |
45 from pyams_sequence import _ |
29 from pyams_sequence import _ |
46 |
|
47 |
|
48 def get_last_version(content): |
|
49 """Check for last available version""" |
|
50 if handle_workflow and (IWorkflowVersion.providedBy(content) or IWorkflowManagedContent.providedBy(content)): |
|
51 content = IWorkflowVersions(content).get_last_versions()[0] |
|
52 if ISequentialIdInfo(content, None) is not None: |
|
53 return content |
|
54 else: |
|
55 return None |
|
56 |
|
57 |
|
58 def get_visible_version(content): |
|
59 """Check for visible version""" |
|
60 if handle_workflow: |
|
61 if IWorkflowVersion.providedBy(content) or IWorkflowManagedContent.providedBy(content): |
|
62 workflow = IWorkflow(content) |
|
63 versions = IWorkflowVersions(content).get_versions(workflow.published_states, sort=True) |
|
64 if versions: |
|
65 return versions[-1] |
|
66 publication_info = IWorkflowPublicationInfo(content, None) |
|
67 if publication_info is not None: |
|
68 if publication_info.is_visible(): |
|
69 return content |
|
70 else: |
|
71 return None |
|
72 return content |
|
73 |
|
74 |
|
75 def get_version_in_state(content, state): |
|
76 """Check for versions in given status""" |
|
77 if handle_workflow and (IWorkflowVersion.providedBy(content) or IWorkflowManagedContent.providedBy(content)): |
|
78 versions = IWorkflowVersions(content).get_versions(state) |
|
79 if versions: |
|
80 content = versions[0] |
|
81 if ISequentialIdInfo(content, None) is not None: |
|
82 return content |
|
83 else: |
|
84 return None |
|
85 |
|
86 |
|
87 def get_sequence_dict(version, attribute='title', request=None): |
|
88 """Get OID and label matching given version""" |
|
89 sequence = get_utility(ISequentialIntIds) |
|
90 info = ISequentialIdInfo(version) |
|
91 return {'id': info.hex_oid, |
|
92 'text': '{0} ({1})'.format(II18n(version).query_attribute(attribute, request=request), |
|
93 sequence.get_short_oid(info.oid))} |
|
94 |
|
95 |
|
96 def get_sequence_target(oid, state): |
|
97 """Get content matching given OID""" |
|
98 sequence = get_utility(ISequentialIntIds) |
|
99 content = sequence.query_object_from_oid(oid) |
|
100 if handle_workflow and (IWorkflowVersion.providedBy(content) or IWorkflowManagedContent.providedBy(content)): |
|
101 versions = IWorkflowVersions(content).get_versions(state, sort=True) |
|
102 if versions: |
|
103 content = versions[0] |
|
104 return content |
|
105 |
|
106 |
|
107 def get_reference_target(reference, state=None, request=None): |
|
108 """Get target of given reference OID""" |
|
109 catalog = get_utility(ICatalog) |
|
110 params = Eq(catalog['oid'], reference) |
|
111 results = list(CatalogResultSet(CatalogQuery(catalog).query(params))) |
|
112 if results: |
|
113 if state: |
|
114 results = list(filter(lambda x: get_version_in_state(x, state), results)) |
|
115 else: |
|
116 if request is None: |
|
117 request = check_request() |
|
118 if IPyAMSUserLayer.providedBy(request): |
|
119 getter = get_visible_version |
|
120 else: |
|
121 getter = get_last_version |
|
122 results = list(map(getter, results)) |
|
123 if results: |
|
124 return results[0] |
|
125 |
30 |
126 |
31 |
127 @implementer(ISequentialIntIds) |
32 @implementer(ISequentialIntIds) |
128 class SequentialIntIds(IntIds): |
33 class SequentialIntIds(IntIds): |
129 """Sequential IntIds utility""" |
34 """Sequential IntIds utility""" |