折腾:
【未解决】用蓝图和工厂模式去优化现有Flask项目代码结构
期间,需要把之前直接写在app.py中的Celery初始化代码:
celeryApp = Celery(app.name, broker=app.config["CELERY_BROKER_URL"])
celeryApp.conf.update(app.config)
log.info("celeryApp=%s", celeryApp)


# ------------------------------------------------------------------------------
# Celery tasks
# ------------------------------------------------------------------------------

@celeryApp.task
def deleteTmpAudioFile(filename):
    """Remove a temporary audio file from the AUDIO_TEMP_FOLDER directory.

    ``filename`` is the bare file name, e.g.
    ``98fc7c46-7aa0-4dd7-aa9d-89fdf516abd6.mp3``; the folder is resolved
    relative to the current working directory. A missing file is logged as a
    warning and otherwise ignored.
    """
    log.info("deleteTmpAudioFile: filename=%s", filename)

    folder = app.config["AUDIO_TEMP_FOLDER"]
    log.info("audioTmpFolder=%s", folder)
    cwd = os.getcwd()
    log.info("curFolderAbsPath=%s", cwd)
    folder_path = os.path.join(cwd, folder)
    log.info("audioTmpFolderFullPath=%s", folder_path)
    target = os.path.join(folder_path, filename)

    if not os.path.isfile(target):
        log.warning("No need to remove for not exist file %s", target)
        return

    os.remove(target)
    log.info("Ok to delete file %s", target)


@celeryApp.task
def celeryRefreshAzureSpeechToken():
    """Celery task that simply delegates to refreshAzureSpeechToken()."""
    refreshAzureSpeechToken()


@celeryApp.on_after_configure.connect
def celerySetupPeriodicTasks(sender, **kwargs):
    """Schedule the Azure token refresh once the Celery app is configured."""
    log.info("celerySetupPeriodicTasks: sender=%s", sender)
    interval = app.config["CELERY_REFRESH_MS_TOKEN_INTERVAL"]
    refresh_sig = celeryRefreshAzureSpeechToken.s()
    sender.add_periodic_task(
        interval,
        refresh_sig,
        name="refresh ms Azure token every less than 10 minutes",
    )
其中,celery还调用了其他一些函数,比如上面的refreshAzureSpeechToken
也要想办法把它们更好地合并进去
flask celery factory
Celery and the Flask Application Factory Pattern – miguelgrinberg.com
from celery import Celery
from config import config, Config

# Module-level Celery instance, created before any Flask app exists, so the
# broker URL has to be read straight from the Config class.
celery = Celery(__name__, broker=Config.CELERY_BROKER_URL)


def create_app(config_name):
    # ... (Flask app construction elided in the quoted article)
    # Copy the Flask app's configuration onto the shared Celery instance.
    celery.conf.update(app.config)
    # ...
    return app
还需要额外导入config,感觉也不是很完美
而且此处的__name__,不一定就是之后创建的app的app.name
在Celery中使用Flask的上下文
目前写成:
resources/celery_task.py
import os
# NOTE(review): app/celery/log are imported from the top-level `app` module,
# which ties this file to app.py's import order; the surrounding article
# reports that this layout led to circular imports and a second Flask
# initialisation.
from app import app, celery, log
from resources.tts import refreshAzureSpeechToken

#----------------------------------------
# Celery tasks
#----------------------------------------

# @celery.task()
@celery.task
# @celery.task(name=app.config["CELERY_TASK_NAME"] + ".deleteTmpAudioFile")
def deleteTmpAudioFile(filename):
    """
    delete tmp audio file from filename
        eg: 98fc7c46-7aa0-4dd7-aa9d-89fdf516abd6.mp3

    The folder comes from app.config["AUDIO_TEMP_FOLDER"] and is resolved
    relative to the current working directory; a missing file only logs a
    warning.
    """
    log.info("deleteTmpAudioFile: filename=%s", filename)

    audioTmpFolder = app.config["AUDIO_TEMP_FOLDER"] # e.g. "tmp/audio"
    log.info("audioTmpFolder=%s", audioTmpFolder)
    curFolderAbsPath = os.getcwd() # e.g. '/Users/.../robotDemo/server'
    log.info("curFolderAbsPath=%s", curFolderAbsPath)
    audioTmpFolderFullPath = os.path.join(curFolderAbsPath, audioTmpFolder)
    log.info("audioTmpFolderFullPath=%s", audioTmpFolderFullPath)
    tempAudioFullname = os.path.join(audioTmpFolderFullPath, filename) # e.g. '.../server/tmp/audio/<uuid>.mp3'
    if os.path.isfile(tempAudioFullname):
        os.remove(tempAudioFullname)
        log.info("Ok to delete file %s", tempAudioFullname)
    else:
        log.warning("No need to remove for not exist file %s", tempAudioFullname)

@celery.task
def celeryRefreshAzureSpeechToken():
    """celery's task: refreshAzureSpeechToken"""
    refreshAzureSpeechToken()

@celery.on_after_configure.connect
def celerySetupPeriodicTasks(sender, **kwargs):
    """Register the periodic Azure token refresh; interval from app.config."""
    log.info("celerySetupPeriodicTasks: sender=%s", sender)
    sender.add_periodic_task(app.config["CELERY_REFRESH_MS_TOKEN_INTERVAL"], celeryRefreshAzureSpeechToken.s(), name="refresh ms Azure token every less than 10 minutes")

# log.info("deleteTmpAudioFile=%s", deleteTmpAudioFile)
# log.info("deleteTmpAudioFile.name=%s", deleteTmpAudioFile.name)
# log.info("celery.tasks=%s", celery.tasks)
app.py
# NOTE(review): `from factory import celery` binds whatever object
# factory.celery refers to at import time. If factory later rebinds its
# module-level `celery` (e.g. via `global celery` inside create_app), this
# name keeps pointing at the original object -- confirm, or read
# `factory.celery` after create_app() instead.
from factory import celery

