🦋
Butterfly 用户手册
  • Introduction
  • 一 前言
  • 二 开始
    • 安装部署
    • 五分钟体验指南
    • 单机使用手册
    • 应用规范
      • handler specs
      • middleware specs
      • xingqiao_plugin specs
      • yiqiu_program specs
  • 三 客户端功能
    • MySQL 原生协议
    • MySQL ORM
    • Redis 原生协议
      • redis_config
      • redis_tls
    • Redis ORM
    • Redis mcpack
    • Localcache
    • Kazoo
  • 四 应用(通用服务)
    • API JSON 规范
    • 异步任务 BaiChuan(百川)
    • 任务调度 RuQi(如期)
    • 任务编排 XingQiao(星桥)
    • 配置管理 WuXing(五行)
    • 运筹决策 BaiCe(百策)
  • 五 部署运维
    • 单机容器化部署
    • 监控
    • 异常排查
      • CPU Load spike every 7 hours
    • 升级
    • 安全
    • 其他
  • 六 前端
    • butterfly_template
    • butterfly_fe
    • butterfly-admin(json2web)
      • amis
      • sso
      • pangu
    • NoahV
    • PyWebIO
  • 七 潘多拉魔盒
    • 装饰器
      • localcache_decorator
      • retry_decorator
      • custom_decorator
      • command2http_decorator
    • 算法
      • 算法-分位数
      • 算法-变异系数
    • 实用工具
      • host_util
      • shell_util
      • http_util
      • time_util
      • random_util
      • concurrent
      • jsonschema
      • blinker
      • toml
      • command_util
      • config_util
      • picobox
      • 对称加密
        • des
        • aes
      • ascii_art
        • ttable
        • chart
      • business_rules
      • python-mysql-replication
      • dict_util
    • 中间件
      • middleware_status
      • middleware_whitelist
    • test_handler.py
  • 八 最佳实践
    • 分布式架构
    • Code practice
    • Log practice
    • Daemon process
  • 附录
Powered by GitBook
On this page
  • 1 部署及配置
  • 1.1 开启百川自动消费任务
  • 1.2 部署百川 handler
  • 2 使用
  • 2.1 百川消息状态流转图
  • 2.2 接口
  • 3 番外
  • 3.1 使用 MQ 库进行发消息
  • 3.2 消息结果获取
  • 3.3 Msg 数据轮转
  1. 四 应用(通用服务)

异步任务 BaiChuan(百川)

百川汇聚终成海,大海泛波扬激情

即使是最小的水流,也能最终汇聚成浩瀚的海洋。

通过百川使得每个 handler 都可以贡献自己的一份力量。

1 部署及配置

1.1 开启百川自动消费任务

CACHES = {
    "baichuan": "redis://@localhost:6379/0?socket_timeout=2&socket_connect_timeout=0.2&retry_on_timeout=false",
    }

在 CACHES 中添加 baichuan 的 redis 连接配置,即开启百川 worker 消费任务

开启了百川 worker 消费任务的 butterfly 仅消费当前进程已有的 handler

1.2 部署百川 handler

将 examples/baichuan 拷贝到 butterfly/handlers/ 下即可

部署百川 handler 后,可通过 HTTP 请求发起异步任务

2 使用

2.1 百川消息状态流转图

queued-->started-->finished
            |
            V
          failed

2.2 接口

2.2.1 发布任务

方法

POST

参数

handler: (str) butterfly handler
params: (dict)

例子

$ curl -d '{"handler":"/demo_api/ping", "params":{}}' "http://127.0.0.1:8585/baichuan/task_create"
{"stat": "OK", "data": {"task_id": "AC3C301A7CA516EA"}}

2.2.2 任务查看

方法

GET

参数

task_id: (str) task id

例子

$ curl "http://127.0.0.1:8585/baichuan/task_get?task_id=xxxx"
{"stat": "OK", "task_info": {}}

响应

状态:
    "ERR_MSG_NOT_EXIST": 任务不存在
    "ERR_MSG_IN_QUEUED": 任务处于队列中,待消费
    "ERR_MSG_IN_STARTED": 任务正在执行中
    "ERR_MSG_EXE_FAILED": 任务执行失败,有可能是参数错误,也有可能是执行逻辑不符合预期
    "ERR_MSG_RESULT_FAILED": 任务返回数据不合法,不是个 json,应该没有此错误
    "OK": 执行成功

3 番外

3.1 使用 MQ 库进行发消息

3.1.1 仅发消息

import json

from xlib import db
from xlib.mq import Queue

# Redis connection
baichuan_connection = db.my_caches["baichuan"]
# Queue
mq_queue=Queue("/demo_api/hello", connection=baichuan_connection)

