flask_apscheduler 是基于ASPcheduler的flask扩展插件。如果你的web服务需要一些定时任务,例如定时发送邮件,或者周期性的做某些检查,你可以单独的写一些任务脚本,问题在于你如何部署。
通常,这些任务脚本的启动和web服务的启动是分开进行的,这显然会增加服务的维护成本。如果使用flask_apscheduler扩展,你可以在启动web服务的同时启动定时或周期性任务。
这只是其中的一个好处,更大的好处是flask_apscheduler提供了晚上的接口,直接与项目结合在一起,你可以通过这些接口查看任务的状态,还可以对这些任务进行管理。
使用pip安装
pip install flask_apscheduler
from datetime import datetime
from flask import Flask
from flask_apscheduler import APScheduler
app = Flask(__name__)
app.config['SCHEDULER_API_ENABLED'] = True
scheduler = APScheduler()
scheduler.init_app(app)
@scheduler.task('cron', id='write_file', second='10')
def job1():
with open('job', 'a+')as f:
f.write(str(datetime.now()) + "\n")
@app.route("/")
def index():
return 'ok'
scheduler.start()
if __name__ == '__main__':
app.run(host='0.0.0.0')
这是一个非常简单的示例,但基本上已经展示出如何使用,如果对于APScheduler模块还不太了解,请先学习一下该模块。
将SCHEDULER_API_ENABLED设置为True,服务启动后,会自动融合加载flask_apscheduler提供的API接口:
1. /scheduler [GET] > 获取服务基本信息
2. /scheduler/jobs [POST json job data] > 添加新的任务
3. /scheduler/jobs/<job_id> [GET] > 根据job_id返回任务的详细信息
4. /scheduler/jobs [GET] > 返回所有任务的信息
5. /scheduler/jobs/<job_id> [DELETE] > 删除任务
6. /scheduler/jobs/<job_id> [PATCH json job data] > 更新一个已经存在的任务
7. /scheduler/jobs/<job_id>/pause [POST] > 暂停一个任务并返回任务的信息
8. /scheduler/jobs/<job_id>/resume [POST] > 重新启动一个任务并返回任务信息
9. /scheduler/jobs/<job_id>/run [POST] > 启动一个任务并返回任务的信息
在本示例中,在每分钟的第10秒,会打开文件向里追加当前时间。
如果你对扩展提供的API接口不满意,还可以根据其基础功能制作自己的API接口:
1. scheduler.start()
2. scheduler.shutdown()
3. scheduler.pause() > 暂停所有任务
4. scheduler.resume() > 开启任务
5. scheduler.add_listener(<callback function>,<event>) 添加监听事件
6. scheduler.remove_listener(<callback function>) 去除监听事件
7. scheduler.add_job(<id>,<function>, **kwargs) 添加job
8. scheduler.remove_job(<id>, **<jobstore>) 删除job
9. scheduler.remove_all_jobs(**<jobstore>)
10. scheduler.get_job(<id>,**<jobstore>) 获取job信息
11. scheduler.modify_job(<id>,**<jobstore>, **kwargs) 修改job
12. scheduler.pause_job(<id>, **<jobstore>) 暂停job
13. scheduler.resume_job(<id>, **<jobstore>) 恢复job
14. scheduler.run_job(<id>, **<jobstore>) 启动job
15. scheduler.authenticate(<function>) 验证
flask_apscheduler 启动的任务,是以线程方式启动的,这意味着你的任务不能是CPU密集型任务,不然效果会很差的。
最开是了解flask_apscheduler 时,觉得它很不错,但很快意识到在多进程部署时会有问题。
flask通常使用gunicorn或者uwsgi进行部署,采用的是多进程方式,使用fork创建出子进程提供web服务。如此一来,就会创建出多个scheduler对象,那么也就会创建出多个任务,这些任务就会重复执行。
下面以gunicorn部署为例,下编写conf.py文件
bind = '0.0.0.0:5000'
workers = 4
worker_connections = 1000
daemon = True
debug = False
proc_name = 'scheduler'
access_log_format = '%(l)s %(u)s %(t)s "%(r)s" %(s)s %(b)s "%(f)s" "%(a)s"'
pidfile = './gunicorn.pid'
errorlog = './gunicorn.log'
accesslog = "./gunicorn_access.log"
启动命令为
gunicorn -c conf.py app:app
服务启动后,有5个进程,1个master进程,4个worker进程,这4个worker进程里分别有一个task在执行,打开job文件,果然如此
2021-03-20 15:55:10.002159
2021-03-20 15:55:10.002501
2021-03-20 15:55:10.002564
2021-03-20 15:55:10.002873
在同一个时间点,有4条数据写入,这显然不是我们想要的效果,难道使用flask_apscheduler 扩展时就只能使用gevent来单进程部署么。
在网上搜索了一些方案,普遍认可的是通过使用文件锁来保证只创建一个scheduler ,修改一下app.py代码
import atexit, fcntl
from datetime import datetime
from flask import Flask
from flask_apscheduler import APScheduler
app = Flask(__name__)
app.config['SCHEDULER_API_ENABLED'] = True
def job1():
with open('job', 'a+')as f:
f.write(str(datetime.now()) + "\n")
@app.route("/")
def index():
return 'ok'
def create_apscheduler():
f = open("./aps.lock", "wb")
try:
fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
apscheduler = APScheduler()
apscheduler.init_app(app)
apscheduler.start()
#这是一个定时任务,也可以在配置文件configs中进行定时任务配置
apscheduler.add_job(id="write_file", func=job1, trigger="cron", second=10)
except:
pass
def unlock():
fcntl.flock(f, fcntl.LOCK_UN)
f.close()
atexit.register(unlock)
create_apscheduler()
if __name__ == '__main__':
app.run(host='0.0.0.0')
fcntl 是linux平台上特有的基础功能,可以保证只有一个进程获得文件的锁,这样就能保证只有一个apscheduler被创建。
单机多进程部署flask 服务并使用flask_apscheduler 可以使用上面的方案。
分布式环境下,可以考虑使用分布式锁,同单机文件锁一样,也可以解决多任务问题。可为了一个小小的定时任务,如此大动干戈的引入分布式锁,似乎有点得不偿失了。除非你真的很需要这个定时框架而且必须和flask结合使用,否则,真没必要把项目搞的那么复杂。
QQ交流群: 211426309