Phaser

介绍

1
以下从java jdk1.8的api中摘录

​ 一个可重复使用的同步屏障,功能类似于CyclicBarrier和CountDownLatch,但支持更灵活的使用。

注册机制

​ 与其他barrier的情况不同,Phaser中的“注册的同步者(parties)”会随时间而变化。Phaser可以通过构造器初始化parties个数,也可以在Phaser运行期间随时加入(register)新的parties,以及在运行期间注销(deregister)parties。运行时可以随时加入、注销parties,只会影响Phaser内部的计数器,它建立任何内部的bookkeeping(账本),因此task不能查询自己是否已经注册了,当然你可以通过实现子类来达成这一设计要求。

1
2
3
4
5
6
7
//伪代码  
Phaser phaser =new Phaser();
phaser.register();//parties count: 1
....
phaser.arriveAndDeregister()://count : 0;
....

CyclicBarrier、CountDownLatch需要在初始化的构造函数中指定同步者的个数,且运行时无法再次调整。

1
2
3
4
5
6
CountDownLatch countDownLatch = new CountDownLatch(12);  
//count deregister parties after all
//parties count is 12 all the times
//if you want change the number of parties, you should create a new instance.
CyclicBarrier cyclicBarrier =new CyclicBarrier(12);

同步机制

​ 类似于CyclicBarrier,Phaser也可以awaited多次,它的arrivedAndAwaitAdvance()方法的效果类似于CyclicBarrier的await()。Phaser的每个周期(generation)都有一个phase数字,phase 从0开始,当所有的已注册的parties都到达后(arrive)将会导致此phase数字自增(advance),当达到Integer.MAX_VALUE后继续从0开始。这个phase数字用于表示当前parties所处于的“阶段周期”,它既可以标记和控制parties的wait行为、唤醒等待的时机。

到达

​ Phaser中的arrive()、arriveAndDeregister()方法,这两个方法不会阻塞(block),但是会返回相应的phase数字,当此phase中最后一个party也arrive以后,phase数字将会增加,即phase进入下一个周期,同时触发(onAdvance)那些阻塞在上一phase的线程。这一点类似于CyclicBarrier的barrier到达机制;更灵活的是,我们可以通过重写onAdvance方法来实现更多的触发行为。

等待

​ Phaser中的awaitAdvance()方法,需要指定一个phase数字,表示此Thread阻塞直到phase推进到此周期,arriveAndAwaitAdvance()方法阻塞到下一周期开始(或者当前phase结束)。不像CyclicBarrier,即使等待Thread已经interrupted,awaitAdvance方法会继续等待。Phaser提供了Interruptible和Timout的阻塞机制,不过当线程Interrupted或者timout之后将会抛出异常,而不会修改Phaser的内部状态。如果必要的话,你可以在遇到此类异常时,进行相应的恢复操作,通常是在调用forceTermination()方法之后。

Phaser通常在ForJoinPool中执行tasks,它可以在有task阻塞等待advance时,确保其他tasks的充分并行能力。

终止

​ Phaser可以进入Termination状态,可以通过isTermination()方法判断;当Phaser被终止后,所有的同步方法将会立即返回(解除阻塞),不需要等到advance(即advance也会解除阻塞),且这些阻塞方法将会返回一个负值的phase值(awaitAdvance方法、arriveAndAwaitAdvance方法)。当然,向一个termination状态的Phaser注册party将不会有效;此时onAdvance()方法也将会返回true(默认实现),即所有的parties都会被deregister,即register个数为0。

分层

​ Phaser可以“分层”,以tree的方式构建Phaser来降低“竞争”。如果一个Phaser中有大量parties,这会导致严重的同步竞争,所以我们可以将它们分组并共享一个parent Phaser,这样可以提高吞吐能力;Phaser中注册和注销parties都会有Child 和parent Phaser自动管理。当Child Phaser中注册的parties变为非0时(在构造函数Phaser(Phaser parent,int parties),或者register()方法),Child Phaser将会注册到其Parent上;当Child Phaser中的parties变为0时(比如由arrivedAndDegister()方法),那么此时Child Phaser也将从其parent中注销出去。

监控

​ 同步的方法只会被register操作调用,对于当前state的监控方法可以在任何时候调用,比如getRegisteredParties()获取已经注册的parties个数,getPhase()获取当前phase周期数等;因为这些方法并非同步,所以只能反映当时的瞬间状态。

官方示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
void runTasks(List<Runnable> tasks) {
final Phaser phaser = new Phaser(1); // "1" to register self
// create and start threads
for (final Runnable task : tasks) {
phaser.register();
new Thread() {
@Override
public void run() {
phaser.arriveAndAwaitAdvance(); // await all creation
task.run();
}
}.start();
}

// allow threads to start and deregister self
phaser.arriveAndDeregister();
}

void startTasks(List<Runnable> tasks, final int iterations) {
final Phaser phaser = new Phaser() {
@Override
protected boolean onAdvance(int phase, int registeredParties) {
return phase >= iterations || registeredParties == 0;
}
};
phaser.register();
for (final Runnable task : tasks) {
phaser.register();
new Thread() {
@Override
public void run() {
do {
task.run();
phaser.arriveAndAwaitAdvance();
} while (!phaser.isTerminated());
}
}.start();
}
phaser.arriveAndDeregister(); // deregister self, don't wait
}

void awaitPhase(Phaser phaser, int phase) {
int p = phaser.register(); // assumes caller not already registered
while (p < phase) {
if (phaser.isTerminated()) {
// ... deal with unexpected termination
} else {
p = phaser.arriveAndAwaitAdvance();
}
}
phaser.arriveAndDeregister();
}

API

构造器

public Phaser()

Creates a new phaser with no initially registered parties, no parent, and initial phase number 0. Any thread using this phaser will need to first register for it.

创建一个没有初始化值得phaser,初始阶段为0,没有parent的phaser(就是可以根据一个phaser来生成一个phaser),之后任何线程要使用必须先注册。

public Phaser(int parties)

Creates a new phaser with the given number of registered unarrived parties, no parent, and initial phase number 0.

创建指定数量parties的Phaser,初始阶段为0

public Phaser(Phaser parent,int parties)

Creates a new phaser with the given parent and number of registered unarrived parties. When the given parent is non-null and the given number of parties is greater than zero, this child phaser is registered with its parent.

根据phaser创建一个phaser,并且能够指定注册的parties数量

public Phaser(Phaser parent)

相当于Phaser(parent,0)

方法

案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
public class MyPhaser {
static Random r = new Random();
static MarriagePhase phaser = new MarriagePhase();

public static void main(String[] args) {
phaser.bulkRegister(7);
for (int i = 0; i < 5; i++) {
new Thread(new Person("P"+i)).start();
}
new Thread(new Person("新娘")).start();
new Thread(new Person("新郎")).start();
}

private static class MarriagePhase extends Phaser {
@Override
protected boolean onAdvance(int phase, int registeredParties) {
switch (phase){
case 0:
System.out.println("所有人都到齐了");
return false;
case 1:
System.out.println("所有人都吃完了");
return false;
case 2:
System.out.println("所有人都离开了");
return false;
case 3:
System.out.println("婚礼结束了");
return false;
default:
return true;
}
}
}

private static class Person implements Runnable {
String name;
public Person(String name) {
this.name = name;
}

@Override
public void run() {
// phrase中的阶段控制主要是这里面的阶段控制
arrive();
eat();
leave();
hug();
}

public void arrive(){
try {
Thread.sleep(r.nextInt(1000));
System.out.println(name+"到达现场");
// 阶段控制实际上是这个产生效果,arriveAndAwaitAdvance说明到达了
phaser.arriveAndAwaitAdvance();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

public void eat(){
try {
Thread.sleep(r.nextInt(1000));
System.out.println(name+"吃完");
phaser.arriveAndAwaitAdvance();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

public void leave(){
try {
Thread.sleep(r.nextInt(1000));
System.out.println(name+"离开");
phaser.arriveAndAwaitAdvance();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

public void hug(){
try {
if ("新郎".equals(name) || "新娘".equals(name)){
Thread.sleep(r.nextInt(1000));
System.out.println(name+"拥抱");
phaser.arriveAndAwaitAdvance();
}else {
//
phaser.arriveAndDeregister();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
P0到达现场
新郎到达现场
P1到达现场
P2到达现场
新娘到达现场
P3到达现场
P4到达现场
所有人都到齐了
P1吃完
P0吃完
新娘吃完
P3吃完
新郎吃完
P2吃完
P4吃完
所有人都吃完了
P4离开
P3离开
P2离开
新娘离开
P1离开
P0离开
新郎离开
所有人都离开了
新郎拥抱
新娘拥抱
婚礼结束了

参考文章:

https://blog.csdn.net/m0_37556444/article/details/98784594?utm_medium=distribute.pc_relevant.none-task-blog-2~default~baidujs_baidulandingword~default-4.no_search_link&spm=1001.2101.3001.4242