src/pyams_notify_ws/notify.py
changeset 0 284c0976e3ff
child 1 f2910a60a29a
equal deleted inserted replaced
-1:000000000000 0:284c0976e3ff
       
     1 #
       
     2 # Copyright (c) 2008-2015 Thierry Florac <tflorac AT ulthar.net>
       
     3 # All Rights Reserved.
       
     4 #
       
     5 # This software is subject to the provisions of the Zope Public License,
       
     6 # Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
       
     7 # THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
       
     8 # WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
       
     9 # WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
       
    10 # FOR A PARTICULAR PURPOSE.
       
    11 #
       
    12 
       
    13 __docformat__ = 'restructuredtext'
       
    14 
       
    15 
       
    16 # import standard library
       
    17 import asyncio
       
    18 import collections
       
    19 import json
       
    20 import pickle
       
    21 
       
    22 # import interfaces
       
    23 
       
    24 # import packages
       
    25 from aiomcache import Client as MemcachedClient
       
    26 from aiopyramid.websocket.config.gunicorn import WebsocketMapper
       
    27 from aiopyramid.websocket.view import WebsocketConnectionView
       
    28 from pyams_notify_ws.subscribe import users
       
    29 from pyramid.view import view_config
       
    30 
       
    31 
       
    32 queue = collections.deque(maxlen=50)
       
    33 queue_key = b'_PyAMS_notify_messages_queue_'
       
    34 queue_lock = asyncio.Lock()
       
    35 
       
    36 memcached_client = None
       
    37 
       
    38 
       
    39 def init_memcached_client(server):
       
    40     """Initialize memcached handler"""
       
    41     global memcached_client
       
    42     ip, port = server.split(':')
       
    43     memcached_client = MemcachedClient(ip, int(port))
       
    44 
       
    45 
       
    46 @view_config(route_name='notify', mapper=WebsocketMapper)
       
    47 class WebsocketNotifyView(WebsocketConnectionView):
       
    48     """Websocket notify view"""
       
    49 
       
    50     @asyncio.coroutine
       
    51     def on_open(self):
       
    52         pass
       
    53 
       
    54     @asyncio.coroutine
       
    55     def on_message(self, message):
       
    56         try:
       
    57             message = json.loads(message)
       
    58         except ValueError:
       
    59             pass
       
    60         else:
       
    61             action = message.get('action')
       
    62             if action == 'notify':
       
    63                 # dispatch notification to subscribers
       
    64                 json_message = json.dumps(message)
       
    65                 target = message.get('target').get('principals', ())
       
    66                 for connection, subscription in users.items():
       
    67                     if subscription.filter_target(target):
       
    68                         yield from connection.send(json_message)
       
    69                 # store message in memcached_queue
       
    70                 if memcached_client is not None:
       
    71                     with (yield from queue_lock):
       
    72                         mem_queue = yield from memcached_client.get(queue_key)
       
    73                         if mem_queue is None:
       
    74                             mem_queue = queue
       
    75                         else:
       
    76                             mem_queue = pickle.loads(mem_queue)
       
    77                         mem_queue.append(message)
       
    78                         yield from memcached_client.set(queue_key, pickle.dumps(mem_queue))
       
    79 
       
    80     @asyncio.coroutine
       
    81     def on_close(self):
       
    82         pass