线程协作

1、线程之间的协作

可以通过多个任务去解决某一个问题,这便是线程之间的协作。这里重在强调的便是线程和线程之间的协调,并不是彼此之间的冲突。有点类似于一个项目工程,有前置条件和后置条件,比如只有在前置条件满足的情况下,后置条件才可以触发,比如一个厨师做菜,如果简化一下,可以分出几个小的任务,比如有

  • 买菜
  • 摘菜
  • 洗菜
  • 热锅
  • 炒菜
  • 洗米
  • 蒸米
  • 盛米
  • 盛菜

可以把做饭分为以上就个小的任务,以上就有前置条件只说,比如炒菜必须是在热锅和洗菜结束之后才可以进行的,但是洗菜和热锅却是可以同步进行的,这便是前置条件,想要几个小的任务协作的有条理,必须我们开发人员巧妙的设计才可以实现,上述的这9件小任务,我们开理一下,不涉及代码,只是设计一个流程。具体流程如下所示 :

买菜
 |
摘菜
 |
洗菜    热锅    洗米
 |              |
炒菜           蒸米
 |              |
盛菜             |
                |
               盛米

大体就是这样的,那么我们应该如何控制线程的等待,开始呢?通过线程提供的wait()方法和notifAll()方法。

2、wait()与notifyAll()

wait方法的含义便是等待的意思,这里的意思便是由当前线程进行等待,也就是让当前线程挂起的意思,比如一开始的时候我们便要让洗菜任务挂起,因为此时买菜任务和摘菜任务都没执行完毕,所以洗菜任务不能开始。如果没有wait方法,我们是不是就要设置一个boolean,在摘菜任务结束的时候将boolean设置为true,然后让洗菜任务在一个死循环当中不断的进行boolean判断的轮循呢?好像也只能是这个样子了,这种情况被称为“忙等待”,而忙等待是一种不良的CPU周期使用方式。不提倡使用。这里需要注意的一点的是,wait使当前的线程挂起,同时也会释放线程的锁。如果我们调用wait方法的时候,其实就是在告诉线程调度器,告诉线程调度器“我已经刚刚做完了能做的所有事情,因此我需要等待一会,但是我希望其他的synchorized操作在条件适当的情况下可以执行”

有两种调用wait的方法,一个是通过指定相应的毫秒数值,有点类似于sleep,一个是没有参数,则无限制等待下去。直到唤醒。

notifAll和notify方法的含义便是叫醒正处于等待途中的线程,比如上述例子当中的洗菜任务一开始由于条件不满足,使得洗菜任务挂起,随着时间的推移,最后摘菜任务已经结束,这个时候需要洗菜任务进行执行,但是现在洗菜任务处于挂起状态,这个时候遍可以使用notify来叫醒洗菜任务,让其开始执行。

最后需要注意一点的就是,调用wait、notifAll、notifity都必须是在同步块当中,如果不是同步块当中会出现相应的错误。下面我们来一个简单的例子,来学习一下,我们应该如何使用上述所说的三种方法。

2.1 、线程协作

给汽车进行美容,我们需要给车进行打蜡,然后清洗,清洗任务必须是在打蜡任务以后才可以进行,否则运行不了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
/**
* 线程当中的协作,汽车打蜡-清洗Demo
* @author pengchengliu
*
*/
public class WaxOMatic {
public static void main(String[] args) throws InterruptedException {
Car car = new Car();
ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(new WaxOn(car));
exec.execute(new Buff(car));
TimeUnit.SECONDS.sleep(5);
exec.shutdownNow();
}
}
class Car {
private boolean isWaxFinish = false ;

public synchronized void waxOn () {
isWaxFinish = true ;
}

public synchronized void buffed () {
isWaxFinish = false ;
notifyAll();
}

public synchronized void waitForWaxing () throws InterruptedException {
while (!isWaxFinish) {
wait();
}
}

public synchronized void waitForBuffing () throws InterruptedException {
while (isWaxFinish) {
wait();
}
}
}
class WaxOn implements Runnable {
private Car car ;

public WaxOn (Car car) {
this.car = car ;
}

@Override
public void run() {
try {
while (!Thread.interrupted()){
System.out.println("Wax on!");
TimeUnit.MILLISECONDS.sleep(200);
car.waxOn();
car.waitForBuffing();
}
} catch (InterruptedException e) {
System.out.println("exit via interrupt");
}
System.out.println("ending wax On task");
}
}
class Buff implements Runnable{
private Car car ;

public Buff (Car car) {
this.car = car ;
}

@Override
public void run() {
try{
while (!Thread.interrupted()){
car.waitForWaxing();
System.out.println("Wax Off");
TimeUnit.MILLISECONDS.sleep(200);
car.buffed();
}
} catch (InterruptedException e){
System.out.println("exit via interrupt");
}
System.out.println("ending Wax off");
}

}
Wax on!
Wax Off
Wax on!
Wax Off
Wax on!
Wax Off
Wax on!
Wax Off
Wax on!
Wax Off
Wax on!

