最新消息:20210816 当前crifan.com域名已被污染,为防止失联,请关注(页面右下角的)公众号

【部分解决】Python中实现多线程或多进程中的单例singleton

Python crifan 6899浏览 0评论

折腾:

【未解决】Flask部署到线上生产环境后多实例多线程中无法共享全局变量

期间,需要去想办法用最合适的方式去实现Python中的singleton单例

之前别人是参考:

python – Is there a simple, elegant way to define singletons? – Stack Overflow

最后用的:

<code>class Singleton(type):
    """
    reference: https://stackoverflow.com/questions/31875/is-there-a-simple-elegant-way-to-define-singletons
    """
    _instances = {}

    def __call__(cls, *args, **kwargs):
        if cls not in cls._instances:
            cls._instances[cls] = super(
                Singleton, cls).__call__(*args, **kwargs)
        return cls._instances[cls]
</code>

自己此处再去研究看看:

python singleton

python singleton装饰器

The Singleton — Python 3 Patterns, Recipes and Idioms

Creating a singleton in Python – Stack Overflow

而从:

python – Why is this singleton implementation “not thread safe”? – Stack Overflow

才知道,原来上述那些singleton的实现方式,都不是线程安全的thread safe

-》对于多个线程来说,最终还不能保证一定是单实例的

-》所以要去实现 线程安全 thread-safety的单例Singleton

multithreading – Are Python instance variables thread-safe? – Stack Overflow

multithreading – What operations in Java are considered atomic? – Stack Overflow

What are some common uses for Python decorators? – Stack Overflow

然后目前用代码:

common/ThreadSafeSingleton.py

<code>import functools
import threading

thread_lock = threading.Lock()
print("ThreadSafeSingleton: thread_lock=%s" % thread_lock)

def synchronized(lock):
    """ Synchronization decorator """
    def wrapper(f):
        print("synchronized: wrapper: f=%s, lock=%s" % (f, lock))
        @functools.wraps(f)
        def inner_wrapper(*args, **kw):
            print("functools.wraps: args=%s, kw=%s" % (args, kw))
            with lock:
                return f(*args, **kw)
        print("inner_wrapper%s" % inner_wrapper)
        return inner_wrapper
    return wrapper


# class Singleton(type):
class ThreadSafeSingleton(type):
    _instances = {}

    @synchronized(thread_lock)
    def __call__(cls, *args, **kwargs):
        print("synchronized __call__: cls=%s, args=%s, kwargs=%s" % (cls, args, kwargs))
        print("cls._instances=%s" % cls._instances)
        if cls not in cls._instances:
            # cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
            cls._instances[cls] = super(ThreadSafeSingleton, cls).__call__(*args, **kwargs)
            print("after added _instances: cls._instances=%s" % cls._instances)
        return cls._instances[cls]
</code>

resources/tasks.py

<code>from resources.extensions_celery import celery, celery_logger as log

@celery.task
# def celeryRefreshAzureSpeechToken():
def refreshAzureSpeechToken(isSyncToSingleton=True):
    """celery's task: refresh microsoft azure speech token key for later call tts/ASR api"""
    log.info("celeryRefreshAzureSpeechToken: celery=%s", celery)
    # global gMsTtsTokenSingleton

    updatedToken = None

    getMsTokenUrl = settings.MS_GET_TOKEN_URL
    reqHeaders = {
        "Ocp-Apim-Subscription-Key": settings.MS_TTS_SECRET_KEY
    }
    log.info("getMsTokenUrl=%s, reqHeaders=%s", getMsTokenUrl, reqHeaders)
    resp = requests.post(getMsTokenUrl, headers=reqHeaders)
    log.info("resp=%s", resp)
    if resp.ok:
        respTokenText = resp.text  # eyxxxxiJ9.xxx.xxx
        log.info("respTokenText=%s", respTokenText)
        # gMsToken = respTokenText
        updatedToken = respTokenText
    else:
        log.error("get ms tts token failed for: reason=%s, text=%s", resp.reason, resp.text)
    # log.info("after refresh: gMsToken=%s", gMsToken)

    log.info("updatedToken=%s", updatedToken)

    if isSyncToSingleton:
        log.info("Sync new token to global singleton for ms token: %s", updatedToken)
        # gMsTtsTokenSingleton.updateToken(updatedToken)
        msTtsTokenSingleton = MsAzureCognitiveToken()
        log.info("gMsTtsTokenSingleton=%s", gMsTtsTokenSingleton)
        log.info("msTtsTokenSingleton=%s", msTtsTokenSingleton)
        msTtsTokenSingleton.updateToken(updatedToken)

    return updatedToken

