Redis 原生协议
1 使用方式
1.1 常规使用
# coding=utf8
from xlib.httpgateway import Request
from xlib import retstat
from xlib.middleware import funcattr
from xlib.db import redis
@funcattr.api
def get(req, redis_ip, redis_port):
"""
Args:
req : Request
Returns:
json_status, Content, headers
"""
isinstance(req, Request)
redis_client = redis.Redis(host=redis_ip,port=int(redis_port),password='xxxx')
result = redis_client.get("meetbill")
# set
# result = redis_client.set("meetbill", "")
# execute_command
# cmd = ["set", "meetbill", ""]
# result = redis_client.execute_command(*cmd)
return retstat.OK, {"data": result}
1.2 Cache
/butterfly/handlers/wuxing/libs/cache.py
from xlib import db
__wuxing_db = db.my_caches[wuxing_database_name]
_cache = __wuxing_db.cache()
_cache.set(key, value, timeout)
_cache.get(key)
_cache.delete(key)
1.3 lua 支持
1.3.1 lua 命令
EVAL script numkeys key [key ...] arg [arg ...]
numkeys 是 key 的个数,后边接着写 key1 key2... val1 val2....,举例
redis 的 lua 脚本包含三个部分:脚本本身,操作的 KEY,要传入的参数 ARGV。这个一定要严格区分开,虽然在单机版上可以随便使用,但是在集群上是会影响集群行为的。按照 redis 命令的定义,lua 的执行类似:
-------------------------------------------------------------
eval "return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}" 2 key1 key2 first second
| | | | |
脚本命令的关键字 脚本内容 操作的 key 的数量 操作的 key 输入的参数
-------------------------------------------------------------
127.0.0.1:6379> eval "return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}" 2 key1 key2 val1 val2
1) "key1"
2) "key2"
3) "val1"
4) "val2"
1.3.2 twemproxy(nutcraker) 对 lua 中的 key 的路由策略
对于 EVAL/EVALSHA 使用第一个参数进行 Hash 作为 Lua 脚本的目标分片,也即这类命令参数要大于等于 1 个。不支持 Script 类命令。
当命令参数小于 1 个的时候,会提示出错
>EVAL "return redis.call('GET','hello')" 0
(error) ERR wrong number of arguments for 'eval' command
命令参数为 1 个时
> EVAL "return redis.call('GET',KEYS[1])" 1 hello
"world"
1.3.3 python redis client 使用 lua
1.3.3.1 简单例子
import redis
r = redis.Redis(host="127.0.0.1", port=6000)
lua_script="""
local key = KEYS[1]
local value = ARGV[1]
local max_idx = redis.call('HLEN', key)
redis.call('HSET', key, max_idx, value)
return max_idx
"""
cmd = r.eval(lua_script, 1, *['{test}1', "meetbill"])
print cmd
1.3.3.2 使用 list 作为消息队列
用 lua 脚本同时从 list 中 pop 数据和添加到 unacked 的独立 zset 中
lua = '''
local v = redis.call("lpop", KEYS[1])
if v then
redis.call('zadd',KEYS[2],ARGV[1],v)
end
return v
'''
1.4 锁
1.4.1 分布式锁及其应用场景
应用开发时,如果需要在同进程内的不同线程并发访问某项资源,可以使用各种互斥锁、读写锁;
如果一台主机上的多个进程需要并发访问某项资源,则可以使用进程间同步的原语,例如信号量、管道、共享内存等。
但如果多台主机需要同时访问某项资源,就需要使用一种在全局可见并具有互斥性的锁了。这种锁就是分布式锁,可以在分布式场景中对资源加锁,避免竞争资源引起的逻辑错误。