2.2、错失的信号量

上述例子当中就是典型的线程之间的协作问题,关于线程之间的协作问题。但是在线程协作的问题当中存在一个很严重的问题,那便是信号错失,如何造成信号的错失呢,我们知道线程的执行时没有任何规律可寻的,如果notyif运行在wait之前呢? 很明显这便是死锁,很严重的问题,下面我们来看一个示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
T1
synchorized(lockBoject){
someCondition = false ;
lockBoject.notify();
}


T2
someCondition = true
while(someCondition){
//setup01
synchorized(lockBoject){
wait();
}
}

上述例子当中,我们可以看出,应该是由T1去唤醒T2的操作,正常的流程便是,T2执行完毕,处于等待状态,然后T1开始执行,然后唤醒T2,但是线程的执行顺序是不可控的,现在我们来假设一种情况便是,T2开始执行,但是执行到setup01的时候,线程调度器开始调度到T1,然后T1执行完毕,完毕以后回来执行T1,我们发现T2将会无限制的挂起,最后导致死锁。

如何解决上述的问题呢,其实很简单,就是控制someCondition变量上面的竞争条件。从而达到不让信号错失的情况产生,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
T1
synchorized(lockBoject){
someCondition = false ;
lockBoject.notify();
}


T2
someCondition = true ;
//setup01
synchorized(lockBoject){
while(someCondition){
wait();
}
}

我们可以看到使用同步块将while包裹住,这样来避免错失的信号量,现在我们再次模拟一遍上述的特殊情况,到setup的时候切换线程,然后再回来,我们发现while的循环压根进不去,这样也就避免的错失信号量了。

2.3、notify和notifyAll的区别。

这两个方法都是把一个线程从等待状态唤醒,而notify是notifyAll的一种优化,正如它们的名字一样,一个是单纯的唤醒一个,而另外的一个是唤醒全部,notify是在众多的等待同一个锁的任务当中只有一个被唤醒,因此如果你希望使用notify,就必须保证被唤醒的是恰当的任务。另外如果使用了notify,就必须当前所有的等待任务都是在等待同一个条件,如果是不同的任务,那么我们就不知道唤醒的是否是恰当的任务。如果使用notify,当条件变化的时候,必须只有一个任务从当中收益。最后,这些限制对所有可能存在的子类都必须总是起作用的。如果这些规则当中有任何一条不满足。我们就必须使用All而不是notifty。

锁池和等待池

  • 锁池:假设线程A已经拥有了某个对象(注意:不是类)的锁,而其它的线程想要调用这个对象的某个synchronized方法(或者synchronized块),由于这些线程在进入对象的synchronized方法之前必须先获得该对象的锁的拥有权,但是该对象的锁目前正被线程A拥有,所以这些线程就进入了该对象的锁池中。

  • 等待池:假设一个线程A调用了某个对象的wait()方法,线程A就会释放该对象的锁后,进入到了该对象的等待池中

notify和notifyAll的区别

  • 如果线程调用了对象的 wait()方法,那么线程便会处于该对象的等待池中,等待池中的线程不会去竞争该对象的锁。

  • 当有线程调用了对象的 notifyAll()方法(唤醒所有 wait 线程)或 notify()方法(只随机唤醒一个 wait 线程),被唤醒的的线程便会进入该对象的锁池中,锁池中的线程会去竞争该对象锁。也就是说,调用了notify后只要一个线程会由等待池进入锁池,而notifyAll会将该对象等待池内的所有线程移动到锁池中,等待锁竞争

  • 优先级高的线程竞争到对象锁的概率大,假若某线程没有竞争到该对象锁,它还会留在锁池中,唯有线程再次调用 wait()方法,它才会重新回到等待池中。而竞争到对象锁的线程则继续往下执行,直到执行完了 synchronized 代码块,它会释放掉该对象锁,这时锁池中的线程会继续竞争该对象锁。

