--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/src/pyams_notify_ws/notify.py Thu Jun 02 16:02:23 2016 +0200
@@ -0,0 +1,82 @@
+#
+# Copyright (c) 2008-2015 Thierry Florac <tflorac AT ulthar.net>
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+
+__docformat__ = 'restructuredtext'
+
+
+# import standard library
+import asyncio
+import collections
+import json
+import pickle
+
+# import interfaces
+
+# import packages
+from aiomcache import Client as MemcachedClient
+from aiopyramid.websocket.config.gunicorn import WebsocketMapper
+from aiopyramid.websocket.view import WebsocketConnectionView
+from pyams_notify_ws.subscribe import users
+from pyramid.view import view_config
+
+
+queue = collections.deque(maxlen=50)
+queue_key = b'_PyAMS_notify_messages_queue_'
+queue_lock = asyncio.Lock()
+
+memcached_client = None
+
+
+def init_memcached_client(server):
+ """Initialize memcached handler"""
+ global memcached_client
+ ip, port = server.split(':')
+ memcached_client = MemcachedClient(ip, int(port))
+
+
+@view_config(route_name='notify', mapper=WebsocketMapper)
+class WebsocketNotifyView(WebsocketConnectionView):
+ """Websocket notify view"""
+
+ @asyncio.coroutine
+ def on_open(self):
+ pass
+
+ @asyncio.coroutine
+ def on_message(self, message):
+ try:
+ message = json.loads(message)
+ except ValueError:
+ pass
+ else:
+ action = message.get('action')
+ if action == 'notify':
+ # dispatch notification to subscribers
+ json_message = json.dumps(message)
+ target = message.get('target').get('principals', ())
+ for connection, subscription in users.items():
+ if subscription.filter_target(target):
+ yield from connection.send(json_message)
+ # store message in memcached_queue
+ if memcached_client is not None:
+ with (yield from queue_lock):
+ mem_queue = yield from memcached_client.get(queue_key)
+ if mem_queue is None:
+ mem_queue = queue
+ else:
+ mem_queue = pickle.loads(mem_queue)
+ mem_queue.append(message)
+ yield from memcached_client.set(queue_key, pickle.dumps(mem_queue))
+
+ @asyncio.coroutine
+ def on_close(self):
+ pass