任务编排 XingQiao(星桥)

三五星桥连月朔,万千灯火彻天街。

通过星桥可以将众多应用进行编排,无需改造现有 Butterfly 应用---【所有过往,皆为序章】

1 前言

1.1 架构

+---------------+        +---------------+
|   xingqiao1   |        |    xingqiao2  |  [星桥连月] 流程编排
+------+--------+        +-------+-------+
       |                         |
       |                         |
+------V-------------------------V-------+
| +------------+          +MQ----------+ |
| |   MySQL    |          |   Redis    | |
| +------------+          +------^-----+ |
+--------------------------------|-------+
                                 |
                                 |
                                 |
+--------------------------------+--------+
|  +-----------------------------------+  |
|  |              paoding              |  | [庖丁解牛]  key、热 key 分析
|  +-----------------------------------+  |
|  +-----------------------------------+  |
|  |              qingnang             |  | [青囊回春] 服务自愈
|  +-----------------------------------+  |
|  +-----------------------------------+  |
|  |              yiye                 |  | [一叶知秋] 异地多活探测
|  +-----------------------------------+  |
|  +-----------------------------------+  |
|  |              ...                  |  |
|  +-----------------------------------+  |
+-----------------------------------------+

由【星桥】控制各个 handler 的调用顺序,【星桥】通过【百川】调用各个 handler,完成编排任务

1.2 场景

2 部署

2.1 建表

详细看建表语句

2.2 配置

<butterfly_project>/conf/config.py

2.3 应用

  • 将 examples/xingqiao 目录移动到 <butterfly_project>/handlers/ 下

  • 将 examples/xingqiao/plugins 目录移动到 <butterfly_project>/ 下

3 编写 taskflow 插件

3.1 插件路径

3.2 插件例子

3.2.1 demo_ping

3.2.1.1 说明

此 demo 仅演示了任务依赖

3.2.1.2 代码

butterfly/handlers/xingqiao/plugins/demo_ping.py

3.2.1.3 发起任务

  • job_namespace: 命名空间

  • job_type: 插件注册名

3.2.1.4 dot 效果

demo_ping

3.2.2 demo_hello(workflow 运行时动态传参)

3.2.2.1 说明

当 task 间存在依赖,task2 依赖 task1 的返回值时,task1 需要将结果 {"key": "value"} 返回

3.2.2.2 代码

butterfly/handlers/xingqiao/plugins/demo_hello.py

3.2.2.3 发起任务

  • job_namespace: job 命名空间

  • job_type: job 插件

  • job_extra: job 额外参数

3.2.2.4 dot 效果

demo_hello

3.2.3 demo_hello_params(创建任务时指定 task 参数)

3.2.4 demo_ping_notifier

3.2.4.1 说明

job 状态变更时自动通知功能,通过编写 job_notifier 方法,接收 job_obj 参数即可实现接收 job 的变更信息,比如实现任务完成或者失败时进行回调的功能

3.2.4.2 代码

3.2.5 demo_hello_lock

对 job 增加 lock, 通过在 params_check 方法中设置 job_locks 可对 job 加锁,如果需要加多个锁,则使用逗号分割

3.3 task 传参

4 taskflow 接口

4.1 taskflow job

4.1.1 job 创建

POST /xingqiao/create_job

Headers

Name
Type
Description

x_username

String

标识是某个 username 创建的任务

Request Body

Name
Type
Description

job_namespace*

String

namespace 名

job_type*

String

plugin 名字

job_name

String

name, 具有唯一性

job_extra

Dict/Json

参数信息

job_timeout

Int

超时时间

{"stat": "OK", "job_id": 3}

响应

OK: 正常

ERR_JOB_EXTRA_INVALID: job_extra 类型不是 dict

ERR_JOB_TYPE_NOT_EXISTS: job 插件不存在

例子

job_extra 为空

job_extra 有值

设置用户

4.1.2 job 删除

POST /xingqiao/delete_job

Request Body

Name
Type
Description

job_name

String

job name

job_id

Int

job id

{"stat": "OK"}

job_id 和 job_name 参数任选其一

例子

4.1.3 job 重试

POST /xingqiao/retry_job

Request Body

Name
Type
Description

job_id

Int

job id

job_name

String

job name

{"stat":"OK", "job_id": 25}

