之前Flask的代码是都写到一个app.py中的:
整个扩展性不够好。
听说有个blueprint和factory mode?可以更好的组织代码的架构,所以去搞清楚如何实现蓝图和工厂模式。
flask 蓝图 工厂模式
flask blueprint factory
感觉意思是:factories是不同的模式,比如dev还是prod
如果是,那么我此处之前已经都实现了,就不需要了(之前经过通过doctenv,动态监测传入的FLASK_ENV去加载dev还是prod)
0.7版Flask后才加入的的Blueprint
Flask的功能的模块化,叫做蓝图Blueprint,
在一个app中,或跨多个app,去支持常见的模式
是组织和扩展Flask的应用的一种方式
一个蓝图,本身不是app,而Flask是app
蓝图是一对操作的集合,可以挂载到Flask的app中
-》感觉貌似暂时不需要去用到什么蓝图
而:
def create_app(config_filename): app = Flask(__name__) 。。。
看起来倒是需要的
“查看单个蓝图(如 users),它会封装所有组件以处理应用程序的用户管理方面(定义路由,处理表单,生成模板等)”
我此处暂时对于,单独的路由,表单,模板等,没需求。
但是对于模块化,有需求。
看起来还是感觉用不到
有点明白,但是不是很清楚
我此处还用到了:flask_restful
所以和普通的app的route()还不一样
所以要搞清楚flask_restful的蓝图和工厂模式
flask_restful factory
flask-restful factory
flask-restful factory
去折腾试试
期间遇到很多把flask的app的初始化,转换为factory的模式:
【未解决】Flask中如何用工厂模式初始化pymongo
然后再去解决:
【未解决】Flask中如何用工厂模式初始化flask-restful的api
其中对于代码的目录结构,本来是打开不同的业务逻辑,单独放在一个目录下,比如tts,asr,qa等,然后每个目录中包含views.py的:
➜ xxxRobotDemoServer git:(master) ✗ tree . ... ├── app.py ├── asr │ └── views.py ├── conf ├── factory.py ├── files │ └── views.py ├── qa │ └── views.py └── tts └── views.py
后来参考:
这个flask-restful的官网教程,发现最好是:
都放到resources目录下,变成:
├── app.py ├── factory.py ├── resources │ ├── __init__.py │ ├── ars.py │ ├── files.py │ ├── qa.py │ └── tts.py
接着再去:
【未解决】Flask中如何用工厂模式初始化Celery
另外,关于flask-restful的api的导入,后续可以参考:
放到resources中试试
期间遇到了循环导入的问题:
【已解决】Flask中循环导入app和create_app的问题:ImportError: cannot import name ‘create_app’
然后此处,终于,至少可以PyCharm中,能调试和跑起来Flask的app了:
不过感觉还是有点点小问题:
【已解决】Flask中换成工厂模式去初始化app实例后app被初始化两次
目前看来,好像是可以了。
再继续调试,确保本地可以正常运行起来产品demo的Flask的app,给前端提供可用的api
此处又出现问题了:
【已解决】Flask中如何用工厂模式初始化Celery
的
【已解决】Flask的Celery改为工厂模式后本地调试worker出错:RuntimeError: Working outside of application context
【总结】
最终实现了:
- 避免了循环导入
- 避免了两次初始化Flask的app
- Flask的调试模式去掉
- celery中不要create_app
代码结构:
➜ xxxRobotDemoServer git:(master) ✗ tree . ... ├── README.md ├── app.py ├── conf │ ├── __init__.py │ ├── app │ │ ├── __init__.py │ │ ├── development │ │ │ └── __init__.py │ │ ├── production │ │ │ └── __init__.py │ │ └── settings.py │ ├── gunicorn │ │ └── gunicorn_config.py │ └── supervisor │ ├── supervisord_local.conf │ └── supervisord_server.conf ├── factory.py ├── resources │ ├── __init__.py │ ├── asr.py │ ├── extensions_celery.py │ ├── files.py │ ├── qa.py │ ├── tasks.py │ └── tts.py ...
代码如下:
app.py
import os from conf.app import settings from factory import create_app # from factory import log # from factory import create_celery_app ################################################################################ # Global Definitions ################################################################################ ################################################################################ # Global Variables ################################################################################ ################################################################################ # Global Function ################################################################################ ################################################################################ # Global Init App ################################################################################ print("in flask app: settings=%s" % (settings)) app = create_app(settings) app.app_context().push() # register_extensions(app) log = app.logger log.debug("app=%s", app) log.debug("log=%s", log) log.debug("settings.FLASK_ENV=%s", settings.FLASK_ENV) log.debug("settings.DEBUG=%s, settings.MONGODB_HOST=%s, settings.FILE_URL_HOST=%s", settings.DEBUG, settings.MONGODB_HOST, settings.FILE_URL_HOST) # celery = None # with app.app_context(): # celery = create_celery_app(app) # print("celery=%s" % celery) if __name__ == "__main__": app.run( host=settings.FLASK_HOST, port=settings.FLASK_PORT, debug=settings.DEBUG, use_reloader=False )
factory.py
import os from flask import Flask import logging from logging.handlers import RotatingFileHandler # from flask_pymongo import PyMongo from gridfs import GridFS from pymongo import MongoClient from flask_restful import Api from flask_cors import CORS from conf.app import settings from celery import Celery # from flask_celery import Celery # from resources.extensions_celery import celery from flask import g ################################################################################ # Global Variables ################################################################################ # # log = logging.getLogger() #<RootLogger root (WARNING)> # log = None # print("log=%s" % log) # # # mongo = MongoClient() # MongoClient(host=['localhost:27017'], document_class=dict, tz_aware=False, connect=True) # mongo = None # print("mongo=%s" % mongo) # fsCollection = None #None # print("fsCollection=%s" % fsCollection) # # celery = Celery() #<Celery __main__ at 0x1068d8b38> # celery = None # print("celery=%s" % celery) ################################################################################ # Global Function ################################################################################ def create_app(config_object, init_extensions=True): # global log # app = Flask(__name__) #<Flask 'factory'> app = Flask(config_object.FLASK_APP_NAME) #<Flask 'RobotQA'> CORS(app) # app.config.from_object('config.DevelopmentConfig') # # app.config.from_object('config.ProductionConfig') app.config.from_object(config_object) with app.app_context(): g.app = app log = create_log(app) g.log = log log.debug("after load from object: app.config=%s", app.config) log.debug('app.config["DEBUG"]=%s, app.config["MONGODB_HOST"]=%s, app.config["FILE_URL_HOST"]=%s', app.config["DEBUG"], app.config["MONGODB_HOST"], app.config["FILE_URL_HOST"]) if init_extensions: register_extensions(app) log.info("flask app extensions init completed") return app def register_extensions(app): # global log, mongo, fsCollection, api, celery log = g.log mongo = create_mongo(app) g.mongo = mongo log.info("mongo=%s", mongo) mongoServerInfo = mongo.server_info() log.debug("mongoServerInfo=%s", mongoServerInfo) fsCollection = create_gridfs_fs_collection(mongo) g.fsCollection = fsCollection log.info("fsCollection=%s", fsCollection) celery = create_celery_app(app) g.celery = celery log.info("celery=%s", celery) # api = Api(app) api = create_rest_api(app) log.debug("api=%s", api) g.api = api return app def create_rest_api(app): from resources.qa import RobotQaAPI from resources.asr import RobotAsrAPI from resources.files import GridfsAPI, TmpAudioAPI rest_api = Api() rest_api.add_resource(RobotQaAPI, '/qa', endpoint='qa') rest_api.add_resource(RobotAsrAPI, '/asr/language/<string:language>', endpoint='asr') rest_api.add_resource(GridfsAPI, '/files/<fileId>', '/files/<fileId>/<fileName>', endpoint='gridfs') rest_api.add_resource(TmpAudioAPI, '/tmp/audio/<filename>', endpoint='TmpAudio') rest_api.init_app(app) return rest_api def create_log(app): print("create_log: before init log: app.logger=%s" % app.logger) logFormatterStr = app.config["LOG_FORMAT"] logFormatter = logging.Formatter(logFormatterStr) fileHandler = RotatingFileHandler( app.config['LOG_FILE_FILENAME'], maxBytes=app.config["LOG_FILE_MAX_BYTES"], backupCount=app.config["LOG_FILE_BACKUP_COUNT"], encoding="UTF-8") fileHandler.setLevel(logging.DEBUG) fileHandler.setFormatter(logFormatter) app.logger.addHandler(fileHandler) # Note: should NOT set StreamHandler here, otherwise will duplicate debug log app.logger.setLevel(logging.DEBUG) # set root log level log = app.logger log.info("app=%s", app) # log.debug("app.config=%s", app.config) print("create_log: after init log: app.logger=%s" % app.logger) return log def create_mongo(app): # mongo_client = MongoClient( # host=app.config["MONGODB_HOST"], # port=app.config["MONGODB_PORT"], # username=app.config["MONGODB_USERNAME"], # password=app.config["MONGODB_PASSWORD"], # authSource=app.config["MONGODB_AUTH_SOURCE"] # ) if settings.MONGODB_AUTH_SOURCE: mongo_client = MongoClient( host=settings.MONGODB_HOST, port=int(settings.MONGODB_PORT), username=settings.MONGODB_USERNAME, password=settings.MONGODB_PASSWORD, authSource=settings.MONGODB_AUTH_SOURCE ) elif settings.MONGODB_USERNAME and settings.MONGODB_PASSWORD: mongo_client = MongoClient( host=settings.MONGODB_HOST, port=int(settings.MONGODB_PORT), username=settings.MONGODB_USERNAME, password=settings.MONGODB_PASSWORD, ) elif settings.MONGODB_PORT: mongo_client = MongoClient( host=settings.MONGODB_HOST, port=int(settings.MONGODB_PORT), ) elif settings.MONGODB_HOST: mongo_client = MongoClient( host=settings.MONGODB_HOST, ) else: mongo_client = MongoClient() return mongo_client def create_gridfs_fs_collection(mongo_db): # Pure PyMongo gridfs_db = mongo_db.gridfs # Database(MongoClient(host=['xxx:32018'], document_class=dict, tz_aware=False, connect=True, authsource='gridfs'), 'gridfs') gridfs_fs_collection = GridFS(gridfs_db) # <gridfs.GridFS object at 0x1107b2390> return gridfs_fs_collection def create_celery_app(app=None): print("create_celery_app: app=%s" % app) app = app or create_app(settings, init_extensions=False) app_import_name = app.import_name # app_name = app.name # celery_app_name = app_name celery_app_name = app_import_name celery = Celery(celery_app_name, broker=app.config['CELERY_BROKER_URL']) celery.conf.update(app.config) # celery.log = app.logger TaskBase = celery.Task class ContextTask(TaskBase): abstract = True def __call__(self, *args, **kwargs): with app.app_context(): # g.log.info("in celery ContextTask __call__: args=%s, kwargs=%s", args, kwargs) app.logger.info("in celery ContextTask __call__: args=%s, kwargs=%s", args, kwargs) return TaskBase.__call__(self, *args, **kwargs) celery.Task = ContextTask # celery.init_app(app) print("init celery ok") return celery
其他几个模块:
resources/files.py
import os from flask_restful import Resource from bson.objectid import ObjectId from flask import request from flask import jsonify from common.util import sendFile # from app import fsCollection # from app import log # from app import app # from flask import current_app as app # from factory import mongo, fsCollection from flask import g from resources.tts import gTempAudioFolder log = g.log mongo = g.mongo fsCollection = g.fsCollection # log.debug("mongo=%s, fsCollection=%s", mongo, fsCollection) class GridfsAPI(Resource): def get(self, fileId, fileName=None): # log = app.logger log.info("fileId=%s, file_name=%s", fileId, fileName) ... return sendFile(fileBytes, fileObj.content_type, outputFilename) class TmpAudioAPI(Resource): def get(self, filename=None): # log = app.logger log.info("TmpAudioAPI: filename=%s", filename) tmpAudioFullPath = os.path.join(gTempAudioFolder, filename) log.info("tmpAudioFullPath=%s", tmpAudioFullPath) ... return sendFile(fileBytes, contentType, outputFilename)
resources/qa.py
import os import sys from urllib.parse import quote from flask_restful import Resource from flask_restful import reqparse from bson.objectid import ObjectId from conf.app import settings # from app import app, log # from flask import current_app as app # from app import fsCollection from flask import g app = g.app log = g.log fsCollection = g.fsCollection print(__file__) if settings.FLASK_ENV == "production": # production: online dev server ... else: # development: local debug ... sys.path.append(xxxRootPath) ... from resources.tts import processResponse ... class RobotQaAPI(Resource): def get(self): ... return processResponse(respDict, voiceName, voiceRate, voiceVolume)
resources/tts.py
import os import requests from urllib.parse import quote from flask import jsonify # from app import deleteTmpAudioFile # from tasks import deleteTmpAudioFile from resources.tasks import deleteTmpAudioFile from resources.tasks import refreshAzureSpeechToken from conf.app import settings # import sys # resouces_dir = os.path.dirname(__file__) # project_root_dir = os.path.abspath(os.path.join(resouces_dir, "..")) # if project_root_dir not in sys.path: # sys.path.append(project_root_dir) from common.util import generateUUID from flask import g app = g.app log = g.log ################################################################################ # Global Definitions ################################################################################ ################################################################################ # Global Variables ################################################################################ ... gCurBaiduRespDict = {} # get baidu token resp dict gMsToken = "" gTempAudioFolder = "" ################################################################################ # Global Function ################################################################################ #---------------------------------------- # Audio Synthesis / TTS(Text-To-Speech) #---------------------------------------- def createAudioTempFolder(): """create foler to save later temp audio files""" global gTempAudioFolder # log = app.logger # init audio temp folder for later store temp audio file audioTmpFolder = settings.AUDIO_TEMP_FOLDER log.info("audioTmpFolder=%s", audioTmpFolder) ... def saveAudioDataToTmpFile(audioBinData): """ save audio binary data into temp file :param audioBinData: binary data of audio file :return: audio tmp file name """ global gTempAudioFolder # log = app.logger audioBinDataLen = len(audioBinData) log.info("saveAudioDataToTmpFile: audioBinDataLen=%s", audioBinDataLen) ... return tempFilename ... def getAzureSpeechToken(): """get Microsoft Azure speech service token key""" # log = app.logger global gMsToken gMsToken = refreshAzureSpeechToken() def msTTS(unicodeText, voiceName=settings.MS_TTS_VOICE_NAME, voiceRate=settings.MS_TTS_VOICE_RATE, voiceVolume=settings.MS_TTS_VOICE_VOLUME): """call ms azure tts to generate audio(mp3/wav/...) from text""" global gMsToken # log = app.logger log.info("msTTS: unicodeText=%s", unicodeText) ... return isOk, audioBinData, errNo, errMsg ... ################################################################################ # Global Init ################################################################################ # testAudioSynthesis() initAudioService() log.info("TTS init complete")
和Celery相关的:
resources/extensions_celery.py
# from flask_celery import Celery from conf.app import settings from celery import Celery from celery.utils.log import get_task_logger # celery = Celery() celery = Celery(settings.FLASK_APP_NAME, broker=settings.CELERY_BROKER_URL) celery_logger = get_task_logger(__name__) print("in extensions_celery: celery=%s" % celery)
resources/tasks.py
import os import requests # from resources.extensions_celery import celery # print("celery=%s" % celery) # print("celery.app=%s" % celery.app) # print("celery.app.log=%s" % celery.app.log) # print("celery.log=%s" % celery.log) from conf.app import settings # try: # from flask import g # print("tasks: import flask g ok") # print("g=%s" % g) # log = g.log # print("log=%s" % log) # except RuntimeError as err: # # except: # print("tasks: failed to import flask g, err=%s" % err) # from factory import create_app # print("tasks: import create_app ok") # app = create_app(settings) # print("create_app ok, app=%s" % app) # log = app.logger # print("log=%s" % log) # import logging # log = logging.getLogger(settings.FLASK_APP_NAME) # log.info("test logging getLogger from flask app, work?") # print("-----before: from factory import create_celery_app") # from factory import create_celery_app # from factory import create_log, create_app # print("from factory import create_celery_app ok") # celery = create_celery_app() # app = create_app(settings) # celery = create_celery_app(app) # log = app.logger from resources.extensions_celery import celery, celery_logger as log print("create_celery_app return: celery=%s, log=%s" % (celery, log)) #---------------------------------------- # Celery tasks #---------------------------------------- # @celery.task() @celery.task # @celery.task(name=settings.CELERY_TASK_NAME + ".deleteTmpAudioFile") def deleteTmpAudioFile(filename): """ delete tmp audio file from filename eg: 98fc7c46-7aa0-4dd7-aa9d-89fdf516abd6.mp3 """ # print("deleteTmpAudioFile: celery=%s, filename=%s" % (celery, filename)) # log = app.logger # with celery.app.app_context(): # log = celery.app.logger # app = celery.app # log = celery.log # print("celery.log=%s" % celery.log) # print("log=%s" % log) log.info("deleteTmpAudioFile: celery=%s, filename=%s", celery, filename) audioTmpFolder = settings.AUDIO_TEMP_FOLDER # audioTmpFolder = "tmp/audio" log.info("audioTmpFolder=%s", audioTmpFolder) ... @celery.task # def celeryRefreshAzureSpeechToken(): def refreshAzureSpeechToken(): """celery's task: refresh microsoft azure speech token key for later call tts/ASR api""" log.info("celeryRefreshAzureSpeechToken: celery=%s" % celery) # with celery.app.app_context(): # from resources.tts import refreshAzureSpeechToken # refreshAzureSpeechToken() # global gMsToken # log = app.logger # log.info("refreshAzureSpeechToken: gMsToken=%s", gMsToken) # log.info("refreshAzureSpeechToken") getMsTokenUrl = settings.MS_GET_TOKEN_URL reqHeaders = { "Ocp-Apim-Subscription-Key": settings.MS_TTS_SECRET_KEY } log.info("getMsTokenUrl=%s, reqHeaders=%s", getMsTokenUrl, reqHeaders) resp = requests.post(getMsTokenUrl, headers=reqHeaders) log.info("resp=%s", resp) respTokenText = resp.text # eyxxxxiJ9.xxx.xxx log.info("respTokenText=%s", respTokenText) # gMsToken = respTokenText updatedToken = respTokenText return updatedToken @celery.on_after_configure.connect def celerySetupPeriodicTasks(sender, **kwargs): log.info("celerySetupPeriodicTasks: celery=%s, sender=%s" % (celery, sender)) # with celery.app.app_context(): # log = app.logger # log = celery.app.logger # app = celery.app # log = celery.log # # print("celery.log=%s" % celery.log) # # print("log=%s" % log) # print("sender=%s" % sender) # log.info("celerySetupPeriodicTasks: sender=%s", sender) # print("celerySetupPeriodicTasks: log is usable") sender.add_periodic_task(settings.CELERY_REFRESH_MS_TOKEN_INTERVAL, # celeryRefreshAzureSpeechToken.s(), refreshAzureSpeechToken.s(), name="refresh ms Azure token every less than 10 minutes")
转载请注明:在路上 » 【已解决】用蓝图和工厂模式去优化现有Flask项目代码结构