--- a/src/pyams_alchemy/engine.py Thu Jul 11 13:24:16 2019 +0200
+++ b/src/pyams_alchemy/engine.py Tue Nov 19 17:16:05 2019 +0100
@@ -10,41 +10,38 @@
# FOR A PARTICULAR PURPOSE.
#
-__docformat__ = 'restructuredtext'
-
-
-# import standard library
import logging
-logger = logging.getLogger('PyAMS (SQLAlchemy)')
+import time
+from datetime import datetime
+from threading import Lock, Thread
import sqlalchemy
-import time
-from datetime import datetime
+from persistent import Persistent
+from persistent.dict import PersistentDict
+from pyramid.events import subscriber
from sqlalchemy.event import listens_for
from sqlalchemy.orm.scoping import scoped_session
from sqlalchemy.orm.session import sessionmaker
-from sqlalchemy.pool import Pool, NullPool
-from threading import Thread, Lock
+from sqlalchemy.pool import NullPool, Pool
+from zope.componentvocabulary.vocabulary import UtilityVocabulary
+from zope.container.contained import Contained
+from zope.interface import implementer
+from zope.lifecycleevent.interfaces import IObjectAddedEvent, IObjectModifiedEvent, \
+ IObjectRemovedEvent
+from zope.schema.fieldproperty import FieldProperty
+from zope.sqlalchemy.datamanager import STATUS_ACTIVE, STATUS_READONLY, ZopeTransactionEvents, \
+ _SESSION_STATE, join_transaction, register
-# import interfaces
-from pyams_alchemy.interfaces import REQUEST_SESSION_KEY, IAlchemyEngineUtility
+from pyams_alchemy.interfaces import IAlchemyEngineUtility, REQUEST_SESSION_KEY
from pyams_utils.interfaces.site import IOptionalUtility
-from zope.lifecycleevent.interfaces import IObjectAddedEvent, IObjectModifiedEvent, IObjectRemovedEvent
-
-# import packages
-from persistent import Persistent
-from persistent.dict import PersistentDict
from pyams_utils.registry import query_utility
from pyams_utils.request import check_request, get_request_data, set_request_data
from pyams_utils.vocabulary import vocabulary_config
-from pyramid.events import subscriber
-from zope.componentvocabulary.vocabulary import UtilityVocabulary
-from zope.container.contained import Contained
-from zope.interface import implementer
-from zope.schema.fieldproperty import FieldProperty
-from zope.sqlalchemy.datamanager import _SESSION_STATE, STATUS_READONLY, STATUS_ACTIVE, \
- ZopeTransactionExtension, join_transaction
+
+__docformat__ = 'restructuredtext'
+
+logger = logging.getLogger('PyAMS (SQLAlchemy)')
CONNECTIONS_TIMESTAMP = {}
CONNECTIONS_LOCK = Lock()
@@ -59,7 +56,9 @@
"""
with CONNECTIONS_LOCK:
if record in CONNECTIONS_TIMESTAMP:
- logger.debug("Removing timestamp for checked-out connection {0!r} ({1!r})".format(connection, record))
+ logger.debug(
+ "Removing timestamp for checked-out connection {0!r} ({1!r})".format(connection,
+ record))
del CONNECTIONS_TIMESTAMP[record]
@@ -72,7 +71,8 @@
after 5 minutes without being used.
"""
with CONNECTIONS_LOCK:
- logger.debug("Setting inactivity timestamp for checked-in connection {0!r} ({1!r})".format(connection, record))
+ logger.debug("Setting inactivity timestamp for checked-in connection {0!r} ({1!r})".format(
+ connection, record))
CONNECTIONS_TIMESTAMP[record] = datetime.utcnow()
@@ -90,12 +90,14 @@
for connection, value in list(CONNECTIONS_TIMESTAMP.items()):
delta = now - value
if delta.total_seconds() > self.timeout:
- logger.debug("Invalidating unused connection {0!r} from pool".format(connection))
+ logger.debug(
+ "Invalidating unused connection {0!r} from pool".format(connection))
with CONNECTIONS_LOCK:
connection.invalidate()
del CONNECTIONS_TIMESTAMP[connection]
time.sleep(60)
+
logger.info("Starting SQLAlchemy connections management thread")
cleaner_thread = ConnectionCleanerThread()
cleaner_thread.daemon = True
@@ -116,8 +118,8 @@
encoding = FieldProperty(IAlchemyEngineUtility['encoding'])
convert_unicode = FieldProperty(IAlchemyEngineUtility['convert_unicode'])
- def __init__(self, name='', dsn='', echo=False, use_pool=True, pool_size=25, pool_recycle=-1, echo_pool=False,
- encoding='utf-8', convert_unicode=False, **kwargs):
+ def __init__(self, name='', dsn='', echo=False, use_pool=True, pool_size=25, pool_recycle=-1,
+ echo_pool=False, encoding='utf-8', convert_unicode=False, **kwargs):
self.name = name
self.dsn = dsn
self.echo = echo
@@ -151,15 +153,16 @@
# Store engine into volatile attributes when pooling is enabled
engine = getattr(self, '_v_engine', None)
if engine is None:
- engine = self._v_engine = sqlalchemy.create_engine(self.dsn,
- echo=self.echo,
- pool_size=self.pool_size,
- pool_recycle=self.pool_recycle,
- echo_pool=self.echo_pool,
- encoding=self.encoding,
- convert_unicode=self.convert_unicode,
- strategy='threadlocal',
- **kw)
+ engine = self._v_engine = \
+ sqlalchemy.create_engine(self.dsn,
+ echo=self.echo,
+ pool_size=self.pool_size,
+ pool_recycle=self.pool_recycle,
+ echo_pool=self.echo_pool,
+ encoding=self.encoding,
+ convert_unicode=self.convert_unicode,
+ strategy='threadlocal',
+ **kw)
return engine
def clear_engine(self):
@@ -217,11 +220,12 @@
_engine = get_engine(engine, use_pool)
if use_zope_extension:
factory = scoped_session(sessionmaker(bind=_engine,
- twophase=twophase,
- extension=ZopeTransactionExtension()))
+ twophase=twophase))
else:
factory = sessionmaker(bind=_engine, twophase=twophase)
session = factory()
+ if use_zope_extension:
+ register(session, initial_state=status)
if join:
join_transaction(session, initial_state=status)
if status != STATUS_READONLY:
@@ -237,18 +241,20 @@
twophase=True, use_zope_extension=True, use_pool=True):
"""Get a new SQLAlchemy session
- :param str engine: name of an SQLAlchemy engine session utility; if *engine* is not given as a string, it is
- returned as-is.
+ :param str engine: name of an SQLAlchemy engine session utility; if *engine* is not given as
+ a string, it is returned as-is.
:param bool join: if *True*, session is joined to the current Pyramid transaction
:param str status: status of the new session; can be STATUS_ACTIVE or STATUS_READONLY
:param request: currently running request
:param str alias: alias to use in connections mapping for this session
- :param bool twophase: if *False*, session will be isolated and not included into two-phase transactions mechanism
+ :param bool twophase: if *False*, session will be isolated and not included into two-phase
+ transactions mechanism
:param bool use_zope_extension: if *True*, use ZopeTransactionExtension scoped session
:param bool use_pool: if *True*, this session will use a pool
"""
if isinstance(engine, str):
- session = get_session(engine, join, status, request, alias, twophase, use_zope_extension, use_pool)
+ session = get_session(engine, join, status, request, alias,
+ twophase, use_zope_extension, use_pool)
else:
session = engine
return session