[Solved] How to initialize Celery with the factory pattern in Flask

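Flask crifan
Background:
[Unsolved] Using blueprints and the factory pattern to improve the code structure of an existing Flask project
While doing that, the Celery initialization that had been written directly in app.py needed to be moved out: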
celeryApp = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celeryApp.conf.update(app.config)
log.info("celeryApp=%s", celeryApp)
 
#----------------------------------------
# Celery tasks
#----------------------------------------
 
# @celeryApp.task()
@celeryApp.task
# @celeryApp.task(name=app.config["CELERY_TASK_NAME"] + ".deleteTmpAudioFile")
def deleteTmpAudioFile(filename):
    """
        delete tmp audio file from filename
            eg: 98fc7c46-7aa0-4dd7-aa9d-89fdf516abd6.mp3
    """
    global log
 
    log.info("deleteTmpAudioFile: filename=%s", filename)
 
    audioTmpFolder = app.config["AUDIO_TEMP_FOLDER"]
    # audioTmpFolder = "tmp/audio"
    log.info("audioTmpFolder=%s", audioTmpFolder)
    curFolderAbsPath = os.getcwd() #'/Users/crifan/dev/dev_root/company/xxx/projects/robotDemo/server'
    log.info("curFolderAbsPath=%s", curFolderAbsPath)
    audioTmpFolderFullPath = os.path.join(curFolderAbsPath, audioTmpFolder)
    log.info("audioTmpFolderFullPath=%s", audioTmpFolderFullPath)
    tempAudioFullname = os.path.join(audioTmpFolderFullPath, filename)
    #'/Users/crifan/dev/dev_root/company/xxx/projects/robotDemo/server/tmp/audio/2aba73d1-f8d0-4302-9dd3-d1dbfad44458.mp3'
    if os.path.isfile(tempAudioFullname):
        os.remove(tempAudioFullname)
        log.info("Ok to delete file %s", tempAudioFullname)
    else:
        log.warning("No need to remove for not exist file %s", tempAudioFullname)
 
# log.info("deleteTmpAudioFile=%s", deleteTmpAudioFile)
# log.info("deleteTmpAudioFile.name=%s", deleteTmpAudioFile.name)
# log.info("celeryApp.tasks=%s", celeryApp.tasks)
 
@celeryApp.task
def celeryRefreshAzureSpeechToken():
    """celery's task: refreshAzureSpeechToken"""
    refreshAzureSpeechToken()
 
@celeryApp.on_after_configure.connect
def celerySetupPeriodicTasks(sender, **kwargs):
    log.info("celerySetupPeriodicTasks: sender=%s", sender)
 
    sender.add_periodic_task(app.config["CELERY_REFRESH_MS_TOKEN_INTERVAL"],
                             celeryRefreshAzureSpeechToken.s(),
                             name="refresh ms Azure token every less than 10 minutes")
Celery also calls some other functions here, such as refreshAzureSpeechToken above,
so a good way to fold those in is needed as well.
Searched for: flask celery factory
zenyui/celery-flask-factory: Implementing Celery within a Flask application factory
Celery and the Flask Application Factory Pattern – miguelgrinberg.com
https://blog.miguelgrinberg.com/post/celery-and-the-flask-application-factory-pattern
from celery import Celery
from config import config, Config
 
celery = Celery(__name__, broker=Config.CELERY_BROKER_URL)
 
def create_app(config_name):
    # ...
    celery.conf.update(app.config)
    # ...
    return app
This still requires importing config, which does not feel ideal.
Also, the __name__ used here is probably not the same as the eventual app.name.
Using Celery with Flask Factories
Flask with create_app, SQLAlchemy and Celery – Stack Overflow
python – Celery/Flask Receiving unregistered task of type (App Factory + Blueprints) – Stack Overflow
python – Celery using default broker instead of reddis. Flask + Celery + Factory pattern – Stack Overflow
python – Flask and Celery large application structure – Stack Overflow
Using the Flask context in Celery
https://jiayi.space/post/zai-celeryzhong-shi-yong-flaskde-shang-xia-wen
Using Celery in a Flask project – 李林克斯
The current version is:
resources/celery_task.py
import os
from app import app, celery, log
 
from resources.tts import refreshAzureSpeechToken
 
 
#----------------------------------------
# Celery tasks
#----------------------------------------
 
# @celery.task()
@celery.task
# @celery.task(name=app.config["CELERY_TASK_NAME"] + ".deleteTmpAudioFile")
def deleteTmpAudioFile(filename):
    """
        delete tmp audio file from filename
            eg: 98fc7c46-7aa0-4dd7-aa9d-89fdf516abd6.mp3
    """
    log.info("deleteTmpAudioFile: filename=%s", filename)
 
    audioTmpFolder = app.config["AUDIO_TEMP_FOLDER"]
    # audioTmpFolder = "tmp/audio"
    log.info("audioTmpFolder=%s", audioTmpFolder)
    curFolderAbsPath = os.getcwd() #'/Users/crifan/dev/dev_root/company/xxx/projects/robotDemo/server'
    log.info("curFolderAbsPath=%s", curFolderAbsPath)
    audioTmpFolderFullPath = os.path.join(curFolderAbsPath, audioTmpFolder)
    log.info("audioTmpFolderFullPath=%s", audioTmpFolderFullPath)
    tempAudioFullname = os.path.join(audioTmpFolderFullPath, filename)
    #'/Users/crifan/dev/dev_root/company/xxx/projects/robotDemo/server/tmp/audio/2aba73d1-f8d0-4302-9dd3-d1dbfad44458.mp3'
    if os.path.isfile(tempAudioFullname):
        os.remove(tempAudioFullname)
        log.info("Ok to delete file %s", tempAudioFullname)
    else:
        log.warning("No need to remove for not exist file %s", tempAudioFullname)
 
 
@celery.task
def celeryRefreshAzureSpeechToken():
    """celery's task: refreshAzureSpeechToken"""
    refreshAzureSpeechToken()
 
@celery.on_after_configure.connect
def celerySetupPeriodicTasks(sender, **kwargs):
    log.info("celerySetupPeriodicTasks: sender=%s", sender)
 
    sender.add_periodic_task(app.config["CELERY_REFRESH_MS_TOKEN_INTERVAL"],
                             celeryRefreshAzureSpeechToken.s(),
                             name="refresh ms Azure token every less than 10 minutes")
 
 
# log.info("deleteTmpAudioFile=%s", deleteTmpAudioFile)
# log.info("deleteTmpAudioFile.name=%s", deleteTmpAudioFile.name)
# log.info("celery.tasks=%s", celery.tasks)
app.py
from factory import celery
 
app = create_app(settings)
log.debug("celery=%s", celery)
factory.py
from celery import Celery
 
 
celery = Celery()
print("celery=%s" % celery)
 
 
def create_app(config_object):
    app = Flask(__name__)
    CORS(app)
 
    # app.config.from_object('config.DevelopmentConfig')
    # # app.config.from_object('config.ProductionConfig')
 
    app.config.from_object(config_object)
 
    create_extensions(app)
    return app
 
 
def create_extensions(app):
    global log, mongo, fsCollection, api, celery
 
    log = create_log(app)
    ...
    celery = create_celery(app)
    log.info("celery=%s", celery)
 
    return app
To be debugged later to see whether it runs correctly.
[Summary]
At this point, the code for initializing Celery with the factory pattern is:
import os
# from app import celery
# from app import app, log
# from flask import current_app as app
# from factory import celery
from flask import g
from resources.tts import refreshAzureSpeechToken
 
app = g.app
log = g.log
celery = g.celery
 
#----------------------------------------
# Celery tasks
#----------------------------------------
 
# @celery.task()
@celery.task
# @celery.task(name=app.config["CELERY_TASK_NAME"] + ".deleteTmpAudioFile")
def deleteTmpAudioFile(filename):
    """
        delete tmp audio file from filename
            eg: 98fc7c46-7aa0-4dd7-aa9d-89fdf516abd6.mp3
    """
    # log = app.logger
    with celery.app.app_context():
        log.info("deleteTmpAudioFile: filename=%s", filename)
 
        audioTmpFolder = app.config["AUDIO_TEMP_FOLDER"]
        # audioTmpFolder = "tmp/audio"
        ...
 
@celery.task
def celeryRefreshAzureSpeechToken():
    """celery's task: refreshAzureSpeechToken"""
    with celery.app.app_context():
        refreshAzureSpeechToken()
 
@celery.on_after_configure.connect
def celerySetupPeriodicTasks(sender, **kwargs):
    with celery.app.app_context():
        # log = app.logger
        log.info("celerySetupPeriodicTasks: sender=%s", sender)
 
        sender.add_periodic_task(app.config["CELERY_REFRESH_MS_TOKEN_INTERVAL"],
                                 celeryRefreshAzureSpeechToken.s(),
                                 name="refresh ms Azure token every less than 10 minutes")
And app.py is:
from conf.app import settings
 
from factory import create_app
 
app = create_app(settings)
# register_extensions(app)
log = app.logger
 
if __name__ == "__main__":
    app.run(
        host=app.config["FLASK_HOST"],
        port=app.config["FLASK_PORT"],
        debug=app.config["DEBUG"],
        use_reloader=False
    )
factory.py
import os
from flask import Flask
...
from celery import Celery
 
from flask import g
 
################################################################################
# Global Function
################################################################################
 
def create_app(config_object):
    # global log
 
    # app = Flask(__name__) #<Flask 'factory'>
    app = Flask(config_object.FLASK_APP_NAME) #<Flask 'RobotQA'>
    CORS(app)
 
    # app.config.from_object('config.DevelopmentConfig')
    # # app.config.from_object('config.ProductionConfig')
 
    app.config.from_object(config_object)
    with app.app_context():
        g.app = app
 
        log = create_log(app)
        g.log = log
 
        ...
 
        register_extensions(app)
 
    return app
 
 
def register_extensions(app):
    # global log, mongo, fsCollection, api, celery
 
    log = g.log
...
    celery = create_celery(app)
    g.celery = celery
    log.info("celery=%s", celery)
...
    return app
 
...
 
 
def create_celery(app):
    celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
    celery.conf.update(app.config)
 
    TaskBase = celery.Task
 
    class ContextTask(TaskBase):
        abstract = True
 
        def __call__(self, *args, **kwargs):
            with app.app_context():
                g.log.info("in celery ContextTask __call__: args=%s, kwargs=%s", args, kwargs)
                return TaskBase.__call__(self, *args, **kwargs)
 
    celery.Task = ContextTask
 
    return celery
...
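For now, at least the Flask app initializes and runs this way, without circular-import or global-variable problems.
[Postscript]
Later, when actually debugging locally, and especially when running the Celery worker locally, an error occurred:
[Solved] After switching Flask's Celery to the factory pattern, local worker debugging fails: RuntimeError: Working outside of application context
So the approach above still does not work.
The final solution:
The circular imports and the global-variable problem were eventually solved, with quite a few compromises along the way.
First, the Celery library:
Flask-Celery-Helper was dropped in favor of plain Celery: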
# from flask_celery import Celery
from celery import Celery
-> because it requires adding CELERY_IMPORTS to the configuration:
# for Flask-Celery-Helper
# CELERY_IMPORTS = ('tasks.deleteTmpAudioFile', 'tasks.celeryRefreshAzureSpeechToken', )
# CELERY_IMPORTS = ('resources.tasks.deleteTmpAudioFile', 'resources.tasks.celeryRefreshAzureSpeechToken', )
-> which led to tasks not being found
-> so it was dropped without hesitation.
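One possible explanation, which the original post does not confirm: Celery's imports setting (CELERY_IMPORTS in the old uppercase naming) is documented as a list of modules to import, while the commented-out entries above end in task function names. The sketch below uses only the standard library, with json standing in for a task module and json.dumps for a task function inside it, to show that a dotted path ending in a function cannot be imported as a module.

```python
import importlib


def can_import_as_module(dotted_path):
    """Return True if dotted_path names an importable module."""
    try:
        importlib.import_module(dotted_path)
        return True
    except ImportError:
        return False


# "json" plays the role of a task module such as resources.tasks;
# "json.dumps" plays the role of a task function inside it (stand-ins only).
module_path_ok = can_import_as_module("json")
function_path_ok = can_import_as_module("json.dumps")
print(module_path_ok, function_path_ok)
```

If this is what happened here, an entry naming the module (for example 'resources.tasks') would be the form to try; that was not tested in the original post.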
Next, the code in each file and what changed.
The file structure is:
➜  xxxRobotDemoServer git:(master) ✗ tree .       
...
├── app.py
...
├── conf
│   ├── __init__.py
│   ├── __pycache__
│   │   └── __init__.cpython-36.pyc
│   ├── app
│   │   ├── __init__.py
│   │   ├── __pycache__
│   │   │   ├── __init__.cpython-36.pyc
│   │   │   └── settings.cpython-36.pyc
│   │   ├── development
│   │   │   └── __init__.py
│   │   ├── production
│   │   │   └── __init__.py
│   │   └── settings.py
...
├── factory.py
├── resources
│   ├── __init__.py
│   ├── asr.py
│   ├── extensions_celery.py
│   ├── files.py
│   ├── qa.py
│   ├── tasks.py
│   └── tts.py
...
Each file in detail:
(1) resources/extensions_celery.py
This module only initializes celery and does nothing else:
# from flask_celery import Celery
from conf.app import settings
from celery import Celery
from celery.utils.log import get_task_logger
 
# celery = Celery()
celery = Celery(settings.FLASK_APP_NAME, broker=settings.CELERY_BROKER_URL)
celery_logger = get_task_logger(__name__)
print("in extensions_celery: celery=%s" % celery)
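(2) resources/tasks.py
Background:
Previously the Celery tasks depended on app and log
-> because inside the tasks:
  • configuration was read via app.config["xxx"] -> later changed to settings.xxx -> importing settings on its own does not cause a circular import
  • log is needed for output inside the tasks -> ideally the same log as the Flask app's
That dependency causes two problems:
A: there is no clean way to get hold of app again.
Even though app is easy to obtain when the Flask app itself is started, it is still unavailable when running celery worker -A.
And calling create_app again means that app initialization, while setting up flask-restful, imports other modules such as:
resources/tts.py
which uses the asynchronous Celery task deleteTmpAudioFile and therefore needs the import: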
from resources.tasks import deleteTmpAudioFile
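This in turn triggers one more app initialization from inside tasks
-> with the result that
the Flask app is initialized twice, which is unacceptable.
B: there is no convenient way to get the (app's) log.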
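The double initialization described under A can be reproduced without Flask. The sketch below is only an illustration: the sketch_* module names are hypothetical, and a plain dict stands in for the Flask app. It writes three throwaway modules to a temporary directory, imports the entry module, and counts how many times the factory ran.

```python
import importlib
import sys
import tempfile
import textwrap
from pathlib import Path

# Hypothetical stand-ins: sketch_factory ~ factory.py, sketch_tasks ~ resources/tasks.py,
# sketch_main ~ app.py. A dict takes the place of the Flask app.
FILES = {
    "sketch_factory.py": """
        INIT_COUNT = 0

        def create_app():
            global INIT_COUNT
            INIT_COUNT += 1
            return {"name": "demo", "init_no": INIT_COUNT}
    """,
    "sketch_tasks.py": """
        from sketch_factory import create_app
        app = create_app()   # second initialization, triggered by the import
    """,
    "sketch_main.py": """
        from sketch_factory import create_app
        app = create_app()   # first initialization
        import sketch_tasks  # e.g. pulled in indirectly by a resource module
    """,
}


def count_initializations():
    """Import sketch_main from a temp directory and return how often create_app ran."""
    with tempfile.TemporaryDirectory() as tmp:
        for name, body in FILES.items():
            Path(tmp, name).write_text(textwrap.dedent(body))
        sys.path.insert(0, tmp)
        try:
            importlib.invalidate_caches()
            importlib.import_module("sketch_main")
            return sys.modules["sketch_factory"].INIT_COUNT
        finally:
            # Clean up so the sketch can be run again in the same process.
            sys.path.remove(tmp)
            for mod in ("sketch_main", "sketch_tasks", "sketch_factory"):
                sys.modules.pop(mod, None)


init_count = count_initializations()
print(init_count)
```

Any module that calls the factory at import time adds one more initialization, which is why the tasks module has to stop calling create_app.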
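The final compromise:
Celery tasks no longer depend on the Flask app and log
->
  • app.config["xxx"] was changed to settings.xxx
    • -> importing settings on its own does not cause a circular import
  • log is obtained another way
    • here, by taking Celery's logger from resources/extensions_celery.py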
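The compromise above can be sketched with the standard library alone. This is a simplified illustration under assumptions, not the post's actual code: a SimpleNamespace stands in for conf.app.settings, a plain logging logger stands in for celery_logger, and the base_dir parameter is added so the example does not depend on the current working directory.

```python
import logging
import os
import tempfile
from types import SimpleNamespace

# Stand-in for conf.app.settings; only the attribute used below is modelled.
settings = SimpleNamespace(AUDIO_TEMP_FOLDER="tmp/audio")

# Stand-in for celery_logger: a plain logger that needs no Flask app.
log = logging.getLogger("tasks_sketch")


def delete_tmp_audio_file(filename, base_dir):
    """Delete base_dir/<AUDIO_TEMP_FOLDER>/<filename>; return True if a file was removed."""
    full_path = os.path.join(base_dir, settings.AUDIO_TEMP_FOLDER, filename)
    if os.path.isfile(full_path):
        os.remove(full_path)
        log.info("Ok to delete file %s", full_path)
        return True
    log.warning("No need to remove for not exist file %s", full_path)
    return False


# Usage: create a temporary file, delete it once, then try again.
with tempfile.TemporaryDirectory() as base_dir:
    folder = os.path.join(base_dir, settings.AUDIO_TEMP_FOLDER)
    os.makedirs(folder)
    open(os.path.join(folder, "demo.mp3"), "w").close()
    first = delete_tmp_audio_file("demo.mp3", base_dir)
    second = delete_tmp_audio_file("demo.mp3", base_dir)
print(first, second)
```

Because the function touches neither app nor g, it can be imported by a worker process that never creates a Flask app.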
Complete code:
import os
 
# from resources.extensions_celery import celery
# print("celery=%s" % celery)
# print("celery.app=%s" % celery.app)
# print("celery.app.log=%s" % celery.app.log)
# print("celery.log=%s" % celery.log)
 
from conf.app import settings
 
# try:
#     from flask import g
#     print("tasks: import flask g ok")
#     print("g=%s" % g)
#     log = g.log
#     print("log=%s" % log)
# except RuntimeError as err:
#     # except:
#     print("tasks: failed to import flask g, err=%s" % err)
#     from factory import create_app
#     print("tasks: import create_app ok")
#     app = create_app(settings)
#     print("create_app ok, app=%s" % app)
#     log = app.logger
#     print("log=%s" % log)
 
# import logging
# log = logging.getLogger(settings.FLASK_APP_NAME)
# log.info("test logging getLogger from flask app, work?")
 
# print("-----before: from factory import create_celery_app")
# from factory import create_celery_app
# from factory import create_log, create_app
# print("from factory import create_celery_app ok")
# celery = create_celery_app()
# app = create_app(settings)
# celery = create_celery_app(app)
# log = app.logger
 
from resources.extensions_celery import celery, celery_logger as log
print("create_celery_app return: celery=%s, log=%s" % (celery, log))
 
#----------------------------------------
# Celery tasks
#----------------------------------------
 
# @celery.task()
@celery.task
# @celery.task(name=settings.CELERY_TASK_NAME + ".deleteTmpAudioFile")
def deleteTmpAudioFile(filename):
    """
        delete tmp audio file from filename
            eg: 98fc7c46-7aa0-4dd7-aa9d-89fdf516abd6.mp3
    """
    # print("deleteTmpAudioFile: celery=%s, filename=%s" % (celery, filename))
 
    # log = app.logger
    # with celery.app.app_context():
    #     log = celery.app.logger
    #     app = celery.app
 
    # log = celery.log
    # print("celery.log=%s" % celery.log)
    # print("log=%s" % log)
 
    log.info("deleteTmpAudioFile: celery=%s, filename=%s", celery, filename)
 
    audioTmpFolder = settings.AUDIO_TEMP_FOLDER
    # audioTmpFolder = "tmp/audio"
    log.info("audioTmpFolder=%s", audioTmpFolder)
    curFolderAbsPath = os.getcwd() #'/Users/crifan/dev/dev_root/company/xxx/projects/robotDemo/server'
    log.info("curFolderAbsPath=%s", curFolderAbsPath)
    audioTmpFolderFullPath = os.path.join(curFolderAbsPath, audioTmpFolder)
    log.info("audioTmpFolderFullPath=%s", audioTmpFolderFullPath)
    tempAudioFullname = os.path.join(audioTmpFolderFullPath, filename)
    #'/Users/crifan/dev/dev_root/company/xxx/projects/robotDemo/server/tmp/audio/2aba73d1-f8d0-4302-9dd3-d1dbfad44458.mp3'
    if os.path.isfile(tempAudioFullname):
        os.remove(tempAudioFullname)
        log.info("Ok to delete file %s", tempAudioFullname)
    else:
        log.warning("No need to remove for not exist file %s", tempAudioFullname)
 
 
@celery.task
def celeryRefreshAzureSpeechToken():
    """celery's task: refreshAzureSpeechToken"""
    log.info("celeryRefreshAzureSpeechToken: celery=%s" % celery)
 
    # with celery.app.app_context():
    from resources.tts import refreshAzureSpeechToken
    refreshAzureSpeechToken()
 
@celery.on_after_configure.connect
def celerySetupPeriodicTasks(sender, **kwargs):
    log.info("celerySetupPeriodicTasks: celery=%s, sender=%s" % (celery, sender))
 
    # with celery.app.app_context():
    # log = app.logger
    # log = celery.app.logger
    # app = celery.app
 
    # log = celery.log
    # # print("celery.log=%s" % celery.log)
    # # print("log=%s" % log)
    # print("sender=%s" % sender)
 
    # log.info("celerySetupPeriodicTasks: sender=%s", sender)
    # print("celerySetupPeriodicTasks: log is usable")
 
    sender.add_periodic_task(settings.CELERY_REFRESH_MS_TOKEN_INTERVAL,
                             celeryRefreshAzureSpeechToken.s(),
                             name="refresh ms Azure token every less than 10 minutes")
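(3) factory.py
factory has no global celery variable (nor any other global variables).
All Celery initialization lives in create_celery_app.
When the app is first initialized, create_celery_app is passed the app.
With celery worker -A, celery is simply taken from resources/tasks.py, and no new Flask app is created.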
import os
from flask import Flask
import logging
from logging.handlers import RotatingFileHandler
# from flask_pymongo import PyMongo
from gridfs import GridFS
from pymongo import MongoClient
from flask_restful import Api
from flask_cors import CORS
 
from conf.app import settings
 
from celery import Celery
# from flask_celery import Celery
# from  resources.extensions_celery import celery
 
from flask import g
 
################################################################################
# Global Variables
################################################################################
# # log = logging.getLogger() #<RootLogger root (WARNING)>
# log = None
# print("log=%s" % log)
#
# # mongo = MongoClient() # MongoClient(host=['localhost:27017'], document_class=dict, tz_aware=False, connect=True)
# mongo = None
# print("mongo=%s" % mongo)
# fsCollection = None #None
# print("fsCollection=%s" % fsCollection)
#
# celery = Celery() #<Celery __main__ at 0x1068d8b38>
# celery = None
# print("celery=%s" % celery)
 
################################################################################
# Global Function
################################################################################
 
def create_app(config_object, init_extensions=True):
    # global log
 
    # app = Flask(__name__) #<Flask 'factory'>
    app = Flask(config_object.FLASK_APP_NAME) #<Flask 'RobotQA'>
    CORS(app)
 
    # app.config.from_object('config.DevelopmentConfig')
    # # app.config.from_object('config.ProductionConfig')
 
    app.config.from_object(config_object)
    with app.app_context():
        g.app = app
 
        log = create_log(app)
        g.log = log
 
        log.debug("after load from object: app.config=%s", app.config)
        log.debug('app.config["DEBUG"]=%s, app.config["MONGODB_HOST"]=%s, app.config["FILE_URL_HOST"]=%s',
                  app.config["DEBUG"], app.config["MONGODB_HOST"], app.config["FILE_URL_HOST"])
 
        if init_extensions:
            register_extensions(app)
            log.info("flask app extensions init completed")
 
    return app
 
 
def register_extensions(app):
    # global log, mongo, fsCollection, api, celery
 
    log = g.log
 
    mongo = create_mongo(app)
    g.mongo = mongo
    log.info("mongo=%s", mongo)
    mongoServerInfo = mongo.server_info()
    log.debug("mongoServerInfo=%s", mongoServerInfo)
 
    fsCollection = create_gridfs_fs_collection(mongo)
    g.fsCollection = fsCollection
    log.info("fsCollection=%s", fsCollection)
 
    celery = create_celery_app(app)
    g.celery = celery
    log.info("celery=%s", celery)
 
    # api = Api(app)
    api = create_rest_api(app)
    log.debug("api=%s", api)
    g.api = api
 
    return app
 
 
def create_rest_api(app):
    from resources.qa import RobotQaAPI
    from resources.asr import RobotAsrAPI
    from resources.files import GridfsAPI, TmpAudioAPI
 
    rest_api = Api()
    rest_api.add_resource(RobotQaAPI, '/qa', endpoint='qa')
    rest_api.add_resource(RobotAsrAPI, '/asr/language/<string:language>', endpoint='asr')
    rest_api.add_resource(GridfsAPI, '/files/<fileId>', '/files/<fileId>/<fileName>', endpoint='gridfs')
    rest_api.add_resource(TmpAudioAPI, '/tmp/audio/<filename>', endpoint='TmpAudio')
 
    rest_api.init_app(app)
    return rest_api
 
 
def create_log(app):
    print("create_log: before init log: app.logger=%s" % app.logger)
    logFormatterStr = app.config["LOG_FORMAT"]
    logFormatter = logging.Formatter(logFormatterStr)
 
    fileHandler = RotatingFileHandler(
        app.config['LOG_FILE_FILENAME'],
        maxBytes=app.config["LOG_FILE_MAX_BYTES"],
        backupCount=app.config["LOG_FILE_BACKUP_COUNT"],
        encoding="UTF-8")
    fileHandler.setLevel(logging.DEBUG)
    fileHandler.setFormatter(logFormatter)
    app.logger.addHandler(fileHandler)
    # Note: should NOT set StreamHandler here, otherwise will duplicate debug log
    app.logger.setLevel(logging.DEBUG)  # set root log level
 
    log = app.logger
    log.info("app=%s", app)
    # log.debug("app.config=%s", app.config)
    print("create_log: after init log: app.logger=%s" % app.logger)
 
    return log
 
 
def create_mongo(app):
    # mongo_client = MongoClient(
    #     host=app.config["MONGODB_HOST"],
    #     port=app.config["MONGODB_PORT"],
    #     username=app.config["MONGODB_USERNAME"],
    #     password=app.config["MONGODB_PASSWORD"],
    #     authSource=app.config["MONGODB_AUTH_SOURCE"]
    # )
 
    if settings.MONGODB_AUTH_SOURCE:
        mongo_client = MongoClient(
            host=settings.MONGODB_HOST,
            port=int(settings.MONGODB_PORT),
            username=settings.MONGODB_USERNAME,
            password=settings.MONGODB_PASSWORD,
            authSource=settings.MONGODB_AUTH_SOURCE
        )
    elif settings.MONGODB_USERNAME and settings.MONGODB_PASSWORD:
        mongo_client = MongoClient(
            host=settings.MONGODB_HOST,
            port=int(settings.MONGODB_PORT),
            username=settings.MONGODB_USERNAME,
            password=settings.MONGODB_PASSWORD,
        )
    elif settings.MONGODB_PORT:
        mongo_client = MongoClient(
            host=settings.MONGODB_HOST,
            port=int(settings.MONGODB_PORT),
        )
    elif settings.MONGODB_HOST:
        mongo_client = MongoClient(
            host=settings.MONGODB_HOST,
        )
    else:
        mongo_client = MongoClient()
 
 
    return mongo_client
 
 
def create_gridfs_fs_collection(mongo_db):
    # Pure PyMongo
    gridfs_db = mongo_db.gridfs  # Database(MongoClient(host=['xxx:32018'], document_class=dict, tz_aware=False, connect=True, authsource='gridfs'), 'gridfs')
    gridfs_fs_collection = GridFS(gridfs_db)  # <gridfs.GridFS object at 0x1107b2390>
    return gridfs_fs_collection
 
 
def create_celery_app(app=None):
    print("create_celery_app: app=%s" % app)
 
    app = app or create_app(settings, init_extensions=False)
    app_import_name = app.import_name
    # app_name = app.name
    # celery_app_name = app_name
    celery_app_name = app_import_name
    celery = Celery(celery_app_name, broker=app.config['CELERY_BROKER_URL'])
    celery.conf.update(app.config)
 
    # celery.log = app.logger
 
    TaskBase = celery.Task
 
    class ContextTask(TaskBase):
        abstract = True
 
        def __call__(self, *args, **kwargs):
            with app.app_context():
                # g.log.info("in celery ContextTask __call__: args=%s, kwargs=%s", args, kwargs)
                app.logger.info("in celery ContextTask __call__: args=%s, kwargs=%s", args, kwargs)
                return TaskBase.__call__(self, *args, **kwargs)
 
    celery.Task = ContextTask
 
 
    # celery.init_app(app)
 
    print("init celery ok")
 
    return celery
(4) app.py
app only calls create_app
and no longer initializes celery.
import os
 
from conf.app import settings
from factory import create_app
# from factory import log
# from factory import create_celery_app
 
 
################################################################################
# Global Definitions
################################################################################
 
################################################################################
# Global Variables
################################################################################
 
 
################################################################################
# Global Function
################################################################################
 
 
################################################################################
# Global Init App
################################################################################
 
print("in flask app: settings=%s" % (settings))
app = create_app(settings)
app.app_context().push()
# register_extensions(app)
log = app.logger
log.debug("app=%s", app)
 
log.debug("log=%s", log)
log.debug("settings.FLASK_ENV=%s", settings.FLASK_ENV)
log.debug("settings.DEBUG=%s, settings.MONGODB_HOST=%s, settings.FILE_URL_HOST=%s",
          settings.DEBUG, settings.MONGODB_HOST, settings.FILE_URL_HOST)
 
# celery = None
# with app.app_context():
# celery = create_celery_app(app)
# print("celery=%s" % celery)
 
if __name__ == "__main__":
    app.run(
        host=settings.FLASK_HOST,
        port=settings.FLASK_PORT,
        debug=settings.DEBUG,
        use_reloader=False
    )
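(5) Other modules get the globals they need from Flask's g
For other modules to get the app, log, and mongo created during the initial app setup,
factory wraps initialization in with app.app_context(),
each module uses from flask import g,
and then log = g.log, app = g.app, and so on to fetch the values it needs.
resources/files.py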
from flask import g
from flask_restful import Resource
from resources.tts import gTempAudioFolder
 
log = g.log
mongo = g.mongo
fsCollection = g.fsCollection
 
class GridfsAPI(Resource):
 
    def get(self, fileId, fileName=None):
        # log = app.logger
        log.info("fileId=%s, file_name=%s", fileId, fileName)
        ...
resources/qa.py
from flask import g
 
app = g.app
log = g.log
fsCollection = g.fsCollection
resources/tts.py
from flask import g
 
app = g.app
log = g.log
This way every module can use the global log and the global app.
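How g behaves with and without an application context can be checked with a few lines. This is a minimal sketch assuming Flask is installed; the app name g_sketch and the helper read_log_from_g are hypothetical. It shows that g is readable only while an application context is active, and that values stored in one context are not visible in a later one.

```python
from flask import Flask, g

app = Flask("g_sketch")


def read_log_from_g():
    """What a module-level `log = g.log` does at import time."""
    return g.log


# 1) No application context: accessing g raises RuntimeError.
try:
    read_log_from_g()
    outside_error = None
except RuntimeError as err:
    outside_error = err

# 2) Inside a context: values stored on g can be read back.
with app.app_context():
    g.log = app.logger
    inside_ok = read_log_from_g() is app.logger

# 3) A new context starts with an empty g.
with app.app_context():
    survives_new_context = "log" in g

print(type(outside_error).__name__, inside_ok, survives_new_context)
```

So the module-level log = g.log lines work only because the modules are imported while the with app.app_context() block in create_app is still active.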
Debugging the initialization in PyCharm then worked:
Next, run the Celery worker in the Mac terminal:
celery worker -A resources.tasks.celery --loglevel=DEBUG
It then produced normal output:
➜  xxxRobotDemoServer git:(master) ✗ celery worker -A resources.tasks.celery --loglevel=DEBUG
cur_flask_environ=None
FLASK_ENV=development
cur_dir=/Users/crifan/dev/dev_root/company/xxx/projects/robotDemo/server/xxxRobotDemoServer/conf/app
env_folder=development
dotenv_path=/Users/crifan/dev/dev_root/company/xxx/projects/robotDemo/server/xxxRobotDemoServer/conf/app/development/.env
dotenv_load_ok=True
After  load .env: DEBUG=True, MONGODB_HOST=xxx, FILE_URL_HOST=127.0.0.1
in extensions_celery: celery=<Celery RobotQA at 0x104277e80>
create_celery_app return: celery=<Celery RobotQA at 0x104277e80>, log=<Logger resources.extensions_celery (WARNING)>
[2018-08-26 11:11:45,796: DEBUG/MainProcess] | Worker: Preparing bootsteps.
[2018-08-26 11:11:45,801: DEBUG/MainProcess] | Worker: Building graph...
[2018-08-26 11:11:45,802: DEBUG/MainProcess] | Worker: New boot order: {Timer, Hub, Pool, Autoscaler, Beat, StateDB, Consumer}
[2018-08-26 11:11:45,831: DEBUG/MainProcess] | Consumer: Preparing bootsteps.
[2018-08-26 11:11:45,831: DEBUG/MainProcess] | Consumer: Building graph...
[2018-08-26 11:11:45,876: DEBUG/MainProcess] | Consumer: New boot order: {Connection, Events, Heart, Mingle, Tasks, Control, Gossip, Agent, event loop}
 
 
celery@licrifandeMacBook-Pro.local v4.2.1 (windowlicker)
 
Darwin-17.7.0-x86_64-i386-64bit-PE 2018-08-26 11:11:45
 
[config]
.> app:         RobotQA:0x104277e80
.> transport:   redis://localhost:6379/0
.> results:     disabled://
.> concurrency: 4 (prefork)
.> task events: OFF (enable -E to monitor tasks in this worker)
 
[queues]
.> celery           exchange=celery(direct) key=celery
 
 
[tasks]
  . celery.accumulate
  . celery.backend_cleanup
  . celery.chain
  . celery.chord
  . celery.chord_unlock
  . celery.chunks
  . celery.group
  . celery.map
  . celery.starmap
  . resources.tasks.celeryRefreshAzureSpeechToken
  . resources.tasks.deleteTmpAudioFile
 
[2018-08-26 11:11:45,921: DEBUG/MainProcess] | Worker: Starting Hub
[2018-08-26 11:11:45,922: DEBUG/MainProcess] ^-- substep ok
[2018-08-26 11:11:45,922: DEBUG/MainProcess] | Worker: Starting Pool
[2018-08-26 11:11:46,059: DEBUG/MainProcess] ^-- substep ok
[2018-08-26 11:11:46,060: DEBUG/MainProcess] | Worker: Starting Consumer
[2018-08-26 11:11:46,061: DEBUG/MainProcess] | Consumer: Starting Connection
[2018-08-26 11:11:46,098: INFO/MainProcess] Connected to redis://localhost:6379/0
[2018-08-26 11:11:46,098: DEBUG/MainProcess] ^-- substep ok
[2018-08-26 11:11:46,098: DEBUG/MainProcess] | Consumer: Starting Events
[2018-08-26 11:11:46,114: DEBUG/MainProcess] ^-- substep ok
[2018-08-26 11:11:46,114: DEBUG/MainProcess] | Consumer: Starting Heart
[2018-08-26 11:11:46,117: DEBUG/MainProcess] ^-- substep ok
[2018-08-26 11:11:46,119: DEBUG/MainProcess] | Consumer: Starting Mingle
[2018-08-26 11:11:46,119: INFO/MainProcess] mingle: searching for neighbors
[2018-08-26 11:11:47,155: INFO/MainProcess] mingle: all alone
[2018-08-26 11:11:47,156: DEBUG/MainProcess] ^-- substep ok
[2018-08-26 11:11:47,156: DEBUG/MainProcess] | Consumer: Starting Tasks
[2018-08-26 11:11:47,161: DEBUG/MainProcess] ^-- substep ok
[2018-08-26 11:11:47,161: DEBUG/MainProcess] | Consumer: Starting Control
[2018-08-26 11:11:47,167: DEBUG/MainProcess] ^-- substep ok
[2018-08-26 11:11:47,167: DEBUG/MainProcess] | Consumer: Starting Gossip
[2018-08-26 11:11:47,172: DEBUG/MainProcess] ^-- substep ok
[2018-08-26 11:11:47,172: DEBUG/MainProcess] | Consumer: Starting event loop
[2018-08-26 11:11:47,173: DEBUG/MainProcess] | Worker: Hub.register Pool...
[2018-08-26 11:11:47,173: INFO/MainProcess] celery@licrifandeMacBook-Pro.local ready.
[2018-08-26 11:11:47,174: DEBUG/MainProcess] basic.qos: prefetch_count->16
The tasks listed there are:
[tasks]
  . celery.accumulate
  . celery.backend_cleanup
  . celery.chain
  . celery.chord
  . celery.chord_unlock
  . celery.chunks
  . celery.group
  . celery.map
  . celery.starmap
  . resources.tasks.celeryRefreshAzureSpeechToken
  . resources.tasks.deleteTmpAudioFile
These match the task names on the Flask side, so Flask and the worker recognize the same tasks and the tasks can be called.
Subsequent task calls then ran correctly:
[2018-08-26 11:11:47,174: DEBUG/MainProcess] basic.qos: prefetch_count->16
[2018-08-26 11:14:28,913: INFO/MainProcess] Received task: resources.tasks.deleteTmpAudioFile[6ea6c103-7b58-4d38-ad54-06666b0ebb59]  ETA:[2018-08-26 03:14:38.521681+00:00]
[2018-08-26 11:14:28,915: DEBUG/MainProcess] basic.qos: prefetch_count->17
[2018-08-26 11:14:38,527: DEBUG/MainProcess] TaskPool: Apply <function _fast_trace_task at 0x10433c840> (args:('resources.tasks.deleteTmpAudioFile', '6ea6c103-7b58-4d38-ad54-06666b0ebb59', {'lang': 'py', 'task': 'resources.tasks.deleteTmpAudioFile', 'id': '6ea6c103-7b58-4d38-ad54-06666b0ebb59', 'shadow': None, 'eta': '2018-08-26T03:14:38.521681+00:00', 'expires': None, 'group': None, 'retries': 0, 'timelimit': [None, None], 'root_id': '6ea6c103-7b58-4d38-ad54-06666b0ebb59', 'parent_id': None, 'argsrepr': "['42667515-281a-4cab-bfac-5d2c23c25a34.mp3']", 'kwargsrepr': '{}', 'origin': 'gen47559@licrifandeMacBook-Pro.local', 'reply_to': '10c57fe8-23c6-3e20-bea6-0576fa07fe57', 'correlation_id': '6ea6c103-7b58-4d38-ad54-06666b0ebb59', 'delivery_info': {'exchange': '', 'routing_key': 'celery', 'priority': 0, 'redelivered': None}}, b'[["42667515-281a-4cab-bfac-5d2c23c25a34.mp3"], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]', 'application/json', 'utf-8') kwargs:{})
[2018-08-26 11:14:38,564: DEBUG/MainProcess] basic.qos: prefetch_count->16
[2018-08-26 11:14:38,626: DEBUG/MainProcess] Task accepted: resources.tasks.deleteTmpAudioFile[6ea6c103-7b58-4d38-ad54-06666b0ebb59] pid:47617
[2018-08-26 11:14:38,628: INFO/ForkPoolWorker-2] resources.tasks.deleteTmpAudioFile[6ea6c103-7b58-4d38-ad54-06666b0ebb59]: deleteTmpAudioFile: celery=<Celery RobotQA at 0x104277e80>, filename=42667515-281a-4cab-bfac-5d2c23c25a34.mp3
[2018-08-26 11:14:38,632: INFO/ForkPoolWorker-2] resources.tasks.deleteTmpAudioFile[6ea6c103-7b58-4d38-ad54-06666b0ebb59]: audioTmpFolder=tmp/audio
[2018-08-26 11:14:38,634: INFO/ForkPoolWorker-2] resources.tasks.deleteTmpAudioFile[6ea6c103-7b58-4d38-ad54-06666b0ebb59]: curFolderAbsPath=/Users/crifan/dev/dev_root/company/xxx/projects/robotDemo/server/xxxRobotDemoServer
[2018-08-26 11:14:38,634: INFO/ForkPoolWorker-2] resources.tasks.deleteTmpAudioFile[6ea6c103-7b58-4d38-ad54-06666b0ebb59]: audioTmpFolderFullPath=/Users/crifan/dev/dev_root/company/xxx/projects/robotDemo/server/xxxRobotDemoServer/tmp/audio
[2018-08-26 11:14:38,635: INFO/ForkPoolWorker-2] resources.tasks.deleteTmpAudioFile[6ea6c103-7b58-4d38-ad54-06666b0ebb59]: Ok to delete file /Users/crifan/dev/dev_root/company/xxx/projects/robotDemo/server/xxxRobotDemoServer/tmp/audio/42667515-281a-4cab-bfac-5d2c23c25a34.mp3
[2018-08-26 11:14:38,638: INFO/ForkPoolWorker-2] Task resources.tasks.deleteTmpAudioFile[6ea6c103-7b58-4d38-ad54-06666b0ebb59] succeeded in 0.012065329006873071s: None
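After also checking the celery beat output and debugging further, things only worked once the Celery tasks no longer called functions from other modules.
Otherwise, as soon as a Celery task calls into another module, that module's use of Flask's g fails with: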
    raise RuntimeError(_app_ctx_err_msg)
RuntimeError: Working outside of application context.
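The error can be reproduced without Celery at all. The sketch below is a minimal stand-in, assuming a hypothetical helper `read_request_state` that touches `flask.g` the way the other modules did. It shows the failure outside an application context and that the same call works inside `app.app_context()`.

```python
from flask import Flask, g

app = Flask(__name__)


def read_request_state():
    """Hypothetical helper from 'another module' that reads flask.g."""
    return getattr(g, "user", None)


def run_without_context():
    """What a Celery worker does: no Flask application context is active."""
    try:
        read_request_state()
    except RuntimeError as err:
        return str(err)
    return None


def run_with_context():
    """Same call, but wrapped in an explicit application context."""
    with app.app_context():
        g.user = "worker"
        return read_request_state()
```

The post takes the other route and keeps the task bodies free of anything that needs `g`; wrapping the task body in `app.app_context()` is an alternative, but it requires the task module to get hold of the Flask app.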
The adjusted code:
resources/tasks.py
@celery.task
# def celeryRefreshAzureSpeechToken():
def refreshAzureSpeechToken():
    """celery's task: refresh microsoft azure speech token key for later call tts/ASR api"""
    log.info("celeryRefreshAzureSpeechToken: celery=%s", celery)
 
    # with celery.app.app_context():
    # from resources.tts import refreshAzureSpeechToken
    # refreshAzureSpeechToken()
 
    # global gMsToken
    # log = app.logger
    # log.info("refreshAzureSpeechToken: gMsToken=%s", gMsToken)
    # log.info("refreshAzureSpeechToken")
 
    getMsTokenUrl = settings.MS_GET_TOKEN_URL
    reqHeaders = {
        "Ocp-Apim-Subscription-Key": settings.MS_TTS_SECRET_KEY
    }
    log.info("getMsTokenUrl=%s, reqHeaders=%s", getMsTokenUrl, reqHeaders)
    resp = requests.post(getMsTokenUrl, headers=reqHeaders)
    log.info("resp=%s", resp)
    respTokenText = resp.text  # eyxxxxiJ9.xxx.xxx
    log.info("respTokenText=%s", respTokenText)
    # gMsToken = respTokenText
    updatedToken = respTokenText
 
    # # for debug
    # gMsToken = "eyJ0eXAiOxxxxnez"
 
    # log.info("after refresh: gMsToken=%s", gMsToken)
 
    return updatedToken
This avoids the earlier approach:
from resources.tts import refreshAzureSpeechToken
refreshAzureSpeechToken()
That approach failed with the error:
ImportError: cannot import name 'refreshAzureSpeechToken'
In resources/tts.py, for the import:
from common.util import generateUUID
the project root has to be added to sys.path first:
import os
import sys
resources_dir = os.path.dirname(__file__)
project_root_dir = os.path.abspath(os.path.join(resources_dir, ".."))
if project_root_dir not in sys.path:
    sys.path.append(project_root_dir)
Otherwise the import fails.
The token-related code inside tts was updated as well:
from resources.tasks import refreshAzureSpeechToken
 
 
def getAzureSpeechToken():
    """get Microsoft Azure speech service token key"""
    # log = app.logger
    global gMsToken
    gMsToken = refreshAzureSpeechToken()
...
    else:
        # if errNo == BAIDU_ERR_TOKEN_INVALID:
        if errNo == settings.MS_ERR_UNAUTHORIZED:
            log.warning("Token invalid -> retry one for refresh token")
            # refreshBaiduToken()
            gMsToken = refreshAzureSpeechToken()
 
            # isOk, audioBinData, errNo, errMsg = baiduText2Audio(unicodeText)
            isOk, audioBinData, errNo, errMsg = msTTS(unicodeText)
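Takeaways:
The key points that made converting Celery to the factory pattern work here are roughly as follows.
First: initializing Celery does not have to involve passing in the Flask app.
Other people's examples look like: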
# extensions.py
from flask_celery import Celery
celery = Celery()
 
# application.py
from flask import Flask
from extensions import celery
 
def create_app():
    app = Flask(__name__)
    app.config['CELERY_IMPORTS'] = ('tasks.add_together', )
    app.config['CELERY_BROKER_URL'] = 'redis://localhost'
    app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost'
    celery.init_app(app)
    return app
or:
from celery import Celery
from flask import Flask
 
from app.config import BaseConfig
 
celery = Celery(__name__, broker=BaseConfig.CELERY_BROKER_URL)
 
 
def create_app():
    app = Flask(__name__)
    # ....
    celery.conf.update(app.config)    # update celery's configuration
    # ...
    return app
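The key is:
When the Celery instance is first created, always pass the broker argument explicitly.
Then, once the app has been created inside create_app, update the configuration with whichever call matches the approach used:
celery.conf.update(app.config)
or:
celery.init_app(app)
Celery itself has no essential tie to the Flask app.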
What matters are the Celery-related parameters in the Flask configuration:
conf/app/settings.py
CELERY_BROKER_URL = "redis://localhost:6379/0"
# CELERY_RESULT_BACKEND = "redis://localhost:6379/0" # results are not used for now
# for periodic Celery tasks
CELERY_TIMEZONE = "Asia/Shanghai"
CELERY_ENABLE_UTC = True
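These settings are what actually take effect.
So the code here was changed to:
initialize Celery by passing in the Celery-related configuration from settings:
resources/extensions_celery.py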
celery = Celery(settings.FLASK_APP_NAME, broker=settings.CELERY_BROKER_URL)
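and that is enough.
When other modules import and use this celery object:
-> it feels as if the import re-triggers Celery initialization and yields an instance different from the one set up during Flask app initialization (as in the usual approach)
-> in fact it is the same
-> what matters is that the Celery above is created with the same name and broker
-> so it also works with celery worker -A: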
celery worker -A resources.tasks.celery --loglevel=DEBUG
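With this, the celery object used by the tasks no longer has to be tied to the Flask app.
Second: Celery tasks should not depend on importing other modules
-> otherwise, when the Celery worker or beat imports those modules, all kinds of problems appear,
such as import failures caused by module path issues,
or Flask's g not being usable,
and so on.
With the factory-pattern Celery code above in place, start the Flask app and then run the worker and beat separately: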
pipenv shell
celery worker -A resources.tasks.celery --loglevel=DEBUG
and:
pipenv shell
celery beat -A resources.tasks.celery -s runtime/celerybeat-schedule --loglevel=DEBUG
Celery's asynchronous functions and periodic tasks finally run correctly.

Please credit when reposting: 在路上 » [Solved] How to initialize Celery with the factory pattern in Flask
