Redis 原生协议
1 使用方式
1.1 常规使用
# coding=utf8
from xlib.httpgateway import Request
from xlib import retstat
from xlib.middleware import funcattr
from xlib.db import redis
@funcattr.api
def get(req, redis_ip, redis_port):
"""
Args:
req : Request
Returns:
json_status, Content, headers
"""
isinstance(req, Request)
redis_client = redis.Redis(host=redis_ip,port=int(redis_port),password='xxxx')
result = redis_client.get("meetbill")
# set
# result = redis_client.set("meetbill", "")
# execute_command
# cmd = ["set", "meetbill", ""]
# result = redis_client.execute_command(*cmd)
return retstat.OK, {"data": result}1.2 Cache
/butterfly/handlers/wuxing/libs/cache.py
1.3 lua 支持
1.3.1 lua 命令
1.3.2 twemproxy(nutcraker) 对 lua 中的 key 的路由策略
对于 EVAL/EVALSHA 使用第一个参数进行 Hash 作为 Lua 脚本的目标分片,也即这类命令参数要大于等于 1 个。不支持 Script 类命令。
当命令参数小于 1 个的时候,会提示出错
命令参数为 1 个时
1.3.3 python redis client 使用 lua
1.3.3.1 简单例子
1.3.3.2 使用 list 作为消息队列
用 lua 脚本同时从 list 中 pop 数据和添加到 unacked 的独立 zset 中
1.4 锁
1.4.1 分布式锁及其应用场景
应用开发时,如果需要在同进程内的不同线程并发访问某项资源,可以使用各种互斥锁、读写锁;
如果一台主机上的多个进程需要并发访问某项资源,则可以使用进程间同步的原语,例如信号量、管道、共享内存等。
但如果多台主机需要同时访问某项资源,就需要使用一种在全局可见并具有互斥性的锁了。这种锁就是分布式锁,可以在分布式场景中对资源加锁,避免竞争资源引起的逻辑错误。

1.4.2 分布式锁的特性
互斥性:在任意时刻,只有一个客户端持有锁。
不死锁:分布式锁本质上是一个基于租约(Lease)的租借锁,如果客户端获得锁后自身出现异常,锁能够在一段时间后自动释放,资源不会被锁死。
一致性:硬件故障或网络异常等外部问题,以及慢查询、自身缺陷等内部因素都可能导致 Redis 发生高可用切换,replica 提升为新的 master。此时,如果业务对互斥性的要求非常高,锁需要在切换到新的 master 后保持原状态。
1.4.3 使用原生 Redis 实现分布式锁
1.4.3.1 加锁
锁 15 分钟 (899 为 14 分 59 秒)
resource_1: 分布式锁的 key,只要这个 key 存在,相应的资源就处于加锁状态,无法被其它客户端访问。
random_value: 一个随机字符串,不同客户端设置的值不能相同。
EX: 设置过期时间,单位为秒。您也可以使用 PX 选项设置单位为毫秒的过期时间。
NX: 如果需要设置的 key 在 Redis 中已存在,则取消设置。
如
1.4.3.2 解锁
解锁一般使用 DEL 命令,但可能存在下列问题。