# `create_app`, `settings` and `log` are not imported in this excerpt;
# presumably they are provided elsewhere in app.py.
app = create_app(settings)
log.debug("celery=%s", celery)
factory.py
from celery import Celery

# Placeholder instance so `from factory import celery` works at import time;
# create_extensions() rebinds this module-level name later.
# NOTE(review): modules that already executed `from factory import celery`
# keep a reference to THIS unconfigured object, not the rebound one.
celery = Celery()
print("celery=%s" % celery)


def create_app(config_object):
    """Build the Flask app from ``config_object`` and initialise its extensions.

    Returns:
        The configured Flask application.
    """
    app = Flask(__name__)
    CORS(app)
    # app.config.from_object('config.DevelopmentConfig')
    # # app.config.from_object('config.ProductionConfig')
    app.config.from_object(config_object)

    create_extensions(app)
    # Callers do `app = create_app(...)`, so the app must be returned.
    return app


def create_extensions(app):
    """Create the shared extensions and publish them as module globals."""
    global log, mongo, fsCollection, api, celery

    log = create_log(app)
    ...
    celery = create_celery(app)
    log.info("celery=%s", celery)

    return app
待后续调试,看看是否正常运行。
【总结】
此处,目前对于Celery改为工厂模式初始化,代码是:
import os
# from app import celery
# from app import app, log
# from flask import current_app as app
# from factory import celery
from flask import g
from resources.tts import refreshAzureSpeechToken

# NOTE(review): these lookups run once, at import time. They need an active
# Flask application context with g.app / g.log / g.celery already set. The
# article reports that starting the Celery worker with this version failed
# with "RuntimeError: Working outside of application context".
app = g.app
log = g.log
celery = g.celery

#----------------------------------------
# Celery tasks
#----------------------------------------

# @celery.task()
@celery.task
# @celery.task(name=app.config["CELERY_TASK_NAME"] + ".deleteTmpAudioFile")
def deleteTmpAudioFile(filename):
    """
    delete tmp audio file from filename
        eg: 98fc7c46-7aa0-4dd7-aa9d-89fdf516abd6.mp3
    """
    # log = app.logger
    # NOTE(review): assumes the Celery object exposes the Flask app as
    # `celery.app`; nothing in this excerpt sets that attribute -- confirm.
    with celery.app.app_context():
        log.info("deleteTmpAudioFile: filename=%s", filename)

        audioTmpFolder = app.config["AUDIO_TEMP_FOLDER"] # e.g. "tmp/audio"
        ...

@celery.task
def celeryRefreshAzureSpeechToken():
    """celery's task: refreshAzureSpeechToken"""
    with celery.app.app_context():
        refreshAzureSpeechToken()

@celery.on_after_configure.connect
def celerySetupPeriodicTasks(sender, **kwargs):
    """Register the periodic Azure token refresh; interval from app.config."""
    with celery.app.app_context():
        # log = app.logger
        log.info("celerySetupPeriodicTasks: sender=%s", sender)
        sender.add_periodic_task(app.config["CELERY_REFRESH_MS_TOKEN_INTERVAL"], celeryRefreshAzureSpeechToken.s(), name="refresh ms Azure token every less than 10 minutes")
而app.py是:
"""Flask entry point: build the app through the factory and optionally serve it."""
from conf.app import settings
from factory import create_app

app = create_app(settings)
log = app.logger

if __name__ == "__main__":
    run_options = {
        "host": app.config["FLASK_HOST"],
        "port": app.config["FLASK_PORT"],
        "debug": app.config["DEBUG"],
        # Auto-reloader stays off (presumably to avoid initialising twice).
        "use_reloader": False,
    }
    app.run(**run_options)
factory.py
import os
from flask import Flask
...
from celery import Celery
from flask import g

################################################################################
# Global Function
################################################################################

def create_app(config_object):
    # global log
    # app = Flask(__name__) #<Flask 'factory'>
    # Name the app after the configured name, e.g. <Flask 'RobotQA'>.
    app = Flask(config_object.FLASK_APP_NAME)
    CORS(app)
    # app.config.from_object('config.DevelopmentConfig')
    # # app.config.from_object('config.ProductionConfig')
    app.config.from_object(config_object)

    # Publish app/log on flask.g so modules imported during extension
    # registration can read them.
    # NOTE(review): the values on g live only as long as this pushed context;
    # presumably they are gone once the `with` block exits.
    with app.app_context():
        g.app = app

        log = create_log(app)
        g.log = log
        ...
        register_extensions(app)

    return app

def register_extensions(app):
    # global log, mongo, fsCollection, api, celery
    log = g.log
    ...
    celery = create_celery(app)
    g.celery = celery
    log.info("celery=%s", celery)
    ...
    return app

...

def create_celery(app):
    """Create a Celery instance configured from ``app.config`` whose tasks run
    inside ``app.app_context()``."""
    celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
    celery.conf.update(app.config)
    TaskBase = celery.Task

    class ContextTask(TaskBase):
        abstract = True

        def __call__(self, *args, **kwargs):
            with app.app_context():
                # NOTE(review): this pushes a new application context; g.log
                # was set on a different context, so it is presumably unset
                # here (AttributeError). The later version of this factory
                # logs through app.logger instead.
                g.log.info("in celery ContextTask __call__: args=%s, kwargs=%s", args, kwargs)
                return TaskBase.__call__(self, *args, **kwargs)

    celery.Task = ContextTask
    return celery
