Star bridges span to the new moon; ten thousand lamps light the whole avenue.
With xingqiao you can orchestrate many applications into workflows without modifying existing Butterfly applications --- "What's past is prologue"
1 Introduction
1.1 Architecture
+---------------+ +---------------+
| xingqiao1 | | xingqiao2 | [star bridge to the moon] workflow orchestration
+------+--------+ +-------+-------+
| |
| |
+------V-------------------------V-------+
| +------------+ +MQ----------+ |
| | MySQL | | Redis | |
| +------------+ +------^-----+ |
+--------------------------------|-------+
|
|
|
+--------------------------------+--------+
| +-----------------------------------+ |
| | paoding | | [paoding carves the ox] big key / hot key analysis
| +-----------------------------------+ |
| +-----------------------------------+ |
| | qingnang | | [the healer's satchel] service self-healing
| +-----------------------------------+ |
| +-----------------------------------+ |
| | yiye | | [one leaf heralds autumn] multi-region active-active probing
| +-----------------------------------+ |
| +-----------------------------------+ |
| | ... | |
| +-----------------------------------+ |
+-----------------------------------------+
xingqiao controls the order in which the handlers are invoked; it calls each handler through baichuan to complete the orchestrated job.
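The dispatch logic amounts to dependency-ordered scheduling: a task becomes runnable once every task it depends on has finished. A minimal sketch (the `run_workflow` helper is hypothetical and purely illustrative; the real scheduler persists job/task state in MySQL and invokes handlers through baichuan):

```python
# Sketch of dependency-ordered task dispatch (illustration only).

def run_workflow(tasks):
    """tasks: {label: [dependency labels]}; returns labels in execution order."""
    done = []
    pending = dict(tasks)
    while pending:
        # a task is runnable once all of its dependencies have finished
        runnable = [t for t, deps in pending.items() if all(d in done for d in deps)]
        if not runnable:
            raise ValueError("cyclic or unsatisfiable dependencies")
        for t in sorted(runnable):
            done.append(t)      # here the real system would call the handler
            del pending[t]
    return done

order = run_workflow({"task1": [], "task2": ["task1"], "task3": ["task1", "task2"]})
print(order)  # ['task1', 'task2', 'task3']
```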
1.2 Scenarios
1> Calling xingqiao directly to run a workflow
Creating a workflow returns a job id; afterwards, query the job details with the job_id.
Long-running (IO-heavy) jobs:
(1) big key / hot key analysis;
(2) multi-region active-active probing
Multi-step flows:
(1) service self-healing
(2) integration testing (initialize the environment, run multiple test steps)
2> Embedding xingqiao workflows
For example, a standalone order service creates an order, and a workflow then completes the order processing and status updates.
2 Deployment
2.1 Create the tables
USE {DATABASE};
CREATE TABLE `workflow_job` (
  `job_id` bigint(20) NOT NULL AUTO_INCREMENT,
  `job_client` varchar(64) NOT NULL,
  `job_namespace` varchar(64) NOT NULL,
  `job_reqid` varchar(64) NOT NULL,
  `job_name` varchar(64) NOT NULL,
  `job_status` varchar(64) NOT NULL,
  `job_type` varchar(64) NOT NULL,
  `job_tags` varchar(128) NOT NULL,
  `ret_stat` varchar(64) NOT NULL,
  `ret_data` text,
  `job_cost` double NOT NULL,
  `job_extra` varchar(10240) NOT NULL,
  `job_timeout` int(11) NOT NULL,
  `job_lock` varchar(64) NOT NULL,
  `exe_id` int(11) NOT NULL,
  `operator` varchar(64) NOT NULL,
  `is_valid` tinyint(1) NOT NULL,
  `c_time` datetime NOT NULL,
  `s_time` datetime NOT NULL,
  `e_time` datetime NOT NULL,
  `u_time` datetime NOT NULL,
  PRIMARY KEY (`job_id`),
  UNIQUE KEY `job_job_name` (`job_name`),
  KEY `job_job_client` (`job_client`),
  KEY `job_job_namespace` (`job_namespace`),
  KEY `job_job_reqid` (`job_reqid`),
  KEY `job_job_status` (`job_status`),
  KEY `job_job_type` (`job_type`),
  KEY `job_job_tags` (`job_tags`),
  KEY `job_ret_stat` (`ret_stat`),
  KEY `job_job_cost` (`job_cost`),
  KEY `job_exe_id` (`exe_id`),
  KEY `job_operator` (`operator`),
  KEY `job_is_valid` (`is_valid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

CREATE TABLE `workflow_task` (
  `task_id` bigint(20) NOT NULL AUTO_INCREMENT,
  `job_id` bigint(20) NOT NULL,
  `task_label` varchar(64) NOT NULL,
  `task_reqid` varchar(64) NOT NULL,
  `task_cmd` varchar(128) NOT NULL,
  `task_params` text,
  `task_requires` varchar(1024) NOT NULL,
  `task_provides` varchar(1024) NOT NULL,
  `task_dependencies` varchar(10240) NOT NULL,
  `task_status` varchar(64) NOT NULL,
  `task_cost` double NOT NULL,
  `task_extra` text NOT NULL,
  `task_is_save` tinyint(1) NOT NULL,
  `ret_stat` varchar(64) NOT NULL,
  `ret_data` text,
  `task_retrymax` int(11) NOT NULL,
  `task_retrycount` int(11) NOT NULL,
  `task_timeout` int(11) NOT NULL,
  `c_time` datetime NOT NULL,
  `s_time` datetime NOT NULL,
  `u_time` datetime NOT NULL,
  PRIMARY KEY (`task_id`),
  KEY `task_job_id` (`job_id`),
  KEY `task_task_label` (`task_label`),
  KEY `task_task_reqid` (`task_reqid`),
  KEY `task_task_cmd` (`task_cmd`),
  KEY `task_task_status` (`task_status`),
  KEY `task_task_cost` (`task_cost`),
  KEY `task_ret_stat` (`ret_stat`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
2.2 Configuration
<butterfly_project>/conf/config.py
...
DATABASES = {
"default": "mysql+retrypool://<user>:<password>@<ip>:<port>/<database>?max_connections=300&stale_timeout=300"
}
...
CACHES = {
"default": "redis://@localhost:6379/0",
"baichuan": "redis://@<ip>:<port>/0?socket_timeout=2&socket_connect_timeout=0.5&retry_on_timeout=False",
}
2.3 Application
Move the examples/xingqiao directory into <butterfly_project>/handlers/
Move the examples/xingqiao/plugins directory into <butterfly_project>/
3 Writing taskflow plugins
3.1 Plugin path
butterfly/handlers/xingqiao/plugins
3.2 Plugin examples
3.2.1 demo_ping
3.2.1.1 Description
This demo only demonstrates task dependencies:
(1) task1 runs first; only after task1 finishes does task2 run
(2) the plugin is registered under the name ping
3.2.1.2 Code
butterfly/handlers/xingqiao/plugins/demo_ping.py
from xlib.taskflow.taskflow import WorkflowRunner


class Ping(WorkflowRunner):
    """
    a workflow is defined by overloading the WorkflowRunner.workflow() method:
    """

    def workflow(self):
        """
        Run task1 first, then task2
        """
        self.add_task("task1", "/demo_api/ping")
        # dependencies describes the dependency relationship
        self.add_task("task2", "/demo_api/ping", dependencies=["task1"])


def setup(app):
    """
    Plugin registration function
    """
    app.register_formatter('ping', Ping)
3.2.1.3 Launching the job
$ curl -d '{"job_namespace":"ceshi_namespace", "job_type":"ping"}' "http://127.0.0.1:8585/xingqiao/create_job"
3.2.1.4 dot graph
3.2.2 demo_hello (passing parameters dynamically at workflow run time)
3.2.2.1 Description
(1) Parameter passing
* Plain parameters: the parameter value is taken from the tasks this task depends on, or from job_extra
* The special parameter all_taskdata: the ret_data of every task this task depends on is collected
  into a list and passed to this task as all_taskdata
(2) Saving results into the job ret_data
Scenario: workflows are either operational or data-producing. For an operational workflow we only
need to know the execution result; for a data-producing workflow we need the result of the whole
job, so set is_save to True on the final task, aggregate the results in that task, and save them.
When tasks depend on each other and task2 needs task1's return value, task1 must return the result
as {"key": "value"}. For example, when task2 depends on task1's new_unitmatrix result:
# correct
return retstat.OK, {"new_unitmatrix": "xxx"}
# wrong
return retstat.OK, {"data": {"new_unitmatrix": "xxx"}}
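The reason the nested form fails: when resolving a dependent task's required parameters, the framework looks the parameter up as a top-level key of the upstream task's ret_data. A sketch of that lookup (`resolve_require` is a hypothetical helper, not the framework's code):

```python
def resolve_require(require, dep_ret_data):
    """Look a required parameter up directly in an upstream task's ret_data."""
    if require in dep_ret_data:
        return dep_ret_data[require]
    raise KeyError("required param %r not found in dependency ret_data" % require)

# correct shape: the key is top-level, so a dependent task can resolve it
assert resolve_require("new_unitmatrix", {"new_unitmatrix": "xxx"}) == "xxx"

# wrong shape: nesting under "data" hides the key from the lookup
try:
    resolve_require("new_unitmatrix", {"data": {"new_unitmatrix": "xxx"}})
except KeyError:
    pass  # lookup fails as expected
```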
3.2.2.2 Code
butterfly/handlers/xingqiao/plugins/demo_hello.py
from xlib.taskflow.taskflow import WorkflowRunner


class Hello(WorkflowRunner):
    """
    a workflow is defined by overloading the WorkflowRunner.workflow() method:
    """

    def params_check(self):
        """
        params_check, self.job_extra
        """
        if "str_info" not in self.job_extra.keys():
            raise Exception("job_extra not have str_info")

    def workflow(self):
        """
        Key points:
        (1) Parameter passing
            * Plain parameters: the value is taken from the tasks this task depends on, or from job_extra
            * The all_taskdata parameter: the ret_data of every dependency task is collected into a list
              and passed to this task as all_taskdata
        (2) Saving results into the job ret_data
            For a data-producing workflow, set is_save to True on the final task, aggregate the results
            there, and save them
        Execution order:
            Run task1; task1 needs the str_info parameter, taken from its dependencies or from job_extra
            After task1 finishes, run task2
            After both task1 and task2 finish, run task3, which saves its result into the job ret_data
        """
        self.add_task("task1", "/demo_api/hello", requires=["str_info"])
        self.add_task("task2", "/demo_api/ping", dependencies=["task1"])
        self.add_task(
            "task3",
            "/demo_api/collect",
            requires=["all_taskdata"],
            dependencies=["task1", "task2"],
            is_save=True)


def setup(app):
    """
    Plugin registration function
    """
    app.register_formatter('hello', Hello)
3.2.2.3 Launching the job
curl -d '{"job_namespace":"ceshi_namespace", "job_type":"hello", "job_extra":{"str_info": "ceshi"}}' "http://127.0.0.1:8585/xingqiao/create_job"
3.2.2.4 dot graph
3.2.3 demo_hello_params (specifying task parameters at task creation)
from xlib.taskflow.taskflow import WorkflowRunner


class HelloParams(WorkflowRunner):
    """
    a workflow is defined by overloading the WorkflowRunner.workflow() method:
    """

    def params_check(self):
        """
        params_check
        """
        if "str_info" not in self.job_extra.keys():
            raise Exception("job_extra not have str_info")

    def workflow(self):
        """
        Key points:
        (1) Parameter passing
            * Plain parameters: the value is taken from the tasks this task depends on, or from job_extra
            * The all_taskdata parameter: the ret_data of every dependency task is collected into a list
              and passed to this task as all_taskdata
        (2) Saving results into the job ret_data
            For a data-producing workflow, set is_save to True on the final task, aggregate the results
            there, and save them
        Execution order:
            Run task1; here task1's str_info parameter is preset via params when the task is created
            After task1 finishes, run task2
            After both task1 and task2 finish, run task3, which saves its result into the job ret_data
        """
        self.add_task("task1", "/demo_api/hello", params={"str_info": "hello world"})
        self.add_task("task2", "/demo_api/ping", dependencies=["task1"])
        self.add_task(
            "task3",
            "/demo_api/collect",
            requires=["all_taskdata"],
            dependencies=["task1", "task2"],
            is_save=True)


def setup(app):
    """
    Plugin registration function
    """
    app.register_formatter('hello_params', HelloParams)
3.2.4 demo_ping_notifier
3.2.4.1 Description
To be notified automatically when a job's status changes, implement a job_notifier method that receives a job_obj argument; this can be used, for example, to run a callback when a job finishes or fails.
3.2.4.2 Code
import logging

from xlib.taskflow.taskflow import WorkflowRunner


class Ping(WorkflowRunner):
    """
    a workflow is defined by overloading the WorkflowRunner.workflow() method:
    """

    def workflow(self):
        """
        Run task1 first, then task2
        """
        self.add_task("task1", "/demo_api/ping")
        # dependencies describes the dependency relationship
        self.add_task("task2", "/demo_api/ping", dependencies=["task1"])

    @classmethod
    def job_notifier(cls, job_obj):
        """
        job_obj:
            job_id: (int)job_id
            job_client
            job_namespace
            job_reqid
            job_name: job_name
            job_status: started/finished/failed
            job_type
            job_tags
            ret_stat
            ret_data
            job_cost
            job_extra: (json)
            job_timeout
            job_lock
            exe_id
            operator
            is_valid
            c_time
            s_time
            e_time
        """
        logging.info("job_id={job_id} job_name={job_name} job_status={job_status}".format(
            job_id=job_obj.job_id,
            job_name=job_obj.job_name,
            job_status=job_obj.job_status
        ))


def setup(app):
    """
    Plugin registration function
    """
    app.register_formatter('ping_notifier', Ping)
3.2.5 demo_hello_lock
Jobs can be locked: set job_locks in the params_check method to lock the job. To take multiple locks, join them with commas:
    job_locks = []
    job_locks.append("lock1")
    job_locks.append("lock2")
    self.job_locks = ",".join(job_locks)
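To illustrate the comma-separated format, here is a small sketch that parses job_locks strings and checks two jobs for a lock conflict. The conflict semantics (two jobs conflict when their lock sets intersect) is an assumption made for illustration; the document only specifies the string format, and `parse_locks`/`locks_conflict` are hypothetical helpers:

```python
def parse_locks(job_locks):
    """Split the comma-separated job_locks string into a set of lock names."""
    return {lock for lock in job_locks.split(",") if lock}

def locks_conflict(locks_a, locks_b):
    """Two jobs conflict if their lock sets intersect (assumed semantics)."""
    return bool(parse_locks(locks_a) & parse_locks(locks_b))

assert locks_conflict("lock1,lock2", "lock2,lock3") is True   # shared lock2
assert locks_conflict("lock1", "lock2,lock3") is False        # disjoint sets
```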
from xlib.taskflow.taskflow import WorkflowRunner
from xlib.util import jsonschema


class HelloLock(WorkflowRunner):
    """
    a workflow is defined by overloading the WorkflowRunner.workflow() method:
    """

    def params_check(self):
        """
        params_check
        """
        schema = {
            "type": "object",
            "properties": {
                "str_info": {
                    "type": "string",
                    "pattern": "[A-Za-z]+$"
                },
                "job_timeout": {
                    "type": "number"
                },
                "job_locks": {
                    "type": "string",
                }
            },
            "required": ["str_info"]
        }
        jsonschema.validate(instance=self.job_extra, schema=schema)
        # job_timeout(s)
        if "job_timeout" in self.job_extra:
            self.job_timeout = self.job_extra["job_timeout"]
        else:
            self.job_timeout = 1800
        # job_locks
        if "job_locks" in self.job_extra:
            self.job_locks = self.job_extra["job_locks"]
        else:
            self.job_locks = "ceshi"

    def workflow(self):
        """
        Key points:
        (1) Parameter passing
            * Plain parameters: the value is taken from the tasks this task depends on, or from job_extra
            * The all_taskdata parameter: the ret_data of every dependency task is collected into a list
              and passed to this task as all_taskdata
        (2) Saving results into the job ret_data
            For a data-producing workflow, set is_save to True on the final task, aggregate the results
            there, and save them
        Execution order:
            Run task1; task1 needs the str_info parameter, taken from its dependencies or from job_extra
            After task1 finishes, run task2
        """
        self.add_task("task1", "/demo_api/hello", requires=["str_info"])
        self.add_task("task2", "/demo_api/ping", dependencies=["task1"])


def setup(app):
    """
    Plugin registration function
    """
    app.register_formatter('hello_lock', HelloLock)
3.3 Task parameter passing
Task parameters are produced in two ways:
(1) Preset task_params: parameter values fixed when the task is created (via the params argument)
(2) Dynamically built self.task_requires_dict (via the requires & extra arguments):
1> values taken from job_extra
2> values taken from task_extra
3> if the task has dependencies: values taken from the dependencies' ret_data
4> if the all_taskdata parameter is required, all dependencies' ret_data are collected into the all_taskdata list
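The rules above can be sketched as a resolution function (`resolve_task_params` is a hypothetical helper, assuming the sources are consulted in the listed order: job_extra, then task_extra, then the dependencies' ret_data):

```python
def resolve_task_params(requires, job_extra, task_extra, dep_ret_datas, preset_params=None):
    """Sketch of building a task's parameters per the rules above.

    dep_ret_datas: list of ret_data dicts from the dependency tasks.
    The per-name lookup order (job_extra, task_extra, dependency ret_data)
    is an assumption based on the numbered list above.
    """
    if preset_params is not None:         # (1) preset task_params win outright
        return dict(preset_params)
    params = {}
    for name in requires:
        if name == "all_taskdata":        # (4) aggregate all dependency results
            params[name] = list(dep_ret_datas)
            continue
        for source in [job_extra, task_extra] + dep_ret_datas:
            if name in source:            # (2)/(3) first source that has the name wins
                params[name] = source[name]
                break
    return params

p = resolve_task_params(["str_info", "all_taskdata"],
                        job_extra={"str_info": "ceshi"}, task_extra={},
                        dep_ret_datas=[{"new_unitmatrix": "xxx"}])
assert p == {"str_info": "ceshi", "all_taskdata": [{"new_unitmatrix": "xxx"}]}
```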
4 taskflow API
4.1 taskflow job
4.1.1 Create a job
POST
/xingqiao/create_job
Request Body
{"job_namespace": "<namespace>", "job_type": "<plugin name>", "job_extra": {}}  (job_extra is optional)
Response
OK: success
{"stat": "OK", "job_id": 3}
ERR_JOB_EXTRA_INVALID: job_extra is not a dict
{"stat": "ERR_JOB_EXTRA_INVALID"}
ERR_JOB_TYPE_NOT_EXISTS: the job plugin does not exist
{"stat": "ERR_JOB_TYPE_NOT_EXISTS"}
Examples
job_extra empty
$ curl -d '{"job_namespace":"ceshi_namespace", "job_type":"ping"}' "http://127.0.0.1:8007/xingqiao/create_job"
{"stat": "OK", "job_id": 3}
job_extra set
$ curl -d '{"job_namespace":"ceshi_namespace", "job_type":"hello", "job_extra":{"str_info": "ceshi"}}' "http://127.0.0.1:8007/xingqiao/create_job"
{"stat": "OK", "job_id": 25}
Setting the user
$ curl -H "x_username:wangbin34" -d '{"job_namespace":"ceshi_namespace", "job_type":"ping"}' "http://127.0.0.1:8007/xingqiao/create_job"
{"stat": "OK", "job_id": 35}
4.1.2 Delete a job
POST
/xingqiao/delete_job
Request Body
Either job_id or job_name
Example
$ curl "http://127.0.0.1:8007/xingqiao/delete_job?job_id=2"
{"stat": "OK"}
4.1.3 Retry a job
POST
/xingqiao/retry_job
Request Body
{"job_id": 25}
Response
{"stat": "OK", "job_id": 25}
Example
curl -d '{"job_id": 1}' "http://127.0.0.1:8079/xingqiao/retry_job"
{"stat": "ERR_JOB_STATUS_NOT_FAILED", "job_id": 1}
4.1.4 List jobs
GET
/xingqiao/list_jobs
Query Parameters
job status: started/finished/failed
Response
OK
{
"data" : {
"total" : 2,
"list" : [
{
"job_extra" : "{}",
"job_id" : 2,
"job_status" : "finished",
"ret_stat" : "OK",
"u_time" : "2021-06-19 12:33:04",
"job_namespace" : "ceshi_namespace",
"job_reqid" : "DEV_8E5244781FF8F2E6",
"job_type" : "HelloWorld",
"job_name" : "hotkey",
"ret_data" : null,
"job_cost" : null,
"c_time" : "2021-06-19 12:33:04"
},
{
"job_extra" : "{}",
"job_id" : 1,
"job_status" : "finished",
"ret_stat" : "OK",
"u_time" : "2021-06-19 12:33:01",
"job_namespace" : "ceshi_namespace",
"job_reqid" : "DEV_C8BD00F3E59CC0A1",
"job_type" : "HelloWorld",
"job_name" : "hotkey",
"ret_data" : null,
"job_cost" : null,
"c_time" : "2021-06-19 12:33:01"
}
]
},
"stat" : "OK"
}
Example
curl "http://127.0.0.1:8585/xingqiao/list_jobs"
{
"data" : {
"total" : 2,
"list" : [
{
"job_extra" : "{}",
"job_id" : 2,
"job_status" : "finished",
"ret_stat" : "OK",
"u_time" : "2021-06-19 12:33:04",
"job_namespace" : "ceshi_namespace",
"job_reqid" : "DEV_8E5244781FF8F2E6",
"job_type" : "HelloWorld",
"job_name" : "hotkey",
"ret_data" : null,
"job_cost" : null,
"c_time" : "2021-06-19 12:33:04"
},
{
"job_extra" : "{}",
"job_id" : 1,
"job_status" : "finished",
"ret_stat" : "OK",
"u_time" : "2021-06-19 12:33:01",
"job_namespace" : "ceshi_namespace",
"job_reqid" : "DEV_C8BD00F3E59CC0A1",
"job_type" : "HelloWorld",
"job_name" : "hotkey",
"ret_data" : null,
"job_cost" : null,
"c_time" : "2021-06-19 12:33:01"
}
]
},
"stat" : "OK"
}
4.1.5 Job detail
GET
/xingqiao/get_job_detail
Query Parameters
Either job_id or job_name
Response
OK
{
"data" : {
"job_extra" : "{}",
"job_id" : 2,
"job_status" : "finished",
"ret_stat" : "OK",
"u_time" : "2021-06-19 12:33:04",
"job_namespace" : "ceshi_namespace",
"job_reqid" : "DEV_8E5244781FF8F2E6",
"job_type" : "HelloWorld",
"job_name" : "hotkey",
"ret_data" : null,
"job_cost" : null,
"c_time" : "2021-06-19 12:33:04"
},
"stat" : "OK"
}
ERR_JOB_NOT_EXIST
{
"data" : {},
"stat" : "ERR_JOB_NOT_EXIST"
}
Example
curl "http://127.0.0.1:8585/xingqiao/get_job_detail?job_id=2"
{
"data" : {
"job_extra" : "{}",
"job_id" : 2,
"job_status" : "finished",
"ret_stat" : "OK",
"u_time" : "2021-06-19 12:33:04",
"job_namespace" : "ceshi_namespace",
"job_reqid" : "DEV_8E5244781FF8F2E6",
"job_type" : "HelloWorld",
"job_name" : "hotkey",
"ret_data" : null,
"job_cost" : null,
"c_time" : "2021-06-19 12:33:04"
},
"stat" : "OK"
}
4.2 taskflow task
4.2.1 task list
4.2.1.1 Purpose
List the tasks of a job
4.2.1.2 Request
Parameters: either job_id or job_name
[task_status]: task status, waiting/pending/started/finished/failed
4.2.1.3 Response
OK
{
"data" : {
"list" : [
{
"task_cmd" : "/demo_api/ping",
"task_reqid" : "4737bc6f-eb3c-4c17-a303-e0cacfb0f82c",
"task_extra" : "{}",
"u_time" : "2021-06-19 12:33:04",
"task_id" : 3,
"task_cost" : 0,
"ret_data" : null,
"task_label" : "easy_task1",
"job_id" : 2,
"task_status" : "finished",
"task_dependencies" : "",
"ret_stat" : "OK",
"task_requires" : "",
"task_retrymax" : 0,
"task_provides" : "",
"task_retrycount" : 0,
"c_time" : "2021-06-19 12:33:04"
},
{
"task_cmd" : "/demo_api/ping",
"task_reqid" : "28876492-d141-4c69-a5ed-8c03e228fb1c",
"task_extra" : "{}",
"u_time" : "2021-06-19 12:33:04",
"task_id" : 4,
"task_cost" : 0,
"ret_data" : null,
"task_label" : "easy_task2",
"job_id" : 2,
"task_status" : "finished",
"task_dependencies" : "easy_task1",
"ret_stat" : "OK",
"task_requires" : "",
"task_retrymax" : 0,
"task_provides" : "",
"task_retrycount" : 0,
"c_time" : "2021-06-19 12:33:04"
}
]
},
"stat" : "OK"
}
ERR_JOB_NOT_EXIST
{
"data" : {},
"stat" : "ERR_JOB_NOT_EXIST"
}
4.2.1.4 Example
curl "http://127.0.0.1:8585/xingqiao/list_tasks?job_id=2"
{
"data" : {
"list" : [
{
"task_cmd" : "/demo_api/ping",
"task_reqid" : "4737bc6f-eb3c-4c17-a303-e0cacfb0f82c",
"task_extra" : "{}",
"u_time" : "2021-06-19 12:33:04",
"task_id" : 3,
"task_cost" : 0,
"ret_data" : null,
"task_label" : "easy_task1",
"job_id" : 2,
"task_status" : "finished",
"task_dependencies" : "",
"ret_stat" : "OK",
"task_requires" : "",
"task_retrymax" : 0,
"task_provides" : "",
"task_retrycount" : 0,
"c_time" : "2021-06-19 12:33:04"
},
{
"task_cmd" : "/demo_api/ping",
"task_reqid" : "28876492-d141-4c69-a5ed-8c03e228fb1c",
"task_extra" : "{}",
"u_time" : "2021-06-19 12:33:04",
"task_id" : 4,
"task_cost" : 0,
"ret_data" : null,
"task_label" : "easy_task2",
"job_id" : 2,
"task_status" : "finished",
"task_dependencies" : "easy_task1",
"ret_stat" : "OK",
"task_requires" : "",
"task_retrymax" : 0,
"task_provides" : "",
"task_retrycount" : 0,
"c_time" : "2021-06-19 12:33:04"
}
]
},
"stat" : "OK"
}
4.2.2 Task detail
4.2.2.1 Purpose
Get the details of a task
4.2.2.2 Request
task_id
4.2.2.3 Response
{
"data" : {
"task_params" : null,
"task_cmd" : "/qingnang_unit/u_rm",
"task_is_save" : false,
"task_reqid" : "",
"task_extra" : "{}",
"u_time" : "2021-08-27 02:04:59",
"task_id" : 20,
"task_cost" : 0,
"ret_data" : null,
"task_label" : "u_rm",
"job_id" : 4,
"task_status" : "waiting",
"task_dependencies" : "u_del",
"ret_stat" : "OK",
"task_requires" : "unit_id",
"task_retrymax" : 0,
"task_provides" : "",
"task_retrycount" : 0,
"task_timeout" : 0,
"c_time" : "2021-08-27 02:04:59"
},
"stat" : "OK"
}
4.2.2.4 Example
curl "http://127.0.0.1:8585/xingqiao/get_task?task_id=20"
{
"data" : {
"task_params" : null,
"task_cmd" : "/qingnang_unit/u_rm",
"task_is_save" : false,
"task_reqid" : "",
"task_extra" : "{}",
"u_time" : "2021-08-27 02:04:59",
"task_id" : 20,
"task_cost" : 0,
"ret_data" : null,
"task_label" : "u_rm",
"job_id" : 4,
"task_status" : "waiting",
"task_dependencies" : "u_del",
"ret_stat" : "OK",
"task_requires" : "unit_id",
"task_retrymax" : 0,
"task_provides" : "",
"task_retrycount" : 0,
"task_timeout" : 0,
"c_time" : "2021-08-27 02:04:59"
},
"stat" : "OK"
}
4.3 taskflow job dot graph
4.3.1 Get the job dot graph
4.3.1.1 Purpose
Output the job's dot graph
4.3.1.2 Request
job_id
4.3.1.3 Response
4.3.1.4 Example
http://XXXX:XXXX/xingqiao/get_graph?job_id=9
4.4 taskflow job report
4.4.1 Success rate
4.4.1.1 Purpose
Output the SLA curve
4.4.1.2 Request
PATH
/xingqiao/statistics_success_rate
Parameters
job_type: workflow job plugin name
[job_tag]: workflow job tag
[exclude_retstat]: if present, two success_rate series are returned:
rate = (1 - failed_count/total_count) * 100
rate = (1 - (failed_count - excluded_count)/total_count) * 100, where excluded_count is the number of failed jobs whose ret_stat equals exclude_retstat
4.4.1.3 Response
{
'data': {
'title': {'text': 'success rate'},
'series': [{'data': [100], 'type': 'line', 'name': 'rate', 'smooth': True}],
'yAxis': {},
'tooltip': {},
'xAxis': {'data': [35]},
'legend': {}
}
}
4.4.1.4 Example
curl "http://IP:PORT/xingqiao/statistics_success_rate?job_type=unit_migrate"
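The success-rate computation can be checked with a small worked example, assuming rate = (1 - failed_count/total_count) * 100 and that exclude_retstat discounts matching failures (this interpretation is an assumption; `success_rate` is a hypothetical helper):

```python
def success_rate(total_count, failed_count, excluded_failed_count=0):
    """Success rate in percent.

    excluded_failed_count: failures whose ret_stat matches exclude_retstat;
    they are discounted from failed_count (assumed interpretation).
    """
    if total_count == 0:
        return 100.0
    return (1 - (failed_count - excluded_failed_count) / float(total_count)) * 100

assert success_rate(100, 0) == 100.0                            # no failures
assert success_rate(100, 50) == 50.0                            # half failed
assert success_rate(100, 50, excluded_failed_count=50) == 100.0 # all failures excluded
```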
4.4.2 Tag distribution
4.4.2.1 Purpose
Get the distribution of job tags
4.4.2.2 Request
PATH: /xingqiao/statistics_jobtag_ratio
Parameters
job_type: workflow job plugin name
jobtag_name: job_tag name, eg: region
jobtag_values: job_tag values, eg: bj,nj,gz
[job_tag]: job_tag filter, eg: flag=redis
4.4.2.3 Response
{
'data': {
'series': [
{
'data': [{'name': 'bj', 'value': 0}, {'name': 'nj', 'value': 0}],
'type': 'pie',
'name': 'region'
}
],
'legend': {},
'tooltip': {'trigger': 'item', 'formatter': '{a} {b}:{c} ({d}%)'},
'title': {'text': 'region ratio'}
}
}
4.4.2.4 Example
curl "http://IP:PORT/xingqiao/statistics_jobtag_ratio?job_type=unit_migrate&jobtag_name=region&jobtag_values=bj,nj,gz"
{
"data" : {
"tooltip" : {
"formatter" : "{a} {b}:{c} ({d}%)",
"trigger" : "item"
},
"legend" : {},
"title" : {
"text" : "region ratio"
},
"series" : [
{
"name" : "region",
"type" : "pie",
"data" : [
{
"value" : 465,
"name" : "bj"
},
{
"value" : 33,
"name" : "nj"
},
{
"value" : 23,
"name" : "gz"
}
]
}
]
},
"stat" : "OK"
}
4.4.3 Error-code distribution
4.4.3.1 Purpose
Get the distribution of error codes
4.4.3.2 Request
PATH: /xingqiao/statistics_retstat_ratio
Parameters
job_type: workflow job plugin name
[job_tag]: workflow job tag, eg: region=bj
4.4.3.3 Response
{
'data': {
'series': [{'data': [{'name': u'OK', 'value': 2}], 'type': 'pie', 'name': 'retstat'}],
'legend': {},
'tooltip': {
'trigger': 'item',
# a=series["name"], b=series[x]["name"]
# c=series[x]["value"], d=series[x]["value"]/total(series[x]["value"])
'formatter': '{a} {b}:{c} ({d}%)'
},
'title': {'text': 'retstat ratio'}
}
}
4.4.3.4 Example
curl "http://IP:PORT/xingqiao/statistics_retstat_ratio?job_type=unit_migrate"
{
"success" : true,
"data" : {
"tooltip" : {
"formatter" : "{a} {b}:{c} ({d}%)",
"trigger" : "item"
},
"legend" : {},
"title" : {
"text" : "retstat ratio"
},
"series" : [
{
"name" : "retstat",
"type" : "pie",
"data" : [
{
"value" : 26,
"name" : "ERR_JOB_EXE_TIMEOUT"
},
{
"value" : 3,
"name" : "ERR_SERVER_EXCEPTION"
},
{
"value" : 1022,
"name" : "OK"
}
]
}
]
},
"message" : "OK",
"stat" : "OK"
}