流程编排(星桥)
三五星桥连月朔,万千灯火彻天街。
1 项目概述
1.1 背景介绍及目标
1.1.1 背景
工作流可以做什么
顺序工作流 -- Sequential Workflow (强调的是顺序过程)
用于流程已定的情况。什么是流程已定,这是说工作流的流向大体上不取决于外部的决定
比如贷款审批,流程基本上确定,并且其分支和循环也是规定好了的。又如文档审批,假期审批等都比较适合用 Sequential Workflow。
一般 Sequential Workflow 是不可逆的。除了循环 Activity 外,一般一个 Activity 执行完毕后便不再被执行。
简单的说,工作流可以帮助我们更方便、可视化地管理一些日常操作,将零散操作流式化
(1) 日常例行操作(如 crontab)
(2) 简单报警自动处理(如磁盘清理)
(3) 复杂报警追查
(4) 其它需要人工用各种原子操作串联起来进行的复杂关联操作
工作流引擎提供了一个管理这些操作的平台,将以往零散的操作按照统一的规范管理起来,并将操作执行情况可视化的展现出来
并提供了一系列完善的人工干预接口,整体上提高了操作的规范性、效率、稳定性。
我们为什么需要工作流
工作流提供的规范化管理、状态可视化等特性,一方面能够很好的解决我们日常的零散操作脚本管理需求,将日常操作规范化
另一方面,工作流的可视化及相应的人工干预机制,极大的方便了日常操作,以前的脚本执行黑盒状态,通过工作流透明化
这些特性与我们的日常操作、报警处理等场景相结合,预期可以起到提高效率、提升操作稳定性的效果。
1.1.2 目标
打造易用的工作流,需要考虑工作流应该和业务逻辑解耦
1.2 名词解释
Job :指单个操作对应的任务集合,包含工作流定义及参数
Task:指工作流的单个步骤的执行流程
2 需求分析
2.1 功能需求
2.1.1 使用场景
定时任务执行情况
仅使用 job
子任务(比如对集群的配置进行变更,对集群操作就是生成一个 job,对集群中的实例就是 task)
job + (n)task
任务依赖
多个 task 之间依赖(比如 task_A 依赖 task_B)
场景 1:分析 Redis 集群中的大 key(需要先拉取 RDB,然后进行分析 RDB 中的数据,最后进行汇总数据)
场景 2:分析 Redis 集群中的热 key(需要获取到各个分片的热 key 后,进行汇总分析)
场景 3:实例下线(需要先去掉上下游关联,然后进行销毁实例)
2.2 非功能需求
2.3 调研
airflow: http://airflow.apache.org/
go-workflow: https://github.com/go-workflow/go-workflow
Amazon Simple Workflow Service (SWF)
https://github.com/sailthru/stolos
https://github.com/openstack/mistral
Openstack taskflow
ST2 workflow(百度云预案平台使用的此 workflow)
2.3.1 XXX 平台 1(内部)
2.3.1.1 名词解释
Job:一次操作,如 service-upgrade 操作
Task: 完成一个 Job 所需执行的操作,如 service-upgrade 操作中需要进行 dervice-deploy 和 agent_command 两个操作
CTask: 完成一个 task 所需要执行的操作,如 service-upgrade 操作中的 agent_command 会触发 upgrade 和 config-changed 两个 hook,这既是 agent_command 的两个子 task。
+-------------------------------------------------+
| Job |
| +---------------------------------------------+ |
| | +-----+ +-----+ +-----+ +-----+ | |
| |Task |task1| |task2| |task3| |task4| | |
| | +-----+ +-----+ +-----+ +-----+ | |
| +-------|-----\-------------------------------+ |
| | \ |
| +V-----+ V------+ +------+ +------+|
| Ctask |ctask1| |ctask2| |ctask3| |ctask4||
| +------+ +------+ +------+ +------+|
+-------------------------------------------------+
job 的 3 种状态
new : 执行操作前,调用 create_job 创建一个 create_job,然后往里面添加 task
start: 自身已经执行,子节点未完成(蓝色)
done : 在 job 所有的 Task,Ctask 都为 done 后,Job 为 done
task 的 3 种状态
new : 执行操作前,调用 add_task 创建一个 task,然后异步发数据
start: 收到 mq 发来的数据,将 task 置为 start
done : 设置为 start 后,检查 CTask 是否都 done,如果为 done,将 task 置为 done
2.3.1.2 service 升级中的平台和 agent 通信流程
执行的命令点和 task 列表
task 列表:SELECT * FROM callback_task WHERE job_id = 46169
4568
46169
0
[4570]
946
Control::send2mq
done
yes
0
1407594530
1407594530
0
{"reaction":[]}
4569
46169
0
[4571,4572]
946
Control::send2mq
done
yes
0
1407594530
1407594541
0
{"reaction":[]}
4570
46169
4568
[]
946
unit-deploy
done
yes
0
1407594530
1407594530
0
{"reaction":[]}
4571
46169
4569
[]
946
upgrade
done
yes
0
1407594530
1407594539
0
{"reaction":[]}
4572
46169
4569
[]
946
config-changed
done
yes
0
1407594530
1407594541
0
{"reaction":[]}
4568: service_deploy 操作,生成了 unit-deploy 的 CTask
4569: agent_command,生成了 upgrade 和 config-change 的 Ctask
字段说明
is_resp 表示发送完成的命令执行完后是否完成了回调
流程图
+---------------------------------------------------------------------+
| Job(46169) |
| +-----------------------------------------------------------------+ |
| | +--------------------+ +-------------------+ | |
| |Task |service_deploy(4568)| |agent_command(4569)| | |
| | +---------+----------+ +-------------------+ | |
| +----------------|--------------------|-------|-------------------+ |
| | V V |
| +---------V-------+ +-------------+ +-------------------+ |
| Ctask |unit-deploy(4570)| |upgrade(4571)| |config-change(4572)| |
| +-----------------+ +-------------+ +-------------------+ |
+---------------------------------------------------------------------+
平台和 agent 涉及的 callback 流程
以 service_deplay 为例,平台会往 agent 发送 unit-deploy 命令,unit 执行命令后,会往平台发送请求,更改状态。
平台往 agent 发送命令
Deploy 模块
+---------------------------------------+
| service_deploy |
+-------+-------------------------------+
|
| 1
+-------|-------------------------------+
| +-----V--------+ +-----------------+ |
| |agent_command | |command_from_nmq | | Control 模块
| +--------------+ +--^--------------+ |
+-------|------------/--------|---------+
| / |
| 2 / 3 | 4
| / V
+------V-------+ +-------------+
| nmq | | agent |
+--------------+ +-------------+
1 service 模块的 service_deploy 组装 unit-deploy 命令,调用 control 模块的 agent_command 方法
2 agent_command 执行以下动作:
在 job(46169) 中增加一个 task(4568)
调 ral 发送数据到 nmq
3 nmq 发送数据到 control 模块的 command_from_nmq 接口
4 control 模块的 command_from_nmq 执行以下动作:
增加一个子 task(4570),父 task 是 4568
调 trigeer 发送数据到 agent
agent 往平台发送命令
service 模块
+---------------------------------------------+
| unit_stat_transform |
+-------^-------------------------------------+
|
| 4
+-------|-------------------------------------+
| +------------------+ +-------------------+ |
| |callback_from_nmq | |callback_from_agent| | Callback 模块
| +-----^------------+ +-----^-------------+ |
+-------|--------------/------|---------------+
| / |
| 3 / 2 | 1
| V |
+--------------+ +-------------+
| nmq | | agent |
+--------------+ +-------------+
1 agent 执行完 unit-deploy 命令后,请求 callback 模块的 callback_from_agent 方法
2 callback_from_agent 调用 ral 将数据发送到 nmq
3 nmq 发送数据到 callback 模块的 callback_from_nmq,
4 callback 模块的 callback_from_nmq 接口执行以下操作:
更新 task 状态,从 new 到 start(taskid=4570)
执行回调中配置的 service::method 方法,这里执行的是 deploy::callback_deploy,在 callback_deploy 调用 unit_stat_transform 更新 unit 的状态。
根据执行结果检查 task 是否成功结束,若结束,检查父 task(4568)是否完成
若父 task 为 done,检查 job 是否为 done
2.3.1.3 疑问
什么情况下进行重试?
agent_command ==> nmq 时有 3 次重试
回调转成同步命令
平台往 agent 发送命令,agent 同步阻塞
Deploy 模块
+---------------------------------------+
| service_deploy |
+-------+-------------------------------+
|
| 1
+-------|-------------------------------+
| +-----V--------+ +-----------------+ |
| |agent_command | |command_from_nmq | | Control 模块
| +--------------+ +--^-----------^--+ |
+-------|------------/--------|----|----+
| / | |
| 2 / 3 | 4 | 5
| / | |
+------V-------+ +-----V-------+
| nmq | | agent |
+--------------+ +-------------+
Job 如何拆分为 task
一张 Job 表 (callback_job),一张 task 表 (callback_task)
Job 和 Task 以及 CTask 在数据库中如何体现
Job 中有个 task_list 字段,是个 list, 用于标记 task
Task 中有个 ctask_list 字段,是个 list ,用于标记子 task
添加 Task
def add_task(job_id, ptask_id, unit_id, runtime_cmd, reaction)
res = {
"error_code": 0,
"error_msg": "OK",
"data":{}
}
try:
ret = Task.add_task(job_id, ptask_id, unit_id, runtime_cmd, reaction) // 数据库操作,在 Task 表中增加 task 记录
except BaseException:
res["error_code"] = "-1"
res["error_msg"] = traceback.format_exc()
res['data']['task_id'] = ret;
if (0 == ptask_id) // 如果 ptask_id 为 0 则说明是 Task,否则为 CTask
add_task2job(job_id, ret); // 将新插入数据库的 taskid,追加到 Job 表中的对应 job 的 task_list 字段
else
add_task2ptask(ptask_id, ret); // 将新插入数据库的 taskid,追加到对应父级 task 的 ctask_list 字段中
return res;
客户端如何将异步的 job 转为同步请求
客户端发送命令时,会向平台发送 create_job 请求进行生成 job_id
然后进行定时 query_job 查询任务情况
"status":"start" 时,表示 job 仍在执行
"status":"done" 时,表示 job 执行完成
2.3.2 OpenStack mistral(比较重)
2.3.2.1 mistral 简介
https://github.com/openstack/mistral
最初是由 Mirantis 公司贡献给 Openstack 社区的工作流组件,提供 Workflow As a Service 服务
类似 AWS 的 SWS(Simple Workflow Serivce),Hadoop 生态圈中的 oozie 服务
2.3.2.2 mistral 与 Heat 的关系
mistral 和 OpenStack 资源编排服务 Heat 不同,二者功能可能会有重叠,但 Heat 注重基础资源的编排,而 Mistral 则主要是用于任务编排。
Heat 的主要应用场景是创建租户基础资源模板,管理员可以创建一个资源模板,基于这个模板用户一次请求就可以完成虚拟机创建及配置、挂载数据卷、创建网络和路由、设置安全组等。
而 Mistral 的典型应用场景包括执行计划任务 Cloud Cron,调度任务 Task Scheduling,执行复杂的运行时间长的业务流程等。
2.3.3 OpenStack taskflow(库)
2.3.3.1 前言
TaskFlow 是 OpenStack 中的一个 Python 库,主要目的是让 task(任务)执行更加容易可靠,能将轻量的任务对象组织成一个有序的流。
若未安装 taskflow 到环境中:
pip install taskflow
目前 TaskFlow 支持三种模式:
线性:运行一个任务或流的列表,是一个接一个串行方式运行。
无序:运行一个任务或流的列表,以并行的方式运行,顺序与列表顺序无关,任务之间不存在依赖关系。
图:运行一个图标(组节点和边缘节点)之间组成的任务 / 流依赖驱动的顺序。

