任务调度(如期)

1 项目概述

1.1 背景介绍及目标

Task 是 web 开发中非常经典的一个场景,我们时常会需要用到一些不需要在当前请求下立刻执行的任务,或者是需要定时去做的一些任务,这些需要一个用来存放任务的队列、以及用来执行任务的工具、以及可能需要定时执行的任务调度器。

任务队列和任务调度一般来说是互相独立的两套系统

  • 任务队列主要是依次执行一系列异步的任务

  • 任务调度是和时间相关的,可能执行的是任务抑或者是系统命令。

任务调度(job scheduling),我们在任务队列的东西都是经过某个动作之后派发出来的,而有的时候我们需要延时某个操作 (比如订阅电子报半个小时之后发送给对方)或者需要在凌晨两点的时候自动清除昨天的统计数据,那么我们就需要一个调度器来做统一管理。

传统的定时操作大概就是 crontab 了,它非常的方便,但是如果定时任务太多的话,不管是写还是改都是一个大麻烦,所以任务调度器一般都封装了一层。

1.2 名词说明

  • job:描述一个任务本身。

  • triggers:触发器, 描述一个任务何时被触发,有按日期、按时间间隔、按 cronjob 描述式三种触发方式

  • jobstores:job 存储

  • executors:执行器

  • schedulers:调度器, 可以看做整个系统的 driver,外部世界通过它来实现任务(Job)的增删改查管理。

1.3 Roadmap

2 需求分析

2.1 功能需求

  • 定时执行任务

2.2 非功能需求

高可用

2.3 调研

2.3.1 轮询固定时间间隔

while True:
    do something
    time.sleep(XXX)

2.3.2 crontab */N 定时轮询

2.3.3 轮询判断更新时间间隔

def set_interval(self, interval):
    self._interval = interval

def run_one_cycle(self):
    '''所有继承函数需要重写该函数'''
    pass

def run(self):
    while not self._exit_event.is_set():
        try:
            start_time = time.time()
            self.run_one_cycle()
            end_time = time.time()
            delta_time = end_time - start_time
            #中间执行过程可能比较耗时,故 sleep 的时间相应的缩减
            if self._interval > delta_time:
                log.debug('thread %s sleep %d'%(threading.currentThread().getName(), self._interval - delta_time))
                time.sleep(self._interval - delta_time)
        except KeyboardInterrupt:
            log.error('KeyboardInterrupt happened,but no working thread exist')
        except Exception:
            log.error('[FATAL] unknown exception occurs %s'%(traceback.format_exc()))
            time.sleep(self._interval)
    log.error('thread %s exit abnormal,event is %d'%(threading.currentThread().getName(), str(self._exit_event.is_set())))

2.3.4 APScheduler

APScheduler(Advanced Python Scheduler)是一个轻量级的 Python 定时任务调度框架

环境

Python:2.7
APScheduler: 经典版本 2.1 版本

APScheduler 组件

触发器 trigger
作业存储器 job store
执行器 executor
调度器 scheduler
  • 触发器(trigger),触发器中包含调度逻辑,每个作业都有自己的触发器来决定下次运行时间。--- 无状态

  • 作业存储器(job store),存储被调度的作业,默认的作业存储器只是简单地把作业保存在内存中

    • 其他的作业存储器则是将作业保存在数据库中,当作业被保存在一个持久化的作业存储器中的时候,该作业的数据会被序列化,并在加载时被反序列化,需要说明的是,作业存储器不能共享调度器。

  • 执行器(executor),处理作业的运行,通常通过在作业中提交指定的可调用对象到一个线程或者进程池来进行,当作业完成时,执行器会将通知调度器。

  • 调度器(scheduler),配置作业存储器和执行器可以在调度器中完成。例如添加、修改、移除作业,根据不同的应用场景,可以选择不同的调度器

2.3.4.1 APScheduler 组件说明

(1) 调度器

非阻塞(目前使用非阻塞模式)

import time
from xlib.apscheduler import scheduler

my_scheduler = scheduler.Scheduler()
# 会启动一个线程进行执行 self._main_loop
my_scheduler.start()

while True:
    time.sleep(2)

阻塞

