|
1 # |
|
2 # Copyright (c) 2008-2015 Thierry Florac <tflorac AT ulthar.net> |
|
3 # All Rights Reserved. |
|
4 # |
|
5 # This software is subject to the provisions of the Zope Public License, |
|
6 # Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution. |
|
7 # THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED |
|
8 # WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED |
|
9 # WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS |
|
10 # FOR A PARTICULAR PURPOSE. |
|
11 # |
|
12 |
|
13 __docformat__ = 'restructuredtext' |
|
14 |
|
15 |
|
16 # import standard library |
|
17 import base64 |
|
18 import hashlib |
|
19 import hmac |
|
20 import random |
|
21 import sys |
|
22 from datetime import datetime |
|
23 from os import urandom |
|
24 |
|
25 # import interfaces |
|
26 from pyams_security.interfaces import ISecurityManager, IUsersFolderPlugin, IAdminAuthenticationPlugin, ILocalUser, \ |
|
27 IPrincipalInfo |
|
28 from zope.lifecycleevent.interfaces import IObjectAddedEvent |
|
29 from zope.password.interfaces import IPasswordManager |
|
30 from zope.schema.interfaces import IVocabularyRegistry |
|
31 |
|
32 # import packages |
|
33 from persistent import Persistent |
|
34 from pyams_security.principal import PrincipalInfo |
|
35 from pyams_utils.adapter import adapter_config |
|
36 from pyams_utils.registry import query_utility, get_utility |
|
37 from pyramid.events import subscriber |
|
38 from zope.container.contained import Contained |
|
39 from zope.container.folder import Folder |
|
40 from zope.interface import implementer, provider, Invalid |
|
41 from zope.schema.fieldproperty import FieldProperty |
|
42 from zope.schema.vocabulary import SimpleVocabulary, SimpleTerm, getVocabularyRegistry |
|
43 |
|
44 from pyams_security import _ |
|
45 |
|
46 |
|
47 @implementer(ILocalUser) |
|
48 class User(Persistent, Contained): |
|
49 """Local user persistent class""" |
|
50 |
|
51 login = FieldProperty(ILocalUser['login']) |
|
52 email = FieldProperty(ILocalUser['email']) |
|
53 firstname = FieldProperty(ILocalUser['firstname']) |
|
54 lastname = FieldProperty(ILocalUser['lastname']) |
|
55 company_name = FieldProperty(ILocalUser['company_name']) |
|
56 _password = FieldProperty(ILocalUser['password']) |
|
57 _password_salt = None |
|
58 password_manager = FieldProperty(ILocalUser['password_manager']) |
|
59 self_registered = FieldProperty(ILocalUser['self_registered']) |
|
60 activation_secret = FieldProperty(ILocalUser['activation_secret']) |
|
61 activation_hash = FieldProperty(ILocalUser['activation_hash']) |
|
62 activation_date = FieldProperty(ILocalUser['activation_date']) |
|
63 activated = FieldProperty(ILocalUser['activated']) |
|
64 |
|
65 @property |
|
66 def title(self): |
|
67 return '{0} {1}'.format(self.firstname, self.lastname) |
|
68 |
|
69 @property |
|
70 def password(self): |
|
71 return self._password |
|
72 |
|
73 @password.setter |
|
74 def password(self, value): |
|
75 self._password_salt = urandom(4) |
|
76 manager = get_utility(IPasswordManager, name=self.password_manager) |
|
77 if self.password_manager == 'Plain Text': |
|
78 self._password = manager.encodePassword(value) |
|
79 else: |
|
80 self._password = manager.encodePassword(value, salt=self._password_salt) |
|
81 |
|
82 def check_password(self, password): |
|
83 if not self.activated: |
|
84 return False |
|
85 manager = query_utility(IPasswordManager, name=self.password_manager) |
|
86 if manager is None: |
|
87 return False |
|
88 return manager.checkPassword(self.password, password) |
|
89 |
|
90 def generate_secret(self): |
|
91 seed = self.activation_secret = '-'.join((hex(random.randint(0, sys.maxsize))[2:] for i in range(5))) |
|
92 secret = hmac.new(self.password, self.login.encode(), digestmod=hashlib.sha256) |
|
93 secret.update(seed.encode()) |
|
94 self.activation_hash = base64.b32encode(secret.digest()).decode() |
|
95 |
|
96 def check_activation(self, hash, login, password): |
|
97 if self.self_registered: |
|
98 # If principal was registered by it's own, we check activation hash |
|
99 # with given login and password |
|
100 manager = get_utility(IPasswordManager, name=self.password_manager) |
|
101 password = manager.encodePassword(password, salt=self._password_salt) |
|
102 secret = hmac.new(password, login.encode(), digestmod=hashlib.sha256) |
|
103 secret.update(self.activation_secret.encode()) |
|
104 activation_hash = base64.b32encode(secret.digest()).decode() |
|
105 if hash != activation_hash: |
|
106 raise Invalid(_("Can't activate profile with given params!")) |
|
107 else: |
|
108 # If principal was registered by a site manager, just check that |
|
109 # hash is matching stored one and update user password... |
|
110 if hash != self.activation_hash: |
|
111 raise Invalid(_("Can't activate profile with given params!")) |
|
112 self.password = password |
|
113 self.activation_date = datetime.utcnow() |
|
114 self.activated = True |
|
115 |
|
116 |
|
117 @adapter_config(context=ILocalUser, provides=IPrincipalInfo) |
|
118 def UserPrincipalInfoAdapter(user): |
|
119 """User principal info adapter""" |
|
120 return PrincipalInfo(id="{0}:{1}".format(user.__parent__.prefix, user.login), |
|
121 title=user.title) |
|
122 |
|
123 |
|
124 @implementer(IUsersFolderPlugin) |
|
125 class UsersFolder(Folder): |
|
126 """Local users folder""" |
|
127 |
|
128 prefix = FieldProperty(IAdminAuthenticationPlugin['prefix']) |
|
129 title = FieldProperty(IAdminAuthenticationPlugin['title']) |
|
130 enabled = FieldProperty(IAdminAuthenticationPlugin['enabled']) |
|
131 |
|
132 def authenticate(self, credentials, request): |
|
133 if not self.enabled: |
|
134 return None |
|
135 attrs = credentials.attributes |
|
136 login = attrs.get('login') |
|
137 principal = self.get(login) |
|
138 if principal is not None: |
|
139 password = attrs.get('password') |
|
140 if principal.check_password(password): |
|
141 return "{0}:{1}".format(self.prefix, principal.login) |
|
142 |
|
143 def check_login(self, login): |
|
144 if not login: |
|
145 return False |
|
146 return login not in self |
|
147 |
|
148 def get_principal(self, principal_id): |
|
149 if not self.enabled: |
|
150 return None |
|
151 if not principal_id.startswith(self.prefix + ':'): |
|
152 return None |
|
153 prefix, login = principal_id.split(':', 1) |
|
154 if prefix != self.prefix: |
|
155 return None |
|
156 user = self.get(login) |
|
157 if user is not None: |
|
158 return PrincipalInfo(id='{0}:{1}'.format(self.prefix, user.login), |
|
159 title=user.title) |
|
160 |
|
161 def get_all_principals(self, principal_id): |
|
162 if not self.enabled: |
|
163 return set() |
|
164 if self.get_principal(principal_id) is not None: |
|
165 return {principal_id} |
|
166 return set() |
|
167 |
|
168 def find_principals(self, query): |
|
169 # TODO: use users catalog for more efficient search? |
|
170 if not query: |
|
171 return None |
|
172 query = query.lower() |
|
173 for user in self.values(): |
|
174 if (query == user.login or |
|
175 query in user.firstname.lower() or |
|
176 query in user.lastname.lower()): |
|
177 yield PrincipalInfo(id='{0}:{1}'.format(self.prefix, user.login), |
|
178 title=user.title) |
|
179 |
|
180 def get_search_results(self, data): |
|
181 # TODO: use users catalog for more efficient search? |
|
182 query = data.get('query') |
|
183 if not query: |
|
184 return () |
|
185 query = query.lower() |
|
186 for user in self.values(): |
|
187 if (query == user.login or |
|
188 query in user.firstname.lower() or |
|
189 query in user.lastname.lower()): |
|
190 yield user |
|
191 |
|
192 |
|
193 @provider(IVocabularyRegistry) |
|
194 class UsersFolderVocabulary(SimpleVocabulary): |
|
195 """'PyAMS users folders' vocabulary""" |
|
196 |
|
197 def __init__(self, *args, **kwargs): |
|
198 terms = [] |
|
199 manager = query_utility(ISecurityManager) |
|
200 if manager is not None: |
|
201 for name, plugin in manager.items(): |
|
202 if IUsersFolderPlugin.providedBy(plugin): |
|
203 terms.append(SimpleTerm(name, title=plugin.title)) |
|
204 super(UsersFolderVocabulary, self).__init__(terms) |
|
205 |
|
206 getVocabularyRegistry().register('PyAMS users folders', UsersFolderVocabulary) |
|
207 |
|
208 |
|
209 @subscriber(IObjectAddedEvent, context_selector=ILocalUser) |
|
210 def handle_new_local_user(event): |
|
211 """Send a confirmation message when a new user is recorded""" |
|
212 user = event.object |
|
213 if user.self_registered: |
|
214 pass |
|
215 else: |
|
216 pass |