0%

1658763277173

只记录一下对我而言比较有意义的题目

本期总结:#

  1. 考虑用Pair包装而不是设计class实体类
  2. var特性可以简化变量名的打印
  3. 双点有序(>k)等问题,优先考虑用双指针而不是二分, 先确定固定哪个点,再看怎么移动,最多就4种情况。

#

1658763341991

看着很简单啊,我的想法是直接对每一行搞成一个字符串, 然后做成hasMap,记录这个字符串出现的次数

在拿列去map里匹配即可

结果忘记了数字的范围是1-10^9, 并非0-9,不能直接拼,还得加逗号。痛失五分钟。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
class Solution {
public int equalPairs(int[][] grid) {
int ylen = grid.length;
int xlen = grid[0].length;
String[] rows = new String[ylen];
String[] cols = new String[xlen];
Map<String, List<Integer>> map = new HashMap<>();
for (int y = 0; y < ylen;y++) {
StringBuilder sb = new StringBuilder();
for (int x = 0; x < xlen;x++) {
sb.append(grid[y][x]).append(",");
}
String s = sb.toString();
if (!map.containsKey(s)) {
map.put(s, new ArrayList<>());
}
map.get(s).add(y);
}

int count = 0;
for (int x = 0; x < xlen;x++) {
StringBuilder sb = new StringBuilder();
for (int y = 0; y < ylen;y++) {
sb.append(grid[y][x]).append(",");
}
String s = sb.toString();
if (map.containsKey(s)) {
count += map.get(s).size();
}
}
return count;
}
}

另外java处理字符串和map问题确实比python要麻烦,如果要冲速度,我是不是得学一下python?

题解里的python,可以快速转数量统计的map

1
2
3
4
class Solution:
def equalPairs(self, grid: List[List[int]]) -> int:
cnt = Counter(tuple(row) for row in grid)
return sum(cnt[col] for col in zip(*grid))

#

1658764007681

很明显就是一个会更新的优先队列集合

需要用优先队列加一个检查更新的map实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
    class FoodRatings {

class Food {
String name;
String cuisines;
int rate;

public Food(String name, String cuisines, int rate) {
this.name = name;
this.cuisines = cuisines;
this.rate = rate;
}
}

Map<String, Queue<Food>> queues;
Map<String, Food> nowFoodMap = new HashMap<>();
public FoodRatings(String[] foods, String[] cuisines, int[] ratings) {
int len = foods.length;
queues = new HashMap<>();
for (int i = 0;i<len;i++) {
Food food = new Food(foods[i], cuisines[i], ratings[i]);
nowFoodMap.put(foods[i], food);

if (!queues.containsKey(cuisines[i])) {
queues.put(cuisines[i], new PriorityQueue<>((a,b)->(
a.rate != b.rate ? (b.rate - a.rate): (a.name.compareTo(b.name)))));
}
queues.get(cuisines[i]).offer(food);
}
}

public void changeRating(String food, int newRating) {
String cu = nowFoodMap.get(food).cuisines;
Food food1 = new Food(food, cu, newRating);
nowFoodMap.put(food, food1);
queues.get(cu).offer(food1);
}

public String highestRated(String cuisine) {
Queue<Food> queue = queues.get(cuisine);
while (!queue.isEmpty()) {
Food food = queue.peek();
if (food != nowFoodMap.get(food.name)) {
queue.poll();
} else {
return food.name;
}
}
return null;
}
}


/**
* Your FoodRatings object will be instantiated and called as such:
* FoodRatings obj = new FoodRatings(foods, cuisines, ratings);
* obj.changeRating(food,newRating);
* String param_2 = obj.highestRated(cuisine);
*/

但是定义类的时候比较麻烦,后面可以考虑以下的效率优化:

  1. 用Pair做二元组,避免定义内部类麻烦

  2. 用var避免定义变量类型

1658764157594

#

1658764316137

其实这个脑筋急转弯通过纸上推演,很快能得到

num1 OR num2num1 AND num2 的1的个数,等同于num1和num2中1的个数总和

就变成就一个数组中,a[x] + a[y]共有多少对

