1 项目概述
1.1 背景介绍及目标
1.1.1 背景
工作流可以做什么
顺序工作流 -- Sequential Workflow (强调的是顺序过程)
Copy 用于流程已定的情况。什么是流程已定,这是说工作流的流向大体上不取决于外部的决定
比如贷款审批,流程基本上确定,并且其分支和循环也是规定好了的。又如文档审批,假期审批等都比较适合用 Sequential Workflow。
一般 Sequential Workflow 是不可逆的。除了循环 Activity 外,一般一个 Activity 执行完毕后便不再被执行。
简单的说,工作流可以帮助我们更方便、可视化地管理一些日常操作,将零散操作流式化
Copy (1) 日常例行操作(如 crontab)
(2) 简单报警自动处理(如磁盘清理)
(3) 复杂报警追查
(4) 其它需要人工用各种原子操作串联起来进行的复杂关联操作
工作流引擎提供了一个管理这些操作的平台,将以往零散的操作按照统一的规范管理起来,并将操作执行情况可视化的展现出来
并提供了一系列完善的人工干预接口,整体上提高了操作的规范性、效率、稳定性。
我们为什么需要工作流
工作流提供的规范化管理、状态可视化等特性,一方面能够很好的解决我们日常的零散操作脚本管理需求,将日常操作规范化
另一方面,工作流的可视化及相应的人工干预机制,极大的方便了日常操作,以前的脚本执行黑盒状态,通过工作流透明化
这些特性与我们的日常操作、报警处理等场景相结合,预期可以起到提高效率、提升操作稳定性的效果。
1.1.2 目标
打造易用的工作流,需要考虑工作流应该和业务逻辑解耦
1.2 名词解释
Job :指单个操作对应的任务集合,包含工作流定义及参数
2 需求分析
2.1 功能需求
2.1.1 使用场景
子任务(比如对集群的配置进行变更,对集群操作就是生成一个 job,对集群中的实例就是 task)
任务依赖
多个 task 之间依赖(比如 task_A 依赖 task_B)
场景 1:分析 Redis 集群中的大 key(需要先拉取 RDB,然后进行分析 RDB 中的数据,最后进行汇总数据)
场景 2:分析 Redis 集群中的热 key(需要获取到各个分片的热 key 后,进行汇总分析)
场景 3:实例下线(需要先去掉上下游关联,然后进行销毁实例)
2.2 非功能需求
2.3 调研
airflow: http://airflow.apache.org/
go-workflow: https://github.com/go-workflow/go-workflow
Amazon Simple Workflow Service (SWF)
https://github.com/sailthru/stolos
https://github.com/openstack/mistral
2.3.1 XXX 平台 1(内部)
2.3.1.1 名词解释
Job:一次操作,如 service-upgrade 操作
Task: 完成一个 Job 所需执行的操作,如 service-upgrade 操作中需要进行 dervice-deploy 和 agent_command 两个操作
CTask: 完成一个 task 所需要执行的操作,如 service-upgrade 操作中的 agent_command 会触发 upgrade 和 config-changed 两个 hook,这既是 agent_command 的两个子 task。
Copy +-------------------------------------------------+
| Job |
| +---------------------------------------------+ |
| | +-----+ +-----+ +-----+ +-----+ | |
| |Task |task1| |task2| |task3| |task4| | |
| | +-----+ +-----+ +-----+ +-----+ | |
| +-------|-----\-------------------------------+ |
| | \ |
| +V-----+ V------+ +------+ +------+|
| Ctask |ctask1| |ctask2| |ctask3| |ctask4||
| +------+ +------+ +------+ +------+|
+-------------------------------------------------+
job 的 3 种状态
new : 执行操作前,调用 create_job 创建一个 create_job,然后往里面添加 task
done : 在 job 所有的 Task,Ctask 都为 done 后,Job 为 done
task 的 3 种状态
new : 执行操作前,调用 add_task 创建一个 task,然后异步发数据
start: 收到 mq 发来的数据,将 task 置为 start
done : 设置为 start 后,检查 CTask 是否都 done,如果为 done,将 task 置为 done
2.3.1.2 service 升级中的平台和 agent 通信流程
执行的命令点和 task 列表
task 列表:SELECT * FROM callback_task WHERE job_id = 46169
task_id
job_id
ptask_id
ctask_list
unit_id
runtime_cmd
status
is_resp
ret_code
ret_info
ctime
utime
cost
extra
4568: service_deploy 操作,生成了 unit-deploy 的 CTask
4569: agent_command,生成了 upgrade 和 config-change 的 Ctask
字段说明
Copy is_resp 表示发送完成的命令执行完后是否完成了回调
流程图
Copy +---------------------------------------------------------------------+
| Job(46169) |
| +-----------------------------------------------------------------+ |
| | +--------------------+ +-------------------+ | |
| |Task |service_deploy(4568)| |agent_command(4569)| | |
| | +---------+----------+ +-------------------+ | |
| +----------------|--------------------|-------|-------------------+ |
| | V V |
| +---------V-------+ +-------------+ +-------------------+ |
| Ctask |unit-deploy(4570)| |upgrade(4571)| |config-change(4572)| |
| +-----------------+ +-------------+ +-------------------+ |
+---------------------------------------------------------------------+
平台和 agent 涉及的 callback 流程
以 service_deplay 为例,平台会往 agent 发送 unit-deploy 命令,unit 执行命令后,会往平台发送请求,更改状态。
平台往 agent 发送命令
Copy Deploy 模块
+---------------------------------------+
| service_deploy |
+-------+-------------------------------+
|
| 1
+-------|-------------------------------+
| +-----V--------+ +-----------------+ |
| |agent_command | |command_from_nmq | | Control 模块
| +--------------+ +--^--------------+ |
+-------|------------/--------|---------+
| / |
| 2 / 3 | 4
| / V
+------V-------+ +-------------+
| nmq | | agent |
+--------------+ +-------------+
1 service 模块的 service_deploy 组装 unit-deploy 命令,调用 control 模块的 agent_command 方法
2 agent_command 执行以下动作:
在 job(46169) 中增加一个 task(4568)
3 nmq 发送数据到 control 模块的 command_from_nmq 接口
4 control 模块的 command_from_nmq 执行以下动作:
增加一个子 task(4570),父 task 是 4568
agent 往平台发送命令
Copy service 模块
+---------------------------------------------+
| unit_stat_transform |
+-------^-------------------------------------+
|
| 4
+-------|-------------------------------------+
| +------------------+ +-------------------+ |
| |callback_from_nmq | |callback_from_agent| | Callback 模块
| +-----^------------+ +-----^-------------+ |
+-------|--------------/------|---------------+
| / |
| 3 / 2 | 1
| V |
+--------------+ +-------------+
| nmq | | agent |
+--------------+ +-------------+
1 agent 执行完 unit-deploy 命令后,请求 callback 模块的 callback_from_agent 方法
2 callback_from_agent 调用 ral 将数据发送到 nmq
3 nmq 发送数据到 callback 模块的 callback_from_nmq,
4 callback 模块的 callback_from_nmq 接口执行以下操作:
更新 task 状态,从 new 到 start(taskid=4570)
执行回调中配置的 service::method 方法,这里执行的是 deploy::callback_deploy,在 callback_deploy 调用 unit_stat_transform 更新 unit 的状态。
根据执行结果检查 task 是否成功结束,若结束,检查父 task(4568)是否完成
若父 task 为 done,检查 job 是否为 done
2.3.1.3 疑问
什么情况下进行重试?
agent_command ==> nmq 时有 3 次重试
回调转成同步命令
平台往 agent 发送命令,agent 同步阻塞
Copy Deploy 模块
+---------------------------------------+
| service_deploy |
+-------+-------------------------------+
|
| 1
+-------|-------------------------------+
| +-----V--------+ +-----------------+ |
| |agent_command | |command_from_nmq | | Control 模块
| +--------------+ +--^-----------^--+ |
+-------|------------/--------|----|----+
| / | |
| 2 / 3 | 4 | 5
| / | |
+------V-------+ +-----V-------+
| nmq | | agent |
+--------------+ +-------------+
Job 如何拆分为 task
一张 Job 表 (callback_job),一张 task 表 (callback_task)
Job 和 Task 以及 CTask 在数据库中如何体现
Job 中有个 task_list 字段,是个 list, 用于标记 task
Task 中有个 ctask_list 字段,是个 list ,用于标记子 task
添加 Task
Copy def add_task(job_id, ptask_id, unit_id, runtime_cmd, reaction)
res = {
"error_code": 0,
"error_msg": "OK",
"data":{}
}
try:
ret = Task.add_task(job_id, ptask_id, unit_id, runtime_cmd, reaction) // 数据库操作,在 Task 表中增加 task 记录
except BaseException:
res["error_code"] = "-1"
res["error_msg"] = traceback.format_exc()
res['data']['task_id'] = ret;
if (0 == ptask_id) // 如果 ptask_id 为 0 则说明是 Task,否则为 CTask
add_task2job(job_id, ret); // 将新插入数据库的 taskid,追加到 Job 表中的对应 job 的 task_list 字段
else
add_task2ptask(ptask_id, ret); // 将新插入数据库的 taskid,追加到对应父级 task 的 ctask_list 字段中
return res;
客户端如何将异步的 job 转为同步请求
客户端发送命令时,会向平台发送 create_job 请求进行生成 job_id
然后进行定时 query_job 查询任务情况
Copy "status":"start" 时,表示 job 仍在执行
"status":"done" 时,表示 job 执行完成
2.3.2 OpenStack mistral(比较重)
2.3.2.1 mistral 简介
https://github.com/openstack/mistral
最初是由 Mirantis 公司贡献给 Openstack 社区的工作流组件,提供 Workflow As a Service 服务
类似 AWS 的 SWS(Simple Workflow Serivce),Hadoop 生态圈中的 oozie 服务
2.3.2.2 mistral 与 Heat 的关系
mistral 和 OpenStack 资源编排服务 Heat 不同,二者功能可能会有重叠,但 Heat 注重基础资源的编排,而 Mistral 则主要是用于任务编排。
Heat 的主要应用场景是创建租户基础资源模板,管理员可以创建一个资源模板,基于这个模板用户一次请求就可以完成虚拟机创建及配置、挂载数据卷、创建网络和路由、设置安全组等。
而 Mistral 的典型应用场景包括执行计划任务 Cloud Cron,调度任务 Task Scheduling,执行复杂的运行时间长的业务流程等。
2.3.3 OpenStack taskflow(库)
2.3.3.1 前言
TaskFlow 是 OpenStack 中的一个 Python 库,主要目的是让 task(任务)执行更加容易可靠,能将轻量的任务对象组织成一个有序的流。
若未安装 taskflow 到环境中:
目前 TaskFlow 支持三种模式:
线性:运行一个任务或流的列表,是一个接一个串行方式运行。
无序:运行一个任务或流的列表,以并行的方式运行,顺序与列表顺序无关,任务之间不存在依赖关系。
图:运行一个图标(组节点和边缘节点)之间组成的任务 / 流依赖驱动的顺序。
Copy +engines ----------------------------------------------------+
| +linear_flow ----+ +unordered_flow -+ +graph_flow -----+ |
| |+-----+ +-----+| |+-----+ +-----+| |+-----+ +-----+| |
| ||task1| |task2|| ||task1| |task2|| ||task1| |task2|| |
| |+-----+ +-----+| |+-----+ +-----+| |+-----+ +-----+| |
| +----------------+ +----------------+ +----------------+ |
+------------------------------------------------------------+
TaskFlow 支持创建不同的 task,并以声明的方式集成到一个 flow 中,这些 flow 会通过 engine 执行、停止、继续和恢复。
Flow:
(1) 线性流(linear_flow)
"""
from taskflow.patterns import linear_flow
linear_flow.Flow('linear').add(taskA(),taskB(),taskC())
"""
(2) 无序流(unordered_flow)
"""
from taskflow.patterns import unordered_flow
unordered_flow.Flow('linear').add(taskA(),taskB(),taskC())
"""
(3) 图流(graph_flow)
"""
from taskflow.patterns import graph_flow
graph_flow.Flow('linear').add(taskA(),taskB())
"""
2.3.3.2 任务的状态
就像任何其他的任务流系统一样,每个任务都有一些状态:PENDING RUNNING SUCCESS FAILURE,你也可以创建自定义的状态。
OpenStack taskflow 使用的 automaton
Copy +------------+------------+---------+----------+---------+
| Start | Event | End | On Enter | On Exit |
+------------+------------+---------+----------+---------+
| FAILURE[$] | . | . | . | . |
| PENDING | on_failure | FAILURE | . | . |
| PENDING | on_running | RUNNING | . | . |
| RUNNING | on_failure | FAILURE | . | . |
| RUNNING | on_success | SUCCESS | . | . |
| SUCCESS[$] | . | . | . | . |
| WAITING[^] | on_failure | FAILURE | . | . |
| WAITING[^] | on_pending | PENDING | . | . |
+------------+------------+---------+----------+---------+
[^] 表示初始状态
[$] 表示终止状态
即 taskflow 的状态正常流程为 WAITING --> PENDING --> RUNNING --> SUCCESS
|
V
FAILURE
2.3.3.3 状态变更通知
(1)工作流状态变更
Copy >>> class CatTalk(task.Task):
... def execute(self, meow):
... print(meow)
... return "cat"
...
>>> class DogTalk(task.Task):
... def execute(self, woof):
... print(woof)
... return 'dog'
...
>>> def flow_transition(state, details):
... print("Flow '%s' transition to state %s" % (details['flow_name'], state))
...
>>>
>>> flo = linear_flow.Flow("cat-dog").add(
... CatTalk(), DogTalk(provides="dog"))
>>> eng = engines.load(flo, store={'meow': 'meow', 'woof': 'woof'})
>>> # 使用引擎的 Notifier 属性
>>> eng.notifier.register(ANY, flow_transition)
>>> eng.run()
Flow 'cat-dog' transition to state RUNNING
meow
woof
Flow 'cat-dog' transition to state SUCCESS
taskflow/engines/action_engine/engine.py
Copy class ActionEngine(base.Engine):
...
def _change_state(self, state):
moved, old_state = self.storage.change_flow_state(state)
if moved:
details = {
'engine': self,
'flow_name': self.storage.flow_name,
'flow_uuid': self.storage.flow_uuid,
'old_state': old_state,
}
self.notifier.notify(state, details)
即工作流状态变更时会通知到 notifier state 和 details 两个参数
(2)任务状态变更
Copy >>> class CatTalk(task.Task):
... def execute(self, meow):
... print(meow)
... return "cat"
...
>>> class DogTalk(task.Task):
... def execute(self, woof):
... print(woof)
... return 'dog'
...
>>> def task_transition(state, details):
... print("Task '%s' transition to state %s" % (details['task_name'], state))
...
>>>
>>> flo = linear_flow.Flow("cat-dog")
>>> flo.add(CatTalk(), DogTalk(provides="dog"))
<taskflow.patterns.linear_flow.Flow object at 0x...>
>>> eng = engines.load(flo, store={'meow': 'meow', 'woof': 'woof'})
>>> # 引擎的 atom_notifier 属性
>>> eng.atom_notifier.register(ANY, task_transition)
>>> eng.run()
Task 'CatTalk' transition to state RUNNING
meow
Task 'CatTalk' transition to state SUCCESS
Task 'DogTalk' transition to state RUNNING
woof
Task 'DogTalk' transition to state SUCCESS
通过代码 notifier 就是实现了个类似 pub/sub 的功能
2.3.3.4 retry
Retry 是一个控制当错误发生时, 如何进行重试的特殊工作单元, 而且当你需要的时候还能够以其他参数来重试执行别的 Atom 子类. 常见类型:
AlwaysRevert: 错误发生时, 回滚子流
AlwaysRevertAll: 错误发生时, 回滚所有流
ForEach: 错误发生时, 为子流中的 Atom 提供一个新的值, 然后重试, 直到成功或 retry 中定义的值用完为止.
ParameterizedForEach: 错误发生时, 从后台存储(由 store 参数提供)中获取重试的值, 然后重试, 直到成功或后台存储中的值用完为止.
times
Copy flow = linear_flow.Flow('send_message', retry=retry.Times(5)).add(
SendMessageTask('sender'))
ForEach
Copy flow = linear_flow.Flow('f1').add(
task.Task('t1'),
linear_flow.Flow('f2', retry=retry.ForEach(values=['a', 'b', 'c'], name='r1', provides='value')).add(
task.Task('t2'),
task.Task('t3', requires='value')),
task.Task('t4'))
2.3.4 pyflow
feature
Copy (1) 使用 Python 代码定义工作流
(2) 在 localhost 或 sge 上运行工作流
(3) 可以继续部分完成的工作流
(4) 任务资源管理:指定每个任务所需的线程数和内存
(5) 递归工作流规范:获取任何现有的 pyFlow 对象并将其用作另一个 pyFlow 中的任务。
(6) 动态工作流规范:定义等待任务规范,而不仅仅是任务,这样就可以根据上游任务的结果定义任务(注意:递归工作流是一种更好的方法)
(7) 使用一致的工作流级别日志记录检测和报告所有失败的任务。
(8) 任务级日志记录:所有任务stderr都被记录和修饰,例如[time][host][workflow_run][taskid]
(9) 任务计时:任务包装器函数为每个任务提供时间装饰器
(10) 任务优先级:对于同时可以运行的任务,可以分配相对的优先级,以便首先运行或排队。
(11) 可更改每个任务的环境变量或工作目录。
(12) 作业完成/错误/异常的电子邮件通知
(13) 按规定的时间间隔提供持续的任务总结报告
(14) 指定其他外部调度程序参数(例如,为SGE指定队列名称)
(15) 以点格式(dot)输出任务图
内部实现
添加任务 再执行的顺序
Copy +-------------------------WorkflowRunner----------------------------------------+
| |
| addTask |
| | +--------------TaskManager(cdata, tdag)----------------+ |
| \ | +TaskDAG-------------------------------------------+ | |
| -----> addTask | | |
| | | |(waiting) | | |
| | | | ready_commands | | |
| | | | +------------------------------------+ | | |
| | | | | TaskNode | | | |
| | | \ |+------+------+------+------+------+| | | |
| | | -||task_N|task..|task_C|task_B|task_A||------- |
| | | |+------+------+------+------+------+| | | \ |
| | | +------------------------------------+ | | | |
| | | | | | |
| | +--------------------------------------------------+ | | |
| +------------------------------------------------------+ | |
| | |
| +------------------------------------------------------+ / |
| | TaskRunner (running)<- |
| +------------------------------------------------------+ |
+-------------------------------------------------------------------------------+
+scripts dir--------------------------------------------------------------------+
|+main.py--------------------------------------------------------------------+ |
|| +--------+ +--------+ +--------+ +---------+ +---------+ | | / 统一参数,比如参数为操作对象的唯一标识
|| |command1| |command2| |command3| |command..| |commandN | | | ------| (1) 元数据获取:根据唯一标识获取元数据等信息
|| +--------+ +--------+ +--------+ +---------+ +---------+ | | \ (2) 上下文约定:约定上下文依赖的数据
|+---------------------------------------------------------------------------+ |
| | | |
| +------V------+ +-----------V-----------+ |
| | | | | |
| | temp_db | | lib | |
| | | | | |
| +-------------+ +-----------------------+ |
+-------------------------------------------------------------------------------+
| |
| +---------------------- task 具体执行逻辑
|
+--------------------------------------------- 存储 task 中间数据,作为上下文环境使用
Copy # 如何优雅地终止python线程
知道为啥threading仅有start而没有end不?
你看,线程一般用在网络连接、释放系统资源、dump流文件,这些都跟IO相关了,你突然关闭线程那这些
没有合理地关闭怎么办?是不是就是给自己造bug呢?啊?!
因此这种事情中最重要的不是终止线程而是线程的清理啊。
解决方案 · 壹
一个比较nice的方式就是每个线程都带一个退出请求标志,在线程里面间隔一定的时间来检查一次,看是不是该自己离开了!
-----------------
import threading
class StoppableThread(threading.Thread):
"""Thread class with a stop() method. The thread itself has to check
regularly for the stopped() condition."""
def __init__(self):
super(StoppableThread, self).__init__()
self._stop_event = threading.Event()
def stop(self):
self._stop_event.set()
def stopped(self):
return self._stop_event.is_set()
-----------------
在这部分代码所示,当你想要退出线程的时候你应当显示调用stop()函数,并且使用join()函数来等待线程合适地退出。线程应当周期性地检测停止标志。
然而,还有一些使用场景中你真的需要kill掉一个线程:比如,当你封装了一个外部库,但是这个外部库在长时间调用,因此你想中断这个过程。
workflow 嵌套
Copy // Task graph from pyflow object 'SimpleWorkflow'
// Process command: 'make_pyflow_task_graph.py'
// Process working dir: '/Users/wangbin34/meetbill/github/pyflow/pyflow/demo/subWorkflow/pyflow.data/state'
// Graph capture time: 2022-07-02T08:19:33.796883
digraph SimpleWorkflowGraph {
compound=true;
rankdir=LR;
node[fontsize=10];
subgraph cluster_sg0 {
label = "SimpleWorkflow";
n0 [label="task1" color=blue];
n1 [label="task2" color=red style=bold];
n2 [label="subwf_task3" color=grey style=dashed shape=rect style=rounded];
n3 [label="task4" color=grey style=dashed];
n0 -> n2;
n1 -> n2;
n2 -> n3;
begin0 [label="begin" shape=diamond];
end0 [label="end" shape=diamond];
begin0 -> n0;
begin0 -> n1;
n3 -> end0;
}
{ rank = source; Legend [shape=none, margin=0, label=<
<TABLE BORDER="0" CELLBORDER="1" CELLSPACING="0" CELLPADDING="4">
<TR><TD COLSPAN="2">Legend</TD></TR>
<TR> <TD>complete</TD> <TD BGCOLOR="blue"></TD> </TR>
<TR> <TD>running</TD> <TD BGCOLOR="green"></TD> </TR>
<TR> <TD>queued</TD> <TD BGCOLOR="yellow"></TD> </TR>
<TR> <TD>waiting</TD> <TD BGCOLOR="grey"></TD> </TR>
<TR> <TD>error</TD> <TD BGCOLOR="red"></TD> </TR>
</TABLE>>];}
}
2.3.5 阿里云 Serverless 工作流
Serverless 工作流(Serverless Workflow)是一个用来协调多个分布式任务执行的全托管云服务。
在 Serverless 工作流中,您可以用顺序、分支、并行 等方式来编排分布式任务, Serverless 工作流会按照设定好的步骤可靠地协调任务执行 ,跟踪每个任务的状态转换,并在必要时执行用户定义的重试逻辑, 以确保工作流顺利完成。Serverless 工作流通过提供日志记录和审计来监视工作流的执行,方便您轻松地诊断和调试 应用。 Serverless 工作流简化了开发和运行业务流程所需要的任务协调、状态管理以及错误处理 等繁琐工作,让您聚焦业务逻辑开发。
下图描述了 Serverless 工作流如何协调分布式任务,这些任务可以是函数、已集成云服务 API、运行在虚拟机或容器上的程序
本图里的关键点:
2、可以对Faas 作为胶水间接调用 的服务进行编排;
3、可以把已有的 workflow 作为子流程 进行标配;
4、可以通过**消息队列服务+ callback **对第三方服务进行编排;
5、可以通过Faas + callback 引入第三方审批环节;
6、用户还有个比较强烈的需要就是对于一些不会做callback 或者针对workflow进行改造的第三方服务,数据状态、任务执行状态依赖检查的编排;
例如 spark 任务,db 表数据;可以为了避免检查任务长时间运行,可以实现多次轮询函数进行检查。
2.3.5.1 产品优势
协调分布式组件
Copy Serverless 工作流能够编排不同基础架构、不同网络、不同语言编写的应用,抹平混合云、专有云过渡到公共云或者从单体架构演进到微服务架构的落差。
减少流程代码量
Copy Serverless 工作流提供了丰富的控制逻辑,例如顺序、选择、并行等,让您以更少的代码实现复杂的业务逻辑。
提高应用容错性
Copy Serverless 工作流为您管理流程状态,内置检查点和回放能力,以确保您的应用程序按照预期逐步执行。错误重试和捕获可以让您灵活的处理错误。
Serverless
Copy Serverless 工作流根据实际执行步骤转换个数收费,执行结束不再收费。Serverless 工作流自动扩展让您免于管理硬件预算和扩展。
2.3.5.2 功能特性
服务编排能力
Copy Serverless 工作流可以帮助您将流程逻辑与任务执行分开,节省编写编排代码的时间。例如图片经过人脸识别函数后,根据人脸位置剪裁图像,最后发送消息通知您,Serverless工作流提供了一个Serverless的解决方案,降低了您的编排运维成本。
协调分布式组件
Copy Serverless 工作流能够协调在不同基础架构上、不同网络内,以不同语言编写的应用。应用不管是从私有云/专有云平滑过渡到混合云或公共云,或者从单体架构演进到微服务架构,Serverless工作流都能发挥协调作用。
内置错误处理
Copy 通过内置错误重试和捕获能力,您可以自动重试失败或超时的任务,对不同类型错误做出不同响应,并定义回退逻辑。
可视化监控
Copy Serverless 工作流提供可视化界面来定义工作流和查看执行状态。状态包括输入和输出等。方便您快速识别故障位置,并快速排除故障问题。
支持长时间运行流程
Copy Serverless 工作流可以跟踪整个流程,持续长时间执行确保流程执行完成。有些流程可能要执行几个小时、几天、甚至几个月。例如运维相关的Pipeline和邮件推广流程。
流程状态管理
Copy Serverless 工作流会管理流程执行中的所有状态,包括跟踪它所处的执行步骤,以及存储在步骤之间的数据传递。您无需自己管理流程状态,也不必将复杂的状态管理构建到任务中。
2.3.5.3 名词解释
本文主要对Serverless工作流涉及的专有名词及术语进行定义及解析,方便您更好地理解相关概念并使用Serverless工作流。
Serverless工作流(Serverless Workflow)
Copy 协调多个分布式任务执行的全托管Serverless云服务。通过Serverless工作流,您可以用顺序、分支、并行等方式来编排分布式任务,以确保流程按照设定好的顺序可靠地协调任务执行。
分布式任务
Copy Serverless工作流中的分布式任务可以是函数、已集成云服务的API、运行在虚拟机或容器上的程序。
流程(Flow)
Copy 定义了业务逻辑描述以及流程执行所需要的通用信息。
步骤(Step)
Copy 步骤是流程中的工作单元,可以是简单的原子步骤,如任务(task)、成功(succeed)、失败(fail)、等待(wait)和传递(pass)等;也可以是复杂的控制步骤,如选择(choice)、并行(parallel)和并行循环(foreach)等。步骤的组合使用构建了复杂的业务逻辑。
父步骤
Copy 如果步骤A包含步骤B,则称步骤A为父步骤。
子步骤
Copy 如果步骤A包含步骤B,则称步骤B为子步骤。
任务步骤(task)
Copy 步骤类型之一,使用任务步骤来定义函数计算服务的函数调用信息,执行任务步骤会调用相应的函数。
传递步骤(pass)
Copy 步骤类型之一,使用传递步骤来输出常量或者将输入转换成期望的输出。该类型的步骤通常用于调试未创建任务步骤的函数的流程逻辑。
等待步骤(wait)
Copy 步骤类型之一,使用等待步骤来暂停执行流程,然后再继续执行。您可以选择一个等待的相对时间,也可以以时间戳方式指定等待结束的绝对时间。
选择步骤(choice)
Copy 步骤类型之一,使用选择步骤让流程根据条件执行不同步骤。
并行步骤(parallel)
Copy 步骤类型之一,使用并行步骤并行执行多个不同步骤。
并行循环步骤(foreach)
Copy 步骤类型之一,使用并行循环步骤并行执行多个相同的步骤。
成功步骤(succeed)
Copy 步骤类型之一,使用成功步骤提前结束一系列串行的步骤。成功步骤通常和选择步骤结合使用,在选择步骤条件满足的情况下跳转到一个成功步骤,从而不再执行其他步骤。
失败步骤(fail)
Copy 步骤类型之一,使用失败步骤提前结束一系列串行的步骤。当流程执行完失败步骤后,定义在失败步骤之后的步骤不会被继续执行,并且导致失败步骤的父步骤失败,并一直传递,最后导致流程执行失败。
流程定义语言FDL(Flow Definition Language)
Copy 用来描述和定义业务逻辑,在执行流程时,Serverless工作流服务会根据流程定义依次执行相关步骤。
定时调度
Copy Serverless工作流支持在指定时间调度您的工作流。
2.3.5.4 输入和输出
流程和步骤
通常,流程和步骤之间,流程的多个步骤之间需要传递数据。和函数式编程语言类似,FDL 的步骤类似于函数, 它接受输入(Input),并返回输出(Output),输出会保存在父步骤(调用者)的本地(Local)变量里。 其中,输入和输出的类型必须是 JSON 对象结构,本地变量的类型因步骤而异。 例如
步骤的输入、输出和本地变量总大小不能超过 32 KiB,否则会导致流程执行失败
2.3.5.5 技术挑战
高效:
1、 workflow 配置信息解析如何做到执行时高效
2、 每个步骤都会产生对应的事件作为对工作流的执行过程记录,执行事件QPS是工作流QPS的倍数
高可用:
复杂性:
2.3.6 XXX 平台 2(内部)
2.3.6.1 架构
Copy +-----------------------------------------+
| |
| Web http |<----------+--+
| | | |
+-----+----------------------+------------+ | |
| | | |
+-----V-----+ +------------V------------+ | |
| | | | | |
| DB | | Redis | | |
| | | | | |
+-----------+ +------^------------^-----+ | |
| | | |
+------+----+ +-----+-----+ | |
| Consumer1 | | Consumer2 | | |
+------+----+ +-----+-----+ | |
| | | |
| +-----------------+ |
+---------------------------------+
(1) 创建任务
Copy (1) 创建 Job 和 Task 到 DB
(2) 下发一个 job 到 Redis
(2) 任务执行
Copy (1) Consumer1 从 Redis 中 pop 到 job 后,进行请求 Server 进行处理, Server 根据 job id 获取当前的 task 进行处理
2.3.7 X1 task
(1) X1 task 架构
Copy |
|
|
V 同步 handler 异步 handler(异步 + 编排)
+X1-resource-api------------+ +X1-resource-task-----------+
| +APP1---+ +APPN---+ | | +schedule-+ +worker----+ |
+ | | | | + | | | | +APP1--+ | |
| +-------+ +-------+ | | +---------+ | +------+ | |
+-------------+-------------+ | +engine---+ | +APPN--+ | |
| | | | | +------+ | |
| | +-------+-+ +----+---+-+ |
| | | | | |
| | +Redis--V---------V-+ | |
| | +-------------------+ | |
| +-----------------------|---+
| |
| |
+X1-base------V-------------------------------------------------V---+
| +sdk------------------------------------------------+ +model----+ |
| | +XRM-cli---+ +BCC-cli----+ +VM-cli----+ | | | |
| | +----------+ +-----------+ +----------+ | | | |
| +------------------------+--------------------------+ +----+----+ |
+--------------------------|---------------------------------|------+
| |
..................................V.................................V.......... DB 或者第三方服务
* schedule 处理 waiting(初始状态) & queuing(有相同 mutex 的 task 仍未完成) 的 task,将 task 改为 prerun
* engine 将 prerun & running 的 task 执行
子任务状态
Copy TaskStatusWaiting string = "waiting"
TaskStatusQueuing string = "queuing"
TaskStatusPreRun string = "prerun" # 前置操作
TaskStatusRunning string = "running"
TaskStatusPostRun string = "postrun" # 后置操作
TaskStatusSuccess string = "success"
TaskStatusError string = "error"
队列(3 个队列)
Copy TaskPriorityHigh string = "high"
TaskPriorityMedium string = "medium"
TaskPriorityLow string = "low"
(2) worker
Worker 执行任务的 Message And Result
Copy type TaskExecUnit struct {
TaskExecUnitID string
TaskID string
TaskBatchID string
SendAt time.Time
ExpireAt time.Time
Entity string // 执行实体,如 order_id
Dim string
Processor string // 执行函数
Parameters string // 函数参数
}
type TaskExecResult struct {
TaskExecUnitID string
TaskID string
TaskBatchID string
Entity string
StartAt time.Time
EndAt time.Time
Status string
Message string
LastUpdate time.Time
}
worker 消费 Message(伪代码)
Copy process, err = findProcess(teu.Processor);
err = process(ctx, teu)
(3) X1 task 优缺点
Copy 优点:
(1) 同步 handler 和 异步 hndler 分离,两个组件可以独立更新(异步 handler 一般是耗时任务)
(2) 使用 Redis 原生 Stream,时效性比较好
缺点:
(1) 同步 handler 和 异步 handler 的逻辑代码中使用 model 时,则需要共用 dao 层代码
(2) 异步 handler 使用的 Redis Stream 命令、目前处于单点状态
(3) 工作流仅支持线性步骤执行
(4) 创建 workflow 时是通过直接往 db 中插入任务,存在代码侵入(引入或继承了别的包或框架)
(5) engine(执行 workflow) 和 worker(执行 task step) 是一个整体,同一个 workflow 涉及的 task step 涉及多个模块,耦合性比较强(存在强耦合)
3 系统设计
3.1 系统架构
Copy +Xingqiao-------------------------------------+
| +Task------------+ |
| +---|/{app}/{handler}| |
| | +----------------+ |
| +Job-------+ | +Task------------+ |
| | +---+---|/{app}/{handler}| |
| +----------+ | +----------------+ |
| | +Task------------+ |
| +---|/{app}/{handler}| |
| +----------------+ |
+----------------------+----------------------+
|
|
+Redis ---------------V----------------------+
| +---------+ |
| |Twemproxy| \ |
| +--+---+--+ +----------+ |
| | | |Metaserver| |
| +--V---V--+ +----------+ |
| | Redis | / |
| +---------+ |
+----^-----------^-----------^-----------^----+
| | | |
| | | |
+----+----+ +----+----+ +----+----+ +----+----+
| worker1 | | worker2 | | worker3 | | workerN |
+---------+ +---------+ +---------+ +---------+
整体架构分为三部分
星桥:用于维护 job、task 状态机,以及 task 执行顺序
3.2 模块简介
3.2.1 job
3.2.2 task
3.2.2.1 任务的关注点
task 即为 1 个 handler
3.2.2.2 任务的定义
task 参数
task 进度状态
Copy +------------+------------+---------+----------+---------+
| Start | Event | End | On Enter | On Exit |
+------------+------------+---------+----------+---------+
| failed[$] | go_waiting | waiting | . | . |
| pending | go_failure | failed | . | . |
| pending | go_running | started | . | . |
| started | go_failure | failed | . | . |
| started | go_success |finished | . | . |
| finished[$]| . | . | . | . |
| waiting[^] | go_failure | failed | . | . |
| waiting[^] | go_pending | pending | . | . |
+------------+------------+---------+----------+---------+
[^] 表示初始状态
[$] 表示终止状态
task 执行流程
Copy 检查依赖 准备参数 结果校验
(go_pending) (go_running) (go_success)
即 taskflow 的状态正常流程为 waiting ---> pending ---> started ---> finished[$]
^ | | |
| | | V
| +----------+-------> failed[$]
| (go_failure) |
| |
+------------------------+
(go_waiting) retry
3.3 设计思路及折衷
3.3.1 推动工作流任务流转方法
3.3.1.1 方案一:使用定时任务
创建任务后,同时注册一个定时任务
Copy +------------------------------+
|+---------------+ | 10s +----------+
|| job | |<------------------------| ruqi |
|+-----+---------+ | +----------+
| | task_list:[1,3,4...] |
| V task_index:0 |
|+-----+ +-----+ |
||task1| |task3| ... |
|+-----+ +-----+ |
+------------------------------+
(1) job_create
创建任务,并同时创建个定时任务
(2) job_action
定时任务每隔 10s 发起一次 job_action, job_action 中 job 根据 task_index 依次执行 task_list 中的任务
job 结束时,删除掉 ruqi 定时任务
注意点:
Copy (1) 执行 job_action 时需要加锁(防止并发操作 job)
执行 job_action 时需要加锁,同时需要对锁进行续时间
3.3.1.2 方案二:使用消息队列
创建任务后,同时发一个消息到消息队列
Copy +------------------------------+
|+---------------+ |<------------------------------------+
|| job | | |
|+-----+---------+ | +---------+ +---------+----------+
| | task_list:[1,3,4...] |------->| MQ |<-------|push_forward_handler|
| V task_index:0 | +---------+ +--------------------+
|+-----+ +-----+ |
||task1| |task3| ... |
|+-----+ +-----+ |
+------------------------------+
(1) job_create
创建任务,并同时发布一条消息到消息队列
(2) job_action
push_forward_handler 从 MQ 中拿到消息,进行请求下 job_action 进行执行任务,然后再继续发个消息进行下次流转
job 结束时,则不再往消息队列中发消息
注意点:
Copy (1) 需要关注发布消息到消息队列是否成功,若丢失,则此 job 任务不再进行状态变更
(2) 执行 job_action 时需要加锁(防止并发操作 job)
3.3.1.3 方案三:任务完成后进行回调
创建任务后,任务执行完成时进行回调
Copy +------------------------------+
|+---------------+ |
|| job | | <---------+
|+-------+-------+ | |
| | | |
| V | | callback
|+-------+ +-------+ | |
||task1 | | task2 | |+----------+
|+-------+ +-------+ |
+------------------------------+
3.3.1.4 方案四:定时器 + 加锁
Copy +-----------+ +-----------+
| | | |
| | | |
| | | |
| xingqiao1 | | xingqiao2 |
| | | |
| | | |
+-----+-----+ +-----+-----+
| |
| |
+-----V-------------------V-----+
| DB |
+-------------------------------+
优点: 优点是实现简单
缺点: (1) 所有实例均在工作状态,对数据库会有压力
(2) 任务过多时,调度可能会出现瓶颈
3.3.1.5 折衷分析
方案一需要依赖定时任务,方案三通过 callback 方式不太容易管理,方案四任务过多时,调度可能会出现瓶颈
故使用方案二
3.3.2 Bubble 任务执行顺序
3.3.2.1 方案一:Bubble Down
Copy Job 1: Job 2:
task_A task_A
| / \
v v v
task_B task_B task_C
| |
| task_D
| |
v v
task_E
the "Bubble Down" approach is like "pushing" work through a pipe.
如:Job2 是 task_A 先完成,然后 task_B 和 task_C 开始运行,依次类推
这个的想法就是先完成 task_A, 然后完成 task_B, task_C 等,是传统的执行方式
3.3.2.2 方案二:Bubble Up
The "Bubble Up" approach is the concept of "pulling" work through a pipe.
Bubble Up 是先定义结果,然后生成依赖
Copy job 2:
task_E
^ ^
/ \
task_B task_D
^ ^
| |
| task_C
| |
\ /
task_A
比如最终是要最终执行 task_E,会生成任务 task_B 和 task_D, task_B 和 task_D 开始执行,task_D 执行创建 task_C, 依次类推
当完成 task 时,将 task 任务状态标记完成时,同时检查其母任务是否完成
OSP 平台使用此模式
这个的想法就是完成 task_E 工作,需要先完成 task_B, task_C 等依赖,然后通过其子任务完成时进行检查其他子任务状态,然后变更总 job 状态
3.3.2.3 折衷分析
"Bubble Up" 类似于函数调用,task 与 task 的耦合比较紧,比较难以扩展,故使用 bubble-down 方式
3.3.3 耗时任务处理方式
一些操作类的检查耗时比较长,比如检查 Redis 的数据同步状态完成时间,如果数据比较大,是分钟以及小时以上,如果 task 一直进行处理此任务,则会导致服务耗到所有线程
所以需要有"一次执行,多次检查"的模式
3.3.3.1 方案一:task 中增加任务 review 状态
Copy 检查依赖 准备参数 检查(可多次执行) 结果校验
(go_pending) (go_running) (go_review) (go_success)
waiting ---> pending ---> started -----> review---> finished
^ | | | |
| | | V |
| +----------+-------> failed <----------+
| (go_failure) |
| |
+------------------------+
(go_waiting) retry
task 执行完成后,由对应的 review 任务进行检查结果,此任务可以多次执行
缺点:需要增加状态,另外定义 task 时需要配置 review 参数进行指定 review 方法
3.3.3.2 方案二:task 返回状态标识下次是否重新执行
Copy (exe_again) task_ing
+--------------+
| |
(go_pending) V (go_running) |(go_success)
waiting ---> pending ------> started ---> finished
^ | | |
| | | V
| +-----------+-----------> failed
| (go_failure) |
| |
+----------------------------+
(go_waiting) job retry
task 执行完成时,当结果状态为 ERR_TASK_ING 时,则发起一个 "go_pending" 事件,将 task 状态置为 pending 状态
优点:无需进行添加 task 状态,只需添加一个约定(根据 task 执行状态则触发一个事件)
3.3.3.3 折衷分析
方案二对服务侵入低,方案一还会变更创建 task 方法,不方便兼容,故使用方案二
3.3.4 Job 间并发限制
3.3.4.1 方案一:job_name 唯一性限制
job_name 添加唯一索引
Copy job_name = CharField(max_length=64, unique=True)
3.3.4.2 方案二:job 执行时加锁
Copy (1) 传 job_extra 时,添加锁标识
(2) job 添加状态
-----------------------------------------------------------
检查是否抢锁成功
|
V
job_status: waiting --> pending --> started --> finished
| | |
| | V
+-----------+-------->failed
-----------------------------------------------------------
3.3.4.3 折衷分析
方案一可作为强限制(创建任务时,如果出现同 job_name,则创建任务失败)
方案二是处理时并发限制
场景:要对 Redis 实例进行迁移,需要控制分片内串行迁移,同一个集群可多个分片进行同时迁移(比如 5 个并发)
可以提供 1 个锁标识,比如集群名 (ceshi_name),同时提供个并发数 (5),程序内部生成 (ceshi_name:0, ceshi_name:1, ceshi_name:2, ceshi_name:3, ceshi_name:4) 锁
锁标识 + 并发度可以在 xingqiao plugin 中设置默认值
3.3.5 超时处理
Task 任务在执行中,则此时整体 Job 超时,则此任务会丢失
备注: Task 任务都是在异步消费处理,在任务队列中保存 1 天有效时间
解决方法:若有正在执行中的任务,则等待此执行任务处理完成
started: 仅处于 stared 状态的 task 可以流转
failed: 不再进行状态流转(比如 ERR_TASK_ING 不再往 waiting 进行流转)
最后一个 Task 执行完成(stared --> finished) 时,则此时超时表明任务失败是得不偿失的
解决方法:判断任务是否完成放在判断任务是否超时前面
3.3.6 job 参数如何传到 task
生成 task 所需要的参数 dict self.task_requires_dict 两种方式生成:
(1) 预设 task_params: 创建 task 任务时预置的 task 参数值
(2) 动态生成 self.task_requires_dict:
3> 若存在依赖任务:在依赖的 task 的 ret_data 中获取参数数据
4> 若有 all_taskdata 依赖参数,则将所有依赖的 ret_data 放到 all_taskdata 列表中
3.3.7 状态变更通知
event 触发 job 状态机改变时发起 action(推送状态变更通知)
4 详细设计
4.1 job
4.1.1 job 状态
Copy +------------+------------+---------+----------+---------+
| Start | Event | End | On Enter | On Exit |
+------------+------------+---------+----------+---------+
| failed[$] | . | | . | . |
| finished[$]| . | . | . | . |
| started[^] | go_failure | failed | . | . |
| started[^] | go_success |finished | . | . |
+------------+------------+---------+----------+---------+
[^] 表示初始状态
[$] 表示终止状态
任务依赖性时,在执行 task 时,任一 task 异常,则标记此 job 异常
子任务
Copy job1
|
+-------+-------+-------+
| | | |
task1 task2 task3 taskN
任务依赖
Copy Job_2
|
task_A
/ \
v v
task_B task_C
| |
| task_D
| |
v v
task_E
4.1.2 交互流程
4.1.2.1 创建任务
Copy /-------\
| start |
\---+---/
|
V
/------------------------------------------\
. Check whether the job_name does not exist . ----------+
\--------------------+---------------------/ |
| |
/--------------------V---------------------\ |
. Check whether the job_type exists . ----------|
\--------------------+---------------------/ |
| |
/--------------------V---------------------\ |
. Check whether the job_extra is valid . ----------|
\--------------------+---------------------/ |
| |
+-------------V-------------+ |
| create job | |
+-------------+-------------+ |
| |
/--------------------V---------------------\ |
. Is the job created successfully . ----------|
\--------------------+---------------------/ |
| |
+-------------V-------------+ |
| Push message to MQ | |
+-------------+-------------+ |
| |
+----------------------------------+
|
/---V---\
| end |
\-------/
4.1.2.2 执行任务
Copy /-------\
| start |
\---+---/
|
+-------------------------+
| |
| V
| /------------------------------------------\
| . Is the exe_id valid . -----N----+
| \--------------------+---------------------/ |
| | |
| +-----------------V-----------------+ |
| | If the job has a lock, | |
| | the lock is renewed | |
| +-----------------+-----------------+ |
| | |
| +-----------------V-----------------+ |
| | Send related events to tasks that | |
| | have not reached the final state | |
| +-----------------+-----------------+ |
| | |
| /--------------------V---------------------\ |
| . failed_count == 0 . -----N----|
| \--------------------+---------------------/ |
| | |
| /--------------------V---------------------\ |
| . finished_count < total_task_num . -----N----|
| \--------------------+---------------------/ |
| | |
| /--------------------V---------------------\ |
| . Is not timeout? . -----N----|
| \--------------------+---------------------/ |
| | |
| +-------------V-------------+ |
| | exe_id = exe_id + 1 | |
| | Push message to MQ | |
| +-------------+-------------+ |
| | |
+-------------------------+ /---V---\
| end |
\-------/
由 task handler 进行消费 task 任务
Copy (1) 设置 task 状态 (waiting->started)
(2) 执行 task 任务
(3) 设置 task 状态 (started->finished or started->failed)
4.1.3 数据库设计
Copy CREATE TABLE `workflow_job` (
`job_id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT "自增 id",
`job_client` varchar(64) NOT NULL COMMENT 'job client ip',
`job_namespace` varchar(64) NOT NULL COMMENT "namespace",
`job_reqid` varchar(64) NOT NULL COMMENT "butterfly reqid",
`job_name` varchar(64) NOT NULL COMMENT "job name",
`job_status` varchar(64) NOT NULL COMMENT "job status",
`job_type` varchar(64) NOT NULL COMMENT "job type, 插件名",
`ret_stat` varchar(64) NOT NULL COMMENT "ret stat",
`ret_data` text COMMENT "ret data",
`job_cost` double NOT NULL COMMENT "cost, 单位是秒",
`job_extra` varchar(2048) NOT NULL COMMENT "job extra 参数",
`job_timeout` int(11) NOT NULL COMMENT "timeout, 单位是秒",
`job_lock` varchar(64) NOT NULL COMMENT 'job lock name',
`exe_id` int(11) NOT NULL,
`operator` varchar(64) NOT NULL,
`is_valid` tinyint(1) NOT NULL COMMENT "是否有效",
`c_time` datetime NOT NULL COMMENT "create time",
`s_time` datetime NOT NULL COMMENT "start time",
`e_time` datetime NOT NULL COMMENT "end time",
PRIMARY KEY (`job_id`),
UNIQUE KEY `job_job_name` (`job_name`),
KEY `job_job_client` (`job_client`),
KEY `job_job_namespace` (`job_namespace`),
KEY `job_job_reqid` (`job_reqid`),
KEY `job_job_status` (`job_status`),
KEY `job_job_type` (`job_type`),
KEY `job_ret_stat` (`ret_stat`),
KEY `job_job_cost` (`job_cost`),
KEY `job_operator` (`operator`),
KEY `job_is_valid` (`is_valid`),
KEY `job_exe_id` (`exe_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT "星桥 job 表";
4.1.4 接口
详细看用户手册
/xingqiao/create_job 创建 job
/xingqiao/delete_job 删除 job
/xingqiao/retry_job 重试 job
/xingqiao/list_jobs job 列表
/xingqiao/get_job_detail job 详情
4.2 task
4.2.1 task 状态
Copy +------------+------------+---------+----------+---------+
| Start | Event | End | On Enter | On Exit |
+------------+------------+---------+----------+---------+
| failed[$] | go_waiting | waiting | . | . |
| pending | go_failure | failed | . | . |
| pending | go_running | started | . | . |
| started | go_failure | failed | . | . |
| started | go_success |finished | . | . |
| finished[$]| . | . | . | . |
| waiting[^] | go_failure | failed | . | . |
| waiting[^] | go_pending | pending | . | . |
+------------+------------+---------+----------+---------+
[^] 表示初始状态
[$] 表示终止状态
检查依赖 准备参数 结果校验
(go_pending) (go_running) (go_success)
即 taskflow 的状态正常流程为 waiting ---> pending ---> started ---> finished[$]
^ | | |
| | | V
| +----------+-------> failed[$]
| (go_failure) |
| |
+------------------------+
(go_waiting) retry
4.2.2 交互流程
4.2.3 数据库设计
Copy CREATE TABLE `workflow_task` (
`task_id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT "自增 id",
`job_id` bigint(20) NOT NULL COMMENT "job id",
`task_label` varchar(64) NOT NULL COMMENT "task 名字",
`task_reqid` varchar(64) NOT NULL COMMENT "task 在百川中的任务 id",
`task_cmd` varchar(128) NOT NULL COMMENT "handler path",
`task_params` varchar(1024) DEFAULT NULL COMMENT "task params, 若有值则是个 json",
`task_requires` varchar(1024) NOT NULL COMMENT "task 所需的参数: 即 arg1,arg2",
`task_provides` varchar(1024) NOT NULL COMMENT "task 可提供的结果: 即 result1,result2",
`task_dependencies` varchar(1024) NOT NULL COMMENT "task 依赖: 即 label1,label2",
`task_status` varchar(64) NOT NULL COMMENT "task 状态",
`task_cost` double NOT NULL COMMENT "task 耗时",
`task_extra` varchar(2048) NOT NULL COMMENT "task extra, 暂时没用",
`task_is_save` tinyint(1) NOT NULL COMMENT "是否保存结果到 job",
`ret_stat` varchar(64) NOT NULL COMMENT "tast ret stat",
`ret_data` text COMMENT "task ret data",
`task_retrymax` int(11) NOT NULL COMMENT "task retry max",
`task_retrycount` int(11) NOT NULL COMMENT "task retry count",
`task_timeout` int(11) NOT NULL COMMENT "task timeout",
`c_time` datetime NOT NULL COMMENT "create time",
`u_time` datetime NOT NULL COMMENT "update time",
PRIMARY KEY (`task_id`),
KEY `task_job_id` (`job_id`),
KEY `task_task_label` (`task_label`),
KEY `task_task_reqid` (`task_reqid`),
KEY `task_task_cmd` (`task_cmd`),
KEY `task_task_status` (`task_status`),
KEY `task_task_cost` (`task_cost`),
KEY `task_ret_stat` (`ret_stat`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT "星桥 task 表";
5 传送门
airflow: http://airflow.apache.org/
go-workflow: https://github.com/go-workflow/go-workflow
Amazon Simple Workflow Service (SWF)
adage: https://github.com/yadage/adage
腾讯蓝鲸作业平台:https://github.com/Tencent/bk-job
StackStorm/st2/st2actions