Added connections cleaner thread to close connections that were added to the pool but unused for more than 5 minutes
authorThierry Florac <thierry.florac@onf.fr>
Wed, 11 Mar 2015 15:31:33 +0100
changeset 5 aaa67c9ff76f
parent 4 dd47bcfc6237
child 6 6239080e27fd
Added connections cleaner thread to close connections that were added to the pool but unused for more than 5 minutes
src/pyams_alchemy/engine.py
--- a/src/pyams_alchemy/engine.py	Wed Mar 11 11:55:34 2015 +0100
+++ b/src/pyams_alchemy/engine.py	Wed Mar 11 15:31:33 2015 +0100
@@ -14,9 +14,17 @@
 
 
 # import standard library
+import logging
+logger = logging.getLogger('PyAMS (SQLAlchemy)')
+
 import sqlalchemy
+import time
+from datetime import datetime
+from sqlalchemy.event import listens_for
 from sqlalchemy.orm.scoping import scoped_session
 from sqlalchemy.orm.session import sessionmaker
+from sqlalchemy.pool import Pool
+from threading import Thread, Lock
 
 # import interfaces
 from pyams_alchemy.interfaces import REQUEST_SESSION_KEY, IAlchemyEngineUtility
@@ -39,6 +47,44 @@
     ZopeTransactionExtension, join_transaction
 
 
+CONNECTIONS_TIMESTAMP = {}
+CONNECTIONS_LOCK = Lock()
+
+
+@listens_for(Pool, 'checkout')
+def pool_checkout(connection, record, proxy):
+    """Pool checkout"""
+    logger.debug("Setting checkout timestamp for connection {0!r} ({1!r})".format(connection, record))
+    with CONNECTIONS_LOCK:
+        CONNECTIONS_TIMESTAMP[record] = datetime.utcnow()
+
+
+class ConnectionCleanerThread(Thread):
+    """Background thread used to clean unused database connections
+
+    Each connection is referenced in CONNECTION_TIMESTAMPS on checkout and is invalidated
+    if not being used after 5 minutes
+    """
+    timeout = 300
+
+    def run(self):
+        while True:
+            now = datetime.utcnow()
+            for connection, value in list(CONNECTIONS_TIMESTAMP.items()):
+                delta = now - value
+                if delta.total_seconds() > self.timeout:
+                    logger.debug("Detaching unused connection {0!r} from pool".format(connection))
+                    with CONNECTIONS_LOCK:
+                        connection.invalidate()
+                        del CONNECTIONS_TIMESTAMP[connection]
+            time.sleep(60)
+
+logger.debug("Starting pool connections management thread")
+cleaner_thread = ConnectionCleanerThread()
+cleaner_thread.daemon = True
+cleaner_thread.start()
+
+
 @implementer(IAlchemyEngineUtility, IOptionalUtility)
 class AlchemyEngineUtility(object):
     """SQLAlchemy engine utility"""