18 import collections |
18 import collections |
19 import json |
19 import json |
20 import pickle |
20 import pickle |
21 |
21 |
22 # import interfaces |
22 # import interfaces |
|
23 from pyams_cache.interfaces import IAioCacheHandler |
23 |
24 |
24 # import packages |
25 # import packages |
25 from aiomcache import Client as MemcachedClient |
|
26 from aiopyramid.websocket.config.gunicorn import WebsocketMapper |
26 from aiopyramid.websocket.config.gunicorn import WebsocketMapper |
27 from aiopyramid.websocket.view import WebsocketConnectionView |
27 from aiopyramid.websocket.view import WebsocketConnectionView |
|
28 from pyams_cache.cache import get_cache_handler |
28 from pyams_notify_ws.subscribe import users |
29 from pyams_notify_ws.subscribe import users |
29 from pyramid.view import view_config |
30 from pyramid.view import view_config |
30 |
31 |
31 |
32 |
32 queue = collections.deque(maxlen=50) |
33 queue = collections.deque(maxlen=50) |
33 queue_key = b'_PyAMS_notify_messages_queue_' |
34 queue_key = b'PyAMS:notify:messages_queue' |
34 queue_lock = asyncio.Lock() |
35 queue_lock = asyncio.Lock() |
35 |
36 |
36 memcached_client = None |
37 cache_handler = None |
37 |
38 |
38 |
39 |
39 def init_memcached_client(server): |
40 def init_cache_client(server): |
40 """Initialize memcached handler""" |
41 """Initialize cache handler""" |
41 global memcached_client |
42 global cache_handler |
42 ip, port = server.split(':') |
43 cache_handler = get_cache_handler(server, IAioCacheHandler) |
43 memcached_client = MemcachedClient(ip, int(port)) |
|
44 |
44 |
45 |
45 |
46 @view_config(route_name='notify', mapper=WebsocketMapper) |
46 @view_config(route_name='notify', mapper=WebsocketMapper) |
47 class WebsocketNotifyView(WebsocketConnectionView): |
47 class WebsocketNotifyView(WebsocketConnectionView): |
48 """Websocket notify view""" |
48 """Websocket notify view""" |
65 target = message.get('target').get('principals', ()) |
65 target = message.get('target').get('principals', ()) |
66 for connection, subscription in users.items(): |
66 for connection, subscription in users.items(): |
67 if subscription.filter_target(target): |
67 if subscription.filter_target(target): |
68 yield from connection.send(json_message) |
68 yield from connection.send(json_message) |
69 # store message in memcached_queue |
69 # store message in memcached_queue |
70 if memcached_client is not None: |
70 if cache_handler is not None: |
71 with (yield from queue_lock): |
71 with (yield from queue_lock): |
72 mem_queue = yield from memcached_client.get(queue_key) |
72 mem_queue = yield from cache_handler.get(queue_key) |
73 if mem_queue is None: |
73 if mem_queue is None: |
74 mem_queue = queue |
74 mem_queue = queue |
75 else: |
75 else: |
76 mem_queue = pickle.loads(mem_queue) |
76 mem_queue = pickle.loads(mem_queue) |
77 mem_queue.append(message) |
77 mem_queue.append(message) |
78 yield from memcached_client.set(queue_key, pickle.dumps(mem_queue)) |
78 yield from cache_handler.set(queue_key, pickle.dumps(mem_queue)) |
79 |
79 |
80 @asyncio.coroutine |
80 @asyncio.coroutine |
81 def on_close(self): |
81 def on_close(self): |
82 pass |
82 pass |