# HG changeset patch # User Thierry Florac # Date 1574180165 -3600 # Node ID dd9d4e651992f10ef9ffacd6e6f7ac90fba7e584 # Parent c4fb4f3219f1f9a6e9dc493ad39d95d731fe7bb0 Register sessions using last zope.sqlalchemy features diff -r c4fb4f3219f1 -r dd9d4e651992 src/pyams_alchemy/engine.py --- a/src/pyams_alchemy/engine.py Thu Jul 11 13:24:16 2019 +0200 +++ b/src/pyams_alchemy/engine.py Tue Nov 19 17:16:05 2019 +0100 @@ -10,41 +10,38 @@ # FOR A PARTICULAR PURPOSE. # -__docformat__ = 'restructuredtext' - - -# import standard library import logging -logger = logging.getLogger('PyAMS (SQLAlchemy)') +import time +from datetime import datetime +from threading import Lock, Thread import sqlalchemy -import time -from datetime import datetime +from persistent import Persistent +from persistent.dict import PersistentDict +from pyramid.events import subscriber from sqlalchemy.event import listens_for from sqlalchemy.orm.scoping import scoped_session from sqlalchemy.orm.session import sessionmaker -from sqlalchemy.pool import Pool, NullPool -from threading import Thread, Lock +from sqlalchemy.pool import NullPool, Pool +from zope.componentvocabulary.vocabulary import UtilityVocabulary +from zope.container.contained import Contained +from zope.interface import implementer +from zope.lifecycleevent.interfaces import IObjectAddedEvent, IObjectModifiedEvent, \ + IObjectRemovedEvent +from zope.schema.fieldproperty import FieldProperty +from zope.sqlalchemy.datamanager import STATUS_ACTIVE, STATUS_READONLY, ZopeTransactionEvents, \ + _SESSION_STATE, join_transaction, register -# import interfaces -from pyams_alchemy.interfaces import REQUEST_SESSION_KEY, IAlchemyEngineUtility +from pyams_alchemy.interfaces import IAlchemyEngineUtility, REQUEST_SESSION_KEY from pyams_utils.interfaces.site import IOptionalUtility -from zope.lifecycleevent.interfaces import IObjectAddedEvent, IObjectModifiedEvent, IObjectRemovedEvent - -# import packages -from persistent import Persistent -from persistent.dict import PersistentDict from pyams_utils.registry import query_utility from pyams_utils.request import check_request, get_request_data, set_request_data from pyams_utils.vocabulary import vocabulary_config -from pyramid.events import subscriber -from zope.componentvocabulary.vocabulary import UtilityVocabulary -from zope.container.contained import Contained -from zope.interface import implementer -from zope.schema.fieldproperty import FieldProperty -from zope.sqlalchemy.datamanager import _SESSION_STATE, STATUS_READONLY, STATUS_ACTIVE, \ - ZopeTransactionExtension, join_transaction + +__docformat__ = 'restructuredtext' + +logger = logging.getLogger('PyAMS (SQLAlchemy)') CONNECTIONS_TIMESTAMP = {} CONNECTIONS_LOCK = Lock() @@ -59,7 +56,9 @@ """ with CONNECTIONS_LOCK: if record in CONNECTIONS_TIMESTAMP: - logger.debug("Removing timestamp for checked-out connection {0!r} ({1!r})".format(connection, record)) + logger.debug( + "Removing timestamp for checked-out connection {0!r} ({1!r})".format(connection, + record)) del CONNECTIONS_TIMESTAMP[record] @@ -72,7 +71,8 @@ after 5 minutes without being used. """ with CONNECTIONS_LOCK: - logger.debug("Setting inactivity timestamp for checked-in connection {0!r} ({1!r})".format(connection, record)) + logger.debug("Setting inactivity timestamp for checked-in connection {0!r} ({1!r})".format( + connection, record)) CONNECTIONS_TIMESTAMP[record] = datetime.utcnow() @@ -90,12 +90,14 @@ for connection, value in list(CONNECTIONS_TIMESTAMP.items()): delta = now - value if delta.total_seconds() > self.timeout: - logger.debug("Invalidating unused connection {0!r} from pool".format(connection)) + logger.debug( + "Invalidating unused connection {0!r} from pool".format(connection)) with CONNECTIONS_LOCK: connection.invalidate() del CONNECTIONS_TIMESTAMP[connection] time.sleep(60) + logger.info("Starting SQLAlchemy connections management thread") cleaner_thread = ConnectionCleanerThread() cleaner_thread.daemon = True @@ -116,8 +118,8 @@ encoding = FieldProperty(IAlchemyEngineUtility['encoding']) convert_unicode = FieldProperty(IAlchemyEngineUtility['convert_unicode']) - def __init__(self, name='', dsn='', echo=False, use_pool=True, pool_size=25, pool_recycle=-1, echo_pool=False, - encoding='utf-8', convert_unicode=False, **kwargs): + def __init__(self, name='', dsn='', echo=False, use_pool=True, pool_size=25, pool_recycle=-1, + echo_pool=False, encoding='utf-8', convert_unicode=False, **kwargs): self.name = name self.dsn = dsn self.echo = echo @@ -151,15 +153,16 @@ # Store engine into volatile attributes when pooling is enabled engine = getattr(self, '_v_engine', None) if engine is None: - engine = self._v_engine = sqlalchemy.create_engine(self.dsn, - echo=self.echo, - pool_size=self.pool_size, - pool_recycle=self.pool_recycle, - echo_pool=self.echo_pool, - encoding=self.encoding, - convert_unicode=self.convert_unicode, - strategy='threadlocal', - **kw) + engine = self._v_engine = \ + sqlalchemy.create_engine(self.dsn, + echo=self.echo, + pool_size=self.pool_size, + pool_recycle=self.pool_recycle, + echo_pool=self.echo_pool, + encoding=self.encoding, + convert_unicode=self.convert_unicode, + strategy='threadlocal', + **kw) return engine def clear_engine(self): @@ -217,11 +220,12 @@ _engine = get_engine(engine, use_pool) if use_zope_extension: factory = scoped_session(sessionmaker(bind=_engine, - twophase=twophase, - extension=ZopeTransactionExtension())) + twophase=twophase)) else: factory = sessionmaker(bind=_engine, twophase=twophase) session = factory() + if use_zope_extension: + register(session, initial_state=status) if join: join_transaction(session, initial_state=status) if status != STATUS_READONLY: @@ -237,18 +241,20 @@ twophase=True, use_zope_extension=True, use_pool=True): """Get a new SQLAlchemy session - :param str engine: name of an SQLAlchemy engine session utility; if *engine* is not given as a string, it is - returned as-is. + :param str engine: name of an SQLAlchemy engine session utility; if *engine* is not given as + a string, it is returned as-is. :param bool join: if *True*, session is joined to the current Pyramid transaction :param str status: status of the new session; can be STATUS_ACTIVE or STATUS_READONLY :param request: currently running request :param str alias: alias to use in connections mapping for this session - :param bool twophase: if *False*, session will be isolated and not included into two-phase transactions mechanism + :param bool twophase: if *False*, session will be isolated and not included into two-phase + transactions mechanism :param bool use_zope_extension: if *True*, use ZopeTransactionExtension scoped session :param bool use_pool: if *True*, this session will use a pool """ if isinstance(engine, str): - session = get_session(engine, join, status, request, alias, twophase, use_zope_extension, use_pool) + session = get_session(engine, join, status, request, alias, + twophase, use_zope_extension, use_pool) else: session = engine return session diff -r c4fb4f3219f1 -r dd9d4e651992 src/pyams_alchemy/metadirectives.py --- a/src/pyams_alchemy/metadirectives.py Thu Jul 11 13:24:16 2019 +0200 +++ b/src/pyams_alchemy/metadirectives.py Tue Nov 19 17:16:05 2019 +0100 @@ -10,16 +10,11 @@ # FOR A PARTICULAR PURPOSE. # -__docformat__ = 'restructuredtext' +from zope.interface import Interface +from zope.schema import TextLine, Bool, Int, Choice -# import standard library - -# import interfaces - -# import packages -from zope.interface import Interface -from zope.schema import TextLine, Bool, Int, Choice +__docformat__ = 'restructuredtext' from pyams_alchemy import _