这个子问题的解法我想复杂了,竟然想到用二分法。。。又因为很久没写了,对二分不熟了,导致浪费了大量时间确认二分如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
class Solution {
public long countExcellentPairs(int[] nums, int k) {
Set<Integer> set = Arrays.stream(nums).mapToObj(Integer::valueOf).collect(Collectors.toSet());

int[] ontCounts = new int[set.size()];
int[] newNums = new int[set.size()];
int t = 0;
for (int s : set) {
newNums[t++] = s;
}
nums = newNums;

for (int i = 0; i < nums.length;i++) {
ontCounts[i] = Integer.bitCount(nums[i]);
}
Arrays.sort(ontCounts);

long result = 0;
for (int i = 0; i < nums.length;i++) {
int oneCount = ontCounts[i];
int needOneCount = k - oneCount;

int left = 0, right = nums.length;
while (left < right) {
int mid = left + (right - left)/2;
if (ontCounts[mid] < needOneCount) {
left = mid+1;
} else if (ontCounts[mid] >= needOneCount){
right = mid;
} else {
break;
}
}
int select = right;
result += Math.max(0, ontCounts.length - select);
}
return result;
}
}

比二分要更快速的可能是用双指针求解这类问题

但是双指针的方向可能要好好想想

即2个指针哪个优先固定, 再移动哪个,哪个方向移动

这题需要left=0, right=length, 固定住left, 让right移动找到一个位置后,right右边的所有点和left想家肯定都满足> k

1
2
3
4
5
6
7
8
long result = 0;
int left = 0, right = ontCounts.length - 1;
for (;left<ontCounts.length;left++) {
while (right >=0 && ontCounts[left] + ontCounts[right] >= k) {
right--;
}
result += (ontCounts.length - 1 - right);
}

[toc]

java中AQS究竟是做什么的?#

当你使用java实现一个线程同步的对象时,一定会包含一个问题:

你该如何保证多个线程访问该对象时,正确地进行阻塞等待,正确地被唤醒?

关于这个问题,java的设计者认为应该是一套通用的机制

因此将一套线程阻塞等待以及被唤醒时锁分配的机制称之为AQS

全称 AbstractQuenedSynchronizer

中文名即抽象的队列式同步器 。

基于AQS,实现了例如ReentenLock之类的经典JUC类。

AQS简要步骤#

  1. 线程访问资源,如果资源足够,则把线程封装成一个Node,设置为活跃线程进入CLH队列,并扣去资源

  2. 资源不足,则变成等待线程Node,也进入CLH队列

  3. CLH是一个如下图所示的双向链式队列

    1658065224905

AQS的资源state#

state定义#

AQS中的资源是一个int值,而且是volatile的,并提供了3个方法给子类使用:

1
2
3
4
5
6
7
8
9
10
private volatile int state;
protected final int getState() {
return state;
}
protected final void setState(int newState) {
state = newState;
}

// cas方法
compareAndSetState(int oldState, int newState);

如果state上限只有1,那么就是独占模式Exclusive,例如 ReentrantLock

如果state上限大于1,那就是共享模式Share,例如 Semaphore、CountDownLatch、ReadWriteLock,CyclicBarrier

已经有CAS方法了,为什么资源state还要定义成volatile的?#

对外暴露的getter/setter方法,是走不了CAS的。而且setter/getter没有被synchronized修饰。所以必须要volatile,保证可见性

这样基于AQS的实现可以直接通过getter/setter操作state变量,并且保证可见性,也避免重排序带来的影响。比如CountDownLatch,ReentrantReadWriteLock,Semaphore都有体现(各种getState、setState)

对资源的操作什么时候用CAS,什么使用setState?#

volatile的state成员有一个问题,就是如果是复合操作的话不能保证复合操作的原子性

因此涉及 state增减的情况,采用CAS

如果是state设置成某个固定值,则使用setState

AQS的CLH队列#

为什么需要一个CLH队列#

这个队列的目的是为了公平锁的实现

即为了保证先到先得,要求每个线程封装后的Node按顺序拼接起来。

CLH本质?是一个Queue容器吗#

不是的,本质上是一个链表式的队列

因此核心在于链表节点Node的定义

1658066659779

除了比较容易想到的prev和next指针外

还包含了该节点内的线程

以及 waitStatus 等待状态

