src/pyams_notify_ws/notify.py
changeset 1 f2910a60a29a
parent 0 284c0976e3ff
child 8 a285b7e8217c
equal deleted inserted replaced
0:284c0976e3ff 1:f2910a60a29a
    18 import collections
    18 import collections
    19 import json
    19 import json
    20 import pickle
    20 import pickle
    21 
    21 
    22 # import interfaces
    22 # import interfaces
       
    23 from pyams_cache.interfaces import IAioCacheHandler
    23 
    24 
    24 # import packages
    25 # import packages
    25 from aiomcache import Client as MemcachedClient
       
    26 from aiopyramid.websocket.config.gunicorn import WebsocketMapper
    26 from aiopyramid.websocket.config.gunicorn import WebsocketMapper
    27 from aiopyramid.websocket.view import WebsocketConnectionView
    27 from aiopyramid.websocket.view import WebsocketConnectionView
       
    28 from pyams_cache.cache import get_cache_handler
    28 from pyams_notify_ws.subscribe import users
    29 from pyams_notify_ws.subscribe import users
    29 from pyramid.view import view_config
    30 from pyramid.view import view_config
    30 
    31 
    31 
    32 
    32 queue = collections.deque(maxlen=50)
    33 queue = collections.deque(maxlen=50)
    33 queue_key = b'_PyAMS_notify_messages_queue_'
    34 queue_key = b'PyAMS:notify:messages_queue'
    34 queue_lock = asyncio.Lock()
    35 queue_lock = asyncio.Lock()
    35 
    36 
    36 memcached_client = None
    37 cache_handler = None
    37 
    38 
    38 
    39 
    39 def init_memcached_client(server):
    40 def init_cache_client(server):
    40     """Initialize memcached handler"""
    41     """Initialize cache handler"""
    41     global memcached_client
    42     global cache_handler
    42     ip, port = server.split(':')
    43     cache_handler = get_cache_handler(server, IAioCacheHandler)
    43     memcached_client = MemcachedClient(ip, int(port))
       
    44 
    44 
    45 
    45 
    46 @view_config(route_name='notify', mapper=WebsocketMapper)
    46 @view_config(route_name='notify', mapper=WebsocketMapper)
    47 class WebsocketNotifyView(WebsocketConnectionView):
    47 class WebsocketNotifyView(WebsocketConnectionView):
    48     """Websocket notify view"""
    48     """Websocket notify view"""
    65                 target = message.get('target').get('principals', ())
    65                 target = message.get('target').get('principals', ())
    66                 for connection, subscription in users.items():
    66                 for connection, subscription in users.items():
    67                     if subscription.filter_target(target):
    67                     if subscription.filter_target(target):
    68                         yield from connection.send(json_message)
    68                         yield from connection.send(json_message)
    69                 # store message in memcached_queue
    69                 # store message in memcached_queue
    70                 if memcached_client is not None:
    70                 if cache_handler is not None:
    71                     with (yield from queue_lock):
    71                     with (yield from queue_lock):
    72                         mem_queue = yield from memcached_client.get(queue_key)
    72                         mem_queue = yield from cache_handler.get(queue_key)
    73                         if mem_queue is None:
    73                         if mem_queue is None:
    74                             mem_queue = queue
    74                             mem_queue = queue
    75                         else:
    75                         else:
    76                             mem_queue = pickle.loads(mem_queue)
    76                             mem_queue = pickle.loads(mem_queue)
    77                         mem_queue.append(message)
    77                         mem_queue.append(message)
    78                         yield from memcached_client.set(queue_key, pickle.dumps(mem_queue))
    78                         yield from cache_handler.set(queue_key, pickle.dumps(mem_queue))
    79 
    79 
    80     @asyncio.coroutine
    80     @asyncio.coroutine
    81     def on_close(self):
    81     def on_close(self):
    82         pass
    82         pass