24 |
24 |
25 # import packages |
25 # import packages |
26 from aiopyramid.websocket.config.gunicorn import WebsocketMapper |
26 from aiopyramid.websocket.config.gunicorn import WebsocketMapper |
27 from aiopyramid.websocket.view import WebsocketConnectionView |
27 from aiopyramid.websocket.view import WebsocketConnectionView |
28 from pyams_cache.cache import get_cache_handler |
28 from pyams_cache.cache import get_cache_handler |
29 from pyams_notify_ws.subscribe import users |
29 from pyams_notify_ws.subscribe import users, users_lock |
30 from pyramid.view import view_config |
30 from pyramid.view import view_config |
31 |
31 |
32 |
32 |
# In-process message queue, used as the starting queue when the cache holds
# nothing yet; the deque drops its oldest entry once 50 messages are stored.
queue = collections.deque(maxlen=50)
# Cache key under which the pickled message queue is stored.
queue_key = b'PyAMS:notify:messages_queue'
45 |
45 |
@view_config(route_name='notify', mapper=WebsocketMapper)
class WebsocketNotifyView(WebsocketConnectionView):
    """Websocket notify view.

    Receives JSON notification messages, forwards each of them to the
    registered connections whose subscription accepts the message target,
    then appends the message to the queue kept in the cache.
    """

    async def on_open(self):
        """Do nothing when the connection is opened."""

    async def on_message(self, message):
        """Dispatch an incoming notification and record it in the cache.

        :param message: raw JSON text received on the websocket; it is
            expected to decode to an object which may carry a ``source``
            mapping (with an ``id``) and a ``target`` mapping (with
            ``principals``).

        Payloads that are not valid JSON, or that do not decode to a JSON
        object, are ignored.
        """
        try:
            message = json.loads(message)
        except ValueError:
            # Not JSON: silently ignore the frame.
            return
        if not isinstance(message, dict):
            # Valid JSON but not an object: nothing to dispatch.
            return

        # NOTE(review): the diff this block was reconstructed from skips three
        # lines at this point (just before the ``source`` lookup); restore
        # them from the full file if they hold more than comments.
        source = message.get('source', {}).get('id')
        target = message.get('target', {}).get('principals', ())

        # The target is routing information only: strip it from the payload
        # forwarded to subscribers. A default is given to ``pop`` because
        # ``target`` is optional (see the lookup above).
        clean_message = message.copy()
        clean_message.pop('target', None)
        json_message = json.dumps(clean_message)

        async with users_lock:
            for connection, subscription in users.items():
                # don't send notification to emitter
                if source and (source == subscription.principal['id']):
                    continue
                if subscription.filter_target(target):
                    await connection.send(json_message)

        # store message in memcached_queue
        if cache_handler is not None:
            async with queue_lock:
                mem_queue = await cache_handler.get(queue_key)
                if mem_queue is None:
                    # Nothing cached yet: start from the module-level queue.
                    mem_queue = queue
                else:
                    # NOTE(review): pickle.loads trusts whatever is stored
                    # under queue_key; the cache must not be writable by
                    # untrusted parties.
                    mem_queue = pickle.loads(mem_queue)
                # The original message (including its target) is stored.
                mem_queue.append(message)
                await cache_handler.set(queue_key, pickle.dumps(mem_queue))

    async def on_close(self):
        """Do nothing when the connection is closed."""