redis 分布式锁

distributed locks 原理

分布式锁的 是指不同进程甚至不同机器上的进程要对同一个共享变量进行互斥操作。 在很多场景中很实用, 例如秒杀活动,全局自增 ID 等, 我在刚工作的时候的第一个项目里有一个场景是 不同的进程要对 redis 同一个队列进行处理,其中就用到了 redis 的分布式锁, 还有一些其他的应用场景就不一一说明了。 redis 的特性使得其很容易实现分布式锁的功能,而且官方提出了一个 Redlock 算法。

使用目标

我们希望 分布式锁有三个特性

  • 安全性: 锁应该是互斥的,任何时候只有一个 client 可以持有锁

  • 无死锁: 即使一个持有锁的节点崩溃的情况下其他节点也最终能获取到锁

  • 容错性: 只要大多数 redis 节点正常工作,客户端就能释放和获取锁

简单的分布式锁

锁的获取

SET LIST_A random_value NX PX 30000

最简单的方法是创建一个带超时的 KEY, 例如我们要锁住 队列 A, 我们可以用 SET LIST_A random_value NX PX 30000, 来锁住A, 其中 random_value 是一串随机字符串,并在所有的锁定请求中要求是唯一的(产生一串随机字符串),其中 NX 选项保证了只有在 LIST_A 不存在的情况下才能创建成功。其他 client 需要获取该锁的时候,先检查 LIST_A 是否存在, 如果不存在则能获取该锁。 因为 LIST_A 有设置超时,所以即使占有锁的时候崩溃了也不影响其他客户端获取锁。这样的 分布式锁其实能够满足一般的需求了,但作者强烈不建议使用这种办法,因为不满足 安全性 和 容错性两条特性。

因为这种情况需要考虑到如果该 redis 节点崩溃了那么整个系统就不能使用,即使我们为该节点添加一个从节点。但是因为 redis 的主从复制是异步的,所以如果在复制的过程中 master 节点崩溃,这种情况下也可能导致两个客户端同时获取到该锁。

锁的释放

我们用上面提到的方式就能正确的获取锁,在释放锁的时候,可以利用 LUA 脚本来完成锁的释放(原子性操作)

if redis.call("get",KEYS[1]) == ARGV[1] then
    return redis.call("del",KEYS[1])
else
    return 0
end

我们只在 KEY 的值是我们自己设置的值的时候才删除该值, 这是为了避免释放掉其他 client 创建的锁。 例如当 client A 获取了一个锁,锁的超时是 10 秒,但 client A 因为其他原因阻塞了 15 秒后才调用释放锁,那么如果这时候不判断值是不是自己设置的话就会将其他 client 的锁给释放掉。 如果在某些不支持 LUA 脚本的情况下可以考虑自己用事务来实现该操作。

Redlock

redlock 算法, 我们假设有N个相互独立的节点。基本原理是获取锁的时候,只有成功锁住 N/2 + 1 个节点是,才算成功获取该锁,或者释放掉已有的锁。下面是官方的介绍。

  • 以毫秒为单位获取当前时间。
  • 尝试在所有N个实例中顺序获取锁,在所有实例中使用相同的 KEY 和 随机的 VALUE。同时,获取锁的超时时间相比锁的自动锁定时间来说应当小很多。当一个节点不可用时,应该尽量尝试同下一个节点获取锁定。
  • 客户端计算通过从当前时间中减去步骤1中获得的时间戳来获取锁定所需的时间。当且仅当客户端能够在大多数实例中获取锁定时,并且获取锁定的总时间小于锁定有效时间,则认为该锁被获取。
  • 如果锁被获取,则其有效时间被认为是初始有效时间减去经过的时间,如步骤3中所计算的。
  • 如果客户端由于某种原因(无法锁定N / 2 + 1个实例或有效时间为负)而无法获取锁定,则会尝试解锁所有实例。

redlock 的 Go 实现

下面是 Redloc 的一份Go语言实现,源码经过了注释,为了方便阅读,删除了一些不重要的代码

// Fields of a Mutex must not be changed after first use.
type Mutex struct {
    Name   string        // Resouce name
    Expiry time.Duration // 锁的有效期

    Tries int           // 在确认获取锁失败之前的重试次数
    Delay time.Duration // 在两次获取锁请求之间的间隔

    Factor float64 // Drift factor, DefaultFactor if 0

    Quorum int // // 判定成功的值, 通常是 len(nodes)/ 2 + 1

    value string
    until time.Time

    nodes []Pool
    nodem sync.Mutex
}

