[并发进阶]——读写锁 原理

云惠网小编 2022年1月15日01:18:00
评论
5617字阅读18分43秒
摘要

通过图解流程以及阅读源码的方式带你深入理解读写锁,让你真正理解其原理

广告也精彩
public final void acquireShared(int arg) {
// tryAcquireShared 返回负数, 表示获取读锁失败
if (tryAcquireShared(arg) < 0) {
doAcquireShared(arg);
}
}

2)t2 执行 r.lock,这时进入读锁的 sync.acquireShared(1) 流程,首先会进入 tryAcquireShared 流程。如果有写 锁占据,那么 tryAcquireShared 返回 -1 表示失败

读写锁用的是同一个 Sycn 同步器,因此等待队列、state 等也是同一个

答案是必要的。主要是为了保证数据的可见性,如果 当前线程不获取读锁而是直接释放写锁,假设此刻另一个线程(记作线程T)获取了写锁并修 改了数据,那么当前线程无法感知线程T的数据更新。如果当前线程获取读锁,即遵循锁降级 的步骤,则线程T将会被阻塞,直到当前线程使用数据并释放读锁之后,线程T才能获取写锁进 行数据更新。

4)t2 会看看自己的节点是不是老二,如果是,还会再次调用 tryAcquireShared(1) 来尝试获取锁(节点自旋)

image-20220112212146161

📌概念

😕锁降级中读锁的获取是否必要呢?

// 外部类 WriteLock 方法, 方便阅读, 放在此处
public void lock() {
sync.acquire(1);
}
// AQS 继承过来的方法, 方便阅读, 放在此处
public final void acquire(int arg) {
if (
// 尝试获得写锁失败
!tryAcquire(arg) &&
// 将当前线程关联到一个 Node 对象上, 模式为独占模式
// 进入 AQS 队列阻塞
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
) {
selfInterrupt();
}
}
// AQS 继承过来的方法, 方便阅读, 放在此处
private void doAcquireShared(int arg) {
// 将当前线程关联到一个 Node 对象上, 模式为共享模式
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
// 再一次尝试获取读锁
int r = tryAcquireShared(arg);
// 成功
if (r >= 0) {
// r 表示可用资源数, 在这里总是 1 允许传播
//(唤醒 AQS 中下一个 Share 节点)
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (
// 是否在获取读锁失败时阻塞(前一个阶段 waitStatus == Node.SIGNAL)
shouldParkAfterFailedAcquire(p, node) &&
// park 当前线程
parkAndCheckInterrupt()
) {
interrupted = true;
}
}
} finally {
if (failed)
cancelAcquire(node);
}
}

6、读线程释放锁

// AQS 继承过来的方法
private void doReleaseShared() {
// 如果 head.waitStatus == Node.SIGNAL ==> 0 成功, 下一个节点 unpark
// 如果 head.waitStatus == 0 ==> Node.PROPAGATE 
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
// 如果有其它线程也在释放读锁,那么需要将 waitStatus 先改为 0
// 防止 unparkSuccessor 被多次执行
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
// 如果已经是 0 了,改为 -3,用来解决传播性,见后文信号量 bug 分析
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}

image-20220112212129855

image-20220112210224611

5、紧接着唤醒下一个读线程

文章目录

    • 📌概念
    • 💻读写状态的设计
    • 🎬图解流程以及源码分析
      • 1、t1先上写锁,然后t2尝试获取读锁
      • 2、又 有t3加读锁和 t4加写锁
      • 3、t1释放锁
      • 4、t2恢复运行
      • 5、紧接着唤醒下一个读线程
      • 6、读线程释放锁
    • 🔒锁降级

我们假设有两个线程t1、t2,其中t1是写线程,t2是读线程

image-20220112211323960

注意这里要先将waitStatus 先改为 0, 防止 多线程状态下 unparkSuccessor 被多次执行

之后 t4 在 acquireQueued 中 parkAndCheckInterrupt 处恢复运行,再次 for (;😉 这次自己是老二,并且没有其他 竞争,tryAcquire(1) 成功,修改头结点,流程结束

因为读线程是并行的,所以要继续唤醒下一个读线程

// Sync 继承过来的方法
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
// 因为可重入的原因, 写锁计数为 0, 才算释放成功
boolean free = exclusiveCount(nextc) == 0;
if (free) {
setExclusiveOwnerThread(null);
}
setState(nextc);
return free;
}

