折腾:
【未解决】用蓝图和工厂模式去优化现有Flask项目代码结构
期间,需要把之前的直接都写在app.py中的Celery的初始化:
celeryApp = Celery(app.name, broker=app.config['CELERY_BROKER_URL']) celeryApp.conf.update(app.config) log.info("celeryApp=%s", celeryApp) #---------------------------------------- # Celery tasks #---------------------------------------- # @celeryApp.task() @celeryApp.task # @celeryApp.task(name=app.config["CELERY_TASK_NAME"] + ".deleteTmpAudioFile") def deleteTmpAudioFile(filename): """ delete tmp audio file from filename eg: 98fc7c46-7aa0-4dd7-aa9d-89fdf516abd6.mp3 """ global log log.info("deleteTmpAudioFile: filename=%s", filename) audioTmpFolder = app.config["AUDIO_TEMP_FOLDER"] # audioTmpFolder = "tmp/audio" log.info("audioTmpFolder=%s", audioTmpFolder) curFolderAbsPath = os.getcwd() #'/Users/crifan/dev/dev_root/company/xxx/projects/robotDemo/server' log.info("curFolderAbsPath=%s", curFolderAbsPath) audioTmpFolderFullPath = os.path.join(curFolderAbsPath, audioTmpFolder) log.info("audioTmpFolderFullPath=%s", audioTmpFolderFullPath) tempAudioFullname = os.path.join(audioTmpFolderFullPath, filename) #'/Users/crifan/dev/dev_root/company/xxx/projects/robotDemo/server/tmp/audio/2aba73d1-f8d0-4302-9dd3-d1dbfad44458.mp3' if os.path.isfile(tempAudioFullname): os.remove(tempAudioFullname) log.info("Ok to delete file %s", tempAudioFullname) else: log.warning("No need to remove for not exist file %s", tempAudioFullname) # log.info("deleteTmpAudioFile=%s", deleteTmpAudioFile) # log.info("deleteTmpAudioFile.name=%s", deleteTmpAudioFile.name) # log.info("celeryApp.tasks=%s", celeryApp.tasks) @celeryApp.task def celeryRefreshAzureSpeechToken(): """celery's task: refreshAzureSpeechToken""" refreshAzureSpeechToken() @celeryApp.on_after_configure.connect def celerySetupPeriodicTasks(sender, **kwargs): log.info("celerySetupPeriodicTasks: sender=%s", sender) sender.add_periodic_task(app.config["CELERY_REFRESH_MS_TOKEN_INTERVAL"], celeryRefreshAzureSpeechToken.s(), name="refresh ms Azure token every less than 10 minutes")
其中,celery还调用了其他一些函数,比如上面的refreshAzureSpeechToken
也要想办法,如何更好的合并进去
flask celery factory
Celery and the Flask Application Factory Pattern – miguelgrinberg.com
from celery import Celery from config import config, Config celery = Celery(__name__, broker=Config.CELERY_BROKER_URL) def create_app(config_name): # ... celery.conf.update(app.config) # ... return app
还需要单独导入config,感觉也不是很完美
而且此处的__name__,估计不一定是到时候的app.name
在Celery中使用Flask的上下文
目前写成:
resources/celery_task.py
import os from app import app, celery, log from resources.tts import refreshAzureSpeechToken #---------------------------------------- # Celery tasks #---------------------------------------- # @celery.task() @celery.task # @celery.task(name=app.config["CELERY_TASK_NAME"] + ".deleteTmpAudioFile") def deleteTmpAudioFile(filename): """ delete tmp audio file from filename eg: 98fc7c46-7aa0-4dd7-aa9d-89fdf516abd6.mp3 """ log.info("deleteTmpAudioFile: filename=%s", filename) audioTmpFolder = app.config["AUDIO_TEMP_FOLDER"] # audioTmpFolder = "tmp/audio" log.info("audioTmpFolder=%s", audioTmpFolder) curFolderAbsPath = os.getcwd() #'/Users/crifan/dev/dev_root/company/xxx/projects/robotDemo/server' log.info("curFolderAbsPath=%s", curFolderAbsPath) audioTmpFolderFullPath = os.path.join(curFolderAbsPath, audioTmpFolder) log.info("audioTmpFolderFullPath=%s", audioTmpFolderFullPath) tempAudioFullname = os.path.join(audioTmpFolderFullPath, filename) #'/Users/crifan/dev/dev_root/company/xxx/projects/robotDemo/server/tmp/audio/2aba73d1-f8d0-4302-9dd3-d1dbfad44458.mp3' if os.path.isfile(tempAudioFullname): os.remove(tempAudioFullname) log.info("Ok to delete file %s", tempAudioFullname) else: log.warning("No need to remove for not exist file %s", tempAudioFullname) @celery.task def celeryRefreshAzureSpeechToken(): """celery's task: refreshAzureSpeechToken""" refreshAzureSpeechToken() @celery.on_after_configure.connect def celerySetupPeriodicTasks(sender, **kwargs): log.info("celerySetupPeriodicTasks: sender=%s", sender) sender.add_periodic_task(app.config["CELERY_REFRESH_MS_TOKEN_INTERVAL"], celeryRefreshAzureSpeechToken.s(), name="refresh ms Azure token every less than 10 minutes") # log.info("deleteTmpAudioFile=%s", deleteTmpAudioFile) # log.info("deleteTmpAudioFile.name=%s", deleteTmpAudioFile.name) # log.info("celery.tasks=%s", celery.tasks)
app.py
from factory import celery app = create_app(settings) log.debug("celery=%s", celery)
factory.py
from celery import Celery