@celery.on_after_configure.connect
def celerySetupPeriodicTasks(sender, **kwargs):
    log.info("celerySetupPeriodicTasks: celery=%s, sender=%s, kwargs=%s", celery, sender, kwargs)

    sender.add_periodic_task(settings.CELERY_REFRESH_MS_TOKEN_INTERVAL,
                             # celeryRefreshAzureSpeechToken.s(),
                             refreshAzureSpeechToken.s(),
                             name="refresh ms Azure token every less than 10 minutes")


################################################################################
# Global Class
################################################################################
from common.ThreadSafeSingleton import ThreadSafeSingleton

class MsAzureCognitiveToken(metaclass=ThreadSafeSingleton):
    curToken = ""

    def __init__(self):
        # Not use refreshAzureSpeechToken(True) ini __init__, otherwise circular call
        self.curToken = refreshAzureSpeechToken(False)
        log.info("MsAzureCognitiveToken __init__: curToken=%s", self.curToken)

    def updateToken(self, newToken=None):
        log.info("updateToken: newToken=%s, curToken=%s", newToken, self.curToken)
        if newToken:
            self.curToken = newToken
        else:
            self.curToken = refreshAzureSpeechToken(False)
            # refreshAzureSpeechToken(True)
        log.info("after updateToken: curToken=%s", self.curToken)


gMsTtsTokenSingleton = MsAzureCognitiveToken()
log.info("inited gMsTtsTokenSingleton=%s", gMsTtsTokenSingleton)
log.info("gMsTtsTokenSingleton.curToken=%s", gMsTtsTokenSingleton.curToken)

# for debug singleton
testNewSingleton = MsAzureCognitiveToken()
log.info("new inited testNewSingleton=%s", testNewSingleton)
</code>

去调试是可以单例的:

新建的另外的一个类,在内存中和之前的是一样的:

【后记】

后来Mac本地+PyCharm中或终端中,去本地调试gunicorn:

<code>/Users/crifan/.local/share/virtualenvs/xxx-SCpLPEyZ/bin/gunicorn -c conf/gunicorn/gunicorn_config.py app:app
</code>

结果输出的内容看起来:

和之前一样,也有类似错误:

<code>objc[22770]: +[__NSPlaceholderDate initialize] may have been in progress in another thread when fork() was called.
objc[22770]: +[__NSPlaceholderDate initialize] may have been in progress in another thread when fork() was called. We cannot safely call it or ignore it in the fork() child process. Crashing instead. Set a breakpoint on objc_initializeAfterForkError to debug.
objc[22768]: +[__NSPlaceholderDate initialize] may have been in progress in another thread when fork() was called.
</code>

要么:

也是有多个实例运行

-》感觉这个不是线程安全的单例啊

-》不同的进程,好像会生成不同的单例啊

-》需要抽空去服务器中生产环境中调试看看效果

然后证明,果然是:

不同线程/进程,初始化出来的单例,实际上是不同的:

<code>[2018-08-29 16:48:11,866 INFO tasks.py:114 &lt;module&gt;] inited gMsTtsTokenSingleton=&lt;resources.tasks.MsTtsTokenSingleton object at 0x7f67199b3550&gt;
[2018-08-29 16:48:12,016 INFO tasks.py:114 &lt;module&gt;] inited gMsTtsTokenSingleton=&lt;resources.tasks.MsTtsTokenSingleton object at 0x7f67199b24a8&gt;
[2018-08-29 16:48:12,228 INFO tasks.py:114 &lt;module&gt;] inited gMsTtsTokenSingleton=&lt;resources.tasks.MsTtsTokenSingleton object at 0x7f67199b56a0&gt;
</code>

即:

上述办法:

对于单线程中,的确可以实现单例

但是对于多线程的话,就无法实现单例了。

What are some common uses for Python decorators? – Stack Overflow

Single instance of class in Python – Stack Overflow

Python 中的单例模式 | FunHacks

Python 中 Singleton 的写法及其拓展 – 掘金

python 单例模式—-装饰器实现 – CSDN博客

论Python的单例模式和装饰器 – CSDN博客

python单例装饰器 // wangzhilong’s blog

python multiple thread singleton

