折腾:
【未解决】用蓝图和工厂模式去优化现有Flask项目代码结构
期间,需要把之前的直接都写在app.py中的Celery的初始化:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 | celeryApp = Celery(app.name, broker = app.config[ 'CELERY_BROKER_URL' ]) celeryApp.conf.update(app.config) log.info( "celeryApp=%s" , celeryApp) #---------------------------------------- # Celery tasks #---------------------------------------- # @celeryApp.task() @celeryApp .task # @celeryApp.task(name=app.config["CELERY_TASK_NAME"] + ".deleteTmpAudioFile") def deleteTmpAudioFile(filename): """ delete tmp audio file from filename eg: 98fc7c46-7aa0-4dd7-aa9d-89fdf516abd6.mp3 """ global log log.info( "deleteTmpAudioFile: filename=%s" , filename) audioTmpFolder = app.config[ "AUDIO_TEMP_FOLDER" ] # audioTmpFolder = "tmp/audio" log.info( "audioTmpFolder=%s" , audioTmpFolder) curFolderAbsPath = os.getcwd() #'/Users/crifan/dev/dev_root/company/xxx/projects/robotDemo/server' log.info( "curFolderAbsPath=%s" , curFolderAbsPath) audioTmpFolderFullPath = os.path.join(curFolderAbsPath, audioTmpFolder) log.info( "audioTmpFolderFullPath=%s" , audioTmpFolderFullPath) tempAudioFullname = os.path.join(audioTmpFolderFullPath, filename) #'/Users/crifan/dev/dev_root/company/xxx/projects/robotDemo/server/tmp/audio/2aba73d1-f8d0-4302-9dd3-d1dbfad44458.mp3' if os.path.isfile(tempAudioFullname): os.remove(tempAudioFullname) log.info( "Ok to delete file %s" , tempAudioFullname) else : log.warning( "No need to remove for not exist file %s" , tempAudioFullname) # log.info("deleteTmpAudioFile=%s", deleteTmpAudioFile) # log.info("deleteTmpAudioFile.name=%s", deleteTmpAudioFile.name) # log.info("celeryApp.tasks=%s", celeryApp.tasks) @celeryApp .task def celeryRefreshAzureSpeechToken(): """celery's task: refreshAzureSpeechToken""" refreshAzureSpeechToken() @celeryApp .on_after_configure.connect def celerySetupPeriodicTasks(sender, * * kwargs): log.info( "celerySetupPeriodicTasks: sender=%s" , sender) sender.add_periodic_task(app.config[ 
"CELERY_REFRESH_MS_TOKEN_INTERVAL" ], celeryRefreshAzureSpeechToken.s(), name = "refresh ms Azure token every less than 10 minutes" ) |
其中,celery还调用了其他一些函数,比如上面的refreshAzureSpeechToken
也要想办法,如何更好的合并进去
flask celery factory
Celery and the Flask Application Factory Pattern – miguelgrinberg.com
1 2 3 4 5 6 7 8 9 10 | from celery import Celery from config import config, Config celery = Celery(__name__, broker = Config.CELERY_BROKER_URL) def create_app(config_name): # ... celery.conf.update(app.config) # ... return app |
还需要单独导入config,感觉也不是很完美
而且此处的__name__,估计不一定是到时候的app.name
在Celery中使用Flask的上下文
目前写成:
resources/celery_task.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 | import os from app import app, celery, log from resources.tts import refreshAzureSpeechToken #---------------------------------------- # Celery tasks #---------------------------------------- # @celery.task() @celery .task # @celery.task(name=app.config["CELERY_TASK_NAME"] + ".deleteTmpAudioFile") def deleteTmpAudioFile(filename): """ delete tmp audio file from filename eg: 98fc7c46-7aa0-4dd7-aa9d-89fdf516abd6.mp3 """ log.info( "deleteTmpAudioFile: filename=%s" , filename) audioTmpFolder = app.config[ "AUDIO_TEMP_FOLDER" ] # audioTmpFolder = "tmp/audio" log.info( "audioTmpFolder=%s" , audioTmpFolder) curFolderAbsPath = os.getcwd() #'/Users/crifan/dev/dev_root/company/xxx/projects/robotDemo/server' log.info( "curFolderAbsPath=%s" , curFolderAbsPath) audioTmpFolderFullPath = os.path.join(curFolderAbsPath, audioTmpFolder) log.info( "audioTmpFolderFullPath=%s" , audioTmpFolderFullPath) tempAudioFullname = os.path.join(audioTmpFolderFullPath, filename) #'/Users/crifan/dev/dev_root/company/xxx/projects/robotDemo/server/tmp/audio/2aba73d1-f8d0-4302-9dd3-d1dbfad44458.mp3' if os.path.isfile(tempAudioFullname): os.remove(tempAudioFullname) log.info( "Ok to delete file %s" , tempAudioFullname) else : log.warning( "No need to remove for not exist file %s" , tempAudioFullname) @celery .task def celeryRefreshAzureSpeechToken(): """celery's task: refreshAzureSpeechToken""" refreshAzureSpeechToken() @celery .on_after_configure.connect def celerySetupPeriodicTasks(sender, * * kwargs): log.info( "celerySetupPeriodicTasks: sender=%s" , sender) sender.add_periodic_task(app.config[ "CELERY_REFRESH_MS_TOKEN_INTERVAL" ], celeryRefreshAzureSpeechToken.s(), name = "refresh ms Azure token every less than 10 minutes" ) # log.info("deleteTmpAudioFile=%s", deleteTmpAudioFile) # log.info("deleteTmpAudioFile.name=%s", deleteTmpAudioFile.name) # 
log.info("celery.tasks=%s", celery.tasks) |
app.py
1 2 3 4 | from factory import celery app = create_app(settings) log.debug( "celery=%s" , celery) |
factory.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 | from celery import Celery celery = Celery() print ( "celery=%" % celery) def create_app(config_object): app = Flask(__name__) CORS(app) # app.config.from_object('config.DevelopmentConfig') # # app.config.from_object('config.ProductionConfig') app.config.from_object(config_object) create_extensions(app) def create_extensions(app): global log, mongo, fsCollection, api, celery log = create_log(app) ... celery = create_celery(app) log.info( "celery=%s" , celery) return app |
待后续调试,看看是否正常运行。
【总结】
此处,目前对于Celery改为工厂模式初始化,代码是:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 | import os # from app import celery # from app import app, log # from flask import current_app as app # from factory import celery from flask import g from resources.tts import refreshAzureSpeechToken app = g.app log = g.log celery = g.celery #---------------------------------------- # Celery tasks #---------------------------------------- # @celery.task() @celery .task # @celery.task(name=app.config["CELERY_TASK_NAME"] + ".deleteTmpAudioFile") def deleteTmpAudioFile(filename): """ delete tmp audio file from filename eg: 98fc7c46-7aa0-4dd7-aa9d-89fdf516abd6.mp3 """ # log = app.logger with celery.app.app_context(): log.info( "deleteTmpAudioFile: filename=%s" , filename) audioTmpFolder = app.config[ "AUDIO_TEMP_FOLDER" ] # audioTmpFolder = "tmp/audio" ... @celery .task def celeryRefreshAzureSpeechToken(): """celery's task: refreshAzureSpeechToken""" with celery.app.app_context(): refreshAzureSpeechToken() @celery .on_after_configure.connect def celerySetupPeriodicTasks(sender, * * kwargs): with celery.app.app_context(): # log = app.logger log.info( "celerySetupPeriodicTasks: sender=%s" , sender) sender.add_periodic_task(app.config[ "CELERY_REFRESH_MS_TOKEN_INTERVAL" ], celeryRefreshAzureSpeechToken.s(), name = "refresh ms Azure token every less than 10 minutes" ) |
而app.py是:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | from conf.app import settings from factory import create_app app = create_app(settings) # register_extensions(app) log = app.logger if __name__ = = "__main__" : app.run( host = app.config[ "FLASK_HOST" ], port = app.config[ "FLASK_PORT" ], debug = app.config[ "DEBUG" ], use_reloader = False ) |
factory.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 | import os from flask import Flask ... from celery import Celery from flask import g ################################################################################ # Global Function ################################################################################ def create_app(config_object): # global log # app = Flask(__name__) #<Flask 'factory'> app = Flask(config_object.FLASK_APP_NAME) #<Flask 'RobotQA'> CORS(app) # app.config.from_object('config.DevelopmentConfig') # # app.config.from_object('config.ProductionConfig') app.config.from_object(config_object) with app.app_context(): g.app = app log = create_log(app) g.log = log ... register_extensions(app) return app def register_extensions(app): # global log, mongo, fsCollection, api, celery log = g.log ... celery = create_celery(app) g.celery = celery log.info( "celery=%s" , celery) ... return app ... def create_celery(app): celery = Celery(app.name, broker = app.config[ 'CELERY_BROKER_URL' ]) celery.conf.update(app.config) TaskBase = celery.Task class ContextTask(TaskBase): abstract = True def __call__( self , * args, * * kwargs): with app.app_context(): g.log.info( "in celery ContextTask __call__: args=%s, kwargs=%s" , args, kwargs) return TaskBase.__call__( self , * args, * * kwargs) celery.Task = ContextTask return celery ... |
目前,至少对于Flask的app的初始化是可以的,可以运行的。不会出现循环导入和全局变量的问题。
【后记】
后来去真正本地调试运行时,尤其是本地运行celery的worker时,出错了:
【已解决】Flask的Celery改为工厂模式后本地调试worker出错:RuntimeError: Working outside of application context
所以之前的写法,还是不行。
最终是:
想办法解决了循环导入,以及全局变量的问题,期间用了很多折中的做法:
先说celery的库:
抛弃了之前的Flask-Celery-Helper,只用最原始的Celery
1 2 | # from flask_celery import Celery from celery import Celery |
-》因为其中会让我们在配置中去加上CELERY_IMPORTS:
1 2 3 | # for Flask-Celery-Helper # CELERY_IMPORTS = ('tasks.deleteTmpAudioFile', 'tasks.celeryRefreshAzureSpeechToken', ) # CELERY_IMPORTS = ('resources.tasks.deleteTmpAudioFile', 'resources.tasks.celeryRefreshAzureSpeechToken', ) |
-》会导致找不到task
-》所以果断放弃。
再说每个文件的代码和其中的改动:
文件结构是:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 | ➜ xxxRobotDemoServer git:(master) ✗ tree . ... ├── app.py ... ├── conf │ ├── __init__.py │ ├── __pycache__ │ │ └── __init__.cpython - 36.pyc │ ├── app │ │ ├── __init__.py │ │ ├── __pycache__ │ │ │ ├── __init__.cpython - 36.pyc │ │ │ └── settings.cpython - 36.pyc │ │ ├── development │ │ │ └── __init__.py │ │ ├── production │ │ │ └── __init__.py │ │ └── settings.py ... ├── factory.py ├── resources │ ├── __init__.py │ ├── asr.py │ ├── extensions_celery.py │ ├── files.py │ ├── qa.py │ ├── tasks.py │ └── tts.py ... |
详细解释每个文件:
(1)resources/extensions_celery.py
celery的初始化中:只是初始化celery,而不去做其他事情
1 2 3 4 5 6 7 8 9 | # from flask_celery import Celery from conf.app import settings from celery import Celery from celery.utils.log import get_task_logger # celery = Celery() celery = Celery(settings.FLASK_APP_NAME, broker = settings.CELERY_BROKER_URL) celery_logger = get_task_logger(__name__) print ( "in extensions_celery: celery=%s" % celery) |
(2)resources/tasks.py
背景是:
之前celery中依赖app和log
-》那是因为task中:
- 配置用到app.config["xxx"] -》 后来改为settings.xxx了 -》 单独引入settings不会产生循环导入
- log是task中需要打印输出 -》 希望尽量和flask的app的log用同一个log
这样会导致:
A:始终无法很好地处理如何再去得到app
因为,即使最开始运行app时,可以方便地得到app,但是celery worker -A时,还是会无法得到app
而如果再去调用create_app时,就又会导致app初始化时,引入flask-restful时,引用到别的模块,比如:
resources/tts.py
而其中由于用到celery的异步task:deleteTmpAudioFile,所以要导入
1 | from resources.tasks import deleteTmpAudioFile |
从而会触发tasks中,多了一次app的初始化
-》变成了:
Flask的app的初始化运行,初始化了2次,这是无法接受的
B:无法方便的获取(app的)log
最后的折中的解决办法是:
celery的task中:去除之前依赖的flask的app和log
-》
- 把app.config["xxx"] 改为settings.xxx了
- -》而单独引入settings不会产生循环导入
- 用另外的办法去获得log
- 此处用的是从resources/extensions_celery.py获取celery的logger
完整代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 | import os # from resources.extensions_celery import celery # print("celery=%s" % celery) # print("celery.app=%s" % celery.app) # print("celery.app.log=%s" % celery.app.log) # print("celery.log=%s" % celery.log) from conf.app import settings # try: # from flask import g # print("tasks: import flask g ok") # print("g=%s" % g) # log = g.log # print("log=%s" % log) # except RuntimeError as err: # # except: # print("tasks: failed to import flask g, err=%s" % err) # from factory import create_app # print("tasks: import create_app ok") # app = create_app(settings) # print("create_app ok, app=%s" % app) # log = app.logger # print("log=%s" % log) # import logging # log = logging.getLogger(settings.FLASK_APP_NAME) # log.info("test logging getLogger from flask app, work?") # print("-----before: from factory import create_celery_app") # from factory import create_celery_app # from factory import create_log, create_app # print("from factory import create_celery_app ok") # celery = create_celery_app() # app = create_app(settings) # celery = create_celery_app(app) # log = app.logger from resources.extensions_celery import celery, celery_logger as log print ( "create_celery_app return: celery=%s, log=%s" % (celery, log)) #---------------------------------------- # Celery tasks #---------------------------------------- # @celery.task() @celery .task # @celery.task(name=settings.CELERY_TASK_NAME + ".deleteTmpAudioFile") def deleteTmpAudioFile(filename): """ delete tmp audio file from filename eg: 98fc7c46-7aa0-4dd7-aa9d-89fdf516abd6.mp3 """ # print("deleteTmpAudioFile: celery=%s, filename=%s" % (celery, filename)) # log = app.logger # with 
celery.app.app_context(): # log = celery.app.logger # app = celery.app # log = celery.log # print("celery.log=%s" % celery.log) # print("log=%s" % log) log.info( "deleteTmpAudioFile: celery=%s, filename=%s" , celery, filename) audioTmpFolder = settings.AUDIO_TEMP_FOLDER # audioTmpFolder = "tmp/audio" log.info( "audioTmpFolder=%s" , audioTmpFolder) curFolderAbsPath = os.getcwd() #'/Users/crifan/dev/dev_root/company/xxx/projects/robotDemo/server' log.info( "curFolderAbsPath=%s" , curFolderAbsPath) audioTmpFolderFullPath = os.path.join(curFolderAbsPath, audioTmpFolder) log.info( "audioTmpFolderFullPath=%s" , audioTmpFolderFullPath) tempAudioFullname = os.path.join(audioTmpFolderFullPath, filename) #'/Users/crifan/dev/dev_root/company/xxx/projects/robotDemo/server/tmp/audio/2aba73d1-f8d0-4302-9dd3-d1dbfad44458.mp3' if os.path.isfile(tempAudioFullname): os.remove(tempAudioFullname) log.info( "Ok to delete file %s" , tempAudioFullname) else : log.warning( "No need to remove for not exist file %s" , tempAudioFullname) @celery .task def celeryRefreshAzureSpeechToken(): """celery's task: refreshAzureSpeechToken""" log.info( "celeryRefreshAzureSpeechToken: celery=%s" % celery) # with celery.app.app_context(): from resources.tts import refreshAzureSpeechToken refreshAzureSpeechToken() @celery .on_after_configure.connect def celerySetupPeriodicTasks(sender, * * kwargs): log.info( "celerySetupPeriodicTasks: celery=%s, sender=%s" % (celery, sender)) # with celery.app.app_context(): # log = app.logger # log = celery.app.logger # app = celery.app # log = celery.log # # print("celery.log=%s" % celery.log) # # print("log=%s" % log) # print("sender=%s" % sender) # log.info("celerySetupPeriodicTasks: sender=%s", sender) # print("celerySetupPeriodicTasks: log is usable") sender.add_periodic_task(settings.CELERY_REFRESH_MS_TOKEN_INTERVAL, celeryRefreshAzureSpeechToken.s(), name = "refresh ms Azure token every less than 10 minutes" ) |
(3)factory.py
而factory中没有全局的变量celery(和其他全局变量)
celery的初始化,都放在create_celery_app
并且create_celery_app中,在最开始app初始化时是传入app的
而在celery worker -A时,则只是从resources/tasks.py得到celery,而不会再去新建flask的app
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 | import os from flask import Flask import logging from logging.handlers import RotatingFileHandler # from flask_pymongo import PyMongo from gridfs import GridFS from pymongo import MongoClient from flask_restful import Api from flask_cors import CORS from conf.app import settings from celery import Celery # from flask_celery import Celery # from resources.extensions_celery import celery from flask import g ################################################################################ # Global Variables ################################################################################ # # log = logging.getLogger() #<RootLogger root (WARNING)> # log = None # print("log=%s" % log) # # # mongo = MongoClient() # MongoClient(host=['localhost:27017'], document_class=dict, tz_aware=False, connect=True) # mongo = None # print("mongo=%s" % mongo) # fsCollection = None #None # print("fsCollection=%s" % fsCollection) # # celery = Celery() #<Celery __main__ at 0x1068d8b38> # celery = None # print("celery=%s" % celery) ################################################################################ # Global Function ################################################################################ def create_app(config_object, init_extensions = True ): # 
global log # app = Flask(__name__) #<Flask 'factory'> app = Flask(config_object.FLASK_APP_NAME) #<Flask 'RobotQA'> CORS(app) # app.config.from_object('config.DevelopmentConfig') # # app.config.from_object('config.ProductionConfig') app.config.from_object(config_object) with app.app_context(): g.app = app log = create_log(app) g.log = log log.debug( "after load from object: app.config=%s" , app.config) log.debug( 'app.config["DEBUG"]=%s, app.config["MONGODB_HOST"]=%s, app.config["FILE_URL_HOST"]=%s' , app.config[ "DEBUG" ], app.config[ "MONGODB_HOST" ], app.config[ "FILE_URL_HOST" ]) if init_extensions: register_extensions(app) log.info( "flask app extensions init completed" ) return app def register_extensions(app): # global log, mongo, fsCollection, api, celery log = g.log mongo = create_mongo(app) g.mongo = mongo log.info( "mongo=%s" , mongo) mongoServerInfo = mongo.server_info() log.debug( "mongoServerInfo=%s" , mongoServerInfo) fsCollection = create_gridfs_fs_collection(mongo) g.fsCollection = fsCollection log.info( "fsCollection=%s" , fsCollection) celery = create_celery_app(app) g.celery = celery log.info( "celery=%s" , celery) # api = Api(app) api = create_rest_api(app) log.debug( "api=%s" , api) g.api = api return app def create_rest_api(app): from resources.qa import RobotQaAPI from resources.asr import RobotAsrAPI from resources.files import GridfsAPI, TmpAudioAPI rest_api = Api() rest_api.add_resource(RobotQaAPI, '/qa' , endpoint = 'qa' ) rest_api.add_resource(RobotAsrAPI, '/asr/language/<string:language>' , endpoint = 'asr' ) rest_api.add_resource(GridfsAPI, '/files/<fileId>' , '/files/<fileId>/<fileName>' , endpoint = 'gridfs' ) rest_api.add_resource(TmpAudioAPI, '/tmp/audio/<filename>' , endpoint = 'TmpAudio' ) rest_api.init_app(app) return rest_api def create_log(app): print ( "create_log: before init log: app.logger=%s" % app.logger) logFormatterStr = app.config[ "LOG_FORMAT" ] logFormatter = logging.Formatter(logFormatterStr) fileHandler = 
RotatingFileHandler( app.config[ 'LOG_FILE_FILENAME' ], maxBytes = app.config[ "LOG_FILE_MAX_BYTES" ], backupCount = app.config[ "LOG_FILE_BACKUP_COUNT" ], encoding = "UTF-8" ) fileHandler.setLevel(logging.DEBUG) fileHandler.setFormatter(logFormatter) app.logger.addHandler(fileHandler) # Note: should NOT set StreamHandler here, otherwise will duplicate debug log app.logger.setLevel(logging.DEBUG) # set root log level log = app.logger log.info( "app=%s" , app) # log.debug("app.config=%s", app.config) print ( "create_log: after init log: app.logger=%s" % app.logger) return log def create_mongo(app): # mongo_client = MongoClient( # host=app.config["MONGODB_HOST"], # port=app.config["MONGODB_PORT"], # username=app.config["MONGODB_USERNAME"], # password=app.config["MONGODB_PASSWORD"], # authSource=app.config["MONGODB_AUTH_SOURCE"] # ) if settings.MONGODB_AUTH_SOURCE: mongo_client = MongoClient( host = settings.MONGODB_HOST, port = int (settings.MONGODB_PORT), username = settings.MONGODB_USERNAME, password = settings.MONGODB_PASSWORD, authSource = settings.MONGODB_AUTH_SOURCE ) elif settings.MONGODB_USERNAME and settings.MONGODB_PASSWORD: mongo_client = MongoClient( host = settings.MONGODB_HOST, port = int (settings.MONGODB_PORT), username = settings.MONGODB_USERNAME, password = settings.MONGODB_PASSWORD, ) elif settings.MONGODB_PORT: mongo_client = MongoClient( host = settings.MONGODB_HOST, port = int (settings.MONGODB_PORT), ) elif settings.MONGODB_HOST: mongo_client = MongoClient( host = settings.MONGODB_HOST, ) else : mongo_client = MongoClient() return mongo_client def create_gridfs_fs_collection(mongo_db): # Pure PyMongo gridfs_db = mongo_db.gridfs # Database(MongoClient(host=['xxx:32018'], document_class=dict, tz_aware=False, connect=True, authsource='gridfs'), 'gridfs') gridfs_fs_collection = GridFS(gridfs_db) # <gridfs.GridFS object at 0x1107b2390> return gridfs_fs_collection def create_celery_app(app = None ): print ( "create_celery_app: app=%s" % app) app = 
app or create_app(settings, init_extensions = False ) app_import_name = app.import_name # app_name = app.name # celery_app_name = app_name celery_app_name = app_import_name celery = Celery(celery_app_name, broker = app.config[ 'CELERY_BROKER_URL' ]) celery.conf.update(app.config) # celery.log = app.logger TaskBase = celery.Task class ContextTask(TaskBase): abstract = True def __call__( self , * args, * * kwargs): with app.app_context(): # g.log.info("in celery ContextTask __call__: args=%s, kwargs=%s", args, kwargs) app.logger.info( "in celery ContextTask __call__: args=%s, kwargs=%s" , args, kwargs) return TaskBase.__call__( self , * args, * * kwargs) celery.Task = ContextTask # celery.init_app(app) print ( "init celery ok" ) return celery |
(4)app.py
app中只是调用create_app
而不去初始化celery了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 | import os from conf.app import settings from factory import create_app # from factory import log # from factory import create_celery_app ################################################################################ # Global Definitions ################################################################################ ################################################################################ # Global Variables ################################################################################ ################################################################################ # Global Function ################################################################################ ################################################################################ # Global Init App ################################################################################ print ( "in flask app: settings=%s" % (settings)) app = create_app(settings) app.app_context().push() # register_extensions(app) log = app.logger log.debug( "app=%s" , app) log.debug( "log=%s" , log) log.debug( "settings.FLASK_ENV=%s" , settings.FLASK_ENV) log.debug( "settings.DEBUG=%s, settings.MONGODB_HOST=%s, settings.FILE_URL_HOST=%s" , settings.DEBUG, settings.MONGODB_HOST, settings.FILE_URL_HOST) # celery = None # with app.app_context(): # celery = create_celery_app(app) # print("celery=%s" % celery) if __name__ = = "__main__" : app.run( host = settings.FLASK_HOST, port = settings.FLASK_PORT, debug = settings.DEBUG, use_reloader = False ) |
(5)其他模块用flask的g获取要的全局变量
其他模块中,为了获取最初app初始化后的app,log,mongo
通过factory初始化时加上with app.app_context(),
以及其他每个模块中用from flask import g
再去用log=g.log, app=g.app等去获取自己要的值
resources/files.py
1 2 3 4 5 6 7 8 9 10 11 12 13 | from flask import g from resources.tts import gTempAudioFolder log = g.log mongo = g.mongo fsCollection = g.fsCollection class GridfsAPI(Resource): def get( self , fileId, fileName = None ): # log = app.logger log.info( "fileId=%s, file_name=%s" , fileId, fileName) ... |
resources/qa.py
1 2 3 4 5 | from flask import g app = g.app log = g.log fsCollection = g.fsCollection |
resources/tts.py
1 2 3 4 | from flask import g app = g.app log = g.log |
这样每个模块中,都可以用上全局的log,全局的app了。
然后PyCharm去debug初始化时,就正常了:

