src/pyams_alchemy/engine.py
changeset 63 40f12a3d67db
child 77 2be615fc6da4
equal deleted inserted replaced
-1:000000000000 63:40f12a3d67db
       
     1 #
       
     2 # Copyright (c) 2008-2015 Thierry Florac <tflorac AT ulthar.net>
       
     3 # All Rights Reserved.
       
     4 #
       
     5 # This software is subject to the provisions of the Zope Public License,
       
     6 # Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
       
     7 # THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
       
     8 # WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
       
     9 # WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
       
    10 # FOR A PARTICULAR PURPOSE.
       
    11 #
       
    12 
       
    13 __docformat__ = 'restructuredtext'
       
    14 
       
    15 
       
    16 # import standard library
       
    17 import logging
       
    18 logger = logging.getLogger('PyAMS (SQLAlchemy)')
       
    19 
       
    20 import sqlalchemy
       
    21 import time
       
    22 from datetime import datetime
       
    23 from sqlalchemy.event import listens_for
       
    24 from sqlalchemy.orm.scoping import scoped_session
       
    25 from sqlalchemy.orm.session import sessionmaker
       
    26 from sqlalchemy.pool import Pool, NullPool
       
    27 from threading import Thread, Lock
       
    28 
       
    29 # import interfaces
       
    30 from pyams_alchemy.interfaces import REQUEST_SESSION_KEY, IAlchemyEngineUtility
       
    31 from pyams_utils.interfaces.site import IOptionalUtility
       
    32 from zope.lifecycleevent.interfaces import IObjectAddedEvent, IObjectModifiedEvent, IObjectRemovedEvent
       
    33 
       
    34 # import packages
       
    35 from persistent import Persistent
       
    36 from persistent.dict import PersistentDict
       
    37 from pyams_utils.registry import query_utility
       
    38 from pyams_utils.request import check_request, get_request_data, set_request_data
       
    39 from pyams_utils.vocabulary import vocabulary_config
       
    40 from pyramid.events import subscriber
       
    41 from zope.componentvocabulary.vocabulary import UtilityVocabulary
       
    42 from zope.container.contained import Contained
       
    43 from zope.interface import implementer
       
    44 from zope.schema.fieldproperty import FieldProperty
       
    45 from zope.sqlalchemy.datamanager import _SESSION_STATE, STATUS_READONLY, STATUS_ACTIVE, \
       
    46     ZopeTransactionExtension, join_transaction
       
    47 
       
    48 
       
# Maps a pool connection record to the (naive UTC) datetime of its last check-in;
# entries are added on pool 'checkin' and removed on pool 'checkout'
CONNECTIONS_TIMESTAMP = {}
# Lock taken by the pool event handlers and the cleaner thread when updating CONNECTIONS_TIMESTAMP
CONNECTIONS_LOCK = Lock()
       
    51 
       
    52 
       
    53 @listens_for(Pool, 'checkout')
       
    54 def handle_pool_checkout(connection, record, proxy):
       
    55     """Pool connection checkout
       
    56 
       
    57     Called when a connection is retrieved from the pool.
       
    58     If the connection record is already marked, we remove it from the mapping.
       
    59     """
       
    60     with CONNECTIONS_LOCK:
       
    61         if record in CONNECTIONS_TIMESTAMP:
       
    62             logger.debug("Removing timestamp for checked-out connection {0!r} ({1!r})".format(connection, record))
       
    63             del CONNECTIONS_TIMESTAMP[record]
       
    64 
       
    65 
       
    66 @listens_for(Pool, 'checkin')
       
    67 def handle_pool_checkin(connection, record):
       
    68     """Pool connection checkin
       
    69 
       
    70     Called when a connection returns to the pool.
       
    71     We apply a timestamp on the connection record to be able to close it automatically
       
    72     after 5 minutes without being used.
       
    73     """
       
    74     with CONNECTIONS_LOCK:
       
    75         logger.debug("Setting inactivity timestamp for checked-in connection {0!r} ({1!r})".format(connection, record))
       
    76         CONNECTIONS_TIMESTAMP[record] = datetime.utcnow()
       
    77 
       
    78 
       
    79 class ConnectionCleanerThread(Thread):
       
    80     """Background thread used to clean unused database connections
       
    81 
       
    82     Each connection is referenced in CONNECTION_TIMESTAMPS mapping on checkin and is invalidated
       
    83     if not being used after 5 minutes
       
    84     """
       
    85     timeout = 300
       
    86 
       
    87     def run(self):
       
    88         while True:
       
    89             now = datetime.utcnow()
       
    90             for connection, value in list(CONNECTIONS_TIMESTAMP.items()):
       
    91                 delta = now - value
       
    92                 if delta.total_seconds() > self.timeout:
       
    93                     logger.debug("Invalidating unused connection {0!r} from pool".format(connection))
       
    94                     with CONNECTIONS_LOCK:
       
    95                         connection.invalidate()
       
    96                         del CONNECTIONS_TIMESTAMP[connection]
       
    97             time.sleep(60)
       
    98 
       