# 消息入队列
msg_dict = {"str_info": "ceshi"}
msg_data = json.dumps(msg_dict)
msg_obj=mq_queue.enqueue(msg_data)
print msg_obj.id

3.1.2 设置消息的保留时长

参数

# result_ttl=900 : 正常结果保留时长
msg_obj=mq_queue.enqueue(w_str, result_ttl=900)

3.1.3 重复消息跳过

若调用的是同样的参数的任务仍在队列中或者正在处理中,则进行跳过

mq_queue.enqueue(params_json, result_ttl=900, msg_create_unique="yes")

3.2 消息结果获取

3.2.1 例子

阻塞

count = 1

# 1m
while count < 12:
    if msg_obj.get_status() == "finished":
        # 获取数据
        print msg_obj.result <--------------------------"str"
        break

    if msg_obj.get_status() == "failed":
        raise ValueError("Get msg data failed")

    time.sleep(5)
    count = count + 1

非阻塞

# 执行任务时生成了 id
msg_id = msg_obj.get_id()
-----------------------------------------------------------------------------

# 获取结果
from xlib.mq import msg
from xlib import db

baichuan_connection = db.my_caches["baichuan"]


@funcattr.api
def msg_status(req, msg_id):
    if not msg.Msg.exists(msg_id, baichuan_connection):
        return "ERR_MSG_NOT_EXIST", {}, [(__info, __version)]

    my_msg = msg.Msg(msg_id, baichuan_connection)
    if  my_msg.get_status() == "queued":
        return "ERR_MSG_IN_QUEUED", {}, [(__info, __version)]
    if  my_msg.get_status() == "started":
        return "ERR_MSG_IN_STARTED", {}, [(__info, __version)]
    if  my_msg.get_status() == "failed":
        return "ERR_MSG_EXE_FAILED", {}, [(__info, __version)]
    if  my_msg.get_status() == "finished":
        try:
            msg_result = json.loads(my_msg.result)
        except:
            return "ERR_MSG_RESULT_FAILED", {}, [(__info, __version)]
        if "stat" not in msg_result.keys():
            return "ERR_MSG_RESULT_FAILED", {}, [(__info, __version)]

        return data["stat"], {"data": msg_data["data"]}, [(__info, __version)]

3.2.2 获取消息

from xlib import db
from xlib.mq import msg
from xlib.mq import exceptions as mq_exceptions

baichuan_connection = db.my_caches["baichuan"]
msg_id = "a1116553-1e3d-4f48-a0b4-5a6d02897c4c"

try:
    msg_obj = msg.Msg.fetch(msg_id, connection=baichuan_connection)
except mq_exceptions.NoSuchMsgError:
    print "msg {msg_id} not exists".format(msg_id=msg_id)

cost=getattr(msg_obj, "cost", "-1")
result=msg_obj.result
status=msg_obj.get_status()

print cost
print result
print status

3.3 Msg 数据轮转

Redis 中的 Msg key 为

mq:msg:xxxx

可以 hgetall mq:msg:xxxx

3.3.1 处于 Queued 状态的 Msg

存储在 Redis 中的 Field,此时 key 没有过期时间

'origin'
'status'
'timeout'
'ip'
'created_at'
'enqueued_at'
'handle_worker'
'data'

3.3.2 处于 Started 状态的 Msg

存储在 Redis 中的 Field,此时 key 没有过期时间

'origin'
'status'
'timeout'
'ip'
'created_at'
'enqueued_at'
'handle_worker'
'data'
----------------相比于 Queued 新增的字段
'cost'
'result_ttl'

3.3.3 处于 Finished/Failed 状态的 Msg

存储在 Redis 中的 Field,此时 key 有过期时间

'origin'        : '/paoding/bigkey_shard'
'status'        : 'finished'
'timeout'       : '180'
'ip'            : 'xx.xx.xx.xx'
'created_at'    : '2025-05-25T05:25:01.375784Z'
'enqueued_at'   : '2025-05-25T05:25:01.376349Z'
'handle_worker' : 'xxxxxxxx(主机名):5201:11083'
'data'          : '{"shard_id": 71148, "app_name": "xx"}'
------------------相比于 Queued 新增的字段
'cost'          : '116.321657'
'result_ttl'    : '86400'
------------------相比于 Started 新增的字段
'started_at'    : '2025-05-26T09:17:39.105436Z'       // 开始时间
'ended_at'      : '2025-05-26T09:19:35.429057Z'       // 结束时间
'result'        : '{"stat": "OK", "data":{}}'         // 返回结果, 通过 pickle dumps 和 loads

3.3.4 异常状态的 Msg

消息被删除,但是 handle_worker 又将数据写入

此时 key 仅有如下两个字段
['status', 'handle_worker']
PreviousAPI JSON 规范Next任务调度 RuQi(如期)

Last updated 11 days ago