分布式锁实现
之前因为接触到这篇分布式锁的文章,地址是:再有人问你分布式锁,这篇文章扔给他,自己兴趣浓烈,决定把里面提到的东西实现一遍,Mysql提到的分布式锁主要还是依赖mysql的事务和锁结构表的维护实现,相对简单,下面只讲Zookeeper和redis分布式锁,包括源码解析。
¶Zookeeper的实现
Zookeeper里面提供的3个分布式锁,分别为InterProcessMutex-可重入互斥锁、InterProcessSemaphoreMutex-不可重入互斥锁、InterProcessReadWriteLock-可重入读写锁,下面对三种锁的源码进行解析,其中第一个InterProcessMutex-可重入互斥锁源码分析参考文章:Zookeeper源码分析1-分布式锁,感谢作者!其他为个人发布。
¶InterProcessMutex-可重入互斥锁
¶构造函数
1 | public class InterProcessMutex implements InterProcessLock, Revocable<InterProcessMutex> |
¶acquire,获取锁
1 | // 无限等待 |
¶internalLock
1 | private boolean internalLock(long time, TimeUnit unit) throws Exception { |
¶attemptLock
1 | // 尝试获取锁,并返回锁对应的 Zookeeper 临时顺序节点的路径 |
¶createsTheLock
1 | // From StandardLockInternalsDriver |
¶internalLockLoop
1 | // 循环等待来激活分布式锁,实现锁的公平性 |
¶getsTheLock
1 | // From StandardLockInternalsDriver |
¶release
1 | public void release() throws Exception { |
¶测试过程
测试代码
1 | public static void main(String[] args) throws Exception { |
测试结果:
¶InterProcessSemaphoreMutex
不可重入互斥锁,不再通过线程的map ThreadMap进行重入的记录,使用租约Lease来获得与服务器的链接
¶构造函数
1 | public class InterProcessSemaphoreMutex implements InterProcessLock |
¶acquire,获得锁
1 |
|
¶Lease租约
1 | public interface Lease extends Closeable |
¶InterProcessSemaphore
1 | public class InterProcessSemaphore |
¶测试过程
测试代码:
1 | public static void main(String[] args) throws Exception { |
测试结果:
¶InterProcessReadWriteLock
下面是InterProcessReadWriteLock的类,我们读锁的调用链条是interProcessReadWriteLock.readLock().acquire(),
1 | /** |
¶测试过程
测试代码:
1 |
|
测试结果:从下面的结果可以看出,读锁和写锁相互竞争,多个线程同时获取读锁,但是写锁是独占的,需要释放之后,该线程才被加入到竞争中。
¶Redis分布式锁
¶RedissonLock
我们先来看分布式锁RedissonLock的调用方式
1 | public static void main(String[] args) throws Exception { |
先看看类的调用关系,Redisson实现了父类RLock的接口
¶Redisson类图
先看看常用的Lock方法的实现。
1 |
|
再看lockInterruptibly方法
1 |
|
总结lockInterruptibly
:获取锁,不成功则订阅释放锁的消息,获得消息前阻塞。得到释放通知后再去循环获取锁。
下面看看如何获取锁:Long ttl = tryAcquire(leaseTime, unit, threadId)
1 | private Long tryAcquire(long leaseTime, TimeUnit unit) { |
如果leaseTime != -1,调用tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG)方法,需要注意的是,此处用到了Netty的Future-listen模型,这儿我不太清楚,后面我会把它挖得清清楚楚。
1 | private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) { |
1 | Future<Long> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId) { |
¶CommandExecutor执行脚本
commandExecutor继承关系:
1 | public interface CommandExecutor extends CommandSyncExecutor, CommandAsyncExecutor { |
¶commandExecutor类图:
以上evalWriteAsync方法调用redis执行EVAL 命令来执行Lua脚本,Lua脚本参考文档,我们看一下执行获取锁的步骤:
1 | -- 1. 没被锁{key不存在} |
以下是释放锁:
1 |
|
从释放锁代码中看到,删除key后会发送消息,所以上文提到获取锁失败后,阻塞订阅此消息。
另外,上文提到刷新过期时间方法scheduleExpirationRenewal
,指线程获取锁后需要不断刷新失效时间,避免未执行完锁就失效。这个方法的实现原理也类似,只是使用了Netty的TimerTask,每到过期时间1/3就去重新刷一次,如果key不存在则停止刷新。Timer实现大概如下:
1 | private static void nettyTimer() { |
¶测试结果
测试代码:
1 | /** |
运行结果:
redis monitor :
¶CommandAsyncService
我们来回过头来看evalWriteAsync的里面Lua脚本的执行过程,看看到底去哪儿执行了。以下分析来自原文,通过下面的类可以知道是到了CommandAsyncService里面执行,如下
1 | public class CommandAsyncService implements CommandAsyncExecutor { |
追本溯源,看看async实现,
1 | protected <V, R> void async(final boolean readOnlyMode, |
由于代码太长,我只贴出了和执行命令有关的部分代码,我们可以从上面代码中看到
- Redisson 对每次操作都提供了重试机制,可配置
retryAttempts
来控制重试次数(缺省为3次),可配置retryInterval
来控制重试间隔(缺省为 1000 ms)。Redisson 中使用了 Netty 的TimerTask
和Timeout
工具来实现其重试机制。 - Redisson 中也大量使用了 Netty 实现的异步工具
Future
和FutureListener
,使得异步调用执行完成后能够立刻做出对应的操作。 - RedissonConnection 是基于 Netty 实现的,发送命令的
send
方法实现是使用 Netty 的Channel.writeAndFlush
方法。
Redisson使用了Netty链接redis的服务,并依赖Netty异步工具来实现异步通信、重试、阻塞等特性,之后再补全Netty的知识再继续更新!