面试只要问到分布式,必问分布式锁

摘要:分布式知识是考验一个程序员知识面广度和深度很好的度量标准,而分布式锁又是其中非常重要的一个知识点。

本文分享自华为云社区《分布式锁实现——超级详细、高级程序员必知必会》,作者: 李子捌 。

一、简介

分布式知识是考验一个程序员知识面广度和深度很好的度量标准,而分布式锁又是其中非常重要的一个知识点。

我们知道分布式应用在进行逻辑处理的时候经常涉及到并发的问题,比如对一个转账修改一个用户的账户金额数据,此时可能会涉及多个用户同时对这个用户进行转账,它们都需要将数据读取到各内存中进行修改,然后再存刷新回去,这个时候就会出现并发问题:

这里假设初始account.money = 400

  1. 客户端A,向账户转账100,并未提交到Redis
  2. 客户端B,读取金额为400,并向账户转账200
  3. 客户端A提交
  4. 客户端B提交

最后account.money = 600

执行流程如下所示:

面试只要问到分布式,必问分布式锁

出现上述问题的主要原因是,“读取”和“写入”这两个操作并不是一个原子操作(所谓原子操作是指不会被线程调度机制打断的操作;这种操作一旦开始,就一直运行到结束,中间不会有任何 context switch (切换到另一个线程))。

关于上述问题的解决,分布式锁就可以派上用场了,我们通过分布式锁来限制程序的并发执行,就像Java中的synchronized,常见的分布式锁解决方案如下所示:

  1. 基于数据库锁机制实现的分布式锁
  2. 基于Zookeeper实现的分布式锁
  3. 基于Redis实现的分布式锁

本文和大家共同探讨的是基于Redis的分布式锁。

二、分布式锁的演进

2.1 精细胞与卵细胞的爱情故事

嗯……学过生物的宝宝们都知道,精细胞和卵细胞相遇的故事,上亿精细胞中最后也只会有一个幸运的精细胞和卵细胞发生甜蜜的爱情故事,这是因为当有一个精子进入卵子后,卵子会发生皮质反应、透明带反应使透明带对精子的结合能力下降,阻止了多精受精。分布式锁就好像卵细胞,而万千访问客户端就好比精细胞,无论客户访问并发多激烈,我们也应该保证分布式锁只被一个线程获取到。

2.2 Redis中的分布式锁

2.2.1 setnx

我们将Redis实现分布式锁理解为上厕所蹲坑排队,只有一个坑,但是有多个人要到坑里面去,所以就只能一个一个的来了。

很多人一开始想到的是Redis中一般使用setnx(set if no exists)指令来实现,setnx是如果不存在,则 SET的简写,这个指令描述如下:

  • 只在键 key 不存在的情况下,将键 key 的值设置为 value 。
  • 若键 key 已经存在, 则 SETNX 命令不做任何动作。
127.0.0.1:6379> exists lock  			# lock key不存在
(integer) 0
127.0.0.1:6379> setnx lock true		# 设值成功
(integer) 1
127.0.0.1:6379> setnx lock false	# 覆盖失败
(integer) 0
127.0.0.1:6379> del lock          # 删除lock 释放
(integer) 1

如上这种方案存在的问题非常明显,如果逻辑执行过程中间出现了异常,可能导致del key 指令没有执行,这样会产生死锁。如下图所示:

面试只要问到分布式,必问分布式锁

2.2.2 setnx + expire

在第一种解决方案的基础上,可能部分人会相到,既然主动删除key可能会出现异常情况,那么就设值key的过期时间到期自动删除。

127.0.0.1:6379> setnx lock true		
(integer) 1
127.0.0.1:6379> expire lock 10		# 设值过期时间10s
(integer) 1	
127.0.0.1:6379> setnx lock true		# 10s内再次设值失败
(integer) 0
127.0.0.1:6379> setnx lock true		# 10skey过期,后设置成功
(integer) 1

这种的方案和前面的方案其实并没有本质上的区别,它还是可能会出现服务器异常等情况,导致expire的不到执行的情况,换汤不换药,如下图所示:

面试只要问到分布式,必问分布式锁

2.2.3 原子操作

基于上面两种方案,我们可以发现,产生问题的本质在于两个操作并不是原子操作。方案一中是setnx指令加一个del指令,方案二中是setnx指令加一个expire指令,这两个指令并不是原子指令。基于这个问题,Redis官方将这两个指令组合在了一起,解决Redis分布式锁原子性操作的问题。

先认真看set指令可选参数 EX 和 NX

