任务调度(如期)

1 项目概述

1.1 背景介绍及目标

Task 是 web 开发中非常经典的一个场景,我们时常会需要用到一些不需要在当前请求下立刻执行的任务,或者是需要定时去做的一些任务,这些需要一个用来存放任务的队列、以及用来执行任务的工具、以及可能需要定时执行的任务调度器。

任务队列和任务调度一般来说是互相独立的两套系统

  • 任务队列主要是依次执行一系列异步的任务

  • 任务调度是和时间相关的,可能执行的是任务抑或者是系统命令。

任务调度(job scheduling),我们在任务队列的东西都是经过某个动作之后派发出来的,而有的时候我们需要延时某个操作 (比如订阅电子报半个小时之后发送给对方)或者需要在凌晨两点的时候自动清除昨天的统计数据,那么我们就需要一个调度器来做统一管理。

传统的定时操作大概就是 crontab 了,它非常的方便,但是如果定时任务太多的话,不管是写还是改都是一个大麻烦,所以任务调度器一般都封装了一层。

1.2 名词说明

  • job:描述一个任务本身。

  • triggers:触发器, 描述一个任务何时被触发,有按日期、按时间间隔、按 cronjob 描述式三种触发方式

  • jobstores:job 存储

  • executors:执行器

  • schedulers:调度器, 可以看做整个系统的 driver,外部世界通过它来实现任务(Job)的增删改查管理。

1.3 Roadmap

2 需求分析

2.1 功能需求

  • 定时执行任务

2.2 非功能需求

高可用

2.3 调研

2.3.1 轮询固定时间间隔

2.3.2 crontab */N 定时轮询

2.3.3 轮询判断更新时间间隔

2.3.4 APScheduler

APScheduler(Advanced Python Scheduler)是一个轻量级的 Python 定时任务调度框架

环境

APScheduler 组件

  • 触发器(trigger),触发器中包含调度逻辑,每个作业都有自己的触发器来决定下次运行时间。--- 无状态

  • 作业存储器(job store),存储被调度的作业,默认的作业存储器只是简单地把作业保存在内存中

    • 其他的作业存储器则是将作业保存在数据库中,当作业被保存在一个持久化的作业存储器中的时候,该作业的数据会被序列化,并在加载时被反序列化,需要说明的是,作业存储器不能共享调度器。

  • 执行器(executor),处理作业的运行,通常通过在作业中提交指定的可调用对象到一个线程或者进程池来进行,当作业完成时,执行器会将通知调度器。

  • 调度器(scheduler),配置作业存储器和执行器可以在调度器中完成。例如添加、修改、移除作业,根据不同的应用场景,可以选择不同的调度器

2.3.4.1 APScheduler 组件说明

(1) 调度器

非阻塞(目前使用非阻塞模式)

阻塞

相关代码

这个_main_loop 函数主要是用来判断任务是否到了执行的时间,然后进行执行任务(self._process_jobs(now)

调度器工作流程 (v2.1.2)

(2) 作业存储器

默认使用的是 RAMJobStore(内存),还提供了 SQLAlchemyJobStore、ShelveJobStore、RedisJobStore 和 MongoDBJobStore

ShelveJobStore

获取存储中的任务

(3) 执行器

在实例化 Scheduler 类的时候会创建一组默认线程数为 20 的线程池

这个线程池用来执行真正的任务。

如下面的意思是,时间一到就立马提交给 submit 去调度任务。

(4) 触发器

描述调度任务被触发的条件。不过触发器完全是无状态的。

APScheduler 有三种内建的 trigger:

  • date 触发器 --------- 基于固定时间的调度:作业任务只会执行一次。它表示特定的时间点触发

  • interval 触发器 ----- 周期任务:固定时间间隔触发

  • cron 触发器 --------- Cron 风格的任务的调度:在特定时间周期性地触发,和 Linux crontab 格式兼容。它是功能最强大的触发器

date

interval

cron(推荐)

Linux crontab

2.3.4.2 使用

(1) 使用步骤

APScheduler 使用起来还算是比较简单。运行一个调度任务只需要以下三部曲。

(2) 作业处理

获取作业列表

可通过 get_jobs() 获取作业列表

job.__dict__

  • coalesce:当由于某种原因导致某个 job 积攒了好几次没有实际运行(比如说系统挂了 5 分钟后恢复,有一个任务是每分钟跑一次的,按道理说这 5 分钟内本来是“计划”运行 5 次的,但实际没有执行),如果 coalesce 为 True,下次这个 job 被 submit 给 executor 时,只会执行 1 次,也就是最后这次,如果为 False,那么会执行 5 次(不一定,因为还有其他条件,看后面 misfire_grace_time 的解释)

  • max_instance: 就是说同一个 job 同一时间最多有几个实例再跑,比如一个耗时 10 分钟的 job,被指定每分钟运行 1 次,如果我们 max_instance 值为 5,那么在第 6~10 分钟上,新的运行实例不会被执行,因为已经有 5 个实例在跑了

  • misfire_grace_time:设想和上述 coalesce 类似的场景,如果一个 job 本来 14:00 有一次执行,但是由于某种原因没有被调度上,现在 14:01 了,这个 14:00 的运行实例被提交时,会检查它预订运行的时间和当下时间的差值(这里是 1 分钟),大于我们设置的 30 秒限制,那么这个运行实例不会被执行。

添加作业

通过 add_job() 添加作业

2.3.4.3 APScheduler 的持久化存储

MySQL

Redis

Redis 需要 keys 命令,Redis 集群不支持 Keys 命令,故不推荐使用 Redis

2.3.5 rq-scheduler

栗子:https://blog.csdn.net/szj_jojoli/article/details/103125310

2.3.6 折衷分析

  • schedule 优点是简单、轻量级、无需配置、语法简单,缺点是阻塞式调用、无法动态添加或删除任务

  • Python-crontab 优点是针对于系统 crontab 操作,支持定时、定期任务,能够动态添加任务,不能实现一次性任务需求

  • Apscheduler 优点支持定时、定期、一次性任务,支持任务持久化及动态添加、支持配置各种持久化存储源

考虑到 APScheduler 实现比较简单,使用 APScheduler

3 总体设计

3.1 系统架构

  • 表现层: Butterfly-fe 和 Amis 平台两个用户操作入口

  • 业务层: ruqi 模块负责

  • 数据层:

3.2 模块简介

3.2.1 job store

  • 内存

    • self._jobs_index = {} : id -> (job, timestamp) lookup table

    • self._jobs = [] :存储 (job, timestamp) 的有序列表,按照 next_run_time 和 job_id 排序(升序)

  • Redis(和内存存储方式类似)

    • self.jobs_key:hash 存储所有 jobs

    • self.run_times_key: zset 存储,按照 next_run_time 排序

3.3 设计与折衷

3.3.1 高可用方案

方案一:租约续期 + 实例抢主,在查询任务前进行检查

方案二:实例间无主备之分,在执行时进行抢锁执行,抢不到锁时则不执行

方案一改造成本比较高,以及总任务数是千级别,同时执行任务数是个位数并发,数据库压力可控,故选择方案二

3.4 潜在风险

4 详细设计

4.1 ruqi

4.1.1 交互流程

4.1.1.1 增

4.1.1.2 删

4.1.1.3 改

修改任务

暂停任务

恢复任务

4.1.1.4 查

获取任务列表

获取任务详情

4.1.2 数据库设计

4.1.3 接口形式

/ruqi/add_job

创建任务

POST

参数说明

/ruqi/remove_job

删除任务

POST

参数说明

/ruqi/modify_job

修改任务

POST

参数说明

/ruqi/pause_job

暂停任务

POST

/ruqi/resume_job

POST

/ruqi/get_jobs

GET

参数说明

/ruqi/get_job

GET

/ruqi/get_history

GET

参数说明

/ruqi/status

输出如期当前状态

GET

响应

/ruqi/wakeup

强制唤醒一次调度器

POST

响应

5 传送门

Last updated