接着再去Mac的终端中去运行celery的worker
1 | celery worker -A resources.tasks.celery --loglevel=DEBUG |
然后就可以有正常输出了:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 | ➜ xxxRobotDemoServer git:(master) ✗ celery worker - A resources.tasks.celery - - loglevel = DEBUG cur_flask_environ = None FLASK_ENV = development cur_dir = / Users / crifan / dev / dev_root / company / xxx / projects / robotDemo / server / xxxRobotDemoServer / conf / app env_folder = development dotenv_path = / Users / crifan / dev / dev_root / company / xxx / projects / robotDemo / server / xxxRobotDemoServer / conf / app / development / .env dotenv_load_ok = True After load .env: DEBUG = True , MONGODB_HOST = xxx, FILE_URL_HOST = 127.0 . 0.1 in extensions_celery: celery = <Celery RobotQA at 0x104277e80 > create_celery_app return : celery = <Celery RobotQA at 0x104277e80 >, log = <Logger resources.extensions_celery (WARNING)> [ 2018 - 08 - 26 11 : 11 : 45 , 796 : DEBUG / MainProcess] | Worker: Preparing bootsteps. [ 2018 - 08 - 26 11 : 11 : 45 , 801 : DEBUG / MainProcess] | Worker: Building graph... [ 2018 - 08 - 26 11 : 11 : 45 , 802 : DEBUG / MainProcess] | Worker: New boot order: {Timer, Hub, Pool, Autoscaler, Beat, StateDB, Consumer} [ 2018 - 08 - 26 11 : 11 : 45 , 831 : DEBUG / MainProcess] | Consumer: Preparing bootsteps. [ 2018 - 08 - 26 11 : 11 : 45 , 831 : DEBUG / MainProcess] | Consumer: Building graph... [ 2018 - 08 - 26 11 : 11 : 45 , 876 : DEBUG / MainProcess] | Consumer: New boot order: {Connection, Events, Heart, Mingle, Tasks, Control, Gossip, Agent, event loop} celery@licrifandeMacBook - Pro.local v4. 2.1 (windowlicker) Darwin - 17.7 . 
0 - x86_64 - i386 - 64bit - PE 2018 - 08 - 26 11 : 11 : 45 [config] .> app: RobotQA: 0x104277e80 .> transport: redis: / / localhost: 6379 / 0 .> results: disabled: / / .> concurrency: 4 (prefork) .> task events: OFF (enable - E to monitor tasks in this worker) [queues] .> celery exchange = celery(direct) key = celery [tasks] . celery.accumulate . celery.backend_cleanup . celery.chain . celery.chord . celery.chord_unlock . celery.chunks . celery.group . celery. map . celery.starmap . resources.tasks.celeryRefreshAzureSpeechToken . resources.tasks.deleteTmpAudioFile [ 2018 - 08 - 26 11 : 11 : 45 , 921 : DEBUG / MainProcess] | Worker: Starting Hub [ 2018 - 08 - 26 11 : 11 : 45 , 922 : DEBUG / MainProcess] ^ - - substep ok [ 2018 - 08 - 26 11 : 11 : 45 , 922 : DEBUG / MainProcess] | Worker: Starting Pool [ 2018 - 08 - 26 11 : 11 : 46 , 059 : DEBUG / MainProcess] ^ - - substep ok [ 2018 - 08 - 26 11 : 11 : 46 , 060 : DEBUG / MainProcess] | Worker: Starting Consumer [ 2018 - 08 - 26 11 : 11 : 46 , 061 : DEBUG / MainProcess] | Consumer: Starting Connection [ 2018 - 08 - 26 11 : 11 : 46 , 098 : INFO / MainProcess] Connected to redis: / / localhost: 6379 / 0 [ 2018 - 08 - 26 11 : 11 : 46 , 098 : DEBUG / MainProcess] ^ - - substep ok [ 2018 - 08 - 26 11 : 11 : 46 , 098 : DEBUG / MainProcess] | Consumer: Starting Events [ 2018 - 08 - 26 11 : 11 : 46 , 114 : DEBUG / MainProcess] ^ - - substep ok [ 2018 - 08 - 26 11 : 11 : 46 , 114 : DEBUG / MainProcess] | Consumer: Starting Heart [ 2018 - 08 - 26 11 : 11 : 46 , 117 : DEBUG / MainProcess] ^ - - substep ok [ 2018 - 08 - 26 11 : 11 : 46 , 119 : DEBUG / MainProcess] | Consumer: Starting Mingle [ 2018 - 08 - 26 11 : 11 : 46 , 119 : INFO / MainProcess] mingle: searching for neighbors [ 2018 - 08 - 26 11 : 11 : 47 , 155 : INFO / MainProcess] mingle: all alone [ 2018 - 08 - 26 11 : 11 : 47 , 156 : DEBUG / MainProcess] ^ - - substep ok [ 2018 - 08 - 26 11 : 11 : 47 , 156 : DEBUG / MainProcess] | Consumer: Starting Tasks [ 2018 - 08 - 
26 11 : 11 : 47 , 161 : DEBUG / MainProcess] ^ - - substep ok [ 2018 - 08 - 26 11 : 11 : 47 , 161 : DEBUG / MainProcess] | Consumer: Starting Control [ 2018 - 08 - 26 11 : 11 : 47 , 167 : DEBUG / MainProcess] ^ - - substep ok [ 2018 - 08 - 26 11 : 11 : 47 , 167 : DEBUG / MainProcess] | Consumer: Starting Gossip [ 2018 - 08 - 26 11 : 11 : 47 , 172 : DEBUG / MainProcess] ^ - - substep ok [ 2018 - 08 - 26 11 : 11 : 47 , 172 : DEBUG / MainProcess] | Consumer: Starting event loop [ 2018 - 08 - 26 11 : 11 : 47 , 173 : DEBUG / MainProcess] | Worker: Hub.register Pool... [ 2018 - 08 - 26 11 : 11 : 47 , 173 : INFO / MainProcess] celery@licrifandeMacBook - Pro.local ready. [ 2018 - 08 - 26 11 : 11 : 47 , 174 : DEBUG / MainProcess] basic.qos: prefetch_count - > 16 |

