python - Break loop in one Django Channels consumer from another consumer
I'm building a web app with Django and Django Channels, which uses WebSockets.
When the user clicks a button in the browser, the WebSocket sends data to the server, and a consumer on the server starts sending messages to the client once per second (in a loop).
I want to create a button that stops this data-sending process. When the user clicks the new button, the WebSocket sends data to the server, and a consumer on the server must somehow stop the loop in the previous consumer. I also need to stop the loop when the client disconnects.
I was tempted to use a global variable. But the Django Channels documentation says it does not recommend global variables, because they want to keep the app network transparent (I don't understand this).
I've tried using the channel session. I made the second consumer update a value in the channel session, but the channel session values weren't updated in the first consumer.
Here is the simplified code. Browser:

    var socket = new WebSocket("ws://" + window.location.host + "/socket/");

    $('#button1').on('click', function() {
        socket.send(JSON.stringify({action: 'start_getting_values'}))
    });

    $('#button2').on('click', function() {
        socket.send(JSON.stringify({action: 'stop_getting_values'}))
    });
Consumer on the server:

    import json
    import time

    from channels.sessions import channel_session

    @channel_session
    def ws_message(message):
        text = json.loads(message.content['text'])
        if text['action'] == 'start_getting_values':
            while True:
                # getting data here
                # ...
                message.reply_channel.send({"text": some_data}, immediately=True)
                time.sleep(1)
        if text['action'] == 'stop_getting_values':
            do_something_to_stop_the_loop_above()
Well, I managed to solve the task myself after I contacted the Django Channels developers.
Running a loop inside a consumer is a bad approach: each running loop occupies one worker thread, so the site blocks once the consumer has been started as many times as there are worker threads running consumers.
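A minimal sketch of why this blocks, using only the standard library. This is not Channels code: the thread pool stands in for the worker threads, and the names `WORKER_THREADS`, `looping_consumer` and `quick_consumer` are made up for the illustration.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

WORKER_THREADS = 2  # hypothetical pool size, standing in for the worker threads

stop = threading.Event()      # lets this demo end the otherwise endless loops
started = threading.Event()   # set only once the extra message is handled


def looping_consumer():
    # Stands in for a consumer that keeps looping and never returns.
    while not stop.is_set():
        stop.wait(0.05)


def quick_consumer():
    # Stands in for any other message that needs a free worker thread.
    started.set()


pool = ThreadPoolExecutor(max_workers=WORKER_THREADS)
loops = [pool.submit(looping_consumer) for _ in range(WORKER_THREADS)]
extra = pool.submit(quick_consumer)

# Every worker is stuck in a loop, so the extra message is not picked up.
blocked = not started.wait(timeout=0.5)

stop.set()                # end the loops so a worker becomes free
extra.result(timeout=5)   # only now does the queued message run
pool.shutdown()
```

With two workers and two looping consumers, the third message just sits in the queue until one of the loops ends, which in the original consumer would be never.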
So my approach was the following: once the consumer gets the 'start_getting_values' signal, it adds the current reply channel to a group and increments a value on the Redis server it is connected to (I use Redis as the channel layer backend, but this will work with any other backend too).
What value does it increment? In Redis I have a key 'groups' of the hash type (it is called 'redis_some_key' in the code below). Each field of this hash represents a group in Channels, and its value is the number of reply channels in that group.
Then I created a new Python file that connects to the same Redis server. In this file I run an infinite loop that loads the dict stored under the 'groups' key from Redis. I loop through each key in this dict (each key is a Channels group name) and broadcast data to every group that has a non-zero value. When I run this file, it runs as a separate process and doesn't block anything on the consumer side.
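One pass of that loop can be sketched on its own, without Redis or Channels. In this sketch the function name `broadcast_once` and its `get_data` and `send_group` parameters are hypothetical; in the real script they would be backed by the Redis hash and the channel layer.

```python
def broadcast_once(counts, get_data, send_group):
    """Send fresh data to every group that still has listeners."""
    sent_to = []
    for group_name, listeners in counts.items():
        # Hash values come back from Redis as bytes or strings, not ints.
        if int(listeners) > 0:
            send_group(group_name, {"text": get_data(group_name)})
            sent_to.append(group_name)
    return sent_to


# Stand-ins for illustration: a dict instead of the Redis hash,
# and a list that records what would have been broadcast.
outbox = []
counts = {"value_42": "2", "value_7": "0"}
sent = broadcast_once(
    counts,
    get_data=lambda name: "data for " + name,
    send_group=lambda group, msg: outbox.append((group, msg)),
)
```

Only `value_42` receives a message here, because `value_7` has no listeners left.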
To stop broadcasting to a user, when I get the appropriate signal from him, I remove him from the group and decrement the appropriate Redis value.
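The counter bookkeeping can be sketched in isolation. A plain dict stands in for the Redis hash here, and `join_group` and `leave_group` are hypothetical helper names; with redis-py the same steps would be HINCRBY and HDEL calls. Presumably the same leave step could also be run from a disconnect handler, which would cover the case where the client goes away without pressing the stop button.

```python
# A plain dict stands in for the Redis hash (assumption for illustration).
group_counts = {}


def join_group(counts, group_name):
    """Count one more reply channel in the group (like HINCRBY with 1)."""
    counts[group_name] = counts.get(group_name, 0) + 1
    return counts[group_name]


def leave_group(counts, group_name):
    """Count one fewer reply channel; drop the field when nobody is left."""
    remaining = counts.get(group_name, 0) - 1
    if remaining <= 0:
        counts.pop(group_name, None)  # like HDEL
        return 0
    counts[group_name] = remaining
    return remaining


join_group(group_counts, "value_42")   # first listener starts
join_group(group_counts, "value_42")   # second listener starts
join_group(group_counts, "value_7")    # a listener on another group
left = leave_group(group_counts, "value_7")  # that listener stops again
```

After these calls only `value_42` is left in the dict, so a broadcaster reading it would skip `value_7` entirely.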
Consumer code:

    import json

    import redis
    from channels import Group
    from channels.auth import channel_session_user

    redis_client = redis.StrictRedis(host='localhost', port=6379, db=0)

    @channel_session_user
    def ws_message(message):
        text = json.loads(message.content['text'])
        if text['stream'] == 'start_getting_values':
            value_id = text['value_id']
            # One more reply channel is now listening to this group.
            redis_client.hincrby('redis_some_key', value_id, 1)
            Group(value_id).add(message.reply_channel)
            message.channel_session['value_id'] = value_id
            return 0
        if text['stream'] == 'stop_getting_values':
            # .get() avoids a KeyError if "stop" arrives before any "start".
            if message.channel_session.get('value_id', '') != '':
                value_id = message.channel_session['value_id']
                Group(value_id).discard(message.reply_channel)
                # Hold the lock so the decrement and the delete are not interleaved.
                with redis_client.lock(name='del_lock'):
                    val = redis_client.hincrby('redis_some_key', value_id, -1)
                    if val <= 0:
                        redis_client.hdel('redis_some_key', value_id)
            return 0
Separate Python file:

    import time
    from threading import Thread

    import asgi_redis
    import redis

    redis_client = redis.StrictRedis(host='localhost', port=6379, db=0)
    cl = asgi_redis.RedisChannelLayer()

    def some_action(value_id):
        # getting data based on value_id
        # ....
        cl.send_group(value_id, {
            "text": some_data,
        })

    while True:
        value_ids = redis_client.hgetall('redis_some_key')
        ths = []
        # One thread per group that currently has listeners.
        for b_value_id in value_ids.keys():
            value_id = b_value_id.decode("utf-8")
            ths.append(Thread(target=some_action, args=(value_id,)))
        for th in ths:
            th.start()
        for th in ths:
            th.join()
        time.sleep(1)