在分布式系统中,之前单一的用synchronized或lock已经不适用了。分布式锁一般有三种实现方式:1. 数据库乐观锁;2. 基于Redis的分布式锁;3. 基于ZooKeeper的分布式锁。本博客讨论为第二种
代码实现
现象:模拟多个线程去运算同一个数据 可以发现数据计算是不规则的
package com.zhcx.dispatch.test; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class ThreadTest { private static int max = 10; public static void main(String[] args) { ExecutorService threadPool = Executors.newCachedThreadPool(); for(int i=0;i<15;i++){ threadPool.submit(new Runnable() { public void run() { int current = getMax(); if(current>0){ max--; } } }); System.out.println(max); } } private static Integer getMax(){ return max; } }
输出值为混乱的
解决方法:
创建redis锁类
package com.zhcx.dispatch.test; import java.util.Collections; import org.springframework.stereotype.Component; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPoolConfig; @Component public class RedisLock { private static final String LOCK_SUCCESS = "OK"; private static final String SET_IF_NOT_EXIST = "NX"; private static final String SET_WITH_EXPIRE_TIME = "PX"; private static final Long RELEASE_SUCCESS = 1L; private static JedisPool pool = null; public static Jedis getSource(){ JedisPoolConfig config = new JedisPoolConfig(); // 设置最大连接数 config.setMaxTotal(200); // 设置最大空闲数 config.setMaxIdle(8); // 设置最大等待时间 config.setMaxWaitMillis(1000 * 100); // 在borrow一个jedis实例时,是否需要验证,若为true,则所有jedis实例均是可用的 config.setTestOnBorrow(true); pool = new JedisPool(config, "127.0.0.0", 6379, 3000); return pool.getResource(); } /** * * @param jedis Redis客户端 * @param lockKey 锁 * @param requestId 请求标识 * @param expireTime 超期时间 * @return 是否获取成功 */ public static boolean getLock(Jedis jedis,String lockKey,String requestId, int expireTime){ String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime); if(LOCK_SUCCESS.equals(result)){ return true; } return false; } /** * 释放分布式锁 * @param jedis Redis客户端 * @param lockKey 锁 * @param requestId 请求标识 * @return 是否释放成功 */ public static boolean releaseDistributedLock(Jedis jedis, String lockKey, String requestId) { String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId)); if (RELEASE_SUCCESS.equals(result)) { return true; } return false; } }
可以看到,我们加锁就一行代码:jedis.set(String key, String value, String nxxx, String expx, int time),这个set()方法一共有五个形参:
-
第一个为key,我们使用key来当锁,因为key是唯一的。
-
第二个为value,我们传的是requestId,通过给value赋值为requestId,我们就知道这把锁是哪个请求加的了,在解锁的时候就可以有依据。requestId可以使用UUID.randomUUID().toString()方法生成。
-
第三个为nxxx,这个参数我们填的是NX,意思是SET IF NOT EXIST,即当key不存在时,我们进行set操作;若key已经存在,则不做任何操作;
-
第四个为expx,这个参数我们传的是PX,意思是我们要给这个key加一个过期的设置,具体时间由第五个参数决定。
-
第五个为time,与第四个参数相呼应,代表key的过期时间。
解锁代码,写了简单的Lua脚本代码。我们将Lua代码传到jedis.eval()方法里,并使参数KEYS[1]赋值为lockKey,ARGV[1]赋值为requestId。eval()方法是将Lua代码交给Redis服务端执行。
简单来说,就是在eval命令执行Lua代码的时候,Lua代码将被当成一个命令去执行,并且直到eval命令执行完成,Redis才会执行其他命令。
我们加入到之前的线程中
package com.zhcx.dispatch.test; import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPoolConfig; public class ThreadTest { private static int max = 10; synchronized public static void main(String[] args) { ExecutorService threadPool = Executors.newCachedThreadPool(); for(int i=0;i<15;i++){ String requestId = UUID.randomUUID().toString(); String lockKey = "lock:"; RedisLock.getLock(RedisLock.getSource(), lockKey, requestId, 30); threadPool.submit(new Runnable() { public void run() { int current = getMax(); if(current>0){ max--; } } }); RedisLock.releaseDistributedLock(RedisLock.getSource(), lockKey, requestId); System.out.println(max); } } private static Integer getMax(){ return max; } }
查看输出:
原创文章,作者:Maggie-Hunter,如若转载,请注明出处:https://blog.ytso.com/8944.html