# The cleaner thread is created and started as a side effect of importing this module
logger.info("Starting SQLAlchemy connections management thread")
cleaner_thread = ConnectionCleanerThread()
# Daemon flag must be set before start(); a daemon thread doesn't prevent interpreter exit
cleaner_thread.daemon = True
cleaner_thread.start()
       
   103 
       
   104 
       
   105 @implementer(IAlchemyEngineUtility, IOptionalUtility)
       
   106 class AlchemyEngineUtility(object):
       
   107     """SQLAlchemy engine utility"""
       
   108 
       
   109     name = FieldProperty(IAlchemyEngineUtility['name'])
       
   110     dsn = FieldProperty(IAlchemyEngineUtility['dsn'])
       
   111     echo = FieldProperty(IAlchemyEngineUtility['echo'])
       
   112     use_pool = FieldProperty(IAlchemyEngineUtility['use_pool'])
       
   113     pool_size = FieldProperty(IAlchemyEngineUtility['pool_size'])
       
   114     pool_recycle = FieldProperty(IAlchemyEngineUtility['pool_recycle'])
       
   115     echo_pool = FieldProperty(IAlchemyEngineUtility['echo_pool'])
       
   116     encoding = FieldProperty(IAlchemyEngineUtility['encoding'])
       
   117     convert_unicode = FieldProperty(IAlchemyEngineUtility['convert_unicode'])
       
   118 
       
   119     def __init__(self, name='', dsn='', echo=False, use_pool=True, pool_size=25, pool_recycle=-1, echo_pool=False,
       
   120                  encoding='utf-8', convert_unicode=False, **kwargs):
       
   121         self.name = name
       
   122         self.dsn = dsn
       
   123         self.echo = echo
       
   124         self.use_pool = use_pool
       
   125         self.pool_size = pool_size
       
   126         self.pool_recycle = pool_recycle
       
   127         self.echo_pool = echo_pool
       
   128         self.encoding = encoding
       
   129         self.convert_unicode = convert_unicode
       
   130         self.kw = PersistentDict()
       
   131         self.kw.update(kwargs)
       
   132 
       
   133     def __setattr__(self, key, value):
       
   134         super(AlchemyEngineUtility, self).__setattr__(key, value)
       
   135         if (key != '_v_engine') and hasattr(self, '_v_engine'):
       
   136             delattr(self, '_v_engine')
       
   137 
       
   138     def get_engine(self, use_pool=True):
       
   139         kw = {}
       
   140         kw.update(self.kw)
       
   141         if not (use_pool and self.use_pool):
       
   142             # Always create a new engine when pooling is disabled to help engine disposal
       
   143             return sqlalchemy.create_engine(self.dsn,
       
   144                                             echo=self.echo,
       
   145                                             poolclass=NullPool,
       
   146                                             encoding=self.encoding,
       
   147                                             convert_unicode=self.convert_unicode,
       
   148                                             strategy='threadlocal',
       
   149                                             **kw)
       
   150         else:
       
   151             # Store engine into volatile attributes when pooling is enabled
       
   152             engine = getattr(self, '_v_engine', None)
       
   153             if engine is None:
       
   154                 engine = self._v_engine = sqlalchemy.create_engine(self.dsn,
       
   155                                                                    echo=self.echo,
       
   156                                                                    pool_size=self.pool_size,
       
   157                                                                    pool_recycle=self.pool_recycle,
       
   158                                                                    echo_pool=self.echo_pool,
       
   159                                                                    encoding=self.encoding,
       
   160                                                                    convert_unicode=self.convert_unicode,
       
   161                                                                    strategy='threadlocal',
       
   162                                                                    **kw)
       
   163             return engine
       
   164 
       
   165     def clear_engine(self):
       
   166         if hasattr(self, '_v_engine'):
       
   167             delattr(self, '_v_engine')
       
   168 
       
   169 
       