# Module-level placeholder; it is rebound by create_extensions() once the
# Flask app (and therefore the broker configuration) is available.
celery = Celery()
# Fixed: the original format string was "celery=%" (no conversion type),
# which raises "ValueError: incomplete format" as soon as the module loads.
print("celery=%s" % celery)


def create_app(config_object):
    """Create the Flask app, load its config and initialise the extensions.

    :param config_object: object passed to ``app.config.from_object``.
    :return: the configured Flask app.
    """
    app = Flask(__name__)
    CORS(app)
    # app.config.from_object('config.DevelopmentConfig')
    # # app.config.from_object('config.ProductionConfig')
    app.config.from_object(config_object)
    create_extensions(app)
    # Fixed: the original fell off the end and returned None, although
    # app.py does `app = create_app(settings)`.
    return app


def create_extensions(app):
    """Initialise the module-level extensions (log, celery, ...) for ``app``.

    :param app: the Flask app created by create_app().
    :return: the same ``app``.
    """
    global log, mongo, fsCollection, api, celery
    log = create_log(app)
    ...
    celery = create_celery(app)
    log.info("celery=%s", celery)
    return app
待后续调试,看看是否正常运行。
【总结】
此处,目前对于Celery改为工厂模式初始化,代码是:
import os # from app import celery # from app import app, log # from flask import current_app as app # from factory import celery from flask import g from resources.tts import refreshAzureSpeechToken app = g.app log = g.log celery = g.celery #---------------------------------------- # Celery tasks #---------------------------------------- # @celery.task() @celery.task # @celery.task(name=app.config["CELERY_TASK_NAME"] + ".deleteTmpAudioFile") def deleteTmpAudioFile(filename): """ delete tmp audio file from filename eg: 98fc7c46-7aa0-4dd7-aa9d-89fdf516abd6.mp3 """ # log = app.logger with celery.app.app_context(): log.info("deleteTmpAudioFile: filename=%s", filename) audioTmpFolder = app.config["AUDIO_TEMP_FOLDER"] # audioTmpFolder = "tmp/audio" ... @celery.task def celeryRefreshAzureSpeechToken(): """celery's task: refreshAzureSpeechToken""" with celery.app.app_context(): refreshAzureSpeechToken() @celery.on_after_configure.connect def celerySetupPeriodicTasks(sender, **kwargs): with celery.app.app_context(): # log = app.logger log.info("celerySetupPeriodicTasks: sender=%s", sender) sender.add_periodic_task(app.config["CELERY_REFRESH_MS_TOKEN_INTERVAL"], celeryRefreshAzureSpeechToken.s(), name="refresh ms Azure token every less than 10 minutes")
而app.py是:
from conf.app import settings from factory import create_app app = create_app(settings) # register_extensions(app) log = app.logger if __name__ == "__main__": app.run( host=app.config["FLASK_HOST"], port=app.config["FLASK_PORT"], debug=app.config["DEBUG"], use_reloader=False )
factory.py
import os from flask import Flask ... from celery import Celery from flask import g ################################################################################ # Global Function ################################################################################ def create_app(config_object): # global log # app = Flask(__name__) #<Flask 'factory'> app = Flask(config_object.FLASK_APP_NAME) #<Flask 'RobotQA'> CORS(app) # app.config.from_object('config.DevelopmentConfig') # # app.config.from_object('config.ProductionConfig') app.config.from_object(config_object) with app.app_context(): g.app = app log = create_log(app) g.log = log ... register_extensions(app) return app def register_extensions(app): # global log, mongo, fsCollection, api, celery log = g.log ... celery = create_celery(app) g.celery = celery log.info("celery=%s", celery) ... return app ... def create_celery(app): celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL']) celery.conf.update(app.config) TaskBase = celery.Task class ContextTask(TaskBase): abstract = True def __call__(self, *args, **kwargs): with app.app_context(): g.log.info("in celery ContextTask __call__: args=%s, kwargs=%s", args, kwargs) return TaskBase.__call__(self, *args, **kwargs) celery.Task = ContextTask return celery ...
目前,至少对于Flask的app的初始化是可以的,可以运行的。不会出现循环导入和全局变量的问题。
【后记】
后来去真正本地调试运行时,尤其是本地运行celery的worker时,出错了:
【已解决】Flask的Celery改为工厂模式后本地调试worker出错:RuntimeError: Working outside of application context
所以之前的写法,还是不行。
最终是:
想办法解决了循环导入,以及全局变量的问题,期间用了很多折中的做法:
先说celery的库:
抛弃了之前的Flask-Celery-Helper,只用最原始的Celery
# from flask_celery import Celery from celery import Celery
-》因为其中会让我们在配置中去加上CELERY_IMPORTS:
# for Flask-Celery-Helper # CELERY_IMPORTS = ('tasks.deleteTmpAudioFile', 'tasks.celeryRefreshAzureSpeechToken', ) # CELERY_IMPORTS = ('resources.tasks.deleteTmpAudioFile', 'resources.tasks.celeryRefreshAzureSpeechToken', )
-》会导致找不到task
-》所以果断放弃。
再说每个文件的代码和其中的改动:
文件结构是:
➜ xxxRobotDemoServer git:(master) ✗ tree . ... ├── app.py ... ├── conf │ ├── __init__.py │ ├── __pycache__ │ │ └── __init__.cpython-36.pyc │ ├── app │ │ ├── __init__.py │ │ ├── __pycache__ │ │ │ ├── __init__.cpython-36.pyc │ │ │ └── settings.cpython-36.pyc │ │ ├── development │ │ │ └── __init__.py │ │ ├── production │ │ │ └── __init__.py │ │ └── settings.py ... ├── factory.py ├── resources │ ├── __init__.py │ ├── asr.py │ ├── extensions_celery.py │ ├── files.py │ ├── qa.py │ ├── tasks.py │ └── tts.py ...
详细解释每个文件:
(1)resources/extensions_celery.py
celery的初始化中:只是初始化celery,而不去做其他事情
# from flask_celery import Celery from conf.app import settings from celery import Celery from celery.utils.log import get_task_logger # celery = Celery() celery = Celery(settings.FLASK_APP_NAME, broker=settings.CELERY_BROKER_URL) celery_logger = get_task_logger(__name__) print("in extensions_celery: celery=%s" % celery)
(2)resources/tasks.py
背景是:
之前celery中依赖app和log
-》那是因为task中:
- 配置用到app.config["xxx"] -》后来改为settings.xxx了 -》单独引入settings不会产生循环导入
- log是task中需要打印输出 -》 希望尽量和flask的app的log用同一个log
这样会导致:
A:否则始终无法很好的处理,如何再去得到app
因为,即使最开始运行app时,可以很方便地得到app,但是运行celery worker -A时,还是无法得到app
而如果再去调用create_app时,就又会导致app初始化时,引入flask-restful时,引用到别的模块,比如:
resources/tts.py
而其中由于用到celery的异步task:deleteTmpAudioFile,所以要导入
from resources.tasks import deleteTmpAudioFile
从而会触发tasks中,多了一次app的初始化
-》变成了:
Flask的app的初始化运行,初始化了2次,这是无法接受的
B:无法方便的获取(app的)log
最后的折中的解决办法是:
celery的task中:去除之前依赖的flask的app和log
-》
- 把app.config["xxx"] 改为settings.xxx了
- -》而单独引入settings不会产生循环导入
- 用另外的办法去获得log
- 此处用的是从resources/extensions_celery.py获取celery的logger
完整代码:
import os

