最新消息:20210816 当前crifan.com域名已被污染,为防止失联,请关注(页面右下角的)公众号

【已解决】Flask中的线程或进程间通信

Flask crifan 7207浏览 0评论

折腾:

【无法解决】Flask中用pickle去保存Websocket对象出错:TypeError a class that defines __slots__ without defining __getstate__ cannot be pickled

期间,尝试去为了实现全局变量的ws,而要去尝试

在Flask中,用线程或进程间通信,去实现:

某个线程中,当有信息需要发送,则给其他线程发送通知

告知要给哪个用户,发送什么消息。

flask interprocess communication

With Python Flask how do you handle a web app that requires the same form on many pages but storing the data in different fields in the database? DRY? – Quora

Raspberry Pi • View topic – General inter-process communication in python

How To Work with the ZeroMQ Messaging Library | DigitalOcean

[Flask] Interchange data with main python application

-》

zeromq或着类似的

或:

RQ: Simple job queues for Python

flask multi process  communication

multiple processes – Communicate with long-running Python program – Stack Overflow

Python (Flask) and MQTT listening – Stack Overflow

python – Handling multiple requests in Flask – Stack Overflow

python – Multiprocess within flask app spinning up 2 processes – Stack Overflow

python – Can I serve multiple clients using just Flask app.run() as standalone? – Stack Overflow

-》

Deployment Options — Flask Documentation (0.11)

Multi-processing techniques in Python – Doug Hellmann

Flask 进程间通信

python – flask 进程间通信 – SegmentFault

Python通过Manager方式实现多个无关联进程共享数据 – Xjng – 博客园

gunicorn+flask, 不同的 worker 之间怎么共享一个对象 – V2EX

16.6. multiprocessing — Process-based “threading” interface — Python 2.7.12 documentation

Flask IPC

python – use multiprocessing as local IPC – Stack Overflow

flask redis 通信

flask redis 事件

flask redis 订阅 发布

flask pub sub

Redis实现简单消息队列 – 简书

import redis
class Task(object):
    def __init__(self):
        self.rcon = redis.StrictRedis(host=’localhost’, db=5)
        self.ps = self.rcon.pubsub()
        self.ps.subscribe(‘task:pubsub:channel’)
    def listen_task(self):
        for i in self.ps.listen():
            if i[‘type’] == ‘message’:
                print “Task get”, i[‘data’]
if __name__ == ‘__main__’:
    print ‘listen task channel’
    Task().listen_task()

How to implement server push in Flask framework? – Peter Hoffmann

Python redis 发布和订阅 – 奋斗吧 – 51CTO技术博客

Python操作redis的订阅发布功能 – wsyht90的博客 – 51CTO技术博客

python的redis,实用订阅发布简单实用代码 – 我叫河蟹 – 博客频道 – CSDN.NET

Flask-RQ  pubsub

Flask RQ pubsub

Flask redis pubsub

python – How to implement server push in Flask framework? – Stack Overflow

chat/app.py at master · jkbrzt/chat

redis pubsub

Pub/Sub – Redis

PUBSUB – Redis

Redis Pub/Sub… How Does it Work?

Going Real-Time with Redis Pub/Sub | Toptal

然后去实现了基本的redis的pubsub

但是最终却死在了:

ws去send之后,导致SSH链接的SecureCRT中,都停止了

后来看到:

Going Real-Time with Redis Pub/Sub | Toptal

难道是:

此处redis是最好是去用不同的port

去通知publish对应的事情?

然后redis(的整个范围内,部分db的,都可以收到,)就不会堵塞了?

redis pubsub listen blocking

python – Is non-blocking Redis pubsub possible? – Stack Overflow

andymccurdy/redis-py: Redis Python Client

说是已经支持异步的pubsub了?

用的是:

p.get_message()

但是,即使解决了redis的pubsub的异步,此处也还是不行:

因为此处的代码是:

@sockets.route(‘/users/<userId>/<accesstoken>’)
def userWebsocket(ws, userId, accesstoken):
    while not ws.closed:
        gLog.info(“call ws %s to receive”, ws)
        message = ws.receive()
        eachPublishedMsg = curPubSub.listen()

