任务调度 RuQi(如期)
1 架构
高可用部署如期,ruqi1 和 ruqi2 均为 butterfly 应用,ruqi1 和 ruqi2 分别部署在两台不同的机器上
+---------------+        +---------------+
|    ruqi1      |        |     ruqi2     |
+------+--------+        +-------+-------+
       |                         |
       |                         |
       |                         |
  +----V-------------------------V---+
  |               MySQL              |
  +----------------------------------+如期可提供定时、周期性执行 python/shell 脚本、发送 HTTP POST 请求、发布百川任务的功能
2 部署
2.1 建表
# 此处替换为自己的 database
use {database};
CREATE TABLE `ruqi_jobs` (
  `id` varchar(255) NOT NULL COMMENT "任务 id",
  `next_run_time` double DEFAULT NULL COMMENT "下次执行时间时间戳",
  `job_state` blob NOT NULL COMMENT "任务数据",
  `job_lock` tinyint(1) NOT NULL COMMENT "任务锁",
  `job_name` varchar(64) DEFAULT NULL COMMENT "任务名称,用于分类使用",
  `job_trigger` varchar(16) DEFAULT NULL COMMENT "触发器类型",
  `job_rule` varchar(64) DEFAULT NULL COMMENT "定时任务规则",
  `u_time` datetime NOT NULL COMMENT "更新时间",
  `c_time` datetime NOT NULL COMMENT "创建时间",
  PRIMARY KEY (`id`),
  KEY `ruqijobs_next_run_time` (`next_run_time`),
  KEY `ruqijobs_job_lock` (`job_lock`),
  KEY `ruqijobs_job_name` (`job_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='如期-定时任务数据';
CREATE TABLE `ruqi_jobs_history` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT "自增 id",
  `job_id` varchar(255) NOT NULL COMMENT "任务 id",
  `job_name` varchar(255) NOT NULL COMMENT "任务名称,用于分类使用",
  `cmd` varchar(255) NOT NULL COMMENT "执行命令",
  `cmd_is_success` tinyint(1) NOT NULL COMMENT "是否执行成功",
  `cmd_output` varchar(4096) NOT NULL COMMENT "程序输出",
  `cmd_cost` double NOT NULL COMMENT "执行耗时",
  `scheduler_name` varchar(64) NOT NULL COMMENT "执行的 scheduler 名称",
  `c_time` datetime NOT NULL COMMENT "创建时间",
  PRIMARY KEY (`id`),
  KEY `ruqijobshistory_job_id` (`job_id`),
  KEY `ruqijobshistory_job_name` (`job_name`),
  KEY `ruqijobshistory_cmd_is_success` (`cmd_is_success`),
  KEY `ruqijobshistory_cmd_cost` (`cmd_cost`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='如期-定时任务执行历史';2.2 配置
2.2.1 <butterfly_project>/conf/servicer/db_ruqi.py
#url_ruqi = "redirect://<file_name>:<config_name>"
url_ruqi = "mysql+retrypool://<user>:<password>@<ip>:<port>/<database>?max_connections=300&stale_timeout=300"可以选择 `redirect://` 方式重定向到别的配置,若存在 db_ruqi 配置,则不会读取 conf/config.py 配置
2.2.2 <butterfly_project>/conf/config.py
...
DATABASES = {
    "default": "mysql+retrypool://<user>:<password>@<ip>:<port>/<database>?max_connections=300&stale_timeout=300"
}
...
# Scheduler
scheduler_store = "mysql"      # ("mysql"/"memory")2.3 应用
将 examples/ruqi/handlers/ruqi 目录移动到 <butterfly_project>/handlers/ 下3 接口
3.1 调度任务接口
3.1.1 增加任务
job_trigger: 触发方式 (cron/interval/date)
           :cron:crontab 规则触发
           : interval: 固定周期触发
           : date: 指定时间触发,仅会执行一次
job_id     : job id(唯一索引)
job_name   : 用作分类
cmd        : job cmd
           :【脚本 1          】bash scripts/xx/xx.sh
           :【脚本 2          】python scripts/xx/xx.py arg1 arg2
           :【URL POST 请求 1 】http://127.0.0.1:8585/demo_api/hello#{"str_info":"hello"}
           :【URL POST 请求 2 】http://127.0.0.1:8585/demo_api/ping
           :【百川请求 1      】/demo_api/hello#{"str_info":"hello"}
           :【百川请求 2      】/demo_api/ping
rule       :
           :cron     规则: "*(秒) *(分) *(时) *(日) *(月) *(周)",如:"0 20 2 * * *"(每天凌晨 2:20 进行操作)
           : interval 规则: 固定周期触发: Xs/Xm/Xh/Xd, 如: 10s(每 10s 进行一次操作)
           : date     规则: 2020-12-16 18:03:17/2020-12-16 18:05:17.682862/now (now 为立即触发一次)周期任务
eg: 每 10s 向百川中添加一次 /demo_api/hello 的请求
from  xlib.util import http_util
data = {
        "job_trigger": "interval",
        "job_id": "helloworld",
        "job_name": "ceshi",
        "cmd": '/demo_api/hello#{"str_info": "%s"}' % "helloworld",
        "rule": "10s"
        }
res = http_util.post_json("http://127.0.0.1:8200/ruqi/job_add", data=data)
print res.output()延迟任务
eg: 任务 1 小时后再发起执行
from xlib.util import http_util
from xlib.util import time_util
cur_time = time_util.get_current_time()
# 1 hour later
exe_time = cur_time + 3600
exe_time_str = time_util.timestamp2time_str(exe_time)
data = {
        "job_trigger": "date",
        "job_id": "helloworld",
        "job_name": "ceshi",
        "cmd": '/demo_api/hello#{"str_info": "%s"}' % "helloworld",
        "rule": exe_time_str
        }
res = http_util.post_json("http://127.0.0.1:4100/ruqi/job_add", data=data)
print res.output()例子,延时发起星桥任务
import json
from xlib.util import http_util
from xlib.util import time_util
cur_time = time_util.get_current_time()
# 1 hour later
exe_time = cur_time + 3600
exe_time_str = time_util.timestamp2time_str(exe_time)
instance_name = "proxy_396399"
params_dict = {
        "job_namespace": "except_unit",
        "job_type": "unit_delete",
        "job_name": instance_name,
        "job_extra": {
            "unit_id": instance_name
    }
}
data = {
        "job_trigger": "date",
        "job_id": instance_name,
        "job_name": "unit_delete",
        "cmd": "/xingqiao/create_job#{params_str}".format(params_str=json.dumps(params_dict)),
        "rule": exe_time_str
        }
res = http_util.post_json("http://127.0.0.1:4100/ruqi/job_add", data=data)
print res.output()3.1.2 删除任务
POST /ruqi/job_remove
Headers
Content-Type
application/json
Authorization
Bearer <token>
Body
job_id
string
job_id
Response
{
  "stat": "OK"
}3.1.3 修改任务
POST
/ruqi/job_modify 'job_trigger' 'job_id' 'job_name' 'cmd' 'rule'参数说明
job_trigger: job_trigger(cron/interval/date)
job_id     : job id(唯一索引)
job_name   : 用作分类
cmd        : job cmd
rule       :
    date: "2020-12-16 18:03:17"/"2020-12-16 18:05:17.682862"/"now"
    cron: "* * * * * *"
    interval: Xs/Xm/Xh/Xd3.1.4 暂停任务
POST
/ruqi/job_pause 'job_id'3.1.5 恢复暂停的任务
POST
/ruqi/job_resume 'job_id'3.1.6 job 列表
GET /ruqi/job_lis
Headers
Content-Type
application/json
Authorization
Bearer <token>
Params
job_id
string
[可选] job id
job_name
string
[可选] job name
page_index
int
[可选] 页码,如不传,则返回全部数据
page_size
int
[可选] 默认值 15
Response
{
    "data": {
        "total": 72,
        "list": [{
            "next_run_time": "2025-02-07 18:49:38",
            "job_id": "demo_job",
            "c_time": "2024-03-26 21:49:38",
            "cmd": "http://xx",
            "rule": "1200s",
            "job_trigger": "interval",
            "job_name": "demo"
        }]
    },
    "stat": "OK"
}{
  "stat": "ERR_BAD_PARAMS"
}3.1.7 job 详情
GET /ruqi/job_show
Headers
Content-Type
application/json
Authorization
Bearer <token>
Params
job_id
string
job id
Response
{
    "data": {
        "job_id": "/xunzong/event_action_cron",
        "cmd": "/xunzong/event_action_cron",
        "job_trigger": "interval",
        "job_name": "rdb",
        "rule": "60s",
        "nexttime": "2025-02-07 19:08:47.078474"
    },
    "stat": "OK"
}{
  "stat": "ERR_BAD_PARAMS"
}3.2 调度历史接口
3.2.1 调度历史列表
GET
/ruqi/get_history  'job_id=None' 'job_name=None' 'page_index=None' 'page_size=15'参数说明
job_id: job id, job 唯一标识
job_name: job name,有可能重复
page_index: 页码
page_size: 列表中每页多少任务3.2.2 调度历史清理
POST /ruqi/job_history_clear
添加每天清理一次历史任务,默认保留 180 天,可以新增参数修改保留天数
python test_handler.py /ruqi/job_add_by_line "1d python test_handler.py /ruqi/job_history_clear"
3.3 调度引擎接口
3.3.1 调度器状态
GET /ruqi/scheduler_status
Headers
Content-Type
application/json
Authorization
Bearer <token>
Response
{
    "stat": "OK",
    "data": {
        "wait_seconds": 5.83369,
        "next_wakeup_time": "2025-02-07 19:51:13",
        "check_time": "2025-02-07 19:51:08"
    }
}3.3.2 调度器唤醒
POST /ruqi/scheduler_wakeup
Headers
Content-Type
application/json
Authorization
Bearer <token>
Response
{
    "stat": "OK"
}4 FAQ
4.1 MySQL URL 是无效地址
启动 butterfly 失败,<butterfly_project>/__stdout 日志里会有报错信息
4.2 MySQL URL 没有权限
启动 butterfly 成功,定时任务调度器会启动失败,<butterfly_project>/logs/err.log 会报 self._scheduler.start() 失败
5 最佳实践
Last updated