redis的文档中提供了一个由redis作者antirez实现的分布式锁:redlock-rb
redlock-rb的使用示例如下:
1
2
3
4
5
6
7
8
| # 创建一个distributed lock manager(dlm)
dlm = Redlock.new("redis://127.0.0.1:6379","redis://127.0.0.1:6380","redis://127.0.0.1:6381")
# 上锁
my_lock = dlm.lock("my_resource_name",1000)
# 解锁
dlm.unlock(my_lock)
# 设置重试次数和最大重试延迟(重试时间为小于等于这个值的随机数)
dlm.set_retry(3,200)
|
其实现非常简洁,只有不到100行,下面分析一下代码
类开头声明了一些常量,还有用于解锁的代码:
1
2
3
4
5
6
7
8
9
10
| DefaultRetryCount=3 # 未成功上锁的默认重试次数
DefaultRetryDelay=200 # 未成功上锁的最大重试间隔
ClockDriftFactor = 0.01 # 时钟不同步导致的偏移系数
# 解锁代码
UnlockScript='
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end'
|
init方法作用是声明一些实例变量:
1
2
3
4
5
6
7
8
9
| @servers = []
server_urls.each{|url|
@servers << Redis.new(:url => url)
}
# 一组服务器中需要达成一致意见的最小节点数,该值含义为'超过半数',为分布式系统专有名词
@quorum = server_urls.length / 2 + 1
@retry_count = DefaultRetryCount
@retry_delay = DefaultRetryDelay
@urandom = File.new("/dev/urandom") # Unix-like系统提供的用于生成伪随机数的设备文件
|
redlock的核心逻辑是lock
和unlock
两个方法:
其中值得注意的是锁真实有效时间 = 设置的时间 - (时钟不同步偏移 + redis的过期精度偏移)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
| def lock(resource,ttl)
# 通过urandom生成一个随机的锁id
val = get_unique_lock_id
# 至少尝试retry_count次
@retry_count.times {
n = 0 # 成功上锁的服务器数量
start_time = (Time.now.to_f*1000).to_i
@servers.each{|s|
# 在每个服务器上锁后统计上锁成功的服务器数量
n += 1 if lock_instance(s,resource,val,ttl)
}
# Add 2 milliseconds to the drift to account for Redis expires
# precision, which is 1 milliescond, plus 1 millisecond min drift
# for small TTLs.
# 考虑到redis过期的精度为1毫秒,还有TTL较小的时候expire会增加一个最小1毫秒左右的偏移
# 因此增加2毫秒过期(expire)偏移时间,
drift = (ttl*ClockDriftFactor).to_i + 2 # 时钟不同步偏移 + redis的过期(expire)精度偏移
validity_time = ttl-((Time.now.to_f*1000).to_i - start_time)-drift # 锁的真实有效时间
# 必须达到quorum个服务器上锁且真实有效时间大于0才视为有效上锁
if n >= @quorum && validity_time > 0
return {
:validity => validity_time,
:resource => resource,
:val => val
}
else
# 未有效上锁时在所有服务器上解锁
@servers.each{|s|
unlock_instance(s,resource,val)
}
end
# Wait a random delay before to retry
# 等待随机事件再重试
sleep(rand(@retry_delay).to_f/1000)
}
return false
end
def unlock(lock)
@servers.each{ |s|
unlock_instance(s,lock[:resource],lock[:val])
}
end
|
redis实现分布式锁可以使用带nx
和px
参数的set
,比较简单:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
| def lock_instance(redis,resource,val,ttl)
begin
return redis.client.call([:set,resource,val,:nx,:px,ttl])
rescue
return false
end
end
def unlock_instance(redis,resource,val)
begin
# 使用上面声明的代码来解锁
redis.client.call([:eval,UnlockScript,1,resource,val])
rescue
# Nothing to do, unlocking is just a best-effort attempt.
# 解锁是尽力而为的尝试,解锁失败也无事可做
end
end
|