equal
deleted
inserted
replaced
10 # FOR A PARTICULAR PURPOSE. |
10 # FOR A PARTICULAR PURPOSE. |
11 # |
11 # |
12 |
12 |
13 __docformat__ = 'restructuredtext' |
13 __docformat__ = 'restructuredtext' |
14 |
14 |
15 |
|
16 # import standard library |
|
17 import asyncio |
15 import asyncio |
18 import collections |
16 import collections |
19 import json |
17 import json |
20 import pickle |
18 import pickle |
21 |
19 |
22 # import interfaces |
|
23 from pyams_cache.interfaces import IAioCacheHandler |
|
24 |
|
25 # import packages |
|
26 from aiopyramid.websocket.config.gunicorn import WebsocketMapper |
20 from aiopyramid.websocket.config.gunicorn import WebsocketMapper |
27 from aiopyramid.websocket.view import WebsocketConnectionView |
21 from aiopyramid.websocket.view import WebsocketConnectionView |
28 from pyams_cache.cache import get_cache_handler |
|
29 from pyams_notify_ws.subscribe import users, users_lock |
|
30 from pyramid.view import view_config |
22 from pyramid.view import view_config |
31 |
23 |
|
24 from pyams_cache.cache import get_cache_handler |
|
25 from pyams_cache.interfaces import IAioCacheHandler |
|
26 from pyams_notify_ws.subscribe import users, users_lock |
32 |
27 |
33 queue = collections.deque(maxlen=50) |
28 |
|
29 queue = collections.deque(maxlen=100) |
34 queue_key = b'PyAMS:notify:messages_queue' |
30 queue_key = b'PyAMS:notify:messages_queue' |
35 queue_lock = asyncio.Lock() |
31 queue_lock = asyncio.Lock() |
36 |
32 |
37 cache_handler = None |
33 cache_handler = None |
38 |
34 |
69 # don't send notification to emitter |
65 # don't send notification to emitter |
70 if source and (source == subscription.principal['id']): |
66 if source and (source == subscription.principal['id']): |
71 continue |
67 continue |
72 if subscription.filter_target(target): |
68 if subscription.filter_target(target): |
73 await connection.send(json_message) |
69 await connection.send(json_message) |
74 # store message in memcached_queue |
70 # store message in cached queue |
75 if cache_handler is not None: |
71 if cache_handler is not None: |
76 async with queue_lock: |
72 async with queue_lock: |
77 mem_queue = await cache_handler.get(queue_key) |
73 mem_queue = await cache_handler.get(queue_key) |
78 if mem_queue is None: |
74 if mem_queue is None: |
79 mem_queue = queue |
75 mem_queue = queue |