t1 时刻,App1 设置了分布式锁 resource_1,过期时间为 3 秒。
App1 由于程序慢等原因等待超过了 3 秒,而 resource_1 已经在 t2 时刻被释放。
t3 时刻,App2 获得这个分布式锁。
App1 从等待中恢复,在 t4 时刻运行 DEL resource_1 将 App2 持有的分布式锁释放了。
从上述过程可以看出,一个客户端设置的锁,必须由自己解开。因此客户端需要先使用 GET 命令确认锁是不是自己设置的,然后再使用 DEL 解锁。在 Redis 中通常需要用 Lua 脚本来实现自锁自解:
1.4.3.3 续租
当客户端发现在锁的租期内无法完成操作时,就需要延长锁的持有时间,进行续租(renew)。同解锁一样,客户端应该只能续租自己持有的锁。在 Redis 中可使用如下 Lua 脚本来实现续租:
1.4.4 如何保障一致性
Redis 的主从同步(replication)是异步进行的,如果向 master 发送请求修改了数据后 master 突然出现异常,发生高可用切换, 缓冲区的数据可能无法同步到新的 master(原 replica)上,导致数据不一致。如果丢失的数据跟分布式锁有关,则会导致锁的机制出现问题,从而引起业务异常。
1.4.4.1 使用红锁(RedLock)
红锁是 Redis 作者提出的一致性解决方案。红锁的本质是一个概率问题:如果一个主从架构的 Redis 在高可用切换期间丢失锁的概率是 k%,那么相互独立的 N 个 Redis 同时丢失锁的概率是多少?如果用红锁来实现分布式锁,那么丢锁的概率是 (k%)^N。鉴于 Redis 极高的稳定性,此时的概率已经完全能满足产品的需求。
红锁的问题在于:
加锁和解锁的延迟较大。
难以在集群版或者标准版(主从架构)的 Redis 实例中实现。
占用的资源过多,为了实现红锁,需要创建多个互不相关的 Redis 实例或者自建 Redis。
1.4.4.2 使用 WAIT 命令
Redis 的 WAIT 命令会阻塞当前客户端,直到这条命令之前的所有写入命令都成功从 master 同步到指定数量的 replica,命令中可以设置单位为毫秒的等待超时时间。在 Redis 版中使用 WAIT 命令提高分布式锁一致性的示例如下:
使用以上代码,客户端在加锁后会等待数据成功同步到 replica 才继续进行其它操作,最大等待时间为 5000 毫秒。执行 WAIT 命令后如果返回结果是 1 则表示同步成功,无需担心数据不一致。相比红锁,这种实现方法极大地降低了成本。
需要注意的是:
WAIT 只会阻塞发送它的客户端,不影响其它客户端。
WAIT 返回正确的值表示设置的锁成功同步到了 replica,但如果在正常返回前发生高可用切换,数据还是可能丢失,此时 WAIT 只能用来提示同步可能失败,无法保证数据不丢失。您可以在 WAIT 返回异常值后重新加锁或者进行数据校验。
解锁不一定需要使用 WAIT,因为锁只要存在就能保持互斥,延迟删除不会导致逻辑问题。
1.4.5 惊群效应
惊群效应是什么? 惊群效应也有人叫做雷鸣群体效应,不过叫什么,简言之,惊群现象就是多进程(多线程)在同时阻塞等待同一个事件的时候(休眠状态),如果等待的这个事件发生,那么他就会唤醒等待的所有进程(或者线程),但是最终却只可能有一个进程(线程)获得这个时间的“控制权”,对该事件进行处理,而其他进程(线程)获取“控制权”失败,只能重新进入休眠状态,这种现象和性能浪费就叫做惊群。
为了更好的理解何为惊群,举一个很简单的例子,当你往一群鸽子中间扔一粒谷子,所有的各自都被惊动前来抢夺这粒食物,但是最终注定只可能有一个鸽子满意的抢到食物,没有抢到的鸽子只好回去继续睡觉,等待下一粒谷子的到来。这里鸽子表示进程(线程),那粒谷子就是等待处理的事件。
1.4.6 Butterfly lock 使用
当 lock.acquire(block=True) 时,则会监听 lock 事件,有释放锁的时候,会产生一个事件告诉客户端锁已释放
装饰器
1.5 限流
维护一个固定大小的时间戳列表。当时间戳列表达到最大容量时,程序将查看最早的时间戳和当前时间之间的时差,以确定是否可以记录新事件。
限流示例,该限流设置每 60 秒允许 2 个事件。
redis 行为
2 请求流程图
(1) StrictRedis 创建客户端对象,并初始化连接池
(2) 执行 ping 命令
(3) 调用 execute_command 从使用 get_connection 从连接池读取连接,然后发送命令,接着解析返回的响应,最后释放连接。
(4) get_connection 调用,在重连接池中 pop 一个连接,如果连接不存在,则调用 make_connection 方法创建连接。
(5) 初始化连接对象。
(6) 返回连接对象
(7) 执行 send_command 函数,打包编码 resp 协议的命令。
(8) 编码 resp 过程。
(9) 检查连接是否存在,如果存在则发送 socket 数据。如果不存在,则调用 connect 方法创建连接对象。
(10) 创建 socket,用于网络通信。
(11) 连接创建之后,调用连接的 on_connect 方法。
(12) 调用 pythonparse 的 on connect 方法。初始化 socketbuffer 对象,用于接受数据时候的 socket 通信。
(13) 初始化 socketbuffer 对象。
(14) 逐步返回连接对象,直到可以 sendall 数据到服务器。
(15) 结束发送过程。
(16) 调用 parse_response 方法,用于读取服务器返回的响应数据
(17) 逐步回溯调用 pythonparse 封装的方法读取一行数据。
(18) 通过 socketbuffer 读取一行数据
(19) 遇到批量回复或多批量回复,调用 read 读取除 token 之后的数据。
(20) 与 19 类似,递归处理多批量回复。
(21) 从 socket 读取数据。
3 异常
若执行不识别的命令,则会报异常
4 其他
4.1 模拟 monitor 命令,可设置命令数然后截断
4.2 execute_command 注意点
现象:使用 execute_command 执行命令时
执行 ("config get maxmemory") 返回个格式是 list
执行 ("config get", "maxmemory")返回格式是 dict
执行 ("config", "get", "maxmemory")返回格式是 list
代码分析
redis-py 对命令的第一个参数进行匹配,如存在,则执行 self.response_callbacks 进行解析响应。
4.3 指定客户端端口
执行第一次时运行正常,执行第二次时报错
从预留端口中自动获取随机端口
传送门
锁
bbangert/retools -- acquire does spinloop
distributing-locking-python-and-redis -- acquire does polling
cezarsa/redis_lock -- acquire does not block
andymccurdy/redis-py -- acquire does spinloop
mpessas/python-redis-lock -- blocks fine but no expiration
brainix/pottery -- acquire does spinloop
ionelmc/python-redis-lock -- 支持 lock 续期
Last updated