CountDownLatch实现

CountDownLatch的核心是继承AbstractQueuedSynchronizer的Sync。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

public class CountDownLatch {

private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;

Sync(int count) {
setState(count);
}

int getCount() {
return getState();
}
// 如果还未全部释放则获取失败,返回-1
// 释放完再获取则获取成功,返回1
// 可以看到获取共享锁不会变更state
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
// 释放共享锁
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
}

构造器

1
2
3
4
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}

使用流程:

  1. 初始count值就是 AQS(Sync) state 的值(锁引用次数);
  2. 等待同步线程:调用 await() 方法尝试获取共享锁;
  3. 通知线程:调用 countDown() 方法,释放共享锁;

核心原理

利用AQS的共享锁原理,AQS的共享锁未实现tryAcquireShared和tryReleaseShared。

获取锁 await

等待同步await()

1
2
3
4
5
6
7
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

获取锁是通过await() -> sync.acquireSharedInterruptibly(1)

1
2
3
4
5
6
7
8
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 如果tryAcquireShared获取失败,则执行doAcquireSharedInterruptibly进入等待获取状态
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}

tryAcquireShared

  • 该方法必须自己检查当前上下文是否支持获取共享锁,如果支持再进行获取。
  • 该方法返回值是个重点。其一、由上面的源码片段可以看出返回值小于0表示获取锁失败,需要进入等待队列。其二、如果返回值等于0表示当前线程获取共享锁成功,但它后续的线程是无法继续获取的,也就是不需要把它后面等待的节点唤醒。最后、如果返回值大于0,表示当前线程获取共享锁成功且它后续等待的节点也有可能继续获取共享锁成功,也就是说此时需要把后续节点唤醒让它们去尝试获取共享锁。

**
doAcquireSharedInterruptibly() 获取锁失败进入等待状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
// 再次通过tryAcquireShared获取锁
int r = tryAcquireShared(arg);
// 获取成功
if (r >= 0) {
//设置当前结点为头结点,然后去唤醒后续结点
setHeadAndPropagate(node, r);
p.next = null; // help GC
return;
}
}
// 等待
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}

如果获取成功会执行setHeadAndPropagate

1
2
3
4
5
6
7
8
9
10
11
12
13
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
//设置新的头节点,即把当前获取到锁的节点设置为头节点
setHead(node);
// 1. propagate的值传进来是剩余的锁资源值,大于0代表还能去获取
// 2. 头节点后面的节点需要被唤醒(waitStatus<0),不论是老的头结点还是新的头结点
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}

释放锁 countdown

1
2
3
public void countDown() {
sync.releaseShared(1);
}

tryReleaseShared由CountDownLatch实现,逻辑简单直接 cas 减 state。

1
2
3
4
5
6
7
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

当释放锁后,tryReleaseShared()返回true则代表等待中的节点可以去获取锁,而tryReleaseShared由CountDownLatch实现的,只有当释放完锁后的state值为0时才返回true

tryReleaseShared()返回true时,继续执行doReleaseShared()唤醒等待队列中的头节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private void doReleaseShared() {

for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
// 设置为0,然后唤醒头节点
if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
//如果后继节点暂时不需要唤醒,则把当前节点状态设置为PROPAGATE确保以后可以传递下去
else if (ws == 0 &&
!h.compareAndSetWaitStatus(0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}

总结

  • CountDownLatch 在初始化时需要传入计数值count,这个count会设置成AQS的state,可以理解为3个线程持有了锁;
  • 需要等待异步任务完成的线程执行 countDownLatch.await() 等待同步;
  • 完成任务的线程执行 countDownLatch.countDown() 释放锁引用并会唤醒等待队列头结点的线程;
  • 当所有任务完成时count值变为0,即AQS的state为0;
  • state为0时 countDownLatch.await() 获取共享锁成功,且获取共享锁成功不会修改state。

**
CountDownLatch直接依赖 AQS 的 state 来实现同步功能。

CyclicBarrier

核心原理

CyclicBarrier 的实现原理非常简单通过 ReentrantLock和Condition来实现任务同步,获取锁任何计数值减一,如果减完计数值还为到0则通过Condition进入等待队列,不然就执行唤醒。

成员变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// CyclicBarrier能重复使用,每次重置都会新new一个Generation,代表新的运行时代
private static class Generation {
Generation() {} // prevent access constructor creation
boolean broken; // initially false
}
// 锁,用于修改count和等待唤醒
private final ReentrantLock lock = new ReentrantLock();
// Condition 作等待队列
private final Condition trip = lock.newCondition();
// 初始计数值,用于重置时使用
private final int parties;
// 计数值到0后可以执行的命令
private final Runnable barrierCommand;
// 当前的时代
private Generation generation = new Generation();
// 计数值,当前有多少任务未达到等待点
private int count;

构造器

1
2
3
4
5
6
7
8
9
public CyclicBarrier(int parties) {
this(parties, null);
}
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}

到达同步点 await

CyclicBarrier 的核心方法其实只有一个 await。

1
2
3
4
5
6
7
8
9
10
11
12
13
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {
return dowait(true, unit.toNanos(timeout));
}

抽取了公共的dowait方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
// 中断
if (g.broken)
throw new BrokenBarrierException();

if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
// 到达同步点则计数值减一
int index = --count;
// 如果计数值到0
if (index == 0) { // tripped
boolean ranAction = false;
try {
// 执行任务
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
// 更新时代
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// 计数值还没到0,则通过Condition进入等待队列
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}

if (g.broken)
throw new BrokenBarrierException();
// 已经更新时代了
// 1. 其他线程判断count到0了
if (g != generation)
return index;

if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}

重置计数值reset

CyclicBarrier和CountDownLatch最大的不同点就是CyclicBarrier可以循环使用。

1
2
3
4
5
6
7
8
9
10
11
12
public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 退出当前时代
breakBarrier(); // break the current generation
// 构建新的时代
nextGeneration(); // start a new generation
} finally {
lock.unlock();
}
}

breakBarrier和nextGeneration都会唤醒等待队列

1
2
3
4
5
private void breakBarrier() {
generation.broken = true;
count = parties;
trip.signalAll();
}

1
2
3
4
5
6
7
private void nextGeneration() {
// signal completion of last generation
trip.signalAll();
// set up next generation
count = parties;
generation = new Generation();
}

CountDownLatch和CyclicBarrier的区别

相同点:

  • CountDownLatch和CyclicBarrier两个类都能实现异步转同步;

不同点:

  • CountDownLatch对象只能用一次,CyclicBarrier能够重置循环使用;
  • CountDownLatch直接依赖于AQS的state来实现同步,CyclicBarrier利用 ReentrantLock和Condition来实现同步,两种实现上不一样;
  • CountDownLatch的完成和等待是两个方法,参与的线程的职责是不一样的,有的在倒计时,有的在等待倒计时结束;而CyclicBarrier的完成和等待是同一个方法,参与的线程职责相同。