🦋
Butterfly 用户手册
  • Introduction
  • 一 前言
  • 二 开始
    • 安装部署
    • 五分钟体验指南
    • 单机使用手册
    • 应用规范
      • handler specs
      • middleware specs
      • xingqiao_plugin specs
      • yiqiu_program specs
  • 三 客户端功能
    • MySQL 原生协议
    • MySQL ORM
    • Redis 原生协议
      • redis_config
      • redis_tls
    • Redis ORM
    • Redis mcpack
    • Localcache
    • Kazoo
  • 四 应用(通用服务)
    • API JSON 规范
    • 异步任务 BaiChuan(百川)
    • 任务调度 RuQi(如期)
    • 任务编排 XingQiao(星桥)
    • 配置管理 WuXing(五行)
    • 运筹决策 BaiCe(百策)
  • 五 部署运维
    • 单机容器化部署
    • 监控
    • 异常排查
      • CPU Load spike every 7 hours
    • 升级
    • 安全
    • 其他
  • 六 前端
    • butterfly_template
    • butterfly_fe
    • butterfly-admin(json2web)
      • amis
      • sso
      • pangu
    • NoahV
    • PyWebIO
  • 七 潘多拉魔盒
    • 装饰器
      • localcache_decorator
      • retry_decorator
      • custom_decorator
      • command2http_decorator
    • 算法
      • 算法-分位数
      • 算法-变异系数
    • 实用工具
      • host_util
      • shell_util
      • http_util
      • time_util
      • random_util
      • concurrent
      • jsonschema
      • blinker
      • toml
      • command_util
      • config_util
      • picobox
      • 对称加密
        • des
        • aes
      • ascii_art
        • ttable
        • chart
      • business_rules
      • python-mysql-replication
      • dict_util
    • 中间件
      • middleware_status
      • middleware_whitelist
    • test_handler.py
  • 八 最佳实践
    • 分布式架构
    • Code practice
    • Log practice
    • Daemon process
  • 附录
Powered by GitBook
On this page
  • 1 前言
  • 1.1 架构
  • 1.2 场景
  • 2 部署
  • 2.1 建表
  • 2.2 配置
  • 2.3 应用
  • 3 编写 taskflow 插件
  • 3.1 插件路径
  • 3.2 插件例子
  • 3.3 task 传参
  • 4 taskflow 接口
  • 4.1 taskflow job
  • 4.2 taskflow task
  • 4.3 taskflow job dot 图
  • 4.4 taskflow job report
  • 5 实践
  • 5.1 build_meta 步骤
  1. 四 应用(通用服务)

任务编排 XingQiao(星桥)

三五星桥连月朔,万千灯火彻天街。

通过星桥可以将众多应用进行编排,无需改造现有 Butterfly 应用---【所有过往,皆为序章】

1 前言

1.1 架构

+---------------+        +---------------+
|   xingqiao1   |        |    xingqiao2  |  [星桥连月] 流程编排
+------+--------+        +-------+-------+
       |                         |
       |                         |
+------V-------------------------V-------+
| +------------+          +MQ----------+ |
| |   MySQL    |          |   Redis    | |
| +------------+          +------^-----+ |
+--------------------------------|-------+
                                 |
                                 |
                                 |
+--------------------------------+--------+
|  +-----------------------------------+  |
|  |              paoding              |  | [庖丁解牛] 大 key、热 key 分析
|  +-----------------------------------+  |
|  +-----------------------------------+  |
|  |              qingnang             |  | [青囊回春] 服务自愈
|  +-----------------------------------+  |
|  +-----------------------------------+  |
|  |              yiye                 |  | [一叶知秋] 异地多活探测
|  +-----------------------------------+  |
|  +-----------------------------------+  |
|  |              ...                  |  |
|  +-----------------------------------+  |
+-----------------------------------------+

由【星桥】控制各个 handler 的调用顺序,【星桥】通过【百川】调用各个 handler,完成编排任务

1.2 场景

1> 直接调用星桥执行工作流
   创建 workflow 返回 job id,后续使用 job_id 查询 job 详情
     耗时类任务处理(重 io):
     (1) 大 key,热 key 分析;
     (2) 异地多活探测
     流程多
     (1) 服务自愈
     (2) 集成测试(初始化环境,执行多项测试 step)
2> 集成星桥工作流
   比如有独立的订单服务,创建订单后,由 workflow 完成订单处理和更新状态

2 部署

2.1 建表

USE {DATABASE};

