异步任务 BaiChuan(百川)
百川汇聚终成海,大海泛波扬激情
即使是最小的水流,也能最终汇聚成浩瀚的海洋。通过百川使得每个 handler 都可以贡献自己的一份力量。
1 部署及配置
1.1 开启百川自动消费任务
CACHES = {
"baichuan": "redis://@localhost:6379/0?socket_timeout=2&socket_connect_timeout=0.2&retry_on_timeout=false",
}
在 CACHES 中添加 baichuan 的 redis 连接配置,即开启百川 worker 消费任务
开启了百川 worker 消费任务的 butterfly 仅消费当前进程已有的 handler
1.2 部署百川 handler
将 examples/baichuan 拷贝到 butterfly/handlers/ 下即可
部署百川 handler 后,可通过 HTTP 请求发起异步任务
2 使用
2.1 百川消息状态流转图
queued-->started-->finished
|
V
failed
2.2 接口
2.2.1 发布任务
方法
POST
参数
handler: (str) butterfly handler
params: (dict)
例子
$ curl -d '{"handler":"/demo_api/ping", "params":{}}' "http://127.0.0.1:8585/baichuan/task_create"
{"stat": "OK", "data": {"task_id": "AC3C301A7CA516EA"}}
2.2.2 任务查看
方法
GET
参数
task_id: (str) task id
例子
$ curl "http://127.0.0.1:8585/baichuan/task_get?task_id=xxxx"
{"stat": "OK", "task_info": {}}
响应
状态:
"ERR_MSG_NOT_EXIST": 任务不存在
"ERR_MSG_IN_QUEUED": 任务处于队列中,待消费
"ERR_MSG_IN_STARTED": 任务正在执行中
"ERR_MSG_EXE_FAILED": 任务执行失败,有可能是参数错误,也有可能是执行逻辑不符合预期
"ERR_MSG_RESULT_FAILED": 任务返回数据不合法,不是个 json,应该没有此错误
"OK": 执行成功
3 番外
3.1 直接使用 MQ 库进行发消息
3.1.1 仅发消息
import json
from xlib import db
from xlib.mq import Queue
# Redis connection
baichuan_connection = db.my_caches["baichuan"]
# Queue
mq_queue=Queue("/demo_api/hello", connection=baichuan_connection)
# 消息入队列
msg_dict = {"str_info": "ceshi"}
msg_data = json.dumps(msg_dict)
msg_obj=mq_queue.enqueue(msg_data)
print msg_obj.id
3.1.2 设置消息的保留时长
参数
# result_ttl=900 : 正常结果保留时长
msg_obj=mq_queue.enqueue(w_str, result_ttl=900)
3.1.3 重复消息跳过
若调用的是同样的参数的任务仍在队列中或者正在处理中,则进行跳过
mq_queue.enqueue(params_json, result_ttl=900, msg_create_unique="yes")
3.2 消息结果获取
3.2.1 例子
阻塞
count = 1
# 1m
while count < 12:
if msg_obj.get_status() == "finished":
# 获取数据
print msg_obj.result <--------------------------"str"
break
if msg_obj.get_status() == "failed":
raise ValueError("Get msg data failed")
time.sleep(5)
count = count + 1
非阻塞
# 执行任务时生成了 id
msg_id = msg_obj.get_id()
-----------------------------------------------------------------------------
# 获取结果
from xlib.mq import msg
from xlib import db
baichuan_connection = db.my_caches["baichuan"]
@funcattr.api
def msg_status(req, msg_id):
if not msg.Msg.exists(msg_id, baichuan_connection):
return "ERR_MSG_NOT_EXIST", {}, [(__info, __version)]
my_msg = msg.Msg(msg_id, baichuan_connection)
if my_msg.get_status() == "queued":
return "ERR_MSG_IN_QUEUED", {}, [(__info, __version)]
if my_msg.get_status() == "started":
return "ERR_MSG_IN_STARTED", {}, [(__info, __version)]
if my_msg.get_status() == "failed":
return "ERR_MSG_EXE_FAILED", {}, [(__info, __version)]
if my_msg.get_status() == "finished":
try:
msg_result = json.loads(my_msg.result)
except:
return "ERR_MSG_RESULT_FAILED", {}, [(__info, __version)]
if "stat" not in msg_result.keys():
return "ERR_MSG_RESULT_FAILED", {}, [(__info, __version)]
return data["stat"], {"data": msg_data["data"]}, [(__info, __version)]
3.2.2 获取消息
from xlib import db
from xlib.mq import msg
from xlib.mq import exceptions as mq_exceptions
baichuan_connection = db.my_caches["baichuan"]
msg_id = "a1116553-1e3d-4f48-a0b4-5a6d02897c4c"
try:
msg_obj = msg.Msg.fetch(msg_id, connection=baichuan_connection)
except mq_exceptions.NoSuchMsgError:
print "msg {msg_id} not exists".format(msg_id=msg_id)
cost=getattr(msg_obj, "cost", "-1")
result=msg_obj.result
status=msg_obj.get_status()
print cost
print result
print status
Last updated