4种等待状态如下:

  • CANCELLED(1): 因为超时或者中断,节点会被设置为取消状态,被取消的节点时不会参与到竞争中的,他会一直保持取消状态不会转变为其他状态;
  • SIGNAL(-1):后继节点的线程处于等待状态,而当前节点的线程如果释放了同步状态或者被取消,将会通知后继节点,使后继节点的线程得以运行
  • CONDITION(-2) : 点在等待队列中,节点线程等待在Condition上,当其他线程对Condition调用了signal()后,改节点将会从等待队列中转移到同步队列中,加入到同步状态的获取中
  • PROPAGATE(-3) : 表示下一次共享式同步状态获取将会无条件地传播下去
  • INIT( 0):

入队是怎么保证安全的?#

入队过程可能引发冲突

因此会用CAS保障入队安全。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private Node enq(final Node node) {
//多次尝试,直到成功为止
for (;;) {
Node t = tail;
//tail不存在,设置为首节点
if (t == null) {
if (compareAndSetHead(new Node()))
tail = head;
} else {
//设置为尾节点
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

出队过程会发生什么?#

一旦有节点出队,说明有线程释放资源了,队头的等待线程可以开始尝试获取了。

于是首节点的线程释放同步状态后,将会唤醒它的后继节点(next)

而后继节点将会在获取同步状态成功时将自己设置为首节点

**注意在这个过程是不需要使用CAS来保证的,因为只有一个线程能够成功获取到同步状态 **

AQS详细资源获取流程#

1. tryAcquire尝试获取资源#

AQS使用的设计模式是模板方法模式。

具体代码如下:

1
2
3
4
5
6
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 发现中断过,则触发中断异常
selfInterrupt();
}

即AQS抽象基类AbstractQueuedSynchronizer给外部调用时,都是调的acquire(int arg)方法。这个方法的内容是写死的。
而acquire中,需要调用tryAcquire(arg), 这个方法是需要子类实现的,作用是判断资源是否足够获取arg个

(2条消息) AQS子类的tryAcquire和tryRelease的实现_Mutou_ren的博客-CSDN博客_aqs tryacquire

ReentrantLock中的tryAcquire实现#

这里暂时只谈论一种容易理解的tryAcuire实现,其他附加特性的tryAcquire先不提。

里面主要就做这几件事:

  1. 获取当前锁的资源数
  2. 资源数为0,说明可以抢, 确认是前置节点是头节点,进行CAS试图争抢,抢成功就返回true,并设置当前线程
  3. 没抢成功,返回false
  4. 如果是重入的,则直接set设置增加后的状态值,状态值此时不一定为0和1了
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
protected final boolean tryAcquire(int acquires){
final Thread current = Thread.currentThread();
int c = getState();
// state==0代表当前没有锁,可以进行获取
if (c == 0) {
// 非公平才有的判断,会判断是否还有前驱节点,直接自己为头节点了或者同步队列空了才会继续后面的锁的获取操作
if (!hasQueuedPredecessors()
//CAS设置state为acquires,成功后标记exclusiveOwnerThread为当前线程
&& compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 当前占有线程等于自己,代表重入
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
// 出现负数,说明溢出了
if (nextc < 0) //
throw new Error("Maximum lock count exceeded");
// 因为是重入操作,可以直接进行state的增加,所以不需要CAS
setState(nextc);
return true;
}
return false;
}

2.addWaiter 添加到等待队列#

当获取资源失败,会进行addWaiter(Node.EXCLUSIVE), arg)。

目的是创建一个等待节点Node,并添加到等待队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
// 通过CAS竞争队尾
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 竞争队尾失败,于是进行CAS频繁循环竞争队尾
enq(node);
return node;
}

private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

3. acquireQueued循环阻塞-竞争#

,并在 "处于头节点时尝试获取资源->睡眠->唤醒“中循环。

当已经跑完任务的线程释放资源时,会唤醒之前阻塞的线程。

当被唤醒后,就会检查自己是不是头节点,如果不是,且认为可以阻塞,那就继续睡觉去了

AQS(acquireQueued(Node, int) 3)–队列同步器 - 小窝蜗 - 博客园 (cnblogs.com)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
final boolean acquireQueued(final Node node, int arg) {
// 标识是否获取资源失败
boolean failed = true;
try {
// 标识当前线程是否被中断过
boolean interrupted = false;
// 自旋操作
for (;;) {
// 获取当前节点的前继节点
final Node p = node.predecessor();
// 如果前继节点为头结点,说明排队马上排到自己了,可以尝试获取资源,若获取资源成功,则执行下述操作
if (p == head && tryAcquire(arg)) {
// 将当前节点设置为头结点
setHead(node);
// 说明前继节点已经释放掉资源了,将其next置空,好让虚拟机提前回收掉前继节点
p.next = null; // help GC
// 获取资源成功,修改标记位
failed = false;
// 返回中断标记
return interrupted;
}
// 若前继节点不是头结点,或者获取资源失败,
// 则需要判断是否需要阻塞该节点持有的线程
// 若可以阻塞,则继续执行parkAndCheckInterrupt()函数,
// 将该线程阻塞直至被唤醒
// 唤醒后会检查是否已经被中断,若返回true,则将interrupted标志置于true
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 最终获取资源失败,则当前节点放弃获取资源
if (failed)
cancelAcquire(node);

}
}

4.shouldParkAfterFailedAcquire 检查是否可以阻塞#

该方法不会直接阻塞线程,因为一旦线程挂起,后续就只能通过唤醒机制,中间还发生了内核态用户态切换,消耗很大。

因此会先不断确认前继节点的实际状态,在只能阻塞的情况下才会去阻塞。

并且会过滤掉cancel的线程节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 获取前继节点的等待状态
int ws = pred.waitStatus;
// 如果等待状态为Node.SIGNAL(-1),则直接返回true即可以阻塞
// 因为这说明前继节点完成资源的释放或者中断后,会主动唤醒后继节点的(这也即是signal信号的含义),因此方法外面不用再反复CAS了,直接阻塞吧
if (ws == Node.SIGNAL) return true;

// 如果前继节点的等待值大于0即CANCELLED(1),说明前继节点的线程发生过cancel动作
// 那就继续往前遍历,直到当前节点的前继节点的状态不为cancel
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 前继节点的等待状态不为SIGNAL(-1),也不为Cancel(1)
// 那么只能是PROPAGATE(-3)或者CONDITION(-2)或者INITIAL(0)
// 直接设置成SIGNAL,下一次还没CAS成功,就直接睡觉了
// 因此在前面所有节点没辩护的情况下, 最多一次之后就会返回true让外面阻塞
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

5.parkAndCheckInterrupt() 阻塞线程#

使用LockSupport.park来阻塞当前这个对象所在的线程

1
2
3
4
5
6
7
8
9
10
11
12
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
// 确认是否是中断导致的park结束,并清除中断标记
return Thread.interrupted();
}

public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
UNSAFE.park(false, 0L);
setBlocker(t, null);
}