3)这时会进入 sync.doAcquireShared(1) 流程,首先也是调用 addWaiter 添加节点,不同之处在于节点被设置为 Node.SHARED 模式而非 Node.EXCLUSIVE 模式,注意此时 t2 仍处于活跃状态

假设t3是读锁、t4是写锁

这种状态下,假设又有 t3 加读锁和 t4 加写锁,这期间 t1 仍然持有锁,就变成了下面的样子

3、t1释放锁

锁降级指的是写锁降级成为读锁。如果当前线程拥有写锁,然后将其释放,最后再获取读 锁,这种分段完成的过程不能称之为锁降级。锁降级是指把持住(当前拥有的)写锁,再获取到 读锁,随后释放(先前拥有的)写锁的过程。

读写锁将 变量切分成了两个部分,高16位表示读,低16位表示写

2、又 有t3加读锁和 t4加写锁

当读操作远远高于写操作时,这时候使用 读写锁 让 读-读 可以并发,提高性能。 类似于数据库中的 select … from … lock in share mode

tryAcquireShared 返回值表示

  • -1 表示失败
  • 0 表示成功,但后继节点不会继续唤醒
  • 正数表示成功,而且数值是还有几个后继节点需要唤醒

实际上,在读写锁中,只会返回-1、1两种值

接下来执行唤醒流程 sync.unparkSuccessor,即让老二恢复运行,这时 t2 在 doAcquireShared 内 parkAndCheckInterrupt() 处恢复运行

// AQS 继承过来的方法
public final boolean release(int arg) {
// 尝试释放写锁成功
if (tryRelease(arg)) {
// unpark AQS 中等待的线程
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

1、t1先上写锁,然后t2尝试获取读锁

根据状态的划分能得出一个推论:S不等于0时,当写状态(S&0x0000FFFF)等于0时,则读 状态(S>>>16)大于0,即读锁已被获取

这时 t2 已经恢复运行 ,接下来 t2 调用 setHeadAndPropagate(node, 1),它原本所在节点被置为头节点

image-20220112210612334

🎬图解流程以及源码分析

image-20220112211020968

image-20220112202308470

// Sync 继承过来的方法, 方便阅读, 放在此处
protected final boolean tryReleaseShared(int unused) {
// ... 省略不重要的代码
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc)) {
// 读锁的计数不会影响其它获取读锁线程, 但会影响其它获取写锁线程
// 计数为 0 才是真正释放
return nextc == 0;
}
}
}

t2 进入 sync.releaseShared(1) 中,调用 tryReleaseShared(1) 让计数减一,但由于计数还不为零

image-20220112205731698

4、t2恢复运行

image-20220112211438439

这时 t3 已经恢复运行,接下来 t3 调用 setHeadAndPropagate(node, 1),它原本所在节点被置为头节点

⚠️注意事项:

image-20220112205739754

事情还没完,在 setHeadAndPropagate 方法内还会检查下一个节点是否是 shared,如果是则调用 doReleaseShared() 将 head 的状态从 -1 改为 0 并唤醒老二,这时 t3 在 doAcquireShared 内 parkAndCheckInterrupt() 处恢复运行

💡通过位运算。

// ㈠ AQS 继承过来的方法, 方便阅读, 放在此处
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
// 设置自己为 head
setHead(node);
// propagate 表示有共享资源(例如共享读锁或信号量)
// 原 head waitStatus == Node.SIGNAL 或 Node.PROPAGATE
// 现在 head waitStatus == Node.SIGNAL 或 Node.PROPAGATE
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
// 如果是最后一个节点或者是等待共享读锁的节点
if (s == null || s.isShared()) {
doReleaseShared();
}
}
}

那如何才能实现一个整型变量可以维护多个状态?

