相信使用过redis的,或者正在做分布式开发的童鞋都知道redisson组件,它的功能很多,但我们使用最频繁的应该还是它的分布式锁功能,少量的代码,却实现了加锁、锁续命(看门狗)、锁订阅、解锁、锁等待(自旋)等功能,我们来看看都是如何实现的。
加锁
//获取锁对象
RLock redissonLock = redisson.getLock(lockKey);
//加分布式锁
redissonLock.lock();
根据redissonLock.lock()
方法跟踪到具体的private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId)
方法,真正获取加锁的逻辑是在tryAcquireAsync
该方法中调用的tryLockInnerAsync()
方法,看看这个方法是怎么实现的?
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
// 判断是否存在分布式锁,getName()也就是KEYS[1],也就是锁key名
"if (redis.call('exists', KEYS[1]) == 0) then " +
// 加锁,执行hset 锁key名 1
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
// 设置过期时间
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
// 这个分支是redisson的重入锁逻辑,锁还在,锁计数+1,重新设置过期时长
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
// 返回锁的剩余过期时长
"return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
发现底层是结合lua脚本实现了加锁逻辑。
为什么底层结合了Lua脚本?
Redis是在2.6推出了脚本功能,允许开发者使用Lua语言编写脚本传到redis执行。使用脚本的好处如下:
1、减少网络开销:本来5次网络请求的操作,可以用一个请求完成,原先5次请求的逻辑,可以一次性放到redis中执行,较少了网络往返时延。这点跟管道有点类似。
2、原子操作:Redis会将整个脚本作为一个整体执行,中间不会被其他命令插入。管道不是原子的,不过
redis的批量操作命令(类似mset)是原子的。
也就意味着虽然脚本中有多条redis指令,那即使有多条线程并发执行,在同一时刻也只有一个线程能够执行这段逻辑,等这段逻辑执行完,分布式锁也就获取到了,其它线程再进来就获取不到分布式锁了。
锁续命(自旋)
大家都听过锁续命,肯定也知道这里涉及到看门狗的概念。在调用tryLockInnerAsync()
方法时,第一个参数是commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout()
也就是默认的看门狗过期时间是private long lockWatchdogTimeout = 30 * 1000
毫秒。
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
if (leaseTime != -1) {
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
// 添加监听器,判断获取锁是否成功,成功的话,添加定时任务:定期更新锁过期时间
ttlRemainingFuture.addListener(new FutureListener<Long>() {
@Override
public void operationComplete(Future<Long> future) throws Exception {
if (!future.isSuccess()) {
return;
}
// 根据tryLockInnerAsync方法,加锁成功,return nil 也就是null
Long ttlRemaining = future.getNow();
// lock acquired
if (ttlRemaining == null) {
// 添加定时任务:定期更新锁过期时间
scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
当线程获取到锁后,会进入if (ttlRemaining == null)
分支,调用定期更新锁过期时间scheduleExpirationRenewal
方法,我们看看该方法实现:
private void scheduleExpirationRenewal(final long threadId) {
if (expirationRenewalMap.containsKey(getEntryName())) {
return;
}
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// 检测KEYS[1]锁是否还在,在的话再次设置过期时间
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
future.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
expirationRenewalMap.remove(getEntryName());
if (!future.isSuccess()) {
log.error("Can't update lock " + getName() + " expiration", future.cause());
return;
}
// 通过上面lua脚本执行后会返回1,也就true,再次调用更新过期时间进行续期
if (future.getNow()) {
// reschedule itself
scheduleExpirationRenewal(threadId);
}
}
});
}
// 延迟 internalLockLeaseTime / 3再执行续命
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) {
task.cancel();
}
}
发现scheduleExpirationRenewal
方法只是用了Timeout作为任务,并没有使用java的Timer()之类的定时器,而是在Timeout任务run()方法中定义了RFuture对象,通过给RFuture对象设置listener,在listener中通过Lua脚本执行结果进行判断是否还需要进行续期。通过这样的方式来给分布式锁进行续期。
这种方式实现定时更新确实很巧妙,定期时间很灵活。
本站声明:
1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。
原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/tech/pnotes/293383.html