...
目前至少Flask app的初始化是正常的,可以运行,也不会出现循环导入和全局变量的问题。
【后记】
后来真正在本地调试运行时,尤其是在本地运行celery的worker时,出错了:
【已解决】Flask的Celery改为工厂模式后本地调试worker出错:RuntimeError: Working outside of application context
所以之前的写法,还是不行。
最终是:
想办法解决了循环导入,以及全局变量的问题,期间用了很多折中的做法:
先说celery的库:
抛弃了之前的Flask-Celery-Helper,只用最原始的Celery
1 2 | # from flask_celery import Celery from celery import Celery |
-》因为其中会让我们在配置中去加上CELERY_IMPORTS:
1 2 3 | # for Flask-Celery-Helper # CELERY_IMPORTS = ('tasks.deleteTmpAudioFile', 'tasks.celeryRefreshAzureSpeechToken', ) # CELERY_IMPORTS = ('resources.tasks.deleteTmpAudioFile', 'resources.tasks.celeryRefreshAzureSpeechToken', ) |
-》会导致找不到task
-》所以果断放弃。
再说每个文件的代码和其中的改动:
文件结构是:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 | ➜ xxxRobotDemoServer git:(master) ✗ tree . ... ├── app.py ... ├── conf │ ├── __init__.py │ ├── __pycache__ │ │ └── __init__.cpython - 36.pyc │ ├── app │ │ ├── __init__.py │ │ ├── __pycache__ │ │ │ ├── __init__.cpython - 36.pyc │ │ │ └── settings.cpython - 36.pyc │ │ ├── development │ │ │ └── __init__.py │ │ ├── production │ │ │ └── __init__.py │ │ └── settings.py ... ├── factory.py ├── resources │ ├── __init__.py │ ├── asr.py │ ├── extensions_celery.py │ ├── files.py │ ├── qa.py │ ├── tasks.py │ └── tts.py ... |
详细解释每个文件:
(1)resources/extensions_celery.py
celery的初始化中:只是初始化celery,而不去做其他事情
"""Standalone Celery instance for the project.

Only builds the Celery object (named after the Flask app, broker from
settings) and a task logger. It never creates the Flask app, so the Celery
worker can import it directly.
"""
# from flask_celery import Celery
from conf.app import settings
from celery import Celery
from celery.utils.log import get_task_logger

celery = Celery(
    main=settings.FLASK_APP_NAME,
    broker=settings.CELERY_BROKER_URL,
)
celery_logger = get_task_logger(__name__)

print("in extensions_celery: celery=%s" % celery)
(2)resources/tasks.py
背景是:
之前celery中依赖app和log
-》那是因为task中:
- 配置用到app.config[“xxx”] -》 后来改为settings.xxx了-》单独引入settings不会产生循环导入
- log是task中需要打印输出 -》 希望尽量和flask的app的log用同一个log
这样会导致:
A:始终无法很好地解决如何再次获取app的问题
因为即使最开始运行app时可以方便地得到app,但用celery worker -A启动worker时,仍然无法得到app
而如果再去调用create_app时,就又会导致app初始化时,引入flask-restful时,引用到别的模块,比如:
resources/tts.py
而其中由于用到celery的异步task:deleteTmpAudioFile,所以要导入
1 | from resources.tasks import deleteTmpAudioFile |
从而会触发tasks中,多了一次app的初始化
-》变成了:
Flask的app的初始化运行,初始化了2次,这是无法接受的
B:无法方便的获取(app的)log
最后的折中的解决办法是:
celery的task中:去除之前依赖的flask的app和log
-》
- 把app.config[“xxx”] 改为settings.xxx了
- -》而单独引入settings不会产生循环导入
- 用另外的办法去获得log
- 此处用的是从resources/extensions_celery.py获取celery的logger
完整代码:
"""Celery task definitions.

This module intentionally neither imports the Flask app nor reads flask.g:
configuration comes from ``conf.app.settings`` and logging from the Celery
task logger. That lets ``celery worker -A resources.tasks.celery`` import it
without (re)creating the Flask application.
"""
import os

from conf.app import settings
from resources.extensions_celery import celery, celery_logger as log

print("create_celery_app return: celery=%s, log=%s" % (celery, log))

#----------------------------------------
# Celery tasks
#----------------------------------------


@celery.task
def deleteTmpAudioFile(filename):
    """Delete one temporary audio file.

    Args:
        filename: bare file name inside ``settings.AUDIO_TEMP_FOLDER``
            (resolved relative to the current working directory),
            e.g. ``98fc7c46-7aa0-4dd7-aa9d-89fdf516abd6.mp3``.

    A file that does not exist is logged as a warning and otherwise ignored.
    """
    log.info("deleteTmpAudioFile: celery=%s, filename=%s", celery, filename)

    audioTmpFolder = settings.AUDIO_TEMP_FOLDER  # e.g. "tmp/audio"
    log.info("audioTmpFolder=%s", audioTmpFolder)
    curFolderAbsPath = os.getcwd()
    log.info("curFolderAbsPath=%s", curFolderAbsPath)
    audioTmpFolderFullPath = os.path.join(curFolderAbsPath, audioTmpFolder)
    log.info("audioTmpFolderFullPath=%s", audioTmpFolderFullPath)
    tempAudioFullname = os.path.join(audioTmpFolderFullPath, filename)
    if os.path.isfile(tempAudioFullname):
        os.remove(tempAudioFullname)
        log.info("Ok to delete file %s", tempAudioFullname)
    else:
        log.warning("No need to remove for not exist file %s", tempAudioFullname)


@celery.task
def celeryRefreshAzureSpeechToken():
    """Celery task wrapper around ``resources.tts.refreshAzureSpeechToken``."""
    # Lazy (not %-preformatted) logging args, consistent with the other tasks.
    log.info("celeryRefreshAzureSpeechToken: celery=%s", celery)
    # Imported inside the task because resources.tts imports this module
    # (for deleteTmpAudioFile); a top-level import would be circular.
    # NOTE(review): the article reports this import still fails inside the
    # worker (ImportError) -- moving the token logic into this module avoids it.
    from resources.tts import refreshAzureSpeechToken
    refreshAzureSpeechToken()


