折腾:
期间,尝试去为了实现全局变量的ws,而要去尝试
在Flask中,用线程或进程间通信,去实现:
某个线程中,当有信息需要发送,则给其他线程发送通知
告知要给哪个用户,发送什么消息。
flask interprocess communication
Raspberry Pi • View topic – General inter-process communication in python
How To Work with the ZeroMQ Messaging Library | DigitalOcean
[Flask] Interchange data with main python application
-》
zeromq或着类似的
或:
RQ: Simple job queues for Python
flask multi process communication
multiple processes – Communicate with long-running Python program – Stack Overflow
Python (Flask) and MQTT listening – Stack Overflow
python – Handling multiple requests in Flask – Stack Overflow
python – Multiprocess within flask app spinning up 2 processes – Stack Overflow
python – Can I serve multiple clients using just Flask app.run() as standalone? – Stack Overflow
-》
Deployment Options — Flask Documentation (0.11)
Multi-processing techniques in Python – Doug Hellmann
Flask 进程间通信
python – flask 进程间通信 – SegmentFault
Python通过Manager方式实现多个无关联进程共享数据 – Xjng – 博客园
gunicorn+flask, 不同的 worker 之间怎么共享一个对象 – V2EX
16.6. multiprocessing — Process-based “threading” interface — Python 2.7.12 documentation
Flask IPC
python – use multiprocessing as local IPC – Stack Overflow
flask redis 通信
flask redis 事件
flask redis 订阅 发布
flask pub sub
import redis class Task(object): def __init__(self): self.rcon = redis.StrictRedis(host=’localhost’, db=5) self.ps = self.rcon.pubsub() self.ps.subscribe(‘task:pubsub:channel’) def listen_task(self): for i in self.ps.listen(): if i[‘type’] == ‘message’: print “Task get”, i[‘data’] if __name__ == ‘__main__’: print ‘listen task channel’ Task().listen_task() |
How to implement server push in Flask framework? – Peter Hoffmann
Python redis 发布和订阅 – 奋斗吧 – 51CTO技术博客
Python操作redis的订阅发布功能 – wsyht90的博客 – 51CTO技术博客
python的redis,实用订阅发布简单实用代码 – 我叫河蟹 – 博客频道 – CSDN.NET
Flask-RQ pubsub
Flask RQ pubsub
Flask redis pubsub
python – How to implement server push in Flask framework? – Stack Overflow
chat/app.py at master · jkbrzt/chat
redis pubsub
Redis Pub/Sub… How Does it Work?
Going Real-Time with Redis Pub/Sub | Toptal
然后去实现了基本的redis的pubsub
但是最终却死在了:
ws去send之后,导致SSH链接的SecureCRT中,都停止了
后来看到:
Going Real-Time with Redis Pub/Sub | Toptal
难道是:
此处redis是最好是去用不同的port
去通知publish对应的事情?
然后redis(的整个范围内,部分db的,都可以收到,)就不会堵塞了?
redis pubsub listen blocking
python – Is non-blocking Redis pubsub possible? – Stack Overflow
andymccurdy/redis-py: Redis Python Client
说是已经支持异步的pubsub了?
用的是:
p.get_message() |
但是,即使解决了redis的pubsub的异步,此处也还是不行:
因为此处的代码是:
@sockets.route(‘/users/<userId>/<accesstoken>’) def userWebsocket(ws, userId, accesstoken): while not ws.closed: gLog.info(“call ws %s to receive”, ws) message = ws.receive() eachPublishedMsg = curPubSub.listen() |
在同一个ws的处理代码中,用了while去处理:
ws的receive
(redis的)pubsub的listen
而
ws的receivce也是blocking的。。。
所以再去看看:
ws的receive
能否变成非blocking的
flask sockets non blocking
Question: how to make ws socket receive non-blocking ? · Issue #30 · kennethreitz/flask-sockets
同样问题,但是作者没有解释。。。
flask-sockets receive blocking
flask-sockets ws.receive() blocking
Flask-uWSGI-WebSocket 0.5.2 : Python Package Index
“WebSocket client abstraction with fully non-blocking methods.
receive()
send(msg)
close()
connected”
说是异步的
-》抽空真的可以去试试这个?
python – why is gevent-websocket synchronous? – Stack Overflow
-》
好像是可以通过:
gevent去spawn心得进程(协程?)去处理事情?
【已解决】尝试使用gevent的spawn去新创建协程去异步执行
【总结】
此处,进程间通信,是通过redis的pubsub机制。
具体做法是:
完整代码:
from redis import Redis from rq import Queue, Worker from rq.connections import push_connection redisConnection = Redis.from_url(u’redis://localhost:6379/0′) queue = Queue(connection=redisConnection) pushConnResult = push_connection(redisConnection) gLog.debug(“redisConnection=%s, queue=%s, pushConnResult=%s”, redisConnection, queue, pushConnResult) @sockets.route(‘/users/<userId>/<accesstoken>’) def userWebsocket(ws, userId, accesstoken): wsRoutine = gevent.spawn(listenWsAndSendMsg, ws, userId) gLog.debug(“wsRoutine=%s”, wsRoutine) def listenWsAndSendMsg(ws, userId): gLog.debug(“use redis to listen ws=%s and send message for userId=%s”, ws, userId) curPubSub = redisConnection.pubsub() gLog.debug(“curPubSub=%s”, curPubSub) # curPubSub=<redis.client.PubSub object at 0x7fd17176d950> curSubKey = genWsKey(userId) gLog.debug(“curSubKey=%s”, curSubKey) # curSubKey=develop|staging|user-bb22f24e-3c27-4e7b-867a-b855e139b295 subResp = curPubSub.subscribe(curSubKey) gLog.debug(“subResp=%s”, subResp) # subResp=None while not ws.closed: gLog.info(“start redis pubsub listen for ws=%s of userId=%s”, ws, userId) listenedMessages = curPubSub.listen() gLog.debug(“listenedMessages=%s”, listenedMessages) # listenedMessages=<generator object listen at 0x7fd170ca65f0> for eachPublishedMsg in listenedMessages: gLog.debug(“type(eachPublishedMsg)=%s, eachPublishedMsg=%s”, type(eachPublishedMsg), eachPublishedMsg) # type(eachPublishedMsg)=<type ‘dict’>, eachPublishedMsg={‘pattern’: None, ‘type’: ‘subscribe’, ‘channel’: ‘develop|staging|user-bb22f24e-3c27-4e7b-867a-b855e139b295’, ‘data’: 1L} messageType = eachPublishedMsg[“type”] gLog.debug(“messageType=%s”, messageType) if messageType == “message”: messageDataStr = eachPublishedMsg[“data”] gLog.debug(“type(messageDataStr)=%s, messageDataStr=%s”, type(messageDataStr), messageDataStr) # messageDataStr = messageDataStr.replace(“\\n”, “\n”) # gLog.debug(“messageDataStr=%s”, messageDataStr) # eachPublishedMsg={‘pattern’: None, ‘type’: ‘subscribe’, ‘channel’: ‘develop|staging|user-bb22f24e-3c27-4e7b-867a-b855e139b295’, ‘data’: 1L} # { # “channel”: “develop|staging|user-cc680b0a-8d04-4f2b-8ad9-c6fefb527861”, # “data”: 1, # “pattern”: null, # “type”: “subscribe” # } # eachPublishedMsg= # { # “channel”: “develop|staging|user-cc680b0a-8d04-4f2b-8ad9-c6fefb527861”, # “data”: “{\n \”data\”: {\n \”errandorId\”: \”\”, \n \”initiatorId\”: \”user-bb22f24e-3c27-4e7b-867a-b855e139b295\”\n }, \n \”event\”: \”Created\”, \n \”id\”: \”task-16c8b56e-37ec-4af8-9fd1-4313874980ad\”, \n \”type\”: \”Task\”\n}”, # “pattern”: null, # “type”: “message” # } gLog.info(“send to user %s message %s”, userId, messageDataStr) wsSendMessage(ws, respMsgDictOrStr=messageDataStr) else: gLog.info(“not send user %s non message type message %s”, userId, eachPublishedMsg) gLog.info(“ws=%s closed for userId=%s, do unsubscribe”, ws, userId) unsubResp = curPubSub.unsubscribe(curSubKey) gLog.debug(“unsubResp=%s”, unsubResp) def wsSendUserMessage(userId, respInfoDict): gLog.debug(“userId=%s, respInfoDict=%s”, userId, respInfoDict) respMessageStr = “” if isinstance(respInfoDict, (str, unicode)): gLog.debug(“supported response string to websocket”) respMessageStr = respInfoDict elif isinstance(respInfoDict, dict): gLog.debug(“auto convert json dict into string”) respMessageStr = jsonToStr(respInfoDict) gLog.debug(“respMessageStr=%s”, respMessageStr) else: gLog.error(“not support send type %s to ws”, type(respMessageStr)) return gLog.debug(“respMessageStr=%s”, respMessageStr) gLog.debug(“redisConnection=%s”, redisConnection) # redisConnection=Redis<ConnectionPool<Connection<host=localhost,port=6379,db=0>>> curUserPubSub = redisConnection.pubsub() gLog.debug(“curUserPubSub=%s”, curUserPubSub) # curUserPubSub=<redis.client.PubSub object at 0x7fd170c25f90> curUserPubKey = genWsKey(userId) gLog.debug(“curUserPubKey=%s”, curUserPubKey) # curUserPubKey=develop|staging|user-c9353e25-2e28-49c0-ae50-bcc414f1600c gLog.info(“use redis to publish: to=%s, message=%s”, curUserPubKey, respMessageStr) redisConnection.publish(curUserPubKey, respMessageStr) |
其中的核心代码的执行逻辑是:
在用户访问websocket的接口:
/users/<userId>/<accesstoken>
后,执行了:
def userWebsocket(ws, userId, accesstoken):
其中用:
wsRoutine = gevent.spawn(listenWsAndSendMsg, ws, userId)
去spawn孵化,新建了一个协程,
而对应的函数listenWsAndSendMsg中,
先是
curPubSub=redisConnection.pubsub()
去通过redis的实例连接得到了pubsub对象
然后用
curPubSub.subscribe(curSubKey)
去订阅对应的频道channel
之后就是用:
while not ws.closed:
listenedMessages = curPubSub.listen()
去循环调用,当ws没有断开时,就去listen监听pubsub的chanel
注意此处的listen是locking阻塞的,一直运行不停止,直到listen到了数据为止。
而监听到的数据,是从:
wsSendUserMessage
中去:
redisConnection.publish(curUserPubKey, respMessageStr)
发布对应的消息respMessageStr到对应的channel频道curUserPubKey
而wsSendUserMessage的调用,是别的地方,比如新建任务后,想要通知其他用户任务可以去抢单任务了,的时候,会去调用这个wsSendUserMessage,以便于publish对应消息到对应的频道
转载请注明:在路上 » 【已解决】Flask中的线程或进程间通信