from conf.app import settings
# The Celery instance and the task logger both come from a module that only
# depends on settings, so importing this file never creates the Flask app.
from resources.extensions_celery import celery, celery_logger as log

print("create_celery_app return: celery=%s, log=%s" % (celery, log))

#----------------------------------------
# Celery tasks
#----------------------------------------

@celery.task
def deleteTmpAudioFile(filename):
    """Delete one temporary audio file.

    The file is looked up in ``settings.AUDIO_TEMP_FOLDER`` relative to the
    current working directory.

    :param filename: bare file name,
        eg: 98fc7c46-7aa0-4dd7-aa9d-89fdf516abd6.mp3
    :return: None
    """
    log.info("deleteTmpAudioFile: celery=%s, filename=%s", celery, filename)

    audioTmpFolder = settings.AUDIO_TEMP_FOLDER  # eg: "tmp/audio"
    log.info("audioTmpFolder=%s", audioTmpFolder)
    curFolderAbsPath = os.getcwd()
    log.info("curFolderAbsPath=%s", curFolderAbsPath)
    audioTmpFolderFullPath = os.path.join(curFolderAbsPath, audioTmpFolder)
    log.info("audioTmpFolderFullPath=%s", audioTmpFolderFullPath)
    tempAudioFullname = os.path.join(audioTmpFolderFullPath, filename)

    if not os.path.isfile(tempAudioFullname):
        log.warning("No need to remove for not exist file %s", tempAudioFullname)
        return

    try:
        os.remove(tempAudioFullname)
    except FileNotFoundError:
        # The file vanished between the isfile() check and remove() (e.g. a
        # second worker or a manual cleanup); treat it like "already gone"
        # instead of failing the task.
        log.warning("No need to remove for not exist file %s", tempAudioFullname)
    else:
        log.info("Ok to delete file %s", tempAudioFullname)


@celery.task
def celeryRefreshAzureSpeechToken():
    """celery's task: refreshAzureSpeechToken"""
    # Pass the argument to the logger instead of pre-formatting with `%`.
    log.info("celeryRefreshAzureSpeechToken: celery=%s", celery)
    # Imported here, not at module top: resources.tts itself imports this
    # module (for deleteTmpAudioFile), so a top-level import would be circular.
    from resources.tts import refreshAzureSpeechToken
    refreshAzureSpeechToken()


@celery.on_after_configure.connect
def celerySetupPeriodicTasks(sender, **kwargs):
    """Register the periodic token-refresh task once Celery is configured.

    :param sender: the object that fired ``on_after_configure``; the periodic
        task is added on it.
    :param kwargs: extra signal arguments (unused).
    """
    log.info("celerySetupPeriodicTasks: celery=%s, sender=%s", celery, sender)
    sender.add_periodic_task(
        settings.CELERY_REFRESH_MS_TOKEN_INTERVAL,
        celeryRefreshAzureSpeechToken.s(),
        name="refresh ms Azure token every less than 10 minutes")