@celery.on_after_configure.connect
def celerySetupPeriodicTasks(sender, **kwargs):
    """Register the periodic Azure token refresh once Celery is configured."""
    log.info("celerySetupPeriodicTasks: celery=%s, sender=%s", celery, sender)
    sender.add_periodic_task(
        settings.CELERY_REFRESH_MS_TOKEN_INTERVAL,
        celeryRefreshAzureSpeechToken.s(),
        name="refresh ms Azure token every less than 10 minutes",
    )
(3)factory.py
而factory中没有全局的变量celery(和其他全局变量)
celery的初始化,都放在create_celery_app
并且create_celery_app中,在最开始app初始化时是传入app的
而在用celery worker -A启动时,则只是从resources/tasks.py得到celery,而不会再去新建Flask的app
import os
from flask import Flask
import logging
from logging.handlers import RotatingFileHandler
# from flask_pymongo import PyMongo
from gridfs import GridFS
from pymongo import MongoClient
from flask_restful import Api
from flask_cors import CORS
from conf.app import settings
from celery import Celery
# from flask_celery import Celery
# from resources.extensions_celery import celery
from flask import g

################################################################################
# Global Function
################################################################################


def create_app(config_object, init_extensions=True):
    """Application factory.

    Args:
        config_object: settings object/module passed to
            ``app.config.from_object``; must also provide ``FLASK_APP_NAME``.
        init_extensions: when False, only the app, its config and its logger
            are set up (used by ``create_celery_app`` when no app is passed).

    Returns:
        The configured :class:`flask.Flask` instance.
    """
    # Named after the configured app name (<Flask 'RobotQA'>) rather than this
    # module (Flask(__name__) would give <Flask 'factory'>).
    app = Flask(config_object.FLASK_APP_NAME)
    CORS(app)
    app.config.from_object(config_object)

    # Modules imported while extensions are registered (resources.*) read
    # g.app / g.log / g.mongo ... at import time, so this runs inside an
    # application context.
    # NOTE(review): this context is popped when the `with` block ends;
    # presumably the values stored on g here are not visible in later contexts.
    with app.app_context():
        g.app = app
        log = create_log(app)
        g.log = log
        log.debug("after load from object: app.config=%s", app.config)
        log.debug(
            'app.config["DEBUG"]=%s, app.config["MONGODB_HOST"]=%s, app.config["FILE_URL_HOST"]=%s',
            app.config["DEBUG"], app.config["MONGODB_HOST"], app.config["FILE_URL_HOST"],
        )

        if init_extensions:
            register_extensions(app)
            log.info("flask app extensions init completed")

    return app


def register_extensions(app):
    """Create MongoDB, GridFS, Celery and REST API objects and publish them on g.

    Must be called inside an application context whose ``g.log`` is already
    set (create_app does this). Returns ``app``.
    """
    log = g.log

    mongo = create_mongo(app)
    g.mongo = mongo
    log.info("mongo=%s", mongo)
    # server_info() talks to the server, so bad MongoDB settings fail here.
    mongoServerInfo = mongo.server_info()
    log.debug("mongoServerInfo=%s", mongoServerInfo)

    fsCollection = create_gridfs_fs_collection(mongo)
    g.fsCollection = fsCollection
    log.info("fsCollection=%s", fsCollection)

    celery = create_celery_app(app)
    g.celery = celery
    log.info("celery=%s", celery)

    # Created last: building the API imports resources.*, which read the
    # values published on g above at import time.
    api = create_rest_api(app)
    log.debug("api=%s", api)
    g.api = api

    return app


def create_rest_api(app):
    """Register the REST resources on a new flask_restful Api bound to ``app``."""
    # Imported here, not at module top, to avoid import cycles with resources.*
    from resources.qa import RobotQaAPI
    from resources.asr import RobotAsrAPI
    from resources.files import GridfsAPI, TmpAudioAPI

    rest_api = Api()
    rest_api.add_resource(RobotQaAPI, '/qa', endpoint='qa')
    rest_api.add_resource(RobotAsrAPI, '/asr/language/<string:language>', endpoint='asr')
    rest_api.add_resource(GridfsAPI, '/files/<fileId>', '/files/<fileId>/<fileName>', endpoint='gridfs')
    rest_api.add_resource(TmpAudioAPI, '/tmp/audio/<filename>', endpoint='TmpAudio')
    rest_api.init_app(app)
    return rest_api


def _has_rotating_file_handler(logger, filename):
    """Return True if ``logger`` already has a RotatingFileHandler for ``filename``."""
    # logging.FileHandler stores the absolute path in ``baseFilename``.
    target = os.path.abspath(filename)
    return any(
        isinstance(handler, RotatingFileHandler) and handler.baseFilename == target
        for handler in logger.handlers
    )


def create_log(app):
    """Attach a rotating file handler to ``app.logger`` and return that logger.

    Handler settings come from LOG_FORMAT, LOG_FILE_FILENAME,
    LOG_FILE_MAX_BYTES and LOG_FILE_BACKUP_COUNT in ``app.config``. Calling it
    again for a logger that already writes to the same file adds no second
    handler.
    """
    print("create_log: before init log: app.logger=%s" % app.logger)

    log_filename = app.config['LOG_FILE_FILENAME']
    # Flask's app.logger is a named logging.Logger (logging.getLogger caches by
    # name), so building a second app with the same name in one process --
    # e.g. create_celery_app() without an app -- would otherwise attach a
    # second handler and duplicate every line in the log file.
    if not _has_rotating_file_handler(app.logger, log_filename):
        logFormatter = logging.Formatter(app.config["LOG_FORMAT"])
        fileHandler = RotatingFileHandler(
            log_filename,
            maxBytes=app.config["LOG_FILE_MAX_BYTES"],
            backupCount=app.config["LOG_FILE_BACKUP_COUNT"],
            encoding="UTF-8",
        )
        fileHandler.setLevel(logging.DEBUG)
        fileHandler.setFormatter(logFormatter)
        app.logger.addHandler(fileHandler)

    # Note: should NOT set StreamHandler here, otherwise will duplicate debug log
    app.logger.setLevel(logging.DEBUG)

    log = app.logger
    log.info("app=%s", app)
    print("create_log: after init log: app.logger=%s" % app.logger)
    return log


