目录

# setnx 加锁有什么问题?

加锁 redis.setnx(lockKey, 1)redis.expire(lockKey, 20) 不是原子操作,如果加锁成功,但是设置过期时间失败,将导致锁无法得到释放。可使用 set 操作一步完成加锁同时设置过期时间 set lockKey 1 EX 100 NX

# 释放了别人的锁

用户 A 和 B 都要对 lockKey 加锁,用户 A 先成功加锁,继续执行业务,执行时间较长导致锁超过了超时时间,此时用户 B 过来请求对 lockKey 加锁,加锁成功了,此时 A 执行完了业务操作要过来释放锁了,这时会把 B 加的锁给释放了,这样就用户 B 的锁就有问题了。为了解决这个问题,可以在设置锁的时候把锁的 value 值设置为 requestId ,在释放锁的时候先检查是否是自己的 requestId 再去释放。

if($redis.get($lockKey) == $requestId){
  $redis.del($lockKey);
}

以上代码同样不是原子性操作,所以还是建议使用 lua 脚本处理

# 锁超时过期问题

还是有 A B 两个线程,A 线程先获取到了锁并设置了过期时间 lockTime ,由于线程 A 执行的时间比较长,导致超过了 lockTime 的时间,这样就可能导致线程 B 也获取到了锁,此时就无法保证严格的串行执行了。

解决这个问题单纯的延长 lockTime 并不可取,无法确定 lockTime 具体的取值范围。

能想到的办法是在启动线程 A 的同时开启一个守护线程 a,比如让守护线程 a 在 2/3 个 lockTime 时间内执行一次延长操作,每次延长 2/3 lockTime 的时间,当主线程 A 结束时守护线程 a 也同时结束。守护线程伪代码如下:

int lockTime = 9;
int waitTime = lockTime / 3 * 2;
String luaScript = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('expire', KEYS[1],ARGV[2]) else return '0' end";
while(true){
  Thread.sleep(waitTime);
  //执行 lua 延长时间
      if(luaScript()){
      // 延长成功
    }else{
      // 未延长时间,任务已经完成了,结束守护线程
      this.stop();
    }
  //
}
# 自旋锁-并发请求获取锁失败问题?

线程 A 和 线程 B 同时获取锁,如果 A 先获取到了然后去处理业务了,那 B 是返回失败还是怎么办?解决办法:用自旋锁, 当线程 B 获取不到锁时,不直接返回错误,在规定的时间内(比如500ms)内,不断的循环(一定间隔时间)去尝试获取锁,如果规定时间内还是获取不到锁则返回失败。这种做法对短时间内可以完成的业务可用,如果线程 A 长时间占用锁不释放,线程 B 没必要去等待。

优点:如果业务操作在短时间内可以完成,那么其他线程等一会儿是可以的,因为重新创建线程的资源消耗明显大于等待锁的消耗,有利于资源的利用。

缺点:同时的请求量非常多或者业务操作短时间内无法完成,这种情况只有一个线程获得锁,其他大量线程都在等待,这种场景不合适。

# 可重入锁

有这样一种场景,有个方法 A 在同一个线程中需要多次调用,如果这个线程这个方法 A 是线程安全的(同时只能有一个线程在执行),这样就需要给方法 A 加锁,使方法 A 只允许同时被一个线程占用,普通的锁锁住之后同一个线程中也没有办法执行多次,只能第一次加锁成功,第二次执行时大概率是不成功的,此时就有了可重入锁的出现,为线程 id 加锁,同一线程多次执行时只需累加计数器,同时释放锁时需要释放相同次数的锁记录。类似 Java 中的 Synchronized 的实现原理。

-- lua 加锁代码
-- 如果 lockKey 不存在,也就是没有其他线程在占用锁,此时当前线程可以去加锁
-- 如果 lockKey 存在并且 redis 中的 threadId 是当前 threadId,这说明锁可以重入,计数器加 1 
-- 如果 lockKey 存在但是 redis 中的 threadId 不是当前 threadId,此时锁被其他线程占用,不可获取锁
-- EVAL "lua script" 1 lockKey 10000  threadId
-- 1: KEYS 的数量, lockKey: 锁, 10000: 锁的过期时间,单位毫秒,  threadId: 线程id

