def acquire_cleaning_lock(self):
    """Try to take the lock that guards cleaning of this queue.

    The lock is the key ``self.registry_cleaning_key``, written with
    ``SET ... NX EX 899`` so that it is only created when absent and expires
    on its own after 899 seconds (15 minutes - 1 second).

    Returns:
        bool: ``True`` if this call created the lock, ``False`` otherwise.
    """
    # With NX the client replies with a falsy value (None in redis-py) when
    # the key already exists; coerce it so callers always get a real boolean,
    # as the contract above promises.
    acquired = self.connection.set(
        self.registry_cleaning_key, 1, nx=True, ex=899)
    return bool(acquired)
1.4.3.2 解锁
解锁一般使用 DEL 命令,但可能存在下列问题。
t1 时刻,App1 设置了分布式锁 resource_1,过期时间为 3 秒。
App1 由于程序慢等原因等待超过了 3 秒,而 resource_1 已经在 t2 时刻被释放。
t3 时刻,App2 获得这个分布式锁。
App1 从等待中恢复,在 t4 时刻运行 DEL resource_1 将 App2 持有的分布式锁释放了。
从上述过程可以看出,一个客户端设置的锁,必须由自己解开。因此客户端需要先使用 GET 命令确认锁是不是自己设置的,然后再使用 DEL 解锁。由于 GET 和 DEL 是两条独立的命令,如果分开执行,锁仍可能在两条命令之间过期并被其他客户端获得,所以在 Redis 中通常需要用 Lua 脚本把这两步合并为一次原子操作,来实现自锁自解:
-- Release the lock stored at KEYS[1], but only when its current value is
-- the token passed in ARGV[1]; otherwise leave it untouched.
-- Returns the DEL result when the lock was removed, 0 when it was not ours.
local current_owner = redis.call("get", KEYS[1])
if current_owner ~= ARGV[1] then
    return 0
end
return redis.call("del", KEYS[1])
from xlib.db import redis
class Monitor(object):
    """Stream the replies produced by the Redis ``MONITOR`` command.

    A connection is borrowed from ``connection_pool`` on the first call to
    ``monitor()`` and handed back to the pool by ``reset()``.
    """

    def __init__(self, connection_pool):
        """Remember the pool; no connection is taken until ``monitor()``."""
        self.connection_pool = connection_pool
        # Borrowed lazily in monitor(); None while nothing is held.
        self.connection = None

    def __del__(self):
        # Best-effort cleanup during garbage collection: errors are ignored
        # on purpose, but only ordinary exceptions, so that interpreter-level
        # signals such as KeyboardInterrupt are not swallowed.
        try:
            self.reset()
        except Exception:
            pass

    def reset(self):
        """Close the monitoring connection and return it to the pool.

        Calling this when no connection is held is a no-op.
        """
        connection = self.connection
        if not connection:
            return
        # Forget the connection first so a failure below cannot leave this
        # object pointing at a connection it no longer owns.
        self.connection = None
        try:
            # monitor() sent "monitor" on this connection, so the server keeps
            # pushing data on it. Close the socket before the pool can hand
            # the connection to another caller.
            connection.disconnect()
        finally:
            self.connection_pool.release(connection)

    def monitor(self):
        """Send ``monitor`` and return a generator over the replies.

        The generator never ends by itself; stop iterating and call
        ``reset()`` when done.
        """
        if self.connection is None:
            self.connection = self.connection_pool.get_connection(
                'monitor', None)
        self.connection.send_command("monitor")
        return self.listen()

    def parse_response(self):
        """Read and return one reply from the held connection."""
        return self.connection.read_response()

    def listen(self):
        """Yield replies from the connection one at a time, forever."""
        while True:
            yield self.parse_response()
if __name__ == '__main__':
    # Demo: print the replies streamed from a local Redis server, then stop.
    ip = "127.0.0.1"
    port = 6379
    pool = redis.ConnectionPool(host=ip, port=port, db=0)
    monitor = Monitor(pool)
    commands = monitor.monitor()
    # `count` is the 1-based number of replies printed so far.
    for count, c in enumerate(commands, 1):
        print(c)
        if count > 2000:
            # Hand the connection back before leaving the endless stream.
            monitor.reset()
            break
3.2 execute_command 注意点
现象:使用 execute_command 执行命令时
执行 ("config get maxmemory") 返回的格式是 list
执行 ("config get", "maxmemory")返回格式是 dict
执行 ("config", "get", "maxmemory")返回格式是 list
from xlib.db import redis
redis_client = redis.Redis(host="127.0.0.1", port=int(6379))

# Each tuple is passed unchanged to execute_command(); the trailing comment
# shows the output observed for that way of splitting the command.
for command_args in (
    ("config get maxmemory",),       # ['maxmemory', '0']
    ("config get", "maxmemory"),     # {'maxmemory': '0'}
    ("config", "get", "maxmemory"),  # ['maxmemory', '0']
):
    result = redis_client.execute_command(*command_args)
    print(result)
