线程
实现线程的两种方式:继承 Thread,实现 Runnable 接口
有了 Thread 不就够了?通过继承Thread来实现线程虽然比较简单,但 Java 中每个类最多只能有一个父类,如果类已经有父类了,就不能再继承 Thread。
启动线程调 start 而不是 run,一个线程对象只能启动一次
线程的状态
- NEW:没有调用 start 的线程状态为 NEW。
- TERMINATED:线程运行结束后状态为 TERMINATED。
- RUNNABLE:调用 start 后线程在执行 run 方法且没有阻塞时状态为 RUNNABLE,不过,RUNNABLE 不代表 CPU 一定在执行该线程的代码,可能正在执行也可能在等待操作系统分配时间片,只是它没有在等待其他条件。
- BLOCKED、WAITING、TIMED_WAITING:都表示线程被阻塞了,在等待某些条件。
除了main线程外,至少还有一个负责垃圾回收的线程,这个线程就是 daemon 线程,在 main 线程结束的时候,垃圾回收线程也会退出。
Thread 几个静态方法:
-
sleep 方法
用该方法会让当前线程睡眠指定的时间,单位是毫秒。睡眠期间,该线程会让出 ** CPU(CPU可以去干其他事了),睡眠期间,线程可以被中断**,如果被中断,sleep 会抛出 InterruptedException 异常。
-
yield 方法
调用该方法,是告诉操作系统的调度器:现在不着急占用 CPU,可以先让其他线程运行。不过,这对调度器也仅仅是建议,调度器如何处理是不一定的,它可能完全忽略该调用。
-
join 方法
可以让调用join的线程等待该线程结束,join 实际上就是调用了 wait 方法。
共享内存及可能存在的问题
每个线程表示一条单独的执行流,有自己的程序计数器,有自己的栈,但线程之间可以共享内存,它们可以访问和操作相同的对象。当多条执行流执行相同的程序代码时,每条执行流都有单独的栈,方法中的参数和局部变量都有自己的一份。当多条执行流可以操作相同的变量时,可能会出现一些意料之外的结果,包括竞态条件和内存可见性问题。
竞态条件
所谓竞态条件(race condition)是指,当多个线程访问和操作同一个对象时,最终执行结果与执行时序有关,可能正确也可能不正确。
10个线程同时对一个变量counter执行加一,可能每次结果都不一样,因为counter++这个操作不是原子操作,它分为三个步骤:
- 取counter的当前值
- 在当前值基础上加1
- 将新值重新赋值给counter。
如何解决这个问题:
- synchronized关键字
- 显式锁
- 原子变量
内存可见性
多个线程可以共享访问和操作相同的变量,但一个线程对一个共享变量的修改,另一个线程不一定马上就能看到,甚至永远也看不到。这就是内存可见性问题。在计算机系统中,除了内存,数据还会被缓存在 CPU 的寄存器以及各级缓存中,当访问一个变量时,可能直接从寄存器或 CPU 缓存中获取,而不一定到内存中去取,当修改一个变量时,也可能是先写到缓存中,稍后才会同步更新到内存中。
怎么解决:
- volatile 关键字
- synchronized 关键字
- 显式锁
Synchronized关键字
synchronized 可以用于修饰类的实例方法、静态方法和代码块。
方法加了 synchronized 后,方法内的代码就变成了原子操作。
synchronized 实例方法实际保护的是同一个对象的方法调用,确保同时只能有一个线程执行。
synchronized 保护的是对象而非代码,只要访问的是同一个对象的 synchronized 方法,即使是不同的代码,也会被同步顺序访问。synchronized 方法不能防止非 synchronized 方法被同时执行,所以一般在保护变量时,需要在所有访问该变量的方法上加上 synchronized 。
-
实例方法
synchronized 实例方法保护的是当前实例对象,即this, this 对象有一个锁和一个等待队列,锁只能被一个线程持有,其他试图获得同样锁的线程需要等待。
执行synchronized实例方法的过程大致如下:
- 尝试获得锁,如果能够获得锁,继续下一步,否则加入等待队列,阻塞并等待唤醒
- 执行实例方法体代码
- 释放锁,如果等待队列上有等待的线程,从中取一个并唤醒,如果有多个等待的线程,唤醒哪一个是不一定的,不保证公平性。
当前线程不能获得锁的时候,它会加入等待队列等待,线程的状态会变为 BLOCKED。
-
静态方法
对静态方法,保护的是类对象。实际上,每个对象都有一个锁和一个等待队列,类对象也是。
-
代码块
synchronized 括号里面的就是保护的对象,因为任意对象都有一个锁和等待队列,或者说,任何对象都可以作为锁对象。
几个特征
-
可重入性
对同一个执行线程,它在获得了锁之后,在调用其他需要同样锁的代码时,可以直接调用。
可重入是通过记录锁的持有线程和持有数量来实现的,当调用被 synchronized 保护的代码时,检查对象是否已被锁,如果是,再检查是否被当前线程锁定,如果是,增加持有数量,如果不是被当前线程锁定,才加入等待队列,当释放锁时,减少持有数量,当数量变为0时才释放整个锁。
-
内存可见性
在释放锁时,所有写入都会写回内存,而获得锁后,都会从内存中读最新数据。
-
死锁
应该尽量避免在持有一个锁的同时去申请另一个锁,如果确实需要多个锁,所有代码都应该按照相同的顺序去申请锁。
协作
多线程之间除了竞争访问同一个资源外,也经常需要相互协作,基本方式就是 wait/notify
。
Java 的根父类是 Object , Java 在 Object 类而非 Thread 类中定义了一些线程协作的基本方法,这些方法有两类,一类是 wait ,另一类是 notify 。
wait实际上做了什么?除了用于锁的等待队列,每个对象还有另一个等待队列,表示条件队列,该队列用于线程间的协作。调用wait就会把当前线程放到条件队列上并阻塞,表示当前线程执行不下去了,它需要等待一个条件,这个条件它自己改变不了,需要其他线程改变。
但调用wait时,线程会释放对象锁。
一个线程因为等待某个条件执行不下去,当这个条件改变之后就该调用 notify 方法了,notify 会从条件队列中选一个线程,将其从队列中移除并唤醒,选哪个是不确定的。而 notifyAll 会移除条件队列中所有的线程并全部唤醒。
调用notify会把在条件队列中等待的线程唤醒并从队列中移除,但它不会释放对象锁。
唤醒之后线程会重新尝试竞争获得锁:如果能够获得锁,线程状态变为RUNNABLE,并从wait调用中返回,否则,该线程加入对象锁等待队列,线程状态变为BLOCKED,只有在获得锁后才会从 wait 调用中返回。
线程从wait调用中返回后,不代表其等待的条件就一定成立,它需要重新检查其等待的条件,这也是在条件附近看到 while 而不是 if 的原因。
中断
停止一个线程的主要机制是中断,中断并不是强迫终止一个线程,它是一种协作机制,是给线程传递一个取消信号,但是由线程来决定如何以及何时退出。
每个线程都有一个标志位,表示该线程是否被中断了。
中断相关的方法:
public void interrupt(); // 中断线程
public boolean isInterrupted(); // 线程的中断标志位是否为true
public static boolean interrupted(); // 线程的中断标志位是否为true + 清空中断标志位
注意:interrupt方法不一定会真正“中断”线程,
不同状态对中断信号的反应
-
RUNNABLE
线程在运行或具备运行条件只是在等待操作系统调度。
-
WAITING/TIMED_WAITING
线程在等待某个条件或超时。线程调用join/wait/sleep方法会进入WAITING或TIMED_WAITING状态。调用interrupt()会使得该线程抛出InterruptedException。需要注意的是,抛出异常后,中断标志位会被清空,而不是被设置。InterruptedException是一个受检异常,线程必须进行处理。
-
BLOCKED
线程在等待锁,试图进入同步块。调用interrupt()只是会设置线程的中断标志位,线程依然会处于BLOCKED状态,也就是说,interrupt()并不能使一个在等待锁的线程真正“中断”。
test方法在持有锁lock的情况下启动线程a,而线程a也去尝试获得锁lock,所以会进入锁等待队列,随后test调用线程a的interrupt方法并调用join等待线程线程a结束,线程a会结束吗?不会,interrupt方法只会设置线程的中断标志,而并不会使它从锁等待队列中出来。
public static void test() throws InterruptedException { synchronized (lock) { A a = new A(); a.start(); Thread.sleep(1000); a.interrunpt(); a.join(); } }
注意:在使用 synchronized 关键字获取锁的过程中不响应中断请求,这是 synchronized 的局限性。
-
NEW/TERMINATE
线程还未启动或已结束。调用 interrupt() 对它没有任何效果,中断标志位也不会被设置。
取消/关闭线程的正确方式
原子操作
CAS
原子操作依赖一个很重要的方法:
public final boolean compareAndSet(int expect, int update)
这个方法就被成为CAS
。该方法有两个参数 expect 和 update ,以原子方式实现了如下功能:如果当前值等于 expect ,则更新为 update ,否则不更新,如果更新成功,返回 true,否则返回 false 。
以 AtomicInteger 为例,AtomicInteger 可以在程序中用作一个计数器,多个线程并发更新,也总能实现正确性。它的主要内部成员是:
public volatile int value; // 这个变量天生保证内存可见性
AtomicInteger 有个方法 incrementAndGet:
public final int incrementAndGet() {
for(;;) {
int current = get(); // 获取当前值value
int next = current + 1; // 计算期望的值next
// 调CAS方法进行更新,如果更新没有成功,说明value被别的线程改了,则再去取最新值并尝试更新直到成功为止。
if(compareAndSet(current, next)) {
return next;
}
}
}
与 synchronized 锁相比,这种原子更新方式代表一种不同的思维方式。synchronized 是悲观的,它假定更新很可能冲突,所以先获取锁,得到锁后才更新。原子变量的更新逻辑是乐观的,它假定冲突比较少,但使用 CAS 更新,也就是进行冲突检测,如果确实冲突了,那也没关系,继续尝试就好了。
AQS
AQS是一个抽象类AbstractQueuedSynchronizer。
AQS封装了一个状态,给子类提供了查询和设置状态的方法:
public volatile int state;
protected final int getState();
protected final void setState(int newState);
protected final boolean compareAndSetState(int expect, int update);
用于实现锁时,AQS 可以保存锁的当前持有线程,提供了方法进行查询和设置:
private transient Thread exclusiveOwnerThread;
protected final void setExclusiveOwnerThread(Thread t);
protected final Thread getExclusiveOwnerThread();
AQS内部维护了一个等待队列,借助 CAS 方法实现了无阻塞算法进行更新。
BUG
使用 CAS 方式更新有一个 ABA 问题。该问题是指,假设当前值为A,如果另一个线程先将 A 修改成 B ,再修改回成 A ,当前线程的 CAS 操作无法分辨当前值发生过变化。
ABA 是不是一个问题与程序的逻辑有关,一般不是问题。而如果确实有问题,解决方法是使用 AtomicStampedReference
显示锁
简介
显式锁接口和类主要有:
- 锁接口Lock,主要实现类是 ReentrantLock
- 读写锁接口 ReadWriteLock,主要实现类是 ReentrantReadWriteLock
Lock 接口定义为:
public interface Lock {
void lock();
void lockInterruptibly() throws InterruptedException;
// 可以避免死锁。在持有一个锁获取另一个锁而获取不到的时候,可以释放已持有的锁,给其他线程获取锁的机会,然后重试获取所有锁。
boolean tryLock();
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
void unlock();
Condition newCondition();
}
- lock()/unlock():就是普通的获取锁和释放锁方法,lock()会阻塞直到成功。
- lockInterruptibly():与lock()的不同是,它可以响应中断。
- tryLock():只是尝试获取锁,立即返回,不阻塞
- tryLock(long time, TimeUnit unit):先尝试获取锁,如果能成功则立即返回true,否则阻塞等待
- newCondition:新建一个条件,一个Lock可以关联多个条件(见下文显式条件)
Lock接口的主要实现类是ReentrantLock,底层依赖CAS,AQS,ReentrantLock,它的基本用法lock/unlock实现了与synchronized一样的语义,包括:
- 可重入,一个线程在持有一个锁的前提下,可以继续获得该锁
- 可以解决竞态条件问题
- 可以保证内存可见性
相较于synchronized
ReentrantLock 和 synchronized 都是默认不保证公平。使用显式锁,一定要记得调用 unlock。
相比 synchronized , ReentrantLock 可以实现与 synchronized 相同的语义,而且支持以非阻塞方式获取锁,可以响应中断,可以限时,更为灵活。
synchronized代表一种声明式编程思维,程序员更多的是表达一种同步声明,由 Java 系统负责具体实现,程序员不知道其实现细节;显式锁代表一种命令式编程思维,程序员实现所有细节。
简单总结下,能用 synchronized 就用 synchronized,不满足要求时再考虑 ReentrantLock。
显式条件
显式锁与 synchronized 相对应,而显式条件与 wait/notify 相对应。wait/notify与synchronized配合使用,显式条件与显式锁配合使用。
Condition 表示条件变量,是一个接口,其中有 await、signal、signalAll 方法。
await 对应于 Object 的 wait , signal 对应于 notify, signalAll 对应于 notifyAll,语义也是一样的。
一般的 await 相关方法都是响应中断的,如果发生了中断,会抛出 InterruptedException,但中断标志位会被清空。awaitUnInterruptibly() 方法不会响应中断,它不会由于中断结束,但当它返回时,如果等待过程中发生了中断,中断标志位会被设置。
await在进入等待队列后,会释放锁,释放CPU,当其他线程将它唤醒后,或等待超时后,或发生中断异常后,它都需要重新获取锁,获取锁后,才会从 await 方法中退出。
示例:
static class MyBlockQueue<E> {
private Queue<E> queue = null;
private int limit;
private Lock lock = new ReentrantLock();
private Condition notFull = lock.newCondition();
private Condition notEmpty = lock.newCondition();
public MyBlockQueue(int limit) {
this.limit = limit;
// ArrayDeque是线程不安全的
queue = new ArrayDeque<>();
}
private void put(E e) throws InterruptedException {
lock.lockInterruptibly();
try {
// 队列满,在notFull等待,不让放
while (queue.size() == limit) {
notFull.await();
}
queue.add(e);
// 唤醒一下,现在不空了
notEmpty.signal();
} finally {
lock.unlock();
}
}
public E take() throws InterruptedException {
lock.lockInterruptibly();
try {
// 队列空,在notEmpty等待,不让取
while (queue.isEmpty()) {
notEmpty.await();
}
E e = queue.poll();
// 唤醒一下,现在不满了
notFull.signal();
return e;
} finally {
lock.unlock();
}
}
}
上述代码定义了两个等待条件:不满(notFull)、不空(notEmpty)。在put方法中,如果队列满,则在notFull上等待;在take方法中,如果队列空,则在notEmpty上等待。put操作后通知 notEmpty, take 操作后通知 notFull。这样,代码更清晰易读。
异步任务
基本接口:
- Runnable 和 Callable:表示要执行的异步任务。
- Executor 和 ExecutorService:表示执行服务。
- Future 表示异步任务的结果。
Runnable 没有返回结果,而 Callable 有,Runnable 不会抛出异常,而 Callable 会。
Executor 表示最简单的执行服务,可以执行一个 Runnable,没有返回结果。
ExecutorService 扩展了 Executor,其中的 submit 方法表示提交一个任务,返回值类型都是 Future,返回后,只是表示任务已提交,不代表已执行,通过Future可以查询异步任务的状态、获取最终结果、取消任务等。
Future中的 get 用于返回异步任务最终的结果,如果任务还未执行完成,会阻塞等待;cancel 用于取消异步任务,如果任务已完成、或已经取消、或由于某种原因不能取消, cancel 返回 false,否则返回true。isDone 和 isCancelled 用于查询任务状态。isCancelled 表示任务是否被取消,只要 cancel 方法返回了 true,随后的isCancelled 方法都会返回 true,即使执行任务的线程还未真正结束。isDone 表示任务是否结束,不管什么原因都算。
Future 是一个重要的概念,是实现“任务的提交”与“任务的执行”相分离的关键,任务提交者和任务执行服务通过它隔离各自的关注点,同时进行协作。
基本使用:
package AsyncTask;
import java.util.Random;
import java.util.concurrent.*;
public class AsyncTaskDemo {
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<Integer> future = executorService.submit(new Task());
System.out.println("这是主线程");
Thread.sleep(100);
try {
System.out.println("任务结果" + future.get());
} catch (ExecutionException e) {
e.printStackTrace();
}
executorService.shutdown();
}
static class Task implements Callable<Integer> {
@Override
public Integer call() throws Exception {
int sleepSeconds = new Random().nextInt(1000);
System.out.println("子线程开始休眠");
Thread.sleep(sleepSeconds);
System.out.println("子线程休眠结束");
return sleepSeconds;
}
}
}
示例中的大致步骤就是:
- 定义一个任务描述要做的事
- 创建ExecutorService实例
- ExecutorService实例提交任务
- 在别处获取任务结果
- 关闭ExecutorService
其中 ExecutorService 有两个关闭方法:ExecutorServicshutdown 和 shutdownNow。区别是,shutdown表示不再接受新任务,shutdownNow不仅不接受新任务,而且会终止已提交但尚未执行的任务,对于正在执行的任务,一般会调用线程的interrupt方法尝试中断,不过,线程可能不响应中断,shutdownNow会返回已提交但尚未执行的任务列表。shutdown 和 shutdownNow 不会阻塞等待,它们返回后不代表所有任务都已结束,调用者可以通过awaitTermination等待所有任务结束。
ExecutorService 有两组批量提交任务的方法:invokeAll 和 invokeAny。invokeAll 等待所有任务完成,返回的 Future 列表中,每个 Future 的 isDone 方法都返回true,不过 isDone 为true不代表任务就执行成功了,可能是被取消了。而对于 invokeAny,只要有一个任务在限时内成功返回了,它就会返回该任务的结果,其他任务会被取消
原理
好累啊不想写了
线程池
线程池主要由两个概念组成:一个是任务队列;另一个是工作者线程。工作者线程主体就是一个循环,循环从队列中接受任务并执行,任务队列保存待执行的任务。( JavaScript 中的异步实现也是类似的套路哦)
- 它可以重用线程,避免线程创建的开销。
- 任务过多时,通过排队避免创建过多线程,减少系统资源消耗和竞争,确保任务有序完成。
线程池的实现类是 ThreadPoolExecutor,它继承自 AbstractExecutorService ,实现了 ExecutorService ,基本用法与上节异步任务介绍的类似。
构造方法:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue);
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory);
第二个构造方法多了两个参数 threadFactory 和 handler,这两个参数一般不需要,第一个构造方法会设置默认值。参数 corePoolSize、maximumPoolSize、keepAliveTime、unit 用于控制线程池中线程的个数,workQueue 表示任务队列,threadFactory 用于对创建的线程进行一些配置,handler表示任务拒绝策略。
-
线程池大小
- corePoolSize:核心线程个数
- maximumPoolSize:最大线程个数
- keepAliveTime和unit:空闲线程存活时间
一般情况下,有新任务到来的时候,如果当前线程个数小于 corePoolSize,就会创建一个新线程来执行该任务,需要说明的是,即使其他线程现在也是空闲的,也会创建新线程。不过,如果线程个数大于等于 corePoolSize,那就不会立即创建新线程了,它会先尝试排队,需要强调的是,它是“尝试”排队,而不是“阻塞等待”入队,如果队列满了或其他原因不能立即入队,它就不会排队,而是检查线程个数是否达到了 maximumPoolSize,如果没有,就会继续创建线程,直到线程数达到 maximumPoolSize。
keepAliveTime 的目的是为了释放多余的线程资源,它表示,当线程池中的线程个数大于 corePoolSize 时额外空闲线程的存活时间。如果该值为 0 ,则表示所有线程都不会超时终止。
-
队列
这里要求队列类型是阻塞队列 BlockingQueue。
- LinkedBlockingQueue:基于链表的阻塞队列,可以指定最大长度,但默认是无界的。
- ArrayBlockingQueue:基于数组的有界阻塞队列。
- PriorityBlockingQueue:基于堆的无界阻塞优先级队列。
- SynchronousQueue:没有实际存储空间的同步阻塞队列。
注意:如果用的是无界队列,需要强调的是,线程个数最多只能达到 corePoolSize,到达 corePoolSize 后,新的任务总会排队,参数 maximumPoolSize 也就没有意义了。对于 SynchronousQueue,它没有实际存储元素的空间,当尝试排队时,只有正好有空闲线程在等待接受任务时,才会入队成功,否则,总是会创建新线程,直到达到 maximumPoolSize。
-
任务拒绝策略
如果队列有界,且 maximumPoolSize 有限,则当队列排满,线程个数也达到了 maximumPoolSize,这时,新任务会触发线程池的任务拒绝策略。
ThreadPoolExecuto r实现了4种处理方式。
- ThreadPoolExecutor.AbortPolicy:这就是默认的方式,抛出异常。
- ThreadPoolExecutor.DiscardPolicy:静默处理,忽略新任务,不抛出异常,也不执行。
- ThreadPoolExecutor.DiscardOldestPolicy:将等待时间最长的任务扔掉,然后自己排队。
- ThreadPoolExecutor.CallerRunsPolicy:在任务提交者线程中执行任务,而不是交给线程池中的线程执行。
拒绝策略可以在构造方法中进行指定,也可以通过 set 方法进行指定
-
工厂
线程池还可以接受一个参数:ThreadFactory。它是一个接口,由这个接口l来定义如何创建一个 Thread。
-
核心线程
线程个数小于等于 corePoolSiz e时,我们称这些线程为核心线程,默认情况下:
- 核心线程不会预先创建,只有当有任务时才会创建。
- 核心线程不会因为空闲而被终止,keepAliveTime 参数不适用。不过,ThreadPoolExecutor 可以调用方法可以改变这个默认行为。
-
死锁
提交给线程池的任务之间有如果依赖,这种情况可能会导致出现死锁。这个死锁不是说共享资源竞争的死锁,而是单纯的等待,比如任务A,在它的执行过程中,它给同样的任务执行服务提交了一个任务B,但需要等待任务B结束。
解决办法:可以使用 newCachedThreadPool 创建线程池,让线程数不受限制。另一个解决方法是使用 SynchronousQueue,它可以避免死锁,怎么做到的呢?对于普通队列,入队只是把任务放到了队列中,而对于 SynchronousQueue 来说,入队成功就意味着已有线程接受处理,如果入队失败,可以创建更多线程直到 maximumPoolSize,如果达到了 maximumPoolSize,会触发拒绝机制,不管怎么样,都不会死锁。
定时任务
TimerTask
示例
package AsyncTask;
import java.util.Timer;
import java.util.TimerTask;
public class TimerDemo {
public static void main(String[] args) throws InterruptedException {
Timer timer = new Timer();
timer.schedule(new DelayTask(), 10);
// 延迟指定时间后以固定时延执行
timer.schedule(new DelayTask2(), 100, 1000);
Thread.sleep(4000);
timer.cancel();
}
static class DelayTask extends TimerTask {
@Override
public void run() {
System.out.println("延迟1任务执行");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
static class DelayTask2 extends TimerTask {
@Override
public void run() {
System.out.println("延迟2任务执行");
}
}
}
创建一个 Timer 对象,先运行 DelayTask,再固定周期运行 DelayTask2,最后调用 Timer 的 cancel 方法取消所有定时任务。
这里会发现 DelayTask2 总是等 DelayTas k执行之后才开始输出,因为一个 Timer 对象只有一个 Timer 线程在执行,所以 DelayTask2 被 DelayTask 给强行延迟了。
注意:任务的延迟执行分为固定延时(fixed-delay)与固定频率(fixed-rate),二者都是重复执行,但后一次任务执行相对的时间是不一样的,对于固定延时,它是基于上次任务的“实际”执行时间来算的,如果由于某种原因,上次任务延时了,则本次任务也会延时,而固定频率会尽量补够运行次数。
基本原理
Timer 内部主要由任务队列和 Timer 线程两部分组成,一个 Timer 对象只有一个 Timer 线程。任务队列是一个基于堆实现的优先级队列,按照下次执行的时间排优先级。Timer 线程主体是一个循环,从队列中获取任务,如果队列中有任务且计划执行时间小于等于当前时间,就执行它,如果队列中没有任务或第一个任务延时还没到,就睡眠。
在执行任何一个任务的 run 方法时,一旦 run 抛出异常,Timer 线程就会退出,从而所有定时任务都会被取消。
如果希望各个定时任务不互相干扰,一定要在 run 方法内捕获所有异常。
总之需要注意:
- 后台只有一个线程在运行
- 固定频率的任务被延迟后,可能会立即执行多次,将次数补够
- 固定延时任务的延时相对的是任务执行前的时间
- 不要在定时任务中使用无限循环
- 一个定时任务的未处理异常会导致所有定时任务被取消
ScheduledExecutorService
由于 Timer/TimerTask 的一些问题,Java 并发包引入了 ScheduledExecutorService。ScheduledExecutorService 的主要实现类是ScheduledThreadPoolExecutor,它是线程池 ThreadPoolExecutor 的子类,是基于线程池实现的。它的任务队列是一个无界的优先级队列,所以最大线程数对它没有作用,即使 corePoolSize 设为 0,它也会至少运行一个线程。
与 Timer 不同,它不支持以绝对时间作为首次运行的时间。另外,单个定时任务的异常不会再导致全部定时任务被取消,即使后台只有一个线程执行任务。不过,需要强调的是,任务发生异常不会在任何地方体现,也就是说在 run 方法里 throw 了之后什么也看不见。所以,与 Timer 中的任务类似,应该捕获所有异常。
package AsyncTask;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class ScheduledExecutorServiceDemo {
public static void main(String[] args) throws InterruptedException {
ScheduledExecutorService timer = Executors.newScheduledThreadPool(10);
timer.schedule(new LongRunTask(), 10, TimeUnit.MILLISECONDS);
timer.scheduleWithFixedDelay(new FixedDelayTask(), 100, 1000, TimeUnit.MILLISECONDS);
Thread.sleep(4000);
timer.shutdown();
}
static class LongRunTask implements Runnable {
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("LongRunTask");
throw new RuntimeException();
}
}
static class FixedDelayTask implements Runnable {
@Override
public void run() {
System.out.println("FixedDelayTask");
}
}
}
原理
ScheduledThreadPoolExecutor 的实现思路与 Timer 基本是类似的,都有一个基于堆的优先级队列,保存待执行的定时任务,它的主要不同是:
- 它的背后是线程池,可以有多个线程执行任务。
- 它在任务执行后再设置下次执行的时间,对于固定延时的任务更为合理。
- 任务执行线程会捕获任务执行过程中的所有异常,一个定时任务的异常不会影响其他定时任务,不过,发生异常的任务(即使是一个重复任务)不会再被调度。
工具类
读写锁ReentrantReadWriteLock
synchronized 和显式锁 ReentrantLock,对于同一受保护对象的访问,无论是读还是写,它们都要求获得相同的锁。在一些场景中,这是没有必要的,多个线程的读操作完全可以并行,在读多写少的场景中,让读操作并行可以明显提高性能。
通过一个 ReadWriteLock 产生两个锁:一个读锁,一个写锁。读操作使用读锁,写操作使用写锁。需要注意的是,只有“读-读”操作是可以并行的,“读-写”和“写-写”都不可以。
内部,它们使用同一个整数变量表示锁的状态,16 位给读锁用,16 位给写锁用,使用一个变量便于进行 CAS 操作,锁的等待队列其实也只有一个。写锁的获取,就是确保当前没有其他线程持有任何锁,否则就等待。写锁释放后,也就是将等待队列中的第一个线程唤醒,唤醒的可能是等待读锁的,也可能是等待写锁的。读锁的获取不太一样,首先,只要写锁没有被持有,就可以获取到读锁,此外,在获取到读锁后,它会检查等待队列,逐个唤醒最前面的等待读锁的线程,直到第一个等待写锁的线程。如果有其他线程持有写锁,获取读锁会等待。读锁释放后,检查读锁和写锁数是否都变为了 0,如果是,唤醒等待队列中的下一个线程。
package AsyncTask;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class MyCache {
private Map<String, Object> map = new HashMap<>();
private ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private Lock readLock = readWriteLock.readLock();
private Lock writeLock = readWriteLock.writeLock();
public Object get(String key) {
readLock.lock();
try {
return map.get(key);
} finally {
readLock.unlock();
}
}
public Object put(String key, Object value) {
writeLock.lock();
try {
return map.put(key, value);
} finally {
writeLock.unlock();
}
}
public void clear() {
writeLock.lock();
try {
map.clear();
} finally {
writeLock.unlock();
}
}
}
信号量Semaphore
有的单个资源即使可以被并发访问,但并发访问数多了可能影响性能,所以希望限制并发访问的线程数。
一般锁只能由持有锁的线程释放,而 Semaphore 表示的只是一个许可数,任意线程都可以调用其 release 方法。主要的锁实现类 ReentrantLock 是可重入的,而 Semaphore 不是,每一次的 acquire 调用都会消耗一个许可,acquire 是会阻塞的。
package AsyncTask;
import java.util.concurrent.Semaphore;
public class SemaphoreDemo {
public static class ConcurrentLimitException extends RuntimeException {
private static final long serialVersionUID = 1L;
}
private static final int MAX_PERMITS = 10;
private Semaphore permits = new Semaphore(MAX_PERMITS);
public boolean login(String name, String pwd) {
if(!permits.tryAcquire()) {
throw new ConcurrentLimitException();
}
// TODO 校验密码
return true;
}
public void logout(String name) {
// TODO 登出操作
permits.release();
}
}
倒计时门栓CountDownLatch
门栓的两种应用场景:一种是同时开始,另一种是主从协作。
同时开始场景中,运行员线程等待主裁判线程发出开始指令的信号,一旦发出后,所有运动员线程同时开始,计数初始为1,运动员线程调用 await,主线程调用 countDown
主从协作模式中,主线程依赖工作线程的结果,需要等待工作线程结束,这时,计数初始值为工作线程的个数,工作线程结束后调用 countDown,主线程调用 await 进行等待。
package AsyncTask;
import java.util.concurrent.CountDownLatch;
// 同时开始场景
public class RacerWithCountDwnLatch {
static class Racer extends Thread {
CountDownLatch latch;
public Racer(CountDownLatch latch) {
this.latch = latch;
}
@Override
public void run() {
try {
latch.await(); // 没有countDown信号就会卡在这
System.out.println(getName() + "开始正式运行" + System.currentTimeMillis());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws InterruptedException {
int num = 10;
CountDownLatch latch = new CountDownLatch(1);
Thread[] racers = new Thread[num];
for(int i = 0; i < 10; i++) {
racers[i] = new Racer(latch);
racers[i].start();
}
Thread.sleep(1000);
// 发信号让线程一起开始动作
latch.countDown();
}
}
package AsyncTask;
import java.util.concurrent.CountDownLatch;
// 主从协作场景
public class MasterWorkerDemo {
static class Worker extends Thread {
CountDownLatch latch;
public Worker(CountDownLatch latch) {
this.latch = latch;
}
@Override
public void run() {
try {
int sleepTime = (int) (Math.random() * 10);
System.out.println(sleepTime);
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
latch.countDown();
}
}
}
public static void main(String[] args) throws InterruptedException {
int num = 10;
CountDownLatch latch = new CountDownLatch(num);
Worker[] workers = new Worker[num];
for(int i = 0; i < num; i++) {
workers[i] = new Worker(latch);
workers[i].start();
}
latch.await();
System.out.println("全部结束");
}
}
循环栅栏CyclicBarrier
CyclicBarrier 特别适用于并行迭代计算,每个线程负责一部分计算,然后在栅栏处等待其他线程完成,所有线程到齐后,交换数据和计算结果,再进行下一次迭代。
与 CountDownLatch 类似,它也有一个数字,但表示的是参与的线程个数。
它有一个构造方法,接受一个 Runnable 参数,这个参数表示栅栏动作,当所有线程到达栅栏后,在所有线程执行下一步动作前,运行参数中的动作,这个动作由最后一个到达栅栏的线程执行。
CyclicBarrier 的主要方法就是 await,await 在等待其他线程到达栅栏,调用 await 后,表示自己已经到达,如果自己是最后一个到达的,就执行可选的命令,执行后,唤醒所有等待的线程,然后重置内部的同步计数,以循环使用。
package AsyncTask;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierDemo {
static class Tourist extends Thread {
CyclicBarrier barrier;
public Tourist(CyclicBarrier barrier) {
this.barrier = barrier;
}
@Override
public void run() {
try {
Thread.sleep((int) (Math.random() * 10));
// 第一次集合
barrier.await();
System.out.println(getName() + "继续" + System.currentTimeMillis());
Thread.sleep((int) (Math.random() * 10));
// 第二次集合
barrier.await();
System.out.println(getName() + "继续" + System.currentTimeMillis());
} catch (Exception e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
int num = 3;
Tourist[] tourists = new Tourist[num];
CyclicBarrier barrier = new CyclicBarrier(num, new Runnable() {
@Override
public void run() {
System.out.println("全部集合了" + System.currentTimeMillis() + " 最后执行者:" + Thread.currentThread().getName());
}
});
for (int p = 0; p < num; p++) {
tourists[p] = new Tourist(barrier);
tourists[p].start();
}
}
}
ThreadLocal
线程本地变量是说,每个线程都有同一个变量的独有拷贝。
多个线程访问的虽然是同一个变量,但每个线程都有自己的独立的值,这就是线程本地变量的含义。
使用场景:日期处理、随机数和上下文信息。
-
日期处理
package Threads.ThreadLocal; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Date; /** * 每个线程使用自己的DateFormat,就不存在安全问题了,在线程的整个使用过程中,只需要创建一次,又避免了频繁创建的开销 */ public class ThreadLocalDateFormat { static ThreadLocal<SimpleDateFormat> sdf = new ThreadLocal<SimpleDateFormat>() { @Override protected SimpleDateFormat initialValue() { return new SimpleDateFormat("yyyy-MM-dd"); } }; public static String date2String(Date date) { return sdf.get().format(date); } public static Date string2Date(String str) throws ParseException { return sdf.get().parse(str); } }
-
随机数
即使对象是线程安全的,使用 ThreadLocal 也可以减少竞争,它是 Random 的子类,利用了 ThreadLocal,它没有 public 的构造方法,通过静态方法current 获取对象,这个对象就是个就是一个 ThreadLocal 变量。
-
上下文信息
package Threads.ThreadLocal; public class ReqContext { public static class Req {}; private static ThreadLocal<String> localUserId = new ThreadLocal<>(); private static ThreadLocal<Req> localReq = new ThreadLocal<>(); public static String getCurrentUserId() { return localUserId.get(); } public static void setCurrentUserId(String userId) { localUserId.set(userId); } public static Req getCurrentReq() { return localReq.get(); } public static void setCurrentReq(Req req) { localReq.set(req); } }
在一个 Web 服务器中,一个线程执行用户的请求,在执行过程中,很多代码都会访问一些共同的信息,比如请求信息、用户身份信息,它们是线程执行过程中的全局信息,在首次获取到信息时,调用 set 方法如 setCurrentRequest/setCurrentUserId 进行设置,然后就可以在代码的任意其他地方调用 get 相关方法进行获取了。
原理
每个线程都有一个 Map,类型为 ThreadLocalMap ,调用set实际上是在线程自己的Map里设置了一个条目,键为当前的 ThreadLocal 对象,值为 value。
每个线程都有一个 Map,对于每个 ThreadLocal 对象,调用其get/set实际上就是以 ThreadLocal 对象为键读写当前线程的 Map,这样,就实现了每个线程都有自己的独立副本的效果。
小结:
本章介绍了 Java 一些同步协作工具:
- 在读多写少的场景中使用 ReentrantReadWriteLock 替代 ReentrantLock,以提高性能。
- 使用 Semaphore 限制对资源的并发访问数。
- 使用 CountDownLatch 实现不同角色线程间的同步。
- 使用 CyclicBarrier 实现同一角色线程间的协调一致。
- CyclicBarrier 与 CountDownLatch 可能容易混淆,强调下它们的区别:
- CountDownLatch 的参与线程是有不同角色的,有的负责倒计时,有的在等待倒计时变为 0,负责倒计时和等待倒计时的线程都可以有多个,用于不同角色线程间的同步。
- CyclicBarrier 的参与线程角色是一样的,用于同一角色线程间的协调一致。
- CountDownLatch 是一次性的,而 CyclicBarrier 是可以重复利用的。
原创文章,作者:745907710,如若转载,请注明出处:https://blog.ytso.com/277182.html