src/pyams_notify_ws/notify.py
changeset 15 6472be90626a
parent 12 167cc4ab3313
child 20 03c3572a16ad
equal deleted inserted replaced
14:dbdf7019502c 15:6472be90626a
    24 
    24 
    25 # import packages
    25 # import packages
    26 from aiopyramid.websocket.config.gunicorn import WebsocketMapper
    26 from aiopyramid.websocket.config.gunicorn import WebsocketMapper
    27 from aiopyramid.websocket.view import WebsocketConnectionView
    27 from aiopyramid.websocket.view import WebsocketConnectionView
    28 from pyams_cache.cache import get_cache_handler
    28 from pyams_cache.cache import get_cache_handler
    29 from pyams_notify_ws.subscribe import users
    29 from pyams_notify_ws.subscribe import users, users_lock
    30 from pyramid.view import view_config
    30 from pyramid.view import view_config
    31 
    31 
    32 
    32 
    33 queue = collections.deque(maxlen=50)
    33 queue = collections.deque(maxlen=50)
    34 queue_key = b'PyAMS:notify:messages_queue'
    34 queue_key = b'PyAMS:notify:messages_queue'
    45 
    45 
    46 @view_config(route_name='notify', mapper=WebsocketMapper)
    46 @view_config(route_name='notify', mapper=WebsocketMapper)
    47 class WebsocketNotifyView(WebsocketConnectionView):
    47 class WebsocketNotifyView(WebsocketConnectionView):
    48     """Websocket notify view"""
    48     """Websocket notify view"""
    49 
    49 
    50     @asyncio.coroutine
    50     async def on_open(self):
    51     def on_open(self):
       
    52         pass
    51         pass
    53 
    52 
    54     @asyncio.coroutine
    53     async def on_message(self, message):
    55     def on_message(self, message):
       
    56         try:
    54         try:
    57             message = json.loads(message)
    55             message = json.loads(message)
    58         except ValueError:
    56         except ValueError:
    59             pass
    57             pass
    60         else:
    58         else:
    64                 source = message.get('source', {}).get('id')
    62                 source = message.get('source', {}).get('id')
    65                 target = message.get('target', {}).get('principals', ())
    63                 target = message.get('target', {}).get('principals', ())
    66                 clean_message = message.copy()
    64                 clean_message = message.copy()
    67                 clean_message.pop('target')
    65                 clean_message.pop('target')
    68                 json_message = json.dumps(clean_message)
    66                 json_message = json.dumps(clean_message)
    69                 for connection, subscription in users.items():
    67                 async with users_lock:
    70                     # don't send notification to emitter
    68                     for connection, subscription in users.items():
    71                     if source and (source == subscription.principal['id']):
    69                         # don't send notification to emitter
    72                         continue
    70                         if source and (source == subscription.principal['id']):
    73                     if subscription.filter_target(target):
    71                             continue
    74                         yield from connection.send(json_message)
    72                         if subscription.filter_target(target):
       
    73                             await connection.send(json_message)
    75                 # store message in memcached_queue
    74                 # store message in memcached_queue
    76                 if cache_handler is not None:
    75                 if cache_handler is not None:
    77                     with (yield from queue_lock):
    76                     async with queue_lock:
    78                         mem_queue = yield from cache_handler.get(queue_key)
    77                         mem_queue = await cache_handler.get(queue_key)
    79                         if mem_queue is None:
    78                         if mem_queue is None:
    80                             mem_queue = queue
    79                             mem_queue = queue
    81                         else:
    80                         else:
    82                             mem_queue = pickle.loads(mem_queue)
    81                             mem_queue = pickle.loads(mem_queue)
    83                         mem_queue.append(message)
    82                         mem_queue.append(message)
    84                         yield from cache_handler.set(queue_key, pickle.dumps(mem_queue))
    83                         await cache_handler.set(queue_key, pickle.dumps(mem_queue))
    85 
    84 
    86     @asyncio.coroutine
    85     async def on_close(self):
    87     def on_close(self):
       
    88         pass
    86         pass