(3)factory.py
而factory中没有全局的变量celery(和其他全局变量)
celery的初始化,都放在create_celery_app
并且create_celery_app中,在最开始app初始化时是传入app的
而在celery worker -A时,则只是从resources/tasks.py得到celery,而不会再去新建flask的app
# NOTE(review): this listing was whitespace-collapsed in the article; the
# indentation below is reconstructed from the syntax — confirm against the
# real factory.py.
import os
from flask import Flask
import logging
from logging.handlers import RotatingFileHandler
# from flask_pymongo import PyMongo
from gridfs import GridFS
from pymongo import MongoClient
from flask_restful import Api
from flask_cors import CORS
from conf.app import settings
from celery import Celery
# from flask_celery import Celery
# from resources.extensions_celery import celery
from flask import g

################################################################################
# Global Variables
################################################################################

# No module-level globals any more: the former log / mongo / fsCollection /
# celery globals are stored on flask.g inside create_app()/register_extensions().

################################################################################
# Global Function
################################################################################

def create_app(config_object, init_extensions=True):
    """Create and configure the Flask app.

    :param config_object: settings object; ``FLASK_APP_NAME`` is used as the
        Flask name and the object is loaded via ``app.config.from_object``.
    :param init_extensions: when true, register_extensions() is also run.
    :return: the Flask app.
    """
    # Previously Flask(__name__), which gave <Flask 'factory'>
    app = Flask(config_object.FLASK_APP_NAME) # eg: <Flask 'RobotQA'>
    CORS(app)
    # app.config.from_object('config.DevelopmentConfig')
    # # app.config.from_object('config.ProductionConfig')
    app.config.from_object(config_object)

    # Everything that reads or writes flask.g must run inside an app context.
    with app.app_context():
        g.app = app
        log = create_log(app)
        g.log = log

        log.debug("after load from object: app.config=%s", app.config)
        log.debug('app.config["DEBUG"]=%s, app.config["MONGODB_HOST"]=%s, app.config["FILE_URL_HOST"]=%s',
            app.config["DEBUG"], app.config["MONGODB_HOST"], app.config["FILE_URL_HOST"])

        if init_extensions:
            register_extensions(app)
            log.info("flask app extensions init completed")

    return app

def register_extensions(app):
    """Create mongo, GridFS, celery and the REST api and store them on flask.g.

    Reads ``g.log``, so it must be called inside an app context in which
    create_app() has already set it.

    :param app: the Flask app.
    :return: the same ``app``.
    """
    # formerly: global log, mongo, fsCollection, api, celery
    log = g.log

    mongo = create_mongo(app)
    g.mongo = mongo
    log.info("mongo=%s", mongo)
    mongoServerInfo = mongo.server_info()
    log.debug("mongoServerInfo=%s", mongoServerInfo)

    fsCollection = create_gridfs_fs_collection(mongo)
    g.fsCollection = fsCollection
    log.info("fsCollection=%s", fsCollection)

    celery = create_celery_app(app)
    g.celery = celery
    log.info("celery=%s", celery)

    # api = Api(app)
    api = create_rest_api(app)
    log.debug("api=%s", api)
    g.api = api

    return app

def create_rest_api(app):
    """Build the flask-restful Api, register all resources and bind it to ``app``.

    :param app: the Flask app.
    :return: the initialised ``Api`` object.
    """
    # Imported inside the function rather than at module top.
    # NOTE(review): per the article these modules read flask.g at import time,
    # so they presumably must be imported while the app context is active — confirm.
    from resources.qa import RobotQaAPI
    from resources.asr import RobotAsrAPI
    from resources.files import GridfsAPI, TmpAudioAPI

    rest_api = Api()
    rest_api.add_resource(RobotQaAPI, '/qa', endpoint='qa')
    rest_api.add_resource(RobotAsrAPI, '/asr/language/<string:language>', endpoint='asr')
    rest_api.add_resource(GridfsAPI, '/files/<fileId>', '/files/<fileId>/<fileName>', endpoint='gridfs')
    rest_api.add_resource(TmpAudioAPI, '/tmp/audio/<filename>', endpoint='TmpAudio')
    rest_api.init_app(app)
    return rest_api

def create_log(app):
    """Attach a rotating file handler to ``app.logger`` and return that logger.

    Handler file name, size limit, backup count and format all come from
    ``app.config``.

    :param app: the Flask app.
    :return: ``app.logger``.
    """
    print("create_log: before init log: app.logger=%s" % app.logger)

    logFormatterStr = app.config["LOG_FORMAT"]
    logFormatter = logging.Formatter(logFormatterStr)

    fileHandler = RotatingFileHandler(
        app.config['LOG_FILE_FILENAME'],
        maxBytes=app.config["LOG_FILE_MAX_BYTES"],
        backupCount=app.config["LOG_FILE_BACKUP_COUNT"],
        encoding="UTF-8")
    fileHandler.setLevel(logging.DEBUG)
    fileHandler.setFormatter(logFormatter)
    app.logger.addHandler(fileHandler)

    # Note: should NOT set StreamHandler here, otherwise will duplicate debug log

    app.logger.setLevel(logging.DEBUG) # level of the app logger itself

    log = app.logger
    log.info("app=%s", app)
    # log.debug("app.config=%s", app.config)

    print("create_log: after init log: app.logger=%s" % app.logger)

    return log

