Last updated
Last updated
工作流可以做什么
顺序工作流 -- Sequential Workflow (强调的是顺序过程)
简单的说,工作流可以帮助我们更方便、可视化地管理一些日常操作,将零散操作流式化
工作流引擎提供了一个管理这些操作的平台,将以往零散的操作按照统一的规范管理起来,并将操作执行情况可视化的展现出来
并提供了一系列完善的人工干预接口,整体上提高了操作的规范性、效率、稳定性。
我们为什么需要工作流
工作流提供的规范化管理、状态可视化等特性,一方面能够很好的解决我们日常的零散操作脚本管理需求,将日常操作规范化
另一方面,工作流的可视化及相应的人工干预机制,极大的方便了日常操作,以前的脚本执行黑盒状态,通过工作流透明化
这些特性与我们的日常操作、报警处理等场景相结合,预期可以起到提高效率、提升操作稳定性的效果。
打造易用的工作流,需要考虑工作流应该和业务逻辑解耦
Job :指单个操作对应的任务集合,包含工作流定义及参数
Task:指工作流的单个步骤的执行流程
定时任务执行情况
仅使用 job
子任务(比如对集群的配置进行变更,对集群操作就是生成一个 job,对集群中的实例就是 task)
job + (n)task
任务依赖
多个 task 之间依赖(比如 task_A 依赖 task_B)
场景 1:分析 Redis 集群中的大 key(需要先拉取 RDB,然后进行分析 RDB 中的数据,最后进行汇总数据)
场景 2:分析 Redis 集群中的热 key(需要获取到各个分片的热 key 后,进行汇总分析)
场景 3:实例下线(需要先去掉上下游关联,然后进行销毁实例)
airflow: http://airflow.apache.org/
go-workflow: https://github.com/go-workflow/go-workflow
Amazon Simple Workflow Service (SWF)
https://github.com/sailthru/stolos
https://github.com/openstack/mistral
Openstack taskflow
2.3.1.1 名词解释
Job:一次操作,如 service-upgrade 操作
Task: 完成一个 Job 所需执行的操作,如 service-upgrade 操作中需要进行 dervice-deploy 和 agent_command 两个操作
CTask: 完成一个 task 所需要执行的操作,如 service-upgrade 操作中的 agent_command 会触发 upgrade 和 config-changed 两个 hook,这既是 agent_command 的两个子 task。
job 的 3 种状态
new : 执行操作前,调用 create_job 创建一个 create_job,然后往里面添加 task
start: 自身已经执行,子节点未完成(蓝色)
done : 在 job 所有的 Task,Ctask 都为 done 后,Job 为 done
task 的 3 种状态
new : 执行操作前,调用 add_task 创建一个 task,然后异步发数据
start: 收到 mq 发来的数据,将 task 置为 start
done : 设置为 start 后,检查 CTask 是否都 done,如果为 done,将 task 置为 done
2.3.1.2 service 升级中的平台和 agent 通信流程
执行的命令点和 task 列表
task 列表:SELECT * FROM callback_task WHERE job_id = 46169
4568: service_deploy 操作,生成了 unit-deploy 的 CTask
4569: agent_command,生成了 upgrade 和 config-change 的 Ctask
字段说明
流程图
平台和 agent 涉及的 callback 流程
以 service_deplay 为例,平台会往 agent 发送 unit-deploy 命令,unit 执行命令后,会往平台发送请求,更改状态。
平台往 agent 发送命令
1 service 模块的 service_deploy 组装 unit-deploy 命令,调用 control 模块的 agent_command 方法
2 agent_command 执行以下动作:
在 job(46169) 中增加一个 task(4568)
调 ral 发送数据到 nmq
3 nmq 发送数据到 control 模块的 command_from_nmq 接口
4 control 模块的 command_from_nmq 执行以下动作:
增加一个子 task(4570),父 task 是 4568
调 trigeer 发送数据到 agent
agent 往平台发送命令
1 agent 执行完 unit-deploy 命令后,请求 callback 模块的 callback_from_agent 方法
2 callback_from_agent 调用 ral 将数据发送到 nmq
3 nmq 发送数据到 callback 模块的 callback_from_nmq,
4 callback 模块的 callback_from_nmq 接口执行以下操作:
更新 task 状态,从 new 到 start(taskid=4570)
执行回调中配置的 service::method 方法,这里执行的是 deploy::callback_deploy,在 callback_deploy 调用 unit_stat_transform 更新 unit 的状态。
根据执行结果检查 task 是否成功结束,若结束,检查父 task(4568)是否完成
若父 task 为 done,检查 job 是否为 done
2.3.1.3 疑问
什么情况下进行重试?
agent_command ==> nmq 时有 3 次重试
回调转成同步命令
平台往 agent 发送命令,agent 同步阻塞
Job 如何拆分为 task
一张 Job 表 (callback_job),一张 task 表 (callback_task)
Job 和 Task 以及 CTask 在数据库中如何体现
Job 中有个 task_list 字段,是个 list, 用于标记 task
Task 中有个 ctask_list 字段,是个 list ,用于标记子 task
添加 Task
客户端如何将异步的 job 转为同步请求
客户端发送命令时,会向平台发送 create_job 请求进行生成 job_id
然后进行定时 query_job 查询任务情况
2.3.2.1 mistral 简介
https://github.com/openstack/mistral
最初是由 Mirantis 公司贡献给 Openstack 社区的工作流组件,提供 Workflow As a Service 服务
类似 AWS 的 SWS(Simple Workflow Serivce),Hadoop 生态圈中的 oozie 服务
2.3.2.2 mistral 与 Heat 的关系
mistral 和 OpenStack 资源编排服务 Heat 不同,二者功能可能会有重叠,但 Heat 注重基础资源的编排,而 Mistral 则主要是用于任务编排。
Heat 的主要应用场景是创建租户基础资源模板,管理员可以创建一个资源模板,基于这个模板用户一次请求就可以完成虚拟机创建及配置、挂载数据卷、创建网络和路由、设置安全组等。
而 Mistral 的典型应用场景包括执行计划任务 Cloud Cron,调度任务 Task Scheduling,执行复杂的运行时间长的业务流程等。
2.3.3.1 前言
TaskFlow 是 OpenStack 中的一个 Python 库,主要目的是让 task(任务)执行更加容易可靠,能将轻量的任务对象组织成一个有序的流。
若未安装 taskflow 到环境中:
目前 TaskFlow 支持三种模式:
线性:运行一个任务或流的列表,是一个接一个串行方式运行。
无序:运行一个任务或流的列表,以并行的方式运行,顺序与列表顺序无关,任务之间不存在依赖关系。
图:运行一个图标(组节点和边缘节点)之间组成的任务 / 流依赖驱动的顺序。
2.3.3.2 任务的状态
就像任何其他的任务流系统一样,每个任务都有一些状态:PENDING RUNNING SUCCESS FAILURE,你也可以创建自定义的状态。
OpenStack taskflow 使用的 automaton
(1)工作流状态变更
taskflow/engines/action_engine/engine.py
即工作流状态变更时会通知到 notifier state 和 details 两个参数
(2)任务状态变更
通过代码 notifier 就是实现了个类似 pub/sub 的功能
Retry 是一个控制当错误发生时, 如何进行重试的特殊工作单元, 而且当你需要的时候还能够以其他参数来重试执行别的 Atom 子类. 常见类型:
AlwaysRevert: 错误发生时, 回滚子流
AlwaysRevertAll: 错误发生时, 回滚所有流
Times: 错误发生时, 重试子流
ForEach: 错误发生时, 为子流中的 Atom 提供一个新的值, 然后重试, 直到成功或 retry 中定义的值用完为止.
ParameterizedForEach: 错误发生时, 从后台存储(由 store 参数提供)中获取重试的值, 然后重试, 直到成功或后台存储中的值用完为止.
times
ForEach
feature
内部实现
添加任务 再执行的顺序
workflow 嵌套
Serverless 工作流(Serverless Workflow)是一个用来协调多个分布式任务执行的全托管云服务。
在 Serverless 工作流中,您可以用顺序、分支、并行等方式来编排分布式任务, Serverless 工作流会按照设定好的步骤可靠地协调任务执行,跟踪每个任务的状态转换,并在必要时执行用户定义的重试逻辑, 以确保工作流顺利完成。Serverless 工作流通过提供日志记录和审计来监视工作流的执行,方便您轻松地诊断和调试应用。 Serverless 工作流简化了开发和运行业务流程所需要的任务协调、状态管理以及错误处理等繁琐工作,让您聚焦业务逻辑开发。
下图描述了 Serverless 工作流如何协调分布式任务,这些任务可以是函数、已集成云服务 API、运行在虚拟机或容器上的程序
本图里的关键点:
1、可以对 Faas 函数进行编排,同步调用;
2、可以对Faas 作为胶水间接调用的服务进行编排;
3、可以把已有的 workflow 作为子流程进行标配;
4、可以通过**消息队列服务+ callback **对第三方服务进行编排;
5、可以通过Faas + callback 引入第三方审批环节;
6、用户还有个比较强烈的需要就是对于一些不会做callback 或者针对workflow进行改造的第三方服务,数据状态、任务执行状态依赖检查的编排;
例如 spark 任务,db 表数据;可以为了避免检查任务长时间运行,可以实现多次轮询函数进行检查。
2.3.5.1 产品优势
协调分布式组件
减少流程代码量
提高应用容错性
Serverless
2.3.5.2 功能特性
服务编排能力
协调分布式组件
内置错误处理
可视化监控
支持长时间运行流程
流程状态管理
2.3.5.3 名词解释
本文主要对Serverless工作流涉及的专有名词及术语进行定义及解析,方便您更好地理解相关概念并使用Serverless工作流。
Serverless工作流(Serverless Workflow)
分布式任务
流程(Flow)
步骤(Step)
父步骤
子步骤
任务步骤(task)
传递步骤(pass)
等待步骤(wait)
选择步骤(choice)
并行步骤(parallel)
并行循环步骤(foreach)
成功步骤(succeed)
失败步骤(fail)
流程定义语言FDL(Flow Definition Language)
定时调度
2.3.5.4 输入和输出
流程和步骤
通常,流程和步骤之间,流程的多个步骤之间需要传递数据。和函数式编程语言类似,FDL 的步骤类似于函数, 它接受输入(Input),并返回输出(Output),输出会保存在父步骤(调用者)的本地(Local)变量里。 其中,输入和输出的类型必须是 JSON 对象结构,本地变量的类型因步骤而异。 例如
任务步骤把调用函数计算函数的返回结果作为本地变量
并行步骤把它所有分支的输出(数组)作为本地变量。
步骤的输入、输出和本地变量总大小不能超过 32 KiB,否则会导致流程执行失败
2.3.5.5 技术挑战
高效:
1、 workflow 配置信息解析如何做到执行时高效
2、 每个步骤都会产生对应的事件作为对工作流的执行过程记录,执行事件QPS是工作流QPS的倍数
3、 回调模式如何快速激活暂停中的工作流
高可用:
1、 workflow 平台无损上线
复杂性:
1、 第三方服务;
2、 流程配置语法;
2.3.6.1 架构
(1) 创建任务
(2) 任务执行
(1) X1 task 架构
子任务状态
队列(3 个队列)
(2) worker
Worker 执行任务的 Message And Result
worker 消费 Message(伪代码)
(3) X1 task 优缺点
整体架构分为三部分
星桥:用于维护 job、task 状态机,以及 task 执行顺序
Redis 集群:作为消息队列使用
worker:task 具体执行单元
3.2.2.1 任务的关注点
task 即为 1 个 handler
3.2.2.2 任务的定义
task 参数
task 进度状态
task 执行流程
3.3.1.1 方案一:使用定时任务
创建任务后,同时注册一个定时任务
注意点:
3.3.1.2 方案二:使用消息队列
创建任务后,同时发一个消息到消息队列
注意点:
3.3.1.3 方案三:任务完成后进行回调
创建任务后,任务执行完成时进行回调
3.3.1.4 方案四:定时器 + 加锁
优点: 优点是实现简单
缺点: (1) 所有实例均在工作状态,对数据库会有压力
(2) 任务过多时,调度可能会出现瓶颈
3.3.1.5 折衷分析
方案一需要依赖定时任务,方案三通过 callback 方式不太容易管理,方案四任务过多时,调度可能会出现瓶颈
故使用方案二
3.3.2.1 方案一:Bubble Down
这个的想法就是先完成 task_A, 然后完成 task_B, task_C 等,是传统的执行方式
3.3.2.2 方案二:Bubble Up
The "Bubble Up" approach is the concept of "pulling" work through a pipe.
Bubble Up 是先定义结果,然后生成依赖
OSP 平台使用此模式
这个的想法就是完成 task_E 工作,需要先完成 task_B, task_C 等依赖,然后通过其子任务完成时进行检查其他子任务状态,然后变更总 job 状态
3.3.2.3 折衷分析
"Bubble Up" 类似于函数调用,task 与 task 的耦合比较紧,比较难以扩展,故使用 bubble-down 方式
一些操作类的检查耗时比较长,比如检查 Redis 的数据同步状态完成时间,如果数据比较大,是分钟以及小时以上,如果 task 一直进行处理此任务,则会导致服务耗到所有线程
所以需要有"一次执行,多次检查"的模式
3.3.3.1 方案一:task 中增加任务 review 状态
task 执行完成后,由对应的 review 任务进行检查结果,此任务可以多次执行
优点:状态比较明确
缺点:需要增加状态,另外定义 task 时需要配置 review 参数进行指定 review 方法
3.3.3.2 方案二:task 返回状态标识下次是否重新执行
task 执行完成时,当结果状态为 ERR_TASK_ING 时,则发起一个 "go_pending" 事件,将 task 状态置为 pending 状态
优点:无需进行添加 task 状态,只需添加一个约定(根据 task 执行状态则触发一个事件)
3.3.3.3 折衷分析
方案二对服务侵入低,方案一还会变更创建 task 方法,不方便兼容,故使用方案二
3.3.4.1 方案一:job_name 唯一性限制
job_name 添加唯一索引
3.3.4.2 方案二:job 执行时加锁
3.3.4.3 折衷分析
方案一可作为强限制(创建任务时,如果出现同 job_name,则创建任务失败)
方案二是处理时并发限制
场景:要对 Redis 实例进行迁移,需要控制分片内串行迁移,同一个集群可多个分片进行同时迁移(比如 5 个并发)
可以提供 1 个锁标识,比如集群名 (ceshi_name),同时提供个并发数 (5),程序内部生成 (ceshi_name:0, ceshi_name:1, ceshi_name:2, ceshi_name:3, ceshi_name:4) 锁
锁标识 + 并发度可以在 xingqiao plugin 中设置默认值
Task 任务在执行中,则此时整体 Job 超时,则此任务会丢失
备注: Task 任务都是在异步消费处理,在任务队列中保存 1 天有效时间
解决方法:若有正在执行中的任务,则等待此执行任务处理完成
waiting: 不再进行状态流转
pending: 不再进行状态流转
started: 仅处于 stared 状态的 task 可以流转
finished: 无需改动
failed: 不再进行状态流转(比如 ERR_TASK_ING 不再往 waiting 进行流转)
最后一个 Task 执行完成(stared --> finished) 时,则此时超时表明任务失败是得不偿失的
解决方法:判断任务是否完成放在判断任务是否超时前面
生成 task 所需要的参数 dict self.task_requires_dict 两种方式生成:
(1) 预设 task_params: 创建 task 任务时预置的 task 参数值
(2) 动态生成 self.task_requires_dict:
1> 在 job_extra 中获取参数数据
2> 在 task_extra 中获取参数数据
3> 若存在依赖任务:在依赖的 task 的 ret_data 中获取参数数据
4> 若有 all_taskdata 依赖参数,则将所有依赖的 ret_data 放到 all_taskdata 列表中
定义 job_notifier
触发 job_notifier
job 使用状态机实现
event 触发 job 状态机改变时发起 action(推送状态变更通知)
4.1.1 job 状态
任务依赖性时,在执行 task 时,任一 task 异常,则标记此 job 异常
子任务
任务依赖
4.1.2.1 创建任务
4.1.2.2 执行任务
由 task handler 进行消费 task 任务
详细看用户手册
/xingqiao/create_job 创建 job
/xingqiao/delete_job 删除 job
/xingqiao/retry_job 重试 job
/xingqiao/list_jobs job 列表
/xingqiao/get_job_detail job 详情
4.2.1 task 状态
airflow: http://airflow.apache.org/
go-workflow: https://github.com/go-workflow/go-workflow
Amazon Simple Workflow Service (SWF)
adage: https://github.com/yadage/adage
腾讯蓝鲸作业平台:https://github.com/Tencent/bk-job
阿里云 Serverless 工作流
StackStorm/st2/st2actions
\
(百度云预案平台使用的此 workflow)
(比较轻量)
基于图的工作流引擎
X1 在使用此组件
:依赖
(go)
4568
46169
0
[4570]
946
Control::send2mq
done
yes
0
1407594530
1407594530
0
{"reaction":[]}
4569
46169
0
[4571,4572]
946
Control::send2mq
done
yes
0
1407594530
1407594541
0
{"reaction":[]}
4570
46169
4568
[]
946
unit-deploy
done
yes
0
1407594530
1407594530
0
{"reaction":[]}
4571
46169
4569
[]
946
upgrade
done
yes
0
1407594530
1407594539
0
{"reaction":[]}
4572
46169
4569
[]
946
config-changed
done
yes
0
1407594530
1407594541
0
{"reaction":[]}
三五星桥连月朔,万千灯火彻天街。