其中的task是:
1 2 3 4 5 6 7 8 9 10 11 12 | [tasks] . celery.accumulate . celery.backend_cleanup . celery.chain . celery.chord . celery.chord_unlock . celery.chunks . celery.group . celery.map . celery.starmap . resources.tasks.celeryRefreshAzureSpeechToken . resources.tasks.deleteTmpAudioFile |
后续和Flask中的task是一致的,那Flask就可以互相识别了,就可以调用task了。
后续调用时task就可以正常运行了:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | 2018 - 08 - 26 11 : 11 : 47 , 174 : DEBUG / MainProcess] basic.qos: prefetch_count - > 16 [ 2018 - 08 - 26 11 : 14 : 28 , 913 : INFO / MainProcess] Received task: resources.tasks.deleteTmpAudioFile[ 6ea6c103 - 7b58 - 4d38 - ad54 - 06666b0ebb59 ] ETA:[ 2018 - 08 - 26 03 : 14 : 38.521681 + 00 : 00 ] [ 2018 - 08 - 26 11 : 14 : 28 , 915 : DEBUG / MainProcess] basic.qos: prefetch_count - > 17 [ 2018 - 08 - 26 11 : 14 : 38 , 527 : DEBUG / MainProcess] TaskPool: Apply <function _fast_trace_task at 0x10433c840 > (args:( 'resources.tasks.deleteTmpAudioFile' , '6ea6c103-7b58-4d38-ad54-06666b0ebb59' , { 'lang' : 'py' , 'task' : 'resources.tasks.deleteTmpAudioFile' , 'id' : '6ea6c103-7b58-4d38-ad54-06666b0ebb59' , 'shadow' : None , 'eta' : '2018-08-26T03:14:38.521681+00:00' , 'expires' : None , 'group' : None , 'retries' : 0 , 'timelimit' : [ None , None ], 'root_id' : '6ea6c103-7b58-4d38-ad54-06666b0ebb59' , 'parent_id' : None , 'argsrepr' : "['42667515-281a-4cab-bfac-5d2c23c25a34.mp3']" , 'kwargsrepr' : '{}' , 'origin' : ' gen47559@licrifandeMacBook - Pro.local ', ' reply_to ': ' 10c57fe8 - 23c6 - 3e20 - bea6 - 0576fa07fe57 ', ' correlation_id ': ' 6ea6c103 - 7b58 - 4d38 - ad54 - 06666b0ebb59 ', ' delivery_info ': {' exchange ': ' ', ' routing_key ': ' celery ', ' priority ': 0, ' redelivered ': None}}, b' [[ "42667515-281a-4cab-bfac-5d2c23c25a34.mp3" ], {}, { "callbacks" : null, "errbacks" : null, "chain" : null, "chord" : null}] ', ' application / json ', ' utf - 8 ') kwargs:{}) [ 2018 - 08 - 26 11 : 14 : 38 , 564 : DEBUG / MainProcess] basic.qos: prefetch_count - > 16 [ 2018 - 08 - 26 11 : 14 : 38 , 626 : DEBUG / MainProcess] Task accepted: resources.tasks.deleteTmpAudioFile[ 6ea6c103 - 7b58 - 4d38 - ad54 - 06666b0ebb59 ] pid: 47617 [ 2018 - 08 - 26 11 : 14 : 38 , 628 : INFO / ForkPoolWorker - 2 ] resources.tasks.deleteTmpAudioFile[ 6ea6c103 - 7b58 - 4d38 - ad54 - 06666b0ebb59 ]: deleteTmpAudioFile: celery = <Celery RobotQA at 
0x104277e80 >, filename = 42667515 - 281a - 4cab - bfac - 5d2c23c25a34 .mp3 [ 2018 - 08 - 26 11 : 14 : 38 , 632 : INFO / ForkPoolWorker - 2 ] resources.tasks.deleteTmpAudioFile[ 6ea6c103 - 7b58 - 4d38 - ad54 - 06666b0ebb59 ]: audioTmpFolder = tmp / audio [ 2018 - 08 - 26 11 : 14 : 38 , 634 : INFO / ForkPoolWorker - 2 ] resources.tasks.deleteTmpAudioFile[ 6ea6c103 - 7b58 - 4d38 - ad54 - 06666b0ebb59 ]: curFolderAbsPath = / Users / crifan / dev / dev_root / company / xxx / projects / robotDemo / server / xxxRobotDemoServer [ 2018 - 08 - 26 11 : 14 : 38 , 634 : INFO / ForkPoolWorker - 2 ] resources.tasks.deleteTmpAudioFile[ 6ea6c103 - 7b58 - 4d38 - ad54 - 06666b0ebb59 ]: audioTmpFolderFullPath = / Users / crifan / dev / dev_root / company / xxx / projects / robotDemo / server / xxxRobotDemoServer / tmp / audio [ 2018 - 08 - 26 11 : 14 : 38 , 635 : INFO / ForkPoolWorker - 2 ] resources.tasks.deleteTmpAudioFile[ 6ea6c103 - 7b58 - 4d38 - ad54 - 06666b0ebb59 ]: Ok to delete file / Users / crifan / dev / dev_root / company / xxx / projects / robotDemo / server / xxxRobotDemoServer / tmp / audio / 42667515 - 281a - 4cab - bfac - 5d2c23c25a34 .mp3 [ 2018 - 08 - 26 11 : 14 : 38 , 638 : INFO / ForkPoolWorker - 2 ] Task resources.tasks.deleteTmpAudioFile[ 6ea6c103 - 7b58 - 4d38 - ad54 - 06666b0ebb59 ] succeeded in 0.012065329006873071s : None |