综上,所谓唤醒线程,另一种解释可以说是将线程由等待池移动到锁池,notifyAll调用后,会将全部线程由等待池移到锁池,然后参与锁的竞争,竞争成功则继续执行,如果不成功则留在锁池等待锁被释放后再次参与竞争。而notify只会唤醒一个线程。有了这些理论基础,后面的notify可能会导致死锁,而notifyAll则不会的例子也就好解释了

注意:notifyAll因某一个特定的锁被调用的时候,只有等待这个锁的任务才会被唤醒。

下面来看一个Demo来验证这一点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
package com.suansuan.cooperation;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* 对比Notify和NotifyAll
* notify用得不好容易导致死锁.
* @author pengchengliu
*
*/
public class NotifyVsNotifyAll {
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++){
executorService.execute(new Task01());
}
executorService.execute(new Task02());

Timer timer = new Timer();
timer.scheduleAtFixedRate(new TimerTask(){
boolean prod = true ;
public void run() {
if (prod) {
System.out.println("notify()");
Task01.blocker.prod();
prod = false ;
} else {
System.out.println("notifyAll()");
Task01.blocker.prodAll();
prod = true ;
}
}
}, 400, 400);


TimeUnit.SECONDS.sleep(2);
timer.cancel();
System.out.println("timer is cancel");

TimeUnit.MILLISECONDS.sleep(500);
System.out.println("Task02 blocker prodAll()");
Task02.blocker.prodAll();

TimeUnit.MILLISECONDS.sleep(500);
System.out.println("shutting down");
executorService.shutdownNow();
}
}
class Blocker {
synchronized void waitingCall () {
try{
while (!Thread.interrupted()) {
wait() ;
System.out.println(Thread.currentThread() + "");
}
} catch (InterruptedException e) {}
}

synchronized void prod () {
notify();
}

synchronized void prodAll () {
notifyAll();
}
}
class Task01 implements Runnable {
static Blocker blocker = new Blocker();
@Override
public void run() {
blocker.waitingCall();
}
}
class Task02 implements Runnable {
static Blocker blocker = new Blocker();
public void run() {
blocker.waitingCall();
}
}

notify()
Thread[pool-1-thread-1,5,main]
notifyAll()
Thread[pool-1-thread-1,5,main]
Thread[pool-1-thread-5,5,main]
Thread[pool-1-thread-4,5,main]
Thread[pool-1-thread-3,5,main]
Thread[pool-1-thread-2,5,main]
notify()
Thread[pool-1-thread-1,5,main]
notifyAll()
Thread[pool-1-thread-1,5,main]
Thread[pool-1-thread-2,5,main]
Thread[pool-1-thread-3,5,main]
Thread[pool-1-thread-4,5,main]
Thread[pool-1-thread-5,5,main]
timer is cancel
Task02 blocker prodAll()
Thread[pool-1-thread-6,5,main]
shutting down

上述例子展示了什么这两个方法的区别。现在举一个例子为什么说notify使用不恰当会引起死锁:比如,你是你家挣钱的,儿子和女儿是花钱的。儿子给家里要100,女儿要30。可是家里没钱,他们只能等。后来你出去打工,赚钱了,赚了50,这时你要在儿子和女儿之间选择一个人叫醒。如果不凑巧,你把儿子叫醒了,儿子发现钱还是不够,又去等。因为你只能叫一次,女儿就错过了使用这50块钱的机会。所以,你决定把所有的人都叫醒,虽然费劲一点。这样一来,儿子发现不够,接着等,女儿发现够了,就用了。

3、生产者和消费者

本例子从最基础的生产者和消费者开始讲起,不会使用什么共享队列,这个后面会有说。我们先来看看生产者和消费者分别代表着什么,

有这样一家饭店,有一个厨师,有一个服务员,当厨师做好饭菜以后,由服务员来将饭菜送到客人面前,没有准备好时,服务员等待,准备好时通知服务员,服务员端菜,送完以后,周而复始。

分析:厨师代表生产者,服务员代表消费者,两个任务必须在饭菜准备好时进行握手,而系统必须有序的进行关闭。下面为演示Demo。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136

