src/pyams_notify_ws/notify.py
changeset 0 284c0976e3ff
child 1 f2910a60a29a
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/pyams_notify_ws/notify.py	Thu Jun 02 16:02:23 2016 +0200
@@ -0,0 +1,82 @@
+#
+# Copyright (c) 2008-2015 Thierry Florac <tflorac AT ulthar.net>
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+
+__docformat__ = 'restructuredtext'
+
+
+# import standard library
+import asyncio
+import collections
+import json
+import pickle
+
+# import interfaces
+
+# import packages
+from aiomcache import Client as MemcachedClient
+from aiopyramid.websocket.config.gunicorn import WebsocketMapper
+from aiopyramid.websocket.view import WebsocketConnectionView
+from pyams_notify_ws.subscribe import users
+from pyramid.view import view_config
+
+
+queue = collections.deque(maxlen=50)
+queue_key = b'_PyAMS_notify_messages_queue_'
+queue_lock = asyncio.Lock()
+
+memcached_client = None
+
+
+def init_memcached_client(server):
+    """Initialize memcached handler"""
+    global memcached_client
+    ip, port = server.split(':')
+    memcached_client = MemcachedClient(ip, int(port))
+
+
+@view_config(route_name='notify', mapper=WebsocketMapper)
+class WebsocketNotifyView(WebsocketConnectionView):
+    """Websocket notify view"""
+
+    @asyncio.coroutine
+    def on_open(self):
+        pass
+
+    @asyncio.coroutine
+    def on_message(self, message):
+        try:
+            message = json.loads(message)
+        except ValueError:
+            pass
+        else:
+            action = message.get('action')
+            if action == 'notify':
+                # dispatch notification to subscribers
+                json_message = json.dumps(message)
+                target = message.get('target').get('principals', ())
+                for connection, subscription in users.items():
+                    if subscription.filter_target(target):
+                        yield from connection.send(json_message)
+                # store message in memcached_queue
+                if memcached_client is not None:
+                    with (yield from queue_lock):
+                        mem_queue = yield from memcached_client.get(queue_key)
+                        if mem_queue is None:
+                            mem_queue = queue
+                        else:
+                            mem_queue = pickle.loads(mem_queue)
+                        mem_queue.append(message)
+                        yield from memcached_client.set(queue_key, pickle.dumps(mem_queue))
+
+    @asyncio.coroutine
+    def on_close(self):
+        pass