任务调度 RuQi(如期)
要相信,一切所期盼的,都会如期而至
1 架构
高可用部署如期,ruqi1 和 ruqi2 均为 butterfly 应用,ruqi1 和 ruqi2 分别部署在两台不同的机器上
+---------------+ +---------------+
| ruqi1 | | ruqi2 |
+------+--------+ +-------+-------+
| |
| |
| |
+----V-------------------------V---+
| MySQL |
+----------------------------------+
如期可提供定时、周期性执行 python/shell 脚本、发送 HTTP POST 请求、发布百川任务的功能
2 部署
2.1 建表
# 此处替换为自己的 database
use {database};
CREATE TABLE `ruqi_jobs` (
`id` varchar(255) NOT NULL COMMENT "任务 id",
`next_run_time` double DEFAULT NULL COMMENT "下次执行时间时间戳",
`job_state` blob NOT NULL COMMENT "任务数据",
`job_lock` tinyint(1) NOT NULL COMMENT "任务锁",
`job_name` varchar(64) DEFAULT NULL COMMENT "任务名称,用于分类使用",
`job_trigger` varchar(16) DEFAULT NULL COMMENT "触发器类型",
`job_rule` varchar(64) DEFAULT NULL COMMENT "定时任务规则",
`u_time` datetime NOT NULL COMMENT "更新时间",
`c_time` datetime NOT NULL COMMENT "创建时间",
PRIMARY KEY (`id`),
KEY `ruqijobs_next_run_time` (`next_run_time`),
KEY `ruqijobs_job_lock` (`job_lock`),
KEY `ruqijobs_job_name` (`job_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='如期-定时任务数据';
CREATE TABLE `ruqi_jobs_history` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT "自增 id",
`job_id` varchar(255) NOT NULL COMMENT "任务 id",
`job_name` varchar(255) NOT NULL COMMENT "任务名称,用于分类使用",
`cmd` varchar(64) NOT NULL COMMENT "执行命令",
`cmd_is_success` tinyint(1) NOT NULL COMMENT "是否执行成功",
`cmd_output` varchar(4096) NOT NULL COMMENT "程序输出",
`cmd_cost` double NOT NULL COMMENT "执行耗时",
`scheduler_name` varchar(64) NOT NULL COMMENT "执行的 scheduler 名称",
`c_time` datetime NOT NULL COMMENT "创建时间",
PRIMARY KEY (`id`),
KEY `ruqijobshistory_job_id` (`job_id`),
KEY `ruqijobshistory_job_name` (`job_name`),
KEY `ruqijobshistory_cmd_is_success` (`cmd_is_success`),
KEY `ruqijobshistory_cmd_cost` (`cmd_cost`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='如期-定时任务执行历史';
2.2 配置
<butterfly_project>/conf/config.py
...
DATABASES = {
"default": "mysql+retrypool://<user>:<password>@<ip>:<port>/<database>?max_connections=300&stale_timeout=300"
}
...
# Scheduler
scheduler_store = "mysql" # ("mysql"/"memory")
2.3 应用
将 examples/ruqi/handlers/ruqi 目录移动到 <butterfly_project>/handlers/ 下
3 接口
/ruqi/job_add 'job_trigger' 'job_id' 'job_name' 'cmd' 'rule'
/ruqi/job_history_clear 'job_id=None' 'job_name=None' 'retention_day=180' 'limit_num=1000'
/ruqi/job_history_list 'job_id=None' 'job_name=None' 'is_success=None' 'scheduler_name=None' 'display=table' 'page_index=0' 'page_size=15'
/ruqi/job_list 'job_id=None' 'job_name=None' 'display=table' 'page_index=None' 'page_size=15'
/ruqi/job_modify 'job_trigger' 'job_id' 'job_name' 'cmd' 'rule'
/ruqi/job_pause 'job_id'
/ruqi/job_remove 'job_id'
/ruqi/job_resume 'job_id'
/ruqi/job_show 'job_id'
/ruqi/scheduler_status
/ruqi/scheduler_wakeup
3.1 添加任务
3.1.1 参数
job_trigger: 触发方式 (cron/interval/date)
:cron:crontab 规则触发
: interval: 固定周期触发
: date: 指定时间触发,仅会执行一次
job_id : job id(唯一索引)
job_name : 用作分类
cmd : job cmd
:【脚本 1 】bash scripts/xx/xx.sh
:【脚本 2 】python scripts/xx/xx.py arg1 arg2
:【URL POST 请求 1 】http://127.0.0.1:8585/demo_api/hello#{"str_info":"hello"}
:【URL POST 请求 2 】http://127.0.0.1:8585/demo_api/ping
:【百川请求 1 】/demo_api/hello#{"str_info":"hello"}
:【百川请求 2 】/demo_api/ping
rule :
:cron 规则: "*(秒) *(分) *(时) *(日) *(月) *(周)",如:"0 20 2 * * *"(每天凌晨 2:20 进行操作)
: interval 规则: 固定周期触发: Xs/Xm/Xh/Xd, 如: 10s(每 10s 进行一次操作)
: date 规则: 2020-12-16 18:03:17/2020-12-16 18:05:17.682862/now (now 为立即触发一次)
3.1.2 例子
周期任务
eg: 每 10s 向百川中添加一次 /demo_api/hello 的请求
from xlib.util import http_util
data = {
"job_trigger": "interval",
"job_id": "helloworld",
"job_name": "ceshi",
"cmd": '/demo_api/hello#{"str_info": "%s"}' % "helloworld",
"rule": "10s"
}
res = http_util.post_json("http://127.0.0.1:8200/ruqi/job_add", data=data)
print res.output()
延迟任务
eg: 任务 1 小时后再发起执行
from xlib.util import http_util
from xlib.util import time_util
cur_time = time_util.get_current_time()
# 1 hour later
exe_time = cur_time + 3600
exe_time_str = time_util.timestamp2time_str(exe_time)
data = {
"job_trigger": "date",
"job_id": "helloworld",
"job_name": "ceshi",
"cmd": '/demo_api/hello#{"str_info": "%s"}' % "helloworld",
"rule": exe_time_str
}
res = http_util.post_json("http://127.0.0.1:4100/ruqi/job_add", data=data)
print res.output()
例子,延时发起星桥任务
import json
from xlib.util import http_util
from xlib.util import time_util
cur_time = time_util.get_current_time()
# 1 hour later
exe_time = cur_time + 3600
exe_time_str = time_util.timestamp2time_str(exe_time)
instance_name = "proxy_396399"
params_dict = {
"job_namespace": "except_unit",
"job_type": "unit_delete",
"job_name": instance_name,
"job_extra": {
"unit_id": instance_name
}
}
data = {
"job_trigger": "date",
"job_id": instance_name,
"job_name": "unit_delete",
"cmd": "/xingqiao/create_job#{params_str}".format(params_str=json.dumps(params_dict)),
"rule": exe_time_str
}
res = http_util.post_json("http://127.0.0.1:4100/ruqi/job_add", data=data)
print res.output()
3.2 删除任务
POST
/ruqi/job_remove 'job_id'
参数说明
job_id: job id
3.3 修改任务
/ruqi/job_modify
修改任务
POST
/ruqi/job_modify 'job_trigger' 'job_id' 'job_name' 'cmd' 'rule'
参数说明
job_trigger: job_trigger(cron/interval/date)
job_id : job id(唯一索引)
job_name : 用作分类
cmd : job cmd
rule :
date: "2020-12-16 18:03:17"/"2020-12-16 18:05:17.682862"/"now"
cron: "* * * * * *"
interval: Xs/Xm/Xh/Xd
3.4 暂停任务
POST
/ruqi/job_pause 'job_id'
3.5 恢复暂停的任务
POST
/ruqi/job_resume 'job_id'
3.6 job 列表
GET
/ruqi/job_lis
Headers
Content-Type
application/json
Authorization
Bearer <token>
Params
job_id
string
[可选] job id
job_name
string
[可选] job name
page_index
int
[可选] 页码,如不传,则返回全部数据
page_size
int
[可选] 默认值 15
Response
{
"data": {
"total": 72,
"list": [{
"next_run_time": "2025-02-07 18:49:38",
"job_id": "demo_job",
"c_time": "2024-03-26 21:49:38",
"cmd": "http://xx",
"rule": "1200s",
"job_trigger": "interval",
"job_name": "demo"
}]
},
"stat": "OK"
}
{
"stat": "ERR_BAD_PARAMS"
}
3.7 job 详情
GET
/ruqi/job_show
Headers
Content-Type
application/json
Authorization
Bearer <token>
Params
job_id
string
job id
Response
{
"data": {
"job_id": "/xunzong/event_action_cron",
"cmd": "/xunzong/event_action_cron",
"job_trigger": "interval",
"job_name": "rdb",
"rule": "60s",
"nexttime": "2025-02-07 19:08:47.078474"
},
"stat": "OK"
}
{
"stat": "ERR_BAD_PARAMS"
}
3.8 job 执行历史
GET
/ruqi/get_history 'job_id=None' 'job_name=None' 'page_index=None' 'page_size=15'
参数说明
job_id: job id, job 唯一标识
job_name: job name,有可能重复
page_index: 页码
page_size: 列表中每页多少任务
3.9 调度器状态
GET
/ruqi/scheduler_status
Headers
Content-Type
application/json
Authorization
Bearer <token>
Response
{
"stat": "OK",
"data": {
"wait_seconds": 5.83369,
"next_wakeup_time": "2025-02-07 19:51:13",
"check_time": "2025-02-07 19:51:08"
}
}
3.10 调度器唤醒
POST
/ruqi/scheduler_wakeup
Headers
Content-Type
application/json
Authorization
Bearer <token>
Response
{
"stat": "OK"
}
Last updated