def create_mongo(app):
    """Create a ``MongoClient`` from the MONGODB_* values in ``settings``.

    The most specific set of options that is configured wins: auth source,
    then username+password, then port, then host, else driver defaults.

    :param app: the Flask app (not read here; values come from ``settings``).
    :return: the ``MongoClient``.
    """
    # An earlier version passed app.config["MONGODB_*"] unconditionally.
    if settings.MONGODB_AUTH_SOURCE:
        mongo_client = MongoClient(
            host=settings.MONGODB_HOST,
            port=int(settings.MONGODB_PORT),
            username=settings.MONGODB_USERNAME,
            password=settings.MONGODB_PASSWORD,
            authSource=settings.MONGODB_AUTH_SOURCE
        )
    elif settings.MONGODB_USERNAME and settings.MONGODB_PASSWORD:
        mongo_client = MongoClient(
            host=settings.MONGODB_HOST,
            port=int(settings.MONGODB_PORT),
            username=settings.MONGODB_USERNAME,
            password=settings.MONGODB_PASSWORD,
        )
    elif settings.MONGODB_PORT:
        mongo_client = MongoClient(
            host=settings.MONGODB_HOST,
            port=int(settings.MONGODB_PORT),
        )
    elif settings.MONGODB_HOST:
        mongo_client = MongoClient(
            host=settings.MONGODB_HOST,
        )
    else:
        mongo_client = MongoClient()

    return mongo_client

def create_gridfs_fs_collection(mongo_db):
    """Return a ``GridFS`` object on the ``gridfs`` database of ``mongo_db``.

    :param mongo_db: the ``MongoClient`` (register_extensions passes the
        result of create_mongo()).
    :return: the ``GridFS`` instance.
    """
    # Pure PyMongo
    gridfs_db = mongo_db.gridfs # the database named 'gridfs'
    gridfs_fs_collection = GridFS(gridfs_db) # eg: <gridfs.GridFS object at 0x1107b2390>
    return gridfs_fs_collection

def create_celery_app(app=None):
    """Create a Celery instance whose tasks run inside the Flask app context.

    :param app: the Flask app; when omitted a new one is created WITHOUT
        extensions (``init_extensions=False``).
    :return: the configured ``Celery`` instance.
    """
    print("create_celery_app: app=%s" % app)
    app = app or create_app(settings, init_extensions=False)

    app_import_name = app.import_name
    # app_name = app.name
    # celery_app_name = app_name
    celery_app_name = app_import_name
    celery = Celery(celery_app_name, broker=app.config['CELERY_BROKER_URL'])
    celery.conf.update(app.config)
    # celery.log = app.logger

    TaskBase = celery.Task
    class ContextTask(TaskBase):
        # Task base class that wraps every call in app.app_context().
        abstract = True
        def __call__(self, *args, **kwargs):
            with app.app_context():
                # g.log.info("in celery ContextTask __call__: args=%s, kwargs=%s", args, kwargs)
                app.logger.info("in celery ContextTask __call__: args=%s, kwargs=%s", args, kwargs)
                return TaskBase.__call__(self, *args, **kwargs)

    celery.Task = ContextTask
    # celery.init_app(app)

    print("init celery ok")
    return celery
(4)app.py
app中只是调用create_app
而不去初始化celery了
"""Flask entry point: builds the app through the factory and runs it."""
import os
from conf.app import settings
from factory import create_app
# from factory import log
# from factory import create_celery_app

################################################################################
# Global Definitions
################################################################################

################################################################################
# Global Variables
################################################################################

################################################################################
# Global Function
################################################################################

################################################################################
# Global Init App
################################################################################

print("in flask app: settings=%s" % (settings))
# All initialisation (log, mongo, celery, REST api) happens inside the factory.
app = create_app(settings)
# Keep an application context pushed for the rest of this module.
# NOTE(review): presumably needed so code using flask.g / current_app outside
# a request keeps working after create_app() returns — confirm.
app.app_context().push()
# register_extensions(app)

log = app.logger
log.debug("app=%s", app)
log.debug("log=%s", log)
log.debug("settings.FLASK_ENV=%s", settings.FLASK_ENV)
log.debug("settings.DEBUG=%s, settings.MONGODB_HOST=%s, settings.FILE_URL_HOST=%s",
    settings.DEBUG, settings.MONGODB_HOST, settings.FILE_URL_HOST)

# Celery is no longer created here (formerly: celery = create_celery_app(app)).

if __name__ == "__main__":
    # Run the built-in server with host/port/debug taken from settings;
    # the reloader is explicitly disabled.
    app.run(
        host=settings.FLASK_HOST,
        port=settings.FLASK_PORT,
        debug=settings.DEBUG,
        use_reloader=False
    )
