Operations Decision-Making: BaiCe (百策)
1 System Design
+-----------------------------------------------------------+
| +source-------+                                           |
| | +---------+ |                                           |
| | | bianque | |       +-----------+        +------------+ |
| | +---------+ | event |           | action |            | |
| |             +------>|   baice   +------->|  xingqiao  | |
| | +---------+ |       |           |        |            | |
| | |   xxx   | |       +-----+-----+        +------------+ |
| | +---------+ |             |                             |
| +-------------+             |                             |
|                       +-----V-----+                       |
|                       |    DB     |                       |
|                       +-----------+                       |
+-----------------------------------------------------------+
1.1 Store the CloudEvent
Persist each incoming CloudEvent to the DB.
1.2 Publish Events
Publish a CloudEvent create event.
Publish a CloudEvent data-change event.
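The store-then-publish flow of 1.1 and 1.2 could be sketched roughly as follows. This is a minimal in-memory illustration, not BaiCe's actual implementation: `EventStore`, `save`, and the `"create"` / `"change"` labels are hypothetical names, a dict stands in for the DB, and a list stands in for the message bus.

```python
class EventStore:
    """Hypothetical in-memory stand-in for the DB that holds CloudEvents."""

    def __init__(self):
        self._events = {}     # event id -> last stored data
        self.published = []   # (kind, event_id) pairs; stands in for a message bus

    def save(self, event_id, data):
        """Store the event, then publish 'create' or 'change' as appropriate."""
        if event_id not in self._events:
            kind = "create"   # first time this event id is seen
        elif self._events[event_id] != data:
            kind = "change"   # same id, different data
        else:
            kind = None       # identical data: nothing to publish
        self._events[event_id] = data
        if kind is not None:
            self.published.append((kind, event_id))
        return kind


store = EventStore()
first = store.save("host-1", {"status": "OK"})
second = store.save("host-1", {"status": "ERR_VOLUME_FREE_TOO_SMALL"})
third = store.save("host-1", {"status": "ERR_VOLUME_FREE_TOO_SMALL"})
```

Here the first save yields a create event, the second a data-change event, and the third publishes nothing because the stored data is unchanged.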
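1.3 Execute Decisions (Plugins)
Run the matching plugin and issue an action (either create a xingqiao workflow or run a task directly).
Local decisions:
Listen for source create events and make a decision.
Listen for source data-change events and make a decision (IFTTT: if this then that).
Global decisions:
Scheduled tasks.
A cooldown period can be set; no further action tasks are issued during the cooldown.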
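The cooldown behaviour might look like the sketch below. It is an assumption-laden illustration: the `Cooldown` class, its `allow` method, and the injectable `clock` parameter are hypothetical and not part of BaiCe; the clock is injected only so the example can be exercised without waiting.

```python
import time


class Cooldown:
    """Hypothetical per-key cooldown gate: one action per key per window."""

    def __init__(self, seconds, clock=time.monotonic):
        self.seconds = seconds
        self._clock = clock
        self._last_fired = {}   # key -> time of the last allowed action

    def allow(self, key):
        """Return True if an action may be issued for key, False while cooling down."""
        now = self._clock()
        last = self._last_fired.get(key)
        if last is not None and now - last < self.seconds:
            return False
        self._last_fired[key] = now
        return True


# Drive the gate with a fake clock instead of real time.
fake_now = [0.0]
gate = Cooldown(300, clock=lambda: fake_now[0])
results = [gate.allow("host-1")]      # first action is allowed
fake_now[0] = 120.0
results.append(gate.allow("host-1"))  # still inside the 300 s cooldown
fake_now[0] = 301.0
results.append(gate.allow("host-1"))  # cooldown has elapsed
```

Keying the gate by target keeps one noisy host from suppressing actions for other hosts.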
2 Upstream and Downstream
                                                           | external input
+----------------------------------------------------------|---------------------------------------+
|                                +-------------------------V------------------------------+        |
| +ruqi-----------------------+  |  +baichuan-----------+     +callback----------------+  |        |
| |       target + cron       |  |  |                   |     |                        |  |        |
| +------------+--------------+  |  +---------+---------+     +------------+-----------+  |        |
|              |                 +------------+----------------------------+--------------+        |
|              |                              |                            |                       |
|              |1a                            |1b                          |1c                     |
|              V                              V                            V                       |
| +----------------------------------------------------------------------------------------------+ |
| |                                            queue                                             | |
| +------------------^------------------------------^-------------------------------^------------+ |
|                    |2                             |3                              |4             |
| +bianque_agent-----+--------------+  +baice-------+--------------+  +xingqiao-----+------------+ |
| | +plugin-----------------------+ |  |+-------++-------++-------+|  |+plugin------------------+| | fault self-healing
| | |     detection + trigger     | |  || baice || baice || baice ||  ||unit_migrate/delete/copy|| |
| | +-----------------------------+ |  |+-------++-------++-------+|  ||hotkey/bigkey           || |
| +---------------------------------+  +------------+--------------+  ||...                     || |
|                                                   |                 |+------------------------+| |
|                                      +DB----------V--------------+  +--------------------------+ |
|                                      |                           |                               |
|                                      +---------------------------+                               |
|                                                                                                  |
|          <sense + trigger>                 <save + decision>                 <execute>           |
+--------------------------------------------------------------------------------------------------+
+--------------------------------------------------------------------------------------------------+
|    +------------+     +------------+     +------------+     +------------+     +------------+    |
|    |   huoyan   |     |    ruqi    |     |   wuxing   |     |  baichuan  |     |  xingqiao  |    | common services
|    +------------+     +------------+     +------------+     +------------+     +------------+    |
+--------------------------------------------------------------------------------------------------+
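〖bianque (扁鹊)〗produces an event (CloudEvent) and pushes it to the message queue.
〖baice (百策)〗stores the event and generates tasks according to its rules:
Store the CloudEvent.
Publish a CloudEvent create event.
Publish a CloudEvent data-change event.
〖xingqiao (星桥)〗workflows execute the concrete tasks.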
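The three stages above (sense + trigger, save + decision, execute) can be illustrated with a toy pipeline. Everything here is a hypothetical stand-in: the function names, the event dict layout, and the `rules` mapping are invented for the sketch, and Python's standard `queue.Queue` replaces the real message queue.

```python
import queue


def bianque_emit(q, target, status):
    """Sense + trigger: push an event onto the queue."""
    q.put({"source": "demo_check", "target": target, "status": status})


def baice_consume(q, db, rules):
    """Save + decision: store every event, then map its status to a job."""
    jobs = []
    while True:
        try:
            event = q.get_nowait()
        except queue.Empty:
            break
        db.append(event)                       # save
        job_type = rules.get(event["status"])  # decision
        if job_type is not None:
            jobs.append({"job_type": job_type, "target": event["target"]})
    return jobs


def xingqiao_execute(jobs):
    """Execute: here we only render each job as a string."""
    return ["{job_type}:{target}".format(**job) for job in jobs]


q = queue.Queue()
db = []
rules = {"ERR_VOLUME_FREE_TOO_SMALL": "volume_resize"}
bianque_emit(q, "host-1", "ERR_VOLUME_FREE_TOO_SMALL")
bianque_emit(q, "host-2", "OK")
jobs = baice_consume(q, db, rules)
executed = xingqiao_execute(jobs)
```

Both events are stored, but only the one whose status matches a rule turns into a job.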
3 In Practice
3.1 volume_resize_demo
import logging

from xlib import db
from xlib.util import http_util
from handlers.baice import base_rule

baichuan_cache = db.my_caches["baichuan"]
# Allow at most one operation per event_target within 300 s
rate_limit = baichuan_cache.rate_limit('volume_resize', limit=1, per=300)
logger = logging.getLogger("plugin")

class Rule(base_rule.BaseRule):
    """
    Rule for the volume_resize demo.
    """

    def __init__(self):
        self.endpoint = "http://<ip>:<port>"

    def decision(self, event_obj, event_count):
        """
        Make a decision for one event.

        Example event_obj.data:
        {
            "data": {
                "volume_cursize_in_gb": 600,
                "volume_id": "v-F5RIqyMQ"
            },
            "mnt_disk_free_value": 48.3838726679484
        }

        Returns:
            action_result
        """
        # Skip the event if this target already triggered an action recently
        if rate_limit.limit(event_obj.event_target):
            logger.info("event_target={}, event_count={}, status=skip, msg=rate_limit".format(
                event_obj.event_target, event_count))
            return {"stat": "ERR_RATE_LIMIT"}
        return self._action(event_obj)

    def _action(self, event_obj):
        """
        Execute: create a xingqiao job that grows the volume by 50 GB.
        """
        data = {
            "job_namespace": "scs",
            "job_type": "volume_resize",
            "job_extra": {
                "region": event_obj.event_region,
                "host_uuid": event_obj.event_target,
                "volume_incrsize_in_gb": 50,
                "volume_cursize_in_gb": event_obj.data["data"]["volume_cursize_in_gb"]
            }
        }
        res = http_util.post_json(
            "{endpoint}/xingqiao/create_job".format(endpoint=self.endpoint),
            data=data,
            check_key="stat",
            check_value="OK"
        )
        if not res.success():
            return {"stat": "ERR_CREATE_JOB_FAILED"}
        return {"stat": "OK", "job_id": res.output()["job_id"]}

def setup(app):
    """
    Plugin registration function.
    """
    app.register_formatter(
        # Event source
        source="scs_volume_free_check",
        # Trigger mode: level-triggered ("level") or edge-triggered ("edge")
        watch_method="level",
        # Event statuses to watch
        watch_status_list=["ERR_VOLUME_FREE_TOO_SMALL"],
        formatter=Rule)
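
The difference between the two `watch_method` values can be illustrated with the sketch below. It assumes the conventional meaning of the terms (level: fire whenever the current status is in the watch list; edge: fire only on the transition into it); the source does not spell out BaiCe's exact semantics, and `should_fire` is a hypothetical helper, not part of the plugin API.

```python
def should_fire(watch_method, watch_status_list, prev_status, cur_status):
    """Decide whether a rule should run, under the assumed level/edge semantics."""
    matched = cur_status in watch_status_list
    if watch_method == "level":
        # Level-triggered: fire for as long as the watched status persists.
        return matched
    if watch_method == "edge":
        # Edge-triggered: fire only when the status newly enters the watch list.
        return matched and prev_status not in watch_status_list
    raise ValueError("unknown watch_method: {}".format(watch_method))


watch = ["ERR_VOLUME_FREE_TOO_SMALL"]
statuses = ["OK", "ERR_VOLUME_FREE_TOO_SMALL", "ERR_VOLUME_FREE_TOO_SMALL", "OK"]


def fired(method):
    """Replay the status sequence and record when the rule would fire."""
    prev, out = None, []
    for cur in statuses:
        out.append(should_fire(method, watch, prev, cur))
        prev = cur
    return out


level_fired = fired("level")
edge_fired = fired("edge")
```

Under these assumptions a level-triggered rule fires on every event while the error persists, which is why the demo pairs it with a rate limit; an edge-triggered rule would fire once per occurrence.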