from xlib.apscheduler import scheduler

my_scheduler = scheduler.Scheduler(standalone=True)
my_scheduler.start()

相关代码

if self.standalone:
   self._main_loop()
else:
   self._thread = Thread(target=self._main_loop, name="APScheduler")
   self._thread.setDaemon(self.daemonic)
   self._thread.start()

这个_main_loop 函数主要是用来判断任务是否到了执行的时间,然后进行执行任务(self._process_jobs(now)

调度器工作流程 (v2.1.2)

+scheduler(new thread)--------------------------------------------------------------------------------------+
|+job manager---------------+       + _main_loop-----------------------------------------------------------+|
||                          |       |+process_jobs------------------------------------------------+ +-----+||
||(add)                     |       ||(1) Get all jobs (tuple(jobstore.jobs))                     | |     |||
||EVENT_JOBSTORE_JOB_ADDED  |       ||(2) Computes the scheduled run times                        | |     |||
||(unschedule_job)          | event ||    run_times is a list(run_times = job._get_run_times(now))|+|sleep|||调度器
||EVENT_JOBSTORE_JOB_REMOVED|------->|    Example: [datetime.datetime(2020, 11, 29, 0, 17, 58)]   | |     |||
||                          |       ||(3) executor                                                | |     |||
||                          |       ||    _threadpool.submit(self._run_job, job, run_times)       | |     |||
||                          |       |+--------------------------+---------------------------------+ +-----+||
|+------------+-------------+       +---------------------------|------------------------------------------+|
|             |                                                 |                                           |
+-------------|-------------------------------------------------|--------^----------------------------------+
              |                                                 |        |
              |                                                 |        |
              |                                                 |        |
              |                                                 |        |
+job store----V---------------------------------------+         |        |
|+job------------+ +job--------------+ +job----------+|         |        |
||+trigger------+| |+trigger--------+| |+trigger----+||         |        |
|||SimpleTrigger|| ||IntervalTrigger|| ||CronTrigger||| ........|........|...................................作业存储器
||+-------------+| |+---------------+| |+-----------+||         |        |
|+---------------+ +-----------------+ +-------------+|         |        |
|                                                     |         |        |
|        +DB ------------------------------+          |         |        | EVENT_JOB_EXECUTED
|        |          table(ruqi)            |          |         |        | EVENT_JOB_ERROR
|        +---------------------------------+          |         |        | EVENT_JOB_MISSED
+-----------------------------------------------------+         |        |
                                                                |        |
                                                                |        |
                                                                |        |
                                                                |        |
                                    +submit job and executor----|--------+-------------+
                                    | +_run_job(scheduler)------V-+      +threadpool-+ |
                                    | |+job------+                |  put |+queue----+| |
                                    | ||job.func |                |----->||         || |
                                    | |+---------+                |      |+---------+| |
                                    | +---------------------------+      +-----^-----+ |.....................执行器
                                    |                                          |get    |
                                    | +_run_jobs(threadpool new thread)--------------+ |
                                    | | exe _run_job(self, job, run_times)(scheduler)| |
                                    | +----------------------------------------------+ |
                                    +--------------------------------------------------+

(2) 作业存储器

默认使用的是 RAMJobStore(内存),还提供了 SQLAlchemyJobStore、ShelveJobStore、RedisJobStore 和 MongoDBJobStore

除了 MemoryJobStore 外,其他几种都使用 pickle 做序列化工具

所以这里要指出一点,如果你不是在用内存做 jobstore,那么必须确保你提供给 job 的可执行函数必须是可以被全局访问的,也就是可以通过 ref_to_obj 反查出来的,否则无法序列化。
---------- 如果可执行函数不是全局访问的,则会出现常见问题中的问题,服务端在重新启动时无法加载 job

使用数据库做 jobstore,就会发现,其实创建了一张有三个域的的 jobs 表,分别是****id, next_run_time, job_state,其中 job_state 是 job 对象 pickle 序列化后的二进制**,而 id 和 next_run_time 则是支持 job 的两类查询(按 id 和按最近运行时间)

ShelveJobStore

获取存储中的任务

import os
import sys
if os.path.exists('third'):
    cur_path = os.path.split(os.path.realpath(__file__))[0]
    sys.path.insert(0, os.path.join(cur_path, 'third'))

import pickle
import shelve

pickle_protocol=pickle.HIGHEST_PROTOCOL
data_dict = shelve.open('./example.db', 'c',pickle_protocol)

print data_dict

(3) 执行器

在实例化 Scheduler 类的时候会创建一组默认线程数为 20 的线程池

这个线程池用来执行真正的任务。

如下面的意思是,时间一到就立马提交给 submit 去调度任务。

if run_times:
    self._threadpool.submit(self._run_job, job, run_times)


def _process_jobs(self, now):
    next_wakeup_time = None
    self._jobstores_lock.acquire()
    try:
        for alias, jobstore in iteritems(self._jobstores):
            for job in tuple(jobstore.jobs):
                run_times = job.get_run_times(now)
                if run_times:
                    self._threadpool.submit(self._run_job, job, run_times)

(4) 触发器

描述调度任务被触发的条件。不过触发器完全是无状态的。

APScheduler 有三种内建的 trigger:

  • date 触发器 --------- 基于固定时间的调度:作业任务只会执行一次。它表示特定的时间点触发

  • interval 触发器 ----- 周期任务:固定时间间隔触发

  • cron 触发器 --------- Cron 风格的任务的调度:在特定时间周期性地触发,和 Linux crontab 格式兼容。它是功能最强大的触发器

date

add_date_job(self, job_function, date, args=None, kwargs=None, **options)

date (datetime|str) – the date/time to run the job at  -(任务开始的时间)
timezone (datetime.tzinfo|str) – time zone for run_date if it doesn’t have one already

interval

add_interval_job(self, job_function, weeks=0, days=0, hours=0, minutes=0,seconds=0, start_date=None, args=None, kwargs=None,**options)

weeks (int) – number of weeks to wait
days (int) – number of days to wait
hours (int) – number of hours to wait
minutes (int) – number of minutes to wait
seconds (int) – number of seconds to wait
start_date (datetime|str) – starting point for the interval calculation
end_date (datetime|str) – latest possible date/time to trigger on
timezone (datetime.tzinfo|str) – time zone to use for the date/time calculations

cron(推荐)

def add_cron_job(self, job_function, year=None, month=None, day=None, week=None,day_of_week=None, hour=None, minute=None, second=None,start_date=None, args=None, kwargs=None, **options):

(int|str) 表示参数既可以是 int 类型,也可以是 str 类型
(datetime | str) 表示参数既可以是 datetime 类型,也可以是 str 类型

year (int|str) – 4-digit year -(表示四位数的年份,如 2008 年)
month (int|str) – month (1-12) -(表示取值范围为 1-12 月)
day (int|str) – day of the (1-31) -(表示取值范围为 1-31 日)
week (int|str) – ISO week (1-53) -(格里历 2006 年 12 月 31 日可以写成 2006 年 -W52-7(扩展形式)或 2006W527(紧凑形式))
day_of_week (int|str) – number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun) - (表示一周中的第几天,既可以用 0-6 表示也可以用其英语缩写表示)
hour (int|str) – hour (0-23) - (表示取值范围为 0-23 时)
minute (int|str) – minute (0-59) - (表示取值范围为 0-59 分)
second (int|str) – second (0-59) - (表示取值范围为 0-59 秒)
start_date (datetime|str) – earliest possible date/time to trigger on (inclusive) - (表示开始时间)
end_date (datetime|str) – latest possible date/time to trigger on (inclusive) - (表示结束时间)
timezone (datetime.tzinfo|str) – time zone to use for the date/time calculations (defaults to scheduler timezone) -(表示时区取值)

