src/pyams_alchemy/engine.py
changeset 80 dd9d4e651992
parent 79 c4fb4f3219f1
equal deleted inserted replaced
79:c4fb4f3219f1 80:dd9d4e651992
     8 # WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
     8 # WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
     9 # WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
     9 # WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
    10 # FOR A PARTICULAR PURPOSE.
    10 # FOR A PARTICULAR PURPOSE.
    11 #
    11 #
    12 
    12 
    13 __docformat__ = 'restructuredtext'
       
    14 
       
    15 
       
    16 # import standard library
       
    17 import logging
    13 import logging
    18 logger = logging.getLogger('PyAMS (SQLAlchemy)')
       
    19 
       
    20 import sqlalchemy
       
    21 import time
    14 import time
    22 from datetime import datetime
    15 from datetime import datetime
       
    16 from threading import Lock, Thread
       
    17 
       
    18 import sqlalchemy
       
    19 from persistent import Persistent
       
    20 from persistent.dict import PersistentDict
       
    21 from pyramid.events import subscriber
    23 from sqlalchemy.event import listens_for
    22 from sqlalchemy.event import listens_for
    24 from sqlalchemy.orm.scoping import scoped_session
    23 from sqlalchemy.orm.scoping import scoped_session
    25 from sqlalchemy.orm.session import sessionmaker
    24 from sqlalchemy.orm.session import sessionmaker
    26 from sqlalchemy.pool import Pool, NullPool
    25 from sqlalchemy.pool import NullPool, Pool
    27 from threading import Thread, Lock
    26 from zope.componentvocabulary.vocabulary import UtilityVocabulary
    28 
    27 from zope.container.contained import Contained
    29 # import interfaces
    28 from zope.interface import implementer
    30 from pyams_alchemy.interfaces import REQUEST_SESSION_KEY, IAlchemyEngineUtility
    29 from zope.lifecycleevent.interfaces import IObjectAddedEvent, IObjectModifiedEvent, \
       
    30     IObjectRemovedEvent
       
    31 from zope.schema.fieldproperty import FieldProperty
       
    32 from zope.sqlalchemy.datamanager import STATUS_ACTIVE, STATUS_READONLY, ZopeTransactionEvents, \
       
    33     _SESSION_STATE, join_transaction, register
       
    34 
       
    35 from pyams_alchemy.interfaces import IAlchemyEngineUtility, REQUEST_SESSION_KEY
    31 from pyams_utils.interfaces.site import IOptionalUtility
    36 from pyams_utils.interfaces.site import IOptionalUtility
    32 from zope.lifecycleevent.interfaces import IObjectAddedEvent, IObjectModifiedEvent, IObjectRemovedEvent
       
    33 
       
    34 # import packages
       
    35 from persistent import Persistent
       
    36 from persistent.dict import PersistentDict
       
    37 from pyams_utils.registry import query_utility
    37 from pyams_utils.registry import query_utility
    38 from pyams_utils.request import check_request, get_request_data, set_request_data
    38 from pyams_utils.request import check_request, get_request_data, set_request_data
    39 from pyams_utils.vocabulary import vocabulary_config
    39 from pyams_utils.vocabulary import vocabulary_config
    40 from pyramid.events import subscriber
    40 
    41 from zope.componentvocabulary.vocabulary import UtilityVocabulary
    41 
    42 from zope.container.contained import Contained
    42 __docformat__ = 'restructuredtext'
    43 from zope.interface import implementer
    43 
    44 from zope.schema.fieldproperty import FieldProperty
    44 logger = logging.getLogger('PyAMS (SQLAlchemy)')
    45 from zope.sqlalchemy.datamanager import _SESSION_STATE, STATUS_READONLY, STATUS_ACTIVE, \
       
    46     ZopeTransactionExtension, join_transaction
       
    47 
       
    48 
    45 
    49 CONNECTIONS_TIMESTAMP = {}
    46 CONNECTIONS_TIMESTAMP = {}
    50 CONNECTIONS_LOCK = Lock()
    47 CONNECTIONS_LOCK = Lock()
    51 
    48 
    52 
    49 
    57     Called when a connection is retrieved from the pool.
    54     Called when a connection is retrieved from the pool.
    58     If the connection record is already marked, we remove it from the mapping.
    55     If the connection record is already marked, we remove it from the mapping.
    59     """
    56     """
    60     with CONNECTIONS_LOCK:
    57     with CONNECTIONS_LOCK:
    61         if record in CONNECTIONS_TIMESTAMP:
    58         if record in CONNECTIONS_TIMESTAMP:
    62             logger.debug("Removing timestamp for checked-out connection {0!r} ({1!r})".format(connection, record))
    59             logger.debug(
       
    60                 "Removing timestamp for checked-out connection {0!r} ({1!r})".format(connection,
       
    61                                                                                      record))
    63             del CONNECTIONS_TIMESTAMP[record]
    62             del CONNECTIONS_TIMESTAMP[record]
    64 
    63 
    65 
    64 
    66 @listens_for(Pool, 'checkin')
    65 @listens_for(Pool, 'checkin')
    67 def handle_pool_checkin(connection, record):
    66 def handle_pool_checkin(connection, record):
    70     Called when a connection returns to the pool.
    69     Called when a connection returns to the pool.
    71     We apply a timestamp on the connection record to be able to close it automatically
    70     We apply a timestamp on the connection record to be able to close it automatically
    72     after 5 minutes without being used.
    71     after 5 minutes without being used.
    73     """
    72     """
    74     with CONNECTIONS_LOCK:
    73     with CONNECTIONS_LOCK:
    75         logger.debug("Setting inactivity timestamp for checked-in connection {0!r} ({1!r})".format(connection, record))
    74         logger.debug("Setting inactivity timestamp for checked-in connection {0!r} ({1!r})".format(
       
    75             connection, record))
    76         CONNECTIONS_TIMESTAMP[record] = datetime.utcnow()
    76         CONNECTIONS_TIMESTAMP[record] = datetime.utcnow()
    77 
    77 
    78 
    78 
    79 class ConnectionCleanerThread(Thread):
    79 class ConnectionCleanerThread(Thread):
    80     """Background thread used to clean unused database connections
    80     """Background thread used to clean unused database connections
    88         while True:
    88         while True:
    89             now = datetime.utcnow()
    89             now = datetime.utcnow()
    90             for connection, value in list(CONNECTIONS_TIMESTAMP.items()):
    90             for connection, value in list(CONNECTIONS_TIMESTAMP.items()):
    91                 delta = now - value
    91                 delta = now - value
    92                 if delta.total_seconds() > self.timeout:
    92                 if delta.total_seconds() > self.timeout:
    93                     logger.debug("Invalidating unused connection {0!r} from pool".format(connection))
    93                     logger.debug(
       
    94                         "Invalidating unused connection {0!r} from pool".format(connection))
    94                     with CONNECTIONS_LOCK:
    95                     with CONNECTIONS_LOCK:
    95                         connection.invalidate()
    96                         connection.invalidate()
    96                         del CONNECTIONS_TIMESTAMP[connection]
    97                         del CONNECTIONS_TIMESTAMP[connection]
    97             time.sleep(60)
    98             time.sleep(60)
       
    99 
    98 
   100 
    99 logger.info("Starting SQLAlchemy connections management thread")
   101 logger.info("Starting SQLAlchemy connections management thread")
   100 cleaner_thread = ConnectionCleanerThread()
   102 cleaner_thread = ConnectionCleanerThread()
   101 cleaner_thread.daemon = True
   103 cleaner_thread.daemon = True
   102 cleaner_thread.start()
   104 cleaner_thread.start()
   114     pool_recycle = FieldProperty(IAlchemyEngineUtility['pool_recycle'])
   116     pool_recycle = FieldProperty(IAlchemyEngineUtility['pool_recycle'])
   115     echo_pool = FieldProperty(IAlchemyEngineUtility['echo_pool'])
   117     echo_pool = FieldProperty(IAlchemyEngineUtility['echo_pool'])
   116     encoding = FieldProperty(IAlchemyEngineUtility['encoding'])
   118     encoding = FieldProperty(IAlchemyEngineUtility['encoding'])
   117     convert_unicode = FieldProperty(IAlchemyEngineUtility['convert_unicode'])
   119     convert_unicode = FieldProperty(IAlchemyEngineUtility['convert_unicode'])
   118 
   120 
   119     def __init__(self, name='', dsn='', echo=False, use_pool=True, pool_size=25, pool_recycle=-1, echo_pool=False,
   121     def __init__(self, name='', dsn='', echo=False, use_pool=True, pool_size=25, pool_recycle=-1,
   120                  encoding='utf-8', convert_unicode=False, **kwargs):
   122                  echo_pool=False, encoding='utf-8', convert_unicode=False, **kwargs):
   121         self.name = name
   123         self.name = name
   122         self.dsn = dsn
   124         self.dsn = dsn
   123         self.echo = echo
   125         self.echo = echo
   124         self.use_pool = use_pool
   126         self.use_pool = use_pool
   125         self.pool_size = pool_size
   127         self.pool_size = pool_size
   149                                             **kw)
   151                                             **kw)
   150         else:
   152         else:
   151             # Store engine into volatile attributes when pooling is enabled
   153             # Store engine into volatile attributes when pooling is enabled
   152             engine = getattr(self, '_v_engine', None)
   154             engine = getattr(self, '_v_engine', None)
   153             if engine is None:
   155             if engine is None:
   154                 engine = self._v_engine = sqlalchemy.create_engine(self.dsn,
   156                 engine = self._v_engine = \
   155                                                                    echo=self.echo,
   157                     sqlalchemy.create_engine(self.dsn,
   156                                                                    pool_size=self.pool_size,
   158                                              echo=self.echo,
   157                                                                    pool_recycle=self.pool_recycle,
   159                                              pool_size=self.pool_size,
   158                                                                    echo_pool=self.echo_pool,
   160                                              pool_recycle=self.pool_recycle,
   159                                                                    encoding=self.encoding,
   161                                              echo_pool=self.echo_pool,
   160                                                                    convert_unicode=self.convert_unicode,
   162                                              encoding=self.encoding,
   161                                                                    strategy='threadlocal',
   163                                              convert_unicode=self.convert_unicode,
   162                                                                    **kw)
   164                                              strategy='threadlocal',
       
   165                                              **kw)
   163             return engine
   166             return engine
   164 
   167 
   165     def clear_engine(self):
   168     def clear_engine(self):
   166         if hasattr(self, '_v_engine'):
   169         if hasattr(self, '_v_engine'):
   167             delattr(self, '_v_engine')
   170             delattr(self, '_v_engine')
   215     session = session_data.get(alias)
   218     session = session_data.get(alias)
   216     if session is None:
   219     if session is None:
   217         _engine = get_engine(engine, use_pool)
   220         _engine = get_engine(engine, use_pool)
   218         if use_zope_extension:
   221         if use_zope_extension:
   219             factory = scoped_session(sessionmaker(bind=_engine,
   222             factory = scoped_session(sessionmaker(bind=_engine,
   220                                                   twophase=twophase,
   223                                                   twophase=twophase))
   221                                                   extension=ZopeTransactionExtension()))
       
   222         else:
   224         else:
   223             factory = sessionmaker(bind=_engine, twophase=twophase)
   225             factory = sessionmaker(bind=_engine, twophase=twophase)
   224         session = factory()
   226         session = factory()
       
   227         if use_zope_extension:
       
   228             register(session, initial_state=status)
   225         if join:
   229         if join:
   226             join_transaction(session, initial_state=status)
   230             join_transaction(session, initial_state=status)
   227         if status != STATUS_READONLY:
   231         if status != STATUS_READONLY:
   228             _SESSION_STATE[session] = session
   232             _SESSION_STATE[session] = session
   229         if session is not None:
   233         if session is not None:
   235 
   239 
   236 def get_user_session(engine, join=True, status=STATUS_ACTIVE, request=None, alias=None,
   240 def get_user_session(engine, join=True, status=STATUS_ACTIVE, request=None, alias=None,
   237                      twophase=True, use_zope_extension=True, use_pool=True):
   241                      twophase=True, use_zope_extension=True, use_pool=True):
   238     """Get a new SQLAlchemy session
   242     """Get a new SQLAlchemy session
   239 
   243 
   240     :param str engine: name of an SQLAlchemy engine session utility; if *engine* is not given as a string, it is
   244     :param str engine: name of an SQLAlchemy engine session utility; if *engine* is not given as
   241         returned as-is.
   245         a string, it is returned as-is.
   242     :param bool join: if *True*, session is joined to the current Pyramid transaction
   246     :param bool join: if *True*, session is joined to the current Pyramid transaction
   243     :param str status: status of the new session; can be STATUS_ACTIVE or STATUS_READONLY
   247     :param str status: status of the new session; can be STATUS_ACTIVE or STATUS_READONLY
   244     :param request: currently running request
   248     :param request: currently running request
   245     :param str alias: alias to use in connections mapping for this session
   249     :param str alias: alias to use in connections mapping for this session
   246     :param bool twophase: if *False*, session will be isolated and not included into two-phase transactions mechanism
   250     :param bool twophase: if *False*, session will be isolated and not included into two-phase
       
   251         transactions mechanism
   247     :param bool use_zope_extension: if *True*, use ZopeTransactionExtension scoped session
   252     :param bool use_zope_extension: if *True*, use ZopeTransactionExtension scoped session
   248     :param bool use_pool: if *True*, this session will use a pool
   253     :param bool use_pool: if *True*, this session will use a pool
   249     """
   254     """
   250     if isinstance(engine, str):
   255     if isinstance(engine, str):
   251         session = get_session(engine, join, status, request, alias, twophase, use_zope_extension, use_pool)
   256         session = get_session(engine, join, status, request, alias,
       
   257                               twophase, use_zope_extension, use_pool)
   252     else:
   258     else:
   253         session = engine
   259         session = engine
   254     return session
   260     return session
   255 
   261 
   256 
   262