Concrete Implementations of Several Distributed Locks

Distributed Lock Implementations

I came across an article on distributed locks, "再有人问你分布式锁,这篇文章扔给他" ("If anyone asks you about distributed locks again, throw this article at them"), and it piqued my interest enough that I decided to implement everything it covers. The MySQL-based distributed lock it mentions mainly relies on MySQL transactions and the maintenance of a lock table, which is relatively simple, so below I only cover the Zookeeper and Redis distributed locks, including source-code analysis.
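For reference, here is a minimal sketch of the lock-table idea (my own illustration; the table name method_lock and its schema are assumptions, not from the article). Acquisition is an INSERT that only one client can win because of the primary-key constraint; release is a DELETE:

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class MysqlLockSketch {
    // assumed schema: CREATE TABLE method_lock (lock_name VARCHAR(64) PRIMARY KEY)

    // Acquire: the INSERT succeeds for exactly one client, because lock_name is the primary key.
    public static boolean tryLock(Connection conn, String lockName) {
        try (PreparedStatement ps = conn.prepareStatement(
                "INSERT INTO method_lock (lock_name) VALUES (?)")) {
            ps.setString(1, lockName);
            return ps.executeUpdate() == 1;
        } catch (SQLException duplicateKey) {
            return false; // another client already holds the lock
        }
    }

    // Release: delete the row so other clients can insert it again.
    public static void unlock(Connection conn, String lockName) throws SQLException {
        try (PreparedStatement ps = conn.prepareStatement(
                "DELETE FROM method_lock WHERE lock_name = ?")) {
            ps.setString(1, lockName);
            ps.executeUpdate();
        }
    }
}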

The Zookeeper Implementation

Curator provides three distributed locks on top of Zookeeper: InterProcessMutex (a reentrant mutex), InterProcessSemaphoreMutex (a non-reentrant mutex), and InterProcessReadWriteLock (a reentrant read-write lock). Below is a source-code analysis of all three. The analysis of the first, InterProcessMutex, draws on the article "Zookeeper源码分析1-分布式锁" (thanks to its author!); the rest is my own.

InterProcessMutex: reentrant mutex

Constructor

public class InterProcessMutex implements InterProcessLock, Revocable<InterProcessMutex>
{
    private final LockInternals internals;
    private final String basePath;

    // ConcurrentMap of thread -> lock data; this is what makes the lock reentrant
    private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();

    private static class LockData
    {
        final Thread owningThread;
        final String lockPath;
        final AtomicInteger lockCount = new AtomicInteger(1);

        private LockData(Thread owningThread, String lockPath)
        {
            this.owningThread = owningThread;
            this.lockPath = lockPath;
        }
    }

    private static final String LOCK_NAME = "lock-";

    // the most commonly used constructor
    public InterProcessMutex(CuratorFramework client, String path)
    {
        // Zookeeper creates ephemeral sequential nodes under path; the core of the fair lock
        this(client, path, new StandardLockInternalsDriver());
    }

    public InterProcessMutex(CuratorFramework client, String path, LockInternalsDriver driver)
    {
        /*
         * maxLeases = 1: the number of threads (across JVMs) that may hold the
         * distributed lock at the same time is 1, i.e. this is a mutex.
         * A lease represents one client's right to hold the lock at a given time.
         */
        this(client, path, LOCK_NAME, 1, driver);
    }

    // package-private constructor
    InterProcessMutex(CuratorFramework client, String path, String lockName, int maxLeases, LockInternalsDriver driver)
    {
        basePath = PathUtils.validatePath(path);
        /* internals is of type LockInternals; InterProcessMutex delegates
         * acquiring and releasing the distributed lock to internals
         */
        internals = new LockInternals(client, driver, path, lockName, maxLeases);
    }

acquire: obtaining the lock

// wait indefinitely
public void acquire() throws Exception {
    if (!internalLock(-1, null)) {
        throw new IOException("Lost connection while trying to acquire lock: " + basePath);
    }
}

// wait with a timeout
public boolean acquire(long time, TimeUnit unit) throws Exception {
    return internalLock(time, unit);
}

internalLock

private boolean internalLock(long time, TimeUnit unit) throws Exception {
    /*
     * Note on concurrency: a given lockData instance can only be acted on by
     * a single thread, so locking isn't necessary. This echoes release():
     * for the same thread, release() only removes the entry from threadData
     * once LockData.lockCount has dropped to 0; until then the thread keeps
     * holding the lock (see the logic in release()).
     */
    Thread currentThread = Thread.currentThread();
    LockData lockData = threadData.get(currentThread);
    if (lockData != null) {
        /*
         * Reentrancy: when the same thread calls acquire again, first check
         * whether the mapping table (threadData) already has lock info for
         * this thread; if so, atomically increment the count and return.
         */
        lockData.lockCount.incrementAndGet();
        return true;
    }
    // no lock info in the mapping table; try to acquire the lock via LockInternals
    String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
    if (lockPath != null) {
        // lock acquired; record it in the mapping table
        LockData newLockData = new LockData(currentThread, lockPath);
        threadData.put(currentThread, newLockData);
        return true;
    }
    return false;
}

// the mapping table: records the thread -> lock-info relationship
private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();

// lock info: one ephemeral sequential node in Zookeeper corresponds to one "lock",
// but activating it requires queuing (fair lock), analyzed further below
private static class LockData {
    final Thread owningThread;
    final String lockPath;
    final AtomicInteger lockCount = new AtomicInteger(1); // reentrancy count of the distributed lock

    private LockData(Thread owningThread, String lockPath) {
        this.owningThread = owningThread;
        this.lockPath = lockPath;
    }
}

attemptLock

// try to acquire the lock; returns the path of the corresponding ephemeral sequential node in Zookeeper
String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception {
    final long startMillis = System.currentTimeMillis();
    // millisToWait is null when waiting indefinitely
    final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;
    // data stored in the ZNode; unimportant here. If null, a default value (the IP address) is used
    final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
    // number of retries so far; tied to CuratorFramework's retry policy
    int retryCount = 0;
    // path of the ephemeral sequential node created in Zookeeper; effectively a distributed lock waiting to be activated
    // activation condition: it sorts first among its sibling nodes (queuing, fair lock), analyzed further below
    String ourPath = null;
    // whether we already hold the distributed lock
    boolean hasTheLock = false;
    // whether the attempt to acquire the distributed lock has finished
    boolean isDone = false;
    while (!isDone) {
        isDone = true;
        try {
            // from the InterProcessMutex constructor we know the actual driver is a StandardLockInternalsDriver
            // create the ephemeral sequential node in Zookeeper
            ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
            // loop and wait to activate the distributed lock; this is what makes the lock fair
            hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
        } catch (KeeperException.NoNodeException e) {
            // error handling; can be skipped without affecting the main logic
            // due to session expiry etc., StandardLockInternalsDriver cannot find the
            // ephemeral sequential node it created and throws NoNodeException
            if (client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++,
                    System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper())) {
                // the retry policy allows another attempt to acquire the lock
                isDone = false;
            } else {
                // otherwise rethrow the NoNodeException
                throw e;
            }
        }
    }
    if (hasTheLock) {
        // lock acquired; return the node's path. The caller wraps it into lock info
        // and records it in the mapping table to support reentrancy
        return ourPath;
    }
    // failed to acquire the distributed lock; return null
    return null;
}

createsTheLock

// From StandardLockInternalsDriver
// create the ephemeral sequential node in Zookeeper
public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception {
    String ourPath;
    // if lockNodeBytes is non-null it becomes the node's data; otherwise a default (the IP address) is used
    if (lockNodeBytes != null) {
        // some CuratorFramework details; skippable, they don't affect the main lock logic
        // creatingParentContainersIfNeeded: creates the parent nodes; if CreateMode.CONTAINER
        // is unsupported, CreateMode.PERSISTENT is used instead
        // withProtection: the ephemeral child gets a GUID prefix
        ourPath = client.create().creatingParentContainersIfNeeded()
            // CreateMode.EPHEMERAL_SEQUENTIAL: an ephemeral sequential node. Zookeeper guarantees
            // the order in which the nodes are created; locks are activated in that order,
            // which also makes the lock fair (analyzed further below)
            .withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);
    } else {
        ourPath = client.create().creatingParentContainersIfNeeded()
            .withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
    }
    return ourPath;
}

internalLockLoop

// loop and wait to activate the distributed lock; this is what makes the lock fair
private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception {
    // whether we already hold the distributed lock
    boolean haveTheLock = false;
    // whether the child node needs to be deleted
    boolean doDelete = false;
    try {
        if (revocable.get() != null) {
            client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
        }
        while ((client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock) {
            // get the sorted list of child nodes
            List<String> children = getSortedChildren();
            // name of the ephemeral sequential node we created earlier
            String sequenceNodeName = ourPath.substring(basePath.length() + 1);
            // core logic of the lock's fairness; see the analysis below
            PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
            if (predicateResults.getsTheLock()) {
                // got the lock; break the loop and return to the caller
                haveTheLock = true;
            } else {
                // didn't get the lock; watch the previous sequential node
                String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
                synchronized (this) {
                    try {
                        // getData() is used instead of exists(): exists() can set a watch on a
                        // ZNode that doesn't exist, which would leak the watcher (a resource leak)
                        // if the previous node is deleted, the current thread is woken to compete
                        // for the lock again; normally it gets the lock directly, since the lock is fair
                        client.getData().usingWatcher(watcher).forPath(previousSequencePath);
                        if (millisToWait != null) {
                            millisToWait -= (System.currentTimeMillis() - startMillis);
                            startMillis = System.currentTimeMillis();
                            if (millisToWait <= 0) {
                                doDelete = true; // timed out; mark the node we created for deletion
                                break;
                            }
                            wait(millisToWait); // wait to be woken up, with a time limit
                        } else {
                            wait(); // wait to be woken up, indefinitely
                        }
                    } catch (KeeperException.NoNodeException e) {
                        // error handling, slightly convoluted; skippable, doesn't affect the main logic
                        // client.getData() may throw NoNodeException because the lock was released
                        // or the session expired (connection lost)
                        // nothing is done here: the outer while loop calls driver.getsTheLock again,
                        // which calls validateOurIndex; that throws NoNodeException, landing in the
                        // catch/finally below, which rethrows so the caller can retry acquiring the
                        // lock, and the ephemeral sequential node is deleted
                    }
                }
            }
        }
    } catch (Exception e) {
        ThreadUtils.checkInterrupted(e);
        // mark for deletion; the node created earlier is deleted in finally (with background retries)
        doDelete = true;
        // rethrow so the caller can try to reacquire the lock
        throw e;
    } finally {
        if (doDelete) {
            deleteOurPath(ourPath);
        }
    }
    return haveTheLock;
}

getsTheLock

// From StandardLockInternalsDriver
public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases)
        throws Exception {
    // index of the ephemeral sequential node we created in the sorted child list
    int ourIndex = children.indexOf(sequenceNodeName);
    // check that the node we created earlier is still valid
    validateOurIndex(sequenceNodeName, ourIndex);
    // core logic of the lock's fairness
    // from the InterProcessMutex constructor we know maxLeases is 1, so a thread holds the lock
    // only when ourIndex is 0, i.e. its ephemeral sequential node has activated the lock
    // Zookeeper's ephemeral sequential nodes guarantee ordering even when threads across multiple
    // JVMs create nodes concurrently: the earlier a thread creates its node, the earlier it
    // activates or acquires the lock
    boolean getsTheLock = ourIndex < maxLeases;
    // if we already hold the lock there is nothing to watch; otherwise watch the previous node (ourIndex - 1)
    // because the lock is fair, no node other than (ourIndex - 1) needs watching; this avoids
    // the herd effect, a very clever design!
    String pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);
    // return the result to the caller, which handles it further (adds the watcher, etc.)
    return new PredicateResults(pathToWatch, getsTheLock);
}

static void validateOurIndex(String sequenceNodeName, int ourIndex) throws KeeperException {
    if (ourIndex < 0) {
        // error handling; skippable
        // due to session expiry or connection loss, the ephemeral sequential node this thread
        // created was deleted by the Zookeeper server, so throw NoNodeException
        // if the retry policy allows, the lock acquisition is retried, which recreates the node
        // impressive how thoroughly the Curator authors cover the edge cases!
        throw new KeeperException.NoNodeException("Sequential path not found: " + sequenceNodeName);
    }
}
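To make the watch-the-predecessor scheme concrete, here is a tiny standalone illustration (my own sketch, not Curator code; the node names are made up) of which node gets the lock and which node each waiter watches, using the same ourIndex < maxLeases predicate:

import java.util.Arrays;
import java.util.List;

public class FairnessDemo {
    public static void main(String[] args) {
        // sorted children under the lock's base path, as getSortedChildren() would return them
        List<String> children = Arrays.asList("lock-0000000000", "lock-0000000001", "lock-0000000002");
        int maxLeases = 1; // InterProcessMutex
        for (String node : children) {
            int ourIndex = children.indexOf(node);
            boolean getsTheLock = ourIndex < maxLeases;
            // each waiter watches only its immediate predecessor, avoiding the herd effect
            String pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);
            System.out.println(node + " -> getsTheLock=" + getsTheLock + ", watches=" + pathToWatch);
        }
    }
}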

release

public void release() throws Exception {
    Thread currentThread = Thread.currentThread();
    LockData lockData = threadData.get(currentThread);
    if (lockData == null) {
        // no lock info in the mapping table: this thread does not hold the lock
        throw new IllegalMonitorStateException("You do not own the lock: " + basePath);
    }
    int newLockCount = lockData.lockCount.decrementAndGet();
    if (newLockCount > 0) {
        // the lock is reentrant; the count starts at 1 and the lock is only
        // released once it has been atomically decremented to 0
        return;
    }
    if (newLockCount < 0) {
        // in theory this path is unreachable
        throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);
    }
    try {
        // lockData != null && newLockCount == 0: release the lock resource
        internals.releaseLock(lockData.lockPath);
    } finally {
        // finally, remove this thread's lock info from the mapping table
        threadData.remove(currentThread);
    }
}

Testing

Test code

public static void main(String[] args) throws Exception {
    CuratorFramework curatorFramework = CuratorFrameworkFactory.builder().
            connectString(CONNECTION_STR).sessionTimeoutMs(5000).
            retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
    curatorFramework.start();

    final InterProcessMutex lock = new InterProcessMutex(curatorFramework, "/locks");

    for (int i = 0; i < 10; i++) {
        final int j = i;
        new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + " -> trying to acquire the lock");
            try {
                // block while competing for the lock
                lock.acquire();
                System.out.println(Thread.currentThread().getName() + " -> acquired the lock");
            } catch (Exception e) {
                e.printStackTrace();
            }
            try {
                Thread.sleep(4000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                try {
                    // release the lock
                    lock.release();
                    System.out.println(Thread.currentThread().getName() + " -> released the lock");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }, "Thread-" + i).start();
    }
}

Test result:

(screenshot of the test output)

InterProcessSemaphoreMutex

A non-reentrant mutex. It no longer records reentrancy through the per-thread map (threadData); instead, acquiring the lock yields a Lease, and releasing the lock closes that Lease.

Constructor

public class InterProcessSemaphoreMutex implements InterProcessLock
{
private final InterProcessSemaphore semaphore;
private volatile Lease lease;

/**
* @param client the client
* @param path path for the lock
*/
public InterProcessSemaphoreMutex(CuratorFramework client, String path)
{
this.semaphore = new InterProcessSemaphore(client, path, 1);
}

acquire: obtaining the lock

@Override
public void acquire() throws Exception
{
    // wait indefinitely. The semaphore just returns a lease; the acquire and release work is
    // still delegated to LockInternals, analyzed in detail for the reentrant mutex above.
    // The non-reentrant mutex has no threadData ConcurrentMap for handling reentrancy
    lease = semaphore.acquire();
}

@Override
public boolean acquire(long time, TimeUnit unit) throws Exception
{
    // wait with a timeout
    Lease acquiredLease = semaphore.acquire(time, unit);
    if ( acquiredLease == null )
    {
        return false; // important - don't overwrite lease field if couldn't be acquired
    }
    lease = acquiredLease;
    return true;
}
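The release path is not shown above. Roughly, release() hands back the held lease; a simplified sketch from memory (not verbatim Curator code) looks like this:

@Override
public void release() throws Exception
{
    Lease lease = this.lease;
    if ( lease == null )
    {
        throw new IllegalStateException("Not acquired");
    }
    this.lease = null;
    // Lease.close() delegates to internals.releaseLock(path), deleting the node in Zookeeper
    lease.close();
}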

The Lease interface

public interface Lease extends Closeable
{
/**
* Releases the lease so that other clients/processes can acquire it
*
* @throws IOException errors
*/
@Override
public void close() throws IOException;

/**
* Return the data stored in the node for this lease
*
* @return data
* @throws Exception errors
*/
public byte[] getData() throws Exception;
}

InterProcessSemaphore

public class InterProcessSemaphore
{
    private final Logger log = LoggerFactory.getLogger(getClass());
    private final LockInternals internals;

    private static final String LOCK_NAME = "lock-";

    /**
     * @param client the client
     * @param path path for the semaphore
     * @param maxLeases the max number of leases to allow for this instance
     */
    public InterProcessSemaphore(CuratorFramework client, String path, int maxLeases)
    {
        this(client, path, maxLeases, null);
    }

    public Lease acquire() throws Exception
    {
        // wait indefinitely to acquire a lease
        String path = internals.attemptLock(-1, null, null);
        return makeLease(path);
    }

    /**
     * <p>Acquire a lease. If no leases are available, this method blocks until either the maximum
     * number of leases is increased or another client/process closes a lease. However, this method
     * will only block to a maximum of the time parameters given.</p>
     *
     * <p>The client must close the lease when it is done with it. You should do this in a
     * <code>finally</code> block.</p>
     *
     * @param time time to wait
     * @param unit time unit
     * @return the new lease or null if time ran out
     * @throws Exception ZK errors, interruptions, etc.
     */
    public Lease acquire(long time, TimeUnit unit) throws Exception
    {
        // wait with a timeout to acquire a lease
        String path = internals.attemptLock(time, unit, null);
        return (path != null) ? makeLease(path) : null;
    }

    // create the lease (a lease is essentially a wrapper around the lock node's Zookeeper path)
    private Lease makeLease(final String path)
    {
        return new Lease()
        {
            @Override
            public void close() throws IOException
            {
                try
                {
                    internals.releaseLock(path);
                }
                catch ( KeeperException.NoNodeException e )
                {
                    log.warn("Lease already released", e);
                }
                catch ( Exception e )
                {
                    throw new IOException(e);
                }
            }

            @Override
            public byte[] getData() throws Exception
            {
                return internals.getClient().getData().forPath(path);
            }
        };
    }
}

Testing

Test code:

public static void main(String[] args) throws Exception {
    CuratorFramework curatorFramework = CuratorFrameworkFactory.builder().
            connectString(CONNECTION_STR).sessionTimeoutMs(5000).
            retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
    curatorFramework.start();

    final InterProcessSemaphoreMutex lock = new InterProcessSemaphoreMutex(curatorFramework, "/locks");

    for (int i = 0; i < 10; i++) {
        final int j = i;
        new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + " -> trying to acquire the lock");
            try {
                // block while competing for the lock
                lock.acquire();
                System.out.println(Thread.currentThread().getName() + " -> acquired the lock");
            } catch (Exception e) {
                e.printStackTrace();
            }
            try {
                Thread.sleep(4000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                try {
                    // release the lock
                    lock.release();
                    System.out.println(Thread.currentThread().getName() + " -> released the lock");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }, "Thread-" + i).start();
    }
}

Test result:

(screenshot of the test output)

InterProcessReadWriteLock

Below is the InterProcessReadWriteLock class. The read-lock call chain is interProcessReadWriteLock.readLock().acquire().

/**
 * <p>
 * A re-entrant read/write mutex that works across JVMs. Uses Zookeeper to hold the lock. All processes
 * in all JVMs that use the same lock path will achieve an inter-process critical section. Further, this mutex is
 * "fair" - each user will get the mutex in the order requested (from ZK's point of view).
 * </p>
 *
 * <p>
 * A read write lock maintains a pair of associated locks, one for read-only operations and one
 * for writing. The read lock may be held simultaneously by multiple reader processes, so long as
 * there are no writers. The write lock is exclusive.
 * </p>
 *
 * <p>
 * <b>Reentrancy</b><br/>
 * This lock allows both readers and writers to reacquire read or write locks in the style of a
 * re-entrant lock. Non-re-entrant readers are not allowed until all write locks held by the
 * writing thread/process have been released. Additionally, a writer can acquire the read lock, but not
 * vice-versa. If a reader tries to acquire the write lock it will never succeed.<br/><br/>
 *
 * <b>Lock downgrading</b><br/>
 * Re-entrancy also allows downgrading from the write lock to a read lock, by acquiring the write
 * lock, then the read lock and then releasing the write lock. However, upgrading from a read
 * lock to the write lock is not possible.
 * </p>
 */
public class InterProcessReadWriteLock
{
    private final InterProcessMutex readMutex;
    private final InterProcessMutex writeMutex;

    // must be the same length. LockInternals depends on it
    private static final String READ_LOCK_NAME = "__READ__";
    private static final String WRITE_LOCK_NAME = "__WRIT__";

    private static class SortingLockInternalsDriver extends StandardLockInternalsDriver
    {
        /**
         * fixForSorting returns the sequence number of the sequential node: for a node such as
         * _c_8edecf62-2ce9-4413-b77e-2411861ac8db-__WRIT__0000000029, str is 0000000029.
         * LockInternals sorts children by this number to decide which node a lock belongs to
         * and which predecessor to watch.
         */
        @Override
        public final String fixForSorting(String str, String lockName)
        {
            str = super.fixForSorting(str, READ_LOCK_NAME);
            str = super.fixForSorting(str, WRITE_LOCK_NAME);
            return str;
        }
    }

    private static class InternalInterProcessMutex extends InterProcessMutex
    {
        private final String lockName;

        InternalInterProcessMutex(CuratorFramework client, String path, String lockName, int maxLeases, LockInternalsDriver driver)
        {
            super(client, path, lockName, maxLeases, driver);
            this.lockName = lockName;
        }

        @Override
        public Collection<String> getParticipantNodes() throws Exception
        {
            Collection<String> nodes = super.getParticipantNodes();
            Iterable<String> filtered = Iterables.filter
            (
                nodes,
                new Predicate<String>()
                {
                    @Override
                    public boolean apply(String node)
                    {
                        return node.contains(lockName);
                    }
                }
            );
            return ImmutableList.copyOf(filtered);
        }
    }

    /**
     * @param client the client
     * @param basePath path to use for locking
     */
    public InterProcessReadWriteLock(CuratorFramework client, String basePath)
    {
        // initialize the write lock
        writeMutex = new InternalInterProcessMutex
        (
            client,
            basePath,
            WRITE_LOCK_NAME,
            1,
            new SortingLockInternalsDriver()
            {
                @Override
                public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception
                {
                    return super.getsTheLock(client, children, sequenceNodeName, maxLeases);
                }
            }
        );

        // initialize the read lock
        readMutex = new InternalInterProcessMutex
        (
            client,
            basePath,
            READ_LOCK_NAME,
            Integer.MAX_VALUE,
            new SortingLockInternalsDriver()
            {
                @Override
                public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception
                {
                    /*
                     * The read lock is initialized differently from the write lock: the write lock
                     * inherits getsTheLock from the parent class, while the read lock implements
                     * its own acquisition logic in readLockPredicate. PredicateResults holds the
                     * members boolean getsTheLock (whether the lock was obtained) and
                     * String pathToWatch (the path to watch).
                     */
                    return readLockPredicate(children, sequenceNodeName);
                }
            }
        );
    }

    /**
     * Returns the lock used for reading.
     *
     * @return read lock
     */
    public InterProcessMutex readLock()
    {
        return readMutex;
    }

    /**
     * Returns the lock used for writing.
     *
     * @return write lock
     */
    public InterProcessMutex writeLock()
    {
        return writeMutex;
    }

    private PredicateResults readLockPredicate(List<String> children, String sequenceNodeName) throws Exception
    {
        if ( writeMutex.isOwnedByCurrentThread() )
        {
            /*
             * If the current thread already owns writeMutex, return new PredicateResults(null, true):
             * the first argument (pathToWatch) is null, so no other node is watched, and the second
             * (getsTheLock) is true, so the read lock is granted. This implements the lock
             * downgrading described in the class javadoc.
             */
            return new PredicateResults(null, true);
        }

        int index = 0;
        int firstWriteIndex = Integer.MAX_VALUE;
        int ourIndex = Integer.MAX_VALUE;
        for ( String node : children )
        {
            if ( node.contains(WRITE_LOCK_NAME) )
            {
                /*
                 * The current thread does not hold the write lock and this node path contains
                 * the write-lock marker: keep the smallest such index in firstWriteIndex.
                 */
                firstWriteIndex = Math.min(index, firstWriteIndex);
            }
            else if ( node.startsWith(sequenceNodeName) )
            {
                /*
                 * Otherwise the node normally carries the read-lock marker; record ourIndex.
                 */
                ourIndex = index;
                break;
            }

            ++index;
        }
        StandardLockInternalsDriver.validateOurIndex(sequenceNodeName, ourIndex);

        /*
         * With the values from the loop above: if ourIndex is smaller than the index of the
         * first write lock, getsTheLock is true and the read lock is granted; otherwise
         * getsTheLock is false and we watch the first write-lock node. This reflects the write
         * lock's exclusivity: only after that write lock is released do the waiting readers
         * compete again.
         */
        boolean getsTheLock = (ourIndex < firstWriteIndex);
        String pathToWatch = getsTheLock ? null : children.get(firstWriteIndex);
        return new PredicateResults(pathToWatch, getsTheLock);
    }
}
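The javadoc above describes lock downgrading; a minimal usage sketch of it (my own, assuming a started CuratorFramework client named client):

InterProcessReadWriteLock rwLock = new InterProcessReadWriteLock(client, "/locks");
rwLock.writeLock().acquire();   // take the write lock first
rwLock.readLock().acquire();    // a writer may also take the read lock
rwLock.writeLock().release();   // drop the write lock; only the read lock is now held
// ... read while holding only the read lock ...
rwLock.readLock().release();
// the reverse (upgrading read -> write) would never succeed and must be avoided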

Testing

Test code:

public static void main(String[] args) throws Exception {
    CuratorFramework curatorFramework = CuratorFrameworkFactory.builder().
            connectString(CONNECTION_STR).sessionTimeoutMs(5000).
            retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
    curatorFramework.start();
    final InterProcessReadWriteLock lock = new InterProcessReadWriteLock(curatorFramework, "/locks");
    for (int i = 0; i < 10; i++) {
        final int j = i;
        new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + " -> trying to acquire the lock");
            try {
                if (j % 3 == 0) {
                    lock.writeLock().acquire();
                    System.out.println(Thread.currentThread().getName() + " -> acquired the write lock");
                } else {
                    lock.readLock().acquire(); // block while competing for the lock
                    System.out.println(Thread.currentThread().getName() + " -> acquired the read lock");
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            try {
                Thread.sleep(4000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                try {
                    // release the lock
                    if (j % 3 == 0) {
                        lock.writeLock().release();
                        System.out.println(Thread.currentThread().getName() + " -> released the write lock");
                    } else {
                        lock.readLock().release();
                        System.out.println(Thread.currentThread().getName() + " -> released the read lock");
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }, "Thread-" + i).start();
    }
}

Test result: the output shows readers and writers competing with each other. Multiple threads hold the read lock at the same time, but the write lock is exclusive: only after it is released do the other threads rejoin the competition.

Redis Distributed Locks

RedissonLock

Let's first look at how the distributed lock RedissonLock is used:

public static void main(String[] args) throws Exception {
Redisson redisson = Redisson.create();

RLock lock = redisson.getLock("haogrgr");
lock.lock();
try {
System.out.println("hagogrgr");
}
finally {
lock.unlock();
}

redisson.shutdown();
}

Now look at the class relationships: RedissonLock implements the RLock interface.

(Redisson class diagram)

First, the implementation of the commonly used lock() method:

@Override
public void lock() {
try {
lockInterruptibly();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
@Override
public void lockInterruptibly() throws InterruptedException {
lockInterruptibly(-1, null);
}

Next, the lockInterruptibly method:

@Override
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
    long threadId = Thread.currentThread().getId();
    // try to acquire the lock
    Long ttl = tryAcquire(leaseTime, unit, threadId);
    if (ttl == null) {
        /*
         * Acquired successfully. Why does ttl == null mean success?
         * See the analysis of tryAcquire below.
         */
        return;
    }

    // asynchronously subscribe to the redis channel
    RFuture<RedissonLockEntry> future = subscribe(threadId);
    // block until the subscription completes
    commandExecutor.syncSubscription(future);

    try {
        while (true) {
            // loop until the lock is acquired
            ttl = tryAcquire(leaseTime, unit, threadId);
            // lock acquired
            if (ttl == null) {
                break;
            }

            // waiting for message
            if (ttl >= 0) {
                getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
            } else {
                getEntry(threadId).getLatch().acquire();
            }
        }
    } finally {
        // unsubscribe
        unsubscribe(future, threadId);
    }
}

To summarize lockInterruptibly: try to acquire the lock; if that fails, subscribe to the lock-release message and block until it arrives; once notified, loop and try to acquire the lock again.

Now let's look at how the lock is acquired: Long ttl = tryAcquire(leaseTime, unit, threadId)

private Long tryAcquire(long leaseTime, TimeUnit unit) {
if (leaseTime != -1) {
return get(tryLockInnerAsync(leaseTime, unit, Thread.currentThread().getId()));
}
return get(tryLockInnerAsync(Thread.currentThread().getId()));
}

If leaseTime != -1, tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG) is called. Note that Netty's Future/listener model is used here; I am not yet entirely clear on it and will dig into it thoroughly later.
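As a minimal, self-contained illustration of that Future/listener style (my own sketch built on Netty's concurrency utilities, not Redisson code):

import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.ImmediateEventExecutor;
import io.netty.util.concurrent.Promise;

public class NettyFutureDemo {
    public static void main(String[] args) {
        // a Promise is a writable Future; Redisson completes such promises when Redis replies
        Promise<Long> promise = new DefaultPromise<>(ImmediateEventExecutor.INSTANCE);

        // the listener fires as soon as the promise completes, instead of blocking a thread
        promise.addListener((FutureListener<Long>) future -> {
            if (future.isSuccess()) {
                System.out.println("completed with ttl = " + future.getNow());
            }
        });

        // elsewhere (e.g. a Netty I/O thread) the result arrives and completes the promise;
        // a null ttl would mean "lock acquired" in RedissonLock
        promise.setSuccess(null);
    }
}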

private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
    if (leaseTime != -1) {
        // 1. a lease time was given: call tryLockInnerAsync directly
        return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    }
    // 2. leaseTime == -1: use the default lease of 30 s
    RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(LOCK_EXPIRATION_INTERVAL_SECONDS, TimeUnit.SECONDS, threadId, RedisCommands.EVAL_LONG);
    // 3. listen on the Future; its value ttlRemaining is the remaining TTL. If it is null,
    // the lock was acquired, so schedule the expiration renewal (the watchdog)
    ttlRemainingFuture.addListener(new FutureListener<Long>() {
        @Override
        public void operationComplete(Future<Long> future) throws Exception {
            if (!future.isSuccess()) {
                return;
            }

            Long ttlRemaining = future.getNow();
            // lock acquired
            if (ttlRemaining == null) {
                scheduleExpirationRenewal(threadId);
            }
        }
    });
    return ttlRemainingFuture;
}
Future<Long> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId) {
internalLockLeaseTime = unit.toMillis(leaseTime);

return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

CommandExecutor: executing the script

The commandExecutor inheritance hierarchy:

public interface CommandExecutor extends CommandSyncExecutor, CommandAsyncExecutor {

}

(commandExecutor class diagram)

The evalWriteAsync method above has Redis execute the Lua script via the EVAL command (see the Lua scripting documentation). Let's walk through the steps of acquiring the lock:

-- 1. not locked yet (the key does not exist)
eval "return redis.call('exists', KEYS[1])" 1 myLock
-- (1) set the lock: key = lock name, field = uuid:threadId, value = 1
eval "return redis.call('hset', KEYS[1], ARGV[2], 1)" 1 myLock 3000 3d7b5418-a86d-48c5-ae15-7fe13ef0034c:110
-- (2) set the key's expiration (prevents deadlock if the holder dies after acquiring the lock)
eval "return redis.call('pexpire', KEYS[1], ARGV[1])" 1 myLock 3000 3d7b5418-a86d-48c5-ae15-7fe13ef0034c:110

-- 2. already locked by the same thread (the key exists and the field exists)
eval "return redis.call('hexists', KEYS[1], ARGV[2])" 1 myLock 3000 3d7b5418-a86d-48c5-ae15-7fe13ef0034c:110
-- (1) reentrant: increment the field by 1
eval "return redis.call('hincrby', KEYS[1], ARGV[2],1)" 1 myLock 3000 3d7b5418-a86d-48c5-ae15-7fe13ef0034c:110
-- (2) refresh the expiration time
eval "return redis.call('pexpire', KEYS[1], ARGV[1])" 1 myLock 3000 3d7b5418-a86d-48c5-ae15-7fe13ef0034c:110

-- 3. locked by another thread (the key exists but the field does not): return the key's remaining TTL in milliseconds
eval "return redis.call('pttl', KEYS[1])" 1 myLock

And here is the lock release:

-- 1. the key does not exist (nobody holds the lock)
eval "return redis.call('exists', KEYS[1])" 2 myLock redisson_lock__channel_lock 0 3000 3d7b5418-a86d-48c5-ae15-7fe13ef0034c:110
-- (1) publish the release message and return 1: released
eval "return redis.call('publish', KEYS[2], ARGV[1])" 2 myLock redisson_lock__channel_lock 0 3000 3d7b5418-a86d-48c5-ae15-7fe13ef0034c:110

-- 2. the key exists but the field does not: we are not the holder and have no right to release; return nil
eval "return redis.call('hexists', KEYS[1], ARGV[3])" 2 myLock redisson_lock__channel_lock 0 3000 3d7b5418-a86d-48c5-ae15-7fe13ef0034c:110
eval "return nil"

-- 3. the field exists: this thread holds the lock, but it may be held reentrantly elsewhere,
--    so it cannot be released outright; decrement by 1 instead
eval "return redis.call('hincrby', KEYS[1], ARGV[3],-1)" 2 myLock redisson_lock__channel_lock 0 3000 3d7b5418-a86d-48c5-ae15-7fe13ef0034c:110

-- 4. if the value is still greater than 0 after decrementing, reentrant holds remain: refresh the expiration and return 0
eval "return redis.call('pexpire', KEYS[1], ARGV[2])" 2 myLock redisson_lock__channel_lock 0 3000 3d7b5418-a86d-48c5-ae15-7fe13ef0034c:110

-- 5. otherwise this was the last hold: release the lock
-- delete the key
eval "return redis.call('del', KEYS[1])" 2 myLock redisson_lock__channel_lock 0 3000 3d7b5418-a86d-48c5-ae15-7fe13ef0034c:110
-- publish the release message
eval "return redis.call('publish', KEYS[2], ARGV[1])" 2 myLock redisson_lock__channel_lock 0 3000 3d7b5418-a86d-48c5-ae15-7fe13ef0034c:110
-- return 1: released

As the release code shows, a message is published after the key is deleted; this is the message that, as mentioned above, acquirers that failed to get the lock block on by subscribing.
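A hedged sketch of that blocking subscription, using Jedis (the channel name is taken from the eval examples above; Redisson actually uses its own asynchronous pub/sub machinery):

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;

public class UnlockSubscriberDemo {
    public static void main(String[] args) {
        try (Jedis jedis = new Jedis("127.0.0.1", 6379)) {
            // subscribe() blocks the calling thread until unsubscribe() is called,
            // which mirrors how a waiter parks until the release message arrives
            jedis.subscribe(new JedisPubSub() {
                @Override
                public void onMessage(String channel, String message) {
                    System.out.println("lock released, retry acquiring now");
                    unsubscribe(); // stop blocking and go compete for the lock again
                }
            }, "redisson_lock__channel_lock");
        }
    }
}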

Also mentioned above was scheduleExpirationRenewal, the expiration-refresh method: after a thread acquires the lock, it keeps refreshing the TTL so the lock does not expire before the work finishes. The implementation follows the same idea, just with Netty's TimerTask: every third of the expiration interval it refreshes the key, and it stops refreshing once the key no longer exists. A rough Timer sketch:

private static void nettyTimer() {
    final int expireTime = 6;
    EventExecutorGroup group = new DefaultEventExecutorGroup(1);
    final Timer timer = new HashedWheelTimer();
    timer.newTimeout(timerTask -> {
        Future<Boolean> future = group.submit(() -> {
            System.out.println("refreshed the key's expiration to " + expireTime + " seconds");
            return false; // return true once the key no longer exists
        });
        future.addListener(future1 -> {
            if (!future.getNow()) {
                nettyTimer(); // key still present: schedule the next renewal
            }
        });
    }, expireTime / 3, TimeUnit.SECONDS);
}

Testing

Test code:

/**
 * the locking logic
 */
public static void getRedisLock() {
    // redisson configuration
    Config config = new Config();
    SingleServerConfig singleServerConfig = config.useSingleServer();
    singleServerConfig.setAddress("127.0.0.1:6379");

    // redisson client
    RedissonClient redissonClient = Redisson.create(config);

    RLock lock = redissonClient.getLock("lock");
    boolean acquired = false;
    try {
        // args: max time to wait, lease time after which the lock is released automatically, time unit
        acquired = lock.tryLock(0, 1, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    if (acquired) {
        try {
            System.out.println("executing");
        } finally {
            // only unlock when actually acquired; unlocking an unheld lock throws
            lock.unlock();
        }
    }
}

Run result:

(screenshot of the run result)

Redis monitor:

(screenshot of redis-cli monitor output)

CommandAsyncService

Let's circle back to evalWriteAsync and see where the Lua script actually gets executed. The following analysis comes from the original article; from the classes below we can see execution lands in CommandAsyncService:

public class CommandAsyncService implements CommandAsyncExecutor {
@Override
public <T, R> Future<R> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
NodeSource source = getNodeSource(key);
return evalAsync(source, false, codec, evalCommandType, script, keys, params);
}


private <T, R> Future<R> evalAsync(NodeSource nodeSource, boolean readOnlyMode, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
Promise<R> mainPromise = connectionManager.newPromise();
List<Object> args = new ArrayList<Object>(2 + keys.size() + params.length);
args.add(script);
args.add(keys.size());
args.addAll(keys);
args.addAll(Arrays.asList(params));
async(readOnlyMode, nodeSource, codec, evalCommandType, args.toArray(), mainPromise, 0);
return mainPromise;
}
}

Tracing to the source, here is the async implementation:

protected <V, R> void async(final boolean readOnlyMode,
                            final NodeSource source,
                            final Codec codec,
                            final RedisCommand<V> command,
                            final Object[] params,
                            final Promise<R> mainPromise,
                            final int attempt) {
    // ... code omitted ...
    // AsyncDetails is a wrapper that bundles the object references used during the async call
    final AsyncDetails<V, R> details = AsyncDetails.acquire();
    details.init(connectionFuture, attemptPromise,
            readOnlyMode, source, codec, command, params, mainPromise, attempt);

    // retryTimerTask implements Redisson's retry mechanism
    final TimerTask retryTimerTask = new TimerTask() {

        @Override
        public void run(Timeout t) throws Exception {
            // ... code omitted ...
            int count = details.getAttempt() + 1;
            // ... code omitted ...
            async(details.isReadOnlyMode(), details.getSource(),
                    details.getCodec(), details.getCommand(),
                    details.getParams(), details.getMainPromise(), count);
            AsyncDetails.release(details);
        }
    };
    // arm the retry mechanism
    Timeout timeout = connectionManager.newTimeout(retryTimerTask,
            connectionManager.getConfig().getRetryInterval(),
            TimeUnit.MILLISECONDS);
    details.setTimeout(timeout);

    // checkConnectionFuture checks whether the client has established a connection to the
    // server cluster; if so, the command can be sent to the server for execution
    if (connectionFuture.isDone()) {
        checkConnectionFuture(source, details);
    } else {
        connectionFuture.addListener(new FutureListener<RedisConnection>() {
            @Override
            public void operationComplete(Future<RedisConnection> connFuture) throws Exception {
                checkConnectionFuture(source, details);
            }
        });
    }

    // ... code omitted ...
}

private <R, V> void checkConnectionFuture(final NodeSource source,
                                          final AsyncDetails<V, R> details) {
    // ... code omitted ...
    // get the connection the client established to the server cluster
    final RedisConnection connection = details.getConnectionFuture().getNow();

    if (details.getSource().getRedirect() == Redirect.ASK) {
        // the client received an ASK redirect: send an ASKING command first, then the real command
        // ... code omitted ...
    } else {
        // ... code omitted ...
        // send the command to the server
        ChannelFuture future = connection.send(new CommandData<V, R>(details.getAttemptPromise(),
                details.getCodec(), details.getCommand(), details.getParams()));
        details.setWriteFuture(future);
    }
    // ... code omitted ...
    // release the connection used for this call
    releaseConnection(source, details.getConnectionFuture(), details.isReadOnlyMode(),
            details.getAttemptPromise(), details);
}

The code is long, so only the parts related to command execution are shown above. From them we can see:

  • Redisson retries every operation: retryAttempts controls the retry count (default 3) and retryInterval the interval between retries (default 1000 ms). The retry mechanism is built with Netty's TimerTask and Timeout utilities.
  • Redisson also makes heavy use of Netty's asynchronous Future and FutureListener, so follow-up actions can run as soon as an asynchronous call completes.
  • Redisson's connections (RedisConnection) are built on Netty; the send method that transmits commands is implemented with Netty's Channel.writeAndFlush.

Redisson connects to the Redis server through Netty and relies on Netty's asynchronous utilities for async communication, retries, blocking, and so on. I'll keep updating this after filling in my Netty knowledge!