lockSupport.park()和普通的wait|notify都有啥区别?#

  1. 面向的主体不一样。LockSuport主要是针对Thread进进行阻塞处理,可以指定阻塞队列的目标对象,每次可以指定具体的线程唤醒。Object.wait()是以对象为纬度,阻塞当前的线程和唤醒单个(随机)或者所有线程。
  2. 实现机制不同。虽然LockSuport可以指定monitor的object对象,但和object.wait(),两者的阻塞队列并不交叉。可以看下测试例子。object.notifyAll()不能唤醒LockSupport的阻塞Thread.

底层实现原理
简而言之,是用mutex和condition保护了一个_counter的变量,当park时,这个变量置为了0,当unpark时,这个变量置为1。
底层用的C语言的pthread_mutex_unlock、pthread_cond_wait 、pthread_cond_signal ,但是针对了mutex和_cond两个变量进行加锁。

6.总体流程图#

1658647899304

频繁出现的interruptd中断标记是做什么用的?#

(2条消息) JUC并发编程基石AQS之中断_LuxBai的博客-CSDN博客_aqs中断

看!源码之AQS中断设计与实现(内涵jvm部分实现) - 简书 (jianshu.com)

对线程调用 t1.interrupt();时

会导致 LockSupport.park() 阻塞的线程重新被唤醒

即有两种唤醒情况: 被前置节点唤醒,或者被外部中断唤醒

这时候要根据调用的acuire类型决定是否在中断发生时结束锁的获取。

上面介绍的是不可中断锁。

在parkAndCheckInterrupt中,当park结束阻塞时时,使用的是 Thread.interrupted() 而不是 .isInterrupted() 来返回中断状态

