# HG changeset patch # User Thierry Florac # Date 1426084293 -3600 # Node ID aaa67c9ff76fdec75265707815d9ff303ce9f3a5 # Parent dd47bcfc62378d1b84ed4f1d7e426eb0a3358e48 Added connections cleaner thread to close connections that were added to the pool but unused for more than 5 minutes diff -r dd47bcfc6237 -r aaa67c9ff76f src/pyams_alchemy/engine.py --- a/src/pyams_alchemy/engine.py Wed Mar 11 11:55:34 2015 +0100 +++ b/src/pyams_alchemy/engine.py Wed Mar 11 15:31:33 2015 +0100 @@ -14,9 +14,17 @@ # import standard library +import logging +logger = logging.getLogger('PyAMS (SQLAlchemy)') + import sqlalchemy +import time +from datetime import datetime +from sqlalchemy.event import listens_for from sqlalchemy.orm.scoping import scoped_session from sqlalchemy.orm.session import sessionmaker +from sqlalchemy.pool import Pool +from threading import Thread, Lock # import interfaces from pyams_alchemy.interfaces import REQUEST_SESSION_KEY, IAlchemyEngineUtility @@ -39,6 +47,44 @@ ZopeTransactionExtension, join_transaction +CONNECTIONS_TIMESTAMP = {} +CONNECTIONS_LOCK = Lock() + + +@listens_for(Pool, 'checkout') +def pool_checkout(connection, record, proxy): + """Pool checkout""" + logger.debug("Setting checkout timestamp for connection {0!r} ({1!r})".format(connection, record)) + with CONNECTIONS_LOCK: + CONNECTIONS_TIMESTAMP[record] = datetime.utcnow() + + +class ConnectionCleanerThread(Thread): + """Background thread used to clean unused database connections + + Each connection is referenced in CONNECTION_TIMESTAMPS on checkout and is invalidated + if not being used after 5 minutes + """ + timeout = 300 + + def run(self): + while True: + now = datetime.utcnow() + for connection, value in list(CONNECTIONS_TIMESTAMP.items()): + delta = now - value + if delta.total_seconds() > self.timeout: + logger.debug("Detaching unused connection {0!r} from pool".format(connection)) + with CONNECTIONS_LOCK: + connection.invalidate() + del CONNECTIONS_TIMESTAMP[connection] + time.sleep(60) + +logger.debug("Starting pool connections management thread") +cleaner_thread = ConnectionCleanerThread() +cleaner_thread.daemon = True +cleaner_thread.start() + + @implementer(IAlchemyEngineUtility, IOptionalUtility) class AlchemyEngineUtility(object): """SQLAlchemy engine utility"""