运筹决策 BaiCe(百策)

运筹决策 BaiCe(百策)

1 系统设计

+-----------------------------------------------------------+
| +source-------+                                           |
| | +---------+ |                                           |
| | | bianque | |       +-----------+        +------------+ |
| | +---------+ | event |           | action |            | |
| |             +------>|   baice   +------->|  xingqiao  | |
| | +---------+ |       |           |        |            | |
| | | xxx     | |       +-----+-----+        +------------+ |
| | +---------+ |             |                             |
| +-------------+             |                             |
|                       +-----V-----+                       |
|                       |    DB     |                       |
|                       +-----------+                       |
+-----------------------------------------------------------+

1.1 存储 cloudevent

将 cloudevent 存储

1.2 发布事件

  • 发布 cloudevent create 事件

  • 发布 cloudevent 数据变化事件

1.3 执行决策(插件)

执行相应插件,发起 action(可以创建星桥 workflow,也可以直接执行某个任务)

  • 局部决策:

    • 监听 source create 事件, 进行决策

    • 监听 source 数据变化事件, 进行决策(IFTTT: if this than that)

  • 全局决策

    • 定时任务

可以设置冷却期,冷却期间不再发 action 任务

2 上下游

                                                           | 外部输入
+----------------------------------------------------------|---------------------------------------+
|                                +-------------------------V------------------------------+        |
| +ruqi-----------------------+  |  +baichuan-----------+     +callback----------------+  |        |
| |     target + cron         |  |  |                   |     |                        |  |        |
| +------------+--------------+  |  +---------+---------+     +------------+-----------+  |        |
|              |                 +------------+----------------------------+--------------+        |
|              |                              |                            |                       |
|              |1a                            |1b                          |1c                     |
|              V                              V                            V                       |
| +----------------------------------------------------------------------------------------------+ |
| |                               queue                                                          | |
| +------------------^------------------------------^-------------------------------^------------+ |
|                    |2                             |3                              |4             |
| +bianque_agent-----+--------------+  +baice-------+--------------+  +xingqiao-----+------------+ |
| | +plugin-----------------------+ |  |+-------++-------++-------+|  |+plugin------------------+| | 故障自愈
| | | detection + trigger         | |  || baice || baice || baice ||  ||unit_migrate/delete/copy|| |
| | +-----------------------------+ |  |+-------++-------++-------+|  ||hotkey/bigkey           || |
| +---------------------------------+  +------------+--------------|  ||...                     || |
|                                                   |                 |+------------------------+| |
|                                      +DB----------V--------------+  +--------------------------+ |
|                                      |                           |                               |
|                                      +---------------------------+                               |
|                                                                                                  |
|         <sense + trigger>                  <save + decision>                  <execute>          |
+--------------------------------------------------------------------------------------------------+

+--------------------------------------------------------------------------------------------------+
| +------------+    +------------+    +------------+    +------------+    +------------+           |
| |   huoyan   |    |    ruqi    |    |   wuxing   |    |  baichuan  |    |  xingqiao  |           | 通用服务
| +------------+    +------------+    +------------+    +------------+    +------------+           |
+--------------------------------------------------------------------------------------------------+
  • 〖扁鹊〗产生一个 event(cloudevent),将其 push 到消息队列中

  • 〖百策〗将 event 进行存储,并根据 rule 规则生成任务

    • 存储 cloudevent

    • 发布 cloudevent create 事件

    • 发布 cloudevent 数据变化事件

  • 〖星桥〗工作流进行执行具体任务

3 实践

3.1 volume_resize_demo

import logging

from xlib import db
from xlib.util import http_util
from handlers.baice import base_rule

baichuan_cache = db.my_caches["baichuan"]
# 300s 内同一个 event_target 仅能触发一次操作
rate_limit = baichuan_cache.rate_limit('volume_resize', limit=1, per=300)

logger = logging.getLogger("plugin")


class Rule(base_rule.BaseRule):
    """
    Rule
    """

    def __init__(self):
        self.endpoint = "http://<ip>:<port>"

    def decision(self, event_obj, event_count):
        """
        决策

        event_obj.data
            {
                "data":{
                    "volume_cursize_in_gb":600
                    "volume_id":"v-F5RIqyMQ"
                }
                "mnt_disk_free_value":48.3838726679484
            }

        Returns:
            action_result
        """
        if rate_limit.limit(event_obj.event_target):
            logger.info("event_target={}, event_count={}, status=skip, msg=rate_limit".format(
                event_obj.event_target, event_count))
            return {"stat": "ERR_RATE_LIMIE"}

        return self._action(event_obj)

    def _action(self, event_obj):
        """
        执行
        """
        data = {
            "job_namespace": "scs",
            "job_type": "volume_resize",
            "job_extra": {
                "region": event_obj.event_region,
                "host_uuid": event_obj.event_target,
                "volume_incrsize_in_gb": 50,
                "volume_cursize_in_gb": event_obj.data["data"]["volume_cursize_in_gb"]
            }
        }
        res = http_util.post_json("{endpoint}/xingqiao/create_job".format(endpoint=self.endpoint),
                                  data=data,
                                  check_key="stat",
                                  check_value="OK"
                                  )
        if not res.success():
            return {"stat": "ERR_CREATE_JOB_FAILED"}

        return {"stat": "OK", "job_id": res.output()["job_id"]}


def setup(app):
    """
    插件注册函数
    """
    app.register_formatter(
        # 事件来源
        source="scs_volume_free_check",
        # 水平触发(level) 和 边缘触发(edge) 方式
        watch_method="level",
        # 事件状态
        watch_status_list=["ERR_VOLUME_FREE_TOO_SMALL"],
        formatter=Rule)

Last updated