1.4.2 分布式锁的特性
互斥性:在任意时刻,只有一个客户端持有锁。
不死锁:分布式锁本质上是一个基于租约(Lease)的租借锁,如果客户端获得锁后自身出现异常,锁能够在一段时间后自动释放,资源不会被锁死。
一致性:硬件故障或网络异常等外部问题,以及慢查询、自身缺陷等内部因素都可能导致 Redis 发生高可用切换,replica 提升为新的 master。此时,如果业务对互斥性的要求非常高,锁需要在切换到新的 master 后保持原状态。
1.4.3 使用原生 Redis 实现分布式锁
1.4.3.1 加锁
锁 15 分钟 (899 为 14 分 59 秒)
"SET" "{key}" "1" "EX" "899" "NX"
SET resource_1 random_value NX EX 5
示例为 resource_1 这个 key 设置了 5 秒的过期时间,如果客户端不释放这个 key,5 秒后 key 将过期,锁就会被系统回收,此时其它客户端就能够再次为资源加锁并访问资源了。
resource_1: 分布式锁的 key,只要这个 key 存在,相应的资源就处于加锁状态,无法被其它客户端访问。
random_value: 一个随机字符串,不同客户端设置的值不能相同。
EX: 设置过期时间,单位为秒。您也可以使用 PX 选项设置单位为毫秒的过期时间。
NX: 如果需要设置的 key 在 Redis 中已存在,则取消设置。
如
def acquire_cleaning_lock(self):
"""Returns a boolean indicating whether a lock to clean this queue
is acquired. A lock expires in 899 seconds (15 minutes - 1 second)
"""
return self.connection.set(self.registry_cleaning_key, 1, nx=1, ex=899)
1.4.3.2 解锁
解锁一般使用 DEL 命令,但可能存在下列问题。