(5)其他模块用flask的g获取要的全局变量
其他模块中,为了获取最初app初始化后的app,log,mongo
通过factory初始化时加上with app.app_context(),
以及其他每个模块中用from flask import g
再去用log=g.log, app=g.app等去获取自己要的值
resources/files.py
from flask import g from resources.tts import gTempAudioFolder log = g.log mongo = g.mongo fsCollection = g.fsCollection class GridfsAPI(Resource): def get(self, fileId, fileName=None): # log = app.logger log.info("fileId=%s, file_name=%s", fileId, fileName) ...
resources/qa.py
from flask import g app = g.app log = g.log fsCollection = g.fsCollection
resources/tts.py
from flask import g app = g.app log = g.log
这样每个模块中,都可以用上全局的log,全局的app了。
然后PyCharm去debug初始化时,就正常了:
接着再去Mac的终端中去运行celery的worker
celery worker -A resources.tasks.celery --loglevel=DEBUG
然后就可以有正常输出了:
➜ xxxRobotDemoServer git:(master) ✗ celery worker -A resources.tasks.celery --loglevel=DEBUG cur_flask_environ=None FLASK_ENV=development cur_dir=/Users/crifan/dev/dev_root/company/xxx/projects/robotDemo/server/xxxRobotDemoServer/conf/app env_folder=development dotenv_path=/Users/crifan/dev/dev_root/company/xxx/projects/robotDemo/server/xxxRobotDemoServer/conf/app/development/.env dotenv_load_ok=True After load .env: DEBUG=True, MONGODB_HOST=xxx, FILE_URL_HOST=127.0.0.1 in extensions_celery: celery=<Celery RobotQA at 0x104277e80> create_celery_app return: celery=<Celery RobotQA at 0x104277e80>, log=<Logger resources.extensions_celery (WARNING)> [2018-08-26 11:11:45,796: DEBUG/MainProcess] | Worker: Preparing bootsteps. [2018-08-26 11:11:45,801: DEBUG/MainProcess] | Worker: Building graph... [2018-08-26 11:11:45,802: DEBUG/MainProcess] | Worker: New boot order: {Timer, Hub, Pool, Autoscaler, Beat, StateDB, Consumer} [2018-08-26 11:11:45,831: DEBUG/MainProcess] | Consumer: Preparing bootsteps. [2018-08-26 11:11:45,831: DEBUG/MainProcess] | Consumer: Building graph... [2018-08-26 11:11:45,876: DEBUG/MainProcess] | Consumer: New boot order: {Connection, Events, Heart, Mingle, Tasks, Control, Gossip, Agent, event loop} [email protected] v4.2.1 (windowlicker) Darwin-17.7.0-x86_64-i386-64bit-PE 2018-08-26 11:11:45 [config] .> app: RobotQA:0x104277e80 .> transport: redis://localhost:6379/0 .> results: disabled:// .> concurrency: 4 (prefork) .> task events: OFF (enable -E to monitor tasks in this worker) [queues] .> celery exchange=celery(direct) key=celery [tasks] . celery.accumulate . celery.backend_cleanup . celery.chain . celery.chord . celery.chord_unlock . celery.chunks . celery.group . celery.map . celery.starmap . resources.tasks.celeryRefreshAzureSpeechToken . 
resources.tasks.deleteTmpAudioFile [2018-08-26 11:11:45,921: DEBUG/MainProcess] | Worker: Starting Hub [2018-08-26 11:11:45,922: DEBUG/MainProcess] ^-- substep ok [2018-08-26 11:11:45,922: DEBUG/MainProcess] | Worker: Starting Pool [2018-08-26 11:11:46,059: DEBUG/MainProcess] ^-- substep ok [2018-08-26 11:11:46,060: DEBUG/MainProcess] | Worker: Starting Consumer [2018-08-26 11:11:46,061: DEBUG/MainProcess] | Consumer: Starting Connection [2018-08-26 11:11:46,098: INFO/MainProcess] Connected to redis://localhost:6379/0 [2018-08-26 11:11:46,098: DEBUG/MainProcess] ^-- substep ok [2018-08-26 11:11:46,098: DEBUG/MainProcess] | Consumer: Starting Events [2018-08-26 11:11:46,114: DEBUG/MainProcess] ^-- substep ok [2018-08-26 11:11:46,114: DEBUG/MainProcess] | Consumer: Starting Heart [2018-08-26 11:11:46,117: DEBUG/MainProcess] ^-- substep ok [2018-08-26 11:11:46,119: DEBUG/MainProcess] | Consumer: Starting Mingle [2018-08-26 11:11:46,119: INFO/MainProcess] mingle: searching for neighbors [2018-08-26 11:11:47,155: INFO/MainProcess] mingle: all alone [2018-08-26 11:11:47,156: DEBUG/MainProcess] ^-- substep ok [2018-08-26 11:11:47,156: DEBUG/MainProcess] | Consumer: Starting Tasks [2018-08-26 11:11:47,161: DEBUG/MainProcess] ^-- substep ok [2018-08-26 11:11:47,161: DEBUG/MainProcess] | Consumer: Starting Control [2018-08-26 11:11:47,167: DEBUG/MainProcess] ^-- substep ok [2018-08-26 11:11:47,167: DEBUG/MainProcess] | Consumer: Starting Gossip [2018-08-26 11:11:47,172: DEBUG/MainProcess] ^-- substep ok [2018-08-26 11:11:47,172: DEBUG/MainProcess] | Consumer: Starting event loop [2018-08-26 11:11:47,173: DEBUG/MainProcess] | Worker: Hub.register Pool... [2018-08-26 11:11:47,173: INFO/MainProcess] [email protected] ready. [2018-08-26 11:11:47,174: DEBUG/MainProcess] basic.qos: prefetch_count->16
其中的task是:
[tasks] . celery.accumulate . celery.backend_cleanup . celery.chain . celery.chord . celery.chord_unlock . celery.chunks . celery.group . celery.map . celery.starmap . resources.tasks.celeryRefreshAzureSpeechToken . resources.tasks.deleteTmpAudioFile
后续和Flask中的task是一致的,那Flask就可以互相识别了,就可以调用task了。
后续调用时task就可以正常运行了:
2018-08-26 11:11:47,174: DEBUG/MainProcess] basic.qos: prefetch_count->16 [2018-08-26 11:14:28,913: INFO/MainProcess] Received task: resources.tasks.deleteTmpAudioFile[6ea6c103-7b58-4d38-ad54-06666b0ebb59] ETA:[2018-08-26 03:14:38.521681+00:00] [2018-08-26 11:14:28,915: DEBUG/MainProcess] basic.qos: prefetch_count->17 [2018-08-26 11:14:38,527: DEBUG/MainProcess] TaskPool: Apply <function _fast_trace_task at 0x10433c840> (args:('resources.tasks.deleteTmpAudioFile', '6ea6c103-7b58-4d38-ad54-06666b0ebb59', {'lang': 'py', 'task': 'resources.tasks.deleteTmpAudioFile', 'id': '6ea6c103-7b58-4d38-ad54-06666b0ebb59', 'shadow': None, 'eta': '2018-08-26T03:14:38.521681+00:00', 'expires': None, 'group': None, 'retries': 0, 'timelimit': [None, None], 'root_id': '6ea6c103-7b58-4d38-ad54-06666b0ebb59', 'parent_id': None, 'argsrepr': "['42667515-281a-4cab-bfac-5d2c23c25a34.mp3']", 'kwargsrepr': '{}', 'origin': ' [email protected] ', 'reply_to': '10c57fe8-23c6-3e20-bea6-0576fa07fe57', 'correlation_id': '6ea6c103-7b58-4d38-ad54-06666b0ebb59', 'delivery_info': {'exchange': '', 'routing_key': 'celery', 'priority': 0, 'redelivered': None}}, b'[["42667515-281a-4cab-bfac-5d2c23c25a34.mp3"], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]', 'application/json', 'utf-8') kwargs:{}) [2018-08-26 11:14:38,564: DEBUG/MainProcess] basic.qos: prefetch_count->16 [2018-08-26 11:14:38,626: DEBUG/MainProcess] Task accepted: resources.tasks.deleteTmpAudioFile[6ea6c103-7b58-4d38-ad54-06666b0ebb59] pid:47617 [2018-08-26 11:14:38,628: INFO/ForkPoolWorker-2] resources.tasks.deleteTmpAudioFile[6ea6c103-7b58-4d38-ad54-06666b0ebb59]: deleteTmpAudioFile: celery=<Celery RobotQA at 0x104277e80>, filename=42667515-281a-4cab-bfac-5d2c23c25a34.mp3 [2018-08-26 11:14:38,632: INFO/ForkPoolWorker-2] resources.tasks.deleteTmpAudioFile[6ea6c103-7b58-4d38-ad54-06666b0ebb59]: audioTmpFolder=tmp/audio [2018-08-26 11:14:38,634: INFO/ForkPoolWorker-2] 
resources.tasks.deleteTmpAudioFile[6ea6c103-7b58-4d38-ad54-06666b0ebb59]: curFolderAbsPath=/Users/crifan/dev/dev_root/company/xxx/projects/robotDemo/server/xxxRobotDemoServer [2018-08-26 11:14:38,634: INFO/ForkPoolWorker-2] resources.tasks.deleteTmpAudioFile[6ea6c103-7b58-4d38-ad54-06666b0ebb59]: audioTmpFolderFullPath=/Users/crifan/dev/dev_root/company/xxx/projects/robotDemo/server/xxxRobotDemoServer/tmp/audio [2018-08-26 11:14:38,635: INFO/ForkPoolWorker-2] resources.tasks.deleteTmpAudioFile[6ea6c103-7b58-4d38-ad54-06666b0ebb59]: Ok to delete file /Users/crifan/dev/dev_root/company/xxx/projects/robotDemo/server/xxxRobotDemoServer/tmp/audio/42667515-281a-4cab-bfac-5d2c23c25a34.mp3 [2018-08-26 11:14:38,638: INFO/ForkPoolWorker-2] Task resources.tasks.deleteTmpAudioFile[6ea6c103-7b58-4d38-ad54-06666b0ebb59] succeeded in 0.012065329006873071s: None
加上celery beat的输出,经过调试后,确保celery的task中,不要调用到别的模块的函数后才正常。
否则celery的task一旦调用别的模块中的东西,则别的模块中的flask的g,就会报错:
raise RuntimeError(_app_ctx_err_msg)
RuntimeError: Working outside of application context.
经过调整后的代码:
resources/tasks.py
@celery.task
def refreshAzureSpeechToken():
    """celery's task: refresh microsoft azure speech token key for later call tts/ASR api

    POSTs to ``settings.MS_GET_TOKEN_URL`` with the subscription key from
    ``settings.MS_TTS_SECRET_KEY`` and returns the response body as the token.
    It has no dependency on other project modules, so it can run in the
    celery worker/beat without a Flask app context.

    :return: the new token text.
    :raises requests.RequestException: on connection error, timeout or a
        non-2xx response (previously an error body was returned as if it
        were a token).
    """
    log.info("celeryRefreshAzureSpeechToken: celery=%s", celery)

    getMsTokenUrl = settings.MS_GET_TOKEN_URL
    reqHeaders = {
        "Ocp-Apim-Subscription-Key": settings.MS_TTS_SECRET_KEY
    }
    # Do not log reqHeaders: it contains the subscription secret.
    log.info("getMsTokenUrl=%s", getMsTokenUrl)

    # Without a timeout a stalled connection would block the worker forever.
    # MS_GET_TOKEN_TIMEOUT is optional in settings; default is 10 seconds.
    timeoutSeconds = getattr(settings, "MS_GET_TOKEN_TIMEOUT", 10)
    resp = requests.post(getMsTokenUrl, headers=reqHeaders, timeout=timeoutSeconds)
    log.info("resp=%s", resp)
    # Fail loudly on 4xx/5xx instead of handing an error page out as a token.
    resp.raise_for_status()

    updatedToken = resp.text
    # Log only the size: the token itself is a credential.
    log.info("got new token, length=%d", len(updatedToken))

    return updatedToken
