任务编排 XingQiao(星桥)
1 前言
1.1 架构
+---------------+ +---------------+
| xingqiao1 | | xingqiao2 | [星桥连月] 流程编排
+------+--------+ +-------+-------+
| |
| |
+------V-------------------------V-------+
| +------------+ +MQ----------+ |
| | MySQL | | Redis | |
| +------------+ +------^-----+ |
+--------------------------------|-------+
|
|
|
+--------------------------------+--------+
| +-----------------------------------+ |
| | paoding | | [庖丁解牛] 大 key、热 key 分析
| +-----------------------------------+ |
| +-----------------------------------+ |
| | qingnang | | [青囊回春] 服务自愈
| +-----------------------------------+ |
| +-----------------------------------+ |
| | yiye | | [一叶知秋] 异地多活探测
| +-----------------------------------+ |
| +-----------------------------------+ |
| | ... | |
| +-----------------------------------+ |
+-----------------------------------------+由【星桥】控制各个 handler 的调用顺序,【星桥】通过【百川】调用各个 handler,完成编排任务

1.2 场景
2 部署
2.1 建表
详细看建表语句
2.2 配置
<butterfly_project>/conf/config.py
2.3 应用
将 examples/xingqiao 目录移动到 <butterfly_project>/handlers/ 下
将 examples/xingqiao/plugins 目录移动到 <butterfly_project>/ 下
3 编写 taskflow 插件
3.1 插件路径
3.2 插件例子
3.2.1 demo_ping
3.2.1.1 说明
此 demo 仅演示了任务依赖
3.2.1.2 代码
butterfly/handlers/xingqiao/plugins/demo_ping.py
3.2.1.3 发起任务
job_namespace: 命名空间
job_type: 插件注册名
3.2.1.4 dot 效果

3.2.2 demo_hello(workflow 运行时动态传参)
3.2.2.1 说明
当 task 间存在依赖,task2 依赖 task1 的返回值时,task1 需要将结果 {"key": "value"} 返回
3.2.2.2 代码
butterfly/handlers/xingqiao/plugins/demo_hello.py
3.2.2.3 发起任务
job_namespace: job 命名空间
job_type: job 插件
job_extra: job 额外参数
3.2.2.4 dot 效果

