等待通知机制
前言:本系列将从零开始讲解java多线程相关的技术,内容参考于《java多线程核心技术》与《java并发编程实战》等相关资料,希望站在巨人的肩膀上,再通过我的理解能让知识更加简单易懂。
非等待通知
public void run() { try { for (int i = 0; i < 10; i++) { list.add(); System.out.println("添加了" + (i + 1) + "个元素"); Thread.sleep(1000); } } catch (InterruptedException e) { e.printStackTrace(); } } public void run() { try { while (true) { if (list.size() == 5) { System.out.println("==5了,线程b要退出了!"); throw new InterruptedException(); } } } catch (InterruptedException e) { e.printStackTrace(); } }
-
两个线程实现了通信,但list大小为5的时候,线程B退出了,但是线程B不停地轮询是否为5,这个时候是很占资源的
-
如果轮询的时间间隔小,这个时候更加浪费资源
-
如果轮询的时间间隔大,那么还可能错过了想要的数据,比如可能错过了5
-
这里共享了list,所以实现了通信,但是因为不知道什么时候通信,所以不停地轮询,这种通信有缺点,一是浪费cpu资源,二是可能读取到错误的数据
什么是等待通知机制
-
线程A要等待线程B发出通知才执行,这个时候线程A可以执行wait方法,等待线程B执行notify方法唤醒线程A
等待通知机制实现
public void run() { try { synchronized (lock) { if (MyList.size() != 5) { System.out.println("wait begin " + System.currentTimeMillis()); lock.wait(); System.out.println("wait end " + System.currentTimeMillis()); } } } catch (InterruptedException e) { e.printStackTrace(); } } public void run() { try { synchronized (lock) { for (int i = 0; i < 10; i++) { MyList.add(); if (MyList.size() == 5) { lock.notify(); System.out.println("已发出通知!"); } System.out.println("添加了" + (i + 1) + "个元素!"); Thread.sleep(1000); } } } catch (InterruptedException e) { e.printStackTrace(); } }
-
将上面的代码进行更改,当大小不等于5的时候,线程A处于wait状态,直到线程B发出通知,唤醒线程A,通过等待通知机制,避免了线程A不停轮询造成的资源浪费
消息通知机制注意点
-
wait和notify必须是在同步方法和同步代码块里面调用,要不然会抛出异常
-
notify方法是继承自Object类,可以唤醒在此对象监视器等待的线程,也就是说唤醒的是同一个锁的线程
-
notify方法调用之后,不会马上释放锁,而是运行完该同步方法或者是运行完该同步代码块的代码
-
调用notify后随机唤醒的是一个线程
-
调用wait方法后会将锁释放
-
wait状态下中断线程会抛出异常
-
wait(long),超过设置的时间后会自动唤醒,还没超过该时间也可以通过其他线程唤醒
-
notifyAll可以唤醒同一锁的所有线程
-
如果线程还没有处于等待状态,其他线程进行唤醒,那么不会起作用,此时会打乱程序的正常逻辑
案例:生产者消费者模式
一个生产者,一个消费者
public void setValue() { try { synchronized (lock) { if (!ValueObject.value.equals("")) { lock.wait(); } String value = System.currentTimeMillis() + "_" + System.nanoTime(); System.out.println("set"+ value); ValueObject.value = value; lock.notify(); } } catch (InterruptedException e) { e.printStackTrace(); } } public void getValue() { try { synchronized (lock) { if (ValueObject.value.equals("")) { lock.wait(); } System.out.println("get"+ ValueObject.value); ValueObject.value = ""; lock.notify(); } } catch (InterruptedException e) { e.printStackTrace(); } } public void run() { while (true) { r.getValue(); } } public void run() { while (true) { p.setValue(); } }
-
如果我们创建一个生产线程,一个消费线程,那么这个时候会交替运行
多个生产者,多个消费者
public void getValue() { try { synchronized (lock) { while (ValueObject.value.equals("")) { System.out.println("消费者 " + Thread.currentThread().getName() + " WAITING了☆"); lock.wait(); } System.out.println("消费者 " + Thread.currentThread().getName() + " RUNNABLE了"); ValueObject.value = ""; lock.notify(); } } catch (InterruptedException e) { e.printStackTrace(); } } public void run() { while (true) { r.getValue(); } } public void setValue() { try { synchronized (lock) { while (!ValueObject.value.equals("")) { System.out.println("生产者 " + Thread.currentThread().getName() + " WAITING了★"); lock.wait(); } System.out.println("生产者 " + Thread.currentThread().getName() + " RUNNABLE了"); String value = System.currentTimeMillis() + "_" + System.nanoTime(); ValueObject.value = value; lock.notify(); } } catch (InterruptedException e) { e.printStackTrace(); } } public void run() { while (true) { p.setValue(); } }
-
如果这个时候创建多个生产者,多个消费者,如果连续唤醒的是同类线程,那么会出现假死状态,就是线程都处于waiting状态,因为notify随机唤醒一个线程,如果唤醒的同类的,那么就浪费了一次唤醒,如果这个时候无法再唤醒异类线程,那么就会假死。这种情况把notify改成notifyAll()就行了。
消息通知机制需要注意的地方
-
是否线程唤醒的是同类线程会造成影响
-
生产者消费模式,判断条件if和while应该使用哪一个
通过管道进行线程间通信
public class ThreadWrite extends Thread { private WriteData write; private PipedOutputStream out; public ThreadWrite(WriteData write, PipedOutputStream out) { super(); this.write = write; this.out = out; } @Override public void run() { write.writeMethod(out); } } public class ThreadRead extends Thread { private ReadData read; private PipedInputStream input; public ThreadRead(ReadData read, PipedInputStream input) { super(); this.read = read; this.input = input; } @Override public void run() { read.readMethod(input); } } public class Run { public static void main(String[] args) { try { WriteData writeData = new WriteData(); ReadData readData = new ReadData(); PipedInputStream inputStream = new PipedInputStream(); PipedOutputStream outputStream = new PipedOutputStream(); // inputStream.connect(outputStream); outputStream.connect(inputStream);//关键 ThreadRead threadRead = new ThreadRead(readData, inputStream); threadRead.start(); Thread.sleep(2000); ThreadWrite threadWrite = new ThreadWrite(writeData, outputStream); threadWrite.start(); } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }
-
PipedInputStream和PiepedOutputStream(对应字符流PipedReader和PipedOutputWriter)这几个类可以实现线程间流的通信,将管道输出流和输出流连接,实现一个线程往管道发送数据,一个线程从管道读取数据
join方法
public static void main(String[] args) { try { MyThread threadTest = new MyThread(); threadTest.start(); threadTest.join(); System.out.println("threadTest对象执行完,我再执行"); } catch (InterruptedException e) { e.printStackTrace(); } }
-
当前线程阻塞(main线程),调用线程(threadTest)正常执行,执行完后当前线程(main)继续执行
public class ThreadB extends Thread { @Override public void run() { try { ThreadA a = new ThreadA(); a.start(); a.join(); System.out.println("线程B在run end处打印了"); } catch (InterruptedException e) { System.out.println("线程B在catch处打印了"); e.printStackTrace(); } } }
-
如果线程B执行完了join方法,此时线程B被中断,那么这个时候抛出异常,但是线程A正常运行
join(long)和sleep(long)的区别
public final synchronized void join(long millis) throws InterruptedException { long base = System.currentTimeMillis(); long now = 0; if (millis < 0) { throw new IllegalArgumentException("timeout value is negative"); } if (millis == 0) { while (isAlive()) { wait(0); } } else { while (isAlive()) { long delay = millis - now; if (delay <= 0) { break; } wait(delay); now = System.currentTimeMillis() - base; } } }
-
从join方法的源代码可以发现,他的核心方法是wait,在前面已经提到wait方法会释放锁,说明join方法也会释放锁,但是sleep是不会释放锁的。
-
join方法是非静态的,而sleep是静态的
ThreadLocal
-
解决变量在各个线程的隔离性,每个线程绑定自己的值
public void run() { try { for (int i = 0; i < 100; i++) { if (Tools.tl.get() == null) { Tools.tl.set("ThreadA" + (i + 1)); } else { System.out.println("ThreadA get Value=" + Tools.tl.get()); } Thread.sleep(200); } } catch (InterruptedException e) { e.printStackTrace(); } } public void run() { try { for (int i = 0; i < 100; i++) { if (Tools.tl.get() == null) { Tools.tl.set("ThreadB" + (i + 1)); } else { System.out.println("ThreadB get Value=" + Tools.tl.get()); } Thread.sleep(200); } } catch (InterruptedException e) { e.printStackTrace(); } } public class Tools { public static ThreadLocal tl = new ThreadLocal(); }
-
每个线程都设置了值,但是得到的值却是自己的,互相隔离
-
如果不开始不设置值,那么得到的值都是null,可以通过继承ThreadLocal,重载initalValue方法,设置初始值
public class ThreadLocalExt extends ThreadLocal { @Override protected Object initialValue() { return new Date().getTime(); } }
-
InheritableThreadLocal,子线程可以继承父线程的值
public class InheritableThreadLocalExt extends InheritableThreadLocal { @Override protected Object initialValue() { return new Date().getTime(); } } public static void main(String[] args) { try { for (int i = 0; i < 10; i++) { System.out.println(" 在Main线程中取值=" + Tools.tl.get()); Thread.sleep(100); } Thread.sleep(5000); ThreadA a = new ThreadA(); a.start(); } catch (InterruptedException e) { e.printStackTrace(); } } //main线程和A线程输出的一样
-
在上面代码的基础上,重写childValue方法可以设置子线程的值
-
作者:jiajun 出处: http://www.cnblogs.com/-new/
原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/15336.html