1、线程之间的协作 可以通过多个任务去解决某一个问题,这便是线程之间的协作。这里重在强调的便是线程和线程之间的协调,并不是彼此之间的冲突。有点类似于一个项目工程,有前置条件和后置条件,比如只有在前置条件满足的情况下,后置条件才可以触发,比如一个厨师做菜,如果简化一下,可以分出几个小的任务,比如有
买菜
摘菜
洗菜
热锅
炒菜
洗米
蒸米
盛米
盛菜
可以把做饭分为以上就个小的任务,以上就有前置条件只说,比如炒菜必须是在热锅和洗菜结束之后才可以进行的,但是洗菜和热锅却是可以同步进行的,这便是前置条件,想要几个小的任务协作的有条理,必须我们开发人员巧妙的设计才可以实现,上述的这9件小任务,我们开理一下,不涉及代码,只是设计一个流程。具体流程如下所示 :
买菜
|
摘菜
|
洗菜 热锅 洗米
| |
炒菜 蒸米
| |
盛菜 |
|
盛米
大体就是这样的,那么我们应该如何控制线程的等待,开始呢?通过线程提供的wait()方法和notifAll()方法。
2、wait()与notifyAll() wait方法的含义便是等待的意思,这里的意思便是由当前线程进行等待,也就是让当前线程挂起的意思,比如一开始的时候我们便要让洗菜任务挂起,因为此时买菜任务和摘菜任务都没执行完毕,所以洗菜任务不能开始。如果没有wait方法,我们是不是就要设置一个boolean,在摘菜任务结束的时候将boolean设置为true,然后让洗菜任务在一个死循环当中不断的进行boolean判断的轮循呢?好像也只能是这个样子了,这种情况被称为“忙等待”,而忙等待是一种不良的CPU周期使用方式。不提倡使用。这里需要注意的一点的是,wait使当前的线程挂起,同时也会释放线程的锁。如果我们调用wait方法的时候,其实就是在告诉线程调度器,告诉线程调度器“我已经刚刚做完了能做的所有事情,因此我需要等待一会,但是我希望其他的synchorized操作在条件适当的情况下可以执行”
有两种调用wait的方法,一个是通过指定相应的毫秒数值,有点类似于sleep,一个是没有参数,则无限制等待下去。直到唤醒。
notifAll和notify方法的含义便是叫醒正处于等待途中的线程,比如上述例子当中的洗菜任务一开始由于条件不满足,使得洗菜任务挂起,随着时间的推移,最后摘菜任务已经结束,这个时候需要洗菜任务进行执行,但是现在洗菜任务处于挂起状态,这个时候遍可以使用notify来叫醒洗菜任务,让其开始执行。
最后需要注意一点的就是,调用wait、notifAll、notifity都必须是在同步块当中,如果不是同步块当中会出现相应的错误。下面我们来一个简单的例子,来学习一下,我们应该如何使用上述所说的三种方法。
2.1 、线程协作 给汽车进行美容,我们需要给车进行打蜡,然后清洗,清洗任务必须是在打蜡任务以后才可以进行,否则运行不了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 public class WaxOMatic { public static void main (String[] args) throws InterruptedException { Car car = new Car(); ExecutorService exec = Executors.newCachedThreadPool(); exec.execute(new WaxOn(car)); exec.execute(new Buff(car)); TimeUnit.SECONDS.sleep(5 ); exec.shutdownNow(); } } class Car { private boolean isWaxFinish = false ; public synchronized void waxOn () { isWaxFinish = true ; } public synchronized void buffed () { isWaxFinish = false ; notifyAll(); } public synchronized void waitForWaxing () throws InterruptedException { while (!isWaxFinish) { wait(); } } public synchronized void waitForBuffing () throws InterruptedException { while (isWaxFinish) { wait(); } } } class WaxOn implements Runnable { private Car car ; public WaxOn (Car car) { this .car = car ; } @Override public void run () { try { while (!Thread.interrupted()){ System.out.println("Wax on!" ); TimeUnit.MILLISECONDS.sleep(200 ); car.waxOn(); car.waitForBuffing(); } } catch (InterruptedException e) { System.out.println("exit via interrupt" ); } System.out.println("ending wax On task" ); } } class Buff implements Runnable { private Car car ; public Buff (Car car) { this .car = car ; } @Override public void run () { try { while (!Thread.interrupted()){ car.waitForWaxing(); System.out.println("Wax Off" ); TimeUnit.MILLISECONDS.sleep(200 ); car.buffed(); } } catch (InterruptedException e){ System.out.println("exit via interrupt" ); } System.out.println("ending Wax off" ); } } Wax on! Wax Off Wax on! Wax Off Wax on! Wax Off Wax on! Wax Off Wax on! Wax Off Wax on!
2.2、错失的信号量 上述例子当中就是典型的线程之间的协作问题,关于线程之间的协作问题。但是在线程协作的问题当中存在一个很严重的问题,那便是信号错失,如何造成信号的错失呢,我们知道线程的执行时没有任何规律可寻的,如果notyif运行在wait之前呢? 很明显这便是死锁,很严重的问题,下面我们来看一个示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 T1 synchorized(lockBoject){ someCondition = false ; lockBoject.notify(); } T2 someCondition = true ; while (someCondition){ synchorized(lockBoject){ wait(); } }
上述例子当中,我们可以看出,应该是由T1去唤醒T2的操作,正常的流程便是,T2执行完毕,处于等待状态,然后T1开始执行,然后唤醒T2,但是线程的执行顺序是不可控的,现在我们来假设一种情况便是,T2开始执行,但是执行到setup01的时候,线程调度器开始调度到T1,然后T1执行完毕,完毕以后回来执行T1,我们发现T2将会无限制的挂起,最后导致死锁。
如何解决上述的问题呢,其实很简单,就是控制someCondition变量上面的竞争条件。从而达到不让信号错失的情况产生,如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 T1 synchorized(lockBoject){ someCondition = false ; lockBoject.notify(); } T2 someCondition = true ; synchorized(lockBoject){ while (someCondition){ wait(); } }
我们可以看到使用同步块将while包裹住,这样来避免错失的信号量,现在我们再次模拟一遍上述的特殊情况,到setup的时候切换线程,然后再回来,我们发现while的循环压根进不去,这样也就避免的错失信号量了。
2.3、notify和notifyAll的区别。 这两个方法都是把一个线程从等待状态唤醒,而notify是notifyAll的一种优化,正如它们的名字一样,一个是单纯的唤醒一个,而另外的一个是唤醒全部,notify是在众多的等待同一个锁的任务当中只有一个被唤醒,因此如果你希望使用notify,就必须保证被唤醒的是恰当的任务。另外如果使用了notify,就必须当前所有的等待任务都是在等待同一个条件,如果是不同的任务,那么我们就不知道唤醒的是否是恰当的任务。如果使用notify,当条件变化的时候,必须只有一个任务从当中收益。最后,这些限制对所有可能存在的子类都必须总是起作用的。如果这些规则当中有任何一条不满足。我们就必须使用All而不是notifty。
锁池和等待池
锁池:假设线程A已经拥有了某个对象(注意:不是类)的锁,而其它的线程想要调用这个对象的某个synchronized方法(或者synchronized块),由于这些线程在进入对象的synchronized方法之前必须先获得该对象的锁的拥有权,但是该对象的锁目前正被线程A拥有,所以这些线程就进入了该对象的锁池中。
等待池:假设一个线程A调用了某个对象的wait()方法,线程A就会释放该对象的锁后,进入到了该对象的等待池中
notify和notifyAll的区别
如果线程调用了对象的 wait()方法,那么线程便会处于该对象的等待池中,等待池中的线程不会去竞争该对象的锁。
当有线程调用了对象的 notifyAll()方法(唤醒所有 wait 线程)或 notify()方法(只随机唤醒一个 wait 线程),被唤醒的的线程便会进入该对象的锁池中,锁池中的线程会去竞争该对象锁。也就是说,调用了notify后只要一个线程会由等待池进入锁池,而notifyAll会将该对象等待池内的所有线程移动到锁池中,等待锁竞争
优先级高的线程竞争到对象锁的概率大,假若某线程没有竞争到该对象锁,它还会留在锁池中,唯有线程再次调用 wait()方法,它才会重新回到等待池中。而竞争到对象锁的线程则继续往下执行,直到执行完了 synchronized 代码块,它会释放掉该对象锁,这时锁池中的线程会继续竞争该对象锁。
综上,所谓唤醒线程,另一种解释可以说是将线程由等待池移动到锁池,notifyAll调用后,会将全部线程由等待池移到锁池,然后参与锁的竞争,竞争成功则继续执行,如果不成功则留在锁池等待锁被释放后再次参与竞争。而notify只会唤醒一个线程。有了这些理论基础,后面的notify可能会导致死锁,而notifyAll则不会的例子也就好解释了
注意:notifyAll因某一个特定的锁被调用的时候,只有等待这个锁的任务才会被唤醒。
下面来看一个Demo来验证这一点
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 package com.suansuan.cooperation;import java.util.Timer;import java.util.TimerTask;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.TimeUnit;public class NotifyVsNotifyAll { public static void main (String[] args) throws InterruptedException { ExecutorService executorService = Executors.newCachedThreadPool(); for (int i = 0 ; i < 5 ; i++){ executorService.execute(new Task01()); } executorService.execute(new Task02()); Timer timer = new Timer(); timer.scheduleAtFixedRate(new TimerTask(){ boolean prod = true ; public void run () { if (prod) { System.out.println("notify()" ); Task01.blocker.prod(); prod = false ; } else { System.out.println("notifyAll()" ); Task01.blocker.prodAll(); prod = true ; } } }, 400 , 400 ); TimeUnit.SECONDS.sleep(2 ); timer.cancel(); System.out.println("timer is cancel" ); TimeUnit.MILLISECONDS.sleep(500 ); System.out.println("Task02 blocker prodAll()" ); Task02.blocker.prodAll(); TimeUnit.MILLISECONDS.sleep(500 ); System.out.println("shutting down" ); executorService.shutdownNow(); } } class Blocker { synchronized void waitingCall () { try { while (!Thread.interrupted()) { wait() ; System.out.println(Thread.currentThread() + "" ); } } catch (InterruptedException e) {} } synchronized void prod () { notify(); } synchronized void prodAll () { notifyAll(); } } class Task01 implements Runnable { static Blocker blocker = new Blocker(); @Override public void run () { blocker.waitingCall(); } } class Task02 implements Runnable { static Blocker blocker = new Blocker(); public void run () { blocker.waitingCall(); } } notify() Thread[pool-1 -thread-1 ,5 ,main] notifyAll() Thread[pool-1 -thread-1 ,5 ,main] Thread[pool-1 -thread-5 ,5 ,main] Thread[pool-1 -thread-4 ,5 ,main] Thread[pool-1 -thread-3 ,5 ,main] Thread[pool-1 -thread-2 ,5 ,main] notify() Thread[pool-1 -thread-1 ,5 ,main] notifyAll() Thread[pool-1 -thread-1 ,5 ,main] Thread[pool-1 -thread-2 ,5 ,main] Thread[pool-1 -thread-3 ,5 ,main] Thread[pool-1 -thread-4 ,5 ,main] Thread[pool-1 -thread-5 ,5 ,main] timer is cancel Task02 blocker prodAll () Thread[pool-1-thread-6,5,main] shutting down
上述例子展示了什么这两个方法的区别。现在举一个例子为什么说notify使用不恰当会引起死锁:比如,你是你家挣钱的,儿子和女儿是花钱的。儿子给家里要100,女儿要30。可是家里没钱,他们只能等。后来你出去打工,赚钱了,赚了50,这时你要在儿子和女儿之间选择一个人叫醒。如果不凑巧,你把儿子叫醒了,儿子发现钱还是不够,又去等。因为你只能叫一次,女儿就错过了使用这50块钱的机会。所以,你决定把所有的人都叫醒,虽然费劲一点。这样一来,儿子发现不够,接着等,女儿发现够了,就用了。
3、生产者和消费者 本例子从最基础的生产者和消费者开始讲起,不会使用什么共享队列,这个后面会有说。我们先来看看生产者和消费者分别代表着什么,
有这样一家饭店,有一个厨师,有一个服务员,当厨师做好饭菜以后,由服务员来将饭菜送到客人面前,没有准备好时,服务员等待,准备好时通知服务员,服务员端菜,送完以后,周而复始。
分析:厨师代表生产者,服务员代表消费者,两个任务必须在饭菜准备好时进行握手,而系统必须有序的进行关闭。下面为演示Demo。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 package com.suansuan.cooperation;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.TimeUnit;public class Restaurant { public Meal meal ; public Consumer consumer = new Consumer(this ); public Chef chef = new Chef(this ); public ExecutorService exec = Executors.newCachedThreadPool(); public Restaurant () { exec.execute(chef); exec.execute(consumer); } public static void main (String[] args) { new Restaurant(); } } class Meal { private final int orderNum ; public Meal (int orderNumber) { this .orderNum = orderNumber ; } public String toString () { return "Meal " + this .orderNum ; } } class Chef implements Runnable { private Restaurant restaurant ; private int count ; public Chef (Restaurant restaurant) { this .restaurant = restaurant ; } public void run () { try { while (!Thread.interrupted()) { synchronized (this ) { while (restaurant.meal != null ){ wait(); } } if (++count == 10 ) { System.out.println("Out of food close" ); restaurant.exec.shutdownNow(); } synchronized (restaurant.consumer) { restaurant.meal = new Meal(count); System.out.println("Chef output " + restaurant.meal); restaurant.consumer.notifyAll(); } TimeUnit.MILLISECONDS.sleep(100 ); } } catch (Exception e) { System.out.println("Chef interrupt" ); } } } class Consumer implements Runnable { private Restaurant restaurant ; public Consumer (Restaurant restaurant) { this .restaurant = restaurant ; } public void run () { try { while (!Thread.interrupted()){ synchronized (this ) { while (restaurant.meal == null ) { wait(); } } System.out.println("Consumer input " + restaurant.meal); synchronized (restaurant.chef) { restaurant.meal = null ; restaurant.chef.notifyAll(); } } } catch (Exception e) { System.out.println("consumer interrupt" ); } } } Chef output Meal 1 Consumer input Meal 1 Chef output Meal 2 Consumer input Meal 2 Chef output Meal 3 Consumer input Meal 3 Chef output Meal 4 Consumer input Meal 4 Chef output Meal 5 Consumer input Meal 5 Chef output Meal 6 Consumer input Meal 6 Chef output Meal 7 Consumer input Meal 7 Chef output Meal 8 Consumer input Meal 8 Chef output Meal 9 Consumer input Meal 9 Out of food close consumer interrupt Chef output Meal 10 Chef interrupt
4、显示的Lock与Condition对象 我们可以通过显示的使用Lock与Condition对象来控制线程之间的协作,Condition这个类的作用主要就是使用锁可以使当前的任务挂起,也可以唤醒挂起的任务,或者唤醒所有的任务,使用await来挂起一个任务,使用signal来唤醒任务,还有通过signalAll来唤醒所有的任务,
注意:signalAll要比notifyAll更加的安全。下面我们通过本章节刚刚开始的汽车美容的例子,来使用一下这个例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 public class WaxOMaticLockCondition { public static void main (String[] args) throws InterruptedException { Cars car = new Cars(); ExecutorService exec = Executors.newCachedThreadPool(); exec.execute(new Waxon2(car)); exec.execute(new Buffed(car)); TimeUnit.SECONDS.sleep(5 ); exec.shutdownNow(); } } class Buffed implements Runnable { private Cars car ; public Buffed (Cars car) { this .car = car ; } @Override public void run () { try { while (!Thread.interrupted()) { System.out.println("Buffed on" ); TimeUnit.MILLISECONDS.sleep(200 ); car.buffed(); car.waitForWaxing(); } } catch (InterruptedException e) { System.out.println("exit via interrupt" ); } System.out.println("ending buffed on task" ); } } class Waxon2 implements Runnable { private Cars car ; public Waxon2 (Cars car) { this .car = car ; } @Override public void run () { try { while (!Thread.interrupted()) { System.out.println("Wax on" ); TimeUnit.MILLISECONDS.sleep(200 ); car.waxed(); car.waitForBuffed(); } } catch (InterruptedException e) { System.out.println("exit via interrupt" ); } System.out.println("ending wax on task" ); } } class Cars { private Lock lock = new ReentrantLock(); private Condition condition = lock.newCondition(); private boolean isWaxFinish = false ; public void waxed () { lock.lock(); try { isWaxFinish = true ; condition.signalAll(); } finally { lock.unlock(); } } public void buffed () { lock.lock(); try { isWaxFinish = false ; condition.signalAll(); } finally { lock.unlock(); } } public void waitForWaxing () throws InterruptedException { lock.lock(); try { while (!isWaxFinish) { condition.await(); } } finally { lock.unlock(); } } public void waitForBuffed () throws InterruptedException { lock.lock(); try { while (isWaxFinish) { condition.await(); } } finally { lock.unlock(); } } } Wax on Buffed on Wax on Buffed on Wax on Buffed on Wax on Buffed on Wax on Buffed on Wax on Buffed on Wax on Buffed on Wax on Buffed on Wax on Buffed on Wax on Buffed on Wax on Buffed on Wax on Buffed on Wax on Buffed on exit via interrupt ending wax on task exit via interrupt ending buffed on task
5、共享队列 上述生产者与消费者以一种非常低级的方式解决了任务互相操作的问题,及每次交互时都握手,我们可以使用更加高级的方式去处理这种问题,及使用同步队列来解决任务协作的问题,同步队列的特点就是任何时刻只允许一个任务进行插入或者移除元素。
JDK为我们提供了各种各样的同步队列,它们这些同步队列统一实现了BlockingQueue接口。比较常用的有两个队列,分别是
LinkedBlockingQueue:使用链表实现,没有固定大小,是一个无界队列
ArrayNBlockingQueue:使用顺序表实现,有固定的大小。
消费者试图在队列当中获取元素,如果获取不到元素,即队列为空的时候,挂起消费者任务的驱动线程,并且当队列当中存在元素的时候,及可以被消费者获取的时候,唤醒消费者任务的驱动线程。这个被称之为阻塞队列。组素队列可以解决大量的问题,而其方式与wait()与notiftyAll相比,阻塞队列简单的多。
5.1、共享队列使用 下面我们来看一个例子程序,有一个阻塞队列,将多个LiftOff对象串行执行,消费者为LiftOffRunner,消费者的任务便是将每一个LiftOff对象从阻塞队列中取出并且运行,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 public class BlockingQueues { static void getKey () { try { new BufferedReader(new InputStreamReader(System.in)).readLine(); } catch (Exception e) { throw new RuntimeException(e) ; } } static void getKey (String message) { System.out.println(message); getKey(); } static void test (String message, BlockingQueue<LiftOff> queue) { System.out.println(message); LiftOffRunner liftOffRunner = new LiftOffRunner(queue); Thread thread = new Thread(liftOffRunner); thread.start(); for (int i = 0 ; i < 5 ; i++) { liftOffRunner.add(new LiftOff(5 )); } getKey("press 'Entry' (" + message + ")" ); thread.interrupt(); System.out.println("Finished " + message + " test" ); } public static void main (String[] args) { test("LinkedBlockingQueue" , new LinkedBlockingQueue<LiftOff>()); test("ArrayBlockingQueue" , new ArrayBlockingQueue<LiftOff>(3 )); test("SynchronousQueue" , new SynchronousQueue<LiftOff>()); } } class LiftOffRunner implements Runnable { private BlockingQueue<LiftOff> rockets ; public LiftOffRunner (BlockingQueue<LiftOff> rocket) { this .rockets = rocket ; } @Override public void run () { try { while (! Thread.interrupted()) { LiftOff take = rockets.take(); take.run(); } } catch (InterruptedException e) { System.out.println("waking form take" ); } System.out.println("exiting LiftOffRunner" ); } public void add (LiftOff lo) { try { rockets.put(lo); } catch (InterruptedException e) { e.printStackTrace(); } } } class LiftOff implements Runnable { protected int countDown = 10 ; private static int taskCount = 0 ; private final int id = taskCount ++ ; public LiftOff () {} public LiftOff (int countDown) { this .countDown = countDown ; } public String status () { return "#" + this .id + "(" + (countDown > 0 ? countDown : "Liftoff!" ) + ")." ; } @Override public void run () { while (countDown-- > 0 ){ System.out.println(status()); Thread.yield(); } } } LinkedBlockingQueue press 'Entry' (LinkedBlockingQueue) #0(4). #0(3). #0(2). #0(1). #0(Liftoff!). #1(4). #1(3). #1(2). #1(1). #1(Liftoff!). #2(4). #2(3). #2(2). #2(1). #2(Liftoff!). #3(4). #3(3). #3(2). #3(1). #3(Liftoff!). #4(4). #4(3). #4(2). #4(1). #4(Liftoff!). Finished LinkedBlockingQueue test waking form take exiting LiftOffRunner ArrayBlockingQueue #5(4). #5(3). #5(2). #5(1). #5(Liftoff!). #6(4). press 'Entry' (ArrayBlockingQueue) #6(3). #6(2). #6(1). #6(Liftoff!). #7(4). #7(3). #7(2). #7(1). #7(Liftoff!). #8(4). #8(3). #8(2). #8(1). #8(Liftoff!). #9(4). #9(3). #9(2). #9(1). #9(Liftoff!). Finished ArrayBlockingQueue test waking form take exiting LiftOffRunner SynchronousQueue #10(4). #10(3). #10(2). #10(1). #10(Liftoff!). #11(4). #11(3). #11(2). #11(1). #11(Liftoff!). #12(4). #12(3). #12(2). #12(1). #12(Liftoff!). #13(4). #13(3). #13(2). #13(1). #13(Liftoff!). #14(4). press 'Entry' (SynchronousQueue) #14(3). #14(2). #14(1). #14(Liftoff!).
各个任务由main放置到了BlockingQueue当中,并且由LiftOffRunner从BlockingQueue当中取出来执行(当取出来执行的时候使用的run方法不是重新开线程),注意:LiftOffRunner可以忽略同步问题,因为他们已经由BlockingQueue解决了。
5.2、吐司BlockingQueue 谈及共享队列,那么就不可能不谈及吐司例子,这个是共享队列当中比较有名的例子,下面来描述一下这个例子。
有一台机器一共具有三个任务,分别是制作吐司,一个是给吐司上黄油、另一个便是给上了黄油的吐司上果酱。我们可以通过各个处理过程之间的BlockingQueue来运行这个吐司制作的程序。
下面使用程序来模拟上述制作吐司的过程。具体实现如下所示 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 public class ToastOMatic { public static void main (String[] args) throws InterruptedException { ToastQueue dryQueue = new ToastQueue(), butterQueue = new ToastQueue(), finishQueue = new ToastQueue(); ExecutorService exec = Executors.newCachedThreadPool(); exec.execute(new Toaster(dryQueue)); exec.execute(new Butter(dryQueue, butterQueue)); exec.execute(new Jammer(butterQueue, finishQueue)); exec.execute(new Eater(finishQueue)); TimeUnit.SECONDS.sleep(5 ); exec.shutdownNow(); } } class Toast { public enum Status { DRY,BUTTERED,JAMMED } private Status status = Status.DRY ; private final int id ; public Toast (int id) { this .id = id ; } public void butter () { this .status = Status.BUTTERED; } public void jam () { this .status = Status.JAMMED ; } public Status getStatus () { return this .status ; } public int getId () { return this .id ; } public String toString () { return "Toast " + id + " : " + status ; } public void recyleToast () { this .status = null ; } } class ToastQueue extends LinkedBlockingQueue <Toast > {}class Toaster implements Runnable { private ToastQueue toastQueue ; private int count ; private Random rand = new Random(47 ); public Toaster (ToastQueue toastQueue) { this .toastQueue = toastQueue ; } @Override public void run () { try { while (! Thread.interrupted()) { TimeUnit.MILLISECONDS.sleep(100 + rand.nextInt(500 )); Toast toast = new Toast(count ++) ; System.out.println(toast); toastQueue.add(toast); } } catch (InterruptedException e) { System.out.println("Toaster interrupt" ); } System.out.println("Toaster off" ); } } class Butter implements Runnable { private ToastQueue dryQueue, butterQueue ; public Butter (ToastQueue dryQueue, ToastQueue butterQueue) { this .dryQueue = dryQueue ; this .butterQueue = butterQueue ; } @Override public void run () { try { while (! Thread.interrupted()) { Toast take = dryQueue.take(); take.butter(); System.out.println(take); butterQueue.add(take); } } catch (InterruptedException e) { System.out.println("Butter interrupt" ); } System.out.println("Butter off" ); } } class Jammer implements Runnable { private ToastQueue butterQueue, finishedQueue ; public Jammer (ToastQueue butterQueue, ToastQueue finishedQueue) { this .butterQueue = butterQueue ; this .finishedQueue = finishedQueue ; } @Override public void run () { try { while (!Thread.interrupted()) { Toast take = butterQueue.take(); take.jam(); System.out.println(take); finishedQueue.add(take); } } catch (InterruptedException e) { System.out.println("Jammer interrupt" ); } System.out.println("Jammer off" ); } } class Eater implements Runnable { private ToastQueue finishedQueue ; private int counter = 0 ; public Eater (ToastQueue finishedQueue) { this .finishedQueue = finishedQueue ; } @Override public void run () { try { while (!Thread.interrupted()) { Toast take = finishedQueue.take(); if (take.getId() != counter ++ || take.getStatus() != Toast.Status.JAMMED) { System.out.println(">>> Error: " + take); System.exit(1 ); } else { System.out.println("chomp! " + take); take.recyleToast(); take = null ; } } } catch (InterruptedException e) { System.out.println("Eater interrupt" ); } System.out.println("Eater off" ); } } Toast 0 : DRY Toast 0 : BUTTERED Toast 0 : JAMMED chomp! Toast 0 : JAMMED Toast 1 : DRY Toast 1 : BUTTERED Toast 1 : JAMMED chomp! Toast 1 : JAMMED Toast 2 : DRY Toast 2 : BUTTERED Toast 2 : JAMMED chomp! Toast 2 : JAMMED Toast 3 : DRY Toast 3 : BUTTERED Toast 3 : JAMMED chomp! Toast 3 : JAMMED Toast 4 : DRY Toast 4 : BUTTERED Toast 4 : JAMMED chomp! Toast 4 : JAMMED Toast 5 : DRY Toast 5 : BUTTERED Toast 5 : JAMMED chomp! Toast 5 : JAMMED Toast 6 : DRY Toast 6 : BUTTERED Toast 6 : JAMMED chomp! Toast 6 : JAMMED Toast 7 : DRY Toast 7 : BUTTERED Toast 7 : JAMMED chomp! Toast 7 : JAMMED Toast 8 : DRY Toast 8 : BUTTERED Toast 8 : JAMMED chomp! Toast 8 : JAMMED Toast 9 : DRY Toast 9 : BUTTERED Toast 9 : JAMMED chomp! Toast 9 : JAMMED Toast 10 : DRY Toast 10 : BUTTERED Toast 10 : JAMMED chomp! Toast 10 : JAMMED Toast 11 : DRY Toast 11 : BUTTERED Toast 11 : JAMMED chomp! Toast 11 : JAMMED Toast 12 : DRY Toast 12 : BUTTERED Toast 12 : JAMMED chomp! Toast 12 : JAMMED Toast 13 : DRY Toast 13 : BUTTERED Toast 13 : JAMMED chomp! Toast 13 : JAMMED Toast 14 : DRY Toast 14 : BUTTERED Toast 14 : JAMMED chomp! Toast 14 : JAMMED Toaster interrupt Toaster off Butter interrupt Butter off Eater interrupt Eater off Jammer interrupt Jammer off
通过上述的例子,我们得出两点我们需要学习的地方,
通过一个Enum类型来控制共享资源当中的状态变化
通过BlockingQueue来实现出来无锁的同步代码
上述我们说过一个共享队列任何时刻都只允许一个任务插入或者移除元素,通过这点,我们根本不需要给Toast这个共享资源类进行相关的加锁逻辑。
6、管道信号 如果我们把相应的队列换成一个管道,通过输入/输出在线程间进行通信。其实这种想法是在Sun公司没有引入BlockingQueue队列之前的一种方式。同样可以解决大部分的线程协作问题。
关于管道这里主要有两种管道,分别是PipedWriter(允许任务向相关管道当中写),PiepedReader(允许不同的任务从同一个管道当中读取)。管道基本上是一个阻塞队列,所以我们同样也不需要考虑相关加锁问题。下面我们通过一个Demo来学习一下这个吧。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 public class PipedIO { public static void main (String[] args) throws Exception { Sender sender = new Sender(); Receiver receiver = new Receiver(sender); ExecutorService exec = Executors.newCachedThreadPool(); exec.execute(sender); exec.execute(receiver); TimeUnit.SECONDS.sleep(4 ); exec.shutdownNow() ; } } class Sender implements Runnable { private Random rand = new Random(47 ); private PipedWriter out = new PipedWriter(); public PipedWriter getPepedWriter () { return this .out ; } @Override public void run () { try { while (true ) { for (char c = 'A' ; c < 'Z' ; c++) { out.write(c); TimeUnit.MILLISECONDS.sleep(rand.nextInt(500 )); } } } catch (IOException e) { System.out.println(e + " Sender writer execption" ); } catch (InterruptedException e) { System.out.println("Sender interrput" ); } } } class Receiver implements Runnable { private PipedReader reader ; public Receiver (Sender sender) throws IOException { reader = new PipedReader(sender.getPepedWriter()); } @Override public void run () { try { while (true ) { System.out.println("Read : " + (char ) reader.read() + "." ); } } catch (IOException e) { System.out.println(e + "Receiver read exception" ); } } } Read : A. Read : B. Read : C. Read : D. Read : E. Read : F. Read : G. Read : H. Read : I. Read : J. Read : K. Read : L. Read : M. Sender interrput java.io.InterruptedIOExceptionReceiver read exception