// Lock locks m.
// In case it returns an error on failure, you may retry to acquire the lock by calling this method again.
func (m *Mutex) Lock() error {
    m.nodem.Lock()            // 使用本地锁来保护分布式锁的访问
    defer m.nodem.Unlock()

    b := make([]byte, 16)
    _, err := rand.Read(b)
    if err != nil {
        return err
    }
    value := base64.StdEncoding.EncodeToString(b)            // 产生一个随机的 value

    expiry := m.Expiry        
    if expiry == 0 {
        expiry = DefaultExpiry                                // 设置锁的默认过期时间
    }

    retries := m.Tries
    if retries == 0 {
        retries = DefaultTries                                // 默认重试次数
    }

    for i := 0; i < retries; i++ {                            // 不停的尝试获取锁
        n := 0
        start := time.Now()                                    // 获取当前时间
        for _, node := range m.nodes {
            if node == nil {
                continue
            }

            conn := node.Get()
            reply, err := redis.String(conn.Do("set", m.Name, value, "nx", "px", int(expiry/time.Millisecond)))
            conn.Close()            // 依次尝试从每一个节点获取锁
            if err != nil {
                continue        // 失败后立即尝试下一个节点
            }
            if reply != "OK" {
                continue
            }
            n++            // 成功锁定一个节点
        }

        factor := m.Factor
        if factor == 0 {
            factor = DefaultFactor
        }

        until := time.Now().Add(expiry - time.Now().Sub(start) - time.Duration(int64(float64(expiry)*factor)) 
            + 2*time.Millisecond)
        if n >= m.Quorum && time.Now().Before(until) {    // 当大部分节点成功获取锁定,且获取锁的时间小于锁的有效时间
            m.value = value        // 锁被成功获取
            m.until = until        // 锁的有效期
            return nil
        }
        // 如果没有成功锁定分布式锁,那么尝试对所有节点进行解锁(不论之前是否已近锁定了) 
        for _, node := range m.nodes {
            if node == nil {
                continue
            }

            conn := node.Get()
            _, err := delScript.Do(conn, m.Name, value)    // 调用 LUA 的删除脚本 只要是自己 value 和 自己 的值相等,则释放锁
            conn.Close()
            if err != nil {
                continue
            }
        }

        // Have no delay on the last try so we can return ErrFailed sooner.
        if i == retries-1 {
            continue            // 最后一次尝试失败后不要在延迟,立马返回错误
        }

        delay := m.Delay
        if delay == 0 {
            delay = DefaultDelay
        }
        time.Sleep(delay)        // 每一次失败后延迟一段时间,其实官方文档建议的是随机延迟一段时间
    }

    return ErrFailed
}

// Touch resets m's expiry to the expiry value.
// It is a run-time error if m is not locked on entry to Touch.
// It returns the status of the touch
func (m *Mutex) Touch() bool {        // 重置锁的有效期到默认的值
    m.nodem.Lock()
    defer m.nodem.Unlock()

    value := m.value
    if value == "" {
        panic("redsync: touch of unlocked mutex")
    }

    expiry := m.Expiry
    if expiry == 0 {
        expiry = DefaultExpiry
    }
    reset := int(expiry / time.Millisecond)

    n := 0
    for _, node := range m.nodes {
        if node == nil {
            continue
        }

        conn := node.Get()
        reply, err := touchScript.Do(conn, m.Name, value, reset)
        conn.Close()
        if err != nil {
            continue
        }
        if reply != "OK" {
            continue
        }
        n++
    }
    if n >= m.Quorum {
        return true
    }
    return false
}

// Unlock unlocks m.
// It is a run-time error if m is not locked on entry to Unlock.
// It returns the status of the unlock
func (m *Mutex) Unlock() bool {
    m.nodem.Lock()            // 使用本地锁来保护分布式锁
    defer m.nodem.Unlock()

    value := m.value
    if value == "" {
        panic("redsync: unlock of unlocked mutex")
    }

    m.value = ""
    m.until = time.Unix(0, 0)

    n := 0
    for _, node := range m.nodes {        // 对每一个节点释放锁  和 Lock 时不同的是,这里失败后没有重试
        if node == nil {
            continue
        }

        conn := node.Get()
        status, err := delScript.Do(conn, m.Name, value)    // 调用 LUA 脚本释放对该节点的锁定
        conn.Close()
        if err != nil {
            continue
        }
        if status == 0 {
            continue
        }
        n++    // 成功释放对一个节点的锁定
    }
    if n >= m.Quorum {
        return true        // 成功释放掉大部分的节点就视为成功释放掉分布式锁****
    }
    return false
}

var delScript = redis.NewScript(1, `            LUA 脚本****
if redis.call("get", KEYS[1]) == ARGV[1] then
    return redis.call("del", KEYS[1])
else
    return 0
end`)

var touchScript = redis.NewScript(1, `
if redis.call("get", KEYS[1]) == ARGV[1] then
    return redis.call("set", KEYS[1], ARGV[1], "xx", "px", ARGV[2])
else
    return "ERR"
end`)

总结

仔细看了一下 redis 分布式锁的原理很简单,实现代码也比较简单。但其实其中还是有很多细节需要我们去理解。在使用的时候尽量弄清楚原理对于我们的代码还是有很大帮助的。 作者还在官方文档中讨论了分布式锁的一些其他特性,感兴趣的可以去看看并与作者交流。

参考文档

Distributed locks

如果你觉得本文对你有帮助,欢迎打赏!