Use Python 3.5 async/await coroutines syntax
authorThierry Florac <tflorac@ulthar.net>
Sun, 14 Jan 2018 14:43:17 +0100
changeset 15 6472be90626a
parent 14 dbdf7019502c
child 16 bea8c4b209aa
Use Python 3.5 async/await coroutines syntax
src/pyams_notify_ws/notify.py
src/pyams_notify_ws/subscribe.py
--- a/src/pyams_notify_ws/notify.py	Fri Dec 08 12:11:21 2017 +0100
+++ b/src/pyams_notify_ws/notify.py	Sun Jan 14 14:43:17 2018 +0100
@@ -26,7 +26,7 @@
 from aiopyramid.websocket.config.gunicorn import WebsocketMapper
 from aiopyramid.websocket.view import WebsocketConnectionView
 from pyams_cache.cache import get_cache_handler
-from pyams_notify_ws.subscribe import users
+from pyams_notify_ws.subscribe import users, users_lock
 from pyramid.view import view_config
 
 
@@ -47,12 +47,10 @@
 class WebsocketNotifyView(WebsocketConnectionView):
     """Websocket notify view"""
 
-    @asyncio.coroutine
-    def on_open(self):
+    async def on_open(self):
         pass
 
-    @asyncio.coroutine
-    def on_message(self, message):
+    async def on_message(self, message):
         try:
             message = json.loads(message)
         except ValueError:
@@ -66,23 +64,23 @@
                 clean_message = message.copy()
                 clean_message.pop('target')
                 json_message = json.dumps(clean_message)
-                for connection, subscription in users.items():
-                    # don't send notification to emitter
-                    if source and (source == subscription.principal['id']):
-                        continue
-                    if subscription.filter_target(target):
-                        yield from connection.send(json_message)
+                async with users_lock:
+                    for connection, subscription in users.items():
+                        # don't send notification to emitter
+                        if source and (source == subscription.principal['id']):
+                            continue
+                        if subscription.filter_target(target):
+                            await connection.send(json_message)
                 # store message in memcached_queue
                 if cache_handler is not None:
-                    with (yield from queue_lock):
-                        mem_queue = yield from cache_handler.get(queue_key)
+                    async with queue_lock:
+                        mem_queue = await cache_handler.get(queue_key)
                         if mem_queue is None:
                             mem_queue = queue
                         else:
                             mem_queue = pickle.loads(mem_queue)
                         mem_queue.append(message)
-                        yield from cache_handler.set(queue_key, pickle.dumps(mem_queue))
+                        await cache_handler.set(queue_key, pickle.dumps(mem_queue))
 
-    @asyncio.coroutine
-    def on_close(self):
+    async def on_close(self):
         pass
--- a/src/pyams_notify_ws/subscribe.py	Fri Dec 08 12:11:21 2017 +0100
+++ b/src/pyams_notify_ws/subscribe.py	Sun Jan 14 14:43:17 2018 +0100
@@ -58,12 +58,10 @@
 class WebsocketSubscribeView(WebsocketConnectionView):
     """Websocket subscribe view"""
 
-    @asyncio.coroutine
-    def on_open(self):
+    async def on_open(self):
         pass
 
-    @asyncio.coroutine
-    def on_message(self, message):
+    async def on_message(self, message):
         try:
             message = json.loads(message)
         except ValueError:
@@ -72,18 +70,17 @@
             action = message.get('action')
             if action == 'subscribe':
                 if not message.get('principal'):
-                    yield from self.ws.send(json.dumps({'status': 'error',
-                                                        'message': "Missing 'principal' argument"}))
+                    await self.ws.send(json.dumps({'status': 'error',
+                                                   'message': "Missing 'principal' argument"}))
                     return
-                with (yield from users_lock):
+                async with users_lock:
                     subscription = users.get(self.ws)
                     if subscription is None:
                         subscription = WebsocketSubscription(message.get('principal'))
                     subscription.update_contexts(message.get('context'))
                     users[self.ws] = subscription
 
-    @asyncio.coroutine
-    def on_close(self):
+    async def on_close(self):
         if self.ws in users:
-            with (yield from users_lock):
+            async with users_lock:
                 del users[self.ws]