+engines ----------------------------------------------------+
| +linear_flow ----+ +unordered_flow -+ +graph_flow -----+ |
| |+-----+ +-----+| |+-----+ +-----+| |+-----+ +-----+| |
| ||task1| |task2|| ||task1| |task2|| ||task1| |task2|| |
| |+-----+ +-----+| |+-----+ +-----+| |+-----+ +-----+| |
| +----------------+ +----------------+ +----------------+ |
+------------------------------------------------------------+
TaskFlow 支持创建不同的 task,并以声明的方式集成到一个 flow 中,这些 flow 会通过 engine 执行、停止、继续和恢复。
Flow:
(1) 线性流(linear_flow)
"""
from taskflow.patterns import linear_flow
linear_flow.Flow('linear').add(taskA(),taskB(),taskC())
"""
(2) 无序流(unordered_flow)
"""
from taskflow.patterns import unordered_flow
unordered_flow.Flow('linear').add(taskA(),taskB(),taskC())
"""
(3) 图流(graph_flow)
"""
from taskflow.patterns import graph_flow
graph_flow.Flow('linear').add(taskA(),taskB())
"""
2.3.3.2 任务的状态
就像任何其他的任务流系统一样,每个任务都有一些状态:PENDING RUNNING SUCCESS FAILURE,你也可以创建自定义的状态。
OpenStack taskflow 使用的 automaton
+------------+------------+---------+----------+---------+
| Start | Event | End | On Enter | On Exit |
+------------+------------+---------+----------+---------+
| FAILURE[$] | . | . | . | . |
| PENDING | on_failure | FAILURE | . | . |
| PENDING | on_running | RUNNING | . | . |
| RUNNING | on_failure | FAILURE | . | . |
| RUNNING | on_success | SUCCESS | . | . |
| SUCCESS[$] | . | . | . | . |
| WAITING[^] | on_failure | FAILURE | . | . |
| WAITING[^] | on_pending | PENDING | . | . |
+------------+------------+---------+----------+---------+
[^] 表示初始状态
[$] 表示终止状态
即 taskflow 的状态正常流程为 WAITING --> PENDING --> RUNNING --> SUCCESS
|
V
FAILURE
2.3.3.3 状态变更通知
(1)工作流状态变更
>>> class CatTalk(task.Task):
... def execute(self, meow):
... print(meow)
... return "cat"
...
>>> class DogTalk(task.Task):
... def execute(self, woof):
... print(woof)
... return 'dog'
...
>>> def flow_transition(state, details):
... print("Flow '%s' transition to state %s" % (details['flow_name'], state))
...
>>>
>>> flo = linear_flow.Flow("cat-dog").add(
... CatTalk(), DogTalk(provides="dog"))
>>> eng = engines.load(flo, store={'meow': 'meow', 'woof': 'woof'})
>>> # 使用引擎的 Notifier 属性
>>> eng.notifier.register(ANY, flow_transition)
>>> eng.run()
Flow 'cat-dog' transition to state RUNNING
meow
woof
Flow 'cat-dog' transition to state SUCCESS
taskflow/engines/action_engine/engine.py
class ActionEngine(base.Engine):
...
def _change_state(self, state):
moved, old_state = self.storage.change_flow_state(state)
if moved:
details = {
'engine': self,
'flow_name': self.storage.flow_name,
'flow_uuid': self.storage.flow_uuid,
'old_state': old_state,
}
self.notifier.notify(state, details)
即工作流状态变更时会通知到 notifier state 和 details 两个参数
(2)任务状态变更
>>> class CatTalk(task.Task):
... def execute(self, meow):
... print(meow)
... return "cat"
...
>>> class DogTalk(task.Task):
... def execute(self, woof):
... print(woof)
... return 'dog'
...
>>> def task_transition(state, details):
... print("Task '%s' transition to state %s" % (details['task_name'], state))
...
>>>
>>> flo = linear_flow.Flow("cat-dog")
>>> flo.add(CatTalk(), DogTalk(provides="dog"))
<taskflow.patterns.linear_flow.Flow object at 0x...>
>>> eng = engines.load(flo, store={'meow': 'meow', 'woof': 'woof'})
>>> # 引擎的 atom_notifier 属性
>>> eng.atom_notifier.register(ANY, task_transition)
>>> eng.run()
Task 'CatTalk' transition to state RUNNING
meow
Task 'CatTalk' transition to state SUCCESS
Task 'DogTalk' transition to state RUNNING
woof
Task 'DogTalk' transition to state SUCCESS
通过代码 notifier 就是实现了个类似 pub/sub 的功能
2.3.3.4 retry
Retry 是一个控制当错误发生时, 如何进行重试的特殊工作单元, 而且当你需要的时候还能够以其他参数来重试执行别的 Atom 子类. 常见类型:
AlwaysRevert: 错误发生时, 回滚子流
AlwaysRevertAll: 错误发生时, 回滚所有流
Times: 错误发生时, 重试子流
ForEach: 错误发生时, 为子流中的 Atom 提供一个新的值, 然后重试, 直到成功或 retry 中定义的值用完为止.
ParameterizedForEach: 错误发生时, 从后台存储(由 store 参数提供)中获取重试的值, 然后重试, 直到成功或后台存储中的值用完为止.
times
flow = linear_flow.Flow('send_message', retry=retry.Times(5)).add(
SendMessageTask('sender'))
ForEach
flow = linear_flow.Flow('f1').add(
task.Task('t1'),
linear_flow.Flow('f2', retry=retry.ForEach(values=['a', 'b', 'c'], name='r1', provides='value')).add(
task.Task('t2'),
task.Task('t3', requires='value')),
task.Task('t4'))
https://wiki.openstack.org/wiki/TaskFlow/Retry
2.3.4 pyflow
feature
(1) 使用 Python 代码定义工作流
(2) 在 localhost 或 sge 上运行工作流
(3) 可以继续部分完成的工作流
(4) 任务资源管理:指定每个任务所需的线程数和内存
(5) 递归工作流规范:获取任何现有的 pyFlow 对象并将其用作另一个 pyFlow 中的任务。
(6) 动态工作流规范:定义等待任务规范,而不仅仅是任务,这样就可以根据上游任务的结果定义任务(注意:递归工作流是一种更好的方法)
(7) 使用一致的工作流级别日志记录检测和报告所有失败的任务。
(8) 任务级日志记录:所有任务stderr都被记录和修饰,例如[time][host][workflow_run][taskid]
(9) 任务计时:任务包装器函数为每个任务提供时间装饰器
(10) 任务优先级:对于同时可以运行的任务,可以分配相对的优先级,以便首先运行或排队。
(11) 可更改每个任务的环境变量或工作目录。
(12) 作业完成/错误/异常的电子邮件通知
(13) 按规定的时间间隔提供持续的任务总结报告
(14) 指定其他外部调度程序参数(例如,为SGE指定队列名称)
(15) 以点格式(dot)输出任务图
内部实现
添加任务 再执行的顺序
+-------------------------WorkflowRunner----------------------------------------+
| |
| addTask |
| | +--------------TaskManager(cdata, tdag)----------------+ |
| \ | +TaskDAG-------------------------------------------+ | |
| -----> addTask | | |
| | | |(waiting) | | |
| | | | ready_commands | | |
| | | | +------------------------------------+ | | |
| | | | | TaskNode | | | |
| | | \ |+------+------+------+------+------+| | | |
| | | -||task_N|task..|task_C|task_B|task_A||------- |
| | | |+------+------+------+------+------+| | | \ |
| | | +------------------------------------+ | | | |
| | | | | | |
| | +--------------------------------------------------+ | | |
| +------------------------------------------------------+ | |
| | |
| +------------------------------------------------------+ / |
| | TaskRunner (running)<- |
| +------------------------------------------------------+ |
+-------------------------------------------------------------------------------+
+scripts dir--------------------------------------------------------------------+
|+main.py--------------------------------------------------------------------+ |
|| +--------+ +--------+ +--------+ +---------+ +---------+ | | / 统一参数,比如参数为操作对象的唯一标识
|| |command1| |command2| |command3| |command..| |commandN | | | ------| (1) 元数据获取:根据唯一标识获取元数据等信息
|| +--------+ +--------+ +--------+ +---------+ +---------+ | | \ (2) 上下文约定:约定上下文依赖的数据
|+---------------------------------------------------------------------------+ |
| | | |
| +------V------+ +-----------V-----------+ |
| | | | | |
| | temp_db | | lib | |
| | | | | |
| +-------------+ +-----------------------+ |
+-------------------------------------------------------------------------------+
| |
| +---------------------- task 具体执行逻辑
|
+--------------------------------------------- 存储 task 中间数据,作为上下文环境使用
# 如何优雅地终止python线程
知道为啥threading仅有start而没有end不?
你看,线程一般用在网络连接、释放系统资源、dump流文件,这些都跟IO相关了,你突然关闭线程那这些
没有合理地关闭怎么办?是不是就是给自己造bug呢?啊?!
因此这种事情中最重要的不是终止线程而是线程的清理啊。
解决方案 · 壹
一个比较nice的方式就是每个线程都带一个退出请求标志,在线程里面间隔一定的时间来检查一次,看是不是该自己离开了!
-----------------
import threading
class StoppableThread(threading.Thread):
"""Thread class with a stop() method. The thread itself has to check
regularly for the stopped() condition."""
def __init__(self):
super(StoppableThread, self).__init__()
self._stop_event = threading.Event()
def stop(self):
self._stop_event.set()
def stopped(self):
return self._stop_event.is_set()
-----------------
在这部分代码所示,当你想要退出线程的时候你应当显示调用stop()函数,并且使用join()函数来等待线程合适地退出。线程应当周期性地检测停止标志。
然而,还有一些使用场景中你真的需要kill掉一个线程:比如,当你封装了一个外部库,但是这个外部库在长时间调用,因此你想中断这个过程。
workflow 嵌套

