异步任务 BaiChuan(百川)

百川汇聚终成海,大海泛波扬激情

即使是最小的水流,也能最终汇聚成浩瀚的海洋。

通过百川使得每个 handler 都可以贡献自己的一份力量。

1 部署及配置

1.1 开启百川自动消费任务

CACHES = {
    "baichuan": "redis://@localhost:6379/0?socket_timeout=2&socket_connect_timeout=0.2&retry_on_timeout=false",
    }

在 CACHES 中添加 baichuan 的 redis 连接配置,即开启百川 worker 消费任务

开启了百川 worker 消费任务的 butterfly 仅消费当前进程已有的 handler

1.2 部署百川 handler

将 examples/baichuan 拷贝到 butterfly/handlers/ 下即可

部署百川 handler 后,可通过 HTTP 请求发起异步任务

2 使用

2.1 百川消息状态流转图

queued-->started-->finished
            |
            V
          failed

2.2 接口

2.2.1 发布任务

方法

POST

参数

handler: (str) butterfly handler
params: (dict)

例子

$ curl -d '{"handler":"/demo_api/ping", "params":{}}' "http://127.0.0.1:8585/baichuan/task_create"
{"stat": "OK", "data": {"task_id": "AC3C301A7CA516EA"}}

2.2.2 任务查看

方法

GET

参数

task_id: (str) task id

例子

$ curl "http://127.0.0.1:8585/baichuan/task_get?task_id=xxxx"
{"stat": "OK", "task_info": {}}

响应

状态:
    "ERR_MSG_NOT_EXIST": 任务不存在
    "ERR_MSG_IN_QUEUED": 任务处于队列中,待消费
    "ERR_MSG_IN_STARTED": 任务正在执行中
    "ERR_MSG_EXE_FAILED": 任务执行失败,有可能是参数错误,也有可能是执行逻辑不符合预期
    "ERR_MSG_RESULT_FAILED": 任务返回数据不合法,不是个 json,应该没有此错误
    "OK": 执行成功

3 番外

3.1 使用 MQ 库进行发消息

3.1.1 仅发消息

import json

from xlib import db
from xlib.mq import Queue

# Redis connection
baichuan_connection = db.my_caches["baichuan"]
# Queue
mq_queue=Queue("/demo_api/hello", connection=baichuan_connection)

# 消息入队列
msg_dict = {"str_info": "ceshi"}
msg_data = json.dumps(msg_dict)
msg_obj=mq_queue.enqueue(msg_data)
print msg_obj.id

3.1.2 设置消息的保留时长

参数

# result_ttl=900 : 正常结果保留时长
msg_obj=mq_queue.enqueue(w_str, result_ttl=900)

3.1.3 重复消息跳过

若调用的是同样的参数的任务仍在队列中或者正在处理中,则进行跳过

mq_queue.enqueue(params_json, result_ttl=900, msg_create_unique="yes")

3.2 消息结果获取

3.2.1 例子

阻塞

count = 1

# 1m
while count < 12:
    if msg_obj.get_status() == "finished":
        # 获取数据
        print msg_obj.result <--------------------------"str"
        break

    if msg_obj.get_status() == "failed":
        raise ValueError("Get msg data failed")

    time.sleep(5)
    count = count + 1

非阻塞

# 执行任务时生成了 id
msg_id = msg_obj.get_id()
-----------------------------------------------------------------------------

# 获取结果
from xlib.mq import msg
from xlib import db

baichuan_connection = db.my_caches["baichuan"]


@funcattr.api
def msg_status(req, msg_id):
    if not msg.Msg.exists(msg_id, baichuan_connection):
        return "ERR_MSG_NOT_EXIST", {}, [(__info, __version)]

    my_msg = msg.Msg(msg_id, baichuan_connection)
    if  my_msg.get_status() == "queued":
        return "ERR_MSG_IN_QUEUED", {}, [(__info, __version)]
    if  my_msg.get_status() == "started":
        return "ERR_MSG_IN_STARTED", {}, [(__info, __version)]
    if  my_msg.get_status() == "failed":
        return "ERR_MSG_EXE_FAILED", {}, [(__info, __version)]
    if  my_msg.get_status() == "finished":
        try:
            msg_result = json.loads(my_msg.result)
        except:
            return "ERR_MSG_RESULT_FAILED", {}, [(__info, __version)]
        if "stat" not in msg_result.keys():
            return "ERR_MSG_RESULT_FAILED", {}, [(__info, __version)]

        return data["stat"], {"data": msg_data["data"]}, [(__info, __version)]

3.2.2 获取消息

from xlib import db
from xlib.mq import msg
from xlib.mq import exceptions as mq_exceptions

baichuan_connection = db.my_caches["baichuan"]
msg_id = "a1116553-1e3d-4f48-a0b4-5a6d02897c4c"

try:
    msg_obj = msg.Msg.fetch(msg_id, connection=baichuan_connection)
except mq_exceptions.NoSuchMsgError:
    print "msg {msg_id} not exists".format(msg_id=msg_id)

cost=getattr(msg_obj, "cost", "-1")
result=msg_obj.result
status=msg_obj.get_status()

print cost
print result
print status

3.3 Msg 数据轮转

Redis 中的 Msg key 为

mq:msg:xxxx

可以 hgetall mq:msg:xxxx

3.3.1 处于 Queued 状态的 Msg

存储在 Redis 中的 Field,此时 key 没有过期时间

'origin'
'status'
'timeout'
'ip'
'created_at'
'enqueued_at'
'handle_worker'
'data'

3.3.2 处于 Started 状态的 Msg

存储在 Redis 中的 Field,此时 key 没有过期时间

'origin'
'status'
'timeout'
'ip'
'created_at'
'enqueued_at'
'handle_worker'
'data'
----------------相比于 Queued 新增的字段
'cost'
'result_ttl'

3.3.3 处于 Finished/Failed 状态的 Msg

存储在 Redis 中的 Field,此时 key 有过期时间

'origin'        : '/paoding/bigkey_shard'
'status'        : 'finished'
'timeout'       : '180'
'ip'            : 'xx.xx.xx.xx'
'created_at'    : '2025-05-25T05:25:01.375784Z'
'enqueued_at'   : '2025-05-25T05:25:01.376349Z'
'handle_worker' : 'xxxxxxxx(主机名):5201:11083'
'data'          : '{"shard_id": 71148, "app_name": "xx"}'
------------------相比于 Queued 新增的字段
'cost'          : '116.321657'
'result_ttl'    : '86400'
------------------相比于 Started 新增的字段
'started_at'    : '2025-05-26T09:17:39.105436Z'       // 开始时间
'ended_at'      : '2025-05-26T09:19:35.429057Z'       // 结束时间
'result'        : '{"stat": "OK", "data":{}}'         // 返回结果, 通过 pickle dumps 和 loads

3.3.4 异常状态的 Msg

消息被删除,但是 handle_worker 又将数据写入

此时 key 仅有如下两个字段
['status', 'handle_worker']

Last updated