if(redis.call('exists', KEYS[1]) == 0 or redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
    redis.call('hincrby', KEYS[1], ARGV[2], 1);
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return 1;
else
    return 0;    
end;
-- lua 解锁
-- lockKey 已经不存在了(可能是过期了),解锁成功
-- 如果其他进程尝试过来解锁,返回 nil
-- EVAL "lua script" 1 lockKey threadId

if(redis.call('exists', KEYS[1]) == 0) then 
    return 1;
end;    
if(redis.call('hexists', KEYS[1], ARGV[1]) == 0) then
    return nil;
end;
local count = redis.call('hincrby', KEYS[1], ARGV[1], -1);
if(count > 0) then
    return 0;
else
    redis.call('del', KEYS[1]);
    return 1;
end;
return nil;    
# 读写锁-提升读性能

在解决线程并发的场景下,经常会使用到独占锁,也就是同一时刻下只能有一个线程获取到锁。在一些特殊的场景下读操作比写操作多,读操作一般不会影响数据的正确性,这种情况下使用独占锁的话会影响性能。理想情况下是只有写操作是互斥就行,读读操作可共享,此时可以考虑使用读写锁。写写互斥,读写互斥,读读共享。

注意点:

  1. 获取写锁时,如果读锁计数器不为 0 或者持有写锁的线程非当前线程时,获取写锁失败。也就是说读锁未被占用并且写锁未被其他线程占用时才可能获得锁。(写锁和读锁都是可重入锁)。这里的代码有点疑问:

    /*
     * Walkthrough:
     * 1. If read count nonzero or write count nonzero
     *    and owner is a different thread, fail.
     * 2. If count would saturate, fail. (This can only
     *    happen if count is already nonzero.)
     * 3. Otherwise, this thread is eligible for lock if
     *    it is either a reentrant acquire or
     *    queue policy allows it. If so, update state
     *    and set owner.
     */
    Thread current = Thread.currentThread();
    int c = getState();
    int w = exclusiveCount(c);
    if (c != 0) {
        // (Note: if c != 0 and w == 0 then shared count != 0)
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // Reentrant acquire
        setState(c + acquires);
        return true;
    }
    
    if (w == 0 || current != getExclusiveOwnerThread())
            return false;
    // 疑问点在于当前线程的 写锁 和 读锁其实是不互斥的,如果只有当前线程获取了读锁那应该发放写锁的
    // 但是这个地方的判断是只要有读锁被占用了 或者 获取到锁的线程非当前线程 都不发放写锁
    // 尝试解读:
    // 首先 `c != 0 and w == 0` 是说此时总计时器不为 0 ,但是写锁为 0 ,那就说明了肯定有读锁在占用,读锁肯定不为 0 
    // 但是读锁不为 0 的情况下,无法判断出是当前线程持有的读锁,还是其他线程持有的读锁,因为读锁是共享锁,所以此处的判断就直接认为只要读锁被占用那就不发放写锁
    
  2. 释放写锁时,如果写锁计数器 -1 之后为 0 了,则释放写锁

  3. 获取读锁时,当写锁被其他线程获取时,读锁获取失败,否则获取成功。(当前线程的读锁和写锁不互斥)

  4. 释放读锁时,大体逻辑也是计数器 -1 判断是否为 0

排队问题:如果读锁长时间被持有会导致写锁饥饿问题。

# 为什么要用分布式锁?

解决高可用的问题,避免单节点发生故障时造成服务不可用

# Redlock 的实现原理
  1. 获取到当前的系统时间(毫秒)
  2. 为多个 redis 实例去做加锁操作(最好每个实例的加锁操作都设一个超时时间,避免在一个实例上阻塞过长时间),并记录成功执行的次数
  3. 如果成功执行的次数大于总数的一半以上则认为可用,然后再判断锁的剩余可用时间是否大于 0。判断剩余可用时间的方法:锁的过期时间(TTL) - (上完锁之后的时间 - 第一步的开始时间 ) - 漂移时间 = 锁真正的可用时间
  4. 如果第 3 步未成功获取到锁,要在所有的 redis 实例上做解锁操作。(因为如果不是释放所有节点的锁的话会出现如下问题:某节点上 redis 加锁成功了,但由于网络问题未给客户端返回正常状态,可能导致此实例的锁未被正确释放)

实现代码:

public function lock($resource, $ttl)
{
    $this->initInstances();
    $token = uniqid();
    $retry = $this->retryCount;
    do {
        $n = 0;
        $startTime = microtime(true) * 1000;
        foreach ($this->instances as $instance) {
            if ($this->lockInstance($instance, $resource, $token, $ttl)) {
                $n++;
            }
        }
        # Add 2 milliseconds to the drift to account for Redis expires
        # precision, which is 1 millisecond, plus 1 millisecond min drift
        # for small TTLs.
        $drift = ($ttl * $this->clockDriftFactor) + 2;
        $validityTime = $ttl - (microtime(true) * 1000 - $startTime) - $drift;
        if ($n >= $this->quorum && $validityTime > 0) {
            return [
                'validity' => $validityTime,
                'resource' => $resource,
                'token'    => $token,
                'ttl'      => $ttl,
            ];
        } else {
            foreach ($this->instances as $instance) {
                $this->unlockInstance($instance, $resource, $token);
            }
        }
        // Wait a random delay before to retry
        $delay = mt_rand(floor($this->retryDelay / 2), $this->retryDelay);
        usleep($delay * 1000);
        $retry--;
    } while ($retry > 0);
    return false;
}
# Redlock 有什么问题?

实例重启导致问题。例如有 A、B、C 三个实例,A 和 B 加锁成功,C加锁失败,此时 B 实例因为异常重启了,重启之后 B 的锁并未被持久化保存下来,锁丢失。现在另一个进程过来获取锁,在 B 和 C 实例上加锁成功了,这就导致了锁的异常了。

# 怎么解决 Redlock 的问题?

延迟重启。例如 以上 B 实例异常关闭了之后,不是立即重启,而是等待一会儿再重启,等待的时间超过锁设置的过期时间,这样就可以防止 B 实例的锁异常问题。

# 相关文档:

分布式解决方案 (opens new window)

聊聊redis分布式锁的8大坑 (opens new window)

Redlock 详解 (opens new window)

Redis锁从面试连环炮聊到神仙打架 (opens new window)

上次更新: 2024/11/05, 03:15:29