// Task graph from pyflow object 'SimpleWorkflow'
// Process command: 'make_pyflow_task_graph.py'
// Process working dir: '/Users/wangbin34/meetbill/github/pyflow/pyflow/demo/subWorkflow/pyflow.data/state'
// Graph capture time: 2022-07-02T08:19:33.796883
digraph SimpleWorkflowGraph {
compound=true;
rankdir=LR;
node[fontsize=10];
subgraph cluster_sg0 {
label = "SimpleWorkflow";
n0 [label="task1" color=blue];
n1 [label="task2" color=red style=bold];
n2 [label="subwf_task3" color=grey style=dashed shape=rect style=rounded];
n3 [label="task4" color=grey style=dashed];
n0 -> n2;
n1 -> n2;
n2 -> n3;
begin0 [label="begin" shape=diamond];
end0 [label="end" shape=diamond];
begin0 -> n0;
begin0 -> n1;
n3 -> end0;
}
{ rank = source; Legend [shape=none, margin=0, label=<
<TABLE BORDER="0" CELLBORDER="1" CELLSPACING="0" CELLPADDING="4">
<TR><TD COLSPAN="2">Legend</TD></TR>
<TR> <TD>complete</TD> <TD BGCOLOR="blue"></TD> </TR>
<TR> <TD>running</TD> <TD BGCOLOR="green"></TD> </TR>
<TR> <TD>queued</TD> <TD BGCOLOR="yellow"></TD> </TR>
<TR> <TD>waiting</TD> <TD BGCOLOR="grey"></TD> </TR>
<TR> <TD>error</TD> <TD BGCOLOR="red"></TD> </TR>
</TABLE>>];}
}
2.3.5 阿里云 Serverless 工作流
Serverless 工作流(Serverless Workflow)是一个用来协调多个分布式任务执行的全托管云服务。
在 Serverless 工作流中,您可以用顺序、分支、并行等方式来编排分布式任务, Serverless 工作流会按照设定好的步骤可靠地协调任务执行,跟踪每个任务的状态转换,并在必要时执行用户定义的重试逻辑, 以确保工作流顺利完成。Serverless 工作流通过提供日志记录和审计来监视工作流的执行,方便您轻松地诊断和调试应用。 Serverless 工作流简化了开发和运行业务流程所需要的任务协调、状态管理以及错误处理等繁琐工作,让您聚焦业务逻辑开发。
下图描述了 Serverless 工作流如何协调分布式任务,这些任务可以是函数、已集成云服务 API、运行在虚拟机或容器上的程序