例子: 与 crontab 用法一致
a. 表示 2019 年 11 月 1 日 17 时 19 分 07 秒执行该程序   {"year": 2017, "month": 11, "day": 1, "hour": 17, "minute": 19, "second": 07}
b. 表示每 5 秒执行该程序一次   {"second": "*/5"}
c. 每天 2 点 21 分执行一次   {"day": "*", "hour": 02, "minute": 21}

Linux crontab

minute(s) hour(s) day(s) month(s) weekday(s) command(s)
# Use the hash sign to prefix a comment
# +—————- minute (0 – 59)
# |  +————- hour (0 – 23)
# |  |  +———- day of month (1 – 31)
# |  |  |  +——- month (1 – 12)
# |  |  |  |  +—- day of week (0 – 7) (Sunday=0 or 7)
# |  |  |  |  |
# *  *  *  *  *  command to be executed

2.3.4.2 使用

(1) 使用步骤

APScheduler 使用起来还算是比较简单。运行一个调度任务只需要以下三部曲。

(1) 新建一个 schedulers (调度器)
(2) 添加一个调度任务
(3) 运行调度任务
import time
from datetime import datetime, timedelta

from xlib.apscheduler import scheduler
from xlib.apscheduler.jobstores.shelve_store import ShelveJobStore