multithreading – Python sharing class instance among threads – Stack Overflow

multithreading – Concurrent Singleton Class Python – Stack Overflow

multithreading – Thread-safe Singleton doesn’t work. Python – Stack Overflow

A thread safe implementation of singleton pattern in Python. Based on tornado.ioloop.IOLoop.instance() approach.

好像和现有做法,没啥区别

当Singleton遇到multi-threading – AllenYoung – ITeye博客

是java,不是我要的python的

Python Thread Safe Singleton Pattern | Works (Tips / Assets / Snippets)

再去看看,还是问题依旧:

其中代码中的:

<code>class MsTtsTokenSingleton(metaclass=ThreadSafeSingleton):
    curToken = ""

    def __init__(self):
        # Not use refreshAzureSpeechToken(True) ini __init__, otherwise circular call
        self.curToken = refreshAzureSpeechToken(False)
        log.info("MsAzureCognitiveToken __init__: curToken=%s", self.curToken)

    def updateToken(self, newToken=None):
        log.info("updateToken: self=%s, newToken=%s, curToken=%s", self, newToken, self.curToken)
        if newToken:
            self.curToken = newToken
        else:
            self.curToken = refreshAzureSpeechToken(False)
            # refreshAzureSpeechToken(True)
        log.info("after updateToken: curToken=%s", self.curToken)


gMsTtsTokenSingleton = MsTtsTokenSingleton()
log.info("inited gMsTtsTokenSingleton=%s", gMsTtsTokenSingleton)
log.info("gMsTtsTokenSingleton.curToken=%s", gMsTtsTokenSingleton.curToken)
</code>

中的updateToken中,对于单个进程:

<code>[2018-08-29 17:15:14,222 INFO tasks.py:104 updateToken] updateToken: self=&lt;resources.tasks.MsTtsTokenSingleton object at 0x7fd806f694a8&gt;, newToken=eyJhbxxxqx
</code>

是正常的:

单个进程中,的确是单例的。

导致代码中token获取异常:

9个线程中,4个线程的token是None:

而之所以None的原因是:

<code>[2018-08-29 17:15:00,086 INFO tasks.py:57 refreshAzureSpeechToken] resp=&lt;Response [429]&gt;
[2018-08-29 17:15:00,087 ERROR tasks.py:65 refreshAzureSpeechToken] from MS_TTS_SECRET_KEY=dfxxx59f, get ms tts token failed for: reason=Too Many Requests, text={ "statusCode": 429, "message": "Rate limit is exceeded. Try again in 1 seconds." }
</code>

对于429,找到定义是:

<code>"Response 429
application/json
{ "statusCode": 429, "message": "Rate limit is exceeded. Try again in 26 seconds." }"
</code>

虽然此处可以暂时通过:

<code>log.error("from MS_TTS_SECRET_KEY=%s, get ms tts token failed for: reason=%s, text=%s",
          settings.MS_TTS_SECRET_KEY, resp.reason, resp.text)

if resp.status_code == 429:
    tryOneMoreDelay = 5
    refreshAzureSpeechToken.apply_async([], countdown=tryOneMoreDelay)
</code>

去规避此问题,但不是根本解决办法。

根本的办法还是要真正确保:

多线程中也是单例

Singleton pattern for Python (single and multi-threaded) – mor krispil

Glossary — Python 3.3.7 documentation

Double-checked locking – Wikipedia

去自己测试 多线程 是否是单例,然后想办法调试并解决,所以去:

【已解决】Python中用多线程thread去测试单例Singleton

难道是线程thread不对?要用多进程?

所以再去试试:

【已解决】Python中用多进程process测试单例Singleton

但是结果:

用gunicorn的多worker部署到线上环境后,又无效:不是单例了,单个类的实例都不同。

所以问题就转为:

【已解决】Flask的gunicorn中多进程多worker如何共享数据或单实例

【总结】

此处用如下代码:

common/ThreadSafeSingleton.py

<code>import functools
import threading

thread_lock = threading.Lock()
print("ThreadSafeSingleton: thread_lock=%s" % thread_lock)

# refer: https://stackoverflow.com/questions/50566934/why-is-this-singleton-implementation-not-thread-safe

