Java中的线程同步工具:Phaser详解
题目描述
Phaser是Java并发包(java.util.concurrent)中一个强大的同步工具,它提供了一种灵活的屏障机制,允许一组线程在多个阶段(phase)上同步。与CountDownLatch和CyclicBarrier相比,Phaser支持动态调整参与的线程数,并且允许线程在任意阶段注册、到达、取消注册。你需要理解Phaser的核心概念、工作原理、常用方法及其适用场景。
核心概念
- 阶段(Phase):Phaser将同步过程划分为多个阶段,每个阶段有一个整数编号(从0开始递增)。所有参与的线程必须在每个阶段都到达屏障点,才能一起进入下一阶段。
- 参与者(Parties):注册到Phaser中的线程数量,可以动态增减。
- 到达(Arrive):线程完成当前阶段工作后,调用相关方法表示已到达屏障点。
- 屏障(Barrier):当所有参与者都到达后,Phaser会自动推进到下一阶段,并触发可选的动作。
逐步讲解
步骤1:Phaser的基本使用模式
创建一个Phaser时,可以指定初始参与者数量(不指定则为0)。线程通过调用arriveAndAwaitAdvance()方法,表示自己完成了当前阶段的工作,并等待其他参与者。当所有参与者都到达后,Phaser会自动将阶段编号加1,并释放所有等待的线程。
import java.util.concurrent.Phaser;
public class PhaserDemo {
public static void main(String[] args) {
Phaser phaser = new Phaser(3); // 初始参与者为3个线程
for (int i = 0; i < 3; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + " 完成阶段0");
phaser.arriveAndAwaitAdvance(); // 到达并等待其他线程
System.out.println(Thread.currentThread().getName() + " 进入阶段1");
}).start();
}
}
}
说明:这里创建了3个参与者,每个线程在阶段0完成后等待,当3个线程都到达后,一起进入阶段1。
步骤2:动态调整参与者数量
Phaser允许动态增加或减少参与者数量,这是它比CountDownLatch和CyclicBarrier更灵活的地方。常用方法:
register():增加一个参与者。bulkRegister(int parties):增加多个参与者。arriveAndDeregister():到达当前阶段,并注销自己(减少一个参与者)。
Phaser phaser = new Phaser(1); // 初始参与者为1(主线程)
// 增加3个新参与者
phaser.bulkRegister(3);
for (int i = 0; i < 3; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + " 到达");
phaser.arriveAndDeregister(); // 到达后注销自己
}).start();
}
// 主线程等待所有线程到达
phaser.arriveAndAwaitAdvance();
System.out.println("所有线程完成");
注意:这里主线程也注册为参与者,通过arriveAndAwaitAdvance()等待3个新线程到达并注销,最终参与者数为0,Phaser终止。
步骤3:Phaser的阶段推进与终止
Phaser进入终止状态的条件:
- 调用
forceTermination()强制终止。 - 所有参与者注销后,参与者数变为0(最后一个线程调用
arriveAndDeregister()时触发)。
当Phaser终止后,所有同步方法立即返回,不会阻塞。isTerminated()可检查终止状态。
Phaser phaser = new Phaser(2);
new Thread(() -> {
phaser.arriveAndDeregister();
System.out.println("线程1注销");
}).start();
new Thread(() -> {
phaser.arriveAndDeregister();
System.out.println("线程2注销");
}).start();
Thread.sleep(1000);
System.out.println("Phaser终止: " + phaser.isTerminated()); // 输出 true
步骤4:重写onAdvance方法实现阶段回调
你可以继承Phaser并重写onAdvance(int phase, int registeredParties)方法,在每个阶段推进时执行自定义逻辑。该方法返回true表示Phaser应该终止,否则继续。
class CustomPhaser extends Phaser {
@Override
protected boolean onAdvance(int phase, int registeredParties) {
System.out.println("阶段 " + phase + " 完成,参与者数: " + registeredParties);
return registeredParties == 0; // 参与者为0时终止
}
}
使用示例:
CustomPhaser phaser = new CustomPhaser();
phaser.bulkRegister(2);
new Thread(() -> {
phaser.arriveAndAwaitAdvance();
}).start();
new Thread(() -> {
phaser.arriveAndAwaitAdvance();
}).start();
输出:阶段0完成,参与者数: 2
步骤5:复杂场景示例(多阶段任务协作)
假设有3个线程需要协作完成3个阶段的任务,每个阶段完成后都需要同步。
Phaser phaser = new Phaser(3);
for (int i = 0; i < 3; i++) {
final int threadId = i;
new Thread(() -> {
for (int phase = 0; phase < 3; phase++) {
System.out.println("线程" + threadId + " 完成阶段" + phase);
phaser.arriveAndAwaitAdvance(); // 等待其他线程
}
}).start();
}
执行流程:
- 阶段0:3个线程各自完成阶段0,然后同步。
- 阶段1:所有线程到达后自动进入阶段1,重复过程。
- 阶段2:完成后Phaser终止。
关键点总结
- 灵活性:Phaser支持动态调整参与者数量,适用于线程数变化的任务(如分治任务)。
- 多阶段同步:适用于需要重复多轮同步的场景(如并行迭代计算)。
- 性能:Phaser内部使用CAS操作,通常比CyclicBarrier性能更好。
- 终止机制:通过参与者数归零或强制终止来结束同步。
常见面试问题
- Phaser与CountDownLatch、CyclicBarrier的区别?
答:CountDownLatch一次性使用,CyclicBarrier固定线程数可重复,Phaser支持动态调整线程数和多阶段。 - 什么场景适合使用Phaser?
答:分阶段并行任务(如遗传算法、并行数据处理),且线程数可能变化的情况。 - 如何防止Phaser提前终止?
答:确保在适当的时间点注册参与者,避免所有线程过早注销。