--- a/src/pyams_notify_ws/notify.py Fri Dec 08 12:11:21 2017 +0100
+++ b/src/pyams_notify_ws/notify.py Sun Jan 14 14:43:17 2018 +0100
@@ -26,7 +26,7 @@
from aiopyramid.websocket.config.gunicorn import WebsocketMapper
from aiopyramid.websocket.view import WebsocketConnectionView
from pyams_cache.cache import get_cache_handler
-from pyams_notify_ws.subscribe import users
+from pyams_notify_ws.subscribe import users, users_lock
from pyramid.view import view_config
@@ -47,12 +47,10 @@
class WebsocketNotifyView(WebsocketConnectionView):
"""Websocket notify view"""
- @asyncio.coroutine
- def on_open(self):
+ async def on_open(self):
pass
- @asyncio.coroutine
- def on_message(self, message):
+ async def on_message(self, message):
try:
message = json.loads(message)
except ValueError:
@@ -66,23 +64,23 @@
clean_message = message.copy()
clean_message.pop('target')
json_message = json.dumps(clean_message)
- for connection, subscription in users.items():
- # don't send notification to emitter
- if source and (source == subscription.principal['id']):
- continue
- if subscription.filter_target(target):
- yield from connection.send(json_message)
+ async with users_lock:
+ for connection, subscription in users.items():
+ # don't send notification to emitter
+ if source and (source == subscription.principal['id']):
+ continue
+ if subscription.filter_target(target):
+ await connection.send(json_message)
# store message in memcached_queue
if cache_handler is not None:
- with (yield from queue_lock):
- mem_queue = yield from cache_handler.get(queue_key)
+ async with queue_lock:
+ mem_queue = await cache_handler.get(queue_key)
if mem_queue is None:
mem_queue = queue
else:
mem_queue = pickle.loads(mem_queue)
mem_queue.append(message)
- yield from cache_handler.set(queue_key, pickle.dumps(mem_queue))
+ await cache_handler.set(queue_key, pickle.dumps(mem_queue))
- @asyncio.coroutine
- def on_close(self):
+ async def on_close(self):
pass
--- a/src/pyams_notify_ws/subscribe.py Fri Dec 08 12:11:21 2017 +0100
+++ b/src/pyams_notify_ws/subscribe.py Sun Jan 14 14:43:17 2018 +0100
@@ -58,12 +58,10 @@
class WebsocketSubscribeView(WebsocketConnectionView):
"""Websocket subscribe view"""
- @asyncio.coroutine
- def on_open(self):
+ async def on_open(self):
pass
- @asyncio.coroutine
- def on_message(self, message):
+ async def on_message(self, message):
try:
message = json.loads(message)
except ValueError:
@@ -72,18 +70,17 @@
action = message.get('action')
if action == 'subscribe':
if not message.get('principal'):
- yield from self.ws.send(json.dumps({'status': 'error',
- 'message': "Missing 'principal' argument"}))
+ await self.ws.send(json.dumps({'status': 'error',
+ 'message': "Missing 'principal' argument"}))
return
- with (yield from users_lock):
+ async with users_lock:
subscription = users.get(self.ws)
if subscription is None:
subscription = WebsocketSubscription(message.get('principal'))
subscription.update_contexts(message.get('context'))
users[self.ws] = subscription
- @asyncio.coroutine
- def on_close(self):
+ async def on_close(self):
if self.ws in users:
- with (yield from users_lock):
+ async with users_lock:
del users[self.ws]