折腾:
期间,先去Mac本地的Flask中用Celery(加redis)实现延期执行任务。
继续参考:
Calling Tasks — Celery 4.1.0 documentation
和api文档:
celery.app.task — Celery 4.1.0 documentation
参考:
在 Flask 中使用 Celery — using celery with flask 1.00 documentation
就可以去写代码实现了。
python celery redis
First Steps with Celery — Celery 4.1.0 documentation
Using Redis — Celery 4.1.0 documentation
去安装
<code>➜ server pipenv install "celery[redis]" Installing celery[redis]… Looking in indexes: https://pypi.python.org/simple Collecting celery[redis] Downloading https://files.pythonhosted.org/packages/22/9b/88ef5cc7edf5d43215f383ae0a2b1cdeb33f5f07886386c7e4691b2eba0c/celery-4.1.0-py2.py3-none-any.whl (400kB) Requirement already satisfied: pytz>dev in /Users/crifan/.local/share/virtualenvs/server-9an_1rEM/lib/python3.6/site-packages (from celery[redis]) (2018.4) Collecting kombu<5.0,>=4.0.2 (from celery[redis]) Downloading https://files.pythonhosted.org/packages/62/a4/5d16954803224a1e451713293c2a028614099f5538cf626e1fdd7b438c86/kombu-4.1.0-py2.py3-none-any.whl (181kB) Collecting billiard<3.6.0,>=3.5.0.2 (from celery[redis]) Downloading https://files.pythonhosted.org/packages/82/55/76f4e786141b7174926cdffa7a155aeea316b729118fb48ec548f3c6754f/billiard-3.5.0.3-py3-none-any.whl (89kB) Collecting redis>=2.10.5; extra == "redis" (from celery[redis]) Downloading https://files.pythonhosted.org/packages/3b/f6/7a76333cf0b9251ecf49efff635015171843d9b977e4ffcf59f9c4428052/redis-2.10.6-py2.py3-none-any.whl (64kB) Collecting amqp<3.0,>=2.1.4 (from kombu<5.0,>=4.0.2->celery[redis]) Downloading https://files.pythonhosted.org/packages/88/4a/8c45a882d842678963516ebd9cf584a4ded51af719234c3b696c2e884c60/amqp-2.2.2-py2.py3-none-any.whl (48kB) Collecting vine>=1.1.3 (from amqp<3.0,>=2.1.4->kombu<5.0,>=4.0.2->celery[redis]) Downloading https://files.pythonhosted.org/packages/10/50/5b1ebe42843c19f35edb15022ecae339fbec6db5b241a7a13c924dabf2a3/vine-1.1.4-py2.py3-none-any.whl Installing collected packages: vine, amqp, kombu, billiard, redis, celery Successfully installed amqp-2.2.2 billiard-3.5.0.3 celery-4.1.0 kombu-4.1.0 redis-2.10.6 vine-1.1.4 Adding celery[redis] to Pipfile's [packages]… Pipfile.lock (b165df) out of date, updating to (132fcc)… Locking [dev-packages] dependencies… Locking [packages] dependencies… Updated Pipfile.lock (132fcc)! Installing dependencies from Pipfile.lock (132fcc)… 🐍 ▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉ 29/29 — 00 ➜ server pipenv graph celery==4.1.0 - billiard [required: >=3.5.0.2,<3.6.0, installed: 3.5.0.3] - kombu [required: <5.0,>=4.0.2, installed: 4.1.0] - amqp [required: >=2.1.4,<3.0, installed: 2.2.2] - vine [required: >=1.1.3, installed: 1.1.4] - pytz [required: >dev, installed: 2018.4] Flask-Cors==3.0.4 - Flask [required: >=0.9, installed: 1.0.2] - click [required: >=5.1, installed: 6.7] - itsdangerous [required: >=0.24, installed: 0.24] - Jinja2 [required: >=2.10, installed: 2.10] - MarkupSafe [required: >=0.23, installed: 1.0] - Werkzeug [required: >=0.14, installed: 0.14.1] - Six [required: Any, installed: 1.11.0] Flask-PyMongo==0.5.1 - Flask [required: >=0.8, installed: 1.0.2] - click [required: >=5.1, installed: 6.7] - itsdangerous [required: >=0.24, installed: 0.24] - Jinja2 [required: >=2.10, installed: 2.10] - MarkupSafe [required: >=0.23, installed: 1.0] - Werkzeug [required: >=0.14, installed: 0.14.1] - PyMongo [required: >=2.5, installed: 3.6.1] Flask-RESTful==0.3.6 - aniso8601 [required: >=0.82, installed: 3.0.0] - Flask [required: >=0.8, installed: 1.0.2] - click [required: >=5.1, installed: 6.7] - itsdangerous [required: >=0.24, installed: 0.24] - Jinja2 [required: >=2.10, installed: 2.10] - MarkupSafe [required: >=0.23, installed: 1.0] - Werkzeug [required: >=0.14, installed: 0.14.1] - pytz [required: Any, installed: 2018.4] - six [required: >=1.3.0, installed: 1.11.0] gunicorn==19.8.1 openpyxl==2.5.3 - et-xmlfile [required: Any, installed: 1.0.1] - jdcal [required: Any, installed: 1.4] PyMySQL==0.8.1 redis==2.10.6 requests==2.18.4 - certifi [required: >=2017.4.17, installed: 2018.4.16] - chardet [required: >=3.0.2,<3.1.0, installed: 3.0.4] - idna [required: >=2.5,<2.7, installed: 2.6] - urllib3 [required: >=1.21.1,<1.23, installed: 1.22] </code>
可以看到:
安装了Celery和Redis
然后去试试
结果出错:
【已解决】运行Celery出错:consumer Cannot connect to redis Error 61
然后继续测试延迟运行任务的效果
先看看官网demo是正常的:
<code>➜ ~ /Users/crifan/dev/dev_root/company/naturling/projects/robotDemo/server ➜ server ll total 176 -rw-r--r-- 1 crifan staff 303B 5 11 17:22 Pipfile -rw-r--r-- 1 crifan staff 12K 5 11 17:24 Pipfile.lock -rw-r--r-- 1 crifan staff 2.3K 5 10 15:22 README.md drwxr-xr-x 3 crifan staff 96B 4 24 15:12 __pycache__ drwxr-xr-x 7 crifan staff 224B 5 10 15:15 ai -rw-r--r-- 1 crifan staff 28K 5 11 16:56 app.py -rw-r--r-- 1 crifan staff 191B 5 11 17:39 celery_task.py -rw-r--r-- 1 crifan staff 1.3K 5 11 14:30 config.py -rw-r--r-- 1 crifan staff 92B 5 11 17:49 dump.rdb -rw-r--r-- 1 crifan staff 2.0K 4 24 16:36 gunicorn_config.py drwxr-xr-x 8 crifan staff 256B 4 26 14:56 logs -rw-r--r-- 1 crifan staff 10K 4 24 17:00 supervisord_local.conf -rw-r--r-- 1 crifan staff 592B 4 24 16:07 supervisord_server.conf drwxr-xr-x 4 crifan staff 128B 5 11 16:52 tmp ➜ server which python /usr/bin/python ➜ server pipenv shell Spawning environment shell (/bin/zsh). Use 'exit' to leave. . /Users/crifan/.local/share/virtualenvs/server-9an_1rEM/bin/activate ➜ server . /Users/crifan/.local/share/virtualenvs/server-9an_1rEM/bin/activate ➜ server python Python 3.6.4 (default, Mar 22 2018, 13:54:22) [GCC 4.2.1 Compatible Apple LLVM 9.0.0 (clang-900.0.39.2)] on darwin Type "help", "copyright", "credits" or "license" for more information. >>> from celery_task import add >>> add.delay(4, 5) <AsyncResult: 9c8caf7a-e571-4f74-9049-cf5076b9a6ee> >>> </code>
结果出错:
【已解决】celery调用task的apply_async传递参数出错:TypeError takes 1 positional argument but 40 were given
然后发现是可以延迟10秒后运行的:
其中一个终端:
<code>>>> deleteTmpAudioFile.apply_async(("98fc7c46-7aa0-4dd7-aa9d-89fdf516abd6.mp3",), countdown=10) <AsyncResult: bd89951c-2078-49af-b8ea-8c9d8d5b121a> </code>
另外运行celery的终端:
<code>[2018-05-11 18:33:18,769: WARNING/ForkPoolWorker-2] deleteTmpAudioFile: filename=%s [2018-05-11 18:33:18,771: WARNING/ForkPoolWorker-2] 98fc7c46-7aa0-4dd7-aa9d-89fdf516abd6.mp3 [2018-05-11 18:33:18,771: WARNING/ForkPoolWorker-2] audioTmpFolder=tmp/audio [2018-05-11 18:33:18,772: WARNING/ForkPoolWorker-2] curFolderAbsPath=/Users/crifan/dev/xx/server [2018-05-11 18:33:18,772: WARNING/ForkPoolWorker-2] audioTmpFolderFullPath=/Users/crifan/dev/xx/robotDemo/server/tmp/audio [2018-05-11 18:33:18,772: WARNING/ForkPoolWorker-2] No need to remove for not exist file /Users/crifan/dev/dev_root/xx/server/tmp/audio/98fc7c46-7aa0-4dd7-aa9d-89fdf516abd6.mp3 </code>
注:
之前一次测试,是删除了该文件的:
至此,算是从逻辑上和测试代码上,实现了延迟10秒去删除文件。
后续就是整合到Flask代码中了。
【总结】
至此,已经实现了Mac本地中,在Flask中集成了Celery+Reids实现异步任务,去删除临时文件。
步骤是:
安装celery,其中此处用的是redis
<code>pipenv install "celery[redis]" </code>
然后运行celery之前,要先去安装和运行redis:
Mac中安装redis
<code>brew install redis </code>
然后运行redis服务:
<code>redis-server </code>
然后写了个celery的task,保存为:
celery_task.py
<code> # celery_task.py from celery import Celery import os # app = Celery('tasks', broker='redis://localhost//') app = Celery('tasks', broker='redis://localhost') @app.task() def add(x, y): return x + y @app.task() def deleteTmpAudioFile(filename): # audioTmpFolder = app.config["AUDIO_TEMP_FOLDER"] print("deleteTmpAudioFile: filename=%s", filename) audioTmpFolder = "tmp/audio" print("audioTmpFolder=%s" % audioTmpFolder) curFolderAbsPath = os.getcwd() #'/Users/xx/server' print("curFolderAbsPath=%s" % curFolderAbsPath) audioTmpFolderFullPath = os.path.join(curFolderAbsPath, audioTmpFolder) print("audioTmpFolderFullPath=%s" % audioTmpFolderFullPath) tempAudioFullname = os.path.join(audioTmpFolderFullPath, filename) #'/Users/crifan/xx/server/tmp/audio/2aba73d1-f8d0-4302-9dd3-d1dbfad44458.mp3' if os.path.isfile(tempAudioFullname): os.remove(tempAudioFullname) print("Ok to delete file %s" % tempAudioFullname) else: print("No need to remove for not exist file %s" % tempAudioFullname) </code>
再去运行celery服务:
<code>celery -A celery_task worker </code>
然后再去调用即可:
此处是python在终端下的调用:
<code>python >>> import celery_task >>> from celery_task import deleteTmpAudioFile >>> deleteTmpAudioFile.apply_async(("98fc7c46-7aa0-4dd7-aa9d-89fdf516abd6.mp3",), countdown=10) <AsyncResult: bd89951c-2078-49af-b8ea-8c9d8d5b121a> </code>
然后过了指定延期的10秒后,即可调用到:celery_task中的deleteTmpAudioFile
然后celery的服务终端即可看到log:
<code>[2018-05-11 18:33:18,769: WARNING/ForkPoolWorker-2] deleteTmpAudioFile: filename=%s [2018-05-11 18:33:18,771: WARNING/ForkPoolWorker-2] 98fc7c46-7aa0-4dd7-aa9d-89fdf516abd6.mp3 [2018-05-11 18:33:18,771: WARNING/ForkPoolWorker-2] audioTmpFolder=tmp/audio [2018-05-11 18:33:18,772: WARNING/ForkPoolWorker-2] curFolderAbsPath=/Users/crifan/dev/dev_root/xx/server [2018-05-11 18:33:18,772: WARNING/ForkPoolWorker-2] audioTmpFolderFullPath=/Users/crifan/dev/xx/server/tmp/audio [2018-05-11 18:33:18,772: WARNING/ForkPoolWorker-2] No need to remove for not exist file /Users/crifan/dev/dev_root/xx/tmp/audio/98fc7c46-7aa0-4dd7-aa9d-89fdf516abd6.mp3 </code>
如此即可。
转载请注明:在路上 » 【已解决】Mac本地用Celery实现延时执行任务