🚩读写锁与排它锁不同在于它锁在同一时刻可以允许多个读线程访问,但是在写线程访问时,所有的读 线程和其他写线程均被阻塞。读写锁维护了一对锁,一个读锁和一个写锁,通过分离读锁和写 锁,使得并发性相比一般的排他锁有了很大提升。

这时会走到写锁的 sync.release(1) 流程,调用 sync.tryRelease(1) 成功,变成下面的样子

假设当前同步状态 值为S,写状态等于S&0x0000FFFF(将高16位全部抹去),读状态等于S>>>16(无符号补0右移 16位)。当写状态增加1时,等于S+1,当读状态增加1时,等于S+(1<<16),也就是 S+0x00010000。

// AQS 继承过来的方法
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

image-20220112212200469

t3 进入 sync.releaseShared(1) 中,调用 tryReleaseShared(1) 让计数减一,这回计数为零了,进入 doReleaseShared() 将头节点从 -1 改为 0 并唤醒老二,即

这回再来一次 for (;😉 执行 tryAcquireShared 成功则让读锁计数加一

读写锁是如何迅速确定读和写各自的状态呢?

image-20220112211828461

image-20220112212017005

  • 读锁不支持条件变量

  • 重入时升级不支持:即持有读锁的情况下去获取写锁,会导致获取写锁永久等待

  • 重入时降级支持:即持有写锁的情况下去获取读锁

1) t1 成功上锁,流程与 ReentrantLock 加锁相比没有特殊之处,不同是写锁状态占了 state 的低 16 位,而读锁 使用的是 state 的高 16 位

注意此处的t2不是设置为exclusiveOwnerThread,因为读线程是并行的

⚠️RentrantReadWriteLock不支持锁升级(把持读锁、获取写锁,最后释放读锁的过程)。目的 也是保证数据可见性,如果读锁已被多个线程获取,其中任意线程成功获取了写锁并更新了 数据,则其更新对其他获取到读锁的线程是不可见的。
锁降级是指把持住(当前拥有的)写锁,再获取到 读锁,随后释放(先前拥有的)写锁的过程。

读写锁同样是通过自定义同步器来实现同步功能,而不同于ReentrantLock中的同步状态仅能表示锁被一个线程重复获取的次数,读写锁状态(一个整型变量)可以表示多个读线程和一个写线程的状态。

image-20220112210911985

笔记来源于 黑马程序员全面深入学习Java并发编程,从《Java并发编程的艺术》中作为补充

下一个节点不是 shared 了,因此不会继续唤醒 t4 所在节点

🔒锁降级

这回再来一次 for (;😉 执行 tryAcquireShared 成功则让读锁计数加一,如下state从0_0变成了1_0

image-20220112211610042

💻读写状态的设计

5)如果没有成功,在 doAcquireShared 内 循环一次,把前驱节点的 waitStatus 改为 -1,再 循环一 次尝试 tryAcquireShared(1) 如果还不成功,那么在 parkAndCheckInterrupt() 处 park

本文转自 https://blog.csdn.net/weixin_65349299/article/details/122463278

腾讯云618
未分类
云惠网小编
SpringCloud -- Config、Bus解析

SpringCloud — Config、Bus解析

1、Config1.1、概述简介1. 分布式面临的问题微服务意味着要将单体应用中的业务拆分成一个个子服务,每个服务的粒度相对较小,因此系统中会出现大量的服务。由于每个服务都需要必要...
Java数据结构-了解复杂度

Java数据结构-了解复杂度

2.实例分析与计算  四.写在最后  // 计算斐波那契递归fibonacci的时间复杂度 int fibonacci(int N) { return N < 2 ? N : fibonacci...
[深度解剖C语言] --关键字 static

[深度解剖C语言] –关键字 static

static ---最名不副实的关键字目录1.static修饰全局变量2.static修饰函数3.static修饰局部变量static的作用:1.static修饰全局变量我们创建两...
Java数据结构-认识顺序表

Java数据结构-认识顺序表

目录二.顺序表1.概念及结构2.顺序表的实现打印顺序表获取顺序表的有效长度在pos位置新增元素判断是否包含某个元素查找某个元素对应的位置获取/查找pos位置的元素给pos位置的元素...
腾讯云618

发表评论