def tick(msg=None):
    print "hello {msg}".format(msg=msg)

_scheduler = scheduler.Scheduler()
_scheduler.add_jobstore(ShelveJobStore('example.db'), 'shelve')
_scheduler.start()

# 间隔执行
_scheduler.add_interval_job(tick, seconds=3)

# 一次执行
exe_time = datetime.now() + timedelta(seconds=5)
_scheduler.add_date_job(tick, date=exe_time, kwargs={"msg": "world"})

# 定期执行
rule = "* * * * * *"
cron_rule_list = rule.split(' ')
_scheduler.add_cron_job(
        tick,
        second=cron_rule_list[0],
        minute=cron_rule_list[1],
        hour=cron_rule_list[2],
        day=cron_rule_list[3],
        month=cron_rule_list[4],
        day_of_week=cron_rule_list[5],
        name="ceshi",
        jobstore='shelve',
        kwargs={"msg":"meetbill"}
    )

while True:
    print "---------------1s"
    time.sleep(1)

(2) 作业处理

获取作业列表

可通过 get_jobs() 获取作业列表

返回结果 jobs 是个 list,里面的 item 实例为如下:
apscheduler/job.py:Job

job.__dict__

{
    "next_run_time": datetime.datetime(2019, 11, 3, 19, 0), // 下一次执行时间
    "_lock": <thread.lock object at 0x107297590>,
    "name": u"run_cmd",
    "misfire_grace_time": 1, # seconds, 此项表示如果超过 1s 则任务不会被执行
    "instances": 0,
    "args": [],
    "runs": 1,
    "max_instances": 1,
    "max_runs": None,
    "coalesce": True,
    "trigger": <CronTrigger (month="*", day="*", day_of_week="*", hour="*", minute="*/6")>,
    "func": <function alarm at 0x10764aaa0>,
    "kwargs": {"time": "xx"},
    "id": "874875"
}
  • coalesce:当由于某种原因导致某个 job 积攒了好几次没有实际运行(比如说系统挂了 5 分钟后恢复,有一个任务是每分钟跑一次的,按道理说这 5 分钟内本来是“计划”运行 5 次的,但实际没有执行),如果 coalesce 为 True,下次这个 job 被 submit 给 executor 时,只会执行 1 次,也就是最后这次,如果为 False,那么会执行 5 次(不一定,因为还有其他条件,看后面 misfire_grace_time 的解释)

  • max_instance: 就是说同一个 job 同一时间最多有几个实例再跑,比如一个耗时 10 分钟的 job,被指定每分钟运行 1 次,如果我们 max_instance 值为 5,那么在第 6~10 分钟上,新的运行实例不会被执行,因为已经有 5 个实例在跑了

  • misfire_grace_time:设想和上述 coalesce 类似的场景,如果一个 job 本来 14:00 有一次执行,但是由于某种原因没有被调度上,现在 14:01 了,这个 14:00 的运行实例被提交时,会检查它预订运行的时间和当下时间的差值(这里是 1 分钟),大于我们设置的 30 秒限制,那么这个运行实例不会被执行。

添加作业

通过 add_job() 添加作业

add_job()

2.3.4.3 APScheduler 的持久化存储

MySQL

url="mysql+pymysql://user:passwd@host/dbname?charset=utf8"
job.scheduler.add_jobstore(jobstore="sqlalchemy",url=url,tablename='api_job')

