I previously used Celery in a Python Flask project to run asynchronous and periodic tasks.
Here are some notes from that experience:
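【Celery configuration option names: uppercase or lowercase】
According to the official Celery documentation, Celery 4.0 introduced a new naming scheme for configuration settings:
most settings changed from uppercase to lowercase,
and most of them either dropped the CELERY_ prefix or had their prefixes unified.
For example:
CELERY_TIMEZONE -> timezone
The old names are still supported for backward compatibility,
but for new projects the new names are recommended.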
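To make the renaming concrete, here is a minimal sketch in plain Python (no Celery import needed). The mapping lists only a handful of common settings, not the full table from the documentation. The helper name `to_new_style` and the `Asia/Shanghai` timezone are my own illustrative assumptions, not part of Celery.

```python
# A few old-style (pre-4.0) setting names and their new lowercase equivalents.
# This is a partial list for illustration, not the complete table.
OLD_TO_NEW = {
    "CELERY_TIMEZONE": "timezone",
    "CELERY_ENABLE_UTC": "enable_utc",
    "CELERY_ACCEPT_CONTENT": "accept_content",
    "CELERY_TASK_SERIALIZER": "task_serializer",
    "CELERY_RESULT_BACKEND": "result_backend",
    "CELERYBEAT_SCHEDULE": "beat_schedule",
    "BROKER_URL": "broker_url",
}


def to_new_style(old_config):
    """Return a copy of old_config with known old keys renamed.

    Hypothetical helper for illustration; unknown keys are kept unchanged.
    """
    return {OLD_TO_NEW.get(key, key): value for key, value in old_config.items()}


old_config = {
    "BROKER_URL": "redis://localhost:6379/0",
    "CELERY_TIMEZONE": "Asia/Shanghai",  # assumed value, for illustration only
}
new_config = to_new_style(old_config)
print(new_config)
```

In a real project you would simply write the lowercase names directly in your configuration; the helper only shows how the two styles line up.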
【Celery asynchronous tasks】
【Pitfall: be careful how you call apply_async when passing a single argument】
For an ordinary asynchronous task:

    @celeryApp.task
    def deleteTmpAudioFile(filename):
        ...

when the task function takes only one argument, the apply_async call has to be written as:

    deleteTmpAudioFile.apply_async((tempFilename,), countdown=10)

or:

    deleteTmpAudioFile.apply_async([tempFilename], countdown=10)

Otherwise the argument never gets passed in correctly. For example, writing:

    deleteTmpAudioFile.apply_async((tempFilename), countdown=10)

raises:

    TypeError: task args must be a list or tuple
The official API reference does document the type of args as a tuple:
"Task.apply_async(args=None, kwargs=None, task_id=None, producer=None, link=None, link_error=None, shadow=None, **options)
Apply tasks asynchronously by sending a message.
Parameters:
* args (Tuple) – The positional arguments to pass on to the task."
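For a long time I could not work out why writing args as:
(tempFilename)
raised an error, until it occurred to me to check in the interpreter:

    >>> type(("abc"))
    <class 'str'>
    >>> type(("abc",))
    <class 'tuple'>

So when a single string is meant to go into a tuple, the exact spelling matters:
- ("abc"): the type is str, a plain string
- ("abc",): only this one is a tuple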
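In other words, after digging in, the real problem was that I did not understand Python tuples well enough, and mistook:
("abc")
for a tuple.
My first reaction was still to grumble about the Python syntax itself:
why is
("abc")
not treated as a tuple, when
("abc",)
is?
The answer is that parentheses are also used to group expressions, so ("abc") is just a parenthesized string. It is the comma, not the parentheses, that makes a tuple, and a tuple can have any number of elements, including exactly one.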
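The rule is easy to check in plain Python. The snippet below is just a small demonstration of standard tuple syntax; the variable names are made up for illustration.

```python
grouped = ("abc")      # parentheses only group the expression: still a str
one_tuple = ("abc",)   # the trailing comma makes a one-element tuple
no_parens = "abc",     # the parentheses are optional; the comma is what matters
empty = ()             # the empty tuple is the one case written with parentheses alone

for name, value in [("grouped", grouped), ("one_tuple", one_tuple),
                    ("no_parens", no_parens), ("empty", empty)]:
    print(name, type(value).__name__, repr(value))
```

This is the same reason `(1 + 2) * 3` is ordinary arithmetic rather than an operation on a tuple.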
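To sum up:
when calling a Celery asynchronous task through apply_async, if the task function takes only one argument, then according to the type of args in the apply_async API documentation you should pass a tuple.
In Python, the tuple for a single value, such as the str "abc", has to be written as:
("abc",)
That is:

    deleteTmpAudioFile.apply_async(("abc",), countdown=10)

and not:

    deleteTmpAudioFile.apply_async(("abc"), countdown=10)

Otherwise you get the error:

    TypeError: task args must be a list or tuple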
The error message also reveals something else:
the args parameter of apply_async accepts a list as well.
So this works too:

    deleteTmpAudioFile.apply_async(["abc"], countdown=10)

This also explains why many official tutorials and other people's examples
pass a list:

    task.apply_async(args=[arg1, arg2], kwargs={'kwarg1': 'x', 'kwarg2': 'y'})
    result = add.apply_async(args=[10, 10], countdown=3)

Later I also noticed that the official guide
writes the single-argument case as a tuple with a trailing comma:

    T.apply_async((arg,), {'kwarg': value})
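The three spellings can be compared without a running broker. The sketch below uses a hypothetical stand-in, `check_task_args`, that mimics only the type check behind the error message above; it is not Celery's actual code, and the file name is made up.

```python
def check_task_args(args):
    """Hypothetical stand-in for the type check apply_async performs on args."""
    if not isinstance(args, (list, tuple)):
        raise TypeError("task args must be a list or tuple")
    return tuple(args)


def describe(args):
    """Return a short string saying whether args would be accepted."""
    try:
        return "ok: %r" % (check_task_args(args),)
    except TypeError as exc:
        return "TypeError: %s" % exc


temp_filename = "tmp_audio.mp3"  # made-up file name, for illustration only

results = [
    describe((temp_filename,)),  # one-element tuple: accepted
    describe([temp_filename]),   # one-element list: accepted
    describe((temp_filename)),   # just a str in parentheses: rejected
]
for line in results:
    print(line)
```

Only the third call fails, because `(temp_filename)` is the string itself rather than a container holding it.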
【How periodic tasks work】
When a task comes due, Celery beat sends a message to the worker, and the worker then executes the task.
Concretely,
take a periodic task defined in code like this:

    @celeryApp.task
    def celeryRefreshMsToken():
        """celery's task: refreshMsToken"""
        refreshMsToken()

    @celeryApp.on_after_configure.connect
    def celerySetupPeriodicTasks(sender, **kwargs):
        log.info("celerySetupPeriodicTasks: sender=%s", sender)
        sender.add_periodic_task(app.config["CELERY_REFRESH_MS_TOKEN_INTERVAL"],
                                 celeryRefreshMsToken.s(),
                                 name="refresh ms Azure token every less than 10 minutes")

With that task in place, I did the following.
First I started the worker:

    celery worker -A app.celeryApp --loglevel=DEBUG

Then I started beat:

    celery beat -A app.celeryApp -s runtime/celerybeat-schedule --loglevel=DEBUG

What you then see is this.
The worker's debug output lists the registered tasks, including the periodic one:

    [tasks]
      . app.celeryRefreshMsToken
      . app.deleteTmpAudioFile
      . celery.accumulate
      . celery.backend_cleanup
      . celery.chain
      . celery.chord
      . celery.chord_unlock
      . celery.chunks
      . celery.group
      . celery.map
      . celery.starmap

The periodic task here is:
app.celeryRefreshMsToken
And beat prints the following on startup:

    [2018-05-24 10:33:37,733] INFO in app: celerySetupPeriodicTasks: sender=<Celery app at 0x104f3e9e8>
    celery beat v4.1.0 (latentcall) is starting.
    __ - ... __ - _
    LocalTime -> 2018-05-24 10:33:37
    Configuration ->
        . broker -> redis://localhost:6379/0
        . loader -> celery.loaders.app.AppLoader
        . scheduler -> celery.beat.PersistentScheduler
        . db -> runtime/celerybeat-schedule
        . logfile -> [stderr]@%DEBUG
        . maxinterval -> 5.00 minutes (300s)
    [2018-05-24 10:33:37,792: DEBUG/MainProcess] Setting default socket timeout to 30
    [2018-05-24 10:33:37,793: INFO/MainProcess] beat: Starting...
    [2018-05-24 10:33:37,914: DEBUG/MainProcess] Current schedule:
    <ScheduleEntry: refresh ms Azure token every less than 10 minutes app.celeryRefreshMsToken() <freq: 20.00 seconds>
    <ScheduleEntry: celery.backend_cleanup celery.backend_cleanup() <crontab: 0 4 * * * (m/h/d/dM/MY)>
    [2018-05-24 10:33:37,915: DEBUG/MainProcess] beat: Ticking with max interval->5.00 minutes
    [2018-05-24 10:33:37,916: DEBUG/MainProcess] beat: Waking up in 19.93 seconds.

After 20 seconds (the interval here is set to run every 20 seconds), beat sends a message to the worker:

    [2018-05-24 10:33:57,853: DEBUG/MainProcess] beat: Synchronizing schedule...
    [2018-05-24 10:33:57,886: INFO/MainProcess] Scheduler: Sending due task refresh ms Azure token every less than 10 minutes (app.celeryRefreshMsToken)
    [2018-05-24 10:33:57,907: DEBUG/MainProcess] app.celeryRefreshMsToken sent. id->7400d508-c8aa-4bdd-90d9-d5d0cfb6b504
    [2018-05-24 10:33:57,911: DEBUG/MainProcess] beat: Waking up in 19.95 seconds.

The worker then receives the message and executes the task, which calls the function that does the actual work:

    [2018-05-24 10:33:57,910: INFO/MainProcess] Received task: app.celeryRefreshMsToken[7400d508-c8aa-4bdd-90d9-d5d0cfb6b504]
    [2018-05-24 10:33:57,913: DEBUG/MainProcess] TaskPool: Apply <function _fast_trace_task at 0x110654268> (args:('app.celeryRefreshMsToken', '7400d508-c8aa-4bdd-90d9-d5d0cfb6b504', {'lang': 'py', 'task': 'app.celeryRefreshMsToken', 'id': '7400d508-c8aa-4bdd-90d9-d5d0cfb6b504', 'eta': None, 'expires': None, 'group': None, 'retries': 0, 'timelimit': [None, None], 'root_id': '7400d508-c8aa-4bdd-90d9-d5d0cfb6b504', 'parent_id': None, 'argsrepr': '()', 'kwargsrepr': '{}', 'origin': 'gen30492@licrifandeMacBook-Pro.local', 'reply_to': 'e629fa42-5446-3302-8bce-c59fdacca655', 'correlation_id': '7400d508-c8aa-4bdd-90d9-d5d0cfb6b504', 'delivery_info': {'exchange': '', 'routing_key': 'celery', 'priority': 0, 'redelivered': None}}, b'[[], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]', 'application/json', 'utf-8') kwargs:{})
    [2018-05-24 10:33:57,920: DEBUG/MainProcess] Task accepted: app.celeryRefreshMsToken[7400d508-c8aa-4bdd-90d9-d5d0cfb6b504] pid:30484
    [2018-05-24 10:33:57,924: WARNING/ForkPoolWorker-1] [2018-05-24 10:33:57,922] INFO in app: refreshMsToken: gMsToken=xxxxx

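The cycle then repeats for the next period:
- beat counts down
- when the time is up, beat fires and sends a message to the worker
- the worker receives the message
- the worker executes the (periodic) task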
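The cycle above can be imitated with a toy model in plain Python. This is only a sketch of the idea under simplifying assumptions: `MiniBeat` and `MiniWorker` are hypothetical classes, an in-process `queue.Queue` stands in for the Redis broker, and a simulated clock replaces real waiting. It is not how Celery is implemented.

```python
import queue


class MiniBeat:
    """Toy scheduler: puts a task name on the queue every `interval` seconds."""

    def __init__(self, broker, task_name, interval):
        self.broker = broker
        self.task_name = task_name
        self.interval = interval
        self.next_due = interval

    def tick(self, now):
        """Send the task if it is due; return the seconds left until the next run."""
        if now >= self.next_due:
            self.broker.put(self.task_name)
            self.next_due += self.interval
        return self.next_due - now


class MiniWorker:
    """Toy worker: takes task names off the queue and runs the matching function."""

    def __init__(self, broker, registry):
        self.broker = broker
        self.registry = registry
        self.executed = []

    def drain(self, now):
        while not self.broker.empty():
            name = self.broker.get()
            self.registry[name]()
            self.executed.append((now, name))


refresh_log = []


def refresh_token():
    # Stand-in for the real token refresh; it only records that it ran.
    refresh_log.append("refreshed")


broker = queue.Queue()
beat = MiniBeat(broker, "app.celeryRefreshMsToken", interval=20)
worker = MiniWorker(broker, {"app.celeryRefreshMsToken": refresh_token})

# Simulated clock: step through one minute, one second at a time.
for now in range(0, 61):
    beat.tick(now)
    worker.drain(now)

print(worker.executed)
```

The point of the split is the same as in the logs: the scheduler only decides when to send, and the worker only reacts to messages it receives.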
【The signature of a Celery task】
In the call:

    sender.add_periodic_task(app.config["CELERY_REFRESH_MS_TOKEN_INTERVAL"],
                             celeryRefreshMsToken.s(),
                             name="refresh ms Azure token every less than 10 minutes")

the s in celeryRefreshMsToken.s() stands for signature, i.e. the task's signature.
The full spelling is:

    celeryRefreshMsToken.signature()

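Conceptually, a signature bundles a task together with the arguments it should later be called with, a bit like `functools.partial`. The sketch below is a toy model of that idea only: `MiniTask` and `MiniSignature` are hypothetical classes written for illustration, not Celery's implementation. The one thing it does mirror is that `.s(*args, **kwargs)` is a shortcut for `.signature(args, kwargs)`.

```python
class MiniSignature:
    """Toy signature: a function plus the arguments it will be called with later."""

    def __init__(self, func, args=(), kwargs=None):
        self.func = func
        self.args = tuple(args)
        self.kwargs = dict(kwargs or {})

    def __call__(self):
        return self.func(*self.args, **self.kwargs)


class MiniTask:
    """Toy task wrapper exposing signature() and its s() shortcut."""

    def __init__(self, func):
        self.func = func

    def signature(self, args=(), kwargs=None):
        return MiniSignature(self.func, args, kwargs)

    def s(self, *args, **kwargs):
        # Star-argument shortcut: s(2, 3) is the same as signature((2, 3), {}).
        return self.signature(args, kwargs)


def add(x, y):
    return x + y


add_task = MiniTask(add)
long_form = add_task.signature((2, 3))
short_form = add_task.s(2, 3)

# Nothing has run yet; the call happens only when the signature is invoked.
print(long_form(), short_form())
```

This is why add_periodic_task can be handed celeryRefreshMsToken.s(): beat receives a description of the call to make, not the result of making it.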
Please credit the source when reposting: 在路上 » 【Summary】Celery tips, experience, and caveats