Redis 原生协议

1 使用方式

1.1 常规使用

# coding=utf8

from xlib.httpgateway import Request
from xlib import retstat
from xlib.middleware import funcattr
from xlib.db import redis

@funcattr.api
def get(req, redis_ip, redis_port):
    """
    Args:
        req     : Request
    Returns:
        json_status, Content, headers
    """
    isinstance(req, Request)
    redis_client = redis.Redis(host=redis_ip,port=int(redis_port),password='xxxx')
    result = redis_client.get("meetbill")
    
    # set
    # result = redis_client.set("meetbill", "")
    
    # execute_command
    # cmd = ["set", "meetbill", ""]
    # result = redis_client.execute_command(*cmd)
    
    return retstat.OK, {"data": result}

1.2 Cache

/butterfly/handlers/wuxing/libs/cache.py

1.3 lua 支持

1.3.1 lua 命令

1.3.2 twemproxy(nutcraker) 对 lua 中的 key 的路由策略

对于 EVAL/EVALSHA 使用第一个参数进行 Hash 作为 Lua 脚本的目标分片,也即这类命令参数要大于等于 1 个。不支持 Script 类命令。

当命令参数小于 1 个的时候,会提示出错

命令参数为 1 个时

1.3.3 python redis client 使用 lua

1.3.3.1 简单例子

1.3.3.2 使用 list 作为消息队列

用 lua 脚本同时从 list 中 pop 数据和添加到 unacked 的独立 zset 中

1.4 锁

1.4.1 分布式锁及其应用场景

应用开发时,如果需要在同进程内的不同线程并发访问某项资源,可以使用各种互斥锁、读写锁;

如果一台主机上的多个进程需要并发访问某项资源,则可以使用进程间同步的原语,例如信号量、管道、共享内存等。

但如果多台主机需要同时访问某项资源,就需要使用一种在全局可见并具有互斥性的锁了。这种锁就是分布式锁,可以在分布式场景中对资源加锁,避免竞争资源引起的逻辑错误。

snap

1.4.2 分布式锁的特性

  • 互斥性:在任意时刻,只有一个客户端持有锁。

  • 不死锁:分布式锁本质上是一个基于租约(Lease)的租借锁,如果客户端获得锁后自身出现异常,锁能够在一段时间后自动释放,资源不会被锁死。

  • 一致性:硬件故障或网络异常等外部问题,以及慢查询、自身缺陷等内部因素都可能导致 Redis 发生高可用切换,replica 提升为新的 master。此时,如果业务对互斥性的要求非常高,锁需要在切换到新的 master 后保持原状态。

1.4.3 使用原生 Redis 实现分布式锁

1.4.3.1 加锁

锁 15 分钟 (899 为 14 分 59 秒)

  • resource_1: 分布式锁的 key,只要这个 key 存在,相应的资源就处于加锁状态,无法被其它客户端访问。

  • random_value: 一个随机字符串,不同客户端设置的值不能相同。

  • EX: 设置过期时间,单位为秒。您也可以使用 PX 选项设置单位为毫秒的过期时间。

  • NX: 如果需要设置的 key 在 Redis 中已存在,则取消设置。

1.4.3.2 解锁

解锁一般使用 DEL 命令,但可能存在下列问题。

snap
  • t1 时刻,App1 设置了分布式锁 resource_1,过期时间为 3 秒。

  • App1 由于程序慢等原因等待超过了 3 秒,而 resource_1 已经在 t2 时刻被释放。

  • t3 时刻,App2 获得这个分布式锁。

  • App1 从等待中恢复,在 t4 时刻运行 DEL resource_1 将 App2 持有的分布式锁释放了。

从上述过程可以看出,一个客户端设置的锁,必须由自己解开。因此客户端需要先使用 GET 命令确认锁是不是自己设置的,然后再使用 DEL 解锁。在 Redis 中通常需要用 Lua 脚本来实现自锁自解:

1.4.3.3 续租

当客户端发现在锁的租期内无法完成操作时,就需要延长锁的持有时间,进行续租(renew)。同解锁一样,客户端应该只能续租自己持有的锁。在 Redis 中可使用如下 Lua 脚本来实现续租:

1.4.4 如何保障一致性

Redis 的主从同步(replication)是异步进行的,如果向 master 发送请求修改了数据后 master 突然出现异常,发生高可用切换, 缓冲区的数据可能无法同步到新的 master(原 replica)上,导致数据不一致。如果丢失的数据跟分布式锁有关,则会导致锁的机制出现问题,从而引起业务异常。

1.4.4.1 使用红锁(RedLock)

红锁是 Redis 作者提出的一致性解决方案。红锁的本质是一个概率问题:如果一个主从架构的 Redis 在高可用切换期间丢失锁的概率是 k%,那么相互独立的 N 个 Redis 同时丢失锁的概率是多少?如果用红锁来实现分布式锁,那么丢锁的概率是 (k%)^N。鉴于 Redis 极高的稳定性,此时的概率已经完全能满足产品的需求。