def synchronized(lock):
    """ Synchronization decorator """
    def wrapper(f):
        print("synchronized: wrapper: f=%s, lock=%s" % (f, lock))
        @functools.wraps(f)
        def inner_wrapper(*args, **kw):
            print("functools.wraps: args=%s, kw=%s" % (args, kw))
            with lock:
                return f(*args, **kw)
        print("inner_wrapper%s" % inner_wrapper)
        return inner_wrapper
    return wrapper


# class Singleton(type):
class ThreadSafeSingleton(type):
    _instances = {}

    @synchronized(thread_lock)
    def __call__(cls, *args, **kwargs):
        print("synchronized __call__: cls=%s, args=%s, kwargs=%s" % (cls, args, kwargs))
        print("cls._instances=%s" % cls._instances)
        if cls not in cls._instances:
            # cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
            cls._instances[cls] = super(ThreadSafeSingleton, cls).__call__(*args, **kwargs)
            print("after added _instances: cls._instances=%s" % cls._instances)
        return cls._instances[cls]
</code>

然后别的地方调用:

比如logging:

common/FlaskLogSingleton.py

<code>import logging
from logging.handlers import RotatingFileHandler
from conf.app import settings
from common.ThreadSafeSingleton import ThreadSafeSingleton
# from sys import stdout

def init_logger(flask_settings, enableConsole=True):
    print("init_logger")
    flaskAppLogger = logging.getLogger(flask_settings.FLASK_APP_NAME) # &lt;Logger RobotQA (WARNING)&gt;
    print("flaskAppLogger=%s" % flaskAppLogger)
    flaskAppLogger.setLevel(flask_settings.LOG_LEVEL_FILE)

    logFormatter = logging.Formatter(flask_settings.LOG_FORMAT)

    fileHandler = RotatingFileHandler(
        flask_settings.LOG_FILE_FILENAME,
        maxBytes=flask_settings.LOG_FILE_MAX_BYTES,
        backupCount=flask_settings.LOG_FILE_BACKUP_COUNT,
        encoding="UTF-8")
    fileHandler.setLevel(flask_settings.LOG_LEVEL_FILE)
    fileHandler.setFormatter(logFormatter)
    flaskAppLogger.addHandler(fileHandler)

    if enableConsole :
        # define a Handler which writes INFO messages or higher to the sys.stderr
        console = logging.StreamHandler()
        # console = logging.StreamHandler(stdout)
        console.setLevel(flask_settings.LOG_LEVEL_CONSOLE)
        # set a format which is simpler for console use
        formatter = logging.Formatter(
            # fmt=logFormatter)
            # fmt=logFormatter,
            fmt=flask_settings.LOG_FORMAT,
            datefmt=flask_settings.LOG_CONSOLE_DATA_FORMAT)
        # tell the handler to use this format
        console.setFormatter(formatter)
        flaskAppLogger.addHandler(console)

    print("init_logger: after init flaskAppLogger%s" % flaskAppLogger)

    return flaskAppLogger

class LoggerSingleton(metaclass=ThreadSafeSingleton):
    curLog = ""

    def __init__(self):
        self.curLog = init_logger(settings)
        # Note: during __init__, AVOID use log, otherwise will deadlock
        # log.info("LoggerSingleton __init__: curLog=%s", self.curLog)
        print("LoggerSingleton __init__: curLog=%s" % self.curLog)

logSingleton = LoggerSingleton()
log = logSingleton.curLog
log.info("LoggerSingleton inited, logSingleton=%s", logSingleton) # &lt;factory.LoggerSingleton object at 0x10cbcafd0&gt;
log.info("log=%s", log) # &lt;Logger RobotQA (DEBUG)&gt;

# # debug for singleton log
# log2 = LoggerSingleton()
# print("log2=%s" % log2)
</code>

比如,ms的tts的token:

resources/tasks.py

<code>
class MsTtsTokenSingleton(metaclass=ThreadSafeSingleton):
    curToken = ""

    def __init__(self):
        # Not use refreshAzureSpeechToken(True) ini __init__, otherwise circular call
        self.curToken = refreshAzureSpeechToken(False)
        log.info("MsAzureCognitiveToken __init__: curToken=%s", self.curToken)

    def updateToken(self, newToken=None):
        log.info("updateToken: self=%s, newToken=%s, curToken=%s", self, newToken, self.curToken)
        if newToken:
            self.curToken = newToken
        else:
            self.curToken = refreshAzureSpeechToken(False)
            # refreshAzureSpeechToken(True)
        log.info("after updateToken: curToken=%s", self.curToken)


gMsTtsTokenSingleton = MsTtsTokenSingleton()
log.info("inited gMsTtsTokenSingleton=%s", gMsTtsTokenSingleton)
log.info("gMsTtsTokenSingleton.curToken=%s", gMsTtsTokenSingleton.curToken)

# debug refreshAzureSpeechToken
# newToken = refreshAzureSpeechToken()
# log.info("newToken=%s", newToken)
</code>

现象是:

在本地环境:Mac中PyCharm,中调试期间:

单例都是完美工作的:

不论是log的:log2 = LoggerSingleton()

还是ms的tts的token的:newToken = refreshAzureSpeechToken()

都是和原有的类相同

-》此时已能证明,至少在单进程Process或单线程thread中,上述代码的单例是工作的

-》另外,本地又去调试了上面的MsTtsTokenSingleton的:

多线程thread和多进程Process

代码:

<code>log.info("========== test multiple thread singleton ==========")
import time, threading

def singleThreadDo():
    log.info("---------- singleThreadDo ----------")
    cur_thread = threading.current_thread()
    cur_thread_name = cur_thread.name
    curThreadTokenSingleton = MsTtsTokenSingleton()
    log.info("[%s] cur_thread=%s, curThreadTokenSingleton=%s", cur_thread_name, cur_thread, curThreadTokenSingleton)

max_thread_num = 5
for idx in range(max_thread_num):
    cur_num = idx + 1
    each_thread_name = "T%s" % cur_num
    cur_thread = threading.Thread(target=singleThreadDo, name=each_thread_name)
    log.info("[%d] %s, %s", cur_num, each_thread_name, cur_thread)
    # cur_thread.start()
    cur_thread.run()


log.info("========== test multiple process singleton ==========")
import multiprocessing

def singleProcessDo(cur_num):
    log.info("---------- singleProcessDo ----------")
    cur_process = multiprocessing.current_process()
    curProcessTTokenSingleton = MsTtsTokenSingleton()
    log.info("curProcessTTokenSingleton=%s", curProcessTTokenSingleton)
    log.info("[%d] name=%s, pid=%s, process=%s", cur_num, cur_process.name, cur_process.pid, cur_process)

max_process_num = 5
for idx in range(max_process_num):
    cur_num = idx + 1
    each_process_name = "P%s" % cur_num
    cur_process = multiprocessing.Process(target=singleProcessDo, name=each_process_name, args=(cur_num, ))
    log.info("[%d] name=%s, process=%s", cur_num, each_process_name, cur_process)
    cur_process.start()
</code>

-》结论是:

单例也是工作的:多个线程或进程中的单例都是一个类的实例

-》但是问题来了:

在部署到线上后,多进程或多线程时:

且不论部署方式是:

  • gunicorn的多worker=9个,type为sync -》 多进程=共9个线程

  • gunicorn是单worker=1,type是gevent -》 单进程

    • 加上额外的2个线程:supervisor管理的celery的worker和beat的

    • 共3个线程

上述单例失效:初始化出来的实例,都不同。

且没有很好的办法去解决celery的2个额外的进程导致上述单例失效的办法。

【后记】

后来想到一个,估计是更好的办法:

对于这种,多个Process进程之间共享数据(包括需要读取和修改)的事情,最好还是用通用的数据共享的方式,比如redis

如果是redis,则可以实现一个getToken,setToken/updateToken等函数,内部直接访问redis即可。

且还可以考虑利用redis的高级功能:expired,给set的token设置一个过期的时间,比如9分钟(其中是考虑到ms的tts的token的过期时间是10分钟),然后或许还有个callback,这样就可以去updateToken更新token了。

而此处由于时间限制 + 暂时单例失效但是业务逻辑中3个Process的获取tokne都返回200,都正常,业务逻辑中可以继续正常运行,所以就暂时不去尝试这个办法了。

等之后继续抽空优化,去试试redis的方案。

转载请注明:在路上 » 【部分解决】Python中实现多线程或多进程中的单例singleton

发表我的评论
取消评论

表情

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

网友最新评论 (2)

  1. 如果是Flask启动完成过后需要共享这个数据,可不可以从flask的4个本地全局变量来解决这个问题
    zhe5年前 (2019-07-02)回复
  2. 所以除了使用redis或者数据库或者缓存之类的,没得通过代码实现单例的方法来解决这个问题啰
    zhe5年前 (2019-07-02)回复
92 queries in 0.204 seconds, using 22.16MB memory