middleware_whitelist

白名单

1 路径

butterfly/handlers/models.py
butterfly/handlers/middleware.py

2 代码

2.1 models.py

from datetime import datetime
from xlib import db
from xlib.db import redisorm
# Cache handle registered under the name "baichuan" in db.my_caches; it is used
# below as the backing database of the WhiteList model.
baichuan_cache = db.my_caches["baichuan"]


class WhiteList(db.RedisModel):
    """
    Whitelist entry persisted through the ``baichuan`` cache.

    Fields:
        w_type: kind of entry; the ``w_create`` helper accepts "ip" or "token".
        w_value: the whitelisted IP address or token string.
        w_user: optional owner of the entry (defaults to "").
        u_time: timestamp assigned when the entry is created.
    """
    _database_ = baichuan_cache
    w_type = redisorm.TextField(index=True)
    w_value = redisorm.TextField(index=True)
    w_user = redisorm.TextField(index=True, default="")
    # Pass the callable rather than datetime.now(): calling it here would be
    # evaluated once at import time and stamp every record with the moment the
    # process started.
    # NOTE(review): relies on redisorm invoking callable defaults (as walrus
    # fields do) -- confirm against xlib.db.redisorm.
    u_time = redisorm.DateTimeField(default=datetime.now)

def w_create(w_type, w_value, w_user=""):
    """
    Create a whitelist entry and print the created record.

    Args:
        w_type: kind of entry, must be "ip" or "token".
        w_value: the IP address or token to whitelist.
        w_user: optional owner of the entry.

    Exits the process with status 1 when w_type is not supported.
    """
    # Imported locally so the helper also works when this module is imported
    # rather than run as a script.
    import sys

    if w_type not in ("ip", "token"):
        print("ERR_TYPE")
        # A rejected type is a failure: report a non-zero status so shell
        # callers and scripts can detect it.
        sys.exit(1)

    print(WhiteList.create(w_type=w_type, w_value=w_value, w_user=w_user))

def w_list():
    for whitelist in WhiteList.all():
        print whitelist.get_id(), whitelist.w_type, whitelist.w_value, whitelist.w_user

def w_delete(w_type, w_value):
    """
    Delete the whitelist entries matching both w_type and w_value, then print
    the result returned by the delete query.
    """
    type_matches = (WhiteList.w_type == w_type)
    value_matches = (WhiteList.w_value == w_value)
    print(WhiteList.query_delete(type_matches & value_matches))



if __name__ == '__main__':
    import sys, inspect
    if len(sys.argv) < 2:
        print "Usage:"
        for k, v in sorted(globals().items(), key=lambda item: item[0]):
            if inspect.isfunction(v) and k[0] != "_":
                args, __, __, defaults = inspect.getargspec(v)
                if defaults:
                    print sys.argv[0], k, str(args[:-len(defaults)])[1:-1].replace(",", ""), \
                          str(["%s=%s" % (a, b) for a, b in zip(args[-len(defaults):], defaults)])[1:-1].replace(",", "")
                else:
                    print sys.argv[0], k, str(v.func_code.co_varnames[:v.func_code.co_argcount])[1:-1].replace(",", "")
        sys.exit(-1)
    else:
        func = eval(sys.argv[1])
        args = sys.argv[2:]
        try:
            r = func(*args)
        except Exception, e:
            print "Usage:"
            print "\t", "python %s" % sys.argv[1], str(func.func_code.co_varnames[:func.func_code.co_argcount])[1:-1].replace(",", "")
            if func.func_doc:
                print "\n".join(["\t\t" + line.strip() for line in func.func_doc.strip().split("\n")])
            print e
            r = -1
            import traceback
            traceback.print_exc()
        if isinstance(r, int):
            sys.exit(r)

2.2 middleware.py

import logging
import httplib

from handlers import models
from xlib.middleware import cache
from xlib.util import retry


@cache.cache_page(expire=60)
@retry.retry(max_retries=3, interval=3, timeout=10)
def get_whitelist():
    """
    Load all whitelist entries and split them by type.

    Entries whose w_type is "ip" go to the IP list; every other entry goes to
    the token list. "127.0.0.1" is always present in the IP list.

    Returns:
        (ip_values, token_values): two lists of w_value strings.
    """
    ip_values, token_values = [], []
    for entry in models.WhiteList.all():
        bucket = ip_values if entry.w_type == "ip" else token_values
        bucket.append(entry.w_value)

    # Local requests are always allowed, even when not stored explicitly.
    if "127.0.0.1" not in ip_values:
        ip_values.append("127.0.0.1")

    return ip_values, token_values


def is_auth(req):
    """
    Tell whether the request is allowed by the whitelist.

    The request passes when its IP is whitelisted, or when the value of the
    HTTP_BUTTERFLY_TOKEN entry of its WSGI environment is a whitelisted token.

    Returns:
        bool: True when the request is allowed, False otherwise.
    """
    allowed_ips, allowed_tokens = get_whitelist()
    if req.ip in allowed_ips:
        return True

    # A missing or empty token is replaced by the placeholder "-".
    token = req.wsgienv.get("HTTP_BUTTERFLY_TOKEN") or "-"
    return token in allowed_tokens


class Middleware():
    '''
    Whitelist middleware wrapping a protocol callable.

    Requests rejected by is_auth get a 401 response; all other requests are
    forwarded to the wrapped protocol.
    '''

    def __init__(self, protocol):
        # Callable invoked with the request once it has been authorized.
        self.protocol = protocol

    def __call__(self, req):
        logging.debug(
            "[butterfly Request] [reqid]:{reqid} [wsgienv]:{wsgienv}".format(
                reqid=req.reqid, wsgienv=str(req.wsgienv)))

        if is_auth(req):
            status_line, headers, body = self.protocol(req)
            return status_line, headers, body

        # Unauthorized: build the status line and tag the request for logging.
        status_code = 401
        reason = httplib.responses.get(status_code, "")
        rejected_line = "%s %s" % (status_code, reason)
        req.log_ret_code = "ERR_FORBIDDEN"
        return rejected_line, [], ""

3 测试

访问此服务需要满足以下任一条件:客户端 IP 在白名单中(127.0.0.1 始终放行),或请求携带白名单中的 token。token 通过 HTTP 请求头传递,格式为 butterfly_token:xxxx

$ curl -H "butterfly_token:xxxx" "http://<IP>:8585/demo_api/ping"
若客户端 IP 不在白名单中且 token 无效(或未传 token),则返回
HTTP/1.1 401 Unauthorized

Last updated