--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/src/pyams_utils/zodb.py Wed Dec 05 12:45:56 2018 +0100
@@ -0,0 +1,312 @@
+#
+# Copyright (c) 2008-2015 Thierry Florac <tflorac AT ulthar.net>
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+
+__docformat__ = 'restructuredtext'
+
+
+# import standard library
+
+# import interfaces
+from persistent.interfaces import IPersistent
+from pyams_utils.interfaces.site import IOptionalUtility
+from pyams_utils.interfaces.zeo import IZEOConnection
+from transaction.interfaces import ITransactionManager
+from ZODB.interfaces import IConnection
+from zope.annotation.interfaces import IAttributeAnnotatable
+from zope.lifecycleevent.interfaces import IObjectAddedEvent, IObjectRemovedEvent
+
+# import packages
+from persistent import Persistent
+from pyams_utils.adapter import adapter_config
+from pyams_utils.registry import get_utilities_for, get_global_registry
+from pyams_utils.vocabulary import vocabulary_config
+from pyramid.events import subscriber
+from pyramid_zodbconn import get_uris, db_from_uri
+from ZEO import DB
+from zope.container.contained import Contained
+from zope.interface import implementer
+from zope.schema import getFieldNames
+from zope.schema.fieldproperty import FieldProperty
+from zope.schema.vocabulary import SimpleVocabulary, SimpleTerm
+
+
+@adapter_config(context=IPersistent, provides=IConnection)
+def persistent_connection(obj):
+ """An adapter which gets a ZODB connection from a persistent object
+
+ We are assuming the object has a parent if it has been created in
+ this transaction.
+
+ Raises ValueError if it is impossible to get a connection.
+ """
+ cur = obj
+ while not getattr(cur, '_p_jar', None):
+ cur = getattr(cur, '__parent__', None)
+ if cur is None:
+ return None
+ return cur._p_jar
+
+
+# IPersistent adapters copied from zc.twist package
+# also register this for adapting from IConnection
+@adapter_config(context=IPersistent, provides=ITransactionManager)
+def persistent_transaction_manager(obj):
+ conn = IConnection(obj) # typically this will be
+ # zope.keyreference.persistent.connectionOfPersistent
+ try:
+ return conn.transaction_manager
+ except AttributeError:
+ return conn._txn_mgr
+ # or else we give up; who knows. transaction_manager is the more
+ # recent spelling.
+
+
+#
+# ZEO connection management
+#
+
+@implementer(IZEOConnection)
+class ZEOConnection(object):
+ """ZEO connection object
+
+ This object can be used to store all settings to be able to open a ZEO connection.
+ Note that this class is required only for tasks specifically targeting a ZEO database connection (like a ZEO
+ packer scheduler task); for generic ZODB operations, just use a :class:`ZODBConnection` class defined through
+ Pyramid's configuration file.
+
+ Note that a ZEO connection object is a context manager, so you can use it like this:
+
+ .. code-block:: python
+
+ from pyams_utils.zodb import ZEOConnection
+
+ def my_method(zeo_settings):
+ zeo_connection = ZEOConnection()
+ zeo_connection.update(zeo_settings)
+ with zeo_connection as root:
+ # *root* is then the ZODB root object
+ # do whatever you want with ZEO connection,
+ # which is closed automatically
+ """
+
+ _storage = None
+ _db = None
+ _connection = None
+
+ name = FieldProperty(IZEOConnection['name'])
+ server_name = FieldProperty(IZEOConnection['server_name'])
+ server_port = FieldProperty(IZEOConnection['server_port'])
+ storage = FieldProperty(IZEOConnection['storage'])
+ username = FieldProperty(IZEOConnection['username'])
+ password = FieldProperty(IZEOConnection['password'])
+ server_realm = FieldProperty(IZEOConnection['server_realm'])
+ blob_dir = FieldProperty(IZEOConnection['blob_dir'])
+ shared_blob_dir = FieldProperty(IZEOConnection['shared_blob_dir'])
+
+ def get_settings(self):
+ """Get mapping of all connection settings
+
+ These settings can be converted to JSON and sent to another process, for example
+ via a ØMQ connection.
+
+ :return: dict
+ """
+ result = {}
+ for name in getFieldNames(IZEOConnection):
+ result[name] = getattr(self, name)
+ return result
+
+ def update(self, settings):
+ """Update connection properties with settings as *dict*
+
+ :param dict settings: typically extracted via the :py:meth:`get_settings` method from
+ another process
+ """
+ names = getFieldNames(IZEOConnection)
+ for key, value in settings.items():
+ if key in names:
+ setattr(self, key, value)
+
+ def get_connection(self, wait_timeout=30, get_storage=False):
+ """Create ZEO client connection from current settings
+
+ :param boolean wait_timeout: connection timeout, in seconds
+ :param boolean get_storage: if *True*, the method should return a tuple containing
+ storage and DB objects; otherwise only DB object is returned
+ :return: tuple containing ZEO client storage and DB object (if *get_storage* argument is
+ set to *True*), or only DB object otherwise
+ """
+ db = DB((self.server_name, self.server_port),
+ storage=self.storage,
+ username=self.username or '',
+ password=self.password or '',
+ realm=self.server_realm,
+ blob_dir=self.blob_dir,
+ shared_blob_dir=self.shared_blob_dir,
+ wait_timeout=wait_timeout)
+ return (db.storage, db) if get_storage else db
+
+ @property
+ def connection(self):
+ return self._connection
+
+ # Context manager methods
+ def __enter__(self):
+ self._storage, self._db = self.get_connection(get_storage=True)
+ self._connection = self._db.open_then_close_db_when_connection_closes()
+ return self._connection.root()
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ if self._connection is not None:
+ self._connection.close()
+ if self._storage is not None:
+ self._storage.close()
+
+
+@implementer(IOptionalUtility, IAttributeAnnotatable)
+class ZEOConnectionUtility(ZEOConnection, Persistent, Contained):
+ """Persistent ZEO connection utility"""
+
+
+@subscriber(IObjectAddedEvent, context_selector=IZEOConnection)
+def handle_added_connection(event):
+ """Register new ZEO connection when added"""
+ manager = event.newParent
+ manager.registerUtility(event.object, IZEOConnection, name=event.object.name)
+
+
+@subscriber(IObjectRemovedEvent, context_selector=IZEOConnection)
+def handle_removed_connection(event):
+ """Un-register ZEO connection when deleted"""
+ manager = event.oldParent
+ manager.unregisterUtility(event.object, IZEOConnection, name=event.object.name)
+
+
+@vocabulary_config(name='PyAMS ZEO connections')
+class ZEOConnectionVocabulary(SimpleVocabulary):
+ """ZEO connections vocabulary"""
+
+ def __init__(self, context=None):
+ terms = [SimpleTerm(name, title=util.name) for name, util in get_utilities_for(IZEOConnection)]
+ super(ZEOConnectionVocabulary, self).__init__(terms)
+
+
+def get_connection_from_settings(settings=None):
+ """Load connection matching registry settings"""
+ if settings is None:
+ settings = get_global_registry().settings
+ for name, uri in get_uris(settings):
+ db = db_from_uri(uri, name, {})
+ return db.open()
+
+
+class ZODBConnection(object):
+ """ZODB connection wrapper
+
+ Connections are extracted from Pyramid's settings file in *zodbconn.uri* entries.
+
+ Note that a ZODB connection object is a context manager, so you can use it like this:
+
+ .. code-block:: python
+
+ from pyams_utils.zodb import ZODBConnection
+
+ def my_method(zodb_name):
+ zodb_connection = ZODBConnection(zodb_name)
+ with zodb_connection as root:
+ # *root* is then the ZODB root object
+ # do whatever you want with ZODB connection,
+ # which is closed automatically
+ """
+
+ def __init__(self, name='', settings=None):
+ self.name = name or ''
+ if not settings:
+ settings = get_global_registry().settings
+ self.settings = settings
+
+ _connection = None
+ _db = None
+ _storage = None
+
+ @property
+ def connection(self):
+ return self._connection
+
+ @property
+ def db(self):
+ return self._db
+
+ @property
+ def storage(self):
+ return self._storage
+
+ def get_connection(self):
+ """Load named connection matching registry settings"""
+ for name, uri in get_uris(self.settings):
+ if name == self.name:
+ db = db_from_uri(uri, name, {})
+ connection = self._connection = db.open()
+ self._db = connection.db()
+ self._storage = self.db.storage
+ return connection
+
+ def close(self):
+ self._connection.close()
+ self._db.close()
+ self._storage.close()
+
+ # Context manager methods
+ def __enter__(self):
+ connection = self.get_connection()
+ return connection.root()
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self.close()
+
+
+@vocabulary_config(name='PyAMS ZODB connections')
+class ZODBConnectionVocabulary(SimpleVocabulary):
+ """ZODB connections vocabulary"""
+
+ def __init__(self, context=None):
+ settings = get_global_registry().settings
+ terms = [SimpleTerm(name, title=name) for name, uri in get_uris(settings)]
+ super(ZODBConnectionVocabulary, self).__init__(terms)
+
+
+volatile_marker = object()
+
+
+class volatile_property:
+ """Property decorator to define volatile attributes into persistent classes"""
+
+ def __init__(self, fget, doc=None):
+ self.fget = fget
+ self.__doc__ = doc or fget.__doc__
+ self.__name__ = fget.__name__
+ self.__module__ = fget.__module__
+
+ def __get__(self, inst, cls):
+ if inst is None:
+ return self
+ attrname = '_v_{0}'.format(self.__name__)
+ value = getattr(inst, attrname, volatile_marker)
+ if value is volatile_marker:
+ value = self.fget(inst)
+ setattr(inst, attrname, value)
+ return value
+
+ def __delete__(self, inst):
+ attrname = '_v_{0}'.format(self.__name__)
+ if hasattr(inst, attrname):
+ delattr(inst, attrname)