任务调度 RuQi(如期)

要相信,一切所期盼的,都会如期而至

1 架构

高可用部署如期,ruqi1 和 ruqi2 均为 butterfly 应用,ruqi1 和 ruqi2 分别部署在两台不同的机器上

+---------------+        +---------------+
|    ruqi1      |        |     ruqi2     |
+------+--------+        +-------+-------+
       |                         |
       |                         |
       |                         |
  +----V-------------------------V---+
  |               MySQL              |
  +----------------------------------+

如期可提供定时、周期性执行 python/shell 脚本、发送 HTTP POST 请求、发布百川任务的功能

2 部署

2.1 建表

# 此处替换为自己的 database
use {database};
CREATE TABLE `ruqi_jobs` (
  `id` varchar(255) NOT NULL COMMENT "任务 id",
  `next_run_time` double DEFAULT NULL COMMENT "下次执行时间时间戳",
  `job_state` blob NOT NULL COMMENT "任务数据",
  `job_lock` tinyint(1) NOT NULL COMMENT "任务锁",
  `job_name` varchar(64) DEFAULT NULL COMMENT "任务名称,用于分类使用",
  `job_trigger` varchar(16) DEFAULT NULL COMMENT "触发器类型",
  `job_rule` varchar(64) DEFAULT NULL COMMENT "定时任务规则",
  `u_time` datetime NOT NULL COMMENT "更新时间",
  `c_time` datetime NOT NULL COMMENT "创建时间",
  PRIMARY KEY (`id`),
  KEY `ruqijobs_next_run_time` (`next_run_time`),
  KEY `ruqijobs_job_lock` (`job_lock`),
  KEY `ruqijobs_job_name` (`job_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='如期-定时任务数据';
CREATE TABLE `ruqi_jobs_history` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT "自增 id",
  `job_id` varchar(255) NOT NULL COMMENT "任务 id",
  `job_name` varchar(255) NOT NULL COMMENT "任务名称,用于分类使用",
  `cmd` varchar(255) NOT NULL COMMENT "执行命令",
  `cmd_is_success` tinyint(1) NOT NULL COMMENT "是否执行成功",
  `cmd_output` varchar(4096) NOT NULL COMMENT "程序输出",
  `cmd_cost` double NOT NULL COMMENT "执行耗时",
  `scheduler_name` varchar(64) NOT NULL COMMENT "执行的 scheduler 名称",
  `c_time` datetime NOT NULL COMMENT "创建时间",
  PRIMARY KEY (`id`),
  KEY `ruqijobshistory_job_id` (`job_id`),
  KEY `ruqijobshistory_job_name` (`job_name`),
  KEY `ruqijobshistory_cmd_is_success` (`cmd_is_success`),
  KEY `ruqijobshistory_cmd_cost` (`cmd_cost`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='如期-定时任务执行历史';

2.2 配置

2.2.1 <butterfly_project>/conf/servicer/db_ruqi.py

#url_ruqi = "redirect://<file_name>:<config_name>"
url_ruqi = "mysql+retrypool://<user>:<password>@<ip>:<port>/<database>?max_connections=300&stale_timeout=300"

可以选择 `redirect://` 方式重定向到别的配置,若存在 db_ruqi 配置,则不会读取 conf/config.py 配置

2.2.2 <butterfly_project>/conf/config.py

...
DATABASES = {
    "default": "mysql+retrypool://<user>:<password>@<ip>:<port>/<database>?max_connections=300&stale_timeout=300"
}
...
# Scheduler
scheduler_store = "mysql"      # ("mysql"/"memory")

2.3 应用

将 examples/ruqi/handlers/ruqi 目录移动到 <butterfly_project>/handlers/ 下

3 接口

3.1 调度任务接口

3.1.1 增加任务

job_trigger: 触发方式 (cron/interval/date)
           :cron:crontab 规则触发
           : interval: 固定周期触发
           : date: 指定时间触发,仅会执行一次
job_id     : job id(唯一索引)
job_name   : 用作分类
cmd        : job cmd
           :【脚本 1          】bash scripts/xx/xx.sh
           :【脚本 2          】python scripts/xx/xx.py arg1 arg2
           :【URL POST 请求 1 】http://127.0.0.1:8585/demo_api/hello#{"str_info":"hello"}
           :【URL POST 请求 2 】http://127.0.0.1:8585/demo_api/ping
           :【百川请求 1      】/demo_api/hello#{"str_info":"hello"}
           :【百川请求 2      】/demo_api/ping
rule       :
           :cron     规则: "*(秒) *(分) *(时) *(日) *(月) *(周)",如:"0 20 2 * * *"(每天凌晨 2:20 进行操作)
           : interval 规则: 固定周期触发: Xs/Xm/Xh/Xd, 如: 10s(每 10s 进行一次操作)
           : date     规则: 2020-12-16 18:03:17/2020-12-16 18:05:17.682862/now (now 为立即触发一次)

周期任务

eg: 每 10s 向百川中添加一次 /demo_api/hello 的请求

from  xlib.util import http_util
data = {
        "job_trigger": "interval",
        "job_id": "helloworld",
        "job_name": "ceshi",
        "cmd": '/demo_api/hello#{"str_info": "%s"}' % "helloworld",
        "rule": "10s"
        }
res = http_util.post_json("http://127.0.0.1:8200/ruqi/job_add", data=data)
print res.output()

延迟任务

eg: 任务 1 小时后再发起执行

from xlib.util import http_util
from xlib.util import time_util

cur_time = time_util.get_current_time()

# 1 hour later
exe_time = cur_time + 3600
exe_time_str = time_util.timestamp2time_str(exe_time)

data = {
        "job_trigger": "date",
        "job_id": "helloworld",
        "job_name": "ceshi",
        "cmd": '/demo_api/hello#{"str_info": "%s"}' % "helloworld",
        "rule": exe_time_str
        }

res = http_util.post_json("http://127.0.0.1:4100/ruqi/job_add", data=data)
print res.output()

例子,延时发起星桥任务

import json
from xlib.util import http_util
from xlib.util import time_util

cur_time = time_util.get_current_time()

# 1 hour later
exe_time = cur_time + 3600
exe_time_str = time_util.timestamp2time_str(exe_time)

instance_name = "proxy_396399"
params_dict = {
        "job_namespace": "except_unit",
        "job_type": "unit_delete",
        "job_name": instance_name,
        "job_extra": {
            "unit_id": instance_name
    }
}

data = {
        "job_trigger": "date",
        "job_id": instance_name,
        "job_name": "unit_delete",
        "cmd": "/xingqiao/create_job#{params_str}".format(params_str=json.dumps(params_dict)),
        "rule": exe_time_str
        }

res = http_util.post_json("http://127.0.0.1:4100/ruqi/job_add", data=data)
print res.output()

3.1.2 删除任务

POST /ruqi/job_remove

Headers

Name
Value

Content-Type

application/json

Authorization

Bearer <token>

Body

Name
Type
Description

job_id

string

job_id

Response

{
  "stat": "OK"
}

3.1.3 修改任务

POST

/ruqi/job_modify 'job_trigger' 'job_id' 'job_name' 'cmd' 'rule'

参数说明

job_trigger: job_trigger(cron/interval/date)
job_id     : job id(唯一索引)
job_name   : 用作分类
cmd        : job cmd
rule       :
    date: "2020-12-16 18:03:17"/"2020-12-16 18:05:17.682862"/"now"
    cron: "* * * * * *"
    interval: Xs/Xm/Xh/Xd

3.1.4 暂停任务

POST

/ruqi/job_pause 'job_id'

3.1.5 恢复暂停的任务

POST

/ruqi/job_resume 'job_id'

3.1.6 job 列表

GET /ruqi/job_lis

Headers

Name
Value

Content-Type

application/json

Authorization

Bearer <token>

Params

Name
Type
Description

job_id

string

[可选] job id

job_name

string

[可选] job name

page_index

int

[可选] 页码,如不传,则返回全部数据

page_size

int

[可选] 默认值 15

Response

{
    "data": {
        "total": 72,
        "list": [{
            "next_run_time": "2025-02-07 18:49:38",
            "job_id": "demo_job",
            "c_time": "2024-03-26 21:49:38",
            "cmd": "http://xx",
            "rule": "1200s",
            "job_trigger": "interval",
            "job_name": "demo"
        }]
    },
    "stat": "OK"
}

3.1.7 job 详情

GET /ruqi/job_show

Headers

Name
Value

Content-Type

application/json

Authorization

Bearer <token>

Params

Name
Type
Description

job_id

string

job id

Response

{
    "data": {
        "job_id": "/xunzong/event_action_cron",
        "cmd": "/xunzong/event_action_cron",
        "job_trigger": "interval",
        "job_name": "rdb",
        "rule": "60s",
        "nexttime": "2025-02-07 19:08:47.078474"
    },
    "stat": "OK"
}

3.2 调度历史接口

3.2.1 调度历史列表

GET

/ruqi/get_history  'job_id=None' 'job_name=None' 'page_index=None' 'page_size=15'

参数说明

job_id: job id, job 唯一标识
job_name: job name,有可能重复
page_index: 页码
page_size: 列表中每页多少任务

3.2.2 调度历史清理

POST /ruqi/job_history_clear

添加每天清理一次历史任务,默认保留 180 天,可以新增参数修改保留天数

python test_handler.py /ruqi/job_add_by_line "1d python test_handler.py /ruqi/job_history_clear"

3.3 调度引擎接口

3.3.1 调度器状态

GET /ruqi/scheduler_status

Headers

Name
Value

Content-Type

application/json

Authorization

Bearer <token>

Response

{
    "stat": "OK",
    "data": {
        "wait_seconds": 5.83369,
        "next_wakeup_time": "2025-02-07 19:51:13",
        "check_time": "2025-02-07 19:51:08"
    }
}

3.3.2 调度器唤醒

POST /ruqi/scheduler_wakeup

Headers

Name
Value

Content-Type

application/json

Authorization

Bearer <token>

Response

{
    "stat": "OK"
}

4 FAQ

4.1 MySQL URL 是无效地址

启动 butterfly 失败,<butterfly_project>/__stdout 日志里会有报错信息

4.2 MySQL URL 没有权限

启动 butterfly 成功,定时任务调度器会启动失败,<butterfly_project>/logs/err.log 会报 self._scheduler.start() 失败

5 最佳实践

Last updated