[Solved] Using blueprints and the factory pattern to improve the code structure of an existing Flask project

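Flask crifan
Previously, all of the Flask code was written in a single app.py,
so the project as a whole was not very extensible.
I had heard that blueprints and the factory pattern can organize the code structure better, so I set out to work out how to implement both.
flask blueprint factory pattern
flask blueprint factory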
Application Factories — Flask 1.0.2 documentation
Patterns for Flask — Flask 1.0.2 documentation
python – Flask: are blueprints necessary for app factories? – Stack Overflow
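My impression: factories are about different modes, for example dev versus prod.
If so, I have already implemented that here and do not need it (I previously used dotenv to detect the incoming FLASK_ENV at runtime and load the dev or prod settings).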
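To make the "dev versus prod" idea concrete, here is a minimal sketch of picking a config class from FLASK_ENV. The class names, the `CONFIG_BY_ENV` mapping, and `select_config` are assumptions for illustration only; they are not the conf/app layout used later in this post, and the dotenv loading step is left out.

```python
import os


class BaseConfig:
    # Hypothetical settings shared by every environment
    DEBUG = False
    MONGODB_HOST = "localhost"


class DevelopmentConfig(BaseConfig):
    DEBUG = True


class ProductionConfig(BaseConfig):
    pass


# Hypothetical mapping from the FLASK_ENV value to a config class
CONFIG_BY_ENV = {
    "development": DevelopmentConfig,
    "production": ProductionConfig,
}


def select_config(env=None):
    """Return the config class for env, else for FLASK_ENV, else production."""
    env = env or os.environ.get("FLASK_ENV", "production")
    return CONFIG_BY_ENV.get(env, ProductionConfig)


dev_config = select_config("development")
print(dev_config.__name__, dev_config.DEBUG)
```

Falling back to production for unknown values is a design choice of this sketch, so that a typo in the environment variable never silently enables debug settings.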
Working with Blueprints
Modular Applications with Blueprints — Flask 1.0.2 documentation
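Blueprints were only added in Flask 0.7.
Flask's way of modularizing functionality is called a Blueprint.
It supports common patterns within one app or across several apps,
and is one way to organize and extend Flask applications.
A blueprint is not itself an app, whereas Flask is.
A blueprint is a set of operations that can be registered on a Flask app.
-> It looks like I do not need blueprints for now.
However: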
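For reference, a minimal sketch of what "a set of operations registered on an app" looks like. The `users_bp` blueprint, its route, and the `/users` prefix are made up for illustration and are not part of this project.

```python
from flask import Blueprint, Flask, jsonify

# A blueprint only records routes; it cannot serve requests on its own
users_bp = Blueprint("users", __name__)


@users_bp.route("/<int:user_id>")
def get_user(user_id):
    return jsonify({"id": user_id})


# The routes become active once the blueprint is registered on an app
app = Flask(__name__)
app.register_blueprint(users_bp, url_prefix="/users")

with app.test_client() as client:
    response = client.get("/users/42")
    print(response.status_code, response.get_json())
```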
Application Factories — Flask 1.0.2 documentation
def create_app(config_filename):
    app = Flask(__name__)
    ...
This one, on the other hand, does look like something I need.
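A minimal sketch of the application factory idea follows. Unlike the documentation snippet, which takes a config filename, this version takes a plain dict, and the `/ping` route is invented for the example; both are assumptions to keep it self-contained.

```python
from flask import Flask


def create_app(config=None):
    """Build and return a new Flask app; nothing is created at import time."""
    app = Flask(__name__)
    app.config.update(config or {})

    @app.route("/ping")
    def ping():
        return "pong"

    return app


# Each call returns an independent app, e.g. one for development and one for tests
dev_app = create_app({"DEBUG": True})
test_app = create_app({"TESTING": True})
print(dev_app is test_app)
```

Because the app is built inside a function, other modules can import `create_app` without triggering app creation, which is what makes the later Celery and test setups possible.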
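Organizing the structure of a Flask project | 欢乐蟒
"Look at a single blueprint (such as users): it encapsulates all the components that handle the user-management side of the application (defining routes, handling forms, rendering templates, and so on)."
For now I have no need for separate routes, forms, templates, and the like.
But I do need modularization.
Flask Web Development: Large Application Structure (Part 1) – young – SegmentFault 思否
It still looks like something I will not use.
Advanced Flask series (7): application best practices – 思诚之道
I understand it a little, but not clearly.
Understanding factory functions in Flask – CSDN blog
Writing a light blog with Flask (18): using the factory pattern to create the application object – CSDN blog
Flask factory functions – 简书
Application Factories – Flask 0.7 documentation (Chinese translation)
Application Factories – Flask 0.10.1 documentation (Chinese translation)
Step 5: The View Functions – Flask 0.7 documentation (Chinese translation)
I am also using flask_restful here,
so this differs from the plain app route() approach.
That means I need to work out blueprints and the factory pattern for flask_restful.
flask_restful factory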
rest – python flask-restful blueprint and factory pattern work together? – Stack Overflow
python – 404 when accessing resource Flask-Restful – Stack Overflow
definitive guide to using flask-restful with Blueprints · Issue #204 · flask-restful/flask-restful
Modular Applications with Blueprints – Flask 0.10.1 documentation (Chinese translation)
flask-restful factory
Python flask RESTful: Directory structure – Stack Overflow
Delayed initialization — init_app() not working · Issue #203 · flask-restful/flask-restful
Api.init_app() does not work as intended · Issue #644 · flask-restful/flask-restful
init_app does not set the application · Issue #679 · flask-restful/flask-restful
Flask RESTful API development: basics (2)
flyhigher139/restapi_example: A flask restful api development example
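Time to try it out.
Along the way, much of the Flask app initialization had to be converted to the factory pattern:
[Unsolved] How to initialize pymongo with the factory pattern in Flask
Then went on to solve:
[Unsolved] How to initialize the flask-restful api with the factory pattern in Flask
As for the directory structure, the original plan was to put each piece of business logic (tts, asr, qa, and so on) in its own directory, each containing a views.py: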
➜  xxxRobotDemoServer git:(master) ✗ tree .
...
├── app.py
├── asr
│   └── views.py
├── conf
├── factory.py
├── files
│   └── views.py
├── qa
│   └── views.py
└── tts
    └── views.py
Later I consulted:
Intermediate Usage — Flask-RESTful 0.2.1 documentation
This is the official flask-restful tutorial, and from it I found that it is better
to put everything under a resources directory, which gives:
├── app.py
├── factory.py
├── resources
│   ├── __init__.py
│   ├── asr.py
│   ├── files.py
│   ├── qa.py
│   └── tts.py
Next:
[Unsolved] How to initialize Celery with the factory pattern in Flask
In addition, for importing the flask-restful api, this can be consulted later:
rest – python flask-restful blueprint and factory pattern work together? – Stack Overflow
Try putting it in resources.
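Along the way I ran into a circular import problem:
[Solved] Circular import of app and create_app in Flask: ImportError: cannot import name 'create_app'
After that, finally, the Flask app could at least be debugged and run in PyCharm.
But there still seemed to be a small problem:
[Solved] Flask app instance initialized twice after switching to the factory pattern
For now it seems to work.
Next I kept debugging to make sure the product demo's Flask app runs locally and provides a usable API to the front end.
Then another problem came up:
[Solved] How to initialize Celery with the factory pattern in Flask
[Solved] Error when debugging the worker locally after switching Flask's Celery to the factory pattern: RuntimeError: Working outside of application context
[Summary]
What was achieved in the end:
  • Circular imports are avoided
  • The Flask app is no longer initialized twice
    • Flask's debug mode is turned off
    • create_app is not called in the Celery code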
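In factory.py the circular import is avoided by importing the resource modules inside create_rest_api rather than at the top of the module. The sketch below reproduces that idea with two throwaway modules written to a temporary directory; the module names `factory_demo` and `resources_demo` and their contents are hypothetical and use plain Python objects instead of Flask.

```python
import importlib
import sys
import tempfile
import textwrap
from pathlib import Path

# Hypothetical factory module: the import of resources_demo is deferred
# until create_app() is called, when factory_demo is already fully loaded.
FACTORY_SRC = textwrap.dedent('''
    def create_app():
        from resources_demo import ROUTES
        return {"routes": ROUTES}
''')

# Hypothetical resource module that imports the factory module back
RESOURCES_SRC = textwrap.dedent('''
    import factory_demo
    ROUTES = ["/qa", "/asr"]
''')


def build_demo_app():
    """Write both modules to a temp dir, import the factory, and build the app."""
    tmp_dir = tempfile.mkdtemp()
    Path(tmp_dir, "factory_demo.py").write_text(FACTORY_SRC)
    Path(tmp_dir, "resources_demo.py").write_text(RESOURCES_SRC)
    sys.path.insert(0, tmp_dir)
    try:
        factory = importlib.import_module("factory_demo")
        return factory.create_app()
    finally:
        sys.path.remove(tmp_dir)


demo_app = build_demo_app()
print(demo_app)
```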
Code structure:
➜  xxxRobotDemoServer git:(master) ✗ tree .
...
├── README.md
├── app.py
├── conf
│   ├── __init__.py
│   ├── app
│   │   ├── __init__.py
│   │   ├── development
│   │   │   └── __init__.py
│   │   ├── production
│   │   │   └── __init__.py
│   │   └── settings.py
│   ├── gunicorn
│   │   └── gunicorn_config.py
│   └── supervisor
│       ├── supervisord_local.conf
│       └── supervisord_server.conf
├── factory.py
├── resources
│   ├── __init__.py
│   ├── asr.py
│   ├── extensions_celery.py
│   ├── files.py
│   ├── qa.py
│   ├── tasks.py
│   └── tts.py
...
The code is as follows:
app.py
import os

from conf.app import settings
from factory import create_app
# from factory import log
# from factory import create_celery_app


################################################################################
# Global Definitions
################################################################################

################################################################################
# Global Variables
################################################################################


################################################################################
# Global Function
################################################################################


################################################################################
# Global Init App
################################################################################

print("in flask app: settings=%s" % (settings))
app = create_app(settings)
app.app_context().push()
# register_extensions(app)
log = app.logger
log.debug("app=%s", app)

log.debug("log=%s", log)
log.debug("settings.FLASK_ENV=%s", settings.FLASK_ENV)
log.debug("settings.DEBUG=%s, settings.MONGODB_HOST=%s, settings.FILE_URL_HOST=%s",
          settings.DEBUG, settings.MONGODB_HOST, settings.FILE_URL_HOST)

# celery = None
# with app.app_context():
# celery = create_celery_app(app)
# print("celery=%s" % celery)

if __name__ == "__main__":
    app.run(
        host=settings.FLASK_HOST,
        port=settings.FLASK_PORT,
        debug=settings.DEBUG,
        use_reloader=False
    )
factory.py
import os
from flask import Flask
import logging
from logging.handlers import RotatingFileHandler
# from flask_pymongo import PyMongo
from gridfs import GridFS
from pymongo import MongoClient
from flask_restful import Api
from flask_cors import CORS

from conf.app import settings

from celery import Celery
# from flask_celery import Celery
# from  resources.extensions_celery import celery

from flask import g

################################################################################
# Global Variables
################################################################################
# # log = logging.getLogger() #<RootLogger root (WARNING)>
# log = None
# print("log=%s" % log)
#
# # mongo = MongoClient() # MongoClient(host=['localhost:27017'], document_class=dict, tz_aware=False, connect=True)
# mongo = None
# print("mongo=%s" % mongo)
# fsCollection = None #None
# print("fsCollection=%s" % fsCollection)
#
# celery = Celery() #<Celery __main__ at 0x1068d8b38>
# celery = None
# print("celery=%s" % celery)

################################################################################
# Global Function
################################################################################

def create_app(config_object, init_extensions=True):
    # global log

    # app = Flask(__name__) #<Flask 'factory'>
    app = Flask(config_object.FLASK_APP_NAME) #<Flask 'RobotQA'>
    CORS(app)

    # app.config.from_object('config.DevelopmentConfig')
    # # app.config.from_object('config.ProductionConfig')

    app.config.from_object(config_object)
    with app.app_context():
        g.app = app

        log = create_log(app)
        g.log = log

        log.debug("after load from object: app.config=%s", app.config)
        log.debug('app.config["DEBUG"]=%s, app.config["MONGODB_HOST"]=%s, app.config["FILE_URL_HOST"]=%s',
                  app.config["DEBUG"], app.config["MONGODB_HOST"], app.config["FILE_URL_HOST"])

        if init_extensions:
            register_extensions(app)
            log.info("flask app extensions init completed")

    return app


def register_extensions(app):
    # global log, mongo, fsCollection, api, celery

    log = g.log

    mongo = create_mongo(app)
    g.mongo = mongo
    log.info("mongo=%s", mongo)
    mongoServerInfo = mongo.server_info()
    log.debug("mongoServerInfo=%s", mongoServerInfo)

    fsCollection = create_gridfs_fs_collection(mongo)
    g.fsCollection = fsCollection
    log.info("fsCollection=%s", fsCollection)

    celery = create_celery_app(app)
    g.celery = celery
    log.info("celery=%s", celery)

    # api = Api(app)
    api = create_rest_api(app)
    log.debug("api=%s", api)
    g.api = api

    return app


def create_rest_api(app):
    from resources.qa import RobotQaAPI
    from resources.asr import RobotAsrAPI
    from resources.files import GridfsAPI, TmpAudioAPI

    rest_api = Api()
    rest_api.add_resource(RobotQaAPI, '/qa', endpoint='qa')
    rest_api.add_resource(RobotAsrAPI, '/asr/language/<string:language>', endpoint='asr')
    rest_api.add_resource(GridfsAPI, '/files/<fileId>', '/files/<fileId>/<fileName>', endpoint='gridfs')
    rest_api.add_resource(TmpAudioAPI, '/tmp/audio/<filename>', endpoint='TmpAudio')

    rest_api.init_app(app)
    return rest_api


def create_log(app):
    print("create_log: before init log: app.logger=%s" % app.logger)
    logFormatterStr = app.config["LOG_FORMAT"]
    logFormatter = logging.Formatter(logFormatterStr)

    fileHandler = RotatingFileHandler(
        app.config['LOG_FILE_FILENAME'],
        maxBytes=app.config["LOG_FILE_MAX_BYTES"],
        backupCount=app.config["LOG_FILE_BACKUP_COUNT"],
        encoding="UTF-8")
    fileHandler.setLevel(logging.DEBUG)
    fileHandler.setFormatter(logFormatter)
    app.logger.addHandler(fileHandler)
    # Note: should NOT set StreamHandler here, otherwise will duplicate debug log
    app.logger.setLevel(logging.DEBUG)  # set the app logger level (this is not the root logger)

    log = app.logger
    log.info("app=%s", app)
    # log.debug("app.config=%s", app.config)
    print("create_log: after init log: app.logger=%s" % app.logger)

    return log


def create_mongo(app):
    # mongo_client = MongoClient(
    #     host=app.config["MONGODB_HOST"],
    #     port=app.config["MONGODB_PORT"],
    #     username=app.config["MONGODB_USERNAME"],
    #     password=app.config["MONGODB_PASSWORD"],
    #     authSource=app.config["MONGODB_AUTH_SOURCE"]
    # )

    if settings.MONGODB_AUTH_SOURCE:
        mongo_client = MongoClient(
            host=settings.MONGODB_HOST,
            port=int(settings.MONGODB_PORT),
            username=settings.MONGODB_USERNAME,
            password=settings.MONGODB_PASSWORD,
            authSource=settings.MONGODB_AUTH_SOURCE
        )
    elif settings.MONGODB_USERNAME and settings.MONGODB_PASSWORD:
        mongo_client = MongoClient(
            host=settings.MONGODB_HOST,
            port=int(settings.MONGODB_PORT),
            username=settings.MONGODB_USERNAME,
            password=settings.MONGODB_PASSWORD,
        )
    elif settings.MONGODB_PORT:
        mongo_client = MongoClient(
            host=settings.MONGODB_HOST,
            port=int(settings.MONGODB_PORT),
        )
    elif settings.MONGODB_HOST:
        mongo_client = MongoClient(
            host=settings.MONGODB_HOST,
        )
    else:
        mongo_client = MongoClient()


    return mongo_client


def create_gridfs_fs_collection(mongo_db):
    # Pure PyMongo
    gridfs_db = mongo_db.gridfs  # Database(MongoClient(host=['xxx:32018'], document_class=dict, tz_aware=False, connect=True, authsource='gridfs'), 'gridfs')
    gridfs_fs_collection = GridFS(gridfs_db)  # <gridfs.GridFS object at 0x1107b2390>
    return gridfs_fs_collection


def create_celery_app(app=None):
    print("create_celery_app: app=%s" % app)

    app = app or create_app(settings, init_extensions=False)
    app_import_name = app.import_name
    # app_name = app.name
    # celery_app_name = app_name
    celery_app_name = app_import_name
    celery = Celery(celery_app_name, broker=app.config['CELERY_BROKER_URL'])
    celery.conf.update(app.config)

    # celery.log = app.logger

    TaskBase = celery.Task

    class ContextTask(TaskBase):
        abstract = True

        def __call__(self, *args, **kwargs):
            with app.app_context():
                # g.log.info("in celery ContextTask __call__: args=%s, kwargs=%s", args, kwargs)
                app.logger.info("in celery ContextTask __call__: args=%s, kwargs=%s", args, kwargs)
                return TaskBase.__call__(self, *args, **kwargs)

    celery.Task = ContextTask


    # celery.init_app(app)

    print("init celery ok")

    return celery
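One detail of factory.py worth spelling out: `g` belongs to an application context, so the objects stored on it inside create_app are only visible while that context is active. The resource modules below read `g.log` and friends at import time, which works because they are imported from create_rest_api while create_app's context is still open. The sketch illustrates that behavior with a throwaway app; the app name `demo` and the `marker` attribute are made up, and using `current_app` is shown only as one possible alternative, not as what this project does.

```python
from flask import Flask, current_app, g

app = Flask("demo")

with app.app_context():
    g.marker = "set in first context"
    first = g.get("marker")      # visible inside the same context

with app.app_context():
    second = g.get("marker")     # a new context starts with an empty g
    app_name = current_app.name  # current_app works in any active context

print(first, second, app_name)
```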
Several other modules:
resources/files.py
import os
from flask_restful import Resource
from bson.objectid import ObjectId
from flask import request
from flask import jsonify

from common.util import sendFile
# from app import fsCollection
# from app import log
# from app import app
# from flask import current_app as app
# from factory import mongo, fsCollection
from flask import g

from resources.tts import gTempAudioFolder

log = g.log
mongo = g.mongo
fsCollection = g.fsCollection
# log.debug("mongo=%s, fsCollection=%s", mongo, fsCollection)

class GridfsAPI(Resource):

    def get(self, fileId, fileName=None):
        # log = app.logger
        log.info("fileId=%s, file_name=%s", fileId, fileName)
        ...
        return sendFile(fileBytes, fileObj.content_type, outputFilename)


class TmpAudioAPI(Resource):

    def get(self, filename=None):
        # log = app.logger
        log.info("TmpAudioAPI: filename=%s", filename)

        tmpAudioFullPath = os.path.join(gTempAudioFolder, filename)
        log.info("tmpAudioFullPath=%s", tmpAudioFullPath)
        ...
            return sendFile(fileBytes, contentType, outputFilename)
resources/qa.py
import os
import sys
from urllib.parse import quote
from flask_restful import Resource
from flask_restful import reqparse
from bson.objectid import ObjectId

from conf.app import settings

# from app import app, log
# from flask import current_app as app
# from app import fsCollection
from flask import g

app = g.app
log = g.log
fsCollection = g.fsCollection

print(__file__)
if settings.FLASK_ENV == "production":
    # production: online dev server
    ...
else:
    # development: local debug
    ...
sys.path.append(xxxRootPath)
...

from resources.tts import processResponse
...

class RobotQaAPI(Resource):

    def get(self):
        ...
            return processResponse(respDict, voiceName, voiceRate, voiceVolume)
resources/tts.py
import os
import requests
from urllib.parse import quote
from flask import jsonify

# from app import deleteTmpAudioFile
# from tasks import deleteTmpAudioFile
from resources.tasks import deleteTmpAudioFile
from resources.tasks import refreshAzureSpeechToken

from conf.app import settings
# import sys
# resouces_dir = os.path.dirname(__file__)
# project_root_dir = os.path.abspath(os.path.join(resouces_dir, ".."))
# if project_root_dir not in sys.path:
#     sys.path.append(project_root_dir)
from common.util import generateUUID


from flask import g

app = g.app
log = g.log

################################################################################
# Global Definitions
################################################################################

################################################################################
# Global Variables
################################################################################
...
gCurBaiduRespDict = {} # get baidu token resp dict

gMsToken = ""

gTempAudioFolder = ""


################################################################################
# Global Function
################################################################################


#----------------------------------------
# Audio Synthesis / TTS(Text-To-Speech)
#----------------------------------------

def createAudioTempFolder():
    """create folder to save later temp audio files"""
    global gTempAudioFolder
    # log = app.logger

    # init audio temp folder for later store temp audio file
    audioTmpFolder = settings.AUDIO_TEMP_FOLDER
    log.info("audioTmpFolder=%s", audioTmpFolder)
    ...

def saveAudioDataToTmpFile(audioBinData):
    """
        save audio binary data into temp file

    :param audioBinData: binary data of audio file
    :return: audio tmp file name
    """
    global gTempAudioFolder
    # log = app.logger

    audioBinDataLen = len(audioBinData)
    log.info("saveAudioDataToTmpFile: audioBinDataLen=%s", audioBinDataLen)
    ...
    return tempFilename

...

def getAzureSpeechToken():
    """get Microsoft Azure speech service token key"""
    # log = app.logger
    global gMsToken
    gMsToken = refreshAzureSpeechToken()

def msTTS(unicodeText,
          voiceName=settings.MS_TTS_VOICE_NAME,
          voiceRate=settings.MS_TTS_VOICE_RATE,
          voiceVolume=settings.MS_TTS_VOICE_VOLUME):
    """call ms azure tts to generate audio(mp3/wav/...) from text"""
    global gMsToken
    # log = app.logger
    log.info("msTTS: unicodeText=%s", unicodeText)
    ...
    return isOk, audioBinData, errNo, errMsg
...

################################################################################
# Global Init
################################################################################

# testAudioSynthesis()
initAudioService()
log.info("TTS init complete")
Celery-related:
resources/extensions_celery.py
# from flask_celery import Celery
from conf.app import settings
from celery import Celery
from celery.utils.log import get_task_logger

# celery = Celery()
celery = Celery(settings.FLASK_APP_NAME, broker=settings.CELERY_BROKER_URL)
celery_logger = get_task_logger(__name__)
print("in extensions_celery: celery=%s" % celery)
resources/tasks.py
import os
import requests

# from resources.extensions_celery import celery
# print("celery=%s" % celery)
# print("celery.app=%s" % celery.app)
# print("celery.app.log=%s" % celery.app.log)
# print("celery.log=%s" % celery.log)

from conf.app import settings

# try:
#     from flask import g
#     print("tasks: import flask g ok")
#     print("g=%s" % g)
#     log = g.log
#     print("log=%s" % log)
# except RuntimeError as err:
#     # except:
#     print("tasks: failed to import flask g, err=%s" % err)
#     from factory import create_app
#     print("tasks: import create_app ok")
#     app = create_app(settings)
#     print("create_app ok, app=%s" % app)
#     log = app.logger
#     print("log=%s" % log)

# import logging
# log = logging.getLogger(settings.FLASK_APP_NAME)
# log.info("test logging getLogger from flask app, work?")

# print("-----before: from factory import create_celery_app")
# from factory import create_celery_app
# from factory import create_log, create_app
# print("from factory import create_celery_app ok")
# celery = create_celery_app()
# app = create_app(settings)
# celery = create_celery_app(app)
# log = app.logger

from resources.extensions_celery import celery, celery_logger as log
print("create_celery_app return: celery=%s, log=%s" % (celery, log))

#----------------------------------------
# Celery tasks
#----------------------------------------

# @celery.task()
@celery.task
# @celery.task(name=settings.CELERY_TASK_NAME + ".deleteTmpAudioFile")
def deleteTmpAudioFile(filename):
    """
        delete tmp audio file from filename
            eg: 98fc7c46-7aa0-4dd7-aa9d-89fdf516abd6.mp3
    """
    # print("deleteTmpAudioFile: celery=%s, filename=%s" % (celery, filename))

    # log = app.logger
    # with celery.app.app_context():
    #     log = celery.app.logger
    #     app = celery.app

    # log = celery.log
    # print("celery.log=%s" % celery.log)
    # print("log=%s" % log)

    log.info("deleteTmpAudioFile: celery=%s, filename=%s", celery, filename)

    audioTmpFolder = settings.AUDIO_TEMP_FOLDER
    # audioTmpFolder = "tmp/audio"
    log.info("audioTmpFolder=%s", audioTmpFolder)
    ...

@celery.task
# def celeryRefreshAzureSpeechToken():
def refreshAzureSpeechToken():
    """celery's task: refresh microsoft azure speech token key for later call tts/ASR api"""
    log.info("refreshAzureSpeechToken: celery=%s", celery)

    # with celery.app.app_context():
    # from resources.tts import refreshAzureSpeechToken
    # refreshAzureSpeechToken()

    # global gMsToken
    # log = app.logger
    # log.info("refreshAzureSpeechToken: gMsToken=%s", gMsToken)
    # log.info("refreshAzureSpeechToken")

    getMsTokenUrl = settings.MS_GET_TOKEN_URL
    reqHeaders = {
        "Ocp-Apim-Subscription-Key": settings.MS_TTS_SECRET_KEY
    }
    # Log only the URL: reqHeaders carries the subscription key and should not be written to logs
    log.info("getMsTokenUrl=%s", getMsTokenUrl)
    resp = requests.post(getMsTokenUrl, headers=reqHeaders)
    log.info("resp=%s", resp)
    respTokenText = resp.text  # eyxxxxiJ9.xxx.xxx
    log.info("respTokenText=%s", respTokenText)
    # gMsToken = respTokenText
    updatedToken = respTokenText
    return updatedToken

@celery.on_after_configure.connect
def celerySetupPeriodicTasks(sender, **kwargs):
    log.info("celerySetupPeriodicTasks: celery=%s, sender=%s", celery, sender)

    # with celery.app.app_context():
    # log = app.logger
    # log = celery.app.logger
    # app = celery.app

    # log = celery.log
    # # print("celery.log=%s" % celery.log)
    # # print("log=%s" % log)
    # print("sender=%s" % sender)

    # log.info("celerySetupPeriodicTasks: sender=%s", sender)
    # print("celerySetupPeriodicTasks: log is usable")

    sender.add_periodic_task(settings.CELERY_REFRESH_MS_TOKEN_INTERVAL,
                             # celeryRefreshAzureSpeechToken.s(),
                             refreshAzureSpeechToken.s(),
                             name="refresh ms Azure token every less than 10 minutes")