package com.suansuan.cooperation;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* 最原始的生产者-消费者模型,不具有相应的共享队列
* @author pengchengliu
*
*/
public class Restaurant {
public Meal meal ;
public Consumer consumer = new Consumer(this);
public Chef chef = new Chef(this);
public ExecutorService exec = Executors.newCachedThreadPool();

public Restaurant(){
exec.execute(chef);
exec.execute(consumer);
}

public static void main(String[] args) {
new Restaurant();
}
}
/**
* 饭菜,也就是我们需要在两个协同任务当中传递的东西
* @author pengchengliu
*
*/
class Meal {
private final int orderNum ; //订购的数量
public Meal (int orderNumber) {
this.orderNum = orderNumber ;
}
public String toString () {
return "Meal " + this.orderNum ;
}
}
/**
* 厨师,生产者,
* 注意:生产者的唤醒服务员的时机, 还有生产等待的时机
* @author pengchengliu
*
*/
class Chef implements Runnable {
private Restaurant restaurant ;
private int count ;
public Chef (Restaurant restaurant) {
this.restaurant = restaurant ;
}
//run方法当中,要确保正确的关闭方式。
public void run () {
try{
while (!Thread.interrupted()) {

// 锁住本身,服务员遍可以通过这个锁来唤醒这个厨师。
synchronized (this) {
while (restaurant.meal != null){
wait();
}
}

if(++count == 10) {
System.out.println("Out of food close");
restaurant.exec.shutdownNow();
}

// 因为服务员就是在这个锁上面等待的,所以我们获取服务员的锁,来唤醒所有等待服务员锁的对象
synchronized (restaurant.consumer) {
restaurant.meal = new Meal(count);
System.out.println("Chef output " + restaurant.meal);
restaurant.consumer.notifyAll();
}

TimeUnit.MILLISECONDS.sleep(100);
}
} catch (Exception e) {
System.out.println("Chef interrupt");
}
}
}
/**
* 服务员-消费者
* 注意:等待的条件,唤醒条件,以及唤醒的时候拿到的锁。
* @author pengchengliu
*
*/
class Consumer implements Runnable {
private Restaurant restaurant ;
public Consumer(Restaurant restaurant) {
this.restaurant = restaurant ;
}
public void run (){
try{
while (!Thread.interrupted()){
synchronized (this) {
while (restaurant.meal == null) {
wait();
}
}

System.out.println("Consumer input " + restaurant.meal);

synchronized (restaurant.chef) {
restaurant.meal = null ;
restaurant.chef.notifyAll();
}
}
} catch (Exception e) {
System.out.println("consumer interrupt");
}
}
}
Chef output Meal 1
Consumer input Meal 1
Chef output Meal 2
Consumer input Meal 2
Chef output Meal 3
Consumer input Meal 3
Chef output Meal 4
Consumer input Meal 4
Chef output Meal 5
Consumer input Meal 5
Chef output Meal 6
Consumer input Meal 6
Chef output Meal 7
Consumer input Meal 7
Chef output Meal 8
Consumer input Meal 8
Chef output Meal 9
Consumer input Meal 9
Out of food close
consumer interrupt
Chef output Meal 10
Chef interrupt

4、显示的Lock与Condition对象

我们可以通过显示的使用Lock与Condition对象来控制线程之间的协作,Condition这个类的作用主要就是使用锁可以使当前的任务挂起,也可以唤醒挂起的任务,或者唤醒所有的任务,使用await来挂起一个任务,使用signal来唤醒任务,还有通过signalAll来唤醒所有的任务,

注意:signalAll要比notifyAll更加的安全。下面我们通过本章节刚刚开始的汽车美容的例子,来使用一下这个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142