def create_mongo(app):
    """Create a MongoClient from the MONGODB_* values in ``settings``.

    The settings form a cascade:
    - an auth source implies credentials, host and port;
    - credentials imply host and port;
    - a port implies a host.

    Exactly the arguments implied by the most specific non-empty setting are
    passed. With nothing configured, plain ``MongoClient()`` is used.
    ``app`` is accepted for symmetry with the other factories but is unused.
    """
    use_auth_source = bool(settings.MONGODB_AUTH_SOURCE)
    use_credentials = use_auth_source or bool(settings.MONGODB_USERNAME and settings.MONGODB_PASSWORD)
    use_port = use_credentials or bool(settings.MONGODB_PORT)
    use_host = use_port or bool(settings.MONGODB_HOST)

    client_kwargs = {}
    if use_host:
        client_kwargs["host"] = settings.MONGODB_HOST
    if use_port:
        client_kwargs["port"] = int(settings.MONGODB_PORT)
    if use_credentials:
        client_kwargs["username"] = settings.MONGODB_USERNAME
        client_kwargs["password"] = settings.MONGODB_PASSWORD
    if use_auth_source:
        client_kwargs["authSource"] = settings.MONGODB_AUTH_SOURCE
    return MongoClient(**client_kwargs)


def create_gridfs_fs_collection(mongo_db):
    """Return a GridFS instance over the ``gridfs`` database of ``mongo_db``."""
    # Pure PyMongo: attribute access selects Database 'gridfs'.
    gridfs_db = mongo_db.gridfs
    gridfs_fs_collection = GridFS(gridfs_db)
    return gridfs_fs_collection


def create_celery_app(app=None):
    """Create a Celery instance configured from a Flask app's config.

    Args:
        app: Flask app to take the import name and config from; when None a
            bare app is built with ``create_app(settings, init_extensions=False)``.

    Returns:
        A new Celery instance whose tasks run inside ``app.app_context()``.

    NOTE(review): this is a separate object from
    ``resources.extensions_celery.celery``, on which the tasks in
    resources/tasks.py are registered, so the ContextTask base and
    ``conf.update(app.config)`` below do not apply to those tasks -- confirm
    this is intended.
    """
    print("create_celery_app: app=%s" % app)
    if app is None:
        app = create_app(settings, init_extensions=False)

    celery = Celery(app.import_name, broker=app.config['CELERY_BROKER_URL'])
    celery.conf.update(app.config)

    TaskBase = celery.Task

    class ContextTask(TaskBase):
        """Task base class that runs each task inside the Flask app context."""
        abstract = True

        def __call__(self, *args, **kwargs):
            with app.app_context():
                app.logger.info("in celery ContextTask __call__: args=%s, kwargs=%s", args, kwargs)
                return TaskBase.__call__(self, *args, **kwargs)

    celery.Task = ContextTask
    print("init celery ok")
    return celery
(4)app.py
app中只是调用create_app
而不去初始化celery了
"""Flask entry point.

Builds the app through the factory and leaves one application context
pushed for the rest of the process. Celery is not initialised here.
"""
import os

from conf.app import settings
from factory import create_app

print("in flask app: settings=%s" % (settings))

app = create_app(settings)
# Pushed once and never popped.
app_ctx = app.app_context()
app_ctx.push()

log = app.logger
log.debug("app=%s", app)
log.debug("log=%s", log)
log.debug("settings.FLASK_ENV=%s", settings.FLASK_ENV)
log.debug(
    "settings.DEBUG=%s, settings.MONGODB_HOST=%s, settings.FILE_URL_HOST=%s",
    settings.DEBUG,
    settings.MONGODB_HOST,
    settings.FILE_URL_HOST,
)

if __name__ == "__main__":
    app.run(
        host=settings.FLASK_HOST,
        port=settings.FLASK_PORT,
        debug=settings.DEBUG,
        use_reloader=False,
    )
(5)其他模块用flask的g获取要的全局变量
其他模块中,为了获取最初app初始化后的app,log,mongo
通过factory初始化时加上with app.app_context(),
以及其他每个模块中用from flask import g
再去用log=g.log, app=g.app等去获取自己要的值
resources/files.py
from flask import g
from resources.tts import gTempAudioFolder

# Shared objects read once, at import time, from flask.g.
# NOTE(review): this only works if the module is first imported while an
# application context is active and g.log / g.mongo / g.fsCollection are
# already set (factory.register_extensions appears to do this). Outside such
# a context flask.g raises "RuntimeError: Working outside of application
# context".
log = g.log
mongo = g.mongo
fsCollection = g.fsCollection

# `Resource` (presumably flask_restful.Resource) is imported elsewhere in the
# file -- not shown in this excerpt.
class GridfsAPI(Resource):
    def get(self, fileId, fileName=None):
        # log = app.logger
        log.info("fileId=%s, file_name=%s", fileId, fileName)
        ...
resources/qa.py
from flask import g

# Objects the app factory publishes on flask.g; this module must therefore be
# imported inside an active application context.
app, log, fsCollection = g.app, g.log, g.fsCollection
resources/tts.py
from flask import g

# Shared Flask app and logger, read from flask.g at import time (requires an
# active application context).
app, log = g.app, g.log
这样每个模块中,都可以用上全局的log,全局的app了。
然后PyCharm去debug初始化时,就正常了:

接着再去Mac的终端中去运行celery的worker
1 | celery worker -A resources.tasks.celery --loglevel=DEBUG |
然后就可以有正常输出了:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 | ➜ xxxRobotDemoServer git:(master) ✗ celery worker - A resources.tasks.celery - - loglevel = DEBUG cur_flask_environ = None FLASK_ENV = development cur_dir = / Users / crifan / dev / dev_root / company / xxx / projects / robotDemo / server / xxxRobotDemoServer / conf / app env_folder = development dotenv_path = / Users / crifan / dev / dev_root / company / xxx / projects / robotDemo / server / xxxRobotDemoServer / conf / app / development / .env dotenv_load_ok = True After load .env: DEBUG = True , MONGODB_HOST = xxx, FILE_URL_HOST = 127.0 . 0.1 in extensions_celery: celery = <Celery RobotQA at 0x104277e80 > create_celery_app return : celery = <Celery RobotQA at 0x104277e80 >, log = <Logger resources.extensions_celery (WARNING)> [ 2018 - 08 - 26 11 : 11 : 45 , 796 : DEBUG / MainProcess] | Worker: Preparing bootsteps. [ 2018 - 08 - 26 11 : 11 : 45 , 801 : DEBUG / MainProcess] | Worker: Building graph... [ 2018 - 08 - 26 11 : 11 : 45 , 802 : DEBUG / MainProcess] | Worker: New boot order: {Timer, Hub, Pool, Autoscaler, Beat, StateDB, Consumer} [ 2018 - 08 - 26 11 : 11 : 45 , 831 : DEBUG / MainProcess] | Consumer: Preparing bootsteps. [ 2018 - 08 - 26 11 : 11 : 45 , 831 : DEBUG / MainProcess] | Consumer: Building graph... [ 2018 - 08 - 26 11 : 11 : 45 , 876 : DEBUG / MainProcess] | Consumer: New boot order: {Connection, Events, Heart, Mingle, Tasks, Control, Gossip, Agent, event loop} celery@licrifandeMacBook - Pro.local v4. 2.1 (windowlicker) Darwin - 17.7 . 
0 - x86_64 - i386 - 64bit - PE 2018 - 08 - 26 11 : 11 : 45 [config] .> app: RobotQA: 0x104277e80 .> transport: redis: / / localhost: 6379 / 0 .> results: disabled: / / .> concurrency: 4 (prefork) .> task events: OFF (enable - E to monitor tasks in this worker) [queues] .> celery exchange = celery(direct) key = celery [tasks] . celery.accumulate . celery.backend_cleanup . celery.chain . celery.chord . celery.chord_unlock . celery.chunks . celery.group . celery. map . celery.starmap . resources.tasks.celeryRefreshAzureSpeechToken . resources.tasks.deleteTmpAudioFile [ 2018 - 08 - 26 11 : 11 : 45 , 921 : DEBUG / MainProcess] | Worker: Starting Hub [ 2018 - 08 - 26 11 : 11 : 45 , 922 : DEBUG / MainProcess] ^ - - substep ok [ 2018 - 08 - 26 11 : 11 : 45 , 922 : DEBUG / MainProcess] | Worker: Starting Pool [ 2018 - 08 - 26 11 : 11 : 46 , 059 : DEBUG / MainProcess] ^ - - substep ok [ 2018 - 08 - 26 11 : 11 : 46 , 060 : DEBUG / MainProcess] | Worker: Starting Consumer [ 2018 - 08 - 26 11 : 11 : 46 , 061 : DEBUG / MainProcess] | Consumer: Starting Connection [ 2018 - 08 - 26 11 : 11 : 46 , 098 : INFO / MainProcess] Connected to redis: / / localhost: 6379 / 0 [ 2018 - 08 - 26 11 : 11 : 46 , 098 : DEBUG / MainProcess] ^ - - substep ok [ 2018 - 08 - 26 11 : 11 : 46 , 098 : DEBUG / MainProcess] | Consumer: Starting Events [ 2018 - 08 - 26 11 : 11 : 46 , 114 : DEBUG / MainProcess] ^ - - substep ok [ 2018 - 08 - 26 11 : 11 : 46 , 114 : DEBUG / MainProcess] | Consumer: Starting Heart [ 2018 - 08 - 26 11 : 11 : 46 , 117 : DEBUG / MainProcess] ^ - - substep ok [ 2018 - 08 - 26 11 : 11 : 46 , 119 : DEBUG / MainProcess] | Consumer: Starting Mingle [ 2018 - 08 - 26 11 : 11 : 46 , 119 : INFO / MainProcess] mingle: searching for neighbors [ 2018 - 08 - 26 11 : 11 : 47 , 155 : INFO / MainProcess] mingle: all alone [ 2018 - 08 - 26 11 : 11 : 47 , 156 : DEBUG / MainProcess] ^ - - substep ok [ 2018 - 08 - 26 11 : 11 : 47 , 156 : DEBUG / MainProcess] | Consumer: Starting Tasks [ 2018 - 08 - 
26 11 : 11 : 47 , 161 : DEBUG / MainProcess] ^ - - substep ok [ 2018 - 08 - 26 11 : 11 : 47 , 161 : DEBUG / MainProcess] | Consumer: Starting Control [ 2018 - 08 - 26 11 : 11 : 47 , 167 : DEBUG / MainProcess] ^ - - substep ok [ 2018 - 08 - 26 11 : 11 : 47 , 167 : DEBUG / MainProcess] | Consumer: Starting Gossip [ 2018 - 08 - 26 11 : 11 : 47 , 172 : DEBUG / MainProcess] ^ - - substep ok [ 2018 - 08 - 26 11 : 11 : 47 , 172 : DEBUG / MainProcess] | Consumer: Starting event loop [ 2018 - 08 - 26 11 : 11 : 47 , 173 : DEBUG / MainProcess] | Worker: Hub.register Pool... [ 2018 - 08 - 26 11 : 11 : 47 , 173 : INFO / MainProcess] celery@licrifandeMacBook - Pro.local ready. [ 2018 - 08 - 26 11 : 11 : 47 , 174 : DEBUG / MainProcess] basic.qos: prefetch_count - > 16 |

