锁
# setnx 加锁有什么问题?
加锁 redis.setnx(lockKey, 1) 和 redis.expire(lockKey, 20) 不是原子操作,如果加锁成功,但是设置过期时间失败,将导致锁无法得到释放。可使用 set 操作一步完成加锁同时设置过期时间 set lockKey 1 EX 100 NX
# 释放了别人的锁
用户 A 和 B 都要对 lockKey 加锁,用户 A 先成功加锁,继续执行业务,执行时间较长导致锁超过了超时时间,此时用户 B 过来请求对 lockKey 加锁,加锁成功了,此时 A 执行完了业务操作要过来释放锁了,这时会把 B 加的锁给释放了,这样就用户 B 的锁就有问题了。为了解决这个问题,可以在设置锁的时候把锁的 value 值设置为 requestId ,在释放锁的时候先检查是否是自己的 requestId 再去释放。
if($redis.get($lockKey) == $requestId){
$redis.del($lockKey);
}
以上代码同样不是原子性操作,所以还是建议使用 lua 脚本处理
# 锁超时过期问题
还是有 A B 两个线程,A 线程先获取到了锁并设置了过期时间 lockTime ,由于线程 A 执行的时间比较长,导致超过了 lockTime 的时间,这样就可能导致线程 B 也获取到了锁,此时就无法保证严格的串行执行了。
解决这个问题单纯的延长 lockTime 并不可取,无法确定 lockTime 具体的取值范围。
能想到的办法是在启动线程 A 的同时开启一个守护线程 a,比如让守护线程 a 在 2/3 个 lockTime 时间内执行一次延长操作,每次延长 2/3 lockTime 的时间,当主线程 A 结束时守护线程 a 也同时结束。守护线程伪代码如下:
int lockTime = 9;
int waitTime = lockTime / 3 * 2;
String luaScript = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('expire', KEYS[1],ARGV[2]) else return '0' end";
while(true){
Thread.sleep(waitTime);
//执行 lua 延长时间
if(luaScript()){
// 延长成功
}else{
// 未延长时间,任务已经完成了,结束守护线程
this.stop();
}
//
}
# 自旋锁-并发请求获取锁失败问题?
线程 A 和 线程 B 同时获取锁,如果 A 先获取到了然后去处理业务了,那 B 是返回失败还是怎么办?解决办法:用自旋锁, 当线程 B 获取不到锁时,不直接返回错误,在规定的时间内(比如500ms)内,不断的循环(一定间隔时间)去尝试获取锁,如果规定时间内还是获取不到锁则返回失败。这种做法对短时间内可以完成的业务可用,如果线程 A 长时间占用锁不释放,线程 B 没必要去等待。
优点:如果业务操作在短时间内可以完成,那么其他线程等一会儿是可以的,因为重新创建线程的资源消耗明显大于等待锁的消耗,有利于资源的利用。
缺点:同时的请求量非常多或者业务操作短时间内无法完成,这种情况只有一个线程获得锁,其他大量线程都在等待,这种场景不合适。
# 可重入锁
有这样一种场景,有个方法 A 在同一个线程中需要多次调用,如果这个线程这个方法 A 是线程安全的(同时只能有一个线程在执行),这样就需要给方法 A 加锁,使方法 A 只允许同时被一个线程占用,普通的锁锁住之后同一个线程中也没有办法执行多次,只能第一次加锁成功,第二次执行时大概率是不成功的,此时就有了可重入锁的出现,为线程 id 加锁,同一线程多次执行时只需累加计数器,同时释放锁时需要释放相同次数的锁记录。类似 Java 中的 Synchronized 的实现原理。
-- lua 加锁代码
-- 如果 lockKey 不存在,也就是没有其他线程在占用锁,此时当前线程可以去加锁
-- 如果 lockKey 存在并且 redis 中的 threadId 是当前 threadId,这说明锁可以重入,计数器加 1
-- 如果 lockKey 存在但是 redis 中的 threadId 不是当前 threadId,此时锁被其他线程占用,不可获取锁
-- EVAL "lua script" 1 lockKey 10000 threadId
-- 1: KEYS 的数量, lockKey: 锁, 10000: 锁的过期时间,单位毫秒, threadId: 线程id
if(redis.call('exists', KEYS[1]) == 0 or redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return 1;
else
return 0;
end;
-- lua 解锁
-- lockKey 已经不存在了(可能是过期了),解锁成功
-- 如果其他进程尝试过来解锁,返回 nil
-- EVAL "lua script" 1 lockKey threadId
if(redis.call('exists', KEYS[1]) == 0) then
return 1;
end;
if(redis.call('hexists', KEYS[1], ARGV[1]) == 0) then
return nil;
end;
local count = redis.call('hincrby', KEYS[1], ARGV[1], -1);
if(count > 0) then
return 0;
else
redis.call('del', KEYS[1]);
return 1;
end;
return nil;
# 读写锁-提升读性能
在解决线程并发的场景下,经常会使用到独占锁,也就是同一时刻下只能有一个线程获取到锁。在一些特殊的场景下读操作比写操作多,读操作一般不会影响数据的正确性,这种情况下使用独占锁的话会影响性能。理想情况下是只有写操作是互斥就行,读读操作可共享,此时可以考虑使用读写锁。写写互斥,读写互斥,读读共享。
注意点:
获取写锁时,如果读锁计数器不为 0 或者持有写锁的线程非当前线程时,获取写锁失败。也就是说读锁未被占用并且写锁未被其他线程占用时才可能获得锁。(写锁和读锁都是可重入锁)。这里的代码有点疑问:
/* * Walkthrough: * 1. If read count nonzero or write count nonzero * and owner is a different thread, fail. * 2. If count would saturate, fail. (This can only * happen if count is already nonzero.) * 3. Otherwise, this thread is eligible for lock if * it is either a reentrant acquire or * queue policy allows it. If so, update state * and set owner. */ Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); if (c != 0) { // (Note: if c != 0 and w == 0 then shared count != 0) if (w == 0 || current != getExclusiveOwnerThread()) return false; if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // Reentrant acquire setState(c + acquires); return true; }if (w == 0 || current != getExclusiveOwnerThread()) return false; // 疑问点在于当前线程的 写锁 和 读锁其实是不互斥的,如果只有当前线程获取了读锁那应该发放写锁的 // 但是这个地方的判断是只要有读锁被占用了 或者 获取到锁的线程非当前线程 都不发放写锁 // 尝试解读: // 首先 `c != 0 and w == 0` 是说此时总计时器不为 0 ,但是写锁为 0 ,那就说明了肯定有读锁在占用,读锁肯定不为 0 // 但是读锁不为 0 的情况下,无法判断出是当前线程持有的读锁,还是其他线程持有的读锁,因为读锁是共享锁,所以此处的判断就直接认为只要读锁被占用那就不发放写锁释放写锁时,如果写锁计数器 -1 之后为 0 了,则释放写锁
获取读锁时,当写锁被其他线程获取时,读锁获取失败,否则获取成功。(当前线程的读锁和写锁不互斥)
释放读锁时,大体逻辑也是计数器 -1 判断是否为 0
排队问题:如果读锁长时间被持有会导致写锁饥饿问题。
# 为什么要用分布式锁?
解决高可用的问题,避免单节点发生故障时造成服务不可用
# Redlock 的实现原理
- 获取到当前的系统时间(毫秒)
- 为多个 redis 实例去做加锁操作(最好每个实例的加锁操作都设一个超时时间,避免在一个实例上阻塞过长时间),并记录成功执行的次数
- 如果成功执行的次数大于总数的一半以上则认为可用,然后再判断锁的剩余可用时间是否大于 0。判断剩余可用时间的方法:锁的过期时间(TTL) - (上完锁之后的时间 - 第一步的开始时间 ) - 漂移时间 = 锁真正的可用时间
- 如果第 3 步未成功获取到锁,要在所有的 redis 实例上做解锁操作。(因为如果不是释放所有节点的锁的话会出现如下问题:某节点上 redis 加锁成功了,但由于网络问题未给客户端返回正常状态,可能导致此实例的锁未被正确释放)
实现代码:
public function lock($resource, $ttl)
{
$this->initInstances();
$token = uniqid();
$retry = $this->retryCount;
do {
$n = 0;
$startTime = microtime(true) * 1000;
foreach ($this->instances as $instance) {
if ($this->lockInstance($instance, $resource, $token, $ttl)) {
$n++;
}
}
# Add 2 milliseconds to the drift to account for Redis expires
# precision, which is 1 millisecond, plus 1 millisecond min drift
# for small TTLs.
$drift = ($ttl * $this->clockDriftFactor) + 2;
$validityTime = $ttl - (microtime(true) * 1000 - $startTime) - $drift;
if ($n >= $this->quorum && $validityTime > 0) {
return [
'validity' => $validityTime,
'resource' => $resource,
'token' => $token,
'ttl' => $ttl,
];
} else {
foreach ($this->instances as $instance) {
$this->unlockInstance($instance, $resource, $token);
}
}
// Wait a random delay before to retry
$delay = mt_rand(floor($this->retryDelay / 2), $this->retryDelay);
usleep($delay * 1000);
$retry--;
} while ($retry > 0);
return false;
}
# Redlock 有什么问题?
实例重启导致问题。例如有 A、B、C 三个实例,A 和 B 加锁成功,C加锁失败,此时 B 实例因为异常重启了,重启之后 B 的锁并未被持久化保存下来,锁丢失。现在另一个进程过来获取锁,在 B 和 C 实例上加锁成功了,这就导致了锁的异常了。
# 怎么解决 Redlock 的问题?
延迟重启。例如 以上 B 实例异常关闭了之后,不是立即重启,而是等待一会儿再重启,等待的时间超过锁设置的过期时间,这样就可以防止 B 实例的锁异常问题。
# 相关文档:
← 缓存更新策略