代码分析
class Redis(object):
    # Excerpt of the client class: only the two methods that send a command
    # and post-process its reply are shown; everything else is elided.
    ...
    # COMMAND EXECUTION AND PROTOCOL PARSING
    def execute_command(self, *args, **options):
        """Execute a command and return a parsed response.

        ``args`` is sent to the server as given; ``args[0]`` alone is used as
        the command name for obtaining a connection and for parsing the reply.
        ``options`` is forwarded to the pool and to ``parse_response``.
        """
        pool = self.connection_pool
        # Only the FIRST positional argument is the command name, even when
        # it contains spaces: "config get maxmemory" passed as one string,
        # "config get" and "config" are three different command names.
        command_name = args[0]
        # Prefer a connection held by the client itself; otherwise borrow
        # one from the pool.
        conn = self.connection or pool.get_connection(command_name, **options)
        try:
            conn.send_command(*args)
            return self.parse_response(conn, command_name, **options)
        except (ConnectionError, TimeoutError) as e:
            conn.disconnect()
            # Re-raise unless this is a timeout AND the connection opted in
            # to retrying on timeout; in that case resend exactly once.
            if not (conn.retry_on_timeout and isinstance(e, TimeoutError)):
                raise
            conn.send_command(*args)
            return self.parse_response(conn, command_name, **options)
        finally:
            # Give the connection back only when it was borrowed from the
            # pool above (i.e. the client holds no connection of its own).
            if not self.connection:
                pool.release(conn)

    def parse_response(self, connection, command_name, **options):
        """Parse a response from the Redis server.

        Reads one reply from ``connection``. If a callback is registered in
        ``self.response_callbacks`` under ``command_name``, its return value
        is returned; otherwise the reply is returned as read.
        """
        try:
            response = connection.read_response()
        except ResponseError:
            # A caller-supplied fallback value replaces the error, if given.
            if EMPTY_RESPONSE in options:
                return options[EMPTY_RESPONSE]
            raise
        # The shape of the result depends on this lookup: a reply is
        # converted only when the exact command_name has a callback.
        # NOTE(review): presumably only "config get" has a registered
        # callback, which would explain why that call alone returns a dict;
        # verify against the response_callbacks table.
        if command_name in self.response_callbacks:
            return self.response_callbacks[command_name](response, **options)
        return response
// 异常日志:
xlib.db.redis.exceptions.ConnectionError: Error 98 connecting to 127.0.0.1:6379. Address already in use.
// 连接情况
$ netstat -tanp | grep 12345
(Not all processes could be identified, non-owned process info
will not be shown, you would have to be root to see it all.)
tcp 0 0 127.0.0.1:12345 127.0.0.1:6379 TIME_WAIT -
从预留端口中自动获取随机端口
import socket
import random
from xlib.db import redis
class CustomSocket(redis.Connection):
    """Redis connection whose outgoing socket is bound to a local port
    picked at random from a caller-supplied range.

    Usage::

        connection = CustomSocket(host='localhost', port=6379,
                                  source_port_range=port_range)
    """

    def __init__(self, *args, **kwargs):
        """Accept the extra ``source_port_range`` keyword: a
        ``(low, high)`` pair of inclusive local port numbers, or ``None`` to
        let the operating system choose the source port.
        """
        # Remove our own option before delegating so the base class never
        # receives a keyword it does not know about.
        self.source_port_range = kwargs.pop('source_port_range', None)
        super(CustomSocket, self).__init__(*args, **kwargs)

    def _connect(self):
        """Create a TCP socket connection.

        Returns:
            The connected socket, bound to a port from
            ``source_port_range`` when a range was configured.

        Raises:
            RuntimeError: no free port was found in the configured range.
            socket.error: binding or connecting failed.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            if self.source_port_range:
                source_port = self._find_unused_port(self.source_port_range)
                if source_port:
                    sock.bind(('', source_port))
            sock.settimeout(self.socket_timeout)
            sock.connect((self.host, self.port))
        except Exception:
            # Do not leak the file descriptor when the port search, the bind
            # or the connect fails; the caller still sees the original error.
            sock.close()
            raise
        return sock

    def _find_unused_port(self, port_range):
        """Return a random port from ``port_range`` that can currently be
        bound.

        Args:
            port_range: ``(low, high)`` pair, both ends inclusive.

        Raises:
            RuntimeError: none of the sampled ports could be bound.
        """
        for _ in range(100):  # try at most 100 times
            source_port = random.randint(port_range[0], port_range[1])
            temp_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                # Probe only: a successful bind means the port is free now.
                temp_sock.bind(('', source_port))
                return source_port
            except socket.error:
                continue
            finally:
                # The probe socket is always closed, so the port is released
                # again before the caller binds its own socket to it.
                temp_sock.close()
        raise RuntimeError("No available ports in the specified range")
# Use the custom connection class: every pooled connection is created as a
# CustomSocket and receives the source port range as a keyword argument.
port_range = (10000, 11000)
custom_pool = redis.ConnectionPool(
    connection_class=CustomSocket,
    host='localhost',
    port=6379,
    source_port_range=port_range,
)
client = redis.Redis(connection_pool=custom_pool)

# Smoke test: write a key and read it back.
client.set('foo', 'bar')
print(client.get('foo'))