set key value [EX seconds] [PX milliseconds] [NX|XX]

EX seconds :将键的过期时间设置为 seconds 秒。执行 SET key value EX seconds 的效果等同于执行 SETEX key seconds value 。

PX milliseconds :将键的过期时间设置为 milliseconds 毫秒。执行 SET key value PX milliseconds 的效果等同于执行 PSETEX key milliseconds value 。

NX :只在键不存在时,才对键进行设置操作。执行 SET key value NX 的效果等同于执行 SETNX key value 。

XX :只在键已经存在时,才对键进行设置操作。

127.0.0.1:6379> set lock true EX 10 NX		# 设置 10s生效
OK
127.0.0.1:6379> set lock true EX 10 NX		# 10s内再次设值失败
(nil)
127.0.0.1:6379> set lock true EX 10 NX		# 10s后设置成功
OK

如上这个操作就成功的解决了Redis分布式锁的原子操作问题。

2.2.4 解锁

Redis分布式锁加锁在上面讲述了,而Redis分布式锁的解锁过程其实就是将key删除,key的删除有客户端调用del指令删除,也有设置key的过期时间自动删除。但是这个删除不能乱删除,不能说客户端A请求的锁被客户端B给删除了……,那这把锁就是一把烂锁了。

为了防止客户端A请求的锁被客户端B给删除了这种情况,我们通过匹配客户端传入的锁的值与当前锁的值是否相等来做判断(这个值是随机且保证不会重复的),如果相等就删除,解锁成功。

但是Redis并未提供这样的功能,我们只能通过Lua脚本来处理,因为Lua脚本可以保证多个指令的原子性执行。

示例:

首先设置一个key,这个key的值是123456789,通过客户端传入的value值是否相等来校验是否允许删除这个key

127.0.0.1:6379> get lock
(nil)
127.0.0.1:6379> set lock 123456789	# 设置一个key 值为123456789
OK
127.0.0.1:6379> get lock
"123456789"

在客户机上编写lua脚本,lock.lua文件,文件内容如下

面试只要问到分布式,必问分布式锁

面试只要问到分布式,必问分布式锁

if redis.call("get",KEYS[1]) == ARGV[1] then 
   return redis.call("del",KEYS[1]) 
else
   return 0 
end

测试通过错误的value值去执行lua脚本,这个时候删除key失败,返回0

面试只要问到分布式,必问分布式锁

通过正确的value值执行则返回1,说明key被删除了。

面试只要问到分布式,必问分布式锁

2.2.5 代码实现

一下演示一个spring boot项目来实现Redis分布式锁,为了方便大家使用,我贴出的代码比较全面,篇幅稍多。

pom依赖

<parent>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-parent</artifactId>
  <version>2.3.4.RELEASE</version>
</parent>

<dependencies>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
  </dependency>

  <dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>3.0.1</version>
  </dependency>

  <dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
  </dependency>

  <dependency>
    <groupId>cn.hutool</groupId>
    <artifactId>hutool-all</artifactId>
    <version>5.3.4</version>
  </dependency>
</dependencies>

Redis配置文件

package com.lizba.config;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cache.annotation.CachingConfigurerSupport;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

/**
 * <p>
 *         Redis简单配置文件
 * </p>
 *
 * @Author: Liziba
 * @Date: 2021/7/11 11:17
 */
@Configuration
public class RedisConfig extends CachingConfigurerSupport {

    protected static final Logger logger = LoggerFactory.getLogger(RedisConfig.class);
 
    @Value("${spring.redis.host}")
    private String host;
 
    @Value("${spring.redis.port}")
    private int port;
 
    @Value("${spring.redis.jedis.pool.max-active}")
    private int maxTotal;
 
    @Value("${spring.redis.jedis.pool.max-idle}")
    private int maxIdle;
 
    @Value("${spring.redis.jedis.pool.min-idle}")
    private int minIdle;
 
    @Value("${spring.redis.password}")
    private String password;
 
    @Value("${spring.redis.timeout}")
    private int timeout;

    @Bean
    public JedisPool redisPoolFactory() {
        JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
        jedisPoolConfig.setMaxTotal(maxTotal);
        jedisPoolConfig.setMaxIdle(maxIdle);
        jedisPoolConfig.setMinIdle(minIdle);
        JedisPool jedisPool = new JedisPool(jedisPoolConfig, host, port, timeout, null);
        logger.info("JedisPool注入成功!!");
        logger.info("redis地址:" + host + ":" + port);
        return jedisPool;
    }
}

application.yml配置文件

server:
  port: 18080

