src/pyams_security/plugin/userfolder.py
changeset 0 f04e1d0a0723
child 2 94e76f8e9828
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/pyams_security/plugin/userfolder.py	Thu Feb 19 10:53:29 2015 +0100
@@ -0,0 +1,216 @@
+#
+# Copyright (c) 2008-2015 Thierry Florac <tflorac AT ulthar.net>
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+
+__docformat__ = 'restructuredtext'
+
+
+# import standard library
+import base64
+import hashlib
+import hmac
+import random
+import sys
+from datetime import datetime
+from os import urandom
+
+# import interfaces
+from pyams_security.interfaces import ISecurityManager, IUsersFolderPlugin, IAdminAuthenticationPlugin, ILocalUser, \
+    IPrincipalInfo
+from zope.lifecycleevent.interfaces import IObjectAddedEvent
+from zope.password.interfaces import IPasswordManager
+from zope.schema.interfaces import IVocabularyRegistry
+
+# import packages
+from persistent import Persistent
+from pyams_security.principal import PrincipalInfo
+from pyams_utils.adapter import adapter_config
+from pyams_utils.registry import query_utility, get_utility
+from pyramid.events import subscriber
+from zope.container.contained import Contained
+from zope.container.folder import Folder
+from zope.interface import implementer, provider, Invalid
+from zope.schema.fieldproperty import FieldProperty
+from zope.schema.vocabulary import SimpleVocabulary, SimpleTerm, getVocabularyRegistry
+
+from pyams_security import _
+
+
+@implementer(ILocalUser)
+class User(Persistent, Contained):
+    """Local user persistent class"""
+
+    login = FieldProperty(ILocalUser['login'])
+    email = FieldProperty(ILocalUser['email'])
+    firstname = FieldProperty(ILocalUser['firstname'])
+    lastname = FieldProperty(ILocalUser['lastname'])
+    company_name = FieldProperty(ILocalUser['company_name'])
+    _password = FieldProperty(ILocalUser['password'])
+    _password_salt = None
+    password_manager = FieldProperty(ILocalUser['password_manager'])
+    self_registered = FieldProperty(ILocalUser['self_registered'])
+    activation_secret = FieldProperty(ILocalUser['activation_secret'])
+    activation_hash = FieldProperty(ILocalUser['activation_hash'])
+    activation_date = FieldProperty(ILocalUser['activation_date'])
+    activated = FieldProperty(ILocalUser['activated'])
+
+    @property
+    def title(self):
+        return '{0} {1}'.format(self.firstname, self.lastname)
+
+    @property
+    def password(self):
+        return self._password
+
+    @password.setter
+    def password(self, value):
+        self._password_salt = urandom(4)
+        manager = get_utility(IPasswordManager, name=self.password_manager)
+        if self.password_manager == 'Plain Text':
+            self._password = manager.encodePassword(value)
+        else:
+            self._password = manager.encodePassword(value, salt=self._password_salt)
+
+    def check_password(self, password):
+        if not self.activated:
+            return False
+        manager = query_utility(IPasswordManager, name=self.password_manager)
+        if manager is None:
+            return False
+        return manager.checkPassword(self.password, password)
+
+    def generate_secret(self):
+        seed = self.activation_secret = '-'.join((hex(random.randint(0, sys.maxsize))[2:] for i in range(5)))
+        secret = hmac.new(self.password, self.login.encode(), digestmod=hashlib.sha256)
+        secret.update(seed.encode())
+        self.activation_hash = base64.b32encode(secret.digest()).decode()
+
+    def check_activation(self, hash, login, password):
+        if self.self_registered:
+            # If principal was registered by it's own, we check activation hash
+            # with given login and password
+            manager = get_utility(IPasswordManager, name=self.password_manager)
+            password = manager.encodePassword(password, salt=self._password_salt)
+            secret = hmac.new(password, login.encode(), digestmod=hashlib.sha256)
+            secret.update(self.activation_secret.encode())
+            activation_hash = base64.b32encode(secret.digest()).decode()
+            if hash != activation_hash:
+                raise Invalid(_("Can't activate profile with given params!"))
+        else:
+            # If principal was registered by a site manager, just check that
+            # hash is matching stored one and update user password...
+            if hash != self.activation_hash:
+                raise Invalid(_("Can't activate profile with given params!"))
+            self.password = password
+        self.activation_date = datetime.utcnow()
+        self.activated = True
+
+
+@adapter_config(context=ILocalUser, provides=IPrincipalInfo)
+def UserPrincipalInfoAdapter(user):
+    """User principal info adapter"""
+    return PrincipalInfo(id="{0}:{1}".format(user.__parent__.prefix, user.login),
+                         title=user.title)
+
+
+@implementer(IUsersFolderPlugin)
+class UsersFolder(Folder):
+    """Local users folder"""
+
+    prefix = FieldProperty(IAdminAuthenticationPlugin['prefix'])
+    title = FieldProperty(IAdminAuthenticationPlugin['title'])
+    enabled = FieldProperty(IAdminAuthenticationPlugin['enabled'])
+
+    def authenticate(self, credentials, request):
+        if not self.enabled:
+            return None
+        attrs = credentials.attributes
+        login = attrs.get('login')
+        principal = self.get(login)
+        if principal is not None:
+            password = attrs.get('password')
+            if principal.check_password(password):
+                return "{0}:{1}".format(self.prefix, principal.login)
+
+    def check_login(self, login):
+        if not login:
+            return False
+        return login not in self
+
+    def get_principal(self, principal_id):
+        if not self.enabled:
+            return None
+        if not principal_id.startswith(self.prefix + ':'):
+            return None
+        prefix, login = principal_id.split(':', 1)
+        if prefix != self.prefix:
+            return None
+        user = self.get(login)
+        if user is not None:
+            return PrincipalInfo(id='{0}:{1}'.format(self.prefix, user.login),
+                                 title=user.title)
+
+    def get_all_principals(self, principal_id):
+        if not self.enabled:
+            return set()
+        if self.get_principal(principal_id) is not None:
+            return {principal_id}
+        return set()
+
+    def find_principals(self, query):
+        # TODO: use users catalog for more efficient search?
+        if not query:
+            return None
+        query = query.lower()
+        for user in self.values():
+            if (query == user.login or
+                    query in user.firstname.lower() or
+                    query in user.lastname.lower()):
+                yield PrincipalInfo(id='{0}:{1}'.format(self.prefix, user.login),
+                                    title=user.title)
+
+    def get_search_results(self, data):
+        # TODO: use users catalog for more efficient search?
+        query = data.get('query')
+        if not query:
+            return ()
+        query = query.lower()
+        for user in self.values():
+            if (query == user.login or
+                    query in user.firstname.lower() or
+                    query in user.lastname.lower()):
+                yield user
+
+
+@provider(IVocabularyRegistry)
+class UsersFolderVocabulary(SimpleVocabulary):
+    """'PyAMS users folders' vocabulary"""
+
+    def __init__(self, *args, **kwargs):
+        terms = []
+        manager = query_utility(ISecurityManager)
+        if manager is not None:
+            for name, plugin in manager.items():
+                if IUsersFolderPlugin.providedBy(plugin):
+                    terms.append(SimpleTerm(name, title=plugin.title))
+        super(UsersFolderVocabulary, self).__init__(terms)
+
+getVocabularyRegistry().register('PyAMS users folders', UsersFolderVocabulary)
+
+
+@subscriber(IObjectAddedEvent, context_selector=ILocalUser)
+def handle_new_local_user(event):
+    """Send a confirmation message when a new user is recorded"""
+    user = event.object
+    if user.self_registered:
+        pass
+    else:
+        pass