src/pyams_notify_ws/notify.py
changeset 20 03c3572a16ad
parent 15 6472be90626a
child 25 bdb57ba75245
equal deleted inserted replaced
19:803130da71e5 20:03c3572a16ad
    10 # FOR A PARTICULAR PURPOSE.
    10 # FOR A PARTICULAR PURPOSE.
    11 #
    11 #
    12 
    12 
    13 __docformat__ = 'restructuredtext'
    13 __docformat__ = 'restructuredtext'
    14 
    14 
    15 
       
    16 # import standard library
       
    17 import asyncio
    15 import asyncio
    18 import collections
    16 import collections
    19 import json
    17 import json
    20 import pickle
    18 import pickle
    21 
    19 
    22 # import interfaces
       
    23 from pyams_cache.interfaces import IAioCacheHandler
       
    24 
       
    25 # import packages
       
    26 from aiopyramid.websocket.config.gunicorn import WebsocketMapper
    20 from aiopyramid.websocket.config.gunicorn import WebsocketMapper
    27 from aiopyramid.websocket.view import WebsocketConnectionView
    21 from aiopyramid.websocket.view import WebsocketConnectionView
    28 from pyams_cache.cache import get_cache_handler
       
    29 from pyams_notify_ws.subscribe import users, users_lock
       
    30 from pyramid.view import view_config
    22 from pyramid.view import view_config
    31 
    23 
       
    24 from pyams_cache.cache import get_cache_handler
       
    25 from pyams_cache.interfaces import IAioCacheHandler
       
    26 from pyams_notify_ws.subscribe import users, users_lock
    32 
    27 
    33 queue = collections.deque(maxlen=50)
    28 
       
    29 queue = collections.deque(maxlen=100)
    34 queue_key = b'PyAMS:notify:messages_queue'
    30 queue_key = b'PyAMS:notify:messages_queue'
    35 queue_lock = asyncio.Lock()
    31 queue_lock = asyncio.Lock()
    36 
    32 
    37 cache_handler = None
    33 cache_handler = None
    38 
    34 
    69                         # don't send notification to emitter
    65                         # don't send notification to emitter
    70                         if source and (source == subscription.principal['id']):
    66                         if source and (source == subscription.principal['id']):
    71                             continue
    67                             continue
    72                         if subscription.filter_target(target):
    68                         if subscription.filter_target(target):
    73                             await connection.send(json_message)
    69                             await connection.send(json_message)
    74                 # store message in memcached_queue
    70                 # store message in cached queue
    75                 if cache_handler is not None:
    71                 if cache_handler is not None:
    76                     async with queue_lock:
    72                     async with queue_lock:
    77                         mem_queue = await cache_handler.get(queue_key)
    73                         mem_queue = await cache_handler.get(queue_key)
    78                         if mem_queue is None:
    74                         if mem_queue is None:
    79                             mem_queue = queue
    75                             mem_queue = queue