其中的task是:
1 2 3 4 5 6 7 8 9 10 11 12 | [tasks] . celery.accumulate . celery.backend_cleanup . celery.chain . celery.chord . celery.chord_unlock . celery.chunks . celery.group . celery.map . celery.starmap . resources.tasks.celeryRefreshAzureSpeechToken . resources.tasks.deleteTmpAudioFile |
这些task和Flask中注册的task一致,因此Flask和worker可以互相识别,Flask就可以调用这些task了。
后续调用时task就可以正常运行了:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | 2018 - 08 - 26 11 : 11 : 47 , 174 : DEBUG / MainProcess] basic.qos: prefetch_count - > 16 [ 2018 - 08 - 26 11 : 14 : 28 , 913 : INFO / MainProcess] Received task: resources.tasks.deleteTmpAudioFile[ 6ea6c103 - 7b58 - 4d38 - ad54 - 06666b0ebb59 ] ETA:[ 2018 - 08 - 26 03 : 14 : 38.521681 + 00 : 00 ] [ 2018 - 08 - 26 11 : 14 : 28 , 915 : DEBUG / MainProcess] basic.qos: prefetch_count - > 17 [ 2018 - 08 - 26 11 : 14 : 38 , 527 : DEBUG / MainProcess] TaskPool: Apply <function _fast_trace_task at 0x10433c840 > (args:( 'resources.tasks.deleteTmpAudioFile' , '6ea6c103-7b58-4d38-ad54-06666b0ebb59' , { 'lang' : 'py' , 'task' : 'resources.tasks.deleteTmpAudioFile' , 'id' : '6ea6c103-7b58-4d38-ad54-06666b0ebb59' , 'shadow' : None , 'eta' : '2018-08-26T03:14:38.521681+00:00' , 'expires' : None , 'group' : None , 'retries' : 0 , 'timelimit' : [ None , None ], 'root_id' : '6ea6c103-7b58-4d38-ad54-06666b0ebb59' , 'parent_id' : None , 'argsrepr' : "['42667515-281a-4cab-bfac-5d2c23c25a34.mp3']" , 'kwargsrepr' : '{}' , 'origin' : ' gen47559@licrifandeMacBook - Pro.local ', ' reply_to ': ' 10c57fe8 - 23c6 - 3e20 - bea6 - 0576fa07fe57 ', ' correlation_id ': ' 6ea6c103 - 7b58 - 4d38 - ad54 - 06666b0ebb59 ', ' delivery_info ': {' exchange ': ' ', ' routing_key ': ' celery ', ' priority ': 0, ' redelivered ': None}}, b' [[ "42667515-281a-4cab-bfac-5d2c23c25a34.mp3" ], {}, { "callbacks" : null, "errbacks" : null, "chain" : null, "chord" : null}] ', ' application / json ', ' utf - 8 ') kwargs:{}) [ 2018 - 08 - 26 11 : 14 : 38 , 564 : DEBUG / MainProcess] basic.qos: prefetch_count - > 16 [ 2018 - 08 - 26 11 : 14 : 38 , 626 : DEBUG / MainProcess] Task accepted: resources.tasks.deleteTmpAudioFile[ 6ea6c103 - 7b58 - 4d38 - ad54 - 06666b0ebb59 ] pid: 47617 [ 2018 - 08 - 26 11 : 14 : 38 , 628 : INFO / ForkPoolWorker - 2 ] resources.tasks.deleteTmpAudioFile[ 6ea6c103 - 7b58 - 4d38 - ad54 - 06666b0ebb59 ]: deleteTmpAudioFile: celery = <Celery RobotQA at 
0x104277e80 >, filename = 42667515 - 281a - 4cab - bfac - 5d2c23c25a34 .mp3 [ 2018 - 08 - 26 11 : 14 : 38 , 632 : INFO / ForkPoolWorker - 2 ] resources.tasks.deleteTmpAudioFile[ 6ea6c103 - 7b58 - 4d38 - ad54 - 06666b0ebb59 ]: audioTmpFolder = tmp / audio [ 2018 - 08 - 26 11 : 14 : 38 , 634 : INFO / ForkPoolWorker - 2 ] resources.tasks.deleteTmpAudioFile[ 6ea6c103 - 7b58 - 4d38 - ad54 - 06666b0ebb59 ]: curFolderAbsPath = / Users / crifan / dev / dev_root / company / xxx / projects / robotDemo / server / xxxRobotDemoServer [ 2018 - 08 - 26 11 : 14 : 38 , 634 : INFO / ForkPoolWorker - 2 ] resources.tasks.deleteTmpAudioFile[ 6ea6c103 - 7b58 - 4d38 - ad54 - 06666b0ebb59 ]: audioTmpFolderFullPath = / Users / crifan / dev / dev_root / company / xxx / projects / robotDemo / server / xxxRobotDemoServer / tmp / audio [ 2018 - 08 - 26 11 : 14 : 38 , 635 : INFO / ForkPoolWorker - 2 ] resources.tasks.deleteTmpAudioFile[ 6ea6c103 - 7b58 - 4d38 - ad54 - 06666b0ebb59 ]: Ok to delete file / Users / crifan / dev / dev_root / company / xxx / projects / robotDemo / server / xxxRobotDemoServer / tmp / audio / 42667515 - 281a - 4cab - bfac - 5d2c23c25a34 .mp3 [ 2018 - 08 - 26 11 : 14 : 38 , 638 : INFO / ForkPoolWorker - 2 ] Task resources.tasks.deleteTmpAudioFile[ 6ea6c103 - 7b58 - 4d38 - ad54 - 06666b0ebb59 ] succeeded in 0.012065329006873071s : None |