Redis

conf = {
        "host": "10.0.1.1",
        "port": 6379,
        "db": 0,
        "max_connections": 10
      }
job.scheduler.add_jobstore(jobstore="redis", **conf)

Redis 需要 keys 命令,Redis 集群不支持 Keys 命令,故不推荐使用 Redis

2.3.5 rq-scheduler

栗子:https://blog.csdn.net/szj_jojoli/article/details/103125310

2.3.6 折衷分析

  • schedule 优点是简单、轻量级、无需配置、语法简单,缺点是阻塞式调用、无法动态添加或删除任务

  • Python-crontab 优点是针对于系统 crontab 操作,支持定时、定期任务,能够动态添加任务,不能实现一次性任务需求

  • Apscheduler 优点支持定时、定期、一次性任务,支持任务持久化及动态添加、支持配置各种持久化存储源

考虑到 APScheduler 实现比较简单,使用 APScheduler

3 总体设计

3.1 系统架构

 +-----------------------------------------------------------------------------------------+
 |                                       client/brower                                     |
 +------------------------------+-------------------------------------+--------------------+
................................|(domain name)........................|(domain name)...................
                                |(vip)                                |
 +Nginx-------------------------V--------------+  +Amis---------------V--------------------+
 |             butterfly-fe                    |  |                                        |  表现层
 +-----------+------------------+--------------+  +-------------------+--------------------+
             | vip              |                                     |
 +Auth-------V----------+       |                                     |
 |                      |       |                                     |
 +----------------------+       |                                     |
................................|.....................................|................................ APP
 +Ruqi--------------------------V-------------------------------------V-------------------+
 |                                                                                        |  业务层
 +---------------+--------------------------+---------------------------------------------+
                 |                          |
 +---------------|--------------------------|---------------------------------------------+
 | +MQ-----------V-----------+  +DB---------V-------------+                               |
 | |+---------+              |  |+-------+                |                               |
 | ||Twemproxy| \            |  ||DBproxy| \              |                               |
 | |+--+---+--+  +----------+|  |+-+---+-+   +---------+  |                               |
 | |   |   |     |Metaserver||  |  |   |     | Control |  |                               |  数据层
 | |+--V---V--+  +----------+|  |+-V---V-+   +---------+  |                               |
 | ||  Redis  | /            |  || MySQL | /              |                               |
 | |+---------+              |  |+-------+                |                               |
 | +-------------------------+  +-------------------------+                               |
 |      Redis Cluster                MySQL Cluster                                        |
 +----------------------------------------------------------------------------------------+
  • 表现层: Butterfly-fe 和 Amis 平台两个用户操作入口

  • 业务层: ruqi 模块负责

  • 数据层:

3.2 模块简介

+scheduler(new thread)--------------------------------------------------------------------------------------+
|+job manager---------------+       + _main_loop-----------------------------------------------------------+|
||                          |       |+process_jobs------------------------------------------------+ +-----+||
||(add)                     |       ||(1) Get all jobs (tuple(jobstore.jobs))                     | |     |||
||EVENT_JOB_ADDED           |       ||(2) Computes the scheduled run times                        | |     |||
||(unschedule_job)          | event ||    run_times is a list(run_times = job._get_run_times(now))|+|sleep|||调度器
||EVENT_JOB_REMOVED         |------->|    Example: [datetime.datetime(2020, 11, 29, 0, 17, 58)]   | |     |||
||                          |       ||(3) executor                                                | |     |||
||                          |       ||    _threadpool.submit(self._run_job, job, run_times)       | |     |||
||                          |       |+--------------------------+---------------------------------+ +-----+||
|+------------+-------------+       +---------------------------|------------------------------------------+|
|             |                                                 |                                           |
+-------------|-------------------------------------------------|--------^----------------------------------+
              |                                                 |        |
              |                                                 |        |
              |                                                 |        |
              |                                                 |        |
