--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/src/pyams_security/plugin/userfolder.py Thu Feb 19 10:53:29 2015 +0100
@@ -0,0 +1,216 @@
+#
+# Copyright (c) 2008-2015 Thierry Florac <tflorac AT ulthar.net>
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+
+__docformat__ = 'restructuredtext'
+
+
+# import standard library
+import base64
+import hashlib
+import hmac
+import random
+import sys
+from datetime import datetime
+from os import urandom
+
+# import interfaces
+from pyams_security.interfaces import ISecurityManager, IUsersFolderPlugin, IAdminAuthenticationPlugin, ILocalUser, \
+ IPrincipalInfo
+from zope.lifecycleevent.interfaces import IObjectAddedEvent
+from zope.password.interfaces import IPasswordManager
+from zope.schema.interfaces import IVocabularyRegistry
+
+# import packages
+from persistent import Persistent
+from pyams_security.principal import PrincipalInfo
+from pyams_utils.adapter import adapter_config
+from pyams_utils.registry import query_utility, get_utility
+from pyramid.events import subscriber
+from zope.container.contained import Contained
+from zope.container.folder import Folder
+from zope.interface import implementer, provider, Invalid
+from zope.schema.fieldproperty import FieldProperty
+from zope.schema.vocabulary import SimpleVocabulary, SimpleTerm, getVocabularyRegistry
+
+from pyams_security import _
+
+
+@implementer(ILocalUser)
+class User(Persistent, Contained):
+ """Local user persistent class"""
+
+ login = FieldProperty(ILocalUser['login'])
+ email = FieldProperty(ILocalUser['email'])
+ firstname = FieldProperty(ILocalUser['firstname'])
+ lastname = FieldProperty(ILocalUser['lastname'])
+ company_name = FieldProperty(ILocalUser['company_name'])
+ _password = FieldProperty(ILocalUser['password'])
+ _password_salt = None
+ password_manager = FieldProperty(ILocalUser['password_manager'])
+ self_registered = FieldProperty(ILocalUser['self_registered'])
+ activation_secret = FieldProperty(ILocalUser['activation_secret'])
+ activation_hash = FieldProperty(ILocalUser['activation_hash'])
+ activation_date = FieldProperty(ILocalUser['activation_date'])
+ activated = FieldProperty(ILocalUser['activated'])
+
+ @property
+ def title(self):
+ return '{0} {1}'.format(self.firstname, self.lastname)
+
+ @property
+ def password(self):
+ return self._password
+
+ @password.setter
+ def password(self, value):
+ self._password_salt = urandom(4)
+ manager = get_utility(IPasswordManager, name=self.password_manager)
+ if self.password_manager == 'Plain Text':
+ self._password = manager.encodePassword(value)
+ else:
+ self._password = manager.encodePassword(value, salt=self._password_salt)
+
+ def check_password(self, password):
+ if not self.activated:
+ return False
+ manager = query_utility(IPasswordManager, name=self.password_manager)
+ if manager is None:
+ return False
+ return manager.checkPassword(self.password, password)
+
+ def generate_secret(self):
+ seed = self.activation_secret = '-'.join((hex(random.randint(0, sys.maxsize))[2:] for i in range(5)))
+ secret = hmac.new(self.password, self.login.encode(), digestmod=hashlib.sha256)
+ secret.update(seed.encode())
+ self.activation_hash = base64.b32encode(secret.digest()).decode()
+
+ def check_activation(self, hash, login, password):
+ if self.self_registered:
+ # If principal was registered by it's own, we check activation hash
+ # with given login and password
+ manager = get_utility(IPasswordManager, name=self.password_manager)
+ password = manager.encodePassword(password, salt=self._password_salt)
+ secret = hmac.new(password, login.encode(), digestmod=hashlib.sha256)
+ secret.update(self.activation_secret.encode())
+ activation_hash = base64.b32encode(secret.digest()).decode()
+ if hash != activation_hash:
+ raise Invalid(_("Can't activate profile with given params!"))
+ else:
+ # If principal was registered by a site manager, just check that
+ # hash is matching stored one and update user password...
+ if hash != self.activation_hash:
+ raise Invalid(_("Can't activate profile with given params!"))
+ self.password = password
+ self.activation_date = datetime.utcnow()
+ self.activated = True
+
+
+@adapter_config(context=ILocalUser, provides=IPrincipalInfo)
+def UserPrincipalInfoAdapter(user):
+ """User principal info adapter"""
+ return PrincipalInfo(id="{0}:{1}".format(user.__parent__.prefix, user.login),
+ title=user.title)
+
+
+@implementer(IUsersFolderPlugin)
+class UsersFolder(Folder):
+ """Local users folder"""
+
+ prefix = FieldProperty(IAdminAuthenticationPlugin['prefix'])
+ title = FieldProperty(IAdminAuthenticationPlugin['title'])
+ enabled = FieldProperty(IAdminAuthenticationPlugin['enabled'])
+
+ def authenticate(self, credentials, request):
+ if not self.enabled:
+ return None
+ attrs = credentials.attributes
+ login = attrs.get('login')
+ principal = self.get(login)
+ if principal is not None:
+ password = attrs.get('password')
+ if principal.check_password(password):
+ return "{0}:{1}".format(self.prefix, principal.login)
+
+ def check_login(self, login):
+ if not login:
+ return False
+ return login not in self
+
+ def get_principal(self, principal_id):
+ if not self.enabled:
+ return None
+ if not principal_id.startswith(self.prefix + ':'):
+ return None
+ prefix, login = principal_id.split(':', 1)
+ if prefix != self.prefix:
+ return None
+ user = self.get(login)
+ if user is not None:
+ return PrincipalInfo(id='{0}:{1}'.format(self.prefix, user.login),
+ title=user.title)
+
+ def get_all_principals(self, principal_id):
+ if not self.enabled:
+ return set()
+ if self.get_principal(principal_id) is not None:
+ return {principal_id}
+ return set()
+
+ def find_principals(self, query):
+ # TODO: use users catalog for more efficient search?
+ if not query:
+ return None
+ query = query.lower()
+ for user in self.values():
+ if (query == user.login or
+ query in user.firstname.lower() or
+ query in user.lastname.lower()):
+ yield PrincipalInfo(id='{0}:{1}'.format(self.prefix, user.login),
+ title=user.title)
+
+ def get_search_results(self, data):
+ # TODO: use users catalog for more efficient search?
+ query = data.get('query')
+ if not query:
+ return ()
+ query = query.lower()
+ for user in self.values():
+ if (query == user.login or
+ query in user.firstname.lower() or
+ query in user.lastname.lower()):
+ yield user
+
+
+@provider(IVocabularyRegistry)
+class UsersFolderVocabulary(SimpleVocabulary):
+ """'PyAMS users folders' vocabulary"""
+
+ def __init__(self, *args, **kwargs):
+ terms = []
+ manager = query_utility(ISecurityManager)
+ if manager is not None:
+ for name, plugin in manager.items():
+ if IUsersFolderPlugin.providedBy(plugin):
+ terms.append(SimpleTerm(name, title=plugin.title))
+ super(UsersFolderVocabulary, self).__init__(terms)
+
+getVocabularyRegistry().register('PyAMS users folders', UsersFolderVocabulary)
+
+
+@subscriber(IObjectAddedEvent, context_selector=ILocalUser)
+def handle_new_local_user(event):
+ """Send a confirmation message when a new user is recorded"""
+ user = event.object
+ if user.self_registered:
+ pass
+ else:
+ pass