避免了之前的:
from resources.tts import refreshAzureSpeechToken refreshAzureSpeechToken()
的报错:
ImportError: cannot import name 'refreshAzureSpeechToken'
而
resources/tts.py
中的,对于:
from common.util import generateUUID
必须要先把项目根目录加到sys.path中:
import sys

# Make the project root (the parent of this file's folder) importable, so
# that `from common.util import ...` also resolves when this module is loaded
# by the celery worker/beat.
resouces_dir = os.path.dirname(__file__)
project_root_dir = os.path.abspath(os.path.join(resouces_dir, os.pardir))
# Append only once, even if this module is imported repeatedly.
if sys.path.count(project_root_dir) == 0:
    sys.path.append(project_root_dir)
否则会出现无法导入
以及tts内的token的代码也去更新了:
from resources.tasks import refreshAzureSpeechToken def getAzureSpeechToken(): """get Microsoft Azure speech service token key""" # log = app.logger global gMsToken gMsToken = refreshAzureSpeechToken() ... else: # if errNo == BAIDU_ERR_TOKEN_INVALID: if errNo == settings.MS_ERR_UNAUTHORIZED: log.warning("Token invalid -> retry one for refresh token") # refreshBaiduToken() gMsToken = refreshAzureSpeechToken() # isOk, audioBinData, errNo, errMsg = baiduText2Audio(unicodeText) isOk, audioBinData, errNo, errMsg = msTTS(unicodeText)
心得:
其实,此处能成功把Celery转换为工厂模式,核心的几个点是:
基本上明白了:
Celery的初始化,不一定非要是传入Flask的app
其实别人的,类似于
# extensions.py from flask_celery import Celery celery = Celery() # application.py from flask import Flask from extensions import celery def create_app(): app = Flask(__name__) app.config['CELERY_IMPORTS'] = ('tasks.add_together', ) app.config['CELERY_BROKER_URL'] = 'redis://localhost' app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost' celery.init_app(app) return app
或:
from celery import Celery from flask import Flask from app.config import BaseConfig celery = Celery(__name__, broker=BaseConfig.CELERY_BROKER_URL) def create_app(): app = Flask(__name__) # .... celery.conf.update(app.config) # 更新 celery 的配置 # ... return app
核心在于:
最开始初始化Celery的时候,一定要加上broker参数,指定好broker的地址
之后再去在create_app中初始化app后,再去更新配置:
celery.conf.update(app.config)
或
celery.init_app(app)
而Celery其实本身和Flask的app,并没有什么本质的关联。
主要是flask中的配置参数:
conf/app/settings.py
CELERY_BROKER_URL = "redis://localhost:6379/0" # CELERY_RESULT_BACKEND = "redis://localhost:6379/0" # current not use result # for periodical celery task CELERY_TIMEZONE = "Asia/Shanghai" CELERY_ENABLE_UTC = True
起到了效果。
所以此处改为:
celery初始化,从settings传入celery相关的配置:
resources/extensions_celery.py
celery = Celery(settings.FLASK_APP_NAME, broker=settings.CELERY_BROKER_URL)
就可以了。
然后别的地方,去导入,引用celery的时候
-》感觉会重新触发Celery的初始化,和(一般做法中会有的)Flask的app初始化中的celery,会不相同
-》其实是一样的
-》主要就是上面的Celery,传入了name和broker是一致的,就可以了
-》celery worker -A 中就可以了
celery worker -A resources.tasks.celery --loglevel=DEBUG
调用task中的celery时,就不必非要和Flask中的app关联在一起了。
其次是,celery的task中,不能依赖导入别的模块
-》否则celery的worker或beat运行时,再去导入别的模块时,就会出现各种问题
比如模块路径问题导致无法导入
比如flask的g无法正常识别
等等。
最终的上述的celery的工厂模式的代码,去运行了flask的app后,再分别去运行worker和beat:
pipenv shell celery worker -A resources.tasks.celery --loglevel=DEBUG
和:
pipenv shell celery beat -A resources.tasks.celery -s runtime/celerybeat-schedule --loglevel=DEBUG
终于可以正常执行celery的异步函数和定期任务了。
转载请注明:在路上 » 【已解决】Flask中如何用工厂模式初始化Celery