最新消息:20210816 当前crifan.com域名已被污染,为防止失联,请关注(页面右下角的)公众号

【整理】Celery心得+经验+注意事项

注意事项 crifan 1114浏览 0评论
之前在python的flask中用到celery去执行一些异步任务或周期性任务
有些心得,整理如下:
【celery的配置选项的写法:大写还是小写】
通过官网:
New lowercase settings – Configuration and defaults — Celery 4.1.0 documentation
可得知:
celery 4.0之后,引入新的配置参数写法:
大部分都从之前的大写变成小写了
且大部分都去掉了CELERY_的前缀,或者统一前缀了
比如:
CELERY_TIMEZONE -》 timezone
不过,鉴于兼容旧版本的考虑,目前还是支持旧的写法的。
但是对于新的项目来说,建议换用新的写法。
详见:
【已解决】Flask中新增Celery周期任务去定期更新Azure的token
【celery的异步任务】
【坑:调用apply_async时,传递只有一个参数的时候,要注意调用方式】
对于普通的异步任务:
@celeryApp.task
def deleteTmpAudioFile(filename):
    ...
调用apply_async时,当被调用函数参数只有一个的时候,要写成:
deleteTmpAudioFile.apply_async((tempFilename,), countdown=10)
或:
deleteTmpAudioFile.apply_async([tempFilename], countdown=10)
否则始终无法正常传递参数进去,比如写成:
deleteTmpAudioFile.apply_async((tempFilename), countdown=10)
就会报错:
TypeError: task args must be a list or tuple
因为去看官网的api解释时,其args参数类型就是写的是tuple:
celery.app.task — Celery 4.1.0 documentation
“Task.apply_async(args=None, kwargs=None, task_id=None, producer=None, link=None, link_error=None, shadow=None, **options)
Apply tasks asynchronously by sending a message.
Parameters:
* args (Tuple) – The positional arguments to pass on to the task.”
之前一直没搞懂,我argg写成:
(tempFilename)
为何会报错,直到想起来了,去看看:
>>> type(("abc"))
<class 'str'>
>>> type(("abc",))
<class 'tuple'>
即,对于单个字符串,放在tuple中,具体写法:
  • (“abc”):类型是str,字符串
  • (“abc”,):类型才是tuple
