多线程和并发编程是Java里面的核心内容,通常有以下一些概念需要重点掌握。
- 线程;
- 锁;
- 同步器;
- 并发容器和框架;
- Java并发工具类;
- 原子操作类;
- Executor框架(执行机制);
并发基础概念
可见性和原子性
可见性:一个线程修改了共享变量的值,另一个线程可以读到这个修改的值。
原子性:不可被中断的一个或一系列操作。
保证线程的原子性主要有两种方式:使用总线锁保证原子性和使用缓存锁保证原子性。
原子操作的三种实现方式
CAS(Compare And Swap缩写)
此种实现方式需要输入两个数值(一个旧值和一个新值),在操作期间先比较旧值有没有发生变化,如果没有发生变化,才交换成新值,如果发生了变化就不交换。
CAS通常会遇到三个问题:
- ABA问题:如果一个值原来是A,变成了B,又变成了A,那么使用CAS进行检查时会发现它的值没有发生变化,但是实际是发生了变化。解决方案:1.使用版本号,在变量前面追加版本号,每次变量更新都把版本号加1。JDK提供的类:AtomicStampedReference;
- 循环时间长开销大;
- 只能保证一个共享变量的原子操作。
对于CAS的问题,可以使用下面的解决方案:JDK提供AtomicReference类来保证引用对象之间的原子性,可以把多个变量放在一个对象里进行CAS操作。
JDK并发包的支持
JDK本身提供的开发包就提供了原子性操作,
如:AtomicBoolean(用原子方式更新的boolean值),AtomicInteger(用原子方式更新的int值),AutomicLong(用原子方式更新的long值)。
线程同步
此处主要讲两个涉及线程同步的关键字:volatile和synchronized。
volatile
使用volatile关键字修饰的线程具有如下的一些特性:
- 可见性:对一个volatile变量的读,总是能看到任意线程对这个volatile变量最后的写入。
- 原子性:对任意单个volatile变量的读/写具有原子性。
- 从内存语义角度:volatile的写-读与锁的释放-获取有相同的内存效果。
- 为了实现volatile的内存语义,编译期在生成字节码时,会在指令序列中插入内存屏障来禁止特定类型的处理器重排序。
- 从编译器重排序规则和处理器内存屏障插入策略来看,只要volatile变量与普通变量之间的重排序可能会破坏volatile的内存语义,这种重排序就会被编译器重排序规则和处理器内存屏障插入策略禁止。
synchronized
synchronized锁的对象有以下几种情况:
- 对于普通同步方法,锁是当前实例对象;
- 对于静态同步方法,锁是当前类的Class对象;
- 对于同步方法块,锁是Synchronized括号里配置的对象。
重排序
Java的重排序有以下几种情况:
- 编译器优化的重排序;
- 指令级并行的重排序;
- 内存系统的重排序。
编译器和处理器在重排序时,会遵守数据依赖性,编译器和处理器不会改变存在数据依赖关系的两个操作的执行顺序。
顺序一致性
顺序一致性内存模型两大特征:
– 一个线程中的所有操作必须按照程序的顺序来执行;
– (不管程序是否同步)所有线程都只能看到一个单一的操作执行顺序。在顺序一致性内存模型中,每个操作都必须原子执行且立刻对所有线程可见。
双重检查锁定与延迟初始化
看一个简单的例子:
public class DoubleCheckedLocking {
private static DoubleCheckedLocking instance;
public static DoubleCheckedLocking getInstance() {
if(null == instance) {
synchronized(DoubleCheckedLocking.class) {
if(null == instance) {
instance = new DoubleCheckedLocking(); }
}
}
return instance;
}
}
生产者与消费者模型
生产者消费者模型具体来讲,就是在一个系统中,存在生产者和消费者两种角色,他们通过内存缓冲区进行通信,生产者生产消费者需要的资料,消费者把资料做成产品。生产消费者模式如下图:
编码实现
生产者是一堆线程,消费者是另一堆线程,内存缓冲区可以使用List数组队列。那生产者和消费者之间怎么进行通信呢?这里面就涉及到多线程之间的协作,其本质就是多线程通信的一个范例。
在这个模型中,最关键就是内存缓冲区为空的时候消费者必须等待,而内存缓冲区满的时候,生产者必须等待。所以,代码实现如下:
生产者
public class Producer implements Runnable {
private volatile boolean isRunning = true;
private BlockingQueue<PCData> queue;// 内存缓冲区
private static AtomicInteger count = new AtomicInteger();// 总数 原子操作
private static final int SLEEPTIME = 1000;
public Producer(BlockingQueue<PCData> queue) {
this.queue = queue;
}
@Override
public void run() {
PCData data = null;
Random r = new Random();
System.out.println("start producting id:" + Thread.currentThread().getId());
try {
while (isRunning) {
Thread.sleep(r.nextInt(SLEEPTIME));
data = new PCData(count.incrementAndGet());
System.out.println(data + " 加入队列");
if (!queue.offer(data, 2, TimeUnit.SECONDS)) {
System.err.println(" 加入队列失败");
}
}
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
}
}
public void stop() {
isRunning = false;
}
}
消费者
package ProducterAndConsumer.Version1;
/**
* 消费者
* @author ctk
*/
import java.text.MessageFormat;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
public class Consumer implements Runnable{
private BlockingQueue<PCData> queue;
private static final int SLEEPTIME = 1000;
public Consumer(BlockingQueue<PCData> queue){
this.queue = queue;
}
@Override
public void run() {
System.out.println("start Consumer id :"+Thread.currentThread().getId());
Random r = new Random();
try{
while(true){
PCData data = queue.take();
if(data != null)
{
int re = data.getData() * data.getData();
System.out.println(MessageFormat.format("{0}*{1}={2}", data.getData(),data.getData(),re));
Thread.sleep(r.nextInt(SLEEPTIME));
}
}
}catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
}
}
}
主函数
public class Main {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<PCData> queue = new LinkedBlockingDeque<>(10);
Producer p1 = new Producer(queue);
Producer p2 = new Producer(queue);
Producer p3 = new Producer(queue);
Consumer c1 = new Consumer(queue);
Consumer c2 = new Consumer(queue);
Consumer c3 = new Consumer(queue);
ExecutorService service = Executors.newCachedThreadPool();
service.execute(p1);
service.execute(p2);
service.execute(p3);
service.execute(c1);
service.execute(c2);
service.execute(c3);
Thread.sleep(10*1000);
p1.stop();
p2.stop();
p3.stop();
Thread.sleep(3000);
service.shutdown();
}
}
涉及的PCData数据类:
public class PCData {
private final int intData;
public PCData(int d){
intData = d;
}
public PCData(String d){
intData = Integer.valueOf(d);
}
public int getData(){
return intData;
}
@Override
public String toString(){
return "data:"+intData;
}
}
因为BlockingQueue是一个阻塞队列,它的存取可以保证只有一个线程在进行,所以根据逻辑,生产者在内存满的时候进行等待,并且唤醒消费者队列,反过来消费者在饥饿状态下等待并唤醒生产者进行生产。
线程
众所周知,操作系统在运行一个程序时会为其创建一个进程。而进程调度的最小单元是线程,也叫轻量级进程,在一个进程里可以创建多个线程,这些线程都拥有各自的计数器,堆栈和局部变量等属性。
在Java中,创建线程主要有三种方式:Thread、Runnable和Callable。
Thread
@Testpublic void testThread() {
Thread thread = new Thread("myThread");
thread.start();}
Runnable
@Testpublic void testRunnable() {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("myThread"); } });
thread.start();}
Callable
@Testpublic void testFutureTask() throws InterruptedException, ExecutionException {
ExecutorService executorService = Executors.newFixedThreadPool(2);
Future<String> future = executorService.submit(new Callable<String>() {
@Override
public String call() throws Exception {
return "Hello,World!!!"; } });
String result = future.get();
System.out.println(result);
executorService.shutdown();
}
Daemon线程
Daemon即守护线程,Java将线程分为用户线程 (User Thread)和守护线程 (Daemon Thread)。
所谓守护 线程,是指在程序运行的时候在后台提供一种通用服务的线程,比如垃圾回收线程就是一个很称职的守护者。用户进程和守护进程的区别在于,当所有的非守护线程结束时,程序也就终止了,同时会杀死进程中的所有守护线程。反过来说,只要任何非守护线程还在运行,程序就不会终止。
将普通线程转换为守护线程可以通过调用Thread对象的setDaemon(true)方法来实现。
线程等待/通知
等待/通知机制,是指一个线程A调用了对象O的wait()方法进入等待状态,而另一个线程B调用了对象O的notify()或者notifyAll()方法,线程A收到通知后从对象O的wait()方法返回,进而执行后续操作。
等待方遵循如下规则:
- 获取对象的锁;
- 如果条件不满足,那么调用对象的wait()方法,被通知后仍要检查条件;
- 条件满足则执行对应的逻辑。
而通知方遵循如下规则:
- 获得对象的锁;
- 改变条件;
- 通知所有等待在对象上的线程。
Thread.join()
ThreadLocal
ThreadLocal,即线程变量,是一个以ThreadLocal对象为键,任意对象为值的存储结构。这个结构被附带在线程上,也就是说一个线程可以根据一个ThreadLocal对象查询到绑定到这个线程上的一个值。
线程的终止/中断
Thread.interrupt(中断线程)
中断线程往往需要满足以下条件:
- 除非线程正在进行中断它自身,否则都会接受这个方法的中断,并且会调用Thread.checkAccess(),可能会抛出SecurityException。
- 如果线程调用了Object.wait(),Thread.sleep(),Thread.join()处于阻塞状态,那它的堵塞状态会被清除,并得到一个InterruptedException。
- 如果线程在InterruptibleChannel上的I/O操作中被中断,通道会被关闭,线程的中断状态会被设置,并得到一个ClosedByInterruptedException。
Thread.interrupted
Thread.interrupted用于测试当前线程是否被中断。如果连续调用两次调用这个方法都返回为false,则说明线程已经被中断。
Thread.isInterrupted
Thread.isInterrupted用于测试某个线程是否被中断。
锁
锁是Java并发编程中最重要的同步实现机制,锁除了让临界区处于互斥执行外,还可以让释放锁的线程向获取同一个锁的线程发送消息。在学习Java的并发编程中会遇到各种各样的锁的概念:公平锁、非公平锁、自旋锁、可重入锁、偏向锁、轻量级锁、重量级锁、读写锁、互斥锁等待。
重入锁
重进入是指任意线程在获取到锁之后,再次获取该锁而不会被该锁所阻塞,重入锁支持获取锁时的公平性和非公平性选择。重入锁用于解决两个问题:
- 线程再次获取锁:锁需要去识别获取锁的线程是否为当前占据锁的线程,如果是,则再次获取锁。
- 锁的最终释放:锁的最终释放要求锁对于锁获取进行计数自增,计数表示当前锁被重复获取的次数,而锁被释放时,计数自减,当计数等于0时表示锁已经释放。
排他锁
排他锁(ReentrantLock)又称为写锁、独占锁,是一种基本的锁类型。
公平锁
公平锁和非公平锁的队列都基于锁内部维护的一个双向链表,表结点Node的值就是每一个请求当前锁的线程,公平锁则在于每次都是依次从队首取值。
公平锁需要满足以下特点:
- 表结点Node和状态state的volatile关键字;
- sum.misc.Unsafe.compareAndSet的原子操作;
- 公平锁获取时,首先会去读volatile变量;
- 公平锁释放时,最后要写一个volatile变量state。
非公平锁
在等待锁的过程中, 如果有任意新的线程妄图获取锁,都是有很大的几率直接获取到锁的。
公平锁VS非公平锁
此类的构造方法接受一个可选的公平 参数。当设置为 true 时,在多个线程的争用下,这些锁倾向于将访问权授予等待时间最长的线程。否则此锁将无法保证任何特定访问顺序。与采用默认设置(使用不公平锁)相比,使用公平锁的程序在许多线程访问时表现为很低的总体吞吐量(即速度很慢,常常极其慢),但是在获得锁和保证锁分配的均衡性时差异较小。不过要注意的是,公平锁不能保证线程调度的公平性。因此,使用公平锁的众多线程中的一员可能获得多倍的成功机会,这种情况发生在其他活动线程没有被处理并且目前并未持有锁时。还要注意的是,未定时的 tryLock 方法并没有使用公平设置。因为即使其他线程正在等待,只要该锁是可用的,此方法就可以获得成功。
就获取锁的概率而言:
- 公平锁:如果一个锁是公平的,那么获取锁的顺序就应该符合请求的绝对时间顺序,也就是FIFO。
- 非公平锁:刚释放锁的线程再次获取同步状态的几率会非常大,使得其他线程只能在同步队列中等待。
Lock
读写锁
读写锁(ReentrantReadWriteLock),读写锁在同一时刻可以允许多个读线程访问,但是在写线程访问时,所有的读线程和其他写线程均被堵塞。
读写锁的实现主要有以下几种:
- 读写状态的设计:同步状态表示锁被一个线程重复获取的次数,而读写锁的自定义同步需要在同步状态(一个整型变量)上维护多个读线程和一个写线程的状态,使得该状态的设计成为读写锁实现的关键。
- 写锁的获取与释放:写锁是一个支持重进入的排它锁。如果当前线程已经获取了写锁,则增加写状态。如果当前线程在获取写锁时,读锁已经被获取(读状态不为0)或者该线程不是已经获取写锁的线程,则当前线程进入等待状态。
- 读锁的获取与释放:如果当前线程已经获取了读锁,就增加读状态。如果当前线程在获取读锁时,写锁已被其他线程获取,则进入等待状态。
- 锁降级:锁降级指的是写锁降级为读锁。指把持住(当前拥有的)写锁,再获取到读锁,随后释放(先前拥有的)读锁的过程。
锁分为四种状态:无锁,偏向锁,轻量级锁,重量级锁。关于这方面的内容大家可以自行百度。
Condition接口
Condition接口提供了类似Object的监视器方法(包括wait(),wait(long timeout),notify(),以及notifyAll()方法),与Lock配合可以实现等待/通知模式。
Condition的实现上,主要有以下几种类型:
- 等待队列:等待队列是一个FIFO队列,在队列中的每一个节点都包含了一个线程引用,该线程是在Condition对象上等待的线程,如果一个线程调用了Condition.await()方法,那么该线程会释放锁,构造成节点加入等待队列并进入等待状态。
- 等待:调用Condition的await()方法(或者以await开头的方法),会使当前线程进入等待队列并释放锁,同时线程状态变为等待状态。当从await()方法返回时,当前线程一定获取了Condition相关的锁。
- 通知:调用Condition的signal()方法,将会唤醒在等待队列中等待时间最长的节点(首节点),在唤醒节点之前,会将节点移步到同步队列。
死锁
死锁是指两个或两个以上的进程在执行过程中,由于竞争资源或者由于彼此通信而造成的一种阻塞的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程。
死锁的避免
对于死锁,可以使用下面的情况进行避免:
- 加锁顺序(线程按照一定的顺序加锁);
- 加锁时限(线程尝试获取锁的时候加上一定的时限,超过时限则放弃对该锁的请求,并释放自己占有的锁);
- 死锁检测。
对于数据库锁,加锁和解锁必须在一个数据库连接里,否则会出现解锁失败的情况。
Java并发工具类
CyclicBarrier
一组线程在到达一个屏障(同步点)前被堵塞,直到最后一个线程到达屏障时,屏障才会放行,这组线程才能继续执行。
应用场景:可以用于多线程计算数据,最后合并计算结果。
CyclicBarrier与CountDownLatch的区别:CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset()方法重置。CountDownLatch的计数是减法,CyclicBarrier的计数是加法。
Semaphore
用来控制同时访问特定资源的线程数量,通过协调各个线程,以保证合理的使用公共资源。
应用场景:可以用于流量控制,特别是公共资源有限的应用场景,比如数据库连接。
CountDownLatch
允许一个或多个线程等待其他线程完成操作。
Exchanger
Exchanger是一个用于线程间协作的工具类。Exchanger用于进行线程间的数据交换。它提供一个同步点,在这个同步点,两个线程可以交换彼此的数据。这两个线程通过exchange方法交换数据,如果第一个线程先执行exchange()方法,会一直等待第二个线程也执行exchange()方法,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产出来的数据传递给对方。
原子操作类
原子更新基本类型类
- AtomicBoolean
- AtomicInteger
- AtomicLong
原子更新数组
- AtomicIntegerArray
- AtomicLongArray
- AtomicReferenceArray
原子更新引用类型
- AtomicReference
- AtomicReferenceFieldUpdater 原子更新引用类型里的字段
- AtomicMarkableReference 原子更新带有标记位的引用类型
原子更新字段类
- AtomicIntegerFieldUpdater 原子更新整型的字段的更新器
- AtomicLongFieldUpdater 原子更新长整型字段的更新器
- AtomicStampedReference原子更新带有版本号的引用类型。该类将整数值与引用关联起来,可用于原子的更新数据和数据的版本号,可以解决使用CAS进行原子更新时可能出现的ABA问题。
Executor框架
Eexecutor作为灵活且强大的异步执行框架,其支持多种不同类型的任务执行策略,提供了一种标准的方法将任务的提交过程和执行过程解耦开发,基于生产者-消费者模式,其提交任务的线程相当于生产者,执行任务的线程相当于消费者,并用Runnable来表示任务,Executor的实现还提供了对生命周期的支持,以及统计信息收集,应用程序管理机制和性能监视等机制。
Executor简介
Executor的结构如下图:
- Executor:一个接口,其定义了一个接收Runnable对象的方法executor,其方法签名为executor(Runnable command);
- ExecutorService:是一个比Executor使用更广泛的子类接口,其提供了生命周期管理的方法,以及可跟踪一个或多个异步任务执行状况返回Future的方法;
- AbstractExecutorService:ExecutorService执行方法的默认实现;
- ScheduledExecutorService:一个可定时调度任务的接口;
- ScheduledThreadPoolExecutor:ScheduledExecutorService的实现,一个可定时调度任务的线程池;
- ThreadPoolExecutor:线程池,可以通过调用Executors以下静态工厂方法来创建线程池并返回一个ExecutorService对象。
Executor的生命周期
Executor的生命周期包括创建,提交,开始,完成,而ExecutorService提供了管理Executor生命周期的方法,ExecutorService的生命周期包括了:运行 关闭和终止三种状态。Executor生命周期对应的方法有:
- ExecutorService在初始化创建时处于运行状态;
- shutdown方法等待提交的任务执行完成并不再接受新任务,在完成全部提交的任务后关闭;
- shutdownNow方法将强制终止所有运行中的任务并不再允许提交新任务。
ExecutorService.submit()和ExecutorService.execute()
调用的方法作用execute用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功。submit用于提交需要返回值的任务。线程池会返回一个Future类型的对象,通过这个Future对象可以判断任务是否执行成功,并且通过Future的get()方法来获取返回值,get()方法会阻塞线程直到任务完成。
ExecutorService执行流程
使用Exectors.newFiexedThreadPool()创建一个具有固定线程数线程池,在大多数时候,线程会主动执行任务,当所有的线程都在执行任务时,有新的任务加入进来,就会进入等待队列,当有空闲的线程,等待队列中的任务就会被执行。
如果有线程在执行过程中因为执行失败要关闭,新创建的线程会替失败的线程执行接下来的任务;
如果想要关闭这个线程池,可以调用ExecutorService的shutDown方法或者shutDownNow方法。
Executors.newCachedThreadPool()则适用于执行很多短期异步任务的小程序,或者是负载较轻的服务器。
Executors.newScheduledThreadPool():适用于需要多个后台线程执行周期任务,同时为了满足管理的需求而需要限制后台线程的数量的应用场景。
Executors.newSingleThreadScheduledExecutor():需要单个后台线程执行周期任务,同时需要保证顺序地执行各个任务。
当然,除了上面介绍的一些基础概念和知识之外,还有一些并发编程的知识本文并未讲解,如多线程编程必然涉及到“上下文切换”,而如何优雅的进行上下文切换,也是多线程的核心内容。
原创文章,作者:奋斗,如若转载,请注明出处:https://blog.ytso.com/13023.html