CREATE TABLE `workflow_job` (
  `job_id` bigint(20) NOT NULL AUTO_INCREMENT,
  `job_client` varchar(64) NOT NULL,
  `job_namespace` varchar(64) NOT NULL,
  `job_reqid` varchar(64) NOT NULL,
  `job_name` varchar(64) NOT NULL,
  `job_status` varchar(64) NOT NULL,
  `job_type` varchar(64) NOT NULL,
  `job_tags` varchar(128) NOT NULL,
  `ret_stat` varchar(64) NOT NULL,
  `ret_data` text,
  `job_cost` double NOT NULL,
  `job_extra` varchar(10240) NOT NULL,
  `job_timeout` int(11) NOT NULL,
  `job_lock` varchar(64) NOT NULL,
  `exe_id` int(11) NOT NULL,
  `operator` varchar(64) NOT NULL,
  `is_valid` tinyint(1) NOT NULL,
  `c_time` datetime NOT NULL,
  `s_time` datetime NOT NULL,
  `e_time` datetime NOT NULL,
  `u_time` datetime NOT NULL,
  PRIMARY KEY (`job_id`),
  UNIQUE KEY `job_job_name` (`job_name`),
  KEY `job_job_client` (`job_client`),
  KEY `job_job_namespace` (`job_namespace`),
  KEY `job_job_reqid` (`job_reqid`),
  KEY `job_job_status` (`job_status`),
  KEY `job_job_type` (`job_type`),
  KEY `job_job_tags` (`job_tags`),
  KEY `job_ret_stat` (`ret_stat`),
  KEY `job_job_cost` (`job_cost`),
  KEY `job_exe_id` (`exe_id`),
  KEY `job_operator` (`operator`),
  KEY `job_is_valid` (`is_valid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

CREATE TABLE `workflow_task` (
  `task_id` bigint(20) NOT NULL AUTO_INCREMENT,
  `job_id` bigint(20) NOT NULL,
  `task_label` varchar(64) NOT NULL,
  `task_reqid` varchar(64) NOT NULL,
  `task_cmd` varchar(128) NOT NULL,
  `task_params` text,
  `task_requires` varchar(1024) NOT NULL,
  `task_provides` varchar(1024) NOT NULL,
  `task_dependencies` varchar(10240) NOT NULL,
  `task_status` varchar(64) NOT NULL,
  `task_cost` double NOT NULL,
  `task_extra` text NOT NULL,
  `task_is_save` tinyint(1) NOT NULL,
  `ret_stat` varchar(64) NOT NULL,
  `ret_data` text,
  `task_retrymax` int(11) NOT NULL,
  `task_retrycount` int(11) NOT NULL,
  `task_timeout` int(11) NOT NULL,
  `c_time` datetime NOT NULL,
  `s_time` datetime NOT NULL,
  `u_time` datetime NOT NULL,
  PRIMARY KEY (`task_id`),
  KEY `task_job_id` (`job_id`),
  KEY `task_task_label` (`task_label`),
  KEY `task_task_reqid` (`task_reqid`),
  KEY `task_task_cmd` (`task_cmd`),
  KEY `task_task_status` (`task_status`),
  KEY `task_task_cost` (`task_cost`),
  KEY `task_ret_stat` (`ret_stat`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

2.2 配置

<butterfly_project>/conf/config.py

...
DATABASES = {
    "default": "mysql+retrypool://<user>:<password>@<ip>:<port>/<database>?max_connections=300&stale_timeout=300"
}
...
CACHES = {
    "default": "redis://@localhost:6379/0",
    "baichuan": "redis://@<ip>:<port>/0?socket_timeout=2&socket_connect_timeout=0.5&retry_on_timeout=False",
}

2.3 应用

  • 将 examples/xingqiao 目录移动到 <butterfly_project>/handlers/ 下

  • 将 examples/xingqiao/plugins 目录移动到 <butterfly_project>/ 下

3 编写 taskflow 插件

3.1 插件路径

butterfly/handlers/xingqiao/plugins

3.2 插件例子

3.2.1 demo_ping

3.2.1.1 说明

此 demo 仅演示了任务依赖

(1) 先执行 task1,task1 完成后,才会执行 task2
(2) 插件注册名为 ping

3.2.1.2 代码

butterfly/handlers/xingqiao/plugins/demo_ping.py

from xlib.taskflow.taskflow import WorkflowRunner


class Ping(WorkflowRunner):
    """
    a workflow is defined by overloading the WorkflowRunner.workflow() method:
    """

    def workflow(self):
        """
        先执行 task1, 再执行 task2
        """
        self.add_task("task1", "/demo_api/ping")
        # 通过 dependencies 描述依赖关系
        self.add_task("task2", "/demo_api/ping", dependencies=["task1"])


def setup(app):
    """
    插件注册函数
    """
    app.register_formatter('ping', Ping)

3.2.1.3 发起任务

$curl -d '{"job_namespace":"ceshi_namespace", "job_type":"ping"}' "http://127.0.0.1:8585/xingqiao/create_job"
  • job_namespace: 命名空间

  • job_type: 插件注册名

3.2.1.4 dot 效果

3.2.2 demo_hello(workflow 运行时动态传参)

3.2.2.1 说明

(1) 参数传入
    * 普通参数:  参数的 value 会从任务依赖的 task 中获取,或者从 job_extra 中获取
    * 参数名为 all_taskdata 的参数:此时会将此任务依赖的 task 的 ret_data 放到一个列表中,
      以 all_taskdata 传给此 task
(2) 保存结果到 job ret_data 中
    场景:工作流的场景分为操作型和数据型,如果是操作型,我们仅仅需要知道执行结果即可
          如果是数据型,我们需要获取整个任务的结果,此时我们可以在最后的一个任务中 设置 is_save 为 True
          在此 task 中进行汇聚下结果,然后进行保存数据

当 task 间存在依赖,task2 依赖 task1 的返回值时,task1 需要将结果 {"key": "value"} 返回

当 task2 依赖 task1 的 new_unitmatrix 结果
# 正确
return retstat.OK, {"new_unitmatrix": "xxx"}

# 错误
return retstat.OK, {"data": {"new_unitmatrix": "xxx"}}

3.2.2.2 代码

butterfly/handlers/xingqiao/plugins/demo_hello.py

from xlib.taskflow.taskflow import WorkflowRunner


class Hello(WorkflowRunner):
    """
    a workflow is defined by overloading the WorkflowRunner.workflow() method:
    """

    def params_check(self):
        """
        params_check, self.job_extra
        """
        if "str_info" not in self.job_extra.keys():
            raise Exception("job_extra not have str_info")

    def workflow(self):
        """
        学习点:
            (1) 参数传入
                * 普通参数:  参数的 value 会从任务依赖的 task 中获取,或者从 job_extra 中获取
                * 参数名为 all_taskdata 的参数:此时会将此任务依赖的 task 的 ret_data 放到一个列表中,
                  以 all_taskdata 传给此 task
            (2) 保存结果到 job ret_data 中
                场景:工作流的场景分为操作型和数据型,如果是操作型,我们仅仅需要知道执行结果即可
                      如果是数据型,我们需要获取整个任务的结果,此时我们可以在最后的一个任务中 设置 is_save 为 True
                      在此 task 中进行汇聚下结果,然后进行保存数据

        执行顺序:
            执行 task1, task1 需要 str_info 参数, 参数的 value 会从任务依赖的 task 中获取,或者从 job_extra 中获取
            task1 执行完成后,执行 task2
            task1 和 task2 都执行完成后,执行 task3, task3 将结果保存到 job ret_data 中
        """
        self.add_task("task1", "/demo_api/hello", requires=["str_info"])
        self.add_task("task2", "/demo_api/ping", dependencies=["task1"])
        self.add_task(
            "task3",
            "/demo_api/collect",
            requires=["all_taskdata"],
            dependencies=[
                "task1",
                "task2"],
            is_save=True)


def setup(app):
    """
    插件注册函数
    """
    app.register_formatter('hello', Hello)

3.2.2.3 发起任务

curl -d '{"job_namespace":"ceshi_namespace", "job_type":"hello", "job_extra":{"str_info": "ceshi"}}' "http://127.0.0.1:8585/xingqiao/create_job"
  • job_namespace: job 命名空间

  • job_type: job 插件

  • job_extra: job 额外参数

3.2.2.4 dot 效果

3.2.3 demo_hello_params(创建任务时指定 task 参数)

from xlib.taskflow.taskflow import WorkflowRunner


class HelloParams(WorkflowRunner):
    """
    a workflow is defined by overloading the WorkflowRunner.workflow() method:
    """

    def params_check(self):
        """
        params_check
        """
        if "str_info" not in self.job_extra.keys():
            raise Exception("job_extra not have str_info")

    def workflow(self):
        """
        学习点:
            (1) 参数传入
                * 普通参数:  参数的 value 会从任务依赖的 task 中获取,或者从 job_extra 中获取
                * 参数名为 all_taskdata 的参数:此时会将此任务依赖的 task 的 ret_data 放到一个列表中,
                  以 all_taskdata 传给此 task
            (2) 保存结果到 job ret_data 中
                场景:工作流的场景分为操作型和数据型,如果是操作型,我们仅仅需要知道执行结果即可
                      如果是数据型,我们需要获取整个任务的结果,此时我们可以在最后的一个任务中 设置 is_save 为 True
                      在此 task 中进行汇聚下结果,然后进行保存数据

        执行顺序:
            执行 task1, task1 需要 str_info 参数, 参数的 value 会从任务依赖的 task 中获取,或者从 job_extra 中获取
            task1 执行完成后,执行 task2
            task1 和 task2 都执行完成后,执行 task3, task3 将结果保存到 job ret_data 中
        """
        self.add_task("task1", "/demo_api/hello", params={"str_info": "hello world"})
        self.add_task("task2", "/demo_api/ping", dependencies=["task1"])
        self.add_task(
            "task3",
            "/demo_api/collect",
            requires=["all_taskdata"],
            dependencies=[
                "task1",
                "task2"],
            is_save=True)


def setup(app):
    """
    插件注册函数
    """
    app.register_formatter('hello_params', HelloParams)

3.2.4 demo_ping_notifier

3.2.4.1 说明

job 状态变更时自动通知功能,通过编写 job_notifier 方法,接收 job_obj 参数即可实现接收 job 的变更信息,比如实现任务完成或者失败时进行回调的功能

3.2.4.2 代码

import logging

from xlib.taskflow.taskflow import WorkflowRunner


class Ping(WorkflowRunner):
    """
    a workflow is defined by overloading the WorkflowRunner.workflow() method:
    """

    def workflow(self):
        """
        先执行 task1, 再执行 task2
        """
        self.add_task("task1", "/demo_api/ping")
        # 通过 dependencies 描述依赖关系
        self.add_task("task2", "/demo_api/ping", dependencies=["task1"])

    @classmethod
    def job_notifier(cls, job_obj):
        """
        job_obj:
            job_id: (int)job_id
            job_client
            job_namespace
            job_reqid
            job_name: job_name
            job_status: started/finished/failed
            job_type
            job_tags
            ret_stat
            ret_data
            job_cost
            job_extra: (json)
            job_timeout
            job_lock
            exe_id
            operator
            is_valid
            c_time
            s_time
            e_time
        """
        logging.info("job_id={job_id} job_name={job_name} job_status={job_status}".format(
            job_id=job_obj.job_id,
            job_name=job_obj.job_name,
            job_status=job_obj.job_status
            ))


def setup(app):
    """
    插件注册函数
    """
    app.register_formatter('ping_notifier', Ping)

3.2.5 demo_hello_lock

对 job 增加 lock, 通过在 params_check 方法中设置 job_locks 可对 job 加锁,如果需要加多个锁,则使用逗号分割

如 job_locks = []
job_locks.append("lock1")
job_locks.append("lock2")
self.job_locks = ",".join(job_locks)
from xlib.taskflow.taskflow import WorkflowRunner
from xlib.util import jsonschema


class HelloLock(WorkflowRunner):
    """
    a workflow is defined by overloading the WorkflowRunner.workflow() method:
    """

    def params_check(self):
        """
        params_check
        """
        schema = {
            "type": "object",
            "properties": {
                "str_info": {
                    "type": "string",
                    "pattern": "[A-Za-z]+$"
                },
                "job_timeout": {
                    "type": "number"
                },
                "job_locks": {
                    "type": "string",
                }
            },
            "required": ["str_info"]
        }
        jsonschema.validate(instance=self.job_extra, schema=schema)

        # job_timeout(s)
        if "job_timeout" in self.job_extra:
            self.job_timeout = self.job_extra["job_timeout"]
        else:
            self.job_timeout = 1800

        # job_locks
        if "job_locks" in self.job_extra:
            self.job_locks = self.job_extra["job_locks"]
        else:
            self.job_locks = "ceshi"

    def workflow(self):
        """
        学习点:
            (1) 参数传入
                * 普通参数:  参数的 value 会从任务依赖的 task 中获取,或者从 job_extra 中获取
                * 参数名为 all_taskdata 的参数:此时会将此任务依赖的 task 的 ret_data 放到一个列表中,
                  以 all_taskdata 传给此 task
            (2) 保存结果到 job ret_data 中
                场景:工作流的场景分为操作型和数据型,如果是操作型,我们仅仅需要知道执行结果即可
                      如果是数据型,我们需要获取整个任务的结果,此时我们可以在最后的一个任务中 设置 is_save 为 True
                      在此 task 中进行汇聚下结果,然后进行保存数据

        执行顺序:
            执行 task1, task1 需要 str_info 参数, 参数的 value 会从任务依赖的 task 中获取,或者从 job_extra 中获取
            task1 执行完成后,执行 task2
        """
        self.add_task("task1", "/demo_api/hello", requires=["str_info"])
        self.add_task("task2", "/demo_api/ping", dependencies=["task1"])


def setup(app):
    """
    插件注册函数
    """
    app.register_formatter('hello_lock', HelloLock)

3.3 task 传参

两种方式生成:
(1) 预设 task_params: 创建 task 任务时预置的 task 参数值(使用 params 参数)
(2) 动态生成 self.task_requires_dict:(使用 requires & extra 参数)
    1> 在 job_extra 中获取参数数据
    2> 在 task_extra 中获取参数数据
    3> 若存在依赖任务:在依赖的 task 的 ret_data 中获取参数数据
    4> 若有 all_taskdata 依赖参数,则将所有依赖的 ret_data 放到 all_taskdata 列表中

4 taskflow 接口

4.1 taskflow job

4.1.1 job 创建

POST /xingqiao/create_job

Headers

Name
Type
Description

x_username

String

标识是某个 username 创建的任务

Request Body

Name
Type
Description

job_namespace*

String

namespace 名

job_type*

String

plugin 名字

job_name

String

name, 具有唯一性

job_extra

Dict/Json

参数信息

job_timeout

Int

超时时间

{"stat": "OK", "job_id": 3}

响应

OK: 正常

{"stat": "OK", "job_id": 3}

ERR_JOB_EXTRA_INVALID: job_extra 类型不是 dict

{"stat": "ERR_JOB_EXTRA_INVALID"}

ERR_JOB_TYPE_NOT_EXISTS: job 插件不存在

{"stat": "ERR_JOB_TYPE_NOT_EXISTS"}

例子

job_extra 为空

$ curl -d '{"job_namespace":"ceshi_namespace", "job_type":"ping"}' "http://127.0.0.1:8007/xingqiao/create_job"
{"stat": "OK", "job_id": 3}

job_extra 有值

$ curl -d '{"job_namespace":"ceshi_namespace", "job_type":"hello", "job_extra":{"str_info": "ceshi"}}' "http://127.0.0.1:8007/xingqiao/create_job"
{"stat": "OK", "job_id": 25}

设置用户

$ curl -H "x_username:wangbin34" -d '{"job_namespace":"ceshi_namespace", "job_type":"ping"}' "http://127.0.0.1:8007/xingqiao/create_job"
{"stat": "OK", "job_id": 35}

4.1.2 job 删除

POST /xingqiao/delete_job

Request Body

Name
Type
Description

job_name

String

job name

job_id

Int

job id

{"stat": "OK"}

job_id 和 job_name 参数任选其一

例子

$ curl "http://127.0.0.1:8007/xingqiao/delete_job?job_id=2"
{"stat": "OK"}

4.1.3 job 重试

POST /xingqiao/retry_job

Request Body

Name
Type
Description

job_id

Int

job id

job_name

String

job name

{"stat":"OK", "job_id": 25}

例子

curl -d '{"job_id": 1}' "http://127.0.0.1:8079/xingqiao/retry_job"

{"stat": "ERR_JOB_STATUS_NOT_FAILED", "job_id": 1}

4.1.4 job 列表

GET /xingqiao/list_jobs

Query Parameters

Name
Type
Description

job_client

String

job_client, client ip

job_reqid

String

butterfly reqid

job_status

String

job 状态 started/finished/failed

job_namespace

String

job 命名空间

ret_stat

String

job 执行结果标识, 比如 OK/ERR

orderBy

String

排序字段(适配 amis)

orderDir

String

排序字段(适配 amis)

orderDir

String

asc/desc(适配 amis)

响应

正常

{
   "data" : {
      "total" : 2,
      "list" : [
         {
            "job_extra" : "{}",
            "job_id" : 2,
            "job_status" : "finished",
            "ret_stat" : "OK",
            "u_time" : "2021-06-19 12:33:04",
            "job_namespace" : "ceshi_namespace",
            "job_reqid" : "DEV_8E5244781FF8F2E6",
            "job_type" : "HelloWorld",
            "job_name" : "hotkey",
            "ret_data" : null,
            "job_cost" : null,
            "c_time" : "2021-06-19 12:33:04"
         },
         {
            "job_extra" : "{}",
            "job_id" : 1,
            "job_status" : "finished",
            "ret_stat" : "OK",
            "u_time" : "2021-06-19 12:33:01",
            "job_namespace" : "ceshi_namespace",
            "job_reqid" : "DEV_C8BD00F3E59CC0A1",
            "job_type" : "HelloWorld",
            "job_name" : "hotkey",
            "ret_data" : null,
            "job_cost" : null,
            "c_time" : "2021-06-19 12:33:01"
         }
      ]
   },
   "stat" : "OK"
}

例子

curl "http://127.0.0.1:8585/xingqiao/list_jobs"

{
   "data" : {
      "total" : 2,
      "list" : [
         {
            "job_extra" : "{}",
            "job_id" : 2,
            "job_status" : "finished",
            "ret_stat" : "OK",
            "u_time" : "2021-06-19 12:33:04",
            "job_namespace" : "ceshi_namespace",
            "job_reqid" : "DEV_8E5244781FF8F2E6",
            "job_type" : "HelloWorld",
            "job_name" : "hotkey",
            "ret_data" : null,
            "job_cost" : null,
            "c_time" : "2021-06-19 12:33:04"
         },
         {
            "job_extra" : "{}",
            "job_id" : 1,
            "job_status" : "finished",
            "ret_stat" : "OK",
            "u_time" : "2021-06-19 12:33:01",
            "job_namespace" : "ceshi_namespace",
            "job_reqid" : "DEV_C8BD00F3E59CC0A1",
            "job_type" : "HelloWorld",
            "job_name" : "hotkey",
            "ret_data" : null,
            "job_cost" : null,
            "c_time" : "2021-06-19 12:33:01"
         }
      ]
   },
   "stat" : "OK"
}

4.1.5 job 详情

GET /xingqiao/get_job_detail

Query Parameters

Name
Type
Description

job_id

Int

job id

job_name

String

job name

参数(job_id 和 job_name 任选其一)

响应

OK

{
   "data" : {
      "job_extra" : "{}",
      "job_id" : 2,
      "job_status" : "finished",
      "ret_stat" : "OK",
      "u_time" : "2021-06-19 12:33:04",
      "job_namespace" : "ceshi_namespace",
      "job_reqid" : "DEV_8E5244781FF8F2E6",
      "job_type" : "HelloWorld",
      "job_name" : "hotkey",
      "ret_data" : null,
      "job_cost" : null,
      "c_time" : "2021-06-19 12:33:04"
   },
   "stat" : "OK"
}

ERR_JOB_NOT_EXIST

{
   "data" : {},
   "stat" : "ERR_JOB_NOT_EXIST"
}

例子

curl "http://127.0.0.1:8585/xingqiao/get_job_detail?job_id=2"

{
   "data" : {
      "job_extra" : "{}",
      "job_id" : 2,
      "job_status" : "finished",
      "ret_stat" : "OK",
      "u_time" : "2021-06-19 12:33:04",
      "job_namespace" : "ceshi_namespace",
      "job_reqid" : "DEV_8E5244781FF8F2E6",
      "job_type" : "HelloWorld",
      "job_name" : "hotkey",
      "ret_data" : null,
      "job_cost" : null,
      "c_time" : "2021-06-19 12:33:04"
   },
   "stat" : "OK"
}

4.2 taskflow task

4.2.1 task list

4.2.1.1 作用

查看 job 中 list

4.2.1.2 请求

  • PATH

    • /xingqiao/list_tasks

  • 方法

    • GET

  • 参数(job_id 和 job_name 任选其一) * *

    • [task_status]: task 状态,waiting/pending/started/finished/failed

4.2.1.3 响应

OK

{
   "data" : {
      "list" : [
         {
            "task_cmd" : "/demo_api/ping",
            "task_reqid" : "4737bc6f-eb3c-4c17-a303-e0cacfb0f82c",
            "task_extra" : "{}",
            "u_time" : "2021-06-19 12:33:04",
            "task_id" : 3,
            "task_cost" : 0,
            "ret_data" : null,
            "task_label" : "easy_task1",
            "job_id" : 2,
            "task_status" : "finished",
            "task_dependencies" : "",
            "ret_stat" : "OK",
            "task_requires" : "",
            "task_retrymax" : 0,
            "task_provides" : "",
            "task_retrycount" : 0,
            "c_time" : "2021-06-19 12:33:04"
         },
         {
            "task_cmd" : "/demo_api/ping",
            "task_reqid" : "28876492-d141-4c69-a5ed-8c03e228fb1c",
            "task_extra" : "{}",
            "u_time" : "2021-06-19 12:33:04",
            "task_id" : 4,
            "task_cost" : 0,
            "ret_data" : null,
            "task_label" : "easy_task2",
            "job_id" : 2,
            "task_status" : "finished",
            "task_dependencies" : "easy_task1",
            "ret_stat" : "OK",
            "task_requires" : "",
            "task_retrymax" : 0,
            "task_provides" : "",
            "task_retrycount" : 0,
            "c_time" : "2021-06-19 12:33:04"
         }
      ]
   },
   "stat" : "OK"
}

ERR_JOB_NOT_EXIST

{
   "data" : {},
   "stat" : "ERR_JOB_NOT_EXIST"
}

4.2.1.4 例子

curl "http://127.0.0.1:8585/xingqiao/list_tasks?job_id=2"

{
   "data" : {
      "list" : [
         {
            "task_cmd" : "/demo_api/ping",
            "task_reqid" : "4737bc6f-eb3c-4c17-a303-e0cacfb0f82c",
            "task_extra" : "{}",
            "u_time" : "2021-06-19 12:33:04",
            "task_id" : 3,
            "task_cost" : 0,
            "ret_data" : null,
            "task_label" : "easy_task1",
            "job_id" : 2,
            "task_status" : "finished",
            "task_dependencies" : "",
            "ret_stat" : "OK",
            "task_requires" : "",
            "task_retrymax" : 0,
            "task_provides" : "",
            "task_retrycount" : 0,
            "c_time" : "2021-06-19 12:33:04"
         },
         {
            "task_cmd" : "/demo_api/ping",
            "task_reqid" : "28876492-d141-4c69-a5ed-8c03e228fb1c",
            "task_extra" : "{}",
            "u_time" : "2021-06-19 12:33:04",
            "task_id" : 4,
            "task_cost" : 0,
            "ret_data" : null,
            "task_label" : "easy_task2",
            "job_id" : 2,
            "task_status" : "finished",
            "task_dependencies" : "easy_task1",
            "ret_stat" : "OK",
            "task_requires" : "",
            "task_retrymax" : 0,
            "task_provides" : "",
            "task_retrycount" : 0,
            "c_time" : "2021-06-19 12:33:04"
         }
      ]
   },
   "stat" : "OK"
}

4.2.2 task 详情

4.2.2.1 作用

获取 task 详情

4.2.2.2 请求

  • PATH

    • /xingqiao/get_task

  • 参数

    • task_id: task_id

4.3.2.3 响应

{
   "data" : {
      "task_params" : null,
      "task_cmd" : "/qingnang_unit/u_rm",
      "task_is_save" : false,
      "task_reqid" : "",
      "task_extra" : "{}",
      "u_time" : "2021-08-27 02:04:59",
      "task_id" : 20,
      "task_cost" : 0,
      "ret_data" : null,
      "task_label" : "u_rm",
      "job_id" : 4,
      "task_status" : "waiting",
      "task_dependencies" : "u_del",
      "ret_stat" : "OK",
      "task_requires" : "unit_id",
      "task_retrymax" : 0,
      "task_provides" : "",
      "task_retrycount" : 0,
      "task_timeout" : 0,
      "c_time" : "2021-08-27 02:04:59"
   },
   "stat" : "OK"
}

4.3.2.4 例子

curl "http://127.0.0.1:8585/xingqiao/get_task?task_id=20"

{
   "data" : {
      "task_params" : null,
      "task_cmd" : "/qingnang_unit/u_rm",
      "task_is_save" : false,
      "task_reqid" : "",
      "task_extra" : "{}",
      "u_time" : "2021-08-27 02:04:59",
      "task_id" : 20,
      "task_cost" : 0,
      "ret_data" : null,
      "task_label" : "u_rm",
      "job_id" : 4,
      "task_status" : "waiting",
      "task_dependencies" : "u_del",
      "ret_stat" : "OK",
      "task_requires" : "unit_id",
      "task_retrymax" : 0,
      "task_provides" : "",
      "task_retrycount" : 0,
      "task_timeout" : 0,
      "c_time" : "2021-08-27 02:04:59"
   },
   "stat" : "OK"
}

4.3 taskflow job dot 图

4.3.1 获取 job dot 图

4.3.1.1 作用

输出 job dot 图

4.3.1.2 请求

  • PATH:

    • /xingqiao/get_graph

  • 参数:

    • job_id

4.3.1.3 响应

4.3.1.4 例子

http://XXXX:XXXX/xingqiao/get_graph?job_id=9

4.4 taskflow job report

4.4.1 成功率

4.4.1.1 作用

输出 SLA 曲线

4.4.1.2 请求

  • PATH

    • /xingqiao/statistics_success_rate

  • 参数

    • job_type: workflow job plugin name

    • [job_tag]: workflow job tag

    • [exclude_retstat]: 如果有此项则会返回两条 success_rate 数据

      • rate: rate = (1 - failed_count/total_count)/total_count * 100

      • rate: rate = (1 - (failed_count-ret_stat)/total_count)/total_count * 100

4.4.1.3 响应

{
    'data': {
        'title': {'text': 'success rate'},
        'series': [{'data': [100], 'type': 'line', 'name': 'rate', 'smooth': True}],
        'yAxis': {},
        'tooltip': {},
        'xAxis': {'data': [35]},
        'legend': {}
    }
}

4.4.1.4 例子

curl "http://IP:PORT/xingqiao/statistics_success_rate?job_type=unit_migrate"

4.4.2 tag 分布

4.4.2.1 作用

获取 job tag 分布数据

4.4.2.2 请求

  • PATH: /xingqiao/statistics_jobtag_ratio

  • 参数

    • job_type: workflow job plugin name

    • jobtag_name: job_tag name, eg: region

    • jobtag_values: job_tag values: eg: bj,nj,gz

    • [job_tag]: job_tag, eg: flag=redis

4.4.2.3 响应

{
    'data': {
        'series': [
            {
                'data': [{'name': 'bj', 'value': 0}, {'name': 'nj', 'value': 0}],
                'type': 'pie',
                'name': 'region'
            }
        ],
        'legend': {},
        'tooltip': {'trigger': 'item', 'formatter': '{a} {b}:{c} ({d}%)'},
        'title': {'text': 'region ratio'}
    }
}

4.4.2.4 例子

curl "http://IP:PORT/xingqiao/statistics_jobtag_ratio?job_type=unit_migrate&jobtag_name=region&jobtag_values=bj,nj,gz"

{
   "data" : {
      "tooltip" : {
         "formatter" : "{a} {b}:{c} ({d}%)",
         "trigger" : "item"
      },
      "legend" : {},
      "title" : {
         "text" : "region ratio"
      },
      "series" : [
         {
            "name" : "region",
            "type" : "pie",
            "data" : [
               {
                  "value" : 465,
                  "name" : "bj"
               },
               {
                  "value" : 33,
                  "name" : "nj"
               },
               {
                  "value" : 23,
                  "name" : "gz"
               }
            ]
         }
      ]
   },
   "stat" : "OK"
}

4.4.3 错误码分布

4.4.3.1 作用

获取错误码分布

4.4.3.2 请求

  • PATH: /xingqiao/statistics_retstat_ratio

  • 参数

    • job_type: workflow job plugin name

    • [job_tag]: workflow job tag, eg: region=bj

4.4.3.3 响应

{
    'data': {
        'series': [{'data': [{'name': u'OK', 'value': 2}], 'type': 'pie', 'name': 'retstat'}],
        'legend': {},
        'tooltip': {
            'trigger': 'item',
            # a=series["name"], b=series[x]["name"]
            # c=series[x]["value"], d=series[x]["value"]/total(series[x]["value"])
            'formatter': '{a} {b}:{c} ({d}%)'
        },
        'title': {'text': 'retstat ratio'}
    }
}

4.4.3.4 例子

curl "http://IP:PORT/xingqiao/statistics_retstat_ratio?job_type=unit_migrate"

{
   "success" : true,
   "data" : {
      "tooltip" : {
         "formatter" : "{a} {b}:{c} ({d}%)",
         "trigger" : "item"
      },
      "legend" : {},
      "title" : {
         "text" : "retstat ratio"
      },
      "series" : [
         {
            "name" : "retstat",
            "type" : "pie",
            "data" : [
               {
                  "value" : 26,
                  "name" : "ERR_JOB_EXE_TIMEOUT"
               },
               {
                  "value" : 3,
                  "name" : "ERR_SERVER_EXCEPTION"
               },
               {
                  "value" : 1022,
                  "name" : "OK"
               }
            ]
         }
      ]
   },
   "message" : "OK",
   "stat" : "OK"
}

5 实践

5.1 build_meta 步骤

如在第一步 build_meta 创建后续任务需要的表记录,后续任务仅更新记录(不再创建新记录)

Previous任务调度 RuQi(如期)Next配置管理 WuXing(五行)

Last updated 1 month ago

demo_ping
demo_hello
demo_hello