spring:
  redis:
    database: 0
    host: 127.0.0.1
    port: 6379
    timeout: 10000
    password:
    jedis:
      pool:
        max-active: 20
        max-idle: 20
        min-idle: 0

获取锁与释放锁代码

package com.lizba.utill;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.params.SetParams;

import java.util.Arrays;
import java.util.concurrent.TimeUnit;

/**
 * <p>
 *       Redis分布式锁简单工具类
 * </p>
 *
 * @Author: Liziba
 * @Date: 2021/7/11 11:42
 */
@Service
public class RedisLockUtil {

    private static Logger logger = LoggerFactory.getLogger(RedisLockUtil.class);
    /**
     * 锁键 -> key
     */
    private final String LOCK_KEY = "lock_key";
    /**
     * 锁过期时间 -> TTL
     */
    private Long millisecondsToExpire = 10000L;
    /**
     * 获取锁超时时间 -> get lock timeout for return
     */
    private Long timeout = 300L;
    /**
     * LUA脚本 -> 分布式锁解锁原子操作脚本
     */
    private static final String LUA_SCRIPT =
            "if redis.call('get',KEYS[1]) == ARGV[1] then" +
                    " return redis.call('del',KEYS[1]) " +
                    "else" +
                    " return 0 " +
                    "end";
    /**
     * set命令参数
     */
    private SetParams params = SetParams.setParams().nx().px(millisecondsToExpire);