在同一个ws的处理代码中,用了while去处理:

ws的receive

(redis的)pubsub的listen

ws的receivce也是blocking的。。。

所以再去看看:

ws的receive

能否变成非blocking的

flask sockets non blocking

Question: how to make ws socket receive non-blocking ? · Issue #30 · kennethreitz/flask-sockets

同样问题,但是作者没有解释。。。

flask-sockets receive blocking

flask-sockets ws.receive() blocking

Flask-uWSGI-WebSocket 0.5.2 : Python Package Index

“WebSocket client abstraction with fully non-blocking methods.

receive()

send(msg)

close()

connected”

说是异步的

-》抽空真的可以去试试这个?

python – why is gevent-websocket synchronous? – Stack Overflow

-》

好像是可以通过:

gevent去spawn心得进程(协程?)去处理事情?

【已解决】尝试使用gevent的spawn去新创建协程去异步执行

【总结】

此处,进程间通信,是通过redis的pubsub机制。

具体做法是:

完整代码:

from redis import Redis
from rq import Queue, Worker
from rq.connections import push_connection
redisConnection = Redis.from_url(u’redis://localhost:6379/0′)
queue = Queue(connection=redisConnection)
pushConnResult = push_connection(redisConnection)
gLog.debug(“redisConnection=%s, queue=%s, pushConnResult=%s”, redisConnection, queue, pushConnResult)
@sockets.route(‘/users/<userId>/<accesstoken>’)
def userWebsocket(ws, userId, accesstoken):
    wsRoutine = gevent.spawn(listenWsAndSendMsg, ws, userId)
    gLog.debug(“wsRoutine=%s”, wsRoutine)
def listenWsAndSendMsg(ws, userId):
    gLog.debug(“use redis to listen ws=%s and send message for userId=%s”, ws, userId)
    curPubSub = redisConnection.pubsub()
    gLog.debug(“curPubSub=%s”, curPubSub)
    # curPubSub=<redis.client.PubSub object at 0x7fd17176d950>
    curSubKey = genWsKey(userId)
    gLog.debug(“curSubKey=%s”, curSubKey)
    # curSubKey=develop|staging|user-bb22f24e-3c27-4e7b-867a-b855e139b295
    subResp = curPubSub.subscribe(curSubKey)
    gLog.debug(“subResp=%s”, subResp)
    # subResp=None
    while not ws.closed:
        gLog.info(“start redis pubsub listen for ws=%s of userId=%s”, ws, userId)
        listenedMessages = curPubSub.listen()
        gLog.debug(“listenedMessages=%s”, listenedMessages)
        # listenedMessages=<generator object listen at 0x7fd170ca65f0>
        for eachPublishedMsg in listenedMessages:
            gLog.debug(“type(eachPublishedMsg)=%s, eachPublishedMsg=%s”, type(eachPublishedMsg), eachPublishedMsg)
            # type(eachPublishedMsg)=<type ‘dict’>, eachPublishedMsg={‘pattern’: None, ‘type’: ‘subscribe’, ‘channel’: ‘develop|staging|user-bb22f24e-3c27-4e7b-867a-b855e139b295’, ‘data’: 1L}
            messageType = eachPublishedMsg[“type”]
            gLog.debug(“messageType=%s”, messageType)
            if messageType == “message”:
                messageDataStr = eachPublishedMsg[“data”]
                gLog.debug(“type(messageDataStr)=%s, messageDataStr=%s”, type(messageDataStr), messageDataStr)
                # messageDataStr = messageDataStr.replace(“\\n”, “\n”)
                # gLog.debug(“messageDataStr=%s”, messageDataStr)
                # eachPublishedMsg={‘pattern’: None, ‘type’: ‘subscribe’, ‘channel’: ‘develop|staging|user-bb22f24e-3c27-4e7b-867a-b855e139b295’, ‘data’: 1L}
                # {
                #     “channel”: “develop|staging|user-cc680b0a-8d04-4f2b-8ad9-c6fefb527861”,
                #     “data”: 1,
                #     “pattern”: null,
                #     “type”: “subscribe”
                # }
                # eachPublishedMsg=
                # {
                #     “channel”: “develop|staging|user-cc680b0a-8d04-4f2b-8ad9-c6fefb527861”,
                #     “data”: “{\n  \”data\”: {\n    \”errandorId\”: \”\”, \n    \”initiatorId\”: \”user-bb22f24e-3c27-4e7b-867a-b855e139b295\”\n  }, \n  \”event\”: \”Created\”, \n  \”id\”: \”task-16c8b56e-37ec-4af8-9fd1-4313874980ad\”, \n  \”type\”: \”Task\”\n}”,
                #     “pattern”: null,
                #     “type”: “message”
                # }
                gLog.info(“send to user %s message %s”, userId, messageDataStr)
                wsSendMessage(ws, respMsgDictOrStr=messageDataStr)
            else:
                gLog.info(“not send user %s non message type message %s”, userId, eachPublishedMsg)
    gLog.info(“ws=%s closed for userId=%s, do unsubscribe”, ws, userId)
    unsubResp = curPubSub.unsubscribe(curSubKey)
    gLog.debug(“unsubResp=%s”, unsubResp)
def wsSendUserMessage(userId, respInfoDict):
    gLog.debug(“userId=%s, respInfoDict=%s”, userId, respInfoDict)
    respMessageStr = “”
    if isinstance(respInfoDict, (str, unicode)):
        gLog.debug(“supported response string to websocket”)
        respMessageStr = respInfoDict
    elif isinstance(respInfoDict, dict):
        gLog.debug(“auto convert json dict into string”)
        respMessageStr = jsonToStr(respInfoDict)
        gLog.debug(“respMessageStr=%s”, respMessageStr)
    else:
        gLog.error(“not support send type %s to ws”, type(respMessageStr))
        return
    gLog.debug(“respMessageStr=%s”, respMessageStr)
    gLog.debug(“redisConnection=%s”, redisConnection)
    # redisConnection=Redis<ConnectionPool<Connection<host=localhost,port=6379,db=0>>>
    curUserPubSub = redisConnection.pubsub()
    gLog.debug(“curUserPubSub=%s”, curUserPubSub)
    # curUserPubSub=<redis.client.PubSub object at 0x7fd170c25f90>
    curUserPubKey = genWsKey(userId)
    gLog.debug(“curUserPubKey=%s”, curUserPubKey)
    # curUserPubKey=develop|staging|user-c9353e25-2e28-49c0-ae50-bcc414f1600c
    gLog.info(“use redis to publish: to=%s, message=%s”, curUserPubKey, respMessageStr)
    redisConnection.publish(curUserPubKey, respMessageStr)

其中的核心代码的执行逻辑是:

在用户访问websocket的接口:

/users/<userId>/<accesstoken>

后,执行了:

def userWebsocket(ws, userId, accesstoken):

其中用:

wsRoutine = gevent.spawn(listenWsAndSendMsg, ws, userId)

去spawn孵化,新建了一个协程,

而对应的函数listenWsAndSendMsg中,

先是

curPubSub=redisConnection.pubsub()

去通过redis的实例连接得到了pubsub对象

然后用

curPubSub.subscribe(curSubKey)

去订阅对应的频道channel

之后就是用:

while not ws.closed:

    listenedMessages = curPubSub.listen()

去循环调用,当ws没有断开时,就去listen监听pubsub的chanel

注意此处的listen是locking阻塞的,一直运行不停止,直到listen到了数据为止。

而监听到的数据,是从:

wsSendUserMessage

中去:

redisConnection.publish(curUserPubKey, respMessageStr)

发布对应的消息respMessageStr到对应的channel频道curUserPubKey

而wsSendUserMessage的调用,是别的地方,比如新建任务后,想要通知其他用户任务可以去抢单任务了,的时候,会去调用这个wsSendUserMessage,以便于publish对应消息到对应的频道

转载请注明:在路上 » 【已解决】Flask中的线程或进程间通信

发表我的评论
取消评论

表情

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址
89 queries in 0.210 seconds, using 22.23MB memory