middleware_whitelist
白名单
1 路径
butterfly/handlers/models.py
butterfly/handlers/middleware.py
2 代码
2.1 models.py
from datetime import datetime
from xlib import db
from xlib.db import redisorm
baichuan_cache = db.my_caches["baichuan"]
class WhiteList(db.RedisModel):
"""
whitelist
"""
_database_ = baichuan_cache
w_type = redisorm.TextField(index=True)
w_value = redisorm.TextField(index=True)
w_user = redisorm.TextField(index=True, default="")
u_time = redisorm.DateTimeField(default=datetime.now())
def w_create(w_type, w_value, w_user=""):
"""
create token
"""
if w_type not in ["ip", "token"]:
print "ERR_TYPE"
exit(0)
print WhiteList.create(w_type=w_type, w_value=w_value, w_user=w_user)
def w_list():
for whitelist in WhiteList.all():
print whitelist.get_id(), whitelist.w_type, whitelist.w_value, whitelist.w_user
def w_delete(w_type, w_value):
print WhiteList.query_delete((WhiteList.w_type==w_type) & (WhiteList.w_value==w_value))
if __name__ == '__main__':
import sys, inspect
if len(sys.argv) < 2:
print "Usage:"
for k, v in sorted(globals().items(), key=lambda item: item[0]):
if inspect.isfunction(v) and k[0] != "_":
args, __, __, defaults = inspect.getargspec(v)
if defaults:
print sys.argv[0], k, str(args[:-len(defaults)])[1:-1].replace(",", ""), \
str(["%s=%s" % (a, b) for a, b in zip(args[-len(defaults):], defaults)])[1:-1].replace(",", "")
else:
print sys.argv[0], k, str(v.func_code.co_varnames[:v.func_code.co_argcount])[1:-1].replace(",", "")
sys.exit(-1)
else:
func = eval(sys.argv[1])
args = sys.argv[2:]
try:
r = func(*args)
except Exception, e:
print "Usage:"
print "\t", "python %s" % sys.argv[1], str(func.func_code.co_varnames[:func.func_code.co_argcount])[1:-1].replace(",", "")
if func.func_doc:
print "\n".join(["\t\t" + line.strip() for line in func.func_doc.strip().split("\n")])
print e
r = -1
import traceback
traceback.print_exc()
if isinstance(r, int):
sys.exit(r)
2.2 middleware.py
import logging
import httplib
from handlers import models
from xlib.middleware import cache
from xlib.util import retry
@cache.cache_page(expire=60)
@retry.retry(max_retries=3, interval=3, timeout=10)
def get_whitelist():
"""
get whitelist
"""
ip_whitelist_list = []
token_whitelist_list = []
whitelist_list = models.WhiteList.all()
for whitelist in whitelist_list:
if whitelist.w_type == "ip":
ip_whitelist_list.append(whitelist.w_value)
else:
token_whitelist_list.append(whitelist.w_value)
if "127.0.0.1" not in ip_whitelist_list:
ip_whitelist_list.append("127.0.0.1")
return ip_whitelist_list, token_whitelist_list
def is_auth(req):
"""
return bool
"""
ip_whitelist_list, token_whitelist_list = get_whitelist()
if req.ip in ip_whitelist_list:
return True
token = req.wsgienv.get("HTTP_BUTTERFLY_TOKEN") or "-"
if token in token_whitelist_list:
return True
return False
class Middleware():
'''
Middleware
'''
def __init__(self, protocol):
self.protocol = protocol
def __call__(self, req):
log_msg = "[butterfly Request] [reqid]:{reqid} [wsgienv]:{wsgienv}".format(
reqid=req.reqid, wsgienv=str(req.wsgienv))
logging.debug(log_msg)
if not is_auth(req):
status = 401
status_line = "%s %s" % (status, httplib.responses.get(status, ""))
req.log_ret_code = "ERR_FORBIDDEN"
return status_line, [], ""
httpstatus, headers, content = self.protocol(req)
return httpstatus, headers, content
3 测试
用户需要传 token 或者机器在白名单中才能访问此服务,若传 token,则格式为 butterfly_token:xxxx
$ curl -H "butterfly_token:xxxx" "http://<IP>:8585/demo_api/ping"
若 token 无效,则返回
HTTP/1.1 401 Unauthorized
Last updated