    @Autowired
    private JedisPool jedisPool;
    /**
     * 加锁 -> 超时锁
     *
     * @param lockId  一个随机的不重复id -> 区分不同客户端
     * @return
     */
    public boolean timeLock(String lockId) {
        Jedis client = jedisPool.getResource();
        long start = System.currentTimeMillis();
        try {
            for(;;) {
                String lock = client.set(LOCK_KEY, lockId, params);
                if ("OK".equalsIgnoreCase(lock)) {
                    return Boolean.TRUE;
                }
                // sleep -> 获取失败暂时让出CPU资源
                TimeUnit.MILLISECONDS.sleep(100);
                long time = System.currentTimeMillis() - start;
                if (time >= timeout) {
                    return Boolean.FALSE;
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
            logger.error(e.getMessage());
        } finally {
            client.close();
        }
        return Boolean.FALSE;
    }
    /**
     * 解锁
     *
     * @param lockId 一个随机的不重复id -> 区分不同客户端
     * @return
     */
    public boolean unlock(String lockId) {
        Jedis client = jedisPool.getResource();
        try {
            Object result = client.eval(LUA_SCRIPT, Arrays.asList(LOCK_KEY), Arrays.asList(lockId));
            if (result != null && "1".equalsIgnoreCase(result.toString())) {
                return Boolean.TRUE;
            }
            return Boolean.FALSE;
        } catch (Exception e) {
            e.printStackTrace();
            logger.error(e.getMessage());
        }
        return Boolean.FALSE;
    }

}

测试类

package com.lizba.controller;

import cn.hutool.core.util.IdUtil;
import com.lizba.utill.RedisLockUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * <p>
 *		测试
 * </p>
 *
 * @Author: Liziba
 * @Date: 2021/7/11 12:27
 */
@RestController
@RequestMapping("/redis")
public class TestController {

    @Autowired
    private RedisLockUtil redisLockUtil;

    private AtomicInteger count ;

    @GetMapping("/index/{num}")
    public String index(@PathVariable int num) throws InterruptedException {
        count = new AtomicInteger(0);
        CountDownLatch countDownLatch = new CountDownLatch(num);
        ExecutorService executorService = Executors.newFixedThreadPool(num);

        Set<String> failSet = new HashSet<>();

        long start = System.currentTimeMillis();
        for (int i = 0; i < num; i++) {
            executorService.execute(() -> {
                long lockId = IdUtil.getSnowflake(1, 1).nextId();
                try {
                    boolean isSuccess = redisLockUtil.timeLock(String.valueOf(lockId));
                    if (isSuccess) {
                        count.addAndGet(1);
                        System.out.println(Thread.currentThread().getName() + "  lock success" );
                    } else {
                        failSet.add(Thread.currentThread().getName());
                    }
                } finally {
                    boolean unlock = redisLockUtil.unlock(String.valueOf(lockId));
                    if (unlock) {
                        System.out.println(Thread.currentThread().getName() + "  unlock success" );
                    }
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdownNow();
        failSet.forEach(t -> System.out.println(t + "  lock fail" ));
        long time = System.currentTimeMillis() - start;
        return String.format("Thread sum: %d, Time sum: %d, Success sum:%d", num, time, count.get());
    }

}

测试结果

面试只要问到分布式,必问分布式锁

面试只要问到分布式,必问分布式锁

2.3 Redis的超时问题

Redis分布式锁有一个问题是锁的超时问题,也就是说如果客户端A获取到锁之后去执行任务,任务没跑完锁的超时时间到了,锁就会自动释放,这个时候客户端B就能乘虚而入了,锁就会出现问题!

关于这个问题其实并没有完全的解决办法,但是能通过如下手段去优化:

  1. 尽可能不要在Redis分布式锁中执行较长的任务,尽可能的缩小锁区间内执行代码,就像单JVM锁中的synchronized优化一样,我们可以考虑优化锁的区间
  2. 多做压力测试和线上真实场景的模拟测试,估算一个合适的锁超时时间
  3. 做好Redis分布式锁超时任务未执行完的问题发生后,数据恢复手段的准备

三、集群中的分布式锁

3.1 集群分布式锁存在的问题

上述的分布式锁,针对单节点实例的Redis是可行的;但是我们在公司根本不会用单节点的Redis实例,往往采用最简单的都是是Redis一主二从+Sentinel监控配置;在sentinel集群中,虽然主节点挂掉时,从节点会取而代之,客户端无感知,但是上述的分布式锁就可能存在节点之间数据同步异常导致分布式锁失效的问题。

正常情况下客户端向sentinel监控的Redis集群申请分布式锁:

面试只要问到分布式,必问分布式锁

比如,客户端A在主节点(机器1)上申请了一把锁,此时主节点(机器1)挂掉了且锁没来得及同步到从节点(机器2和机器3),此时从节点(机器2)成为了新的主节点,但是锁在新的主节点(机器2)上并不存在,所以客户端B申请锁成功,锁的定义在这种场景中就出现了问题!

主节点宕机锁同步失败情况,其他客户端申请锁成功:

面试只要问到分布式,必问分布式锁

上述这种情况虽然之后发生在主从发生failover的情况才产生,但显然是不安全的,普通的业务系统或许能接受,但大金额的业务场景是不允许出现的。

3.2 RedLock

3.2.1 简介

解决这个问题的办法就是使用RedLock算法,也称红锁。RedLock通过使用多个Redis实例,各个实例之间没有主从关系,相互独立;加锁的时候,客户端向所有的节点发送加锁指令,如果过半的节点set成功,就加锁成功。释放锁时,需要向所有的节点发送del指令来释放锁。RedLock的实现思路比较简单,但是实际算法比较复杂,需要考虑非常多的细节问题,如出错重试,时钟漂移等。此外RedLock需要新增较多的实例来申请分布式锁,不仅消耗服务器资源,也会有一定的性能下降。

其架构图如下,客户端向多个独立的Redis服务发送加锁指令(为了追求高吞吐量和低延时,客户端需要使用多路传输来对N个Redis Server服务器进行通信),过半反馈成功则加锁成功,Redis Server的个数最后为奇数。

Redis 中文版网站介绍

http://redis.cn/topics/distlock.html

 

面试只要问到分布式,必问分布式锁

在上述的架构图中,存在5台Redis服务器用于获取锁,那么此时一个客户端获取锁需要做哪些操作呢?

  1. 获取系统当前时间(ms)
  2. 使用相同的key和随机值在5个节点上请求锁,请求锁的过程中包含多个时间值的定义,包括请求单个锁超时时间,请求锁的总耗时时间,锁自动释放时间。单个锁请求的超时时间不宜过大,防止请求宕机的Redis服务阻塞时间过长。
  3. 客户端计算获取锁的总时长和获取锁成功的个数,当所得个数大于等于3且获取锁的时间小于锁的自动释放时间才算成功
  4. 锁获取成功,则锁自动释放时间等于TTL减去获取所得消耗的时间(这个锁消耗的时间计算比较复杂)
  5. 锁获取失败,向所有的Redis服务器发送删除指令,一定是所有的Redis服务器都要发送
  6. 失败重试,锁获取失败后进行重试,重试的时间应该是一个随机值,避免与其他客户端同时请求锁而加大失败的可能,且这个时间应该大于获取锁消耗的时间

3.2.2 锁的最小有效时长

由于上面说到存在时钟漂移的问题,并且客户端向不同的Redis服务器请求锁的时间也会有细微的差异,所以有必要认真的研究一下客户端获取到的锁的最小有效时长计算:

假设客户端申请锁成功,第一个key设置成功的时间为TF,最后一个key设置成功的时间为TL,锁的超时时间为TTL,不同进程之间的时钟差异为CLOCK_DIFF,则锁的最小有效时长是:

TIME = TTL – (TF- TL) – CLOCK_DIFF

面试只要问到分布式,必问分布式锁

3.2.3 故障恢复

采用Redis来实现分布式锁,离不开服务器宕机等不可用问题,这里RedLock红锁也一样,即使是多台服务器申请锁,我们也要考虑服务器宕机后的处理,官方建议采用AOF持久化处理。

但是AOF持久化只对正常SHUTDOWN这种指令能做到重启恢复,但是如果是断电的情况,可能导致最后一次持久化到断电期间的锁数据丢失,当服务器重启后,可能会出现分布式锁语义错误的情况。所以为了规避这种情况,官方建议Redis服务重启后,一个最大客户端TTL时间内该Redis服务不可用(不提供申请锁的服务),这确实可以解决问题,但是显而易见这肯定影响Redis服务器的性能,并且在多数节点都出现这种情况的时候,系统将出现全局不可用的状态。

3.3 Redisson实现分布式锁

3.3.1 Redission简介

Redisson是架设在Redis基础上的一个Java驻内存数据网格(In-Memory Data Grid)。【Redis官方推荐】

Redisson在基于NIO的Netty框架上,充分的利用了Redis键值数据库提供的一系列优势,在Java实用工具包中常用接口的基础上,为使用者提供了一系列具有分布式特性的常用工具类。使得原本作为协调单机多线程并发程序的工具包获得了协调分布式多机多线程并发系统的能力,大大降低了设计和研发大规模分布式系统的难度。同时结合各富特色的分布式服务,更进一步简化了分布式环境中程序相互之间的协作。

Redission github地址

Redission 分布式锁和同步器Wiki

 

总而言之——Redisson非常强大

3.3.2 Reddison之RedLock使用

pom依赖

<dependency>
  <groupId>org.redisson</groupId>
  <artifactId>redisson</artifactId>
  <version>3.3.2</version>
</dependency>

测试类

package com.liziba.util;

import org.redisson.Redisson;
import org.redisson.RedissonRedLock;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;

import java.util.concurrent.TimeUnit;

/**
 * <p>
 *      测试Redisson 之 RedLock
 * </p>
 *
 * @Author: Liziba
 * @Date: 2021/7/11 20:55
 */
public class LockTest {

    private static final String resourceName = "REDLOCK_KEY";
    private static RedissonClient cli_79;
    private static RedissonClient cli_89;
    private static RedissonClient cli_99;

    static {
        Config config_79 = new Config();
        config_79.useSingleServer()
                .setAddress("127.0.0.1:6379") // 注意这里我的Redis测试实例没密码
                .setDatabase(0);
        cli_79 = Redisson.create(config_79);

        Config config_89 = new Config();
        config_89.useSingleServer()
                .setAddress("127.0.0.1:6389")
                .setDatabase(0);
        cli_89 = Redisson.create(config_89);

        Config config_99 = new Config();
        config_99.useSingleServer()
                .setAddress("127.0.0.1:6399")
                .setDatabase(0);
        cli_99 = Redisson.create(config_99);
    }
    /**
     * 加锁操作
     */
    private static void lock () {

        // 向3个Redis实例尝试加锁
        RLock lock_79 = cli_79.getLock(resourceName);
        RLock lock_89 = cli_89.getLock(resourceName);
        RLock lock_99 = cli_99.getLock(resourceName);
        RedissonRedLock redLock = new RedissonRedLock(lock_79, lock_89, lock_99);

        try {
            boolean isLock = redLock.tryLock(100, 10000, TimeUnit.MILLISECONDS);
            if (isLock) {
                // do something ...
                System.out.println(Thread.currentThread().getName() + "Get Lock Success!");
                TimeUnit.MILLISECONDS.sleep(10000);
            } else {
                System.out.println(Thread.currentThread().getName() + "Get Lock fail!");
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // 无论如何一定要释放锁 -> 这里会像所有的Redis服务释放锁
            redLock.unlock();
        }
    }
    public static void main(String[] args) {

        for (int i = 0; i < 10; i++) {
            new Thread(() -> lock()).start();
        }

    }

}

 

点击关注,第一时间了解华为云新鲜技术~

{{o.name}}


{{m.name}}

原创文章,作者:Maggie-Hunter,如若转载,请注明出处:https://blog.ytso.com/201585.html

(0)
上一篇 2021年11月24日
下一篇 2021年11月24日

相关推荐

发表回复

登录后才能评论