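While working on:
[Solved] Integrating Microsoft Azure speech synthesis (TTS) into a local Flask environment
I had a Flask app that already implemented the basics:
getting an MS Azure token
and using that token to fetch the TTS audio
To this app I needed to add a periodic Celery task that refreshes the token,
because an Azure token is only valid for 10 minutes.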
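The timing constraint can be sketched in plain Python, independent of Celery: refresh a little earlier than the 10-minute expiry so that callers never see a stale token. This is only an illustration of the idea; `TokenCache`, `fetch` and the 9-minute margin are hypothetical names and values chosen for the sketch, not part of the app in this post.

```python
import time
from typing import Callable, Optional

TOKEN_TTL_SECONDS = 10 * 60        # validity of an Azure token, as stated above
REFRESH_INTERVAL_SECONDS = 9 * 60  # assumed margin: refresh one minute early


class TokenCache:
    """Holds a token and reports when it is due for a refresh."""

    def __init__(self, fetch: Callable[[], str],
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._fetch = fetch  # hypothetical callable that requests a new token
        self._clock = clock  # injectable so the timing can be tested
        self._token: Optional[str] = None
        self._fetched_at: Optional[float] = None

    def needs_refresh(self) -> bool:
        """True if no token was fetched yet or the refresh interval has passed."""
        if self._fetched_at is None:
            return True
        return self._clock() - self._fetched_at >= REFRESH_INTERVAL_SECONDS

    def refresh(self) -> str:
        """Fetch a new token and remember when it was fetched."""
        self._token = self._fetch()
        self._fetched_at = self._clock()
        return self._token

    def get(self) -> Optional[str]:
        """Return the cached token, refreshing it first if it is due."""
        if self.needs_refresh():
            self.refresh()
        return self._token
```

In the rest of this post the refresh is driven by Celery beat on a fixed interval rather than checked lazily like this; the interval just has to stay below the 10-minute validity.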
celery periodic task
#celery# Periodic tasks – Hochikong's Blog
Django Celery scheduled tasks and time settings – Yang Shihang's blog
Distributed task queue Celery – Weidian tech team – SegmentFault
Periodic Tasks — Celery 4.1.0 documentation
Configuration and defaults — Celery 4.1.0 documentation
celery.bin.beat — Celery 4.1.0 documentation
First Steps with Celery — Celery 4.1.0 documentation
One thing was still unclear:
in Flask, should Celery's timezone be configured as:
timezone = xxx
or
CELERY_TIMEZONE = xxx
flask celery timezone config
Configuration and defaults — Celery 4.1.0 documentation
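which explains it clearly:
the old name was:
CELERY_TIMEZONE
and the new name is:
timezone
Checking the Celery version used here:
<code>➜ naturlingRobotDemoServer git:(master) ✗ pipenv graph
celery==4.1.0
</code>
Celery 4.0 and later still accept the old name:
CELERY_TIMEZONE
In a Flask config file there are also reasons to prefer it:
it keeps the Celery settings from being confused with the other Flask settings
and by common convention configuration names are written in uppercase,
so they read as definitions rather than variables
So I kept the old-style names here:
<code>CELERY_TIMEZONE = "Asia/Shanghai"
CELERY_ENABLE_UTC = True
</code>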
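For comparison, here is what the same two settings look like with the new lowercase names from the Celery 4.x configuration docs. This is a sketch of a standalone config module (the file name `celeryconfig.py` is an assumption, it is not used in this project). As far as I know, old and new names should not be mixed within one configuration.

```python
# celeryconfig.py (hypothetical standalone module, not part of this project)
# New-style lowercase names introduced in Celery 4.0.
timezone = "Asia/Shanghai"  # old name: CELERY_TIMEZONE
enable_utc = True           # old name: CELERY_ENABLE_UTC
```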
celery beat schedule timezone and execute time · Issue #4177 · celery/celery
celerybeat – celery beat timezone problems – Stack Overflow
python – Celery beat – different time zone per task – Stack Overflow
Further reference:
celery — Distributed processing — Celery 4.1.0 documentation
[Summary]
The code I ended up with:
app.py
<code>
from celery import Celery

def refreshMsToken():
    """refresh microsoft azure token for later call tts api"""
    global app, log, gMsToken
    log.info("refreshMsToken: gMsToken=%s", gMsToken)
    ...

def getMsToken():
    """get ms azure token"""
    refreshMsToken()

celeryApp = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celeryApp.conf.update(app.config)
log.info("celeryApp=%s", celeryApp)

#--------------------------------------
# Celery tasks
#--------------------------------------

# @celeryApp.task()
@celeryApp.task
# @celeryApp.task(name=app.config["CELERY_TASK_NAME"] + ".deleteTmpAudioFile")
def deleteTmpAudioFile(filename):
    """
        delete tmp audio file from filename
            eg: 98fc7c46-7aa0-4dd7-aa9d-89fdf516abd6.mp3
    """
    global log
    log.info("deleteTmpAudioFile: filename=%s", filename)
    ...

@celeryApp.task
def celeryRefreshMsToken():
    """celery's task: refreshMsToken"""
    refreshMsToken()

@celeryApp.on_after_configure.connect
def celerySetupPeriodicTasks(sender, **kwargs):
    log.info("celerySetupPeriodicTasks: sender=%s", sender)
    sender.add_periodic_task(app.config["CELERY_REFRESH_MS_TOKEN_INTERVAL"],
                             celeryRefreshMsToken.s(),
                             name="refresh ms Azure token every less than 10 minutes")
</code>
config.py
<code>
# CELERY_TASK_NAME = "Celery_" + FLASK_APP_NAME
# CELERY_BROKER_URL = "redis://localhost"
CELERY_BROKER_URL = "redis://localhost:6379/0"
# CELERY_RESULT_BACKEND = "redis://localhost:6379/0" # current not use result
CELERY_DELETE_TMP_AUDIO_FILE_DELAY = 60 * 2 # two minutes
# for periodical celery task
CELERY_TIMEZONE = "Asia/Shanghai"
CELERY_ENABLE_UTC = True
# CELERY_REFRESH_MS_TOKEN_INTERVAL = 60 * 9 # 9 minutes (< 10 minutes)
# for debug
CELERY_REFRESH_MS_TOKEN_INTERVAL = 20
</code>
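The Celery periodic-tasks documentation also describes a declarative alternative to `add_periodic_task`: a schedule dictionary (setting `beat_schedule`, old-style name `CELERYBEAT_SCHEDULE`). A rough sketch of what such an entry could look like for this task follows. `build_beat_schedule` is a hypothetical helper, the task name is taken from the beat log in this post, and I have not tried this variant in this project.

```python
def build_beat_schedule(interval_seconds: float) -> dict:
    """Build a beat schedule with one entry for the token refresh task."""
    return {
        # entry name, same as the name= passed to add_periodic_task
        "refresh ms Azure token every less than 10 minutes": {
            "task": "app.celeryRefreshMsToken",       # registered task name
            "schedule": float(interval_seconds),      # run every N seconds
            "args": (),                               # the task takes no arguments
        },
    }


CELERY_REFRESH_MS_TOKEN_INTERVAL = 60 * 9  # 9 minutes (< 10 minutes)
# Old-style uppercase name, assumed to be picked up the same way as CELERY_TIMEZONE
CELERYBEAT_SCHEDULE = build_beat_schedule(CELERY_REFRESH_MS_TOKEN_INTERVAL)
```

With a dictionary like this the schedule lives in the config file, whereas `add_periodic_task` keeps it next to the task definition in app.py.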
To run it, start these in separate terminals:
<code>celery worker -A app.celeryApp --loglevel=DEBUG </code>
and:
<code>celery beat -A app.celeryApp --loglevel=DEBUG </code>
The result: every 20 seconds the beat process sends a message to the worker:
<code>➜ naturlingRobotDemoServer git:(master) ✗ celery beat -A app.celeryApp --loglevel=DEBUG
[2018-05-23 17:52:29,941] INFO in app: app=<Flask 'app'>
[2018-05-23 17:52:29,943] INFO in app: api=<flask_restful.Api object at 0x10db60ba8>
[2018-05-23 17:52:29,947] INFO in app: celeryApp=<Celery app at 0x10cba09e8>
[2018-05-23 17:52:29,948] INFO in app: aiContext=<DialogueManager.Context object at 0x10dadf518>
…
[2018-05-23 17:52:30,814] INFO in app: celerySetupPeriodicTasks: sender=<Celery app at 0x10cba09e8>
celery beat v4.1.0 (latentcall) is starting.
__ - ... __ - _
LocalTime -> 2018-05-23 17:52:30
Configuration ->
. broker -> redis://localhost:6379/0
. loader -> celery.loaders.app.AppLoader
. scheduler -> celery.beat.PersistentScheduler
. db -> celerybeat-schedule
. logfile -> [stderr]@%DEBUG
. maxinterval -> 5.00 minutes (300s)
[2018-05-23 17:52:30,892: DEBUG/MainProcess] Setting default socket timeout to 30
[2018-05-23 17:52:30,893: INFO/MainProcess] beat: Starting...
[2018-05-23 17:52:30,995: DEBUG/MainProcess] Current schedule:
<ScheduleEntry: refresh ms Azure token every less than 10 minutes app.celeryRefreshMsToken() <freq: 20.00 seconds>
<ScheduleEntry: celery.backend_cleanup celery.backend_cleanup() <crontab: 0 4 * * * (m/h/d/dM/MY)>
[2018-05-23 17:52:30,995: DEBUG/MainProcess] beat: Ticking with max interval->5.00 minutes
[2018-05-23 17:52:31,006: INFO/MainProcess] Scheduler: Sending due task refresh ms Azure token every less than 10 minutes (app.celeryRefreshMsToken)
[2018-05-23 17:52:31,026: DEBUG/MainProcess] beat: Synchronizing schedule...
[2018-05-23 17:52:31,059: DEBUG/MainProcess] app.celeryRefreshMsToken sent. id->dd150a9f-0271-41f7-8163-d97a47660f6c
[2018-05-23 17:52:31,061: DEBUG/MainProcess] beat: Waking up in 19.93 seconds.
[2018-05-23 17:52:50,997: INFO/MainProcess] Scheduler: Sending due task refresh ms Azure token every less than 10 minutes (app.celeryRefreshMsToken)
[2018-05-23 17:52:50,998: DEBUG/MainProcess] app.celeryRefreshMsToken sent. id->05a3eebf-ae01-44f3-9d8d-86fa422ca9c8
[2018-05-23 17:52:50,998: DEBUG/MainProcess] beat: Waking up in 19.99 seconds.
[2018-05-23 17:53:10,997: INFO/MainProcess] Scheduler: Sending due task refresh ms Azure token every less than 10 minutes (app.celeryRefreshMsToken)
[2018-05-23 17:53:10,999: DEBUG/MainProcess] app.celeryRefreshMsToken sent. id->ecc0af10-627f-4e6a-a54d-cf4c6c5e8331
[2018-05-23 17:53:10,999: DEBUG/MainProcess] beat: Waking up in 19.99 seconds.
</code>
After receiving the message, the worker executes the task:
<code>[2018-05-23 18:04:34,279] INFO in app: celerySetupPeriodicTasks: sender=<Celery app at 0x106d559e8>
[2018-05-23 18:04:34,392: DEBUG/MainProcess] | Worker: Preparing bootsteps.
[2018-05-23 18:04:34,396: DEBUG/MainProcess] | Worker: Building graph...
[2018-05-23 18:04:34,397: DEBUG/MainProcess] | Worker: New boot order: {StateDB, Timer, Hub, Pool, Autoscaler, Beat, Consumer}
[2018-05-23 18:04:34,418: DEBUG/MainProcess] | Consumer: Preparing bootsteps.
[2018-05-23 18:04:34,418: DEBUG/MainProcess] | Consumer: Building graph...
[2018-05-23 18:04:34,441: DEBUG/MainProcess] | Consumer: New boot order: {Connection, Events, Heart, Agent, Mingle, Tasks, Control, Gossip, event loop}

[email protected] v4.1.0 (latentcall)
Darwin-17.5.0-x86_64-i386-64bit-PE 2018-05-23 18:04:34

[config]
.> app: app:0x106d559e8
.> transport: redis://localhost:6379/0
.> results: disabled://
.> concurrency: 4 (prefork)
.> task events: OFF (enable -E to monitor tasks in this worker)

[queues]
.> celery exchange=celery(direct) key=celery

[tasks]
. app.celeryRefreshMsToken
. app.deleteTmpAudioFile
. celery.accumulate
. celery.backend_cleanup
. celery.chain
. celery.chord
. celery.chord_unlock
. celery.chunks
. celery.group
. celery.map
. celery.starmap

[2018-05-23 18:04:34,504: DEBUG/MainProcess] | Worker: Starting Hub
[2018-05-23 18:04:34,506: DEBUG/MainProcess] ^-- substep ok
[2018-05-23 18:04:34,506: DEBUG/MainProcess] | Worker: Starting Pool
[2018-05-23 18:04:34,789: DEBUG/MainProcess] ^-- substep ok
[2018-05-23 18:04:34,791: DEBUG/MainProcess] | Worker: Starting Consumer
[2018-05-23 18:04:34,792: DEBUG/MainProcess] | Consumer: Starting Connection
[2018-05-23 18:04:34,838: INFO/MainProcess] Connected to redis://localhost:6379/0
[2018-05-23 18:04:34,839: DEBUG/MainProcess] ^-- substep ok
[2018-05-23 18:04:34,839: DEBUG/MainProcess] | Consumer: Starting Events
[2018-05-23 18:04:34,864: DEBUG/MainProcess] ^-- substep ok
[2018-05-23 18:04:34,864: DEBUG/MainProcess] | Consumer: Starting Heart
[2018-05-23 18:04:34,870: DEBUG/MainProcess] ^-- substep ok
[2018-05-23 18:04:34,870: DEBUG/MainProcess] | Consumer: Starting Mingle
[2018-05-23 18:04:34,870: INFO/MainProcess] mingle: searching for neighbors
[2018-05-23 18:04:35,904: INFO/MainProcess] mingle: all alone
[2018-05-23 18:04:35,904: DEBUG/MainProcess] ^-- substep ok
[2018-05-23 18:04:35,905: DEBUG/MainProcess] | Consumer: Starting Tasks
[2018-05-23 18:04:35,910: DEBUG/MainProcess] ^-- substep ok
[2018-05-23 18:04:35,910: DEBUG/MainProcess] | Consumer: Starting Control
[2018-05-23 18:04:35,914: DEBUG/MainProcess] ^-- substep ok
[2018-05-23 18:04:35,915: DEBUG/MainProcess] | Consumer: Starting Gossip
[2018-05-23 18:04:35,919: DEBUG/MainProcess] ^-- substep ok
[2018-05-23 18:04:35,919: DEBUG/MainProcess] | Consumer: Starting event loop
[2018-05-23 18:04:35,919: DEBUG/MainProcess] | Worker: Hub.register Pool...
[2018-05-23 18:04:35,920: INFO/MainProcess] [email protected] ready.
[2018-05-23 18:04:35,920: DEBUG/MainProcess] basic.qos: prefetch_count->16
[2018-05-23 18:04:42,933: INFO/MainProcess] Received task: app.celeryRefreshMsToken[3ab5b8a8-52a7-449f-8f87-871c77b525a1]
[2018-05-23 18:04:42,935: DEBUG/MainProcess] TaskPool: Apply <function _fast_trace_task at 0x108649268> (args:('app.celeryRefreshMsToken', '3ab5b8a8-52a7-449f-8f87-871c77b525a1', {'lang': 'py', 'task': 'app.celeryRefreshMsToken', 'id': '3ab5b8a8-52a7-449f-8f87-871c77b525a1', 'eta': None, 'expires': None, 'group': None, 'retries': 0, 'timelimit': [None, None], 'root_id': '3ab5b8a8-52a7-449f-8f87-871c77b525a1', 'parent_id': None, 'argsrepr': '()', 'kwargsrepr': '{}', 'origin': '[email protected]', 'reply_to': '3b8e4f1a-e52b-352e-b301-07fe3e100c15', 'correlation_id': '3ab5b8a8-52a7-449f-8f87-871c77b525a1', 'delivery_info': {'exchange': '', 'routing_key': 'celery', 'priority': 0, 'redelivered': None}}, b'[[], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]', 'application/json', 'utf-8') kwargs:{})
[2018-05-23 18:04:42,952: WARNING/ForkPoolWorker-1] [2018-05-23 18:04:42,949] INFO in app: refreshMsToken: gMsToken=eyJxxxzvb8
[2018-05-23 18:04:42,954: DEBUG/MainProcess] Task accepted: app.celeryRefreshMsToken[3ab5b8a8-52a7-449f-8f87-871c77b525a1] pid:28371
[2018-05-23 18:04:42,949: INFO/ForkPoolWorker-1] refreshMsToken: gMsToken=eyxxxb8
[2018-05-23 18:04:42,957: WARNING/ForkPoolWorker-1] [2018-05-23 18:04:42,956] INFO in app: getMsTokenUrl=https://westus.api.cognitive.microsoft.com/sts/v1.0/issueToken, reqHeaders={'Ocp-Apim-Subscription-Key': '2240b6cd8e9c4e87b194647732bef8a2'}
[2018-05-23 18:04:42,956: INFO/ForkPoolWorker-1] getMsTokenUrl=https://westus.api.cognitive.microsoft.com/sts/v1.0/issueToken, reqHeaders={'Ocp-Apim-Subscription-Key': '2240b6cd8e9c4e87b194647732bef8a2'}
[2018-05-23 18:04:42,973: DEBUG/ForkPoolWorker-1] Starting new HTTPS connection (1): westus.api.cognitive.microsoft.com
[2018-05-23 18:04:43,989: DEBUG/ForkPoolWorker-1] https://westus.api.cognitive.microsoft.com:443 "POST /sts/v1.0/issueToken HTTP/1.1" 200 517
[2018-05-23 18:04:43,993: WARNING/ForkPoolWorker-1] [2018-05-23 18:04:43,993] INFO in app: resp=<Response [200]>
[2018-05-23 18:04:43,993: INFO/ForkPoolWorker-1] resp=<Response [200]>
[2018-05-23 18:04:43,994: WARNING/ForkPoolWorker-1] [2018-05-23 18:04:43,994] INFO in app: respTokenText=eyJ0eXAiOiJKV1QiLxxxxxGyQrH0
[2018-05-23 18:04:43,994: INFO/ForkPoolWorker-1] respTokenText=exxxxH0
[2018-05-23 18:04:43,995: WARNING/ForkPoolWorker-1] [2018-05-23 18:04:43,995] INFO in app: after refresh: gMsToken=eyJxxxrH0
[2018-05-23 18:04:43,995: INFO/ForkPoolWorker-1] after refresh: gMsToken=eyJxxx0
</code>
[Postscript 1]
I noticed that a file had been generated in the current folder:
celerybeat-schedule
and wanted to move it into a separate, dedicated folder
celerybeat-schedule location
celerybeat-schedule file location
Periodic Tasks — Celery 4.1.0 documentation
“The default scheduler (storing the schedule in the celerybeat-schedule file) will automatically detect that the time zone has changed
Beat needs to store the last run times of the tasks in a local database file (named celerybeat-schedule by default), so it needs access to write in the current directory, or alternatively you can specify a custom location for this file:
$ celery -A proj beat -s /home/celery/var/run/celerybeat-schedule”
So adding the -s option with the desired path is all it takes.
The command then becomes:
<code>celery beat -A app.celeryApp -s runtime/celerybeat-schedule --loglevel=DEBUG </code>
The result:
<code>[2018-05-24 10:33:37,733] INFO in app: celerySetupPeriodicTasks: sender=<Celery app at 0x104f3e9e8>
celery beat v4.1.0 (latentcall) is starting.
__ - ... __ - _
LocalTime -> 2018-05-24 10:33:37
Configuration ->
. broker -> redis://localhost:6379/0
. loader -> celery.loaders.app.AppLoader
. scheduler -> celery.beat.PersistentScheduler
. db -> runtime/celerybeat-schedule
. logfile -> [stderr]@%DEBUG
. maxinterval -> 5.00 minutes (300s)
[2018-05-24 10:33:37,792: DEBUG/MainProcess] Setting default socket timeout to 30
[2018-05-24 10:33:37,793: INFO/MainProcess] beat: Starting...
[2018-05-24 10:33:37,914: DEBUG/MainProcess] Current schedule:
<ScheduleEntry: refresh ms Azure token every less than 10 minutes app.celeryRefreshMsToken() <freq: 20.00 seconds>
<ScheduleEntry: celery.backend_cleanup celery.backend_cleanup() <crontab: 0 4 * * * (m/h/d/dM/MY)>
[2018-05-24 10:33:37,915: DEBUG/MainProcess] beat: Ticking with max interval->5.00 minutes
[2018-05-24 10:33:37,916: DEBUG/MainProcess] beat: Waking up in 19.93 seconds.
[2018-05-24 10:33:57,853: DEBUG/MainProcess] beat: Synchronizing schedule...
[2018-05-24 10:33:57,886: INFO/MainProcess] Scheduler: Sending due task refresh ms Azure token every less than 10 minutes (app.celeryRefreshMsToken)
[2018-05-24 10:33:57,907: DEBUG/MainProcess] app.celeryRefreshMsToken sent. id->7400d508-c8aa-4bdd-90d9-d5d0cfb6b504
[2018-05-24 10:33:57,911: DEBUG/MainProcess] beat: Waking up in 19.95 seconds.
</code>
The output now shows:
db -> runtime/celerybeat-schedule
and the corresponding file is indeed created in the runtime folder.
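Since beat writes its schedule database to the given path, the target folder presumably has to exist before beat starts. A small sketch of preparing that path is below. `beat_schedule_path` is a hypothetical helper. As far as I can tell from the Celery 4.x configuration docs, the same path could also be supplied through the `beat_schedule_filename` setting (old-style name `CELERYBEAT_SCHEDULE_FILENAME`) instead of `-s`, though I only used `-s` here.

```python
import os


def beat_schedule_path(base_dir: str, folder: str = "runtime",
                       filename: str = "celerybeat-schedule") -> str:
    """Create base_dir/folder if needed and return the schedule file path inside it."""
    target_dir = os.path.join(base_dir, folder)
    os.makedirs(target_dir, exist_ok=True)  # no error if the folder already exists
    return os.path.join(target_dir, filename)


# Possible use in config.py (assumption, untested in this project):
# CELERYBEAT_SCHEDULE_FILENAME = beat_schedule_path(os.getcwd())
```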
[Postscript 2]
Later, in:
Calling Tasks — Celery 4.1.0 documentation
I saw:
“There’s another way…
You’ll learn more about this later while reading about the Canvas, but signature’s are objects used to pass around the signature of a task invocation, (for example to send it over the network), and they also support the Calling API:
task.s(arg1, arg2, kwarg1='x', kwargs2='y').apply_async()”
->
celery — Distributed processing — Celery 4.1.0 documentation
“class celery.Signature(task=None, args=None, kwargs=None, options=None, type=None, subtask_type=None, immutable=False, app=None, **ex)[source]
Task Signature.”
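Only then did I realize that in the earlier call:
<code> sender.add_periodic_task(app.config["CELERY_REFRESH_MS_TOKEN_INTERVAL"], celeryRefreshMsToken.s(), name="refresh ms Azure token every less than 10 minutes") </code>
the argument:
<code>celeryRefreshMsToken.s() </code>
can also be written as:
<code>celeryRefreshMsToken.signature() </code>
It is a task signature: an object that describes one invocation of the task (the task plus its arguments) without sending it.
Each time the schedule fires, beat sends that signature, which has the same effect as calling:
<code>celeryRefreshMsToken.apply_async() </code>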
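The difference between describing a call and sending it can be shown with a toy model in plain Python. This is not Celery's `Signature` class, only an analogy: `ToySignature` and `refresh_token` are hypothetical names, and the real object additionally carries options and can be serialized and sent to a worker.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Tuple


@dataclass(frozen=True)
class ToySignature:
    """Toy stand-in for a task signature: records a call without running it."""
    func: Callable[..., Any]
    args: Tuple[Any, ...] = ()
    kwargs: Dict[str, Any] = field(default_factory=dict)

    def apply(self) -> Any:
        """Actually perform the recorded call (loosely analogous to apply_async)."""
        return self.func(*self.args, **self.kwargs)


calls: List[str] = []


def refresh_token() -> str:
    """Hypothetical stand-in for the real token refresh."""
    calls.append("refreshed")
    return "new-token"


sig = ToySignature(refresh_token)  # like celeryRefreshMsToken.s(): nothing runs yet
result = sig.apply()               # like what the scheduler triggers on each tick
```

The same object can be applied again on every tick, which is why a single signature is enough to register a periodic task.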