再加上celery beat一起调试后发现:必须确保celery的task中不调用其他模块中的函数,才能正常运行。
否则celery的task一旦调用了其他模块中的东西,那些模块中对flask的g的访问就会报错:
raise RuntimeError(_app_ctx_err_msg)
RuntimeError: Working outside of application context.
经过调整后的代码:
resources/tasks.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 | @celery .task # def celeryRefreshAzureSpeechToken(): def refreshAzureSpeechToken(): """celery's task: refresh microsoft azure speech token key for later call tts/ASR api""" log.info( "celeryRefreshAzureSpeechToken: celery=%s" % celery) # with celery.app.app_context(): # from resources.tts import refreshAzureSpeechToken # refreshAzureSpeechToken() # global gMsToken # log = app.logger # log.info("refreshAzureSpeechToken: gMsToken=%s", gMsToken) # log.info("refreshAzureSpeechToken") getMsTokenUrl = settings.MS_GET_TOKEN_URL reqHeaders = { "Ocp-Apim-Subscription-Key" : settings.MS_TTS_SECRET_KEY } log.info( "getMsTokenUrl=%s, reqHeaders=%s" , getMsTokenUrl, reqHeaders) resp = requests.post(getMsTokenUrl, headers = reqHeaders) log.info( "resp=%s" , resp) respTokenText = resp.text # eyxxxxiJ9.xxx.xxx log.info( "respTokenText=%s" , respTokenText) # gMsToken = respTokenText updatedToken = respTokenText # # for debug # gMsToken = "eyJ0eXAiOxxxxnez" # log.info("after refresh: gMsToken=%s", gMsToken) return updatedToken |
避免了之前的:
1 2 | from resources.tts import refreshAzureSpeechToken refreshAzureSpeechToken() |
的报错:
ImportError: cannot import name 'refreshAzureSpeechToken'
而
resources/tts.py
中的,对于:
1 | from common.util import generateUUID |
必须要先把项目根目录加入sys.path:
1 2 3 4 5 | import sys resouces_dir = os.path.dirname(__file__) project_root_dir = os.path.abspath(os.path.join(resouces_dir, ".." )) if project_root_dir not in sys.path: sys.path.append(project_root_dir) |
否则会出现无法导入的错误。
同时,tts中获取token的代码也相应更新为:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 | from resources.tasks import refreshAzureSpeechToken def getAzureSpeechToken(): """get Microsoft Azure speech service token key""" # log = app.logger global gMsToken gMsToken = refreshAzureSpeechToken() ... else : # if errNo == BAIDU_ERR_TOKEN_INVALID: if errNo = = settings.MS_ERR_UNAUTHORIZED: log.warning( "Token invalid -> retry one for refresh token" ) # refreshBaiduToken() gMsToken = refreshAzureSpeechToken() # isOk, audioBinData, errNo, errMsg = baiduText2Audio(unicodeText) isOk, audioBinData, errNo, errMsg = msTTS(unicodeText) |
心得:
其实,此处能成功把Celery转换为工厂模式,核心的几个点是:
基本上明白了:
Celery的初始化,不一定非要是传入Flask的app
其实别人的,类似于
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | # extensions.py from flask_celery import Celery celery = Celery() # application.py from flask import Flask from extensions import celery def create_app(): app = Flask(__name__) app.config[ 'CELERY_IMPORTS' ] = ( 'tasks.add_together' , ) celery.init_app(app) return app |
或:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | from celery import Celery from flask import Flask from app.config import BaseConfig celery = Celery(__name__, broker = BaseConfig.CELERY_BROKER_URL) def create_app(): app = Flask(__name__) # .... celery.conf.update(app.config) # 更新 celery 的配置 # ... return app |
核心在于:
最开始初始化Celery时,一定要传入broker参数,指定好broker;
然后在create_app中初始化app之后,再去更新配置:
celery.conf.update(app.config)
或
celery.init_app(app)
而Celery其实本身和Flask的app,并没有什么本质的关联。
主要是flask中的配置参数:
conf/app/settings.py
1 2 3 4 5 | # CELERY_RESULT_BACKEND = "redis://localhost:6379/0" # current not use result # for periodical celery task CELERY_TIMEZONE = "Asia/Shanghai" CELERY_ENABLE_UTC = True |
起到了效果。
所以此处改为:
celery初始化,从settings传入celery相关的配置:
resources/extensions_celery.py
1 | celery = Celery(settings.FLASK_APP_NAME, broker=settings.CELERY_BROKER_URL) |
就可以了。
然后别的地方,去导入,引用celery的时候
-》感觉会重新触发Celery的初始化,从而和(一般做法中会有的)在Flask的app初始化时创建的celery不是同一个
-》其实是一样的
-》主要就是上面的Celery,传入了name和broker是一致的,就可以了
-》在celery worker -A 中指定即可:
1 | celery worker -A resources.tasks.celery --loglevel=DEBUG |
直接使用tasks模块中的celery,就不必非要和Flask的app关联在一起了。
其次是,celery的task中,不能依赖导入别的模块
-》否则celery的worker或beat运行时,再去导入别的模块,就会出现各种问题
比如模块路径问题导致无法导入
比如flask的g无法正常识别
等等。
对于上述最终的celery工厂模式代码,先运行flask的app,再分别运行worker和beat:
1 2 | pipenv shell celery worker -A resources.tasks.celery --loglevel=DEBUG |
和:
1 2 | pipenv shell celery beat -A resources.tasks.celery -s runtime /celerybeat-schedule --loglevel=DEBUG |
终于可以正常执行celery的异步函数和定期任务了。
转载请注明:在路上 » 【已解决】Flask中如何用工厂模式初始化Celery