异步任务 BaiChuan(百川)
1 部署及配置
1.1 开启百川自动消费任务
CACHES = {
    "baichuan": "redis://@localhost:6379/0?socket_timeout=2&socket_connect_timeout=0.2&retry_on_timeout=false",
    }在 CACHES 中添加 baichuan 的 redis 连接配置,即开启百川 worker 消费任务
开启了百川 worker 消费任务的 butterfly 仅消费当前进程已有的 handler
1.2 部署百川 handler
将 examples/baichuan 拷贝到 butterfly/handlers/ 下即可部署百川 handler 后,可通过 HTTP 请求发起异步任务
2 使用
2.1 百川消息状态流转图
queued-->started-->finished
            |
            V
          failed2.2 接口
2.2.1 发布任务
方法
POST参数
handler: (str) butterfly handler
params: (dict)例子
$ curl -d '{"handler":"/demo_api/ping", "params":{}}' "http://127.0.0.1:8585/baichuan/task_create"
{"stat": "OK", "data": {"task_id": "AC3C301A7CA516EA"}}2.2.2 任务查看
方法
GET参数
task_id: (str) task id例子
$ curl "http://127.0.0.1:8585/baichuan/task_get?task_id=xxxx"
{"stat": "OK", "task_info": {}}响应
状态:
    "ERR_MSG_NOT_EXIST": 任务不存在
    "ERR_MSG_IN_QUEUED": 任务处于队列中,待消费
    "ERR_MSG_IN_STARTED": 任务正在执行中
    "ERR_MSG_EXE_FAILED": 任务执行失败,有可能是参数错误,也有可能是执行逻辑不符合预期
    "ERR_MSG_RESULT_FAILED": 任务返回数据不合法,不是个 json,应该没有此错误
    "OK": 执行成功3 番外
3.1 使用 MQ 库进行发消息
3.1.1 仅发消息
import json
from xlib import db
from xlib.mq import Queue
# Redis connection
baichuan_connection = db.my_caches["baichuan"]
# Queue
mq_queue=Queue("/demo_api/hello", connection=baichuan_connection)
# 消息入队列
msg_dict = {"str_info": "ceshi"}
msg_data = json.dumps(msg_dict)
msg_obj=mq_queue.enqueue(msg_data)
print msg_obj.id3.1.2 设置消息的保留时长
参数
# result_ttl=900 : 正常结果保留时长
msg_obj=mq_queue.enqueue(w_str, result_ttl=900)3.1.3 重复消息跳过
若调用的是同样的参数的任务仍在队列中或者正在处理中,则进行跳过
mq_queue.enqueue(params_json, result_ttl=900, msg_create_unique="yes")3.2 消息结果获取
3.2.1 例子
阻塞
count = 1
# 1m
while count < 12:
    if msg_obj.get_status() == "finished":
        # 获取数据
        print msg_obj.result <--------------------------"str"
        break
    if msg_obj.get_status() == "failed":
        raise ValueError("Get msg data failed")
    time.sleep(5)
    count = count + 1非阻塞
# 执行任务时生成了 id
msg_id = msg_obj.get_id()
-----------------------------------------------------------------------------
# 获取结果
from xlib.mq import msg
from xlib import db
baichuan_connection = db.my_caches["baichuan"]
@funcattr.api
def msg_status(req, msg_id):
    if not msg.Msg.exists(msg_id, baichuan_connection):
        return "ERR_MSG_NOT_EXIST", {}, [(__info, __version)]
    my_msg = msg.Msg(msg_id, baichuan_connection)
    if  my_msg.get_status() == "queued":
        return "ERR_MSG_IN_QUEUED", {}, [(__info, __version)]
    if  my_msg.get_status() == "started":
        return "ERR_MSG_IN_STARTED", {}, [(__info, __version)]
    if  my_msg.get_status() == "failed":
        return "ERR_MSG_EXE_FAILED", {}, [(__info, __version)]
    if  my_msg.get_status() == "finished":
        try:
            msg_result = json.loads(my_msg.result)
        except:
            return "ERR_MSG_RESULT_FAILED", {}, [(__info, __version)]
        if "stat" not in msg_result.keys():
            return "ERR_MSG_RESULT_FAILED", {}, [(__info, __version)]
        return data["stat"], {"data": msg_data["data"]}, [(__info, __version)]3.2.2 获取消息
from xlib import db
from xlib.mq import msg
from xlib.mq import exceptions as mq_exceptions
baichuan_connection = db.my_caches["baichuan"]
msg_id = "a1116553-1e3d-4f48-a0b4-5a6d02897c4c"
try:
    msg_obj = msg.Msg.fetch(msg_id, connection=baichuan_connection)
except mq_exceptions.NoSuchMsgError:
    print "msg {msg_id} not exists".format(msg_id=msg_id)
cost=getattr(msg_obj, "cost", "-1")
result=msg_obj.result
status=msg_obj.get_status()
print cost
print result
print status3.3 Msg 数据轮转
Redis 中的 Msg key 为
3.3.1 处于 Queued 状态的 Msg
存储在 Redis 中的 Field,此时 key 没有过期时间
'origin'
'status'
'timeout'
'ip'
'created_at'
'enqueued_at'
'handle_worker'
'data'3.3.2 处于 Started 状态的 Msg
存储在 Redis 中的 Field,此时 key 没有过期时间
'origin'
'status'
'timeout'
'ip'
'created_at'
'enqueued_at'
'handle_worker'
'data'
----------------相比于 Queued 新增的字段
'cost'
'result_ttl'3.3.3 处于 Finished/Failed 状态的 Msg
存储在 Redis 中的 Field,此时 key 有过期时间
'origin'        : '/paoding/bigkey_shard'
'status'        : 'finished'
'timeout'       : '180'
'ip'            : 'xx.xx.xx.xx'
'created_at'    : '2025-05-25T05:25:01.375784Z'
'enqueued_at'   : '2025-05-25T05:25:01.376349Z'
'handle_worker' : 'xxxxxxxx(主机名):5201:11083'
'data'          : '{"shard_id": 71148, "app_name": "xx"}'
------------------相比于 Queued 新增的字段
'cost'          : '116.321657'
'result_ttl'    : '86400'
------------------相比于 Started 新增的字段
'started_at'    : '2025-05-26T09:17:39.105436Z'       // 开始时间
'ended_at'      : '2025-05-26T09:19:35.429057Z'       // 结束时间
'result'        : '{"stat": "OK", "data":{}}'         // 返回结果, 通过 pickle dumps 和 loads3.3.4 异常状态的 Msg
消息被删除,但是 handle_worker 又将数据写入
此时 key 仅有如下两个字段
['status', 'handle_worker']Last updated