加上celery beat的输出,经过调试后,确保celery的task中,不要调用到别的模块的函数后才正常。
否则celery的task一旦调用别的模块中的东西,则别的模块中的flask的g,就会报错:
raise RuntimeError(_app_ctx_err_msg)
RuntimeError: Working outside of application context.
经过调整后的代码:
resources/tasks.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 | @celery .task # def celeryRefreshAzureSpeechToken(): def refreshAzureSpeechToken(): """celery's task: refresh microsoft azure speech token key for later call tts/ASR api""" log.info( "celeryRefreshAzureSpeechToken: celery=%s" % celery) # with celery.app.app_context(): # from resources.tts import refreshAzureSpeechToken # refreshAzureSpeechToken() # global gMsToken # log = app.logger # log.info("refreshAzureSpeechToken: gMsToken=%s", gMsToken) # log.info("refreshAzureSpeechToken") getMsTokenUrl = settings.MS_GET_TOKEN_URL reqHeaders = { "Ocp-Apim-Subscription-Key" : settings.MS_TTS_SECRET_KEY } log.info( "getMsTokenUrl=%s, reqHeaders=%s" , getMsTokenUrl, reqHeaders) resp = requests.post(getMsTokenUrl, headers = reqHeaders) log.info( "resp=%s" , resp) respTokenText = resp.text # eyxxxxiJ9.xxx.xxx log.info( "respTokenText=%s" , respTokenText) # gMsToken = respTokenText updatedToken = respTokenText # # for debug # gMsToken = "eyJ0eXAiOxxxxnez" # log.info("after refresh: gMsToken=%s", gMsToken) return updatedToken |
避免了之前的:
1 2 | from resources.tts import refreshAzureSpeechToken refreshAzureSpeechToken() |
的报错:
ImportError: cannot import name 'refreshAzureSpeechToken'
而
resources/tts.py
中的,对于:
1 | from common.util import generateUUID |
必须要加上导入sys的path:
1 2 3 4 5 | import sys resouces_dir = os.path.dirname(__file__) project_root_dir = os.path.abspath(os.path.join(resouces_dir, ".." )) if project_root_dir not in sys.path: sys.path.append(project_root_dir) |
否则会出现无法导入
以及tts内的token的代码也去更新了:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 | from resources.tasks import refreshAzureSpeechToken def getAzureSpeechToken(): """get Microsoft Azure speech service token key""" # log = app.logger global gMsToken gMsToken = refreshAzureSpeechToken() ... else : # if errNo == BAIDU_ERR_TOKEN_INVALID: if errNo = = settings.MS_ERR_UNAUTHORIZED: log.warning( "Token invalid -> retry one for refresh token" ) # refreshBaiduToken() gMsToken = refreshAzureSpeechToken() # isOk, audioBinData, errNo, errMsg = baiduText2Audio(unicodeText) isOk, audioBinData, errNo, errMsg = msTTS(unicodeText) |
心得:
其实,此处能成功把Celery转换为工厂模式,核心的几个点是:
基本上明白了:
Celery的初始化,不一定非要是传入Flask的app
其实别人的,类似于
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | # extensions.py from flask_celery import Celery celery = Celery() # application.py from flask import Flask from extensions import celery def create_app(): app = Flask(__name__) app.config[ 'CELERY_IMPORTS' ] = ( 'tasks.add_together' , ) celery.init_app(app) return app |
或:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | from celery import Celery from flask import Flask from app.config import BaseConfig celery = Celery(__name__, broker = BaseConfig.CELERY_BROKER_URL) def create_app(): app = Flask(__name__) # .... celery.conf.update(app.config) # 更新 celery 的配置 # ... return app |
核心在于:
最开始初始化Celery的话,一定要加上broker参数,指定broker的地址
之后再去在create_app中初始化app后,再去更新配置:
celery.conf.update(app.config)
或
celery.init_app(app)
而Celery其实本身和Flask的app,并没有什么本质的关联。
主要是flask中的配置参数:
conf/app/settings.py
1 2 3 4 5 | # CELERY_RESULT_BACKEND = "redis://localhost:6379/0" # current not use result # for periodical celery task CELERY_TIMEZONE = "Asia/Shanghai" CELERY_ENABLE_UTC = True |
起到了效果。
所以此处改为:
celery初始化,从settings传入celery相关的配置:
resources/extensions_celery.py
1 | celery = Celery(settings.FLASK_APP_NAME, broker=settings.CELERY_BROKER_URL) |
就可以了。
然后别的地方,去导入,引用celery的时候
-》感觉会重新触发Celery的初始化,和(一般做法中会有的)Flask的app中的初始化中的flask,会不相同
-》其实是一样的
-》主要就是上面的Celery,传入了name和broker是一致的,就可以了
-》celery worker -A 中就可以了
1 | celery worker -A resources.tasks.celery --loglevel=DEBUG |
调用task中的celery,就不会有什么,非要和Flask中的app关联在一起了。
其次是,celery的task中,不能依赖导入别的模块
-》否则celery的worker或beat时,再去导入别的模块时,就会出现各种问题
比如模块路径问题导致无法导入
比如flask的g无法正常识别
等等。
最终的上述的celery的工厂模式的代码,去运行了flask的app后,再分别去运行worker和beat:
1 2 | pipenv shell celery worker -A resources.tasks.celery --loglevel=DEBUG |
和:
1 2 | pipenv shell celery beat -A resources.tasks.celery -s runtime/celerybeat-schedule --loglevel=DEBUG |
终于可以正常执行celery的异步函数和定期任务了。
转载请注明:在路上 » 【已解决】Flask中如何用工厂模式初始化Celery