因为前者会返回线程当前的中断标记状态同时清除中断标志位(置为false)

外层CAS循环时, 就不会让线程受中断标记影响,只是记录一下是否发生过中断

1658419508199

当获取锁成功后,如果发现有过线程中断,则会触发中断异常,

1658419648382

之后便由获取锁的调用者自己决定是否要处理线程中断。像下面这样:

1
2
3
4
5
6
7
8
9
reentrantLock.lock();
try {
System.out.println("t1");
TimeUnit.SECONDS.sleep(30);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
reentrantLock.unlock();
}

那么另一种情况就是可中断锁了。

ReentranLock有一个lockInterruptibly()方法就是这种情况

线程被唤醒时,如果发现自己被中断过,就会直接抛异常而不是继续获取锁

1658419756962

因此如果你的线程对中断很敏感,那么就是用可中断锁,及时响应。

如果不敏感,也要注意处理中断异常。

AQS的详细资源释放流程#

首先AQS提供的模板方法为release方法。

核心逻辑就是对资源进行尝试性释放

如果成功,就唤醒等待队列中的第一个头节点

1
2
3
4
5
6
7
8
9
10
11
12
public final boolean release(int arg) {
// 是否释放成功,tryRelease是子类要实现的方法
if (tryRelease(arg)) {
Node h = head;
// 判断头节点是否正在阻塞中,是的话唤醒
if (h != null && h.waitStatus != 0)
// 唤醒头节点
unparkSuccessor(h);
return true;
}
return false;
}

看一下ReteenLock中的tryRelease实现

就是减一下资源值。

当资源值清零,则说明可以解除了对当前点的占用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
// 设置当前占用线程为null
setExclusiveOwnerThread(null);
}
// 不需要CAS,因为只有持有锁的人才能做释放,不担心竞争
setState(c);
return free;
}

AQS如何实现公平和非公平?#

以ReteenLock为例,它内部tryAcquire有两种同步器的实现

  • 非公平同步器NonfairSync

  • 公平同步器FairSync

公平同步器和非公平同步器都是ReentrantLock中定义的一个static内部类

ReentrantLock根据配置的不同,使用这2个同步器做资源的获取和同步操作

他们二者的提供的lock操作,本质上就是AQS的acquire(1)

1
2
3
4
5
6
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;

final void lock() {
acquire(1);
}

二者在公平和非公平的实现区别上,就是唤醒线程后,只有等待队列的队头节点才会尝试竞争。

而非公平锁是只要唤醒了就可以尝试竞争。

因此核心区别在于hasQueuedPredecessors方法!

1658665318338

公平和非公平锁的优点和缺点#

  1. 饥饿问题

非公平锁可能引发“饥饿”,即一个线程反复抢占获取,而其他线程一直拿不到。

而公平锁不存在饥饿,只要排上队了就一定能拿到

  1. 性能问题

    非公平锁的平均性能比公平锁要高, 因为非公平锁中所有人都可以CAS抢占,如果同步块的时间非常短,那么可能所有人都不需要阻塞,减少CPU唤醒线程的开销,整体的吞吐效率会高点,CPU也不必取唤醒所有线程,会减少唤起线程的数量。

性能测试中公平锁的耗时是非公平锁的94.3倍, 总切换次数是133倍

Lock类是默认公平还是非公平?#

默认是非公平的,原因就是上文考虑的性能差距过大问题, 因此公平锁只能用于特定对性能要求不高且饥饿发生概率不大的场景中。

独占模式和共享模式的AQS区别#

  • 名字上, 共享模式都会带一个shard

  • 返回值上,独占模式相关acuire方法放回的是boolean类型, 而共享模式返回的是int值

  • 核心概念上, 区别在于同一时刻能否有多个线程可以获取到其同步状态

  • 释放时,共享模式需要用CAS进行释放, 而独占模式的release方法则不需要,直接setState即可。

  • 共享模式应用:信号量、读写锁

共享模式信号量Semaphore的Sync同步器#

先实现了一个静态内部类Sync

和上面的RLock类一个区别在于需要state初始化值,不一定为1

1
2
3
Sync(int permits) {
setState(permits);
}

再继承实现了FairSync和NoFairSync

使用CAS实现值的增加或者减少

