command2http_decorator
xlib 中不包含此装饰器,如有需要,可将装饰器代码放在 app 中,命名为 lib_local.py
1 场景
对[如期]任务做操作时,因涉及到调度服务操作,需要将 COMMAND 请求转为 HTTP 请求,请求本机 server 服务
2 使用方法
@funcattr.api
@lib_local.command2http("ruqi")
def job_show(req, job_id):
"""
job show
"""
pass
3 装饰器代码
#!/usr/bin/python
# coding=utf8
"""
# Author: wangbin34
# Created Time : 2025-02-07 22:20:37
# File Name: lib_local.py
# Description:
对[如期]任务做操作时,因涉及到调度服务操作,需要将 COMMAND 请求转为 HTTP 请求,请求本机 server 服务
"""
from conf import config
from xlib import retstat
from xlib.util import http_util
from xlib.util import wrapt
from xlib.util import funcsigs
def command2http(app):
"""
请求本机 server
"""
@wrapt.decorator
def wrapper(wrapped, instance, args, kwargs):
"""
wrapper
使用 COMMAND 方式时,均是位置参数, 会都转化为 kargs
"""
# 1. 提取原 func 中的 req 和 user_id 值, 不论使用位置参数还是关键字参数
sig = funcsigs.signature(wrapped)
kargs = dict(zip([k for k in sig.parameters if k not in kwargs], args))
req = kargs.get("req", kwargs.get("req"))
# 2. COMMAND 场景,请求本机服务
if req.wsgienv.get("REQUEST_METHOD", "COMMAND") == "COMMAND":
body = kargs
del body["req"]
server_port = config.SERVER_LISTEN_ADDR[1]
endoint = "http://127.0.0.1:{server_port}/{app}/{func_name}".format(server_port=server_port,
app=app,
func_name=wrapped.__name__)
rsp = http_util.post_json(endoint, data=body)
if not rsp.success():
req.error_str = "request local server failed"
return retstat.ERR, {}
result = rsp.json()
# 若请求失败,则从响应 header 中获取下错误信息
if result["stat"] != retstat.OK:
req.error_str = rsp.header.get("x-reason", "")
return result["stat"], result
# 3. HTTP 场景,请求 func
return wrapped(*args, **kwargs)
return wrapper
Last updated