Operations Decision-Making: BaiCe (百策)
1 System Design
+-----------------------------------------------------------+
| +source-------+                                           |
| | +---------+ |                                           |
| | | bianque | |       +-----------+        +------------+ |
| | +---------+ | event |           | action |            | |
| |             +------>|   baice   +------->|  xingqiao  | |
| | +---------+ |       |           |        |            | |
| | | xxx     | |       +-----+-----+        +------------+ |
| | +---------+ |             |                             |
| +-------------+             |                             |
|                       +-----V-----+                       |
|                       |    DB     |                       |
|                       +-----------+                       |
+-----------------------------------------------------------+
1.1 Storing the cloudevent
BaiCe persists every incoming cloudevent to the DB.
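A minimal sketch of the storage step, assuming a relational table keyed by the CloudEvents `source` and `id` attributes (the CloudEvents spec requires that pair to be unique per event). The table name `cloudevent`, its columns, and the helpers `init_db` and `save_event` are hypothetical, and SQLite only stands in for the real DB; BaiCe's actual schema is not shown here.

```python
import json
import sqlite3


def init_db(conn):
    """Create a hypothetical table for persisted cloudevents."""
    conn.execute(
        """
        CREATE TABLE IF NOT EXISTS cloudevent (
            id     TEXT NOT NULL,
            source TEXT NOT NULL,
            type   TEXT NOT NULL,
            data   TEXT NOT NULL,
            PRIMARY KEY (source, id)
        )
        """
    )


def save_event(conn, event):
    """Insert or update an event and report what happened.

    Returns "create" for a new event, "update" if the stored data
    changed, and "unchanged" otherwise.
    """
    payload = json.dumps(event["data"], sort_keys=True)
    row = conn.execute(
        "SELECT data FROM cloudevent WHERE source = ? AND id = ?",
        (event["source"], event["id"]),
    ).fetchone()
    if row is None:
        conn.execute(
            "INSERT INTO cloudevent (id, source, type, data) VALUES (?, ?, ?, ?)",
            (event["id"], event["source"], event["type"], payload),
        )
        conn.commit()
        return "create"
    if row[0] != payload:
        conn.execute(
            "UPDATE cloudevent SET data = ? WHERE source = ? AND id = ?",
            (payload, event["source"], event["id"]),
        )
        conn.commit()
        return "update"
    return "unchanged"
```

Returning the outcome of the write lets the caller decide which kind of event to publish next.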
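1.2 Publishing events
Publish a cloudevent create event when a new cloudevent is stored.
Publish a cloudevent data-change event when the data of a stored cloudevent changes.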
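The two event kinds can be pictured with a tiny in-process publish/subscribe bus. This is only a sketch: the `EventBus` class, the topic names `cloudevent.create` and `cloudevent.update`, and the `change` argument are assumptions made for illustration, not BaiCe's real interface.

```python
from collections import defaultdict


class EventBus:
    """Tiny in-process publish/subscribe bus (illustrative only)."""

    def __init__(self):
        self._subscribers = defaultdict(list)

    def subscribe(self, topic, handler):
        """Register a callable that receives every event published on topic."""
        self._subscribers[topic].append(handler)

    def publish(self, topic, event):
        """Deliver event to each subscriber of topic; return how many ran."""
        handlers = self._subscribers.get(topic, [])
        for handler in handlers:
            handler(event)
        return len(handlers)


def publish_change(bus, change, event):
    """Publish the event kind that matches a storage outcome.

    change is "create" for a newly stored cloudevent and "update" when
    the data of an existing one changed; anything else publishes nothing.
    """
    if change == "create":
        return bus.publish("cloudevent.create", event)
    if change == "update":
        return bus.publish("cloudevent.update", event)
    return 0
```

Decision plugins would then subscribe to whichever of the two topics they care about.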
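1.3 Executing decisions (plugins)
Run the matching plugin and issue an action (the action can create a xingqiao workflow or run a task directly).
Local decisions:
Listen for source create events and make a decision.
Listen for source data-change events and make a decision (IFTTT: if this then that).
Global decisions:
Scheduled tasks.
A cooldown period can be set; no action tasks are issued while the cooldown is in effect.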
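The cooldown rule can be sketched as a per-target timestamp check. The `Cooldown` class and its `allow` method are hypothetical names; the clock is injectable only so the behaviour can be tested without sleeping.

```python
import time


class Cooldown:
    """Suppress repeated actions for the same target during a cooldown."""

    def __init__(self, period_seconds, clock=time.monotonic):
        self.period_seconds = period_seconds
        self._clock = clock
        self._last_fired = {}  # target -> time of the last allowed action

    def allow(self, target):
        """Return True and start a new cooldown if target is not cooling down."""
        now = self._clock()
        last = self._last_fired.get(target)
        if last is not None and now - last < self.period_seconds:
            return False
        self._last_fired[target] = now
        return True
```

A plugin would call `allow(target)` before issuing an action and skip the action when it returns `False`. Keeping the state per target means one noisy host does not block actions for the others.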
2 Upstream and Downstream
                                                           | external input
+----------------------------------------------------------|---------------------------------------+
|                                +-------------------------V------------------------------+        |
| +ruqi-----------------------+  |  +baichuan-----------+     +callback----------------+  |        |
| |     target + cron         |  |  |                   |     |                        |  |        |
| +------------+--------------+  |  +---------+---------+     +------------+-----------+  |        |
|              |                 +------------+----------------------------+--------------+        |
|              |                              |                            |                       |
|              |1a                            |1b                          |1c                     |
|              V                              V                            V                       |
| +----------------------------------------------------------------------------------------------+ |
| |                               queue                                                          | |
| +------------------^------------------------------^-------------------------------^------------+ |
|                    |2                             |3                              |4             |
| +bianque_agent-----+--------------+  +baice-------+--------------+  +xingqiao-----+------------+ |
| | +plugin-----------------------+ |  |+-------++-------++-------+|  |+plugin------------------+| | fault self-healing
| | | detection + trigger         | |  || baice || baice || baice ||  ||unit_migrate/delete/copy|| |
| | +-----------------------------+ |  |+-------++-------++-------+|  ||hotkey/bigkey           || |
| +---------------------------------+  +------------+--------------+  ||...                     || |
|                                                   |                 |+------------------------+| |
|                                      +DB----------V--------------+  +--------------------------+ |
|                                      |                           |                               |
|                                      +---------------------------+                               |
|                                                                                                  |
|         <sense + trigger>                  <save + decision>                  <execute>          |
+--------------------------------------------------------------------------------------------------+
+--------------------------------------------------------------------------------------------------+
| +------------+    +------------+    +------------+    +------------+    +------------+           |
| |   huoyan   |    |    ruqi    |    |   wuxing   |    |  baichuan  |    |  xingqiao  |           | common services
| +------------+    +------------+    +------------+    +------------+    +------------+           |
+--------------------------------------------------------------------------------------------------+
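〖bianque (扁鹊)〗 produces an event (cloudevent) and pushes it to the message queue.
〖BaiCe (百策)〗 stores the event and generates tasks according to its rules:
Store the cloudevent.
Publish a cloudevent create event.
Publish a cloudevent data-change event.
〖xingqiao (星桥)〗 workflows execute the concrete tasks.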
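The sense, decide, execute flow above can be sketched end to end with an in-memory queue. Everything here is a stand-in: the function names, the event fields, the 50.0 threshold, and the list used as a store are assumptions for illustration, not the real bianque, BaiCe, or xingqiao interfaces.

```python
import queue


def bianque_detect(q, target, free_value, threshold=50.0):
    """Sense + trigger: push an event when free space is below threshold."""
    if free_value < threshold:
        q.put({
            "source": "scs_volume_free_check",
            "target": target,
            "status": "ERR_VOLUME_FREE_TOO_SMALL",
            "data": {"mnt_disk_free_value": free_value},
        })


def baice_consume(q, store, rules, execute):
    """Save + decision: drain the queue, store each event, run its rule."""
    results = []
    while True:
        try:
            event = q.get_nowait()
        except queue.Empty:
            break
        store.append(event)  # stand-in for persisting to the DB
        rule = rules.get((event["source"], event["status"]))
        if rule is not None:
            results.append(execute(rule(event)))
    return results


def xingqiao_execute(action):
    """Execute: stand-in for a workflow engine that acknowledges the job."""
    return {"stat": "OK", "job_type": action["job_type"], "target": action["target"]}


def resize_rule(event):
    """Hypothetical rule: turn a low-free-space event into a resize action."""
    return {"job_type": "volume_resize", "target": event["target"]}
```

Rules are looked up by `(source, status)`, which loosely mirrors how a plugin registers a source and a list of statuses to watch.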
3 Practice
3.1 volume_resize_demo
import logging
from xlib import db
from xlib.util import http_util
from handlers.baice import base_rule
baichuan_cache = db.my_caches["baichuan"]
# Within 300 s, the same event_target can trigger at most one operation
rate_limit = baichuan_cache.rate_limit('volume_resize', limit=1, per=300)
logger = logging.getLogger("plugin")
class Rule(base_rule.BaseRule):
    """
    Decision rule for the volume_resize demo.
    """
    def __init__(self):
        self.endpoint = "http://<ip>:<port>"
    def decision(self, event_obj, event_count):
        """
        Decide whether to act on an event.

        Example event_obj.data:
            {
                "data": {
                    "volume_cursize_in_gb": 600,
                    "volume_id": "v-F5RIqyMQ"
                },
                "mnt_disk_free_value": 48.3838726679484
            }

        Returns:
            action_result
        """
        # Skip when this target already triggered an action inside the rate-limit window
        if rate_limit.limit(event_obj.event_target):
            logger.info("event_target={}, event_count={}, status=skip, msg=rate_limit".format(
                event_obj.event_target, event_count))
            return {"stat": "ERR_RATE_LIMIT"}
        return self._action(event_obj)
    def _action(self, event_obj):
        """
        Execute: create a xingqiao volume_resize job and return the result.
        """
        data = {
            "job_namespace": "scs",
            "job_type": "volume_resize",
            "job_extra": {
                "region": event_obj.event_region,
                "host_uuid": event_obj.event_target,
                "volume_incrsize_in_gb": 50,
                "volume_cursize_in_gb": event_obj.data["data"]["volume_cursize_in_gb"]
            }
        }
        res = http_util.post_json("{endpoint}/xingqiao/create_job".format(endpoint=self.endpoint),
                                  data=data,
                                  check_key="stat",
                                  check_value="OK"
                                  )
        if not res.success():
            return {"stat": "ERR_CREATE_JOB_FAILED"}
        return {"stat": "OK", "job_id": res.output()["job_id"]}
def setup(app):
    """
    Plugin registration function.
    """
    app.register_formatter(
        # Event source
        source="scs_volume_free_check",
        # Trigger mode: level-triggered ("level") or edge-triggered ("edge")
        watch_method="level",
        # Event statuses to watch
        watch_status_list=["ERR_VOLUME_FREE_TOO_SMALL"],
        formatter=Rule)
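The `watch_method` option distinguishes level from edge triggering. The sketch below shows the usual meaning of the two terms; BaiCe's exact semantics are not spelled out here, so treat `should_fire` and its arguments as hypothetical.

```python
def should_fire(watch_method, previous_status, current_status, watch_status_list):
    """Decide whether a rule fires under level or edge triggering."""
    matches = current_status in watch_status_list
    if watch_method == "level":
        # Level: fire on every event whose status is in the watch list.
        return matches
    if watch_method == "edge":
        # Edge: fire only on the transition into a watched status.
        return matches and previous_status not in watch_status_list
    raise ValueError("unknown watch_method: {}".format(watch_method))
```

Under this reading, a level-triggered rule fires again on every repeated `ERR_VOLUME_FREE_TOO_SMALL` event, which is why the demo pairs `watch_method="level"` with a rate limit of one operation per 300 s per target.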