公平/非公平的区别同样是hasQueuedPredecessors的判断

1
2
3
4
5
6
7
8
9
10
11
12
13
14
protected int tryAcquireShared(int acquires) {
for (;;) {
// 队头判断,公平锁核心
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
// 信号量不足,直接返回负数
if (remaining < 0 ||
// 能抢成功,返回修改后的值,抢失败则for循环继续
compareAndSetState(available, remaining))
return remaining;
}
}

AQS如何处理重入#

通过current == getExclusiveOwnerThread()来判断并进行非CAS的setState操作

1
2
3
4
5
6
7
8
9
if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
// 出现负数,说明溢出了
if (nextc < 0) //
throw new Error("Maximum lock count exceeded");
// 因为是重入操作,可以直接进行state的增加,所以不需要CAS
setState(nextc);
return true;
}

注意处理重入问题时,如果是独占锁,是可以直接setState而不需要CAS的,因为不会竞争式地重入!

ReentrantLock释放时,也会处理重入,关键点就是对getState() - release后的处理,是否返回true或者false

1
2
3
4
5
6
7
8
9
10
11
12
13
14
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
// 只有资源数为0才会解锁
// 才算释放成功,否则这锁还是占住了
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}

AQS如何响应超时#

AQS提供的方法中带有Nanos后缀的方法就是支持超时中断的方法。

核心逻辑就是每次阻塞前,确认nanosTimeout是否已经超时了。

每次唤醒时,将nanosTimeout减去阻塞所花的时间,重新确认,并修改lastTime

关键部分见下图

1658648788193

spinForTimeoutThreshold是什么?#

首先这个值是写死的1000L即1000纳秒

1000纳秒是个非常小的数字,而小于等于1000纳秒的超时等待,无法做到十分的精确,那么就不要使用这么短的一个超时时间去影响超时计算的精确性,所以这时线程不做超时等待,直接做自旋就好了。

AQS的condition条件队列#

(2条消息) AQS条件队列及中断机制_glamour2015的博客-CSDN博客_aqs中断

condition的用法#

condition用于显式的等待通知,等待过程可以挂起并释放锁,唤醒后重新拿到锁。

和直接用lock\unlock去做等待通知的区别在于,lock是不会释放锁的,但是利用的condition的await则可以,且唤醒后会自动重新拿回锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
public void conditionWait() throws InterruptedException {
lock.lock();
try {
// if(xxxx)判断不满足条件,等待,释放锁
condition.await();
} finally {
lock.unlock();
}
}
public void conditionSignal() throws InterruptedException {
lock.lock();
try {
// 做完事情了,通知condition上等待的开始抢占
condition.signal();
} finally {
lock.unlock();
}
}

也提供了一些支持中断、支持超时的等待方法

condition 和 object.wait/notify的区别#

  1. object的wait依赖sync, 只能最多有一个等待队列。 而通过newCondition可以制造多个等待队列

  2. wait不支持中断,而condition支持

  3. condition支持等待特定时间

condition原理分析#

超大原理流程图#

  • await(), 简单来讲就是把当前线程放入condition的等待队列中,然后调用LockSupport.park拉起线程。如果被其他线程通过signal唤醒,则放入同步队列中竞争锁,竞争成功则返回,否则继续竞争。

  • signal方法,就是拿到condition的等待队列头节点,用cas修改节点状态,改成功则唤醒线程。但有可能被别人抢先,所以需要cas操作。

dd

代码结构部分:#

​ Lock提供了newCondition接口给外部锁调用

​ 而newCondition()返回的Condition是一个接口

1659283503458

​ 这个接口的实现类是ConditionObject,放在AQS抽象类的内部类中

1659283517022

原理实现部分:#

等待队列:#

  • 每个condition都有一个属于自己的等待队列

  • 每次调用condition.await, 就插入到等待队列尾部

  • 等待队列插入封装线程的节点时不需要在尾部CAS, 因为必须先获取锁,才能调用await,因此不用CAS竞争

  • 每个Lock只有一个同步队列(用于lock()时阻塞和竞争用), 但是可能会有多个等待队列(用于condition的await)