+job store----V---------------------------------------+         |        |
|+job------------+ +job--------------+ +job----------+|         |        |
||+trigger------+| |+trigger--------+| |+trigger----+||         |        |
|||DateTrigger  || ||IntervalTrigger|| ||CronTrigger||| ........|........|...................................作业存储器
||+-------------+| |+---------------+| |+-----------+||         |        |
|+---------------+ +-----------------+ +-------------+|         |        |
|                                                     |         |        |
|+DB ------------------------------------------------+|         |        | EVENT_JOB_EXECUTED
||                 table(ruqi)                       ||         |        | EVENT_JOB_ERROR
|+---------------------------------------------------+|         |        | EVENT_JOB_MISSED
+-----------------------------------------------------+         |        |
                                                                |        |
                                                                |        |
                                                                |        |
                                                                |        |
                                    +submit job and executor----|--------+-------------+
                                    | +_run_job(scheduler)------V-+      +threadpool-+ |
                                    | |+job------+                |  put |+queue----+| |
                                    | ||job.func |                |----->||         || |
                                    | |+---------+                |      |+---------+| |
                                    | +---------------------------+      +-----^-----+ |.....................执行器
                                    |                                          |get    |
                                    | +_run_jobs(threadpool new thread)--------------+ |
                                    | | exe _run_job(self, job, run_times)(scheduler)| |
                                    | +----------------------------------------------+ |
                                    +--------------------------------------------------

3.2.1 job store

  • 内存

    • self._jobs_index = {} : id -> (job, timestamp) lookup table

    • self._jobs = [] :存储 (job, timestamp) 的有序列表,按照 next_run_time 和 job_id 排序(升序)

  • Redis(和内存存储方式类似)

    • self.jobs_key:hash 存储所有 jobs

    • self.run_times_key: zset 存储,按照 next_run_time 排序

3.3 设计与折衷

3.3.1 高可用方案

方案一:租约续期 + 实例抢主,在查询任务前进行检查

优点时日常只有一主实例在进行工作,只是主异常是进行实例变更
缺点是当新增任务时,如果不是在主实例上执行新增任务的话,主实例可能会漏掉最近一次的执行

方案二:实例间无主备之分,在执行时进行抢锁执行,抢不到锁时则不执行

优点是实现简单,缺点是所有实例均在工作状态,对数据库会有压力

方案一改造成本比较高,以及总任务数是千级别,同时执行任务数是个位数并发,数据库压力可控,故选择方案二

3.4 潜在风险

4 详细设计

4.1 ruqi

4.1.1 交互流程

4.1.1.1 增

    开始
     |
     V                   (否)
trigger, rule 是否合法 -----> 结束
     |
     V
根据 cmd 封装 func
     |
     V                   (异常)
 创建 job-------------------> 结束
     |
     V
    结束

4.1.1.2 删

    开始
      |
      V                  (异常)
   删除 job-----------------> 异常
      |
      V
     结束

4.1.1.3 改

修改任务

    开始
     |
     V                   (否)
trigger, rule 是否合法 -----> 结束
     |
     V
根据 cmd 封装 func
     |
     V                   (异常)
 修改 job-------------------> 结束
     |
     V
    结束

暂停任务

    开始
     |
     V
 修改 job next_run_time 为 none
     |
     V
    结束

恢复任务

    开始
     |
     V
计算 job next_run_time
     |
     V                         否
job next_run_time 是否有效-----------> 删除 job 任务 -----> 结束
     |
     V
设置 job next_run_time
     |
     V
   结束

4.1.1.4 查

获取任务列表

    开始
     |
     V
检查 jobstore 类型
     |
     V
从 job store 中获取 job 列表
     |
     V
  返回 job 列表
     |
     V
    结束

获取任务详情

    开始
     |
     V
获取 default jobstore 中 job
     |
     V
对 job 返回结果进行格式化
     |
     V
  返回 job
     |
     V
   结束

4.1.2 数据库设计

# 此处替换为自己的 database
use {database};