例子

curl -d '{"job_id": 1}' "http://127.0.0.1:8079/xingqiao/retry_job"

4.1.4 job 列表

GET /xingqiao/list_jobs

Query Parameters

Name
Type
Description

job_client

String

job_client, client ip

job_reqid

String

butterfly reqid

job_status

String

job 状态 started/finished/failed

job_namespace

String

job 命名空间

ret_stat

String

job 执行结果标识, 比如 OK/ERR

orderBy

String

排序字段(适配 amis)

orderDir

String

排序字段(适配 amis)

orderDir

String

asc/desc(适配 amis)

响应

正常

例子

curl "http://127.0.0.1:8585/xingqiao/list_jobs"

4.1.5 job 详情

GET /xingqiao/get_job_detail

Query Parameters

Name
Type
Description

job_id

Int

job id

job_name

String

job name

参数(job_id 和 job_name 任选其一)

响应

OK

ERR_JOB_NOT_EXIST

例子

curl "http://127.0.0.1:8585/xingqiao/get_job_detail?job_id=2"

4.2 taskflow task

4.2.1 task list

4.2.1.1 作用

查看 job 中 list

4.2.1.2 请求

  • PATH

    • /xingqiao/list_tasks

  • 方法

    • GET

  • 参数(job_id 和 job_name 任选其一) * *

    • [task_status]: task 状态,waiting/pending/started/finished/failed

4.2.1.3 响应

OK

ERR_JOB_NOT_EXIST

4.2.1.4 例子

curl "http://127.0.0.1:8585/xingqiao/list_tasks?job_id=2"

4.2.2 task 详情

4.2.2.1 作用

获取 task 详情

4.2.2.2 请求

  • PATH

    • /xingqiao/get_task

  • 参数

    • task_id: task_id

4.3.2.3 响应

4.3.2.4 例子

curl "http://127.0.0.1:8585/xingqiao/get_task?task_id=20"

4.3 taskflow job dot 图

4.3.1 获取 job dot 图

4.3.1.1 作用

输出 job dot 图

4.3.1.2 请求

  • PATH:

    • /xingqiao/get_graph

  • 参数:

    • job_id

4.3.1.3 响应

demo_hello

4.3.1.4 例子

4.4 taskflow job report

4.4.1 成功率

4.4.1.1 作用

输出 SLA 曲线

4.4.1.2 请求

  • PATH

    • /xingqiao/statistics_success_rate

  • 参数

    • job_type: workflow job plugin name

    • [job_tag]: workflow job tag

    • [exclude_retstat]: 如果有此项则会返回两条 success_rate 数据

      • rate: rate = (1 - failed_count/total_count)/total_count * 100

      • rate: rate = (1 - (failed_count-ret_stat)/total_count)/total_count * 100

4.4.1.3 响应

4.4.1.4 例子

4.4.2 tag 分布

4.4.2.1 作用

获取 job tag 分布数据

4.4.2.2 请求

  • PATH: /xingqiao/statistics_jobtag_ratio

  • 参数

    • job_type: workflow job plugin name

    • jobtag_name: job_tag name, eg: region

    • jobtag_values: job_tag values: eg: bj,nj,gz

    • [job_tag]: job_tag, eg: flag=redis

4.4.2.3 响应

4.4.2.4 例子

curl "http://IP:PORT/xingqiao/statistics_jobtag_ratio?job_type=unit_migrate&jobtag_name=region&jobtag_values=bj,nj,gz"

4.4.3 错误码分布

4.4.3.1 作用

获取错误码分布

4.4.3.2 请求

  • PATH: /xingqiao/statistics_retstat_ratio

  • 参数

    • job_type: workflow job plugin name

    • [job_tag]: workflow job tag, eg: region=bj

4.4.3.3 响应

4.4.3.4 例子

curl "http://IP:PORT/xingqiao/statistics_retstat_ratio?job_type=unit_migrate"

5 实践

5.1 build_meta 步骤

如在第一步 build_meta 创建后续任务需要的表记录,后续任务仅更新记录(不再创建新记录)

5.2 handler 重试

最后返回 retstat.ERR_TASK_ING 即可,星桥会自动进行重试,直到返回其他状态码(如 retstat.ERR,retstat.OK)

Last updated