等待过程#

  1. 添加线程到condition的等待队列尾部

  2. 释放占用的锁,并唤醒同步队列的后继节点

  3. 此时肯定不在aqs的同步队列中了, 用park方法进入阻塞状态

  4. 被唤醒,唤醒时可能是通过sign()被人放入了同步队列, 也可能是被中断唤醒,因此要做checkInterruptWhileWaiting检查看是否继续, 如果同意继续,就继续睡眠,直到进入同步队列

  5. 尝试acquireQueued竞争和抢占state同步状态

  6. 退出前,顺带用unlinkCancelledWaiters清理已经不是CONDITION状态的等待队列节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 添加本线程到等待队列尾部
Node node = addConditionWaiter();
// 释放锁,唤醒同步队列中的后继节点
int savedState = fullyRelease(node);
int interruptMode = 0;
// 如果已经在同步队列中了,说明被成功sign唤醒
while (!isOnSyncQueue(node)) {
// 阻塞挂起
LockSupport.park(this);
// 确认是否需要中断时就退出
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 在同步队列中,那就按同步队列的规则在队列中用CAS竞争同步状态
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 清理已经不是CONDITION状态的等待队列节点
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}

唤醒过程signal()#

  1. 检查调用signal时,是否当前线程获取了锁,不是则抛异常

    1
    2
    if (!isHeldExclusively())
    throw new IllegalMonitorStateException();
  2. 获取condition队列中的第一个等待节点

    1
    2
    3
    Node first = firstWaiter;
    if (first != null)
    doSignal(first);
  3. 用CAS清除CONDITION状态

    1
    2
    if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
    return false;
  4. 调用AQS的enq(firstWaitNode),将这个节点放入到同步队列的队尾(需要CAS支撑?因为可能是共享的,即使获取了锁也需要竞争)

    1
    Node p = enq(node);
  5. 移动入同步队列成功后(可能经历了几次CAS),再用unpark方法唤醒,那个线程就进入了上面代码中Park之后的部分了

    1
    2
    3
    int ws = p.waitStatus;
    if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
    LockSupport.unpark(node.thread);
  6. 如果是signalAll方法,则等待队列中每个节点都执行一次signal方法,全部移入同步队列中并唤醒(唤醒后他们很可能还会因为抢不到资源而阻塞,但队列位置不同了,也无法再通过sign唤醒了)

1
2
3
4
5
6
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);

用于高并发的AQS实现锁#

JAVA并发编程——ReentrantReadWriteLock锁降级和StampedLock - SegmentFault 思否

读写锁ReentrantReadWriteLock#

读写锁,顾名思义,用于读多写少的场景,这种情况下一般读是没必要强行做互斥的。

没有ReentrantReadWriteLock前,jdk是怎么实现读写机制的?#

jdk5之前是没有读写锁的。

因此需要手写一套等待和通知机制。

写操作开始时,所有晚于写操作的读操作会通过wait进入阻塞状态。

写操作完成后,主动用notifyAll进行通知,其他的读操作再继续执行

写操作和写操作之间的同步则通过sync关键字保障同步

用了ReentrantReadWriteLock后可以怎么做?#

提供了读锁和写锁

读的时候获取读锁

写的时候获取写锁

一旦写锁lock住,后面的读写都会阻塞住,直到写锁被释放。

代码示例: 一个用HashMap + 读写锁实现的安全Cache(代码出自《java并发编程艺术))

关注里面什么时候用r.lock,什么时候用的w.lock

且都用了finally来保证锁的释放

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class Cache {
static Map<String, Object> map = new HashMap<String, Object>();
static ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
static Lock r = rwl.readLock();
static Lock w = rwl.writeLock();
// 获取一个key对应的value
public static final Object get(String key) {
//
r.lock();
try {
return map.get(key);
} finally {
r.unlock();
}
}
// 设置key对应的value,并返回旧的value
public static final Object put(String key, Object value) {
w.lock();
try {
return map.put(key, value);
} finally {
w.unlock();
}
}
// 清空所有的内容
public static final void clear() {
w.lock();
try {
map.clear();
} finally {
w.unlock();
}
}
}

ReentrantReadWriteLock支持的特性#

支持公平和非公平锁的实现:提供了带Fair的读写锁,吞吐量仍旧是非公平优先

支持重进入: 写线程获取写锁后, 可以再次基于这个线程获取写锁或者读进行重入

读写锁的实现核心原理#

读写锁的state资源状态设计#

