In Flask, I use RQ (via Flask-RQ2) to run background tasks. The existing code:
```python
@rq.job
def updateTaskErrandorLocation():
    runningTaskIdListKey = genRunningTaskIdListKey()
    gLog.debug("runningTaskIdListKey=%s", runningTaskIdListKey)

updateTaskErrandorLocationJob = updateTaskErrandorLocation.schedule(
    timedelta(seconds=BACKGROUND_UPDATE_RUNNING_TASK_ERRANDOR_LOCATION_INTERVAL_SECONDS),
    repeat=0)
gLog.debug("updateTaskErrandorLocationJob=%s", updateTaskErrandorLocationJob)

@rq.job
def updateUserStatus():
    global gOnlineUserIdList
    # for debug
    gLog.debug("gOnlineUserIdList=%s", gOnlineUserIdList)
    curDatetime = datetime.now()
    gLog.debug("curDatetime=%s", curDatetime)

updateUserStatusJob = updateUserStatus.schedule(
    timedelta(seconds=BACKGROUND_UPDATE_USER_STATUS_INTERVAL_SECONDS),
    repeat=0)
gLog.debug("updateUserStatusJob=%s", updateUserStatusJob)
```
That is: every few seconds, the RQ jobs are scheduled to run.
The symptom: whichever RQ job's code comes first is the only one that actually gets executed:
```
[2016-12-19 17:36:17,135 DEBUG BackgroundWorker.py:84 <module>] updateTaskErrandorLocationJob=<Job 48147d2c-51bb-4622-8e66-585089caad89: runningfast.resources.BackgroundWorker.updateTaskErrandorLocation()>
[2016-12-19 17:36:17,137 DEBUG BackgroundWorker.py:148 <module>] updateUserStatusJob=<Job cd35db06-3bf9-4ea2-9650-c26652fe9452: runningfast.resources.BackgroundWorker.updateUserStatus()>
[2016-12-19 17:36:17,137 DEBUG BackgroundWorker.py:48 updateTaskErrandorLocation] runningTaskIdListKey=develop|staging|RunningTaskIdList
[2016-12-19 17:36:17,139 DEBUG BackgroundWorker.py:50 updateTaskErrandorLocation] type(runningTaskIdList)=<type 'set'>, runningTaskIdList=set([])
```
And:
```
[2016-12-19 17:32:06,774 DEBUG BackgroundWorker.py:102 <module>] updateUserStatusJob=<Job ad8eff3c-1be4-429d-8a54-d8d5f7f130b2: runningfast.resources.BackgroundWorker.updateUserStatus()>
[2016-12-19 17:32:06,775 DEBUG BackgroundWorker.py:145 <module>] updateTaskErrandorLocationJob=<Job 830334e6-199d-4a28-99ce-9aea020b6a88: runningfast.resources.BackgroundWorker.updateTaskErrandorLocation()>
[2016-12-19 17:32:06,776 DEBUG BackgroundWorker.py:47 updateUserStatus] gOnlineUserIdList=[]
[2016-12-19 17:32:06,776 DEBUG BackgroundWorker.py:50 updateUserStatus] curDatetime=2016-12-19 17:32:06.776366
[2016-12-19 17:32:06,776 DEBUG BackgroundWorker.py:87 updateUserStatus] after remove offline: gOnlineUserIdList=[]
```
But the two jobs are never both executed.
So I need to figure out: how RQ in Flask can schedule multiple background tasks at the same time.

Searched for:
flask rq schedule multiple job
flask rq2 schedule Repeated job
GitHub – ui/rq-scheduler: A light library that adds job scheduling capabilities to RQ (Redis Queue)
Tried changing the value of repeat from 0 to None, but that seemed to make no difference.
However, the log shows:
```
20:48:45 default: Job OK (8736f59d-63e3-4854-8409-715055e295a8)
20:48:45 Result is kept for 500 seconds
20:48:45
20:48:45 *** Listening on default...
20:48:45 default: runningfast.resources.BackgroundWorker.updateTaskErrandorLocation() (9b6b8062-a7c5-4db2-897f-8a7bba3d7792)
DEBUG in app [./runningfast/app.py:113]: redisConnection=Redis<ConnectionPool<Connection<host=localhost,port=6379,db=0>>>, queue=<Queue u'default'>, pushConnResult=None
DEBUG in app [./runningfast/app.py:119]: app=<Flask 'runningfast.app'>, server_port=21085, api=<flask_restful.Api object at 0x7fcc1516b290>, redis_store=<flask_redis.FlaskRedis object at 0x7fcc16b7c210>, db=<SQLAlchemy engine='mysql://runningfast:Jiandao123@localhost/runningfast_dev'>, server_mode=staging, server_type=develop, rq=<flask_rq2.app.RQ object at 0x7fcc1516b6d0>, sockets=<flask_sockets.Sockets object at 0x7fcc1516ba50>
DEBUG in app [./runningfast/app.py:141]: API_VERSION=1.0, API_URL_PREFIX=/runningfast/api/v1.0, OPEN_API_URL_PREFIX=/runningfast/api/v1.0/open
DEBUG in BackgroundWorker [./runningfast/resources/BackgroundWorker.py:87]: updateTaskErrandorLocationJob=<Job a5df8b7c-1895-4339-a741-b0ac1ce148ea: runningfast.resources.BackgroundWorker.updateTaskErrandorLocation()>
DEBUG in BackgroundWorker [./runningfast/resources/BackgroundWorker.py:153]: updateUserStatusJob=<Job 02adbbb3-5081-494b-8ee6-7e7d557ceceb: runningfast.resources.BackgroundWorker.updateUserStatus()>
DEBUG in BackgroundWorker [./runningfast/resources/BackgroundWorker.py:48]: runningTaskIdListKey=develop|staging|RunningTaskIdList
DEBUG in BackgroundWorker [./runningfast/resources/BackgroundWorker.py:50]: type(runningTaskIdList)=<type 'set'>, runningTaskIdList=set([])
20:48:46 default: Job OK (9b6b8062-a7c5-4db2-897f-8a7bba3d7792)
20:48:46 Result is kept for 500 seconds
20:48:46
20:48:46 *** Listening on default...
20:48:46 default: runningfast.resources.BackgroundWorker.updateTaskErrandorLocation() (ae3973d0-af0c-405d-b5e7-172c9f2f2386)
DEBUG in app [./runningfast/app.py:113]: redisConnection=Redis<ConnectionPool<Connection<host=localhost,port=6379,db=0>>>, queue=<Queue u'default'>, pushConnResult=None
DEBUG in app [./runningfast/app.py:119]: app=<Flask 'runningfast.app'>, server_port=21085, api=<flask_restful.Api object at 0x7fcc1516b290>, redis_store=<flask_redis.FlaskRedis object at 0x7fcc16b7c210>, db=<SQLAlchemy engine='mysql://runningfast:Jiandao123@localhost/runningfast_dev'>, server_mode=staging, server_type=develop, rq=<flask_rq2.app.RQ object at 0x7fcc1516b6d0>, sockets=<flask_sockets.Sockets object at 0x7fcc1516ba50>
DEBUG in app [./runningfast/app.py:141]: API_VERSION=1.0, API_URL_PREFIX=/runningfast/api/v1.0, OPEN_API_URL_PREFIX=/runningfast/api/v1.0/open
DEBUG in BackgroundWorker [./runningfast/resources/BackgroundWorker.py:87]: updateTaskErrandorLocationJob=<Job 3d9bc26c-e26f-4b91-9a17-dea5c78a8020: runningfast.resources.BackgroundWorker.updateTaskErrandorLocation()>
DEBUG in BackgroundWorker [./runningfast/resources/BackgroundWorker.py:153]: updateUserStatusJob=<Job 13c10fd2-7cec-4dfc-b066-7f5d62fa844c: runningfast.resources.BackgroundWorker.updateUserStatus()>
DEBUG in BackgroundWorker [./runningfast/resources/BackgroundWorker.py:48]: runningTaskIdListKey=develop|staging|RunningTaskIdList
DEBUG in BackgroundWorker [./runningfast/resources/BackgroundWorker.py:50]: type(runningTaskIdList)=<type 'set'>, runningTaskIdList=set([])
20:48:48 default: Job OK (ae3973d0-af0c-405d-b5e7-172c9f2f2386)
20:48:48 Result is kept for 500 seconds
20:48:48
20:48:48 *** Listening on default...
20:48:48 default: runningfast.resources.BackgroundWorker.updateUserStatus() (49151c04-987b-43f9-9926-01e64e2b6312)
DEBUG in app [./runningfast/app.py:113]: redisConnection=Redis<ConnectionPool<Connection<host=localhost,port=6379,db=0>>>, queue=<Queue u'default'>, pushConnResult=None
DEBUG in app [./runningfast/app.py:119]: app=<Flask 'runningfast.app'>, server_port=21085, api=<flask_restful.Api object at 0x7fcc1516c290>, redis_store=<flask_redis.FlaskRedis object at 0x7fcc168fb210>, db=<SQLAlchemy engine='mysql://runningfast:Jiandao123@localhost/runningfast_dev'>, server_mode=staging, server_type=develop, rq=<flask_rq2.app.RQ object at 0x7fcc1516c6d0>, sockets=<flask_sockets.Sockets object at 0x7fcc1516ca50>
DEBUG in app [./runningfast/app.py:141]: API_VERSION=1.0, API_URL_PREFIX=/runningfast/api/v1.0, OPEN_API_URL_PREFIX=/runningfast/api/v1.0/open
DEBUG in BackgroundWorker [./runningfast/resources/BackgroundWorker.py:87]: updateTaskErrandorLocationJob=<Job 35f043c2-7c25-4dac-841a-f9da2dfe9156: runningfast.resources.BackgroundWorker.updateTaskErrandorLocation()>
DEBUG in BackgroundWorker [./runningfast/resources/BackgroundWorker.py:153]: updateUserStatusJob=<Job 7c8531c9-c9f6-4c95-b8bf-fdae0ced6bff: runningfast.resources.BackgroundWorker.updateUserStatus()>
DEBUG in BackgroundWorker [./runningfast/resources/BackgroundWorker.py:96]: gOnlineUserIdList=[]
DEBUG in BackgroundWorker [./runningfast/resources/BackgroundWorker.py:99]: curDatetime=2016-12-19 20:48:49.635094
DEBUG in BackgroundWorker [./runningfast/resources/BackgroundWorker.py:136]: after remove offline: gOnlineUserIdList=[]
20:48:49 default: Job OK (49151c04-987b-43f9-9926-01e64e2b6312)
20:48:49 Result is kept for 500 seconds
20:48:49
20:48:49 *** Listening on default...
20:48:49 default: runningfast.resources.BackgroundWorker.updateUserStatus() (e328deb9-e534-43c6-b397-3ad857b19306)
```
My impression: both jobs are in fact running in the background, which is why the log contains both:
runningfast.resources.BackgroundWorker.updateUserStatus()
runningfast.resources.BackgroundWorker.updateTaskErrandorLocation()
But the run intervals of the two jobs, updateUserStatus and updateTaskErrandorLocation, are both wrong: not the 10 seconds and 5 seconds I configured, but roughly every 1 to 3 seconds.
Then I looked at my own code again:
```python
updateTaskErrandorLocationJob = updateTaskErrandorLocation.schedule(
    timedelta(seconds=BACKGROUND_UPDATE_RUNNING_TASK_ERRANDOR_LOCATION_INTERVAL_SECONDS),
    #repeat=0)
    repeat=None)

updateUserStatusJob = updateUserStatus.schedule(
    timedelta(seconds=BACKGROUND_UPDATE_USER_STATUS_INTERVAL_SECONDS),
    #repeat=0)
    repeat=None)
```
It looks like I passed the wrong parameters. I should have used the interval parameter described in:
GitHub – ui/rq-scheduler: A light library that adds job scheduling capabilities to RQ (Redis Queue)
whereas timedelta(xxx) only specifies how long until the job runs once, not the interval between runs.
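To make that distinction concrete, here is a minimal pure-Python sketch. The fixed timestamp and the 10-second values are illustrative only, not taken from the project's config; it just shows that the first argument to schedule() sets the time of the first run, while interval sets the gap between repeated runs.

```python
from datetime import datetime, timedelta

# Per the rq-scheduler README: the first argument to schedule() is the
# datetime of the FIRST run; `interval` (in seconds) is the gap between
# repeated runs. A timedelta alone only delays a one-shot run.
now = datetime(2016, 12, 19, 21, 0, 0)   # fixed "now" for the example

first_run = now + timedelta(seconds=10)  # delay before the first run only
interval_seconds = 10                    # gap between repeated runs

# with interval set, the first three runs land at:
runs = [first_run + timedelta(seconds=interval_seconds * i) for i in range(3)]
print([t.strftime("%H:%M:%S") for t in runs])
# → ['21:00:10', '21:00:20', '21:00:30']
```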
So I changed it to:
```python
updateUserStatusJob = updateUserStatus.schedule(
    datetime.utcnow(),
    interval=BACKGROUND_UPDATE_USER_STATUS_INTERVAL_SECONDS,
    #repeat=0)
    repeat=None)
```
But the result still seemed to be: each job runs about every 3 seconds.
→ It feels like this scheduling code gets executed every time:
```python
updateTaskErrandorLocationJob = updateTaskErrandorLocation.schedule(
    datetime.utcnow(),
    interval=BACKGROUND_UPDATE_RUNNING_TASK_ERRANDOR_LOCATION_INTERVAL_SECONDS,
    #repeat=0)
    repeat=None)
gLog.debug("updateTaskErrandorLocationJob=%s", updateTaskErrandorLocationJob)

# schedule periodic job to update user status
updateUserStatusJob = updateUserStatus.schedule(
    datetime.utcnow(),
    interval=BACKGROUND_UPDATE_USER_STATUS_INTERVAL_SECONDS,
    #repeat=0)
    repeat=None)
gLog.debug("updateUserStatusJob=%s", updateUserStatusJob)
```
so the jobs get re-created every time and then run as soon as the scheduler picks them up.
→ That is why they always ran within the next second or few.
→ So this code has to be moved somewhere else, into an initialization step.
So I changed it to:
```python
def initUpdateTaskErrandorLocationJob():
    # schedule periodic job to update running task errandor location
    updateTaskErrandorLocationJob = updateTaskErrandorLocation.schedule(
        datetime.utcnow(),
        interval=BACKGROUND_UPDATE_RUNNING_TASK_ERRANDOR_LOCATION_INTERVAL_SECONDS,
        #repeat=0)
        repeat=None)
    gLog.debug("updateTaskErrandorLocationJob=%s", updateTaskErrandorLocationJob)

def initUpdateUserStatusJob():
    # schedule periodic job to update user status
    updateUserStatusJob = updateUserStatus.schedule(
        datetime.utcnow(),
        interval=BACKGROUND_UPDATE_USER_STATUS_INTERVAL_SECONDS,
        #repeat=0)
        repeat=None)
    gLog.debug("updateUserStatusJob=%s", updateUserStatusJob)

def initBackgroundWorker():
    initUpdateUserStatusJob()
    initUpdateTaskErrandorLocationJob()
```
/Users/crifan/dev/dev_root/daryun/Projects/RunningFast/sourcecode/RunningFast-Server/runningfast/app.py
```python
# from runningfast.resources.BackgroundWorker import *
from runningfast.resources.BackgroundWorker import initBackgroundWorker
from runningfast.resources.Image import ImageAPI, ImageUploadUrl

# Background worker
initBackgroundWorker()
```
The first initialization went fine:
```
[2016-12-19 21:32:56 +0000] [7652] [INFO] Booting worker with pid: 7652
DEBUG in app [/root/RunningFast/staging/runningfast/app.py:113]: redisConnection=Redis<ConnectionPool<Connection<host=localhost,port=6379,db=0>>>, queue=<Queue u'default'>, pushConnResult=None
DEBUG in app [/root/RunningFast/staging/runningfast/app.py:119]: app=<Flask 'runningfast.app'>, server_port=21085, api=<flask_restful.Api object at 0x7f1b3b1cabd0>, redis_store=<flask_redis.FlaskRedis object at 0x7f1b3c3690d0>, db=<SQLAlchemy engine='mysql://runningfast:Jiandao123@localhost/runningfast_dev'>, server_mode=staging, server_type=develop, rq=<flask_rq2.app.RQ object at 0x7f1b3b1cafd0>, sockets=<flask_sockets.Sockets object at 0x7f1b3b164390>
DEBUG in BackgroundWorker [/root/RunningFast/staging/runningfast/resources/BackgroundWorker.py:155]: updateUserStatusJob=<Job 07d01286-af4e-42b8-bf89-b0e69af5eede: runningfast.resources.BackgroundWorker.updateUserStatus()>
DEBUG in BackgroundWorker [/root/RunningFast/staging/runningfast/resources/BackgroundWorker.py:90]: updateTaskErrandorLocationJob=<Job 5049efa4-a9eb-4cf8-8784-5988e139b9af: runningfast.resources.BackgroundWorker.updateTaskErrandorLocation()>
DEBUG in app [/root/RunningFast/staging/runningfast/app.py:144]: API_VERSION=1.0, API_URL_PREFIX=/runningfast/api/v1.0, OPEN_API_URL_PREFIX=/runningfast/api/v1.0/open
```
But when the background worker later executed a job and re-imported the module, it failed:
/Users/crifan/dev/dev_root/daryun/Projects/RunningFast/sourcecode/RunningFast-Server/logs/rq_worker.log
```
Traceback (most recent call last):
  File "/root/Envs/RunningFast/lib/python2.7/site-packages/rq/worker.py", line 588, in perform_job
    rv = job.perform()
  File "/root/Envs/RunningFast/lib/python2.7/site-packages/rq/job.py", line 498, in perform
    self._result = self.func(*self.args, **self.kwargs)
  File "/root/Envs/RunningFast/lib/python2.7/site-packages/rq/job.py", line 206, in func
    return import_attribute(self.func_name)
  File "/root/Envs/RunningFast/lib/python2.7/site-packages/rq/utils.py", line 150, in import_attribute
    module = importlib.import_module(module_name)
  File "/usr/local/lib/python2.7/importlib/__init__.py", line 37, in import_module
    __import__(name)
  File "./runningfast/resources/BackgroundWorker.py", line 7, in <module>
    from runningfast.app import gLog
  File "./runningfast/app.py", line 132, in <module>
    from runningfast.resources.BackgroundWorker import initBackgroundWorker
ImportError: cannot import name initBackgroundWorker
21:33:13 Moving job to u'failed' queue
```
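The traceback is a circular import: BackgroundWorker.py imports gLog from runningfast.app at module top level, while app.py imports initBackgroundWorker from BackgroundWorker.py, also at top level. When the worker imports BackgroundWorker first, app.py is still mid-import when it tries to import back, so the name does not exist yet. A self-contained reproduction of the same failure class, using hypothetical module names (appmod/workermod):

```python
import os
import sys
import tempfile
import textwrap

# appmod imports workermod before init_worker is defined; workermod then
# tries to import init_worker from the half-initialized appmod -> ImportError.
d = tempfile.mkdtemp()
with open(os.path.join(d, "appmod.py"), "w") as f:
    f.write(textwrap.dedent("""
        import workermod              # runs workermod mid-import of appmod
        def init_worker():
            return "initialized"
    """))
with open(os.path.join(d, "workermod.py"), "w") as f:
    f.write(textwrap.dedent("""
        from appmod import init_worker   # appmod is not finished importing yet
    """))
sys.path.insert(0, d)
try:
    import appmod
    outcome = "ok"
except ImportError:
    outcome = "ImportError"
print(outcome)  # → ImportError
```

A common way out (a sketch, not necessarily what this project needs) is to defer one of the two imports into the function that uses it instead of doing it at module top level.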
flask ImportError: cannot import name
flask rq2 job ImportError: cannot import name
Later, following:
GitHub – ui/rq-scheduler: A light library that adds job scheduling capabilities to RQ (Redis Queue)
I added:
/Users/crifan/dev/dev_root/daryun/Projects/RunningFast/sourcecode/RunningFast-Server/runningfast/app.py
```python
scheduler = rq.get_scheduler()
```
and then in:
/Users/crifan/dev/dev_root/daryun/Projects/RunningFast/sourcecode/RunningFast-Server/runningfast/resources/BackgroundWorker.py
```python
curAllJobs = scheduler.get_jobs()
gLog.debug("curAllJobs=%s", curAllJobs)

initBackgroundWorker()
```
But the printed job list is huge, and there is no way to tell which entries, if any, belong to these two tasks:
```
DEBUG in BackgroundWorker [./runningfast/resources/BackgroundWorker.py:163]: curAllJobs=[Job(u'ff8f0a71-d93d-473d-bcb2-83459eec3e85', enqueued_at=datetime.datetime(2016, 12, 20, 2, 5, 45)), Job(u'05f153d0-ed71-477c-8d55-6f3e0df666b6', enqueued_at=datetime.datetime(2016, 12, 20, 2, 5, 45)), Job(u'186dc290-080c-4182-ba82-ee1be2944f5a', enqueued_at=datetime.datetime(2016, 12, 20, 2, 5, 45)), Job(u'3853d022-d5f7-4365-b3b4-fe92cd62f46c', enqueued_at=datetime.datetime(2016, 12, 20, 2, 5, 45)), Job(u'3bf599b0-4d20-418b-92e7-b6a4f64b675d', enqueued_at=datetime.datetime(2016, 12, 20, 2, 5, 45)), Job(u'462cdcf1-a7b7-49b2-89b2-6369999dc664', enqueued_at=datetime.datetime(2016, 12, 20, 2, 5, 45)), Job(u'465c41f6-bdbc-4752-ba45-356464f37fbf', enqueued_at=datetime.datetime(2016, 12, 20, 2, 5, 45)), Job ...
```
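One thing not tried in this post: RQ's Job objects expose the dotted path of their function as func_name, so in principle the list from scheduler.get_jobs() could be filtered down to these two tasks. A sketch using stand-in objects instead of a live Redis connection (FakeJob is hypothetical; only the func_name attribute mirrors the real rq.job.Job):

```python
# Stand-in for rq.job.Job, which exposes the job's function as `func_name`.
class FakeJob(object):
    def __init__(self, func_name):
        self.func_name = func_name
    def __repr__(self):
        return "Job(%s)" % self.func_name

curAllJobs = [
    FakeJob("runningfast.resources.BackgroundWorker.updateUserStatus"),
    FakeJob("some.other.module.someTask"),
    FakeJob("runningfast.resources.BackgroundWorker.updateTaskErrandorLocation"),
]

# keep only the background-worker jobs:
mine = [j for j in curAllJobs if "BackgroundWorker" in j.func_name]
print(len(mine))  # → 2
```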
GitHub – ui/rq-scheduler: A light library that adds job scheduling capabilities to RQ (Redis Queue)
mentions RQ job instances and job ids, but I could not find an explanation.
Referring to:
GitHub – jezdez/Flask-RQ2: A Flask extension for RQ.
→ I saw job.id, and that setting a job's id is said to be allowed (when creating a Queue) — but a queue and a job are not the same thing, are they?
So for the @rq.job decorator used here, how do I set the job's id?
After a long search, I found this in the source:
rq/job.py at master · nvie/rq · GitHub
```python
# Job construction
@classmethod
def create(cls, func, args=None, kwargs=None, connection=None,
           result_ttl=None, ttl=None, status=None, description=None,
           depends_on=None, timeout=None, id=None, origin=None, meta=None):
    """Creates a new Job instance for the given function, arguments,
    and keyword arguments."""
```
So it looked like the id could be set via @rq.job(id=xxx). I also found the job's other attributes (parameters) there.
But that errored out:
```
File "./runningfast/resources/BackgroundWorker.py", line 46, in <module>
  @rq.job(id="updateTaskErrandorLocation")
TypeError: job() got an unexpected keyword argument 'id'
```
→ So apparently the job id cannot be set through the decorator.
Please credit when reposting: 在路上 » [Unresolved] RQ in Flask cannot schedule and run multiple background tasks at the same time