本图里的关键点:
1、可以对 Faas 函数进行编排,同步调用;
2、可以对Faas 作为胶水间接调用的服务进行编排;
3、可以把已有的 workflow 作为子流程进行标配;
4、可以通过**消息队列服务+ callback **对第三方服务进行编排;
5、可以通过Faas + callback 引入第三方审批环节;
6、用户还有个比较强烈的需要就是对于一些不会做callback 或者针对workflow进行改造的第三方服务,数据状态、任务执行状态依赖检查的编排;
例如 spark 任务,db 表数据;可以为了避免检查任务长时间运行,可以实现多次轮询函数进行检查。
2.3.5.1 产品优势
协调分布式组件
Serverless 工作流能够编排不同基础架构、不同网络、不同语言编写的应用,抹平混合云、专有云过渡到公共云或者从单体架构演进到微服务架构的落差。
减少流程代码量
Serverless 工作流提供了丰富的控制逻辑,例如顺序、选择、并行等,让您以更少的代码实现复杂的业务逻辑。
提高应用容错性
Serverless 工作流为您管理流程状态,内置检查点和回放能力,以确保您的应用程序按照预期逐步执行。错误重试和捕获可以让您灵活的处理错误。
Serverless
Serverless 工作流根据实际执行步骤转换个数收费,执行结束不再收费。Serverless 工作流自动扩展让您免于管理硬件预算和扩展。
2.3.5.2 功能特性
服务编排能力
Serverless 工作流可以帮助您将流程逻辑与任务执行分开,节省编写编排代码的时间。例如图片经过人脸识别函数后,根据人脸位置剪裁图像,最后发送消息通知您,Serverless工作流提供了一个Serverless的解决方案,降低了您的编排运维成本。
协调分布式组件
Serverless 工作流能够协调在不同基础架构上、不同网络内,以不同语言编写的应用。应用不管是从私有云/专有云平滑过渡到混合云或公共云,或者从单体架构演进到微服务架构,Serverless工作流都能发挥协调作用。
内置错误处理
通过内置错误重试和捕获能力,您可以自动重试失败或超时的任务,对不同类型错误做出不同响应,并定义回退逻辑。
可视化监控
Serverless 工作流提供可视化界面来定义工作流和查看执行状态。状态包括输入和输出等。方便您快速识别故障位置,并快速排除故障问题。
支持长时间运行流程
Serverless 工作流可以跟踪整个流程,持续长时间执行确保流程执行完成。有些流程可能要执行几个小时、几天、甚至几个月。例如运维相关的Pipeline和邮件推广流程。
流程状态管理
Serverless 工作流会管理流程执行中的所有状态,包括跟踪它所处的执行步骤,以及存储在步骤之间的数据传递。您无需自己管理流程状态,也不必将复杂的状态管理构建到任务中。
2.3.5.3 名词解释
本文主要对Serverless工作流涉及的专有名词及术语进行定义及解析,方便您更好地理解相关概念并使用Serverless工作流。
Serverless工作流(Serverless Workflow)
协调多个分布式任务执行的全托管Serverless云服务。通过Serverless工作流,您可以用顺序、分支、并行等方式来编排分布式任务,以确保流程按照设定好的顺序可靠地协调任务执行。
分布式任务
Serverless工作流中的分布式任务可以是函数、已集成云服务的API、运行在虚拟机或容器上的程序。
流程(Flow)
定义了业务逻辑描述以及流程执行所需要的通用信息。
步骤(Step)
步骤是流程中的工作单元,可以是简单的原子步骤,如任务(task)、成功(succeed)、失败(fail)、等待(wait)和传递(pass)等;也可以是复杂的控制步骤,如选择(choice)、并行(parallel)和并行循环(foreach)等。步骤的组合使用构建了复杂的业务逻辑。
父步骤
如果步骤A包含步骤B,则称步骤A为父步骤。
子步骤
如果步骤A包含步骤B,则称步骤B为子步骤。
任务步骤(task)
步骤类型之一,使用任务步骤来定义函数计算服务的函数调用信息,执行任务步骤会调用相应的函数。
传递步骤(pass)
步骤类型之一,使用传递步骤来输出常量或者将输入转换成期望的输出。该类型的步骤通常用于调试未创建任务步骤的函数的流程逻辑。
等待步骤(wait)
步骤类型之一,使用等待步骤来暂停执行流程,然后再继续执行。您可以选择一个等待的相对时间,也可以以时间戳方式指定等待结束的绝对时间。
选择步骤(choice)
步骤类型之一,使用选择步骤让流程根据条件执行不同步骤。
并行步骤(parallel)
步骤类型之一,使用并行步骤并行执行多个不同步骤。
并行循环步骤(foreach)
步骤类型之一,使用并行循环步骤并行执行多个相同的步骤。
成功步骤(succeed)
步骤类型之一,使用成功步骤提前结束一系列串行的步骤。成功步骤通常和选择步骤结合使用,在选择步骤条件满足的情况下跳转到一个成功步骤,从而不再执行其他步骤。
失败步骤(fail)
步骤类型之一,使用失败步骤提前结束一系列串行的步骤。当流程执行完失败步骤后,定义在失败步骤之后的步骤不会被继续执行,并且导致失败步骤的父步骤失败,并一直传递,最后导致流程执行失败。
流程定义语言FDL(Flow Definition Language)
用来描述和定义业务逻辑,在执行流程时,Serverless工作流服务会根据流程定义依次执行相关步骤。
定时调度
Serverless工作流支持在指定时间调度您的工作流。
2.3.5.4 输入和输出
流程和步骤
通常,流程和步骤之间,流程的多个步骤之间需要传递数据。和函数式编程语言类似,FDL 的步骤类似于函数, 它接受输入(Input),并返回输出(Output),输出会保存在父步骤(调用者)的本地(Local)变量里。 其中,输入和输出的类型必须是 JSON 对象结构,本地变量的类型因步骤而异。 例如
任务步骤把调用函数计算函数的返回结果作为本地变量
并行步骤把它所有分支的输出(数组)作为本地变量。
步骤的输入、输出和本地变量总大小不能超过 32 KiB,否则会导致流程执行失败
2.3.5.5 技术挑战
高效:
1、 workflow 配置信息解析如何做到执行时高效
2、 每个步骤都会产生对应的事件作为对工作流的执行过程记录,执行事件QPS是工作流QPS的倍数
3、 回调模式如何快速激活暂停中的工作流
高可用:
1、 workflow 平台无损上线
复杂性:
1、 第三方服务;
2、 流程配置语法;
2.3.6 XXX 平台 2(内部)
2.3.6.1 架构
+-----------------------------------------+
| |
| Web http |<----------+--+
| | | |
+-----+----------------------+------------+ | |
| | | |
+-----V-----+ +------------V------------+ | |
| | | | | |
| DB | | Redis | | |
| | | | | |
+-----------+ +------^------------^-----+ | |
| | | |
+------+----+ +-----+-----+ | |
| Consumer1 | | Consumer2 | | |
+------+----+ +-----+-----+ | |
| | | |
| +-----------------+ |
+---------------------------------+
(1) 创建任务
(1) 创建 Job 和 Task 到 DB
(2) 下发一个 job 到 Redis
(2) 任务执行
(1) Consumer1 从 Redis 中 pop 到 job 后,进行请求 Server 进行处理, Server 根据 job id 获取当前的 task 进行处理
2.3.7 X1 task
(1) X1 task 架构
schedule 决定 workflow 能不能开始执行
engine 是确保已经开始的 workflow 执行完成(分发 step 任务到消息队列、查询 step 结果等)
worker 是消费任务队列中的消息,处理相应的 step 任务
|
|
|
V 同步 handler 异步 handler(异步 + 编排)
+X1-resource-api------------+ +X1-resource-task-----------+
| +APP1---+ +APPN---+ | | +schedule-+ +worker----+ |
+ | | | | + | | | | +APP1--+ | |
| +-------+ +-------+ | | +---------+ | +------+ | |
+-------------+-------------+ | +engine---+ | +APPN--+ | |
| | | | | +------+ | |
| | +-------+-+ +----+---+-+ |
| | | | | |
| | +Redis--V---------V-+ | |
| | +-------------------+ | |
| +-----------------------|---+
| |
| |
+X1-base------V-------------------------------------------------V---+
| +sdk------------------------------------------------+ +model----+ |
| | +XRM-cli---+ +BCC-cli----+ +VM-cli----+ | | | |
| | +----------+ +-----------+ +----------+ | | | |
| +------------------------+--------------------------+ +----+----+ |
+--------------------------|---------------------------------|------+
| |
..................................V.................................V.......... DB 或者第三方服务
* schedule 处理 waiting(初始状态) & queuing(有相同 mutex 的 task 仍未完成) 的 task,将 task 改为 prerun
waiting --> queuing
queuing --> prerun(会检查处于 prerun & running & postrun & manual 是否有相同 mutex)
* engine 将 prerun & running 的 task 执行
prerun --> running(发起任务)
running --> success|error|manual|prerun
子任务状态
# 初始态
TaskStatusWaiting string = "waiting" # 初始状态,还未被调度,长时间 waiting 可能是 x1-task 未启动
# 排队中
TaskStatusQueuing string = "queuing"
# 运行态
TaskStatusPreRun string = "prerun" # 前置操作
TaskStatusRunning string = "running"
TaskStatusPostRun string = "postrun" # 后置操作
TaskStatusManual string = "manual" # 需人工处理
# 终止态
TaskStatusSuccess string = "success"
TaskStatusError string = "error"
队列(3 个队列)
TaskPriorityHigh string = "high" # high-priority-queue
TaskPriorityMedium string = "medium" # medium-priority-queue
TaskPriorityLow string = "low" # low-priority-queue
(2) worker
Worker 执行任务的 Message And Result
type TaskExecUnit struct {
TaskExecUnitID string
TaskID string
TaskBatchID string
SendAt time.Time
ExpireAt time.Time
Entity string // 执行实体,如 order_id
Dim string
Processor string // 执行函数
Parameters string // 函数参数
}
type TaskExecResult struct {
TaskExecUnitID string
TaskID string
TaskBatchID string
Entity string
StartAt time.Time
EndAt time.Time
Status string
Message string
LastUpdate time.Time
}
worker 消费 Message(伪代码)
process, err = findProcess(teu.Processor);
err = process(ctx, teu)
(3) X1 task 优缺点
优点:
(1) 同步 handler 和 异步 hndler 分离,两个组件可以独立更新(异步 handler 一般是耗时任务)
(2) 使用 Redis 原生 Stream,时效性比较好
缺点:
(1) 同步 handler 和 异步 handler 的逻辑代码中使用 model 时,则需要共用 dao 层代码
(2) 异步 handler 使用的 Redis Stream 命令、目前处于单点状态
(3) 工作流仅支持线性步骤执行
(4) 创建 workflow 时是通过直接往 db 中插入任务,存在代码侵入(引入或继承了别的包或框架)
(5) engine(执行 workflow) 和 worker(执行 task step) 是一个整体,同一个 workflow 涉及的 task step 涉及多个模块,耦合性比较强(存在强耦合)
3 系统设计
3.1 系统架构
+Xingqiao-------------------------------------+
| +Task------------+ |
| +---|/{app}/{handler}| |
| | +----------------+ |
| +Job-------+ | +Task------------+ |
| | +---+---|/{app}/{handler}| |
| +----------+ | +----------------+ |
| | +Task------------+ |
| +---|/{app}/{handler}| |
| +----------------+ |
+----------------------+----------------------+
|
|
+Redis ---------------V----------------------+
| +---------+ |
| |Twemproxy| \ |
| +--+---+--+ +----------+ |
| | | |Metaserver| |
| +--V---V--+ +----------+ |
| | Redis | / |
| +---------+ |
+----^-----------^-----------^-----------^----+
| | | |
| | | |
+----+----+ +----+----+ +----+----+ +----+----+
| worker1 | | worker2 | | worker3 | | workerN |
+---------+ +---------+ +---------+ +---------+
整体架构分为三部分
星桥:用于维护 job、task 状态机,以及 task 执行顺序
Redis 集群:作为消息队列使用
worker:task 具体执行单元
3.2 模块简介
3.2.1 job
3.2.2 task
3.2.2.1 任务的关注点
task 即为 1 个 handler
3.2.2.2 任务的定义
task 参数
task 参数为 task_id
task 进度状态
+------------+------------+---------+----------+---------+
| Start | Event | End | On Enter | On Exit |
+------------+------------+---------+----------+---------+
| failed[$] | go_waiting | waiting | . | . |
| pending | go_failure | failed | . | . |
| pending | go_running | started | . | . |
| started | go_failure | failed | . | . |
| started | go_success |finished | . | . |
| finished[$]| . | . | . | . |
| waiting[^] | go_failure | failed | . | . |
| waiting[^] | go_pending | pending | . | . |
+------------+------------+---------+----------+---------+
[^] 表示初始状态
[$] 表示终止状态
task 执行流程
检查依赖 准备参数 结果校验
(go_pending) (go_running) (go_success)
即 taskflow 的状态正常流程为 waiting ---> pending ---> started ---> finished[$]
^ | | |
| | | V
| +----------+-------> failed[$]
| (go_failure) |
| |
+------------------------+
(go_waiting) retry
3.3 设计思路及折衷
3.3.1 推动工作流任务流转方法
3.3.1.1 方案一:使用定时任务
创建任务后,同时注册一个定时任务
+------------------------------+
|+---------------+ | 10s +----------+
|| job | |<------------------------| ruqi |
|+-----+---------+ | +----------+
| | task_list:[1,3,4...] |
| V task_index:0 |
|+-----+ +-----+ |
||task1| |task3| ... |
|+-----+ +-----+ |
+------------------------------+
(1) job_create
创建任务,并同时创建个定时任务
(2) job_action
定时任务每隔 10s 发起一次 job_action, job_action 中 job 根据 task_index 依次执行 task_list 中的任务
job 结束时,删除掉 ruqi 定时任务
注意点:
(1) 执行 job_action 时需要加锁(防止并发操作 job)
执行 job_action 时需要加锁,同时需要对锁进行续时间
3.3.1.2 方案二:使用消息队列
创建任务后,同时发一个消息到消息队列
+------------------------------+
|+---------------+ |<------------------------------------+
|| job | | |
|+-----+---------+ | +---------+ +---------+----------+
| | task_list:[1,3,4...] |------->| MQ |<-------|push_forward_handler|
| V task_index:0 | +---------+ +--------------------+
|+-----+ +-----+ |
||task1| |task3| ... |
|+-----+ +-----+ |
+------------------------------+
(1) job_create
创建任务,并同时发布一条消息到消息队列
(2) job_action
push_forward_handler 从 MQ 中拿到消息,进行请求下 job_action 进行执行任务,然后再继续发个消息进行下次流转
job 结束时,则不再往消息队列中发消息
注意点:
(1) 需要关注发布消息到消息队列是否成功,若丢失,则此 job 任务不再进行状态变更
(2) 执行 job_action 时需要加锁(防止并发操作 job)
3.3.1.3 方案三:任务完成后进行回调
创建任务后,任务执行完成时进行回调
+------------------------------+
|+---------------+ |
|| job | | <---------+
|+-------+-------+ | |
| | | |
| V | | callback
|+-------+ +-------+ | |
||task1 | | task2 | |+----------+
|+-------+ +-------+ |
+------------------------------+
3.3.1.4 方案四:定时器 + 加锁
+-----------+ +-----------+
| | | |
| | | |
| | | |
| xingqiao1 | | xingqiao2 |
| | | |
| | | |
+-----+-----+ +-----+-----+
| |
| |
+-----V-------------------V-----+
| DB |
+-------------------------------+
优点: 优点是实现简单
缺点: (1) 所有实例均在工作状态,对数据库会有压力
(2) 任务过多时,调度可能会出现瓶颈
3.3.1.5 折衷分析
方案一需要依赖定时任务,方案三通过 callback 方式不太容易管理,方案四任务过多时,调度可能会出现瓶颈
故使用方案二
3.3.2 Bubble 模式
3.3.2.1 方案一:Bubble Down
Job 1: Job 2:
task_A task_A
| / \
v v v
task_B task_B task_C
| |
| task_D
| |
v v
task_E
the "Bubble Down" approach is like "pushing" work through a pipe.
如:Job2 是 task_A 先完成,然后 task_B 和 task_C 开始运行,依次类推
这个的想法就是先完成 task_A, 然后完成 task_B, task_C 等,是传统的执行方式
3.3.2.2 方案二:Bubble Up
The "Bubble Up" approach is the concept of "pulling" work through a pipe.
Bubble Up 是先定义结果,然后生成依赖
job 2:
task_E
^ ^
/ \
task_B task_D
^ ^
| |
| task_C
| |
\ /
task_A
比如最终是要最终执行 task_E,会生成任务 task_B 和 task_D, task_B 和 task_D 开始执行,task_D 执行创建 task_C, 依次类推
当完成 task 时,将 task 任务状态标记完成时,同时检查其母任务是否完成
OSP 平台使用此模式
这个的想法就是完成 task_E 工作,需要先完成 task_B, task_C 等依赖,然后通过其子任务完成时进行检查其他子任务状态,然后变更总 job 状态
3.3.2.3 折衷分析
"Bubble Up" 类似于函数调用,task 与 task 的耦合比较紧,比较难以扩展,故使用 bubble-down 方式
3.3.3 耗时任务处理方式
一些操作类的检查耗时比较长,比如检查 Redis 的数据同步状态完成时间,如果数据比较大,是分钟以及小时以上,如果 task 一直进行处理此任务,则会导致服务耗到所有线程
所以需要有"一次执行,多次检查"的模式
3.3.3.1 方案一:task 中增加任务 review 状态
检查依赖 准备参数 检查(可多次执行) 结果校验
(go_pending) (go_running) (go_review) (go_success)
waiting ---> pending ---> started -----> review---> finished
^ | | | |
| | | V |
| +----------+-------> failed <----------+
| (go_failure) |
| |
+------------------------+
(go_waiting) retry
task 执行完成后,由对应的 review 任务进行检查结果,此任务可以多次执行
优点:状态比较明确
缺点:需要增加状态,另外定义 task 时需要配置 review 参数进行指定 review 方法
3.3.3.2 方案二:task 返回状态标识下次是否重新执行
(exe_again) task_ing
+--------------+
| |
(go_pending) V (go_running) |(go_success)
waiting ---> pending ------> started ---> finished
^ | | |
| | | V
| +-----------+-----------> failed
| (go_failure) |
| |
+----------------------------+
(go_waiting) job retry
task 执行完成时,当结果状态为 ERR_TASK_ING 时,则发起一个 "go_pending" 事件,将 task 状态置为 pending 状态
优点:无需进行添加 task 状态,只需添加一个约定(根据 task 执行状态则触发一个事件)
3.3.3.3 折衷分析
方案二对服务侵入低,方案一还会变更创建 task 方法,不方便兼容,故使用方案二
3.3.4 Job 间并发限制
3.3.4.1 方案一:job_name 唯一性限制
job_name 添加唯一索引
job_name = CharField(max_length=64, unique=True)
3.3.4.2 方案二:job 执行时加锁
(1) 传 job_extra 时,添加锁标识
(2) job 添加状态
-----------------------------------------------------------
检查是否抢锁成功
|
V
job_status: waiting --> pending --> started --> finished
| | |
| | V
+-----------+-------->failed
-----------------------------------------------------------
3.3.4.3 折衷分析
方案一可作为强限制(创建任务时,如果出现同 job_name,则创建任务失败)
方案二是处理时并发限制
场景:要对 Redis 实例进行迁移,需要控制分片内串行迁移,同一个集群可多个分片进行同时迁移(比如 5 个并发)
可以提供 1 个锁标识,比如集群名 (ceshi_name),同时提供个并发数 (5),程序内部生成 (ceshi_name:0, ceshi_name:1, ceshi_name:2, ceshi_name:3, ceshi_name:4) 锁
锁标识 + 并发度可以在 xingqiao plugin 中设置默认值
3.3.5 超时处理
Task 任务在执行中,则此时整体 Job 超时,则此任务会丢失
备注: Task 任务都是在异步消费处理,在任务队列中保存 1 天有效时间
解决方法:若有正在执行中的任务,则等待此执行任务处理完成
waiting: 不再进行状态流转
pending: 不再进行状态流转
started: 仅处于 stared 状态的 task 可以流转
finished: 无需改动
failed: 不再进行状态流转(比如 ERR_TASK_ING 不再往 waiting 进行流转)
最后一个 Task 执行完成(stared --> finished) 时,则此时超时表明任务失败是得不偿失的
解决方法:判断任务是否完成放在判断任务是否超时前面
3.3.6 job 参数如何传到 task
生成 task 所需要的参数 dict self.task_requires_dict 两种方式生成:
(1) 预设 task_params: 创建 task 任务时预置的 task 参数值
(2) 动态生成 self.task_requires_dict:
1> 在 job_extra 中获取参数数据
2> 在 task_extra 中获取参数数据
3> 若存在依赖任务:在依赖的 task 的 ret_data 中获取参数数据
4> 若有 all_taskdata 依赖参数,则将所有依赖的 ret_data 放到 all_taskdata 列表中
3.3.7 状态变更通知
定义 job_notifier
触发 job_notifier
job 使用状态机实现
event 触发 job 状态机改变时发起 action(推送状态变更通知)
3.3.8 任务执行顺序
方案一:无差别执行 task,task 内部判断依赖是否已完成,若未完成则继续等待
方案二:task 进行排序,根据顺序执行 job_step 执行(初始值为 0)
Job
|
task_A(0)
/ \
v v
task_B(1)task_C(1)
| |
| task_D(2)
| |
v v
task_E(3)
* task_B 依赖 task_A 时,task_B 的 job_step 是在 task_A 的基础上加 1
* task_E 依赖 task_B 和 task_D 时,使用 task_B、task_D 的 job_step 最大值加 1
job 就可以根据 job_step 进行依次执行 task。
每次获取当前 job_step 的 task, 若未完成,则等下次调度,若已完成,则将 job_step+1 保存
在 task 表中为 task_priority
折中分析:
方案一有很多无效检查,会导致数据库无效查询,方案二特定场景会导致任务执行时间变长(task_B 和 task_D 任务比较耗时,执行时间会变长),考虑到优化调度的时间,故而选择方案一
4 详细设计
4.1 job
4.1.1 job 状态
+------------+------------+---------+----------+---------+
| Start | Event | End | On Enter | On Exit |
+------------+------------+---------+----------+---------+
| failed[$] | . | | . | . |
| finished[$]| . | . | . | . |
| started[^] | go_failure | failed | . | . |
| started[^] | go_success |finished | . | . |
+------------+------------+---------+----------+---------+
[^] 表示初始状态
[$] 表示终止状态
任务依赖性时,在执行 task 时,任一 task 异常,则标记此 job 异常
子任务
job1
|
+-------+-------+-------+
| | | |
task1 task2 task3 taskN
任务依赖
Job_2
|
task_A
/ \
v v
task_B task_C
| |
| task_D
| |
v v
task_E
4.1.2 交互流程
4.1.2.1 创建任务
/-------\
| start |
\---+---/
|
V
/------------------------------------------\
. Check whether the job_name does not exist . ----------+
\--------------------+---------------------/ |
| |
/--------------------V---------------------\ |
. Check whether the job_type exists . ----------|
\--------------------+---------------------/ |
| |
/--------------------V---------------------\ |
. Check whether the job_extra is valid . ----------|
\--------------------+---------------------/ |
| |
+-------------V-------------+ |
| create job | |
+-------------+-------------+ |
| |
/--------------------V---------------------\ |
. Is the job created successfully . ----------|
\--------------------+---------------------/ |
| |
+-------------V-------------+ |
| Push message to MQ | |
+-------------+-------------+ |
| |
+----------------------------------+
|
/---V---\
| end |
\-------/
4.1.2.2 执行任务
/-------\
| start |
\---+---/
|
+-------------------------+
| |
| V
| /------------------------------------------\
| . Is the exe_id valid . -----N----+
| \--------------------+---------------------/ |
| | |
| +-----------------V-----------------+ |
| | If the job has a lock, | |
| | the lock is renewed | |
| +-----------------+-----------------+ |
| | |
| +-----------------V-----------------+ |
| | Send related events to tasks that | |
| | have not reached the final state | |
| +-----------------+-----------------+ |
| | |
| /--------------------V---------------------\ |
| . failed_count == 0 . -----N----|
| \--------------------+---------------------/ |
| | |
| /--------------------V---------------------\ |
| . finished_count < total_task_num . -----N----|
| \--------------------+---------------------/ |
| | |
| /--------------------V---------------------\ |
| . Is not timeout? . -----N----|
| \--------------------+---------------------/ |
| | |
| +-------------V-------------+ |
| | exe_id = exe_id + 1 | |
| | Push message to MQ | |
| +-------------+-------------+ |
| | |
+-------------------------+ /---V---\
| end |
\-------/
由 task handler 进行消费 task 任务
(1) 设置 task 状态 (waiting->started)
(2) 执行 task 任务
(3) 设置 task 状态 (started->finished or started->failed)
4.1.3 数据库设计
CREATE TABLE `workflow_job` (
`job_id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT "自增 id",
`job_client` varchar(64) NOT NULL COMMENT 'job client ip',
`job_namespace` varchar(64) NOT NULL COMMENT "namespace",
`job_reqid` varchar(64) NOT NULL COMMENT "butterfly reqid",
`job_name` varchar(64) NOT NULL COMMENT "job name",
`job_status` varchar(64) NOT NULL COMMENT "job status",
`job_type` varchar(64) NOT NULL COMMENT "job type, 插件名",
`ret_stat` varchar(64) NOT NULL COMMENT "ret stat",
`ret_data` text COMMENT "ret data",
`job_cost` double NOT NULL COMMENT "cost, 单位是秒",
`job_extra` varchar(2048) NOT NULL COMMENT "job extra 参数",
`job_timeout` int(11) NOT NULL COMMENT "timeout, 单位是秒",
`job_lock` varchar(64) NOT NULL COMMENT 'job lock name',
`exe_id` int(11) NOT NULL,
`operator` varchar(64) NOT NULL,
`is_valid` tinyint(1) NOT NULL COMMENT "是否有效",
`c_time` datetime NOT NULL COMMENT "create time",
`s_time` datetime NOT NULL COMMENT "start time",
`e_time` datetime NOT NULL COMMENT "end time",
PRIMARY KEY (`job_id`),
UNIQUE KEY `job_job_name` (`job_name`),
KEY `job_job_client` (`job_client`),
KEY `job_job_namespace` (`job_namespace`),
KEY `job_job_reqid` (`job_reqid`),
KEY `job_job_status` (`job_status`),
KEY `job_job_type` (`job_type`),
KEY `job_ret_stat` (`ret_stat`),
KEY `job_job_cost` (`job_cost`),
KEY `job_operator` (`operator`),
KEY `job_is_valid` (`is_valid`),
KEY `job_exe_id` (`exe_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT "星桥 job 表";
4.1.4 接口
详细看用户手册
/xingqiao/create_job 创建 job
/xingqiao/delete_job 删除 job
/xingqiao/retry_job 重试 job
/xingqiao/list_jobs job 列表
/xingqiao/get_job_detail job 详情
4.2 task
4.2.1 task 状态
+------------+------------+---------+----------+---------+
| Start | Event | End | On Enter | On Exit |
+------------+------------+---------+----------+---------+
| failed[$] | go_waiting | waiting | . | . |
| pending | go_failure | failed | . | . |
| pending | go_running | started | . | . |
| started | go_failure | failed | . | . |
| started | go_success |finished | . | . |
| finished[$]| . | . | . | . |
| waiting[^] | go_failure | failed | . | . |
| waiting[^] | go_pending | pending | . | . |
+------------+------------+---------+----------+---------+
[^] 表示初始状态
[$] 表示终止状态
检查依赖 准备参数 结果校验
(go_pending) (go_running) (go_success)
即 taskflow 的状态正常流程为 waiting ---> pending ---> started ---> finished[$]
^ | | |
| | | V
| +----------+-------> failed[$]
| (go_failure) |
| |
+------------------------+
(go_waiting) retry
4.2.2 交互流程
4.2.3 数据库设计
CREATE TABLE `workflow_task` (
`task_id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT "自增 id",
`job_id` bigint(20) NOT NULL COMMENT "job id",
`task_label` varchar(64) NOT NULL COMMENT "task 名字",
`task_reqid` varchar(64) NOT NULL COMMENT "task 在百川中的任务 id",
`task_cmd` varchar(128) NOT NULL COMMENT "handler path",
`task_params` varchar(1024) DEFAULT NULL COMMENT "task params, 若有值则是个 json",
`task_requires` varchar(1024) NOT NULL COMMENT "task 所需的参数: 即 arg1,arg2",
`task_provides` varchar(1024) NOT NULL COMMENT "task 可提供的结果: 即 result1,result2",
`task_dependencies` varchar(1024) NOT NULL COMMENT "task 依赖: 即 label1,label2",
`task_status` varchar(64) NOT NULL COMMENT "task 状态",
`task_cost` double NOT NULL COMMENT "task 耗时",
`task_extra` varchar(2048) NOT NULL COMMENT "task extra, 暂时没用",
`task_is_save` tinyint(1) NOT NULL COMMENT "是否保存结果到 job",
`ret_stat` varchar(64) NOT NULL COMMENT "tast ret stat",
`ret_data` text COMMENT "task ret data",
`task_retrymax` int(11) NOT NULL COMMENT "task retry max",
`task_retrycount` int(11) NOT NULL COMMENT "task retry count",
`task_timeout` int(11) NOT NULL COMMENT "task timeout",
`c_time` datetime NOT NULL COMMENT "create time",
`u_time` datetime NOT NULL COMMENT "update time",
PRIMARY KEY (`task_id`),
KEY `task_job_id` (`job_id`),
KEY `task_task_label` (`task_label`),
KEY `task_task_reqid` (`task_reqid`),
KEY `task_task_cmd` (`task_cmd`),
KEY `task_task_status` (`task_status`),
KEY `task_task_cost` (`task_cost`),
KEY `task_ret_stat` (`ret_stat`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT "星桥 task 表";
5 传送门
StackStorm/orquesta 基于图的工作流引擎
airflow: http://airflow.apache.org/
go-workflow: https://github.com/go-workflow/go-workflow
Amazon Simple Workflow Service (SWF)
adage: https://github.com/yadage/adage
腾讯蓝鲸作业平台:https://github.com/Tencent/bk-job
阿里云 Serverless 工作流
spotify/luigi X1 在使用此组件
StackStorm/st2/st2actions
Last updated