/**
* 线程当中的协作,汽车打蜡-清洗Demo 使用显示的Lock和Condition来控制
* @author pengchengliu
*
*/
public class WaxOMaticLockCondition {
public static void main(String[] args) throws InterruptedException {
Cars car = new Cars();
ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(new Waxon2(car));
exec.execute(new Buffed(car));
TimeUnit.SECONDS.sleep(5);
exec.shutdownNow();
}
}
class Buffed implements Runnable {
private Cars car ;
public Buffed (Cars car) {
this.car = car ;
}
@Override
public void run() {
try {
while (!Thread.interrupted()) {
System.out.println("Buffed on");
TimeUnit.MILLISECONDS.sleep(200);
car.buffed();
car.waitForWaxing();
}
} catch (InterruptedException e) {
System.out.println("exit via interrupt");
}
System.out.println("ending buffed on task");
}
}
class Waxon2 implements Runnable {
private Cars car ;
public Waxon2 (Cars car) {
this.car = car ;
}

@Override
public void run() {
try {
while (!Thread.interrupted()) {
System.out.println("Wax on");
TimeUnit.MILLISECONDS.sleep(200);
car.waxed();
car.waitForBuffed();
}
} catch (InterruptedException e) {
System.out.println("exit via interrupt");
}
System.out.println("ending wax on task");
}
}
/**
* 共享资源类
* @author pengchengliu
*
*/
class Cars {
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
private boolean isWaxFinish = false ;

/**
* 使用try的原因就是怕在显示的锁 不能被释放,所以这么写,确保锁可以被正确的释放掉。
*/
public void waxed () {
lock.lock();
try {
isWaxFinish = true ;
condition.signalAll();
} finally {
lock.unlock();
}
}

public void buffed () {
lock.lock();
try{
isWaxFinish = false ;
condition.signalAll();
} finally {
lock.unlock();
}
}

public void waitForWaxing () throws InterruptedException {
lock.lock();
try{
while (!isWaxFinish) {
condition.await();
}
} finally {
lock.unlock();
}
}

public void waitForBuffed () throws InterruptedException {
lock.lock();
try{
while (isWaxFinish) {
condition.await();
}
} finally {
lock.unlock();
}
}
}
Wax on
Buffed on
Wax on
Buffed on
Wax on
Buffed on
Wax on
Buffed on
Wax on
Buffed on
Wax on
Buffed on
Wax on
Buffed on
Wax on
Buffed on
Wax on
Buffed on
Wax on
Buffed on
Wax on
Buffed on
Wax on
Buffed on
Wax on
Buffed on
exit via interrupt
ending wax on task
exit via interrupt
ending buffed on task

5、共享队列

上述生产者与消费者以一种非常低级的方式解决了任务互相操作的问题,及每次交互时都握手,我们可以使用更加高级的方式去处理这种问题,及使用同步队列来解决任务协作的问题,同步队列的特点就是任何时刻只允许一个任务进行插入或者移除元素。

JDK为我们提供了各种各样的同步队列,它们这些同步队列统一实现了BlockingQueue接口。比较常用的有两个队列,分别是

  • LinkedBlockingQueue:使用链表实现,没有固定大小,是一个无界队列
  • ArrayNBlockingQueue:使用顺序表实现,有固定的大小。

消费者试图在队列当中获取元素,如果获取不到元素,即队列为空的时候,挂起消费者任务的驱动线程,并且当队列当中存在元素的时候,及可以被消费者获取的时候,唤醒消费者任务的驱动线程。这个被称之为阻塞队列。组素队列可以解决大量的问题,而其方式与wait()与notiftyAll相比,阻塞队列简单的多。

5.1、共享队列使用