class PersistentAlchemyEngineUtility(Persistent, AlchemyEngineUtility, Contained):
    """Persistent implementation of SQLAlchemy engine utility

    Adds the ``Persistent`` and ``Contained`` base classes to
    :class:`AlchemyEngineUtility`; no attribute or method is added or overridden.
    """
       
   172 
       
   173 
       
   174 @subscriber(IObjectAddedEvent, context_selector=IAlchemyEngineUtility)
       
   175 def handle_added_engine(event):
       
   176     """Register new SQLAlchemy engine when added"""
       
   177     manager = event.newParent
       
   178     manager.registerUtility(event.object, IAlchemyEngineUtility, name=event.object.name or '')
       
   179 
       
   180 
       
   181 @subscriber(IObjectModifiedEvent, context_selector=IAlchemyEngineUtility)
       
   182 def handle_modified_engine(event):
       
   183     """Clear SQLAlchemy engine volatile attributes when modified"""
       
   184     IAlchemyEngineUtility(event.object).clear_engine()
       
   185 
       
   186 
       
   187 @subscriber(IObjectRemovedEvent, context_selector=IAlchemyEngineUtility)
       
   188 def handle_removed_engine(event):
       
   189     """Un-register an SQLAlchemy engine when deleted"""
       
   190     manager = event.oldParent
       
   191     manager.unregisterUtility(event.object, IAlchemyEngineUtility, name=event.object.name or '')
       
   192 
       
   193 
       
   194 def get_engine(engine, use_pool=True):
       
   195     """Get engine matching given utility name"""
       
   196     if isinstance(engine, str):
       
   197         engine = query_utility(IAlchemyEngineUtility, name=engine)
       
   198         if engine is not None:
       
   199             return engine.get_engine(use_pool)
       
   200 
       
   201 
       
   202 def get_session(engine, join=True, status=STATUS_ACTIVE, request=None, alias=None,
       
   203                 twophase=True, use_zope_extension=True, use_pool=True):
       
   204     """Get a new SQLALchemy session
       
   205 
       
   206     Session is stored in request and in session storage.
       
   207     See :func:`get_user_session` function to get arguments documentation.
       
   208     """
       
   209     if request is None:
       
   210         request = check_request()
       
   211         logger.debug("Checked request {0!r}".format(request))
       
   212     if not alias:
       
   213         alias = engine
       
   214     session_data = get_request_data(request, REQUEST_SESSION_KEY, {})
       
   215     session = session_data.get(alias)
       
   216     if session is None:
       
   217         _engine = get_engine(engine, use_pool)
       
   218         if use_zope_extension:
       
   219             factory = scoped_session(sessionmaker(bind=_engine,
       
   220                                                   twophase=twophase,
       
   221                                                   extension=ZopeTransactionExtension()))
       
   222         else:
       
   223             factory = sessionmaker(bind=_engine, twophase=twophase)
       
   224         session = factory()
       
   225         if join:
       
   226             join_transaction(session, initial_state=status)
       
   227         if status != STATUS_READONLY:
       
   228             _SESSION_STATE[session] = session
       
   229         if session is not None:
       
   230             session_data[alias] = session
       
   231             set_request_data(request, REQUEST_SESSION_KEY, session_data)
       
   232     logger.debug("Using SQLAlchemy session {0!r}".format(session))
       
   233     return session
       
   234 
       
   235 
       
   236 def get_user_session(engine, join=True, status=STATUS_ACTIVE, request=None, alias=None,
       
   237                      twophase=True, use_zope_extension=True, use_pool=True):
       
   238     """Get a new SQLAlchemy session
       
   239 
       
   240     :param str engine: name of an SQLAlchemy engine session utility; if *engine* is not given as a string, it is
       
   241         returned as-is.
       
   242     :param bool join: if *True*, session is joined to the current Pyramid transaction
       
   243     :param str status: status of the new session; can be STATUS_ACTIVE or STATUS_READONLY
       
   244     :param request: currently running request
       
   245     :param str alias: alias to use in connections mapping for this session
       
   246     :param bool twophase: if *False*, session will be isolated and not included into two-phase transactions mechanism
       
   247     :param bool use_zope_extension: if *True*, use ZopeTransactionExtension scoped session
       
   248     :param bool use_pool: if *True*, this session will use a pool
       
   249     """
       
   250     if isinstance(engine, str):
       
   251         session = get_session(engine, join, status, request, alias, twophase, use_zope_extension, use_pool)
       
   252     else:
       
   253         session = engine
       
   254     return session
       
   255 
       
   256 
       
@vocabulary_config(name='PyAMS SQLAlchemy engines')
class EnginesVocabulary(UtilityVocabulary):
    """SQLAlchemy engines vocabulary

    Vocabulary of IAlchemyEngineUtility utilities, declared under the
    'PyAMS SQLAlchemy engines' name.
    """

    # Interface of the utilities listed by this vocabulary
    interface = IAlchemyEngineUtility
    # NOTE(review): presumably makes term values the utility names instead of the
    # utilities themselves -- confirm against UtilityVocabulary documentation
    nameOnly = True
       
   262     nameOnly = True