flask扩展插件---flask_apscheduler 定时任务框架

1. flask_apscheduler 定时任务框架

flask_apscheduler 是基于ASPcheduler的flask扩展插件。如果你的web服务需要一些定时任务,例如定时发送邮件,或者周期性的做某些检查,你可以单独的写一些任务脚本,问题在于你如何部署。

通常,这些任务脚本的启动和web服务的启动是分开进行的,这显然会增加服务的维护成本。如果使用flask_apscheduler扩展,你可以在启动web服务的同时启动定时或周期性任务。

这只是其中的一个好处,更大的好处是flask_apscheduler提供了晚上的接口,直接与项目结合在一起,你可以通过这些接口查看任务的状态,还可以对这些任务进行管理。

使用pip安装

pip install flask_apscheduler

2. 简单的示例

from datetime import datetime
from flask import Flask
from flask_apscheduler import APScheduler


app = Flask(__name__)
app.config['SCHEDULER_API_ENABLED'] = True

scheduler = APScheduler()
scheduler.init_app(app)


@scheduler.task('cron', id='write_file', second='10')
def job1():
    with open('job', 'a+')as f:
        f.write(str(datetime.now()) + "\n")


@app.route("/")
def index():
    return 'ok'

scheduler.start()

if __name__ == '__main__':
    app.run(host='0.0.0.0')

这是一个非常简单的示例,但基本上已经展示出如何使用,如果对于APScheduler模块还不太了解,请先学习一下该模块。

将SCHEDULER_API_ENABLED设置为True,服务启动后,会自动融合加载flask_apscheduler提供的API接口:

1. /scheduler [GET] > 获取服务基本信息
2.  /scheduler/jobs [POST json job data] > 添加新的任务
3. /scheduler/jobs/<job_id> [GET] > 根据job_id返回任务的详细信息
4. /scheduler/jobs [GET] > 返回所有任务的信息
5. /scheduler/jobs/<job_id> [DELETE] > 删除任务
6. /scheduler/jobs/<job_id> [PATCH json job data] > 更新一个已经存在的任务
7. /scheduler/jobs/<job_id>/pause [POST] > 暂停一个任务并返回任务的信息
8. /scheduler/jobs/<job_id>/resume [POST] > 重新启动一个任务并返回任务信息
9. /scheduler/jobs/<job_id>/run [POST] > 启动一个任务并返回任务的信息

在本示例中,在每分钟的第10秒,会打开文件向里追加当前时间。

如果你对扩展提供的API接口不满意,还可以根据其基础功能制作自己的API接口:

1. scheduler.start()  
2. scheduler.shutdown()
3. scheduler.pause() > 暂停所有任务
4. scheduler.resume() > 开启任务
5. scheduler.add_listener(<callback function>,<event>) 添加监听事件
6. scheduler.remove_listener(<callback function>) 去除监听事件
7. scheduler.add_job(<id>,<function>, **kwargs)  添加job
8. scheduler.remove_job(<id>, **<jobstore>) 删除job
9. scheduler.remove_all_jobs(**<jobstore>) 
10. scheduler.get_job(<id>,**<jobstore>) 获取job信息
11. scheduler.modify_job(<id>,**<jobstore>, **kwargs) 修改job
12. scheduler.pause_job(<id>, **<jobstore>) 暂停job
13. scheduler.resume_job(<id>, **<jobstore>)  恢复job
14. scheduler.run_job(<id>, **<jobstore>) 启动job
15. scheduler.authenticate(<function>)  验证

flask_apscheduler 启动的任务,是以线程方式启动的,这意味着你的任务不能是CPU密集型任务,不然效果会很差的。

3. 蛋疼的多进程部署问题

最开是了解flask_apscheduler 时,觉得它很不错,但很快意识到在多进程部署时会有问题。

flask通常使用gunicorn或者uwsgi进行部署,采用的是多进程方式,使用fork创建出子进程提供web服务。如此一来,就会创建出多个scheduler对象,那么也就会创建出多个任务,这些任务就会重复执行。

下面以gunicorn部署为例,下编写conf.py文件

bind = '0.0.0.0:5000'
workers = 4

worker_connections = 1000
daemon = True
debug = False
proc_name = 'scheduler'
access_log_format = '%(l)s %(u)s %(t)s "%(r)s" %(s)s %(b)s "%(f)s" "%(a)s"'
pidfile = './gunicorn.pid'
errorlog = './gunicorn.log'
accesslog = "./gunicorn_access.log"

启动命令为

gunicorn -c conf.py app:app

服务启动后,有5个进程,1个master进程,4个worker进程,这4个worker进程里分别有一个task在执行,打开job文件,果然如此

2021-03-20 15:55:10.002159
2021-03-20 15:55:10.002501
2021-03-20 15:55:10.002564
2021-03-20 15:55:10.002873

在同一个时间点,有4条数据写入,这显然不是我们想要的效果,难道使用flask_apscheduler 扩展时就只能使用gevent来单进程部署么。

在网上搜索了一些方案,普遍认可的是通过使用文件锁来保证只创建一个scheduler ,修改一下app.py代码

import atexit, fcntl
from datetime import datetime
from flask import Flask
from flask_apscheduler import APScheduler


app = Flask(__name__)
app.config['SCHEDULER_API_ENABLED'] = True



def job1():
    with open('job', 'a+')as f:
        f.write(str(datetime.now()) + "\n")


@app.route("/")
def index():
    return 'ok'

def create_apscheduler():
    f = open("./aps.lock", "wb")
    try:
        fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
        apscheduler = APScheduler()
        apscheduler.init_app(app)
        apscheduler.start()

        #这是一个定时任务,也可以在配置文件configs中进行定时任务配置

        apscheduler.add_job(id="write_file", func=job1, trigger="cron", second=10)

    except:
        pass

    def unlock():
        fcntl.flock(f, fcntl.LOCK_UN)
        f.close()

    atexit.register(unlock)

create_apscheduler()

if __name__ == '__main__':
    app.run(host='0.0.0.0')

fcntl 是linux平台上特有的基础功能,可以保证只有一个进程获得文件的锁,这样就能保证只有一个apscheduler被创建。

单机多进程部署flask 服务并使用flask_apscheduler 可以使用上面的方案。

分布式环境下,可以考虑使用分布式锁,同单机文件锁一样,也可以解决多任务问题。可为了一个小小的定时任务,如此大动干戈的引入分布式锁,似乎有点得不偿失了。除非你真的很需要这个定时框架而且必须和flask结合使用,否则,真没必要把项目搞的那么复杂。

扫描关注, 与我技术互动

QQ交流群: 211426309

加入知识星球, 每天收获更多精彩内容

分享日常研究的python技术和遇到的问题及解决方案