最新消息:20210816 当前crifan.com域名已被污染,为防止失联,请关注(页面右下角的)公众号

【已解决】把celery的task集成到Flask的app中

celery crifan 4343浏览 0评论

折腾:

【已解决】Mac本地用Celery实现延时执行任务

后,接着去把本地测试通过的celery的task集成到Flask的app中。

先去防止Flask的app名字和celery的app名字冲突,改名为celeryApp:

celery_task.py

<code># celery_task.py
from celery import Celery
import os

# celeryApp = Celery('tasks', broker='redis://localhost//')
celeryApp = Celery('tasks', broker='redis://localhost')

# @celeryApp.task()
# def add(x, y):
#    return x + y

@celeryApp.task()
def deleteTmpAudioFile(filename):
    """
        delete tmp audio file from filename
            eg: 98fc7c46-7aa0-4dd7-aa9d-89fdf516abd6.mp3
    """
    # audioTmpFolder = app.config["AUDIO_TEMP_FOLDER"]
    print("deleteTmpAudioFile: filename=%s", filename)
    audioTmpFolder = "tmp/audio"
    print("audioTmpFolder=%s" % audioTmpFolder)
    curFolderAbsPath = os.getcwd() #'/Users/crifan/xx/server'
    print("curFolderAbsPath=%s" % curFolderAbsPath)
    audioTmpFolderFullPath = os.path.join(curFolderAbsPath, audioTmpFolder)
    print("audioTmpFolderFullPath=%s" % audioTmpFolderFullPath)
    tempAudioFullname = os.path.join(audioTmpFolderFullPath, filename)
    #'/Users/crifan/xxxserver/tmp/audio/2aba73d1-f8d0-4302-9dd3-d1dbfad44458.mp3'
    if os.path.isfile(tempAudioFullname):
        os.remove(tempAudioFullname)
        print("Ok to delete file %s" % tempAudioFullname)
    else:
        print("No need to remove for not exist file %s" % tempAudioFullname)
</code>

然后想办法合并到Flask的app中

celery in flask

此处celery是4.0了:

<code>➜  server celery --version
4.1.0 (latentcall)
</code>

所以:

ask/flask-celery: Celery integration for Flask (SINCE CELERY 3.0 THIS IS NO LONGER NEEDED)

都不需要了。

miguelgrinberg/flask-celery-example: This repository contains the example code for my blog article Using Celery with Flask.

在 Flask 中使用 Celery — using celery with flask 1.00 documentation

看起来是:

把celery的代码,都合并到Flask的app中了:

就不存在我此处担心的:

考虑如何把flask的app.py和celery_task.py如何合并呢

因为现在存在循环调用

flask的app.py要import celery_task

而celery_task要import flask的app

还不知道如何解决呢

基于 Celery 的后台任务 — Flask 0.10.1 文档

好像还要搞个:make_celery?好麻烦

Celery Background Tasks — Flask 1.0.2 documentation

Celery Based Background Tasks — Flask 0.12.4 documentation

在Celery中使用Flask的上下文

“Celery和Flask一起使用并没有什么不和谐的地方,都可以不用定制的Flask扩展,按照网上随处可见的示例也很简单

然而,稍微上点规模的Flask应用都会使用Factory模式,即只有在创建Flask实例时,才会初始化各种扩展,这样可以动态的修改扩展程序的配置。比如你有一套线上部署的配置和一套本地开发测试的配置,希望通过不同的启动入口,就使用不同的配置。”

里面解释的很复杂,很深入。

暂时觉得没必要这么深入研究。

所以还是尽量简单点吧

抽空再去研究:

Flask的app的factory 工厂模式

Celery+Flask的使用小结(初级)

算了,就是简单的,把celery的代码,合并到flask的app中吧

结果运行出错:

【已解决】合并Celery的task到Flask后任务运行出错:ERROR/MainProcess Received unregistered task of type

【总结】

最终是把celery集成到了flask中:

flask的app.py

<code>from flask import Flask
from celery import Celery

app = Flask(__name__)

celeryApp = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celeryApp.conf.update(app.config)

@celeryApp.task
def deleteTmpAudioFile(filename):
    。。。

# call task
deleteTmpAudioFile.apply_async([tempFilename], countdown=delayTimeToDelete)
</code>

flask的config.py

<code># CELERY_TASK_NAME = "Celery_" + FLASK_APP_NAME
# CELERY_BROKER_URL = "redis://localhost"
CELERY_BROKER_URL = "redis://localhost:6379/0"
# CELERY_RESULT_BACKEND = "redis://localhost:6379/0" # current not use result
CELERY_DELETE_TMP_AUDIO_FILE_DELAY = 20
</code>

 

然后在Flask的app运行之前,确保先运行celery的worker

<code>celery worker -A app.celeryApp
</code>

想要看到更新信息,比如tasks的列表,其中包含此处的deleteTmpAudioFile的全名:

app.deleteTmpAudioFile

可以加上debug参数:

<code>celery worker -A app.celeryApp --loglevel=DEBUG
</code>

然后代码运行了

deleteTmpAudioFile.apply_async

后,celery的log会看到:

Received task: Celery_RobotQA.deleteTmpAudioFile[9845ee03-35e5-4fd1-b641-f2f77fd47665]  ETA:[2018-05-14 07:14:30.839388+00:00]

过了指定的延期时间后,会收到任务并执行:

<code>TaskPool: Apply &lt;function _fast_trace_task at 0x106ef0b70&gt; (args:('Celery_RobotQA.deleteTmpAudioFile', '9845ee03-35e5-4fd1-b641-f2f77fd47665',  ...
[2018-05-14 15:14:31,428: DEBUG/MainProcess] basic.qos: prefetch_count-&gt;16
[2018-05-14 15:14:31,431: DEBUG/MainProcess] Task accepted: Celery_RobotQA.deleteTmpAudioFile[9845ee03-35e5-4fd1-b641-f2f77fd47665] pid:62326
...
</code>

即可正常在Flask中执行了。

但是此处貌似有个缺点:

celery放在了flask中,导致每次启动celery的worker时,同时启动了flask的app。。。

不过暂时就这么着吧,有空再优化。

转载请注明:在路上 » 【已解决】把celery的task集成到Flask的app中

发表我的评论
取消评论

表情

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址
82 queries in 0.178 seconds, using 22.09MB memory