因为AQS只提供了getState()和setState()两个方法来表示状态, 为了不引入新的成员破坏设计,仍旧基于一个state值来实现读写状态。

state是一个32位的int值, 此时前16位表示写锁的重入数量, 后16位表示为写锁持有的数量。

可直接通过excludesiveCount(getState())得到写锁的数量值。

写锁的获取和释放#

核心关键点:

  1. 写锁属于独占锁,因此实现和调用的都是tryAcquire方法,不带shard

  2. 发现有读锁的时候,会直接false获取写锁失败

  3. 写锁支持重入,重入时同样不需要CAS

  4. 公平锁仍然会考虑头节点问题,非头节点直接获取写锁失败

1658673384503

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
// 存在读锁或者当前获取线程不是已经获取写锁的线程
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
setState(c + acquires);
return true;
}
if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) {
return false;
}
setExclusiveOwnerThread(current);
return true;
}

读锁的获取和释放#

  1. 读锁属于共享锁,因此相关方法都带了一个shard

  2. 读锁中对state的处理都多了一个1<<16

  3. 读锁是共享锁,所以tryAcquire方法中就已经在一直做CAS试图获取读锁

    读锁之间不存在阻塞只有CAS,除非遇到写锁,才有可能在外层阻塞

  4. 存在写锁, 但如果是重入场景(即同一个线程),则允许读锁的重入,这是下文锁降级的应用基础

  5. JDK6中引入了当前线程读取读锁的次数getHoldCount,增加了threadLocal来保存各线程读锁的次数情况,实现比较复杂。下面只给出去掉hold的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
protected final int tryAcquireShared(int unused) {
// 一直处理,读锁竞争都是CAS,不存在阻塞!
for (;;) {
int c = getState();
int nextc = c + (1 << 16);
if (nextc < c)
throw new Error("Maximum lock count exceeded");
// 有写锁且不是重入,返回-1获取失败
if (exclusiveCount(c) != 0 && owner != Thread.currentThread())
return -1;
// cas修改值
if (compareAndSetState(c, nextc))
return 1;
}
}

读写锁的锁降级#

说人话能一个线程同时加2个锁,支持逐步释放。就是当你在一个线程中需要操作完数据完后马上读数据,且希望数据在方法结束前不会变(避免脏读),可以先加写锁,在写完后加读锁, 然后释放写锁, 此时还会保留读锁(降级), 就能保证data的正常使用。

锁降级并非一个特性, 只是一种应用做法,即当你涉及写完要马上读数据操作的情况,可以先加读锁,再释放写锁!

1658670285725

邮戳锁StampedLock#

读写锁有什么缺点?#

读写锁容易造成写饥饿。例如读有9999+的时候,写动作就得等9999+结束才能触发

即读的动作占用了太多写动作的等待时间了。

邮戳锁为什么能解决饥饿#

邮戳锁提供了一个乐观读的方法

这个方法并不需解锁动作,它也不会去阻塞写动作

只有当发现邮戳(可以理解为数据库MVCC里的版本号)有变化

则升级为悲观读,以期待读到最新的数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
// 返回当前邮戳,这个方法不会阻塞写方法!
long stamp = lock.tryOptimisticRead();
// 校验此时是否邮戳不匹配
if(!lock.validate(stamp)){
// 如果不匹配,说明发生了写操作!读动作最好等写动作结束后再触发!
// 锁升级 为悲观读readLock
stamp = stampedLock.readLock();//锁升级为悲观读
// 处理...

// 解锁
stampedLock.unlockRead(stamp);
}

什么时候用读写锁,什么时候用邮戳锁?#

读写锁是读可以阻塞写,读是优先的!

而邮戳锁中,只有写才可以阻塞读, 写是优先的!

因此读动作重要的话选读写锁, 如果写动作(写要及时响应)比较重要的话则选邮戳锁

邮戳锁的缺点#

  • StampedLock 不支持重入,如果写锁之后再做写,就阻塞了

  • StampedLock 的悲观读锁和写锁都不支持条件变量(Condition),这个也需要注意。

  • 使用 StampedLock一定不要调用中断操作,即不要调用interrupt() 方法

​ 如果需要支持中断功能,一定使用可中断的悲观读锁 readLockInterruptibly()和写锁writeLockInterruptibly()