I previously used Celery in a Python Flask project to run asynchronous and periodic tasks.
Here are some notes from that experience:
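【Celery configuration option names: uppercase or lowercase】
According to the official Celery documentation:
Celery 4.0 introduced a new naming scheme for configuration settings:
Most setting names changed from uppercase to lowercase,
and most either dropped the CELERY_ prefix or were grouped under a consistent prefix.
For example:
CELERY_TIMEZONE -> timezone
For backward compatibility, the old names are still supported for now.
For new projects, however, the new names are recommended.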
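As a rough illustration of the renaming, the sketch below maps a few old-style names to their new-style equivalents. The mapping is only a small excerpt of the old/new table in the Celery documentation, and the helper name to_new_style is made up for this example; it is not part of Celery.

```python
# A few old-style -> new-style setting names (a small excerpt, not the full table).
OLD_TO_NEW = {
    "CELERY_TIMEZONE": "timezone",
    "CELERY_ENABLE_UTC": "enable_utc",
    "CELERY_RESULT_BACKEND": "result_backend",
    "CELERY_TASK_SERIALIZER": "task_serializer",
    "CELERY_ACCEPT_CONTENT": "accept_content",
    "BROKER_URL": "broker_url",
}


def to_new_style(old_config):
    """Hypothetical helper: rename known old-style keys, keep unknown keys as-is."""
    return {OLD_TO_NEW.get(key, key): value for key, value in old_config.items()}


# Example values are assumptions for illustration only.
old_config = {
    "BROKER_URL": "redis://localhost:6379/0",
    "CELERY_TIMEZONE": "Asia/Shanghai",
}
new_config = to_new_style(old_config)
print(new_config)
```

The resulting dict could then be applied with something like celeryApp.conf.update(new_config). It is probably safest to use one naming style consistently rather than mixing old and new names in the same configuration.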
【Celery asynchronous tasks】
【Pitfall: when calling apply_async with a single argument, mind how you pass it】
For an ordinary asynchronous task:
@celeryApp.task
def deleteTmpAudioFile(filename):
    ...
When the task function takes only one argument, the apply_async call must be written as:
deleteTmpAudioFile.apply_async((tempFilename,), countdown=10)
or:
deleteTmpAudioFile.apply_async([tempFilename], countdown=10)
Otherwise the argument is never passed in correctly. For example, writing:
deleteTmpAudioFile.apply_async((tempFilename), countdown=10)
raises:
TypeError: task args must be a list or tuple
The API reference on the official site documents the type of the args parameter as a tuple:
"Task.apply_async(args=None, kwargs=None, task_id=None, producer=None, link=None, link_error=None, shadow=None, **options)
Apply tasks asynchronously by sending a message.
Parameters:
* args (Tuple) – The positional arguments to pass on to the task."
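For a long time I could not see why writing args as:
(tempFilename)
raised an error, until I thought to check in the interpreter:
>>> type(("abc"))
<class 'str'>
>>> type(("abc",))
<class 'tuple'>
That is, when a single string is wrapped in parentheses:
- ("abc"): the type is str, a string
- ("abc",): only this is a tuple
So on closer inspection, the real problem was that I did not understand Python tuples well enough, and mistook
("abc") for a tuple.
-> My first reaction was still to complain about Python's syntax:
why is
("abc") not recognized as a tuple, but only as a str?
If Python supports:
("abc",)
then surely it should also support:
("abc")
The answer is that parentheses are also used to group expressions, so ("abc") is just a parenthesized string; it is the comma that makes a tuple, and a tuple may hold any number of elements, including one.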
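A small sketch of the rule behind this behaviour: parentheses only group an expression, while the comma is what creates a tuple. The variable names are chosen only for this example.

```python
# Parentheses alone only group an expression; the value inside is unchanged.
grouped = ("abc")
number = (1 + 2) * 3      # same role here: grouping, not tuple construction

# A trailing comma is what builds a one-element tuple.
single = ("abc",)
no_parens = "abc",        # the parentheses are optional in this position

# The empty tuple is the one case written with parentheses only.
empty = ()

print(type(grouped).__name__)    # str
print(number)                    # 9
print(type(single).__name__)     # tuple
print(single == no_parens)       # True
print(type(empty).__name__)      # tuple
```

If ("abc") were treated as a tuple, an expression such as (1 + 2) * 3 would become ambiguous, since it could mean either the number 9 or the tuple (3,) repeated three times.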
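In short:
When calling a Celery asynchronous task through apply_async, if the task function takes only one argument, then according to the type of args in the apply_async API documentation you should pass a tuple.
In Python, the tuple holding a single value, such as the str "abc", is written:
("abc",)
That is:
deleteTmpAudioFile.apply_async(("abc",), countdown=10)
and not:
deleteTmpAudioFile.apply_async(("abc"), countdown=10)
Otherwise you get the error:
TypeError: task args must be a list or tuple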
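To make the failure mode concrete, here is a minimal stand-in for the kind of type check that produces this error. It is a sketch written for this post, not Celery's actual implementation; the function name check_task_args and the example filename are hypothetical.

```python
def check_task_args(args):
    """Hypothetical stand-in: accept only a list or tuple of positional arguments."""
    if not isinstance(args, (list, tuple)):
        raise TypeError("task args must be a list or tuple")
    return tuple(args)


temp_filename = "tmp_audio.mp3"   # example value, assumed for illustration

print(check_task_args((temp_filename,)))   # one-element tuple: accepted
print(check_task_args([temp_filename]))    # one-element list: accepted

try:
    check_task_args((temp_filename))       # just a str in parentheses
except TypeError as err:
    print("rejected:", err)
```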
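The error message also reveals something else:
the args parameter of apply_async accepts a list as well,
so the call can also be written as:
deleteTmpAudioFile.apply_async(["abc"], countdown=10)
This explains why many official tutorials and other people's examples
use a list:
task.apply_async(args=[arg1, arg2], kwargs={'kwarg1': 'x', 'kwarg2': 'y'})
result = add.apply_async(args=[10, 10], countdown=3)
Correspondingly, I later noticed that the official tutorial
passes a single argument as a tuple with a trailing comma:
T.apply_async((arg,), {'kwarg': value})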
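One way to see why a list works as well as a tuple: positional arguments are eventually unpacked into the task function, and unpacking treats both the same way. The sketch below uses a plain function in place of a Celery task; call_task and delete_tmp_audio_file are hypothetical names used only for this illustration. It also shows what would happen if a bare string were unpacked, which is one reason a type check on args is helpful.

```python
def delete_tmp_audio_file(filename):
    """Plain function standing in for the task body."""
    return "deleted " + filename


def call_task(func, args=(), kwargs=None):
    """Hypothetical helper: unpack args/kwargs into the function call."""
    return func(*args, **(kwargs or {}))


print(call_task(delete_tmp_audio_file, ("a.mp3",)))   # tuple
print(call_task(delete_tmp_audio_file, ["a.mp3"]))    # list, same result

# A bare string is also iterable, so unpacking it passes one argument per character.
try:
    call_task(delete_tmp_audio_file, ("a.mp3"))
except TypeError as err:
    print("wrong number of arguments:", err)
```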
【How periodic tasks work】
When a task is due, Celery beat sends a message to the worker, and the worker then executes the task.
Concretely,
for a periodic task defined in code like this:
@celeryApp.task
def celeryRefreshMsToken():
    """celery's task: refreshMsToken"""
    refreshMsToken()

@celeryApp.on_after_configure.connect
def celerySetupPeriodicTasks(sender, **kwargs):
    log.info("celerySetupPeriodicTasks: sender=%s", sender)
    sender.add_periodic_task(app.config["CELERY_REFRESH_MS_TOKEN_INTERVAL"],
                             celeryRefreshMsToken.s(),
                             name="refresh ms Azure token every less than 10 minutes")
the sequence is:
First start the worker:
celery worker -A app.celeryApp --loglevel=DEBUG
Then start beat:
celery beat -A app.celeryApp -s runtime/celerybeat-schedule --loglevel=DEBUG
You will then see that
the worker's debug output lists the registered tasks, including the periodic task app.celeryRefreshMsToken:
[tasks]
  . app.celeryRefreshMsToken
  . app.deleteTmpAudioFile
  . celery.accumulate
  . celery.backend_cleanup
  . celery.chain
  . celery.chord
  . celery.chord_unlock
  . celery.chunks
  . celery.group
  . celery.map
  . celery.starmap
The output after beat starts is:
[2018-05-24 10:33:37,733] INFO in app: celerySetupPeriodicTasks: sender=<Celery app at 0x104f3e9e8>
celery beat v4.1.0 (latentcall) is starting.
__ - ... __ - _
LocalTime -> 2018-05-24 10:33:37
Configuration ->
    . broker -> redis://localhost:6379/0
    . loader -> celery.loaders.app.AppLoader
    . scheduler -> celery.beat.PersistentScheduler
    . db -> runtime/celerybeat-schedule
    . logfile -> [stderr]@%DEBUG
    . maxinterval -> 5.00 minutes (300s)
[2018-05-24 10:33:37,792: DEBUG/MainProcess] Setting default socket timeout to 30
[2018-05-24 10:33:37,793: INFO/MainProcess] beat: Starting...
[2018-05-24 10:33:37,914: DEBUG/MainProcess] Current schedule:
<ScheduleEntry: refresh ms Azure token every less than 10 minutes app.celeryRefreshMsToken() <freq: 20.00 seconds>
<ScheduleEntry: celery.backend_cleanup celery.backend_cleanup() <crontab: 0 4 * * * (m/h/d/dM/MY)>
[2018-05-24 10:33:37,915: DEBUG/MainProcess] beat: Ticking with max interval->5.00 minutes
[2018-05-24 10:33:37,916: DEBUG/MainProcess] beat: Waking up in 19.93 seconds.
Then, 20 seconds later (the interval here is set to run every 20 seconds), beat sends a message to the worker:
[2018-05-24 10:33:57,853: DEBUG/MainProcess] beat: Synchronizing schedule...
[2018-05-24 10:33:57,886: INFO/MainProcess] Scheduler: Sending due task refresh ms Azure token every less than 10 minutes (app.celeryRefreshMsToken)
[2018-05-24 10:33:57,907: DEBUG/MainProcess] app.celeryRefreshMsToken sent. id->7400d508-c8aa-4bdd-90d9-d5d0cfb6b504
[2018-05-24 10:33:57,911: DEBUG/MainProcess] beat: Waking up in 19.95 seconds.
The worker then receives the message and executes the task, calling the function that does the work:
[2018-05-24 10:33:57,910: INFO/MainProcess] Received task: app.celeryRefreshMsToken[7400d508-c8aa-4bdd-90d9-d5d0cfb6b504]
[2018-05-24 10:33:57,913: DEBUG/MainProcess] TaskPool: Apply <function _fast_trace_task at 0x110654268> (args:('app.celeryRefreshMsToken', '7400d508-c8aa-4bdd-90d9-d5d0cfb6b504', {'lang': 'py', 'task': 'app.celeryRefreshMsToken', 'id': '7400d508-c8aa-4bdd-90d9-d5d0cfb6b504', 'eta': None, 'expires': None, 'group': None, 'retries': 0, 'timelimit': [None, None], 'root_id': '7400d508-c8aa-4bdd-90d9-d5d0cfb6b504', 'parent_id': None, 'argsrepr': '()', 'kwargsrepr': '{}', 'origin': ' [email protected] ', 'reply_to': 'e629fa42-5446-3302-8bce-c59fdacca655', 'correlation_id': '7400d508-c8aa-4bdd-90d9-d5d0cfb6b504', 'delivery_info': {'exchange': '', 'routing_key': 'celery', 'priority': 0, 'redelivered': None}}, b'[[], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]', 'application/json', 'utf-8') kwargs:{})
[2018-05-24 10:33:57,920: DEBUG/MainProcess] Task accepted: app.celeryRefreshMsToken[7400d508-c8aa-4bdd-90d9-d5d0cfb6b504] pid:30484
[2018-05-24 10:33:57,924: WARNING/ForkPoolWorker-1] [2018-05-24 10:33:57,922] INFO in app: refreshMsToken: gMsToken=xxxxx
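The cycle then repeats for the next period:
- beat counts down
- when the time is up, beat fires and sends a message to the worker
- the worker receives the message
- the worker executes the (periodic) task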
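The cycle above can be sketched as a toy simulation. This is only a simplified model under stated assumptions: in reality beat and the worker are separate processes that communicate through the broker (Redis in the logs above), whereas here a deque stands in for the broker and a simulated clock replaces real waiting. The function name run_simulation is hypothetical.

```python
from collections import deque


def run_simulation(interval, total_seconds):
    """Simulate beat sending due-task messages and a worker consuming them.

    Uses a simulated clock (whole seconds) instead of real sleeping.
    """
    queue = deque()          # stands in for the message broker
    executed = []            # (time, task name) pairs recorded by the "worker"
    next_due = interval      # beat: first run is one interval after start

    for now in range(1, total_seconds + 1):
        # beat: when the countdown reaches zero, send a message and reschedule
        if now >= next_due:
            queue.append("app.celeryRefreshMsToken")
            next_due = now + interval

        # worker: receive any pending message and execute the task
        while queue:
            task_name = queue.popleft()
            executed.append((now, task_name))

    return executed


runs = run_simulation(interval=20, total_seconds=60)
for when, name in runs:
    print("t=%ds: worker executed %s" % (when, name))
```

With a 20-second interval over 60 simulated seconds, the task runs at t=20, t=40 and t=60, mirroring the "Waking up in ... seconds" and "Sending due task" pattern in the beat log.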
【Celery task signatures】
In:
sender.add_periodic_task(app.config["CELERY_REFRESH_MS_TOKEN_INTERVAL"], celeryRefreshMsToken.s(), name="refresh ms Azure token every less than 10 minutes")
the s in celeryRefreshMsToken.s() stands for signature, i.e. the task's signature.
The full form is:
celeryRefreshMsToken.signature()
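Conceptually, a signature bundles a task together with the arguments for a later call, so that it can be handed to something else (here, the beat scheduler) and invoked when the time comes. As a loose analogy only, the sketch below uses functools.partial from the standard library rather than Celery's signature object, with a hypothetical refresh_ms_token function and scope parameter.

```python
from functools import partial


def refresh_ms_token(scope="default"):
    """Plain function standing in for the task body."""
    return "token refreshed for " + scope


# Bundle the callable with its arguments now ...
deferred = partial(refresh_ms_token, scope="azure")

# ... and invoke it later, e.g. when a scheduler decides it is due.
print(deferred())
print(deferred.func.__name__, deferred.args, deferred.keywords)
```

The analogy is imperfect: a Celery signature can also be serialized and sent in a message, which partial is not designed for.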
When reposting, please credit: 在路上 » 【整理】Celery心得+经验+注意事项