红锁的问题在于:

  • 加锁和解锁的延迟较大。

  • 难以在集群版或者标准版(主从架构)的 Redis 实例中实现。

  • 占用的资源过多,为了实现红锁,需要创建多个互不相关的 Redis 实例或者自建 Redis。

1.4.4.2 使用 WAIT 命令

Redis 的 WAIT 命令会阻塞当前客户端,直到这条命令之前的所有写入命令都成功从 master 同步到指定数量的 replica,命令中可以设置单位为毫秒的等待超时时间。在 Redis 版中使用 WAIT 命令提高分布式锁一致性的示例如下:

使用以上代码,客户端在加锁后会等待数据成功同步到 replica 才继续进行其它操作,最大等待时间为 5000 毫秒。执行 WAIT 命令后如果返回结果是 1 则表示同步成功,无需担心数据不一致。相比红锁,这种实现方法极大地降低了成本。

需要注意的是:

  • WAIT 只会阻塞发送它的客户端,不影响其它客户端。

  • WAIT 返回正确的值表示设置的锁成功同步到了 replica,但如果在正常返回前发生高可用切换,数据还是可能丢失,此时 WAIT 只能用来提示同步可能失败,无法保证数据不丢失。您可以在 WAIT 返回异常值后重新加锁或者进行数据校验。

  • 解锁不一定需要使用 WAIT,因为锁只要存在就能保持互斥,延迟删除不会导致逻辑问题。

1.4.5 惊群效应

惊群效应是什么? 惊群效应也有人叫做雷鸣群体效应,不过叫什么,简言之,惊群现象就是多进程(多线程)在同时阻塞等待同一个事件的时候(休眠状态),如果等待的这个事件发生,那么他就会唤醒等待的所有进程(或者线程),但是最终却只可能有一个进程(线程)获得这个时间的“控制权”,对该事件进行处理,而其他进程(线程)获取“控制权”失败,只能重新进入休眠状态,这种现象和性能浪费就叫做惊群。

为了更好的理解何为惊群,举一个很简单的例子,当你往一群鸽子中间扔一粒谷子,所有的各自都被惊动前来抢夺这粒食物,但是最终注定只可能有一个鸽子满意的抢到食物,没有抢到的鸽子只好回去继续睡觉,等待下一粒谷子的到来。这里鸽子表示进程(线程),那粒谷子就是等待处理的事件。

1.4.6 Butterfly lock 使用

依赖 Redis Lua script 命令,如果使用 twemproxy,则需要 twemproxy 支持此命令

当 lock.acquire(block=True) 时,则会监听 lock 事件,有释放锁的时候,会产生一个事件告诉客户端锁已释放

装饰器

1.5 限流

维护一个固定大小的时间戳列表。当时间戳列表达到最大容量时,程序将查看最早的时间戳和当前时间之间的时差,以确定是否可以记录新事件。

限流示例,该限流设置每 60 秒允许 2 个事件。

redis 行为

2 请求流程图

  • (1) StrictRedis 创建客户端对象,并初始化连接池

  • (2) 执行 ping 命令

  • (3) 调用 execute_command 从使用 get_connection 从连接池读取连接,然后发送命令,接着解析返回的响应,最后释放连接。

  • (4) get_connection 调用,在重连接池中 pop 一个连接,如果连接不存在,则调用 make_connection 方法创建连接。

  • (5) 初始化连接对象。

  • (6) 返回连接对象

  • (7) 执行 send_command 函数,打包编码 resp 协议的命令。

  • (8) 编码 resp 过程。

  • (9) 检查连接是否存在,如果存在则发送 socket 数据。如果不存在,则调用 connect 方法创建连接对象。

  • (10) 创建 socket,用于网络通信。

  • (11) 连接创建之后,调用连接的 on_connect 方法。

  • (12) 调用 pythonparse 的 on connect 方法。初始化 socketbuffer 对象,用于接受数据时候的 socket 通信。

  • (13) 初始化 socketbuffer 对象。

  • (14) 逐步返回连接对象,直到可以 sendall 数据到服务器。

  • (15) 结束发送过程。

  • (16) 调用 parse_response 方法,用于读取服务器返回的响应数据

  • (17) 逐步回溯调用 pythonparse 封装的方法读取一行数据。

  • (18) 通过 socketbuffer 读取一行数据

  • (19) 遇到批量回复或多批量回复,调用 read 读取除 token 之后的数据。

  • (20) 与 19 类似,递归处理多批量回复。

  • (21) 从 socket 读取数据。

3 异常

若执行不识别的命令,则会报异常

4 其他

4.1 模拟 monitor 命令,可设置命令数然后截断

4.2 execute_command 注意点

现象:使用 execute_command 执行命令时

  • 执行 ("config get maxmemory") 返回个格式是 list

  • 执行 ("config get", "maxmemory")返回格式是 dict

  • 执行 ("config", "get", "maxmemory")返回格式是 list

代码分析

redis-py 对命令的第一个参数进行匹配,如存在,则执行 self.response_callbacks 进行解析响应。

4.3 指定客户端端口

执行第一次时运行正常,执行第二次时报错

从预留端口中自动获取随机端口

传送门

Last updated