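Background:
[Unsolved] After deploying Flask to production, global variables cannot be shared across multiple instances and threads
While working on that, I needed to find the most suitable way to implement a singleton in Python.
Previously, someone else on the project had followed: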
python – Is there a simple, elegant way to define singletons? – Stack Overflow
and ended up using:
<code>class Singleton(type):
    """
    reference: https://stackoverflow.com/questions/31875/is-there-a-simple-elegant-way-to-define-singletons
    """
    _instances = {}

    def __call__(cls, *args, **kwargs):
        if cls not in cls._instances:
            cls._instances[cls] = super(
                Singleton, cls).__call__(*args, **kwargs)
        return cls._instances[cls]
</code>
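To make the behaviour of this metaclass concrete, here is a minimal usage sketch. The `Config` class and its `name` argument are hypothetical and only serve as an illustration; the metaclass is the one shown above, written with Python 3's zero-argument `super()`.

```python
class Singleton(type):
    """Metaclass that caches one instance per class (not thread-safe)."""
    _instances = {}

    def __call__(cls, *args, **kwargs):
        if cls not in cls._instances:
            cls._instances[cls] = super().__call__(*args, **kwargs)
        return cls._instances[cls]


class Config(metaclass=Singleton):  # hypothetical example class
    def __init__(self, name="default"):
        self.name = name


first = Config("first")
second = Config("second")  # __init__ is not run again; the cached instance is returned

print(first is second)  # True
print(second.name)      # first
```

Note that arguments passed on later calls are silently ignored, which is easy to overlook when the class takes constructor parameters.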
So I did some more research of my own:
python singleton
python singleton decorator
The Singleton — Python 3 Patterns, Recipes and Idioms
Creating a singleton in Python – Stack Overflow
And from:
python – Why is this singleton implementation “not thread safe”? – Stack Overflow
I learned that none of the singleton implementations above are thread-safe.
-> With multiple threads, there is no guarantee that only one instance is ever created.
-> So what I need is a thread-safe singleton.
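The race can be sketched as follows. This is only an illustration under assumptions: `UnsafeSingleton`, `LockedSingleton`, `count_init_calls` and the `Service` class are hypothetical names, and the `time.sleep` in `__init__` is there purely to widen the race window. With the unlocked metaclass several threads usually pass the `not in` check before any of them stores the instance; with the lock, `__init__` runs exactly once.

```python
import threading
import time


class UnsafeSingleton(type):
    """Check-then-create without a lock: racy under threads."""
    _instances = {}

    def __call__(cls, *args, **kwargs):
        if cls not in cls._instances:
            cls._instances[cls] = super().__call__(*args, **kwargs)
        return cls._instances[cls]


class LockedSingleton(type):
    """Same logic, but the check and the creation happen under one lock."""
    _instances = {}
    _lock = threading.Lock()

    def __call__(cls, *args, **kwargs):
        with cls._lock:
            if cls not in cls._instances:
                cls._instances[cls] = super().__call__(*args, **kwargs)
        return cls._instances[cls]


def count_init_calls(metaclass, n_threads=8):
    """Instantiate one class from n_threads at once; return how often __init__ ran."""
    init_calls = []

    class Service(metaclass=metaclass):
        def __init__(self):
            time.sleep(0.05)  # slow constructor widens the race window
            init_calls.append(threading.current_thread().name)

    barrier = threading.Barrier(n_threads)

    def worker():
        barrier.wait()  # release all threads at the same moment
        Service()

    threads = [threading.Thread(target=worker) for _ in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return len(init_calls)


print("unsafe __init__ calls:", count_init_calls(UnsafeSingleton))  # typically more than 1
print("locked __init__ calls:", count_init_calls(LockedSingleton))
```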
multithreading – Are Python instance variables thread-safe? – Stack Overflow
multithreading – What operations in Java are considered atomic? – Stack Overflow
What are some common uses for Python decorators? – Stack Overflow
The code currently in use:
common/ThreadSafeSingleton.py
<code>import functools
import threading

thread_lock = threading.Lock()
print("ThreadSafeSingleton: thread_lock=%s" % thread_lock)

def synchronized(lock):
    """ Synchronization decorator """
    def wrapper(f):
        print("synchronized: wrapper: f=%s, lock=%s" % (f, lock))
        @functools.wraps(f)
        def inner_wrapper(*args, **kw):
            print("functools.wraps: args=%s, kw=%s" % (args, kw))
            with lock:
                return f(*args, **kw)
        print("inner_wrapper%s" % inner_wrapper)
        return inner_wrapper
    return wrapper

# class Singleton(type):
class ThreadSafeSingleton(type):
    _instances = {}

    @synchronized(thread_lock)
    def __call__(cls, *args, **kwargs):
        print("synchronized __call__: cls=%s, args=%s, kwargs=%s" % (cls, args, kwargs))
        print("cls._instances=%s" % cls._instances)
        if cls not in cls._instances:
            # cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
            cls._instances[cls] = super(ThreadSafeSingleton, cls).__call__(*args, **kwargs)
            print("after added _instances: cls._instances=%s" % cls._instances)
        return cls._instances[cls]
</code>
resources/tasks.py
<code>import requests

# Assumption: settings is imported from the same place as in common/FlaskLogSingleton.py
from conf.app import settings
from resources.extensions_celery import celery, celery_logger as log

@celery.task
# def celeryRefreshAzureSpeechToken():
def refreshAzureSpeechToken(isSyncToSingleton=True):
    """celery's task: refresh microsoft azure speech token key for later call tts/ASR api"""
    log.info("celeryRefreshAzureSpeechToken: celery=%s", celery)

    # global gMsTtsTokenSingleton
    updatedToken = None

    getMsTokenUrl = settings.MS_GET_TOKEN_URL
    reqHeaders = {
        "Ocp-Apim-Subscription-Key": settings.MS_TTS_SECRET_KEY
    }
    log.info("getMsTokenUrl=%s, reqHeaders=%s", getMsTokenUrl, reqHeaders)
    resp = requests.post(getMsTokenUrl, headers=reqHeaders)
    log.info("resp=%s", resp)
    if resp.ok:
        respTokenText = resp.text  # eyxxxxiJ9.xxx.xxx
        log.info("respTokenText=%s", respTokenText)
        # gMsToken = respTokenText
        updatedToken = respTokenText
    else:
        log.error("get ms tts token failed for: reason=%s, text=%s", resp.reason, resp.text)

    # log.info("after refresh: gMsToken=%s", gMsToken)
    log.info("updatedToken=%s", updatedToken)

    if isSyncToSingleton:
        log.info("Sync new token to global singleton for ms token: %s", updatedToken)
        # gMsTtsTokenSingleton.updateToken(updatedToken)
        msTtsTokenSingleton = MsAzureCognitiveToken()
        log.info("gMsTtsTokenSingleton=%s", gMsTtsTokenSingleton)
        log.info("msTtsTokenSingleton=%s", msTtsTokenSingleton)
        msTtsTokenSingleton.updateToken(updatedToken)

    return updatedToken

@celery.on_after_configure.connect
def celerySetupPeriodicTasks(sender, **kwargs):
    log.info("celerySetupPeriodicTasks: celery=%s, sender=%s, kwargs=%s", celery, sender, kwargs)
    sender.add_periodic_task(settings.CELERY_REFRESH_MS_TOKEN_INTERVAL,
                             # celeryRefreshAzureSpeechToken.s(),
                             refreshAzureSpeechToken.s(),
                             name="refresh ms Azure token every less than 10 minutes")

################################################################################
# Global Class
################################################################################

from common.ThreadSafeSingleton import ThreadSafeSingleton

class MsAzureCognitiveToken(metaclass=ThreadSafeSingleton):
    curToken = ""

    def __init__(self):
        # Not use refreshAzureSpeechToken(True) in __init__, otherwise circular call
        self.curToken = refreshAzureSpeechToken(False)
        log.info("MsAzureCognitiveToken __init__: curToken=%s", self.curToken)

    def updateToken(self, newToken=None):
        log.info("updateToken: newToken=%s, curToken=%s", newToken, self.curToken)
        if newToken:
            self.curToken = newToken
        else:
            self.curToken = refreshAzureSpeechToken(False)
            # refreshAzureSpeechToken(True)

        log.info("after updateToken: curToken=%s", self.curToken)

gMsTtsTokenSingleton = MsAzureCognitiveToken()
log.info("inited gMsTtsTokenSingleton=%s", gMsTtsTokenSingleton)
log.info("gMsTtsTokenSingleton.curToken=%s", gMsTtsTokenSingleton.curToken)

# for debug singleton
testNewSingleton = MsAzureCognitiveToken()
log.info("new inited testNewSingleton=%s", testNewSingleton)
</code>
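Debugging shows that it does behave as a singleton:
A second, newly created instance is the same object in memory as the first one:
[Postscript]
Later, locally on the Mac (in PyCharm or in a terminal), I debugged with gunicorn: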
<code>/Users/crifan/.local/share/virtualenvs/xxx-SCpLPEyZ/bin/gunicorn -c conf/gunicorn/gunicorn_config.py app:app </code>
The output looked like this:
As before, there were similar errors:
<code>objc[22770]: +[__NSPlaceholderDate initialize] may have been in progress in another thread when fork() was called.
objc[22770]: +[__NSPlaceholderDate initialize] may have been in progress in another thread when fork() was called. We cannot safely call it or ignore it in the fork() child process. Crashing instead. Set a breakpoint on objc_initializeAfterForkError to debug.
objc[22768]: +[__NSPlaceholderDate initialize] may have been in progress in another thread when fork() was called.
</code>
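Or:
multiple instances were running as well
-> So this does not look like a thread-safe singleton after all
-> Different processes seem to create different singleton instances
-> I need to find time to debug this in the production environment on the server and see what happens
That confirmed it:
Different threads/processes each initialize their own singleton, and those instances really are different: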
<code>[2018-08-29 16:48:11,866 INFO tasks.py:114 <module>] inited gMsTtsTokenSingleton=<resources.tasks.MsTtsTokenSingleton object at 0x7f67199b3550>
[2018-08-29 16:48:12,016 INFO tasks.py:114 <module>] inited gMsTtsTokenSingleton=<resources.tasks.MsTtsTokenSingleton object at 0x7f67199b24a8>
[2018-08-29 16:48:12,228 INFO tasks.py:114 <module>] inited gMsTtsTokenSingleton=<resources.tasks.MsTtsTokenSingleton object at 0x7f67199b56a0>
</code>
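In other words:
With the approach above:
within a single thread, the singleton does work,
but with multiple threads/processes, as in the log above, it no longer yields a single instance.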
What are some common uses for Python decorators? – Stack Overflow
Single instance of class in Python – Stack Overflow
Python 中 Singleton 的写法及其拓展 – 掘金
python单例装饰器 // wangzhilong’s blog
python multiple thread singleton
multithreading – Python sharing class instance among threads – Stack Overflow
multithreading – Concurrent Singleton Class Python – Stack Overflow
multithreading – Thread-safe Singleton doesn’t work. Python – Stack Overflow
This looks no different from what I am already doing
当Singleton遇到multi-threading – AllenYoung – ITeye博客
This one is about Java, not the Python I need
Python Thread Safe Singleton Pattern | Works (Tips / Assets / Snippets)
Looking at it again, the problem is still there:
In the code:
<code>class MsTtsTokenSingleton(metaclass=ThreadSafeSingleton):
    curToken = ""

    def __init__(self):
        # Not use refreshAzureSpeechToken(True) in __init__, otherwise circular call
        self.curToken = refreshAzureSpeechToken(False)
        log.info("MsAzureCognitiveToken __init__: curToken=%s", self.curToken)

    def updateToken(self, newToken=None):
        log.info("updateToken: self=%s, newToken=%s, curToken=%s", self, newToken, self.curToken)
        if newToken:
            self.curToken = newToken
        else:
            self.curToken = refreshAzureSpeechToken(False)
            # refreshAzureSpeechToken(True)

        log.info("after updateToken: curToken=%s", self.curToken)

gMsTtsTokenSingleton = MsTtsTokenSingleton()
log.info("inited gMsTtsTokenSingleton=%s", gMsTtsTokenSingleton)
log.info("gMsTtsTokenSingleton.curToken=%s", gMsTtsTokenSingleton.curToken)
</code>
inside updateToken, for a single process:
<code>[2018-08-29 17:15:14,222 INFO tasks.py:104 updateToken] updateToken: self=<resources.tasks.MsTtsTokenSingleton object at 0x7fd806f694a8>, newToken=eyJhbxxxqx </code>
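everything is normal:
within a single process, it really is a singleton.
Across workers, however, this leads to token retrieval failures in the code:
of the 9 workers, 4 ended up with a token of None:
The reason for the None is that the token endpoint rejected some of the requests (presumably because each worker requests its own token at about the same time):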
<code>[2018-08-29 17:15:00,086 INFO tasks.py:57 refreshAzureSpeechToken] resp=<Response [429]>
[2018-08-29 17:15:00,087 ERROR tasks.py:65 refreshAzureSpeechToken] from MS_TTS_SECRET_KEY=dfxxx59f, get ms tts token failed for: reason=Too Many Requests, text={ "statusCode": 429, "message": "Rate limit is exceeded. Try again in 1 seconds." }
</code>
For the 429 response, the definition I found is:
<code>"Response 429 application/json { "statusCode": 429, "message": "Rate limit is exceeded. Try again in 26 seconds." }" </code>
Although this can be worked around for now with:
<code>log.error("from MS_TTS_SECRET_KEY=%s, get ms tts token failed for: reason=%s, text=%s", settings.MS_TTS_SECRET_KEY, resp.reason, resp.text)
if resp.status_code == 429:
    tryOneMoreDelay = 5
    refreshAzureSpeechToken.apply_async([], countdown=tryOneMoreDelay)
</code>
it is not a fundamental fix.
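Instead of a fixed 5-second delay, the wait time could be taken from the 429 message itself, since the response body shown above contains "Try again in N seconds". The following is only a sketch under assumptions: `retry_delay_from_429` and `fetch_token_with_retry` are hypothetical helpers, `fetch` is any callable returning a `(status_code, text)` pair, and the loop retries in a blocking way rather than through Celery. In a Celery task the parsed delay could presumably be passed as the `countdown` value instead.

```python
import json
import re
import time


def retry_delay_from_429(body_text, default=5.0):
    """Return N from 'Try again in N seconds' in a 429 body, else the default."""
    try:
        message = str(json.loads(body_text).get("message", ""))
    except (ValueError, AttributeError):
        message = body_text  # body was not a JSON object; search the raw text
    match = re.search(r"Try again in (\d+) seconds", message)
    return float(match.group(1)) if match else default


def fetch_token_with_retry(fetch, max_attempts=3, sleep=time.sleep):
    """Call fetch() until it returns 200; wait as instructed on 429, give up otherwise."""
    for attempt in range(max_attempts):
        status_code, text = fetch()
        if status_code == 200:
            return text
        if status_code != 429:
            return None
        if attempt < max_attempts - 1:
            sleep(retry_delay_from_429(text))
    return None


# Example with canned responses instead of a real HTTP call
body = '{ "statusCode": 429, "message": "Rate limit is exceeded. Try again in 1 seconds." }'
canned = iter([(429, body), (200, "eyxxxxiJ9.xxx.xxx")])
waits = []
token = fetch_token_with_retry(lambda: next(canned), sleep=waits.append)
print(token, waits)
```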
The fundamental fix is still to truly guarantee:
a single instance across multiple threads as well
Singleton pattern for Python (single and multi-threaded) – mor krispil
Glossary — Python 3.3.7 documentation
Double-checked locking – Wikipedia
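The double-checked locking idea from the last reference can be sketched as a metaclass like this. It is an illustration, not the code used in this project: `DoubleCheckedSingleton` and `TokenHolder` are hypothetical names. The first check avoids taking the lock once the instance exists; the second check, under the lock, handles the case where another thread created the instance in the meantime.

```python
import threading


class DoubleCheckedSingleton(type):
    _instances = {}
    _lock = threading.Lock()

    def __call__(cls, *args, **kwargs):
        # First check without the lock: the common case once the instance exists
        if cls not in cls._instances:
            with cls._lock:
                # Second check under the lock: another thread may have created it already
                if cls not in cls._instances:
                    cls._instances[cls] = super().__call__(*args, **kwargs)
        return cls._instances[cls]


class TokenHolder(metaclass=DoubleCheckedSingleton):  # hypothetical example class
    def __init__(self):
        self.token = None


instances = []
threads = [threading.Thread(target=lambda: instances.append(TokenHolder()))
           for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(len({id(obj) for obj in instances}))  # 1
```

Like the fully locked version, this only guarantees one instance per process; it does nothing across processes.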
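To test for myself whether the singleton holds across multiple threads, and then debug and fix it, I went on to:
[Solved] Testing a singleton in Python with multiple threads
Or is threading the wrong thing to look at, and it is really about multiple processes?
So I tried that as well:
[Solved] Testing a singleton in Python with multiple processes
But the result:
once deployed to production with multiple gunicorn workers, it stopped working again: no longer a singleton, with each worker holding a different instance of the class.
So the question became:
[Solved] How to share data or a single instance across multiple gunicorn processes/workers in Flask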
[Summary]
The code used here is:
common/ThreadSafeSingleton.py
<code>import functools
import threading

thread_lock = threading.Lock()
print("ThreadSafeSingleton: thread_lock=%s" % thread_lock)

# refer: https://stackoverflow.com/questions/50566934/why-is-this-singleton-implementation-not-thread-safe
def synchronized(lock):
    """ Synchronization decorator """
    def wrapper(f):
        print("synchronized: wrapper: f=%s, lock=%s" % (f, lock))
        @functools.wraps(f)
        def inner_wrapper(*args, **kw):
            print("functools.wraps: args=%s, kw=%s" % (args, kw))
            with lock:
                return f(*args, **kw)
        print("inner_wrapper%s" % inner_wrapper)
        return inner_wrapper
    return wrapper

# class Singleton(type):
class ThreadSafeSingleton(type):
    _instances = {}

    @synchronized(thread_lock)
    def __call__(cls, *args, **kwargs):
        print("synchronized __call__: cls=%s, args=%s, kwargs=%s" % (cls, args, kwargs))
        print("cls._instances=%s" % cls._instances)
        if cls not in cls._instances:
            # cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
            cls._instances[cls] = super(ThreadSafeSingleton, cls).__call__(*args, **kwargs)
            print("after added _instances: cls._instances=%s" % cls._instances)
        return cls._instances[cls]
</code>
It is then used elsewhere:
For example, for logging:
common/FlaskLogSingleton.py
<code>import logging
from logging.handlers import RotatingFileHandler
from conf.app import settings
from common.ThreadSafeSingleton import ThreadSafeSingleton
# from sys import stdout

def init_logger(flask_settings, enableConsole=True):
    print("init_logger")
    flaskAppLogger = logging.getLogger(flask_settings.FLASK_APP_NAME)  # <Logger RobotQA (WARNING)>
    print("flaskAppLogger=%s" % flaskAppLogger)
    flaskAppLogger.setLevel(flask_settings.LOG_LEVEL_FILE)

    logFormatter = logging.Formatter(flask_settings.LOG_FORMAT)

    fileHandler = RotatingFileHandler(
        flask_settings.LOG_FILE_FILENAME,
        maxBytes=flask_settings.LOG_FILE_MAX_BYTES,
        backupCount=flask_settings.LOG_FILE_BACKUP_COUNT,
        encoding="UTF-8")
    fileHandler.setLevel(flask_settings.LOG_LEVEL_FILE)
    fileHandler.setFormatter(logFormatter)
    flaskAppLogger.addHandler(fileHandler)

    if enableConsole :
        # define a Handler which writes INFO messages or higher to the sys.stderr
        console = logging.StreamHandler()
        # console = logging.StreamHandler(stdout)
        console.setLevel(flask_settings.LOG_LEVEL_CONSOLE)
        # set a format which is simpler for console use
        formatter = logging.Formatter(
            # fmt=logFormatter)
            # fmt=logFormatter,
            fmt=flask_settings.LOG_FORMAT,
            datefmt=flask_settings.LOG_CONSOLE_DATA_FORMAT)
        # tell the handler to use this format
        console.setFormatter(formatter)
        flaskAppLogger.addHandler(console)

    print("init_logger: after init flaskAppLogger%s" % flaskAppLogger)
    return flaskAppLogger

class LoggerSingleton(metaclass=ThreadSafeSingleton):
    curLog = ""

    def __init__(self):
        self.curLog = init_logger(settings)
        # Note: during __init__, AVOID use log, otherwise will deadlock
        # log.info("LoggerSingleton __init__: curLog=%s", self.curLog)
        print("LoggerSingleton __init__: curLog=%s" % self.curLog)

logSingleton = LoggerSingleton()
log = logSingleton.curLog
log.info("LoggerSingleton inited, logSingleton=%s", logSingleton)  # <factory.LoggerSingleton object at 0x10cbcafd0>
log.info("log=%s", log)  # <Logger RobotQA (DEBUG)>

# # debug for singleton log
# log2 = LoggerSingleton()
# print("log2=%s" % log2)
</code>
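As a side note on the logging case: the standard library's `logging.getLogger(name)` already returns the same logger object for the same name within one process, so a guard against attaching handlers twice may be enough there. A minimal sketch, where `get_app_logger` and the format string are hypothetical and the name `RobotQA` is taken from the comment in the code above:

```python
import logging


def get_app_logger(name="RobotQA"):
    """Return the named logger, attaching a console handler only on first use."""
    logger = logging.getLogger(name)  # same object for the same name within a process
    if not logger.handlers:           # configure once; later calls reuse the handlers
        handler = logging.StreamHandler()
        handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(message)s"))
        logger.addHandler(handler)
        logger.setLevel(logging.DEBUG)
    return logger


log_a = get_app_logger()
log_b = get_app_logger()
print(log_a is log_b)       # True
print(len(log_a.handlers))  # 1
```

This is still per process: each gunicorn or celery process gets its own logger object.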
For example, for the MS TTS token:
resources/tasks.py
<code>class MsTtsTokenSingleton(metaclass=ThreadSafeSingleton):
    curToken = ""

    def __init__(self):
        # Not use refreshAzureSpeechToken(True) in __init__, otherwise circular call
        self.curToken = refreshAzureSpeechToken(False)
        log.info("MsAzureCognitiveToken __init__: curToken=%s", self.curToken)

    def updateToken(self, newToken=None):
        log.info("updateToken: self=%s, newToken=%s, curToken=%s", self, newToken, self.curToken)
        if newToken:
            self.curToken = newToken
        else:
            self.curToken = refreshAzureSpeechToken(False)
            # refreshAzureSpeechToken(True)

        log.info("after updateToken: curToken=%s", self.curToken)

gMsTtsTokenSingleton = MsTtsTokenSingleton()
log.info("inited gMsTtsTokenSingleton=%s", gMsTtsTokenSingleton)
log.info("gMsTtsTokenSingleton.curToken=%s", gMsTtsTokenSingleton.curToken)

# debug refreshAzureSpeechToken
# newToken = refreshAzureSpeechToken()
# log.info("newToken=%s", newToken)
</code>
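What I observed:
In the local environment (PyCharm on the Mac), while debugging:
the singletons worked perfectly:
both for the log: log2 = LoggerSingleton()
and for the MS TTS token: newToken = refreshAzureSpeechToken()
in each case the singleton involved was the same object as the existing instance
-> This already shows that, at least within a single process or a single thread, the singleton code above works
-> In addition, I also tested the MsTtsTokenSingleton above locally with:
multiple threads and multiple processes
Code: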
<code>log.info("========== test multiple thread singleton ==========")
import time, threading

def singleThreadDo():
    log.info("---------- singleThreadDo ----------")
    cur_thread = threading.current_thread()
    cur_thread_name = cur_thread.name
    curThreadTokenSingleton = MsTtsTokenSingleton()
    log.info("[%s] cur_thread=%s, curThreadTokenSingleton=%s", cur_thread_name, cur_thread, curThreadTokenSingleton)

max_thread_num = 5
for idx in range(max_thread_num):
    cur_num = idx + 1
    each_thread_name = "T%s" % cur_num
    cur_thread = threading.Thread(target=singleThreadDo, name=each_thread_name)
    log.info("[%d] %s, %s", cur_num, each_thread_name, cur_thread)
    # start() runs the target in a new thread; run() would execute it in the calling thread
    cur_thread.start()

log.info("========== test multiple process singleton ==========")
import multiprocessing

def singleProcessDo(cur_num):
    log.info("---------- singleProcessDo ----------")
    cur_process = multiprocessing.current_process()
    curProcessTTokenSingleton = MsTtsTokenSingleton()
    log.info("curProcessTTokenSingleton=%s", curProcessTTokenSingleton)
    log.info("[%d] name=%s, pid=%s, process=%s", cur_num, cur_process.name, cur_process.pid, cur_process)

max_process_num = 5
for idx in range(max_process_num):
    cur_num = idx + 1
    each_process_name = "P%s" % cur_num
    cur_process = multiprocessing.Process(target=singleProcessDo, name=each_process_name, args=(cur_num, ))
    log.info("[%d] name=%s, process=%s", cur_num, each_process_name, cur_process)
    cur_process.start()
</code>
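-> Conclusion:
the singleton worked here too: in every thread and every process, the singleton printed as the same instance of the class
(For the processes this is weaker evidence than it looks: if the children are forked, as is presumably the case here, each child gets its own copy of the parent's instance at the same memory address, so identical addresses in the log do not mean the state is shared.)
-> But here is the problem:
after deploying to production, with multiple processes or threads:
regardless of whether the deployment is:
gunicorn with 9 workers of type sync -> multiple processes = 9 processes in total
gunicorn with a single worker (workers=1) of type gevent -> a single process
plus 2 extra processes: the celery worker and celery beat, managed by supervisor
3 processes in total
the singleton above breaks: the instances that get initialized are all different.
And I found no good way to fix the singleton being broken by the 2 extra celery processes.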
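Why a per-process singleton cannot be shared between workers can be seen in a small experiment. This is a sketch under assumptions: it relies on the `fork` start method, so it only runs on Unix-like systems, and `TokenHolder`, `child` and `run_demo` are hypothetical names. The child updates "the" singleton, but the parent's copy stays unchanged, because each process has its own memory.

```python
import multiprocessing
import os


class Singleton(type):
    _instances = {}

    def __call__(cls, *args, **kwargs):
        if cls not in cls._instances:
            cls._instances[cls] = super().__call__(*args, **kwargs)
        return cls._instances[cls]


class TokenHolder(metaclass=Singleton):  # hypothetical example class
    def __init__(self):
        self.token = "set-by-pid-%d" % os.getpid()


def child(conn):
    holder = TokenHolder()             # the copy inherited from the parent via fork
    holder.token = "updated-in-child"  # only changes the child's copy
    conn.send((id(holder), holder.token))
    conn.close()


def run_demo():
    parent_holder = TokenHolder()
    ctx = multiprocessing.get_context("fork")  # assumption: Unix-like system
    parent_conn, child_conn = ctx.Pipe()
    proc = ctx.Process(target=child, args=(child_conn,))
    proc.start()
    child_id, child_token = parent_conn.recv()
    proc.join()
    same_address = (child_id == id(parent_holder))
    return same_address, child_token, parent_holder.token


if __name__ == "__main__":
    same_address, child_token, parent_token = run_demo()
    print("same address in both processes:", same_address)
    print("child token: ", child_token)
    print("parent token:", parent_token)
```

The same reasoning applies to gunicorn workers and to the separate celery worker and beat processes: any state that must be visible to all of them has to live outside the Python process.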
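[Postscript]
Later I thought of what is probably a better approach:
For sharing data between multiple processes (including both reads and writes), it is best to use a general-purpose shared data store, such as redis.
With redis, one could implement functions like getToken and setToken/updateToken that simply access redis internally.
One could also make use of redis's key expiry: give the stored token an expiry time, for example 9 minutes (given that the MS TTS token expires after 10 minutes), and perhaps there is also some kind of callback on expiry that could be used to call updateToken and refresh the token.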
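The redis idea could look roughly like the sketch below. It is untested against a real server and rests on assumptions: `TOKEN_KEY`, `set_token` and `get_token` are hypothetical names, and `InMemoryClient` is a small stand-in that mimics only `get` and `set(name, value, ex=...)` so the example runs without redis installed. With the redis-py package, a real client such as `redis.Redis(host="localhost", port=6379, decode_responses=True)` could be passed in its place.

```python
import time


class InMemoryClient:
    """Hypothetical stand-in for a redis client: only get() and set(..., ex=seconds)."""

    def __init__(self):
        self._data = {}

    def set(self, name, value, ex=None):
        expires_at = time.monotonic() + ex if ex is not None else None
        self._data[name] = (value, expires_at)
        return True

    def get(self, name):
        value, expires_at = self._data.get(name, (None, None))
        if expires_at is not None and time.monotonic() >= expires_at:
            self._data.pop(name, None)  # expired: behave as if the key were gone
            return None
        return value


TOKEN_KEY = "ms_tts_token"  # hypothetical key name
TOKEN_TTL_SECONDS = 9 * 60  # expire after 9 minutes, before the 10-minute token lifetime


def set_token(client, token, ttl=TOKEN_TTL_SECONDS):
    """Store the token with an expiry so stale tokens disappear on their own."""
    client.set(TOKEN_KEY, token, ex=ttl)


def get_token(client, refresh):
    """Return the shared token; call refresh() and store the result if it is missing."""
    token = client.get(TOKEN_KEY)
    if token is None:
        token = refresh()
        if token:
            set_token(client, token)
    return token


client = InMemoryClient()
refresh_calls = []


def fake_refresh():  # stands in for the real token request
    refresh_calls.append(1)
    return "eyxxxxiJ9.xxx.xxx"


print(get_token(client, fake_refresh))
print(get_token(client, fake_refresh))
print(len(refresh_calls))  # 1: the second call is served from the store
```

With a real redis server every gunicorn and celery process would read the same key, so only the first process to find it missing needs to request a new token (apart from a small race window between the read and the write).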
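However, because time is limited, and because even with the singleton broken all 3 processes currently get a 200 response when fetching the token, so the business logic keeps running normally, I am not trying this approach for now.
I will try the redis approach later, when I find time to keep optimizing.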