CREATE TABLE `ruqi_jobs` (
  `id` varchar(255) NOT NULL COMMENT "任务 id",
  `next_run_time` double DEFAULT NULL COMMENT "下次执行时间时间戳",
  `job_state` blob NOT NULL COMMENT "任务数据",
  `job_lock` tinyint(1) NOT NULL COMMENT "任务锁",
  `job_name` varchar(64) DEFAULT NULL COMMENT "任务名称,用于分类使用",
  `job_trigger` varchar(16) DEFAULT NULL COMMENT "触发器类型",
  `job_rule` varchar(64) DEFAULT NULL COMMENT "定时任务规则",
  `u_time` datetime NOT NULL COMMENT "更新时间",
  `c_time` datetime NOT NULL COMMENT "创建时间",
  PRIMARY KEY (`id`),
  KEY `ruqijobs_next_run_time` (`next_run_time`),
  KEY `ruqijobs_job_lock` (`job_lock`),
  KEY `ruqijobs_job_name` (`job_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='如期-定时任务数据';

CREATE TABLE `ruqi_jobs_history` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT "自增 id",
  `job_id` varchar(255) NOT NULL COMMENT "任务 id",
  `job_name` varchar(255) NOT NULL COMMENT "任务名称,用于分类使用",
  `cmd` varchar(64) NOT NULL COMMENT "执行命令",
  `cmd_is_success` tinyint(1) NOT NULL COMMENT "是否执行成功",
  `cmd_output` varchar(4096) NOT NULL COMMENT "程序输出",
  `cmd_cost` double NOT NULL COMMENT "执行耗时",
  `scheduler_name` varchar(64) NOT NULL COMMENT "执行的 scheduler 名称",
  `c_time` datetime NOT NULL COMMENT "创建时间",
  PRIMARY KEY (`id`),
  KEY `ruqijobshistory_job_id` (`job_id`),
  KEY `ruqijobshistory_job_name` (`job_name`),
  KEY `ruqijobshistory_cmd_is_success` (`cmd_is_success`),
  KEY `ruqijobshistory_cmd_cost` (`cmd_cost`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='如期-定时任务执行历史';

4.1.3 接口形式

/ruqi/add_job

创建任务

POST

/ruqi/add_job 'job_trigger' 'job_id' 'job_name' 'cmd' 'rule'

参数说明

job_trigger: job_trigger(cron/interval/date)
job_id     : job id(唯一索引)
job_name   : 用作分类
cmd        : job cmd
rule       :
    date: "2020-12-16 18:03:17"/"2020-12-16 18:05:17.682862"/"now"
    cron: "* * * * * *"
    interval: Xs/Xm/Xh/Xd

/ruqi/remove_job

删除任务

POST

/ruqi/remove_job 'job_id'

参数说明

job_id: job id

/ruqi/modify_job

修改任务

POST

/ruqi/modify_job 'job_trigger' 'job_id' 'job_name' 'cmd' 'rule'

参数说明

job_trigger: job_trigger(cron/interval/date)
job_id     : job id(唯一索引)
job_name   : 用作分类
cmd        : job cmd
rule       :
    date: "2020-12-16 18:03:17"/"2020-12-16 18:05:17.682862"/"now"
    cron: "* * * * * *"
    interval: Xs/Xm/Xh/Xd

/ruqi/pause_job

暂停任务

POST

/ruqi/pause_job 'job_id'

/ruqi/resume_job

POST

/ruqi/resume_job 'job_id'

/ruqi/get_jobs

GET

/ruqi/get_jobs  'job_id=None' 'job_name=None' 'page_index=None' 'page_size=15'

参数说明

job_id: job id, job 唯一标识
job_name: job name,有可能重复
page_index: 页码
page_size: 列表中每页多少任务

/ruqi/get_job

GET

/ruqi/get_job 'job_id'

/ruqi/get_history

GET

/ruqi/get_history  'job_id=None' 'job_name=None' 'page_index=None' 'page_size=15'

参数说明

job_id: job id, job 唯一标识
job_name: job name,有可能重复
page_index: 页码
page_size: 列表中每页多少任务

/ruqi/status

输出如期当前状态

GET

/ruqi/status

响应

{
   "data" : {
      "check_time" : "2022-03-06 21:36:13",
      "wait_seconds" : 4.505374,
      "next_wakeup_time" : "2022-03-06 21:36:18"
   },
   "stat" : "OK"
}

/ruqi/wakeup

强制唤醒一次调度器

POST

/ruqi/wakeup

响应

{
   "stat" : "OK"
}

5 传送门

Last updated