t1 时刻,App1 设置了分布式锁 resource_1,过期时间为 3 秒。
App1 由于程序慢等原因等待超过了 3 秒,而 resource_1 已经在 t2 时刻被释放。
t3 时刻,App2 获得这个分布式锁。
App1 从等待中恢复,在 t4 时刻运行 DEL resource_1 将 App2 持有的分布式锁释放了。
从上述过程可以看出,一个客户端设置的锁,必须由自己解开。因此客户端需要先使用 GET 命令确认锁是不是自己设置的,然后再使用 DEL 解锁。在 Redis 中通常需要用 Lua 脚本来实现自锁自解:
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end
1.4.3.3 续租
当客户端发现在锁的租期内无法完成操作时,就需要延长锁的持有时间,进行续租(renew)。同解锁一样,客户端应该只能续租自己持有的锁。在 Redis 中可使用如下 Lua 脚本来实现续租:
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("expire",KEYS[1], ARGV[2])
else
return 0
end
1.4.4 如何保障一致性
Redis 的主从同步(replication)是异步进行的,如果向 master 发送请求修改了数据后 master 突然出现异常,发生高可用切换, 缓冲区的数据可能无法同步到新的 master(原 replica)上,导致数据不一致。如果丢失的数据跟分布式锁有关,则会导致锁的机制出现问题,从而引起业务异常。
1.4.4.1 使用红锁(RedLock)
红锁是 Redis 作者提出的一致性解决方案。红锁的本质是一个概率问题:如果一个主从架构的 Redis 在高可用切换期间丢失锁的概率是 k%,那么相互独立的 N 个 Redis 同时丢失锁的概率是多少?如果用红锁来实现分布式锁,那么丢锁的概率是 (k%)^N。鉴于 Redis 极高的稳定性,此时的概率已经完全能满足产品的需求。
红锁的实现并非这样严格,一般保证 M(1<M=<N) 个同时锁上即可,但通常仍旧可以满足需求。
红锁的问题在于:
加锁和解锁的延迟较大。
难以在集群版或者标准版(主从架构)的 Redis 实例中实现。
占用的资源过多,为了实现红锁,需要创建多个互不相关的 Redis 实例或者自建 Redis。
1.4.4.2 使用 WAIT 命令
Redis 的 WAIT 命令会阻塞当前客户端,直到这条命令之前的所有写入命令都成功从 master 同步到指定数量的 replica,命令中可以设置单位为毫秒的等待超时时间。在 Redis 版中使用 WAIT 命令提高分布式锁一致性的示例如下:
SET resource_1 random_value NX EX 5
WAIT 1 5000
使用以上代码,客户端在加锁后会等待数据成功同步到 replica 才继续进行其它操作,最大等待时间为 5000 毫秒。执行 WAIT 命令后如果返回结果是 1 则表示同步成功,无需担心数据不一致。相比红锁,这种实现方法极大地降低了成本。
需要注意的是:
WAIT 只会阻塞发送它的客户端,不影响其它客户端。
WAIT 返回正确的值表示设置的锁成功同步到了 replica,但如果在正常返回前发生高可用切换,数据还是可能丢失,此时 WAIT 只能用来提示同步可能失败,无法保证数据不丢失。您可以在 WAIT 返回异常值后重新加锁或者进行数据校验。
解锁不一定需要使用 WAIT,因为锁只要存在就能保持互斥,延迟删除不会导致逻辑问题。
1.4.5 惊群效应
惊群效应是什么? 惊群效应也有人叫做雷鸣群体效应,不过叫什么,简言之,惊群现象就是多进程(多线程)在同时阻塞等待同一个事件的时候(休眠状态),如果等待的这个事件发生,那么他就会唤醒等待的所有进程(或者线程),但是最终却只可能有一个进程(线程)获得这个时间的“控制权”,对该事件进行处理,而其他进程(线程)获取“控制权”失败,只能重新进入休眠状态,这种现象和性能浪费就叫做惊群。
为了更好的理解何为惊群,举一个很简单的例子,当你往一群鸽子中间扔一粒谷子,所有的各自都被惊动前来抢夺这粒食物,但是最终注定只可能有一个鸽子满意的抢到食物,没有抢到的鸽子只好回去继续睡觉,等待下一粒谷子的到来。这里鸽子表示进程(线程),那粒谷子就是等待处理的事件。
1.4.6 Butterfly lock 使用
from xlib import db
lock_db = db.my_caches["lock_redis"]
# key, milliseconds, lock_id
lock = lock_db.lock("my-lock", 300000, "lock_value1")
# 申请锁返回值 true or false,默认 block=True
lock.acquire(block=False)
# 续期 true or false
lock.extend()
# 释放锁
lock.release()
当 lock.acquire(block=True) 时,则会监听 lock 事件,有释放锁的时候,会产生一个事件告诉客户端锁已释放
装饰器
from xlib import db
lock_db = db.my_caches["baichuan"]
lock = lock_db.lock('event_expire')
@lock
def event_expire(req):
"""
event_expire
"""
req.log_res.add("event_record_expire=yes")
xxx
req.timming("event_record_expire")
#------------------------------------------
# lock 装饰器使用的 block=true 方式
# 如果配置的 Redis 服务不支持此命令(blpop)的话,就不能使用 lock 装饰器
#------------------------------------------
1.5 限流
维护一个固定大小的时间戳列表。当时间戳列表达到最大容量时,程序将查看最早的时间戳和当前时间之间的时差,以确定是否可以记录新事件。
限流示例,该限流设置每 60 秒允许 2 个事件。
from xlib import db
baichuan_cache = db.my_caches["baichuan"]
rate_limit = baichuan_cache.rate_limit('mylimit', limit=2, per=60)
>>> rate_limit.limit('user-1')
False
>>> rate_limit.limit('user-1')
False
>>> rate_limit.limit('user-1') # Slow down, user-1!
True
>>> rate_limit.limit('user-2') # User 2 has not performed any events yet.
False
redis 行为
# ------------------------one request
1614739505.778289 [0 127.0.0.1:19951] "LLEN" "mylimit:user-1"
1614739505.778555 [0 127.0.0.1:19951] "LPUSH" "mylimit:user-1" "1614739505.78"
1614739505.778696 [0 127.0.0.1:19951] "PEXPIRE" "mylimit:user-1" "120000" # 单位是毫秒,此值是时间窗口的 2 倍
----
当 per 为 1 时,此值为 2000
当 per 为 2 时,此值为 4000
----
# ------------------------two request
1614739505.778835 [0 127.0.0.1:19951] "LLEN" "mylimit:user-1"
1614739505.778939 [0 127.0.0.1:19951] "LPUSH" "mylimit:user-1" "1614739505.78"
1614739505.779042 [0 127.0.0.1:19951] "PEXPIRE" "mylimit:user-1" "120000"
# ------------------------three request
1614739505.779145 [0 127.0.0.1:19951] "LLEN" "mylimit:user-1"
1614739505.779240 [0 127.0.0.1:19951] "LINDEX" "mylimit:user-1" "-1"
1614739505.779356 [0 127.0.0.1:19951] "LTRIM" "mylimit:user-1" "0" "1"
1614739505.779464 [0 127.0.0.1:19951] "PEXPIRE" "mylimit:user-1" "120000"
# ------------------------four request
1614739505.779565 [0 127.0.0.1:19951] "LLEN" "mylimit:user-2"
1614739505.779651 [0 127.0.0.1:19951] "LPUSH" "mylimit:user-2" "1614739505.78"
1614739505.779750 [0 127.0.0.1:19951] "PEXPIRE" "mylimit:user-2" "120000"
#--------------------------
1637322680.582704 [0 127.0.0.1:32416] "LLEN" "mylimit:user-1"
1637322680.582862 [0 127.0.0.1:32416] "LINDEX" "mylimit:user-1" "-1"
1637322680.582993 [0 127.0.0.1:32416] "LPUSH" "mylimit:user-1" "1637322680.58"
1637322680.583135 [0 127.0.0.1:32416] "LTRIM" "mylimit:user-1" "0" "1"
1637322680.583249 [0 127.0.0.1:32416] "PEXPIRE" "mylimit:user-1" "240000"
1637322694.491930 [0 127.0.0.1:32820] "LLEN" "mylimit:user-1"
# LINDEX 通过索引获取列表中最后一个元素
1637322694.492128 [0 127.0.0.1:32820] "LINDEX" "mylimit:user-1" "-1"
1637322694.492259 [0 127.0.0.1:32820] "LPUSH" "mylimit:user-1" "1637322694.49"
# LTRIM 对一个列表进行修剪(trim),就是说,让列表只保留指定区间内的元素,不在指定区间之内的元素都将被删除。
下标 0 表示列表的第一个元素,以 1 表示列表的第二个元素
1637322694.492401 [0 127.0.0.1:32820] "LTRIM" "mylimit:user-1" "0" "1"
1637322694.492514 [0 127.0.0.1:32820] "PEXPIRE" "mylimit:user-1" "240000"
1637322701.566245 [0 127.0.0.1:33128] "LLEN" "mylimit:user-1"
1637322701.566403 [0 127.0.0.1:33128] "LINDEX" "mylimit:user-1" "-1"
1637322701.566533 [0 127.0.0.1:33128] "LTRIM" "mylimit:user-1" "0" "1"
1637322701.566671 [0 127.0.0.1:33128] "PEXPIRE" "mylimit:user-1" "240000"
1637322711.330047 [0 127.0.0.1:33541] "LLEN" "mylimit:user-1"
1637322711.330203 [0 127.0.0.1:33541] "LINDEX" "mylimit:user-1" "-1"
1637322711.330337 [0 127.0.0.1:33541] "LTRIM" "mylimit:user-1" "0" "1"
1637322711.330523 [0 127.0.0.1:33541] "PEXPIRE" "mylimit:user-1" "240000"
2 请求流程图
(1) StrictRedis 创建客户端对象,并初始化连接池
(2) 执行 ping 命令
(3) 调用 execute_command 从使用 get_connection 从连接池读取连接,然后发送命令,接着解析返回的响应,最后释放连接。
(4) get_connection 调用,在重连接池中 pop 一个连接,如果连接不存在,则调用 make_connection 方法创建连接。
(5) 初始化连接对象。
(6) 返回连接对象
(7) 执行 send_command 函数,打包编码 resp 协议的命令。
(8) 编码 resp 过程。
(9) 检查连接是否存在,如果存在则发送 socket 数据。如果不存在,则调用 connect 方法创建连接对象。
(10) 创建 socket,用于网络通信。
(11) 连接创建之后,调用连接的 on_connect 方法。
(12) 调用 pythonparse 的 on connect 方法。初始化 socketbuffer 对象,用于接受数据时候的 socket 通信。
(13) 初始化 socketbuffer 对象。
(14) 逐步返回连接对象,直到可以 sendall 数据到服务器。
(15) 结束发送过程。
(16) 调用 parse_response 方法,用于读取服务器返回的响应数据
(17) 逐步回溯调用 pythonparse 封装的方法读取一行数据。
(18) 通过 socketbuffer 读取一行数据
(19) 遇到批量回复或多批量回复,调用 read 读取除 token 之后的数据。
(20) 与 19 类似,递归处理多批量回复。
(21) 从 socket 读取数据。
3 其他
3.1 模拟 monitor 命令,可设置命令数然后截断
from xlib.db import redis
class Monitor():
def __init__(self, connection_pool):
self.connection_pool = connection_pool
self.connection = None
def __del__(self):
try:
self.reset()
except:
pass
def reset(self):
if self.connection:
self.connection_pool.release(self.connection)
self.connection = None
def monitor(self):
if self.connection is None:
self.connection = self.connection_pool.get_connection(
'monitor', None)
self.connection.send_command("monitor")
return self.listen()
def parse_response(self):
return self.connection.read_response()
def listen(self):
while True:
yield self.parse_response()
if __name__ == '__main__':
ip="127.0.0.1"
port=6379
pool = redis.ConnectionPool(host=ip, port=port, db=0)
monitor = Monitor(pool)
commands = monitor.monitor()
count = 0
for c in commands :
count += 1
print(c)
if count > 2000:
monitor.reset()
break
3.2 execute_command 注意点
现象:使用 execute_command 执行命令时
执行 ("config get maxmemory") 返回个格式是 list
执行 ("config get", "maxmemory")返回格式是 dict
执行 ("config", "get", "maxmemory")返回格式是 list
from xlib.db import redis
redis_client = redis.Redis(host="127.0.0.1",port=int(6379))
# ['maxmemory', '0']
result = redis_client.execute_command("config get maxmemory")
print(result)
# {'maxmemory': '0'}
result = redis_client.execute_command("config get", "maxmemory")
print(result)
# ['maxmemory', '0']
result = redis_client.execute_command("config", "get", "maxmemory")
print(result)
代码分析
class Redis(object):
...
# COMMAND EXECUTION AND PROTOCOL PARSING
def execute_command(self, *args, **options):
"Execute a command and return a parsed response"
pool = self.connection_pool
command_name = args[0]
conn = self.connection or pool.get_connection(command_name, **options)
try:
conn.send_command(*args)
return self.parse_response(conn, command_name, **options)
except (ConnectionError, TimeoutError) as e:
conn.disconnect()
if not (conn.retry_on_timeout and isinstance(e, TimeoutError)):
raise
conn.send_command(*args)
return self.parse_response(conn, command_name, **options)
finally:
if not self.connection:
pool.release(conn)
def parse_response(self, connection, command_name, **options):
"Parses a response from the Redis server"
try:
response = connection.read_response()
except ResponseError:
if EMPTY_RESPONSE in options:
return options[EMPTY_RESPONSE]
raise
if command_name in self.response_callbacks:
return self.response_callbacks[command_name](response, **options)
return response
redis-py 对命令的第一个参数进行匹配,如存在,则执行 self.response_callbacks 进行解析响应。
3.3 指定客户端端口
import socket
from xlib.db import redis
class CustomSocket(redis.Connection):
"""
#connection = CustomSocket(host='localhost', port=6379, source_port=12345)
"""
def __init__(self, *args, **kwargs):
self.source_port = kwargs.pop('source_port', None)
super(CustomSocket, self).__init__(*args, **kwargs)
def _connect(self):
"Create a TCP socket connection"
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
if self.source_port:
sock.bind(('', self.source_port))
sock.settimeout(self.socket_timeout)
sock.connect((self.host, self.port))
return sock
# 使用自定义的连接类
client = redis.Redis(
connection_pool=redis.ConnectionPool(
connection_class=CustomSocket,
host='127.0.0.1',
port=6379,
source_port=12345))
# 测试连接
client.set('foo', 'bar')
print(client.get('foo'))
执行第一次时运行正常,执行第二次时报错
// 异常日志:
xlib.db.redis.exceptions.ConnectionError: Error 98 connecting to 127.0.0.1:6379. Address already in use.
// 连接情况
$ netstat -tanp | grep 12345
(Not all processes could be identified, non-owned process info
will not be shown, you would have to be root to see it all.)
tcp 0 0 127.0.0.1:12345 127.0.0.1:6379 TIME_WAIT -
从预留端口中自动获取随机端口
import socket
import random
from xlib.db import redis
class CustomSocket(redis.Connection):
"""
connection = CustomSocket(host='localhost', port=6379, source_port_range=port_range)
"""
def __init__(self, *args, **kwargs):
self.source_port_range = kwargs.pop('source_port_range', None)
super(CustomSocket, self).__init__(*args, **kwargs)
def _connect(self):
"Create a TCP socket connection"
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
if self.source_port_range:
source_port = self._find_unused_port(self.source_port_range)
if source_port:
sock.bind(('', source_port))
sock.settimeout(self.socket_timeout)
sock.connect((self.host, self.port))
return sock
def _find_unused_port(self, port_range):
for _ in range(100): # 尝试最多100次
source_port = random.randint(port_range[0], port_range[1])
temp_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
temp_sock.bind(('', source_port))
return source_port
except socket.error:
continue
finally:
temp_sock.close()
raise RuntimeError("No available ports in the specified range")
# 使用自定义的连接类
port_range = (10000, 11000)
client = redis.Redis(
connection_pool=redis.ConnectionPool(
connection_class=CustomSocket,
host='localhost',
port=6379,
source_port_range=port_range))
# 测试连接
client.set('foo', 'bar')
print(client.get('foo'))
传送门
锁
bbangert/retools -- acquire does spinloop
distributing-locking-python-and-redis -- acquire does polling
cezarsa/redis_lock -- acquire does not block
andymccurdy/redis-py -- acquire does spinloop
mpessas/python-redis-lock -- blocks fine but no expiration
brainix/pottery -- acquire does spinloop
ionelmc/python-redis-lock -- 支持 lock 续期
Last updated