3.2.3 demo_hello_params(创建任务时指定 task 参数)
3.2.4 demo_ping_notifier
3.2.4.1 说明
job 状态变更时自动通知功能,通过编写 job_notifier 方法,接收 job_obj 参数即可实现接收 job 的变更信息,比如实现任务完成或者失败时进行回调的功能
3.2.4.2 代码
3.2.5 demo_hello_lock
对 job 增加 lock, 通过在 params_check 方法中设置 job_locks 可对 job 加锁,如果需要加多个锁,则使用逗号分割
3.3 task 传参
4 taskflow 接口
4.1 taskflow job
4.1.1 job 创建
POST /xingqiao/create_job
Headers
x_username
String
标识是某个 username 创建的任务
Request Body
job_namespace*
String
namespace 名
job_type*
String
plugin 名字
job_name
String
name, 具有唯一性
job_extra
Dict/Json
参数信息
job_timeout
Int
超时时间
{"stat": "OK", "job_id": 3}
响应
OK: 正常
ERR_JOB_EXTRA_INVALID: job_extra 类型不是 dict
ERR_JOB_TYPE_NOT_EXISTS: job 插件不存在
例子
job_extra 为空
job_extra 有值
设置用户
4.1.2 job 删除
POST /xingqiao/delete_job
Request Body
job_name
String
job name
job_id
Int
job id
{"stat": "OK"}
job_id 和 job_name 参数任选其一
例子
4.1.3 job 重试
POST /xingqiao/retry_job
Request Body
job_id
Int
job id
job_name
String
job name
{"stat":"OK", "job_id": 25}
例子
curl -d '{"job_id": 1}' "http://127.0.0.1:8079/xingqiao/retry_job"
4.1.4 job 列表
GET /xingqiao/list_jobs
Query Parameters
job_client
String
job_client, client ip
job_reqid
String
butterfly reqid
job_status
String
job 状态 started/finished/failed
job_namespace
String
job 命名空间
ret_stat
String
job 执行结果标识, 比如 OK/ERR
orderBy
String
排序字段(适配 amis)
orderDir
String
排序字段(适配 amis)
orderDir
String
asc/desc(适配 amis)
响应
正常
例子
curl "http://127.0.0.1:8585/xingqiao/list_jobs"
4.1.5 job 详情
GET /xingqiao/get_job_detail
Query Parameters
job_id
Int
job id
job_name
String
job name
参数(job_id 和 job_name 任选其一)
响应
OK
ERR_JOB_NOT_EXIST
例子
curl "http://127.0.0.1:8585/xingqiao/get_job_detail?job_id=2"
4.2 taskflow task
4.2.1 task list
4.2.1.1 作用
查看 job 中 list
4.2.1.2 请求
PATH
/xingqiao/list_tasks
方法
GET
参数(job_id 和 job_name 任选其一) * *
[task_status]: task 状态,waiting/pending/started/finished/failed
4.2.1.3 响应
OK
ERR_JOB_NOT_EXIST
4.2.1.4 例子
curl "http://127.0.0.1:8585/xingqiao/list_tasks?job_id=2"
4.2.2 task 详情
4.2.2.1 作用
获取 task 详情
4.2.2.2 请求
PATH
/xingqiao/get_task
参数
task_id: task_id
4.3.2.3 响应
4.3.2.4 例子
curl "http://127.0.0.1:8585/xingqiao/get_task?task_id=20"
4.3 taskflow job dot 图
4.3.1 获取 job dot 图
4.3.1.1 作用
输出 job dot 图
4.3.1.2 请求
PATH:
/xingqiao/get_graph
参数:
job_id
4.3.1.3 响应

4.3.1.4 例子
4.4 taskflow job report
4.4.1 成功率
4.4.1.1 作用
输出 SLA 曲线
4.4.1.2 请求
PATH
/xingqiao/statistics_success_rate
参数
job_type: workflow job plugin name
[job_tag]: workflow job tag
[exclude_retstat]: 如果有此项则会返回两条 success_rate 数据
rate: rate = (1 - failed_count/total_count)/total_count * 100
rate: rate = (1 - (failed_count-ret_stat)/total_count)/total_count * 100
4.4.1.3 响应
4.4.1.4 例子
4.4.2 tag 分布
4.4.2.1 作用
获取 job tag 分布数据
4.4.2.2 请求
PATH: /xingqiao/statistics_jobtag_ratio
参数
job_type: workflow job plugin name
jobtag_name: job_tag name, eg: region
jobtag_values: job_tag values: eg: bj,nj,gz
[job_tag]: job_tag, eg: flag=redis
4.4.2.3 响应
4.4.2.4 例子
curl "http://IP:PORT/xingqiao/statistics_jobtag_ratio?job_type=unit_migrate&jobtag_name=region&jobtag_values=bj,nj,gz"
4.4.3 错误码分布
4.4.3.1 作用
获取错误码分布
4.4.3.2 请求
PATH: /xingqiao/statistics_retstat_ratio
参数
job_type: workflow job plugin name
[job_tag]: workflow job tag, eg: region=bj
4.4.3.3 响应
4.4.3.4 例子
curl "http://IP:PORT/xingqiao/statistics_retstat_ratio?job_type=unit_migrate"
5 实践
5.1 build_meta 步骤
如在第一步 build_meta 创建后续任务需要的表记录,后续任务仅更新记录(不再创建新记录)
5.2 handler 重试
最后返回 retstat.ERR_TASK_ING 即可,星桥会自动进行重试,直到返回其他状态码(如 retstat.ERR,retstat.OK)
Last updated