下面我们来看一个例子程序,有一个阻塞队列,将多个LiftOff对象串行执行,消费者为LiftOffRunner,消费者的任务便是将每一个LiftOff对象从阻塞队列中取出并且运行,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
/**
* 探究共享队列:
* 有一个阻塞队列,将多个LiftOff对象串行执行,消费者为LiftOffRunner,消费者的任务便是将每一个LiftOff对象从阻塞队列中取出并且运行
* @author pengchengliu
*
*/
public class BlockingQueues {
static void getKey () {
try {
new BufferedReader(new InputStreamReader(System.in)).readLine();
} catch (Exception e) {
throw new RuntimeException(e) ;
}
}

static void getKey (String message) {
System.out.println(message);
getKey();
}

static void test (String message, BlockingQueue<LiftOff> queue) {
System.out.println(message);
LiftOffRunner liftOffRunner = new LiftOffRunner(queue);
Thread thread = new Thread(liftOffRunner);
thread.start();

for (int i = 0; i < 5; i++) {
liftOffRunner.add(new LiftOff(5));
}

getKey("press 'Entry' (" + message + ")" );
thread.interrupt();
System.out.println("Finished " + message + " test");
}

public static void main(String[] args) {
test("LinkedBlockingQueue", new LinkedBlockingQueue<LiftOff>());
test("ArrayBlockingQueue", new ArrayBlockingQueue<LiftOff>(3));
test("SynchronousQueue", new SynchronousQueue<LiftOff>());
}
}
class LiftOffRunner implements Runnable {

private BlockingQueue<LiftOff> rockets ;
public LiftOffRunner(BlockingQueue<LiftOff> rocket) {
this.rockets = rocket ;
}
@Override
public void run() {
try {
while (! Thread.interrupted()) {
LiftOff take = rockets.take();
take.run();
}
} catch (InterruptedException e) {
System.out.println("waking form take");
}
System.out.println("exiting LiftOffRunner");
}

public void add (LiftOff lo) {
try {
rockets.put(lo);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

}
class LiftOff implements Runnable{

protected int countDown = 10 ;
private static int taskCount = 0;
private final int id = taskCount ++ ;

public LiftOff(){}
public LiftOff(int countDown){
this.countDown = countDown ;
}

public String status(){
return "#" + this.id + "(" + (countDown > 0 ? countDown : "Liftoff!" ) + ").";
}
@Override
public void run() {
while(countDown-- > 0){
System.out.println(status());
Thread.yield();
}
}

}
LinkedBlockingQueue
press 'Entry' (LinkedBlockingQueue)
#0(4).
#0(3).
#0(2).
#0(1).
#0(Liftoff!).
#1(4).
#1(3).
#1(2).
#1(1).
#1(Liftoff!).
#2(4).
#2(3).
#2(2).
#2(1).
#2(Liftoff!).
#3(4).
#3(3).
#3(2).
#3(1).
#3(Liftoff!).
#4(4).
#4(3).
#4(2).
#4(1).
#4(Liftoff!).
Finished LinkedBlockingQueue test
waking form take
exiting LiftOffRunner
ArrayBlockingQueue
#5(4).
#5(3).
#5(2).
#5(1).
#5(Liftoff!).
#6(4).
press 'Entry' (ArrayBlockingQueue)
#6(3).
#6(2).
#6(1).
#6(Liftoff!).
#7(4).
#7(3).
#7(2).
#7(1).
#7(Liftoff!).
#8(4).
#8(3).
#8(2).
#8(1).
#8(Liftoff!).
#9(4).
#9(3).
#9(2).
#9(1).
#9(Liftoff!).
Finished ArrayBlockingQueue test
waking form take
exiting LiftOffRunner
SynchronousQueue
#10(4).
#10(3).
#10(2).
#10(1).
#10(Liftoff!).
#11(4).
#11(3).
#11(2).
#11(1).
#11(Liftoff!).
#12(4).
#12(3).
#12(2).
#12(1).
#12(Liftoff!).
#13(4).
#13(3).
#13(2).
#13(1).
#13(Liftoff!).
#14(4).
press 'Entry' (SynchronousQueue)
#14(3).
#14(2).
#14(1).
#14(Liftoff!).

各个任务由main放置到了BlockingQueue当中,并且由LiftOffRunner从BlockingQueue当中取出来执行(当取出来执行的时候使用的run方法不是重新开线程),注意:LiftOffRunner可以忽略同步问题,因为他们已经由BlockingQueue解决了。

5.2、吐司BlockingQueue

谈及共享队列,那么就不可能不谈及吐司例子,这个是共享队列当中比较有名的例子,下面来描述一下这个例子。

有一台机器一共具有三个任务,分别是制作吐司,一个是给吐司上黄油、另一个便是给上了黄油的吐司上果酱。我们可以通过各个处理过程之间的BlockingQueue来运行这个吐司制作的程序。

下面使用程序来模拟上述制作吐司的过程。具体实现如下所示 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
/**
* ToastBlockingQueue,
* 有一台机器一共具有三个任务,分别是
* 1、制作吐司,
* 2、给吐司上黄油、
* 3、给上了黄油的吐司上果酱。
*
* 我们可以通过各个处理过程之间的BlockingQueue来运行这个吐司制作的程序。
* @author pengchengliu
*
*/
public class ToastOMatic {

/**
* 开启所有的制作Toast的相关任务,然后运行5秒钟,结束所有的任务
* @param args
* @throws InterruptedException
*/
public static void main(String[] args) throws InterruptedException {
ToastQueue dryQueue = new ToastQueue(),
butterQueue = new ToastQueue(),
finishQueue = new ToastQueue();

ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(new Toaster(dryQueue));
exec.execute(new Butter(dryQueue, butterQueue));
exec.execute(new Jammer(butterQueue, finishQueue));
exec.execute(new Eater(finishQueue));

TimeUnit.SECONDS.sleep(5);

exec.shutdownNow();
}
}
/**
* Toast本身,也就是我们的资源共享类,
* 思考:为什么关于资源类的状态改变没有加锁,这个类被三个任务所操作,不加锁安全么?
* @author pengchengliu
*
*/
class Toast {
public enum Status {
DRY,BUTTERED,JAMMED
}

private Status status = Status.DRY ;
private final int id ;

public Toast (int id) {
this.id = id ;
}

public void butter () {
this.status = Status.BUTTERED;
}

public void jam () {
this.status = Status.JAMMED ;
}

public Status getStatus () {
return this.status ;
}

public int getId () {
return this.id ;
}

public String toString () {
return "Toast " + id + " : " + status ;
}

public void recyleToast () {
this.status = null ;
}
}
/**
* 使用定制的相关队列,如果有特殊的需求,通过继承LinkedBlockingQueue来做自定义
* 遵守单一职责的原理,应该定制单一的属于队列的相关需求,
* 这里只是演示Demo,所以没有做任何定制服务。
* @author pengchengliu
*
*/
class ToastQueue extends LinkedBlockingQueue<Toast> {}
/**
* 第一步,制作Toast,将制作出来的Toast至于共享队列当中。
* 注意:只有一个共享队列
* @author pengchengliu
*
*/
class Toaster implements Runnable {

private ToastQueue toastQueue ;
private int count ;
private Random rand = new Random(47);

public Toaster (ToastQueue toastQueue) {
this.toastQueue = toastQueue ;
}

@Override
public void run() {
try {
while (! Thread.interrupted()) {
TimeUnit.MILLISECONDS.sleep(100 + rand.nextInt(500));
Toast toast = new Toast(count ++) ;
System.out.println(toast);
toastQueue.add(toast);
}
} catch (InterruptedException e) {
System.out.println("Toaster interrupt");
}
System.out.println("Toaster off");
}
}
/**
* 第二步,上黄油,
* 注意:需要两个队列,一个是输入队列,一个是输出队列
* @author pengchengliu
*
*/
class Butter implements Runnable {

private ToastQueue dryQueue, butterQueue ;

public Butter (ToastQueue dryQueue, ToastQueue butterQueue) {
this.dryQueue = dryQueue ;
this.butterQueue = butterQueue ;
}

@Override
public void run() {
try {
while (! Thread.interrupted()) {
Toast take = dryQueue.take();
take.butter();
System.out.println(take);
butterQueue.add(take);
}
} catch (InterruptedException e) {
System.out.println("Butter interrupt");
}
System.out.println("Butter off");
}
}
/**
* 第三步:上果酱,基本操作与步骤二相似
* @author pengchengliu
*
*/
class Jammer implements Runnable {

private ToastQueue butterQueue, finishedQueue ;

public Jammer (ToastQueue butterQueue, ToastQueue finishedQueue) {
this.butterQueue = butterQueue ;
this.finishedQueue = finishedQueue ;
}

@Override
public void run() {
try {
while (!Thread.interrupted()) {
Toast take = butterQueue.take();
take.jam();
System.out.println(take);
finishedQueue.add(take);
}
} catch (InterruptedException e) {
System.out.println("Jammer interrupt");
}
System.out.println("Jammer off");
}
}
/**
* 最后的消费者
* @author pengchengliu
*
*/
class Eater implements Runnable {

private ToastQueue finishedQueue ;
private int counter = 0 ;

public Eater (ToastQueue finishedQueue) {
this.finishedQueue = finishedQueue ;
}

@Override
public void run() {
try {
while (!Thread.interrupted()) {
Toast take = finishedQueue.take();
if (take.getId() != counter ++ || take.getStatus() != Toast.Status.JAMMED) {
System.out.println(">>> Error: " + take);
System.exit(1);
} else {
System.out.println("chomp! " + take);
take.recyleToast();
take = null;
}
}
} catch (InterruptedException e) {
System.out.println("Eater interrupt");
}
System.out.println("Eater off");
}
}
Toast 0 : DRY
Toast 0 : BUTTERED
Toast 0 : JAMMED
chomp! Toast 0 : JAMMED
Toast 1 : DRY
Toast 1 : BUTTERED
Toast 1 : JAMMED
chomp! Toast 1 : JAMMED
Toast 2 : DRY
Toast 2 : BUTTERED
Toast 2 : JAMMED
chomp! Toast 2 : JAMMED
Toast 3 : DRY
Toast 3 : BUTTERED
Toast 3 : JAMMED
chomp! Toast 3 : JAMMED
Toast 4 : DRY
Toast 4 : BUTTERED
Toast 4 : JAMMED
chomp! Toast 4 : JAMMED
Toast 5 : DRY
Toast 5 : BUTTERED
Toast 5 : JAMMED
chomp! Toast 5 : JAMMED
Toast 6 : DRY
Toast 6 : BUTTERED
Toast 6 : JAMMED
chomp! Toast 6 : JAMMED
Toast 7 : DRY
Toast 7 : BUTTERED
Toast 7 : JAMMED
chomp! Toast 7 : JAMMED
Toast 8 : DRY
Toast 8 : BUTTERED
Toast 8 : JAMMED
chomp! Toast 8 : JAMMED
Toast 9 : DRY
Toast 9 : BUTTERED
Toast 9 : JAMMED
chomp! Toast 9 : JAMMED
Toast 10 : DRY
Toast 10 : BUTTERED
Toast 10 : JAMMED
chomp! Toast 10 : JAMMED
Toast 11 : DRY
Toast 11 : BUTTERED
Toast 11 : JAMMED
chomp! Toast 11 : JAMMED
Toast 12 : DRY
Toast 12 : BUTTERED
Toast 12 : JAMMED
chomp! Toast 12 : JAMMED
Toast 13 : DRY
Toast 13 : BUTTERED
Toast 13 : JAMMED
chomp! Toast 13 : JAMMED
Toast 14 : DRY
Toast 14 : BUTTERED
Toast 14 : JAMMED
chomp! Toast 14 : JAMMED
Toaster interrupt
Toaster off
Butter interrupt
Butter off
Eater interrupt
Eater off
Jammer interrupt
Jammer off

通过上述的例子,我们得出两点我们需要学习的地方,

  • 通过一个Enum类型来控制共享资源当中的状态变化
  • 通过BlockingQueue来实现出来无锁的同步代码

上述我们说过一个共享队列任何时刻都只允许一个任务插入或者移除元素,通过这点,我们根本不需要给Toast这个共享资源类进行相关的加锁逻辑。

6、管道信号

如果我们把相应的队列换成一个管道,通过输入/输出在线程间进行通信。其实这种想法是在Sun公司没有引入BlockingQueue队列之前的一种方式。同样可以解决大部分的线程协作问题。

关于管道这里主要有两种管道,分别是PipedWriter(允许任务向相关管道当中写),PiepedReader(允许不同的任务从同一个管道当中读取)。管道基本上是一个阻塞队列,所以我们同样也不需要考虑相关加锁问题。下面我们通过一个Demo来学习一下这个吧。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
/**
* 探究Piped 管道 控制任务间的协作
* @author pengchengliu
*
*/
public class PipedIO {
public static void main(String[] args) throws Exception {
Sender sender = new Sender();
Receiver receiver = new Receiver(sender);

ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(sender);
exec.execute(receiver);

TimeUnit.SECONDS.sleep(4);
exec.shutdownNow() ;
}
}
/**
* 发送任务
* @author pengchengliu
*
*/
class Sender implements Runnable {

private Random rand = new Random(47);
private PipedWriter out = new PipedWriter();

public PipedWriter getPepedWriter() {
return this.out ;
}

@Override
public void run() {
try {
while (true) {
for (char c = 'A'; c < 'Z'; c++) {
out.write(c);
TimeUnit.MILLISECONDS.sleep(rand.nextInt(500));
}
}
} catch (IOException e) {
System.out.println(e + " Sender writer execption");
} catch (InterruptedException e) {
System.out.println("Sender interrput");
}
}
}
/**
* 接收任务
* @author pengchengliu
*
*/
class Receiver implements Runnable {

private PipedReader reader ;

public Receiver(Sender sender) throws IOException {
reader = new PipedReader(sender.getPepedWriter());
}

@Override
public void run() {
try {
while (true) {
System.out.println("Read : " + (char) reader.read() + ".");
}
} catch (IOException e) {
System.out.println(e + "Receiver read exception");
}
}

}
Read : A.
Read : B.
Read : C.
Read : D.
Read : E.
Read : F.
Read : G.
Read : H.
Read : I.
Read : J.
Read : K.
Read : L.
Read : M.
Sender interrput
java.io.InterruptedIOExceptionReceiver read exception