即:深究后发现,其实算是自己不够了解Python的tuple本身,误把:
(“abc”),以为是tuple类型了
-》但是想要吐槽python语法本身:
为何没有把:
(“abc”)识别为tuple,而只是str类型?
虽然tuple表示元祖,应该有两个元素,但是你既然支持:
(“abc”,)
那么也应该支持:
(“abc”)
才对啊
总之:
此处通过apply_async调用celery的异步task时,如果被调用函数只有一个参数,则根据api文档中apply_async的args类型介绍,应该传递tuple,
而对于python中,单个变量,比如字符串str的”abc”,其tuple应该写成:
(“abc”, )
即:
deleteTmpAudioFile.apply_async(("abc",), countdown=10)
而不是:
deleteTmpAudioFile.apply_async(("abc"), countdown=10)
否则就会出现:
TypeError: task args must be a list or tuple
的错误。
而另外,通过上述错误,而发现:
apply_async的args,也支持list类型
所以也可以写成:
deleteTmpAudioFile.apply_async(["abc"], countdown=10)
所以也才看到很多官网教程和别人的示例中:
Celery-4.1 用户指南: Calling Tasks – CSDN博客
Executing Tasks — Celery 2.0.3 (stable) documentation
有写成list的:
task.apply_async(args=[arg1, arg2], kwargs={'kwarg1': 'x', 'kwarg2': 'y’})
result =add.apply_async(args=[10, 10],countdown=3)
对应的,后来也看到官网教程:
Calling Tasks — Celery 4.1.0 documentation
中,对于传递单个参数时的写法是带逗号的tuple:
T.apply_async((arg,), {'kwarg': value})
详见:
【已解决】celery调用task的apply_async传递参数出错:TypeError takes 1 positional argument but 40 were given
【周期性任务periodic task的工作逻】
是celery的beat,到期后,发送消息给worker,然后worker去执行task
具体表现是:
对于代码中定义好的周期性任务:
@celeryApp.task
def celeryRefreshMsToken():
    """celery's task: refreshMsToken"""
    refreshMsToken()

@celeryApp.on_after_configure.connect
def celerySetupPeriodicTasks(sender, **kwargs):
    log.info("celerySetupPeriodicTasks: sender=%s", sender)

    sender.add_periodic_task(app.config["CELERY_REFRESH_MS_TOKEN_INTERVAL"],
                             celeryRefreshMsToken.s(),
                             name="refresh ms Azure token every less than 10 minutes")
来说,是:
先运行了woker
celery worker -A app.celeryApp --loglevel=DEBUG
再运行了beat
celery beat -A app.celeryApp -s runtime/celerybeat-schedule --loglevel=DEBUG
然后会发现:
worker的debug输出中可以看到(周期性的)task:
[tasks]
  . app.celeryRefreshMsToken
  . app.deleteTmpAudioFile
  . celery.accumulate
  . celery.backend_cleanup
  . celery.chain
  . celery.chord
  . celery.chord_unlock
  . celery.chunks
  . celery.group
  . celery.map
  . celery.starmap
app.celeryRefreshMsToken
 而beat启动后输出的是:
[2018-05-24 10:33:37,733] INFO in app: celerySetupPeriodicTasks: sender=<Celery app at 0x104f3e9e8>
celery beat v4.1.0 (latentcall) is starting.
__    -    ... __   -        _
LocalTime -> 2018-05-24 10:33:37
Configuration ->
    . broker -> 
redis://localhost:6379/0
    . loader -> celery.loaders.app.AppLoader
    . scheduler -> celery.beat.PersistentScheduler
    . db -> runtime/celerybeat-schedule
    . logfile -> [stderr]@%DEBUG
    . maxinterval -> 5.00 minutes (300s)
[2018-05-24 10:33:37,792: DEBUG/MainProcess] Setting default socket timeout to 30
[2018-05-24 10:33:37,793: INFO/MainProcess] beat: Starting
…
[2018-05-24 10:33:37,914: DEBUG/MainProcess] Current schedule:
<ScheduleEntry: refresh ms Azure token every less than 10 minutes app.celeryRefreshMsToken() <freq: 20.00 seconds>
<ScheduleEntry: celery.backend_cleanup celery.backend_cleanup() <crontab: 0 4 * * * (m/h/d/dM/MY)>
[2018-05-24 10:33:37,915: DEBUG/MainProcess] beat: Ticking with max interval->5.00 minutes
[2018-05-24 10:33:37,916: DEBUG/MainProcess] beat: Waking up in 19.93 seconds.
然后过了(周期设置的是每20秒执行一次)20秒后,则发送消息给woker:
[2018-05-24 10:33:57,853: DEBUG/MainProcess] beat: Synchronizing schedule...
[2018-05-24 10:33:57,886: INFO/MainProcess] Scheduler: Sending due task refresh ms Azure token every less than 10 minutes (app.celeryRefreshMsToken)
[2018-05-24 10:33:57,907: DEBUG/MainProcess] app.celeryRefreshMsToken sent. id->7400d508-c8aa-4bdd-90d9-d5d0cfb6b504
[2018-05-24 10:33:57,911: DEBUG/MainProcess] beat: Waking up in 19.95 seconds.
然后woker那边就收到了消息,并执行task,去调用函数执行任务了:
[2018-05-24 10:33:57,910: INFO/MainProcess] Received task: app.celeryRefreshMsToken[7400d508-c8aa-4bdd-90d9-d5d0cfb6b504]
[2018-05-24 10:33:57,913: DEBUG/MainProcess] TaskPool: Apply <function _fast_trace_task at 0x110654268> (args:('app.celeryRefreshMsToken', '7400d508-c8aa-4bdd-90d9-d5d0cfb6b504', {'lang': 'py', 'task': 'app.celeryRefreshMsToken', 'id': '7400d508-c8aa-4bdd-90d9-d5d0cfb6b504', 'eta': None, 'expires': None, 'group': None, 'retries': 0, 'timelimit': [None, None], 'root_id': '7400d508-c8aa-4bdd-90d9-d5d0cfb6b504', 'parent_id': None, 'argsrepr': '()', 'kwargsrepr': '{}', 'origin': '
[email protected]
', 'reply_to': 'e629fa42-5446-3302-8bce-c59fdacca655', 'correlation_id': '7400d508-c8aa-4bdd-90d9-d5d0cfb6b504', 'delivery_info': {'exchange': '', 'routing_key': 'celery', 'priority': 0, 'redelivered': None}}, b'[[], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]', 'application/json', 'utf-8') kwargs:{})
[2018-05-24 10:33:57,920: DEBUG/MainProcess] Task accepted: app.celeryRefreshMsToken[7400d508-c8aa-4bdd-90d9-d5d0cfb6b504] pid:30484
[2018-05-24 10:33:57,924: WARNING/ForkPoolWorker-1] [2018-05-24 10:33:57,922] INFO in app: refreshMsToken: gMsToken=xxxxx
如此,再循环进入下一个周期:
  • beat倒计时
  • 到时之后,触发,发消息给woker
  • woker接收到消息
  • 执行(周期性)任务
【celery的task的signature】
sender.add_periodic_task(app.config["CELERY_REFRESH_MS_TOKEN_INTERVAL"],
                         celeryRefreshMsToken.s(),
                         name="refresh ms Azure token every less than 10 minutes")
中的celeryRefreshMsToken.s()中的s,指的是signature,函数签名
完整写法是:
celeryRefreshMsToken.
signature
()
详见:
【已解决】Flask中新增Celery周期任务去定期更新Azure的token

转载请注明:在路上 » 【整理】Celery心得+经验+注意事项

发表我的评论
取消评论

表情

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址
80 queries in 0.187 seconds, using 22.19MB memory