折腾:
后,接着去把本地测试通过的celery的task集成到Flask的app中。
先去防止Flask的app名字和celery的app名字冲突,改名为celeryApp:
celery_task.py
<code># celery_task.py from celery import Celery import os # celeryApp = Celery('tasks', broker='redis://localhost//') celeryApp = Celery('tasks', broker='redis://localhost') # @celeryApp.task() # def add(x, y): # return x + y @celeryApp.task() def deleteTmpAudioFile(filename): """ delete tmp audio file from filename eg: 98fc7c46-7aa0-4dd7-aa9d-89fdf516abd6.mp3 """ # audioTmpFolder = app.config["AUDIO_TEMP_FOLDER"] print("deleteTmpAudioFile: filename=%s", filename) audioTmpFolder = "tmp/audio" print("audioTmpFolder=%s" % audioTmpFolder) curFolderAbsPath = os.getcwd() #'/Users/crifan/xx/server' print("curFolderAbsPath=%s" % curFolderAbsPath) audioTmpFolderFullPath = os.path.join(curFolderAbsPath, audioTmpFolder) print("audioTmpFolderFullPath=%s" % audioTmpFolderFullPath) tempAudioFullname = os.path.join(audioTmpFolderFullPath, filename) #'/Users/crifan/xxxserver/tmp/audio/2aba73d1-f8d0-4302-9dd3-d1dbfad44458.mp3' if os.path.isfile(tempAudioFullname): os.remove(tempAudioFullname) print("Ok to delete file %s" % tempAudioFullname) else: print("No need to remove for not exist file %s" % tempAudioFullname) </code>
然后想办法合并到Flask的app中
celery in flask
此处celery是4.0了:
<code>➜ server celery --version 4.1.0 (latentcall) </code>
所以:
ask/flask-celery: Celery integration for Flask (SINCE CELERY 3.0 THIS IS NO LONGER NEEDED)
都不需要了。
在 Flask 中使用 Celery — using celery with flask 1.00 documentation
看起来是:
把celery的代码,都合并到Flask的app中了:
就不存在我此处担心的:
考虑如何把flask的app.py和celery_task.py如何合并呢
因为现在存在循环调用
flask的app.py要import celery_task
而celery_task要import flask的app
还不知道如何解决呢
基于 Celery 的后台任务 — Flask 0.10.1 文档
好像还要搞个:make_celery?好麻烦
Celery Background Tasks — Flask 1.0.2 documentation
Celery Based Background Tasks — Flask 0.12.4 documentation
“Celery和Flask一起使用并没有什么不和谐的地方,都可以不用定制的Flask扩展,按照网上随处可见的示例也很简单
然而,稍微上点规模的Flask应用都会使用Factory模式,即只有在创建Flask实例时,才会初始化各种扩展,这样可以动态的修改扩展程序的配置。比如你有一套线上部署的配置和一套本地开发测试的配置,希望通过不同的启动入口,就使用不同的配置。”
里面解释的很复杂,很深入。
暂时觉得没必要这么深入研究。
所以还是尽量简单点吧
抽空再去研究:
Flask的app的factory 工厂模式
算了,就是简单的,把celery的代码,合并到flask的app中吧
结果运行出错:
【已解决】合并Celery的task到Flask后任务运行出错:ERROR/MainProcess Received unregistered task of type
【总结】
最终是把celery集成到了flask中:
flask的app.py
<code>from flask import Flask from celery import Celery app = Flask(__name__) celeryApp = Celery(app.name, broker=app.config['CELERY_BROKER_URL']) celeryApp.conf.update(app.config) @celeryApp.task def deleteTmpAudioFile(filename): 。。。 # call task deleteTmpAudioFile.apply_async([tempFilename], countdown=delayTimeToDelete) </code>
flask的config.py
<code># CELERY_TASK_NAME = "Celery_" + FLASK_APP_NAME # CELERY_BROKER_URL = "redis://localhost" CELERY_BROKER_URL = "redis://localhost:6379/0" # CELERY_RESULT_BACKEND = "redis://localhost:6379/0" # current not use result CELERY_DELETE_TMP_AUDIO_FILE_DELAY = 20 </code>
然后在Flask的app运行之前,确保先运行celery的worker
<code>celery worker -A app.celeryApp </code>
想要看到更新信息,比如tasks的列表,其中包含此处的deleteTmpAudioFile的全名:
app.deleteTmpAudioFile
可以加上debug参数:
<code>celery worker -A app.celeryApp --loglevel=DEBUG </code>
然后代码运行了
deleteTmpAudioFile.apply_async
后,celery的log会看到:
Received task: Celery_RobotQA.deleteTmpAudioFile[9845ee03-35e5-4fd1-b641-f2f77fd47665] ETA:[2018-05-14 07:14:30.839388+00:00]
过了指定的延期时间后,会收到任务并执行:
<code>TaskPool: Apply <function _fast_trace_task at 0x106ef0b70> (args:('Celery_RobotQA.deleteTmpAudioFile', '9845ee03-35e5-4fd1-b641-f2f77fd47665', ... [2018-05-14 15:14:31,428: DEBUG/MainProcess] basic.qos: prefetch_count->16 [2018-05-14 15:14:31,431: DEBUG/MainProcess] Task accepted: Celery_RobotQA.deleteTmpAudioFile[9845ee03-35e5-4fd1-b641-f2f77fd47665] pid:62326 ... </code>
即可正常在Flask中执行了。
但是此处貌似有个缺点:
celery放在了flask中,导致每次启动celery的worker时,同时启动了flask的app。。。
不过暂时就这么着吧,有空再优化。
转载请注明:在路上 » 【已解决】把celery的task集成到Flask的app中