一、线程池
a、固定大小的线程池
import java.util.concurrent.Executors; import java.util.concurrent.ExecutorService; /** * Java线程:线程池- * * */ public class Test { public static void main(String[] args) { //创建一个可重用固定线程数的线程池 ExecutorService pool = Executors.newFixedThreadPool(2); //创建实现了Runnable接口对象,Thread对象当然也实现了Runnable接口 Thread t1 = new MyThread(); Thread t2 = new MyThread(); Thread t3 = new MyThread(); Thread t4 = new MyThread(); Thread t5 = new MyThread(); //将线程放入池中进行执行 pool.execute(t1); pool.execute(t2); pool.execute(t3); pool.execute(t4); pool.execute(t5); //关闭线程池 pool.shutdown(); } } class MyThread extends Thread{ @Override public void run() { System.out.println(Thread.currentThread().getName()+"正在执行..."); } }
运行结果:
pool-1-thread-1正在执行... pool-1-thread-1正在执行... pool-1-thread-2正在执行... pool-1-thread-1正在执行... pool-1-thread-2正在执行...
如果将线程池的大小改为4,则运行结果如下:
pool-1-thread-2正在执行... pool-1-thread-3正在执行... pool-1-thread-3正在执行... pool-1-thread-2正在执行... pool-1-thread-1正在执行...
b、单任务线程池
在上例的基础上改一行创建pool对象的代码为:
//创建一个使用单个 worker 线程的 Executor,以无界队列方式来运行该线程。 ExecutorService pool = Executors.newSingleThreadExecutor();
则,运行结果为:
pool-1-thread-1正在执行... pool-1-thread-1正在执行... pool-1-thread-1正在执行... pool-1-thread-1正在执行... pool-1-thread-1正在执行...
c、可变尺寸的线程池
与上面的类似,只是改动下pool的创建方式:
//创建一个可根据需要创建新线程的线程池,但是在以前构造的线程可用时将重用它们。 ExecutorService pool = Executors.newCachedThreadPool();
运行结果如下:
pool-1-thread-1正在执行... pool-1-thread-5正在执行... pool-1-thread-4正在执行... pool-1-thread-3正在执行... pool-1-thread-2正在执行...
d、延迟连接池
package concurrent; import java.util.concurrent.Executors; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** * Java线程:线程池- * * */ public class Test { public static void main(String[] args) { //创建一个线程池,它可那排在给定延迟后运行命令或者定期地执行 ScheduledExecutorService pool = Executors.newScheduledThreadPool(2); //创建实现了Runnable接口对象,Thread对象当然也实现了Runnable接口 Thread t1 = new MyThread(); Thread t2 = new MyThread(); Thread t3 = new MyThread(); Thread t4 = new MyThread(); Thread t5 = new MyThread(); //将线程放入池中进行执行 pool.execute(t1); pool.execute(t2); pool.execute(t3); //使用延迟执行风格的方法 pool.schedule(t4, 5, TimeUnit.SECONDS); pool.schedule(t5, 10, TimeUnit.SECONDS); //关闭线程池 pool.shutdown(); } } class MyThread extends Thread{ @Override public void run() { System.out.println(Thread.currentThread().getName()+"正在执行..."); } }
e、单任务连接线程池
在d的代码基础上,做改动
//创建一个单线程执行程序,它可安排在给定延迟后运行命令或者定期地执行。 ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
运行时,会发现,t4延迟5s后得到执行,t5延迟10s后得到执行。运行结果如下:
pool-1-thread-2正在执行... pool-1-thread-1正在执行... pool-1-thread-1正在执行... pool-1-thread-2正在执行... pool-1-thread-1正在执行...
f、自定义线程池
package concurrent; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** * Java线程:线程池-自定义线程池 * * */ public class Test { public static void main(String[] args) { //创建等待队列 BlockingQueue<Runnable> bqueue = new ArrayBlockingQueue<Runnable>(20); //创建一个单线程执行程序,它可安排在给定延迟后运行命令或者定期地执行。 ThreadPoolExecutor pool = new ThreadPoolExecutor(2,3,2,TimeUnit.MILLISECONDS,bqueue); //创建实现了Runnable接口对象,Thread对象当然也实现了Runnable接口 Thread t1 = new MyThread(); Thread t2 = new MyThread(); Thread t3 = new MyThread(); Thread t4 = new MyThread(); Thread t5 = new MyThread(); Thread t6 = new MyThread(); Thread t7 = new MyThread(); //将线程放入池中进行执行 pool.execute(t1); pool.execute(t2); pool.execute(t3); pool.execute(t4); pool.execute(t5); pool.execute(t6); pool.execute(t7); //关闭线程池 pool.shutdown(); } } class MyThread extends Thread { @Override public void run() { System.out.println(Thread.currentThread().getName() + "正在执行..."); try { Thread.sleep(100L); } catch (InterruptedException e) { e.printStackTrace(); } } }
运行结构如下:
创建自定义线程池的构造方法很多,本例中参数的含义如下:
ThreadPoolExecutor
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)
-
用给定的初始参数和默认的线程工厂及处理程序创建新的
ThreadPoolExecutor。使用
Executors
工厂方法之一比使用此通用构造方法方便得多。- 参数:
-
corePoolSize
– 池中所保存的线程数,包括空闲线程。 -
maximumPoolSize
– 池中允许的最大线程数。 -
keepAliveTime
– 当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。 -
unit
– keepAliveTime 参数的时间单位。 -
workQueue
– 执行前用于保持任务的队列。此队列仅保持由
execute 方法提交的
Runnable 任务。 - 抛出:
-
IllegalArgumentException
– 如果 corePoolSize 或 keepAliveTime 小于零,或者 maximumPoolSize 小于或等于零,或者 corePoolSize 大于 maximumPoolSize。 -
NullPointerException
– 如果
workQueue 为 null
二、有返回值的线程
下面是一个简单的例子:
package MultiThread; import java.util.concurrent.*; /** * Java线程:有返回值的线程 * * */ public class Test { public static void main(String[] args) throws ExecutionException, InterruptedException { //创建一个线程池 ExecutorService pool = Executors.newFixedThreadPool(2); //创建两个有返回值的任务 Callable<String> c1 = new MyCallable("A"); Callable<String> c2 = new MyCallable("B"); //执行任务并获取Future对象 Future<String> f1 = pool.submit(c1); Future<String> f2 = pool.submit(c2); //从Future对象上获取任务的返回值,并输出到控制台 System.out.println(">>>"+f1.get().toString()); System.out.println(">>>"+f2.get().toString()); //关闭线程池 pool.shutdown(); } } class MyCallable implements Callable<String>{ private String oid; MyCallable(String oid) { this.oid = oid; } @Override public String call() throws Exception { return oid+"任务返回的内容"; } }
运行结果:
>>>A任务返回的内容
>>>B任务返回的内容
比较简单,要深入了解还需要看Callable和Future接口的API啊。
三、并发库的锁
在Java5中,专门提供了锁对象,利用锁可以方便的实现资源的封锁,用来控制对竞争资源并发访问的控制,这些内容主要集中在java.util.concurrent.locks 包下面,里面有三个重要的接口Condition、Lock、ReadWriteLock。
接口摘要 | |
---|---|
Condition | Condition 将 Object 监视器方法(wait 、notify 和 notifyAll )分解成截然不同的对象,以便通过将这些对象与任意 Lock 实现组合使用,为每个对象提供多个等待 set(wait-set)。 |
Lock | Lock 实现提供了比使用 synchronized 方法和语句可获得的更广泛的锁定操作。 |
ReadWriteLock | ReadWriteLock 维护了一对相关的锁 ,一个用于只读操作,另一个用于写入操作。 |
有关锁的介绍,API文档解说很多,看得很烦,还是看个例子再看文档比较容易理解
a、普通锁
package MultiThread; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * Java线程:锁 * * */ public class Test { public static void main(String[] args) { //创建并发访问的账户 MyCount myCount = new MyCount("95599200901215522", 10000); //创建一个锁对象 Lock lock = new ReentrantLock(); //创建一个线程池 ExecutorService pool = Executors.newCachedThreadPool(); //创建一些并发访问用户,一个信用卡,存的存,取的取,好热闹啊 UserThread ut1 = new UserThread("取款线程1", myCount, -4000, lock); UserThread ut2 = new UserThread("存款线程1", myCount, 6000, lock); UserThread ut3 = new UserThread("取款线程2", myCount, -8000, lock); UserThread ut4 = new UserThread("存款线程2", myCount, 800, lock); //在线程池中执行各个用户的操作 pool.execute(ut1); pool.execute(ut2); pool.execute(ut3); pool.execute(ut4); //关闭线程池 pool.shutdown(); } } /** * 信用卡的用户 线程 * 多个用户线程操作该信用卡 */ class UserThread implements Runnable { private String threadName; //用户线程 private MyCount myCount; //所要操作的账户 private int iocash; //操作的金额,当然有正负之分了 private Lock myLock; //执行操作所需的锁对象 UserThread(String name, MyCount myCount, int iocash, Lock myLock) { this.threadName = name; this.myCount = myCount; this.iocash = iocash; this.myLock = myLock; } public void run() { //获取锁 myLock.lock(); //执行现金业务 System.out.println(threadName + "正在操作" + myCount + "账户,操作金额为" + iocash + ",当前金额为" + myCount.getCash()); myCount.setCash(myCount.getCash() + iocash); System.out.println("/t操作成功,操作金额为" + iocash + ",当前金额为" + myCount.getCash()); //释放锁,否则别的线程没有机会执行了 myLock.unlock(); } } /** * 信用卡账户,可随意透支 */ class MyCount { private String oid; //账号 private int cash; //账户余额 MyCount(String oid, int cash) { this.oid = oid; this.cash = cash; } public String getOid() { return oid; } public void setOid(String oid) { this.oid = oid; } public int getCash() { return cash; } public void setCash(int cash) { this.cash = cash; } @Override public String toString() { return "MyCount{" + "oid='" + oid + '/'' + ", cash=" + cash + '}'; } }
运行结果:
取款线程1正在操作MyCount{oid='95599200901215522', cash=10000}账户,操作金额为-4000,当前金额为10000 操作成功,操作金额为-4000,当前金额为6000 存款线程1正在操作MyCount{oid='95599200901215522', cash=6000}账户,操作金额为6000,当前金额为6000 操作成功,操作金额为6000,当前金额为12000 存款线程2正在操作MyCount{oid='95599200901215522', cash=12000}账户,操作金额为800,当前金额为12000 操作成功,操作金额为800,当前金额为12800 取款线程2正在操作MyCount{oid='95599200901215522', cash=12800}账户,操作金额为-8000,当前金额为12800 操作成功,操作金额为-8000,当前金额为4800
b、读写锁
package MultiThread; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; /** * Java线程:锁 * * */ public class Test { public static void main(String[] args) { //创建并发访问的账户 MyCount myCount = new MyCount("95599200901215522", 10000); //创建一个锁对象 ReadWriteLock lock = new ReentrantReadWriteLock(); //创建一个线程池 ExecutorService pool = Executors.newCachedThreadPool(); //创建一些并发访问用户线程,一个信用卡,存的存,取的取,好热闹啊 UserThread ut1 = new UserThread("取款线程1", myCount, -4000, lock,false); UserThread ut2 = new UserThread("存款线程1", myCount, 6000, lock,false); UserThread ut3 = new UserThread("取款线程2", myCount, -8000, lock,false); UserThread ut4 = new UserThread("存款线程2", myCount, 800, lock,false); UserThread ut5 = new UserThread("查询", myCount, 0, lock,true); //在线程池中执行各个用户的操作 pool.execute(ut1); pool.execute(ut2); pool.execute(ut3); pool.execute(ut4); pool.execute(ut5); //关闭线程池 pool.shutdown(); } } /** * 信用卡的用户 线程 * 多个用户线程操作该信用卡 */ class UserThread implements Runnable { private String threadName; //用户线程 private MyCount myCount; //所要操作的账户 private int iocash; //操作的金额,当然有正负之分了 private ReadWriteLock myLock; //执行操作所需的锁对象 private boolean ischeck; //是否查询 UserThread(String name, MyCount myCount, int iocash, ReadWriteLock myLock,boolean ischeck) { this.threadName = name; this.myCount = myCount; this.iocash = iocash; this.myLock = myLock; this.ischeck=ischeck; } public void run() { if(ischeck){ //获取读锁 myLock.readLock().lock(); //执行查询 System.out.println("读:"+threadName + "正在查询" + myCount + "账户,,当前金额为" + myCount.getCash()); //释放获取到的读锁 myLock.readLock().unlock(); }else{ //获取写锁 myLock.writeLock().lock(); myCount.setCash(myCount.getCash() + iocash); System.out.println("写:"+threadName+"操作成功,操作金额为" + iocash + ",当前金额为" + myCount.getCash()); //释放锁获取到的写锁 myLock.writeLock().unlock(); } } } /** * 信用卡账户,可随意透支 */ class MyCount { private String oid; //账号 private int cash; //账户余额 MyCount(String oid, int cash) { this.oid = oid; this.cash = cash; } public String getOid() { return oid; } public void setOid(String oid) { this.oid = oid; } public int getCash() { return cash; } public void setCash(int cash) { this.cash = cash; } @Override public String toString() { return "MyCount{" + "oid='" + oid + '/'' + ", cash=" + cash + '}'; } }
运行结果:
写:取款线程1操作成功,操作金额为-4000,当前金额为6000 写:取款线程2操作成功,操作金额为-8000,当前金额为-2000 写:存款线程1操作成功,操作金额为6000,当前金额为4000 读:查询正在查询MyCount{oid='95599200901215522', cash=4000}账户,,当前金额为4000 写:存款线程2操作成功,操作金额为800,当前金额为4800
在实际开发中,最好在能用读写锁的情况下使用读写锁,而不要用普通锁,以求更好的性能。
四、信号量
因此,本人认为,这个信号量类如果能返回数目,还能知道哪些对象在等待,哪些资源可使用,就非常完美了,仅仅拿到这些概括性的数字,对精确控制意义不是很大。目前还没想到更好的用法。
下面是一个简单的例子:
package MultiThread; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; /** * Java线程:信号量 * * */ public class Test { public static void main(String[] args) { MyPool myPool = new MyPool(20); //创建线程池 ExecutorService threadPool = Executors.newFixedThreadPool(2); MyThread t1 = new MyThread("任务A", myPool, 3); MyThread t2 = new MyThread("任务B", myPool, 12); MyThread t3 = new MyThread("任务C", myPool, 7); //在线程池中执行任务 threadPool.execute(t1); threadPool.execute(t2); threadPool.execute(t3); //关闭池 threadPool.shutdown(); } } /** * 一个池 */ class MyPool { private Semaphore sp; //池相关的信号量 /** * 池的大小,这个大小会传递给信号量 * * @param size 池的大小 */ MyPool(int size) { this.sp = new Semaphore(size); } public Semaphore getSp() { return sp; } public void setSp(Semaphore sp) { this.sp = sp; } } class MyThread extends Thread { private String threadName; //线程的名称 private MyPool pool; //自定义池 private int x; //申请信号量的大小 MyThread(String threadName, MyPool pool, int x) { this.threadName = threadName; this.pool = pool; this.x = x; } public void run() { try { //从此信号量获取给定数目的许可 pool.getSp().acquire(x); //todo:也许这里可以做更复杂的业务 System.out.println(threadName + "成功获取了" + x + "个许可!"); } catch (InterruptedException e) { e.printStackTrace(); } finally { //释放给定数目的许可,将其返回到信号量。 pool.getSp().release(x); System.out.println(threadName + "释放了" + x + "个许可!"); } } }
运行结果:
任务A成功获取了3个许可!
任务B成功获取了12个许可!
任务B释放了12个许可!
任务C成功获取了7个许可!
任务A释放了3个许可!
任务C释放了7个许可!
从结果可以看出,信号量仅仅是对池资源进行监控,但不保证线程的安全,因此,在使用时候,应该自己控制线程的安全访问池资源。
五、阻塞队列
package MultiThread; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ArrayBlockingQueue; /** * Java线程:并发库-阻塞队列 * * */ public class Test { public static void main(String[] args) throws InterruptedException { BlockingQueue<Integer> bqueue = new ArrayBlockingQueue<Integer>(20); for (int i = 0; i < 30; i++) { //将指定元素添加到此队列中,如果没有可用空间,将一直等待(如果有必要)。 bqueue.put(i); System.out.println("向阻塞队列中添加了元素:" + i); } System.out.println("程序到此运行结束,即将退出----"); } }
运行结果:
六、阻塞栈
package MultiThread; import java.util.concurrent.BlockingDeque; import java.util.concurrent.LinkedBlockingDeque; class HelloWorldThread { /** * Java线程:并发库-阻塞栈 * * */ public static void main(String[] args) { BlockingDeque<Integer> bstack=new LinkedBlockingDeque<Integer>(20) ; for(int i=0;i<30;i++){ //将指定元素添加到阻塞栈中,如果没有可用空间,将一直等待(如果有必要)。 bstack.push(i); System.out.println("向阻塞栈中添加了元素:" + i); if(bstack.size()==20){ System.out.println("队列满,弹出"+bstack.pop()); System.out.println("队列满,弹出"+bstack.pop()); System.out.println("队列满,弹出"+bstack.pop()); } } System.out.println("程序到此运行结束,即将退出----"); } }
程序的运行结果和阻塞队列的运行结果一样,程序并没结束,二是阻塞住了,原因是栈已经满了,后面追加元素的操作都被阻塞了。
七、条件变量
而在Java5中,一个锁可以有多个条件,每个条件上可以有多个线程等待,通过调用await()方法,可以让线程在该条件下等待。当调用signalAll()方法,又可以唤醒该条件下的等待的线程。有关Condition接口的API可以具体参考JavaAPI文档。
条件变量比较抽象,原因是他不是自然语言中的条件概念,而是程序控制的一种手段。
任何试图透支的操作都将等待里面有足够存款才执行操作。
package MultiThread; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * Java线程:条件变量 * * @author leizhimin 2009-11-5 10:57:29 */ public class Test { public static void main(String[] args) { //创建并发访问的账户 MyCount myCount = new MyCount("95599200901215522", 10000); //创建一个线程池 ExecutorService pool = Executors.newFixedThreadPool(2); Thread t1 = new SaveThread("张三", myCount, 2000); Thread t2 = new SaveThread("李四", myCount, 3600); Thread t3 = new DrawThread("王五", myCount, 2700); Thread t4 = new SaveThread("老张", myCount, 600); Thread t5 = new DrawThread("老牛", myCount, 1300); Thread t6 = new DrawThread("胖子", myCount, 800); //执行各个线程 pool.execute(t1); pool.execute(t2); pool.execute(t3); pool.execute(t4); pool.execute(t5); pool.execute(t6); //关闭线程池 pool.shutdown(); } } /** * 存款线程类 */ class SaveThread extends Thread { private String name; //操作人 private MyCount myCount; //账户 private int x; //存款金额 SaveThread(String name, MyCount myCount, int x) { this.name = name; this.myCount = myCount; this.x = x; } public void run() { myCount.saving(x, name); } } /** * 取款线程类 */ class DrawThread extends Thread { private String name; //操作人 private MyCount myCount; //账户 private int x; //存款金额 DrawThread(String name, MyCount myCount, int x) { this.name = name; this.myCount = myCount; this.x = x; } public void run() { myCount.drawing(x, name); } } /** * 普通银行账户,不可透支 */ class MyCount { private String oid; //账号 private int cash; //账户余额 private Lock lock = new ReentrantLock(); //账户锁 private Condition _save = lock.newCondition(); //存款条件 private Condition _draw = lock.newCondition(); //取款条件 MyCount(String oid, int cash) { this.oid = oid; this.cash = cash; } /** * 存款 * * @param x 操作金额 * @param name 操作人 */ public void saving(int x, String name) { lock.lock(); //获取锁 if (x > 0) { cash += x; //存款 System.out.println(name + "存款" + x + ",当前余额为" + cash); } _draw.signalAll(); //唤醒所有取款等待线程 lock.unlock(); //释放锁 } /** * 取款 * * @param x 操作金额 * @param name 操作人 */ public void drawing(int x, String name) { lock.lock(); //获取锁 try { if (cash - x < 0) { _draw.await(); //阻塞取款操作 } else { cash -= x; //取款 System.out.println(name + "取款" + x + ",当前余额为" + cash); } _save.signalAll(); //唤醒所有存款操作线程 } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); //释放锁 } } }
当然,除了使用并发库来实现存取款操作,我们也可以使用synchronized的方法、synchronized的代码块来实现。对比并发库、synchronized方法、synchronized代码块,第一种最灵活,第二种代码最简单,第三种容易犯错。
八、原子量
package MultiThread; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicLong; /** * Java线程:新特征-原子量 * * */ public class Test { public static void main(String[] args) { ExecutorService pool = Executors.newFixedThreadPool(2); Runnable t1 = new MyRunnable("张三", 2000); Runnable t2 = new MyRunnable("李四", 3600); Runnable t3 = new MyRunnable("王五", 2700); Runnable t4 = new MyRunnable("老张", 600); Runnable t5 = new MyRunnable("老牛", 1300); Runnable t6 = new MyRunnable("胖子", 800); //执行各个线程 pool.execute(t1); pool.execute(t2); pool.execute(t3); pool.execute(t4); pool.execute(t5); pool.execute(t6); //关闭线程池 pool.shutdown(); } } class MyRunnable implements Runnable { private static AtomicLong aLong = new AtomicLong(10000); //原子量,每个线程都可以自由操作 private String name; //操作人 private int x; //操作数额 MyRunnable(String name, int x) { this.name = name; this.x = x; } public void run() { System.out.println(name + "执行了" + x + ",当前余额:" + aLong.addAndGet(x)); } }
运行结果一:
李四执行了3600,当前余额:13600 张三执行了2000,当前余额:15600 老张执行了600,当前余额:18900 老牛执行了1300,当前余额:20200 胖子执行了800,当前余额:21000 王五执行了2700,当前余额:18300
运行结果二:
张三执行了2000,当前余额:12000 王五执行了2700,当前余额:14700 老张执行了600,当前余额:15300 老牛执行了1300,当前余额:16600 胖子执行了800,当前余额:17400 李四执行了3600,当前余额:21000
运行结果三:
张三执行了2000,当前余额:12000 王五执行了2700,当前余额:18300 老张执行了600,当前余额:18900 老牛执行了1300,当前余额:20200 胖子执行了800,当前余额:21000 李四执行了3600,当前余额:15600
package MultiThread; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * Java线程:并发库-原子量 * * */ public class Test { public static void main(String[] args) { ExecutorService pool = Executors.newFixedThreadPool(2); Lock lock=new ReentrantLock(); Runnable t1 = new MyRunnable("张三", 2000,lock); Runnable t2 = new MyRunnable("李四", 3600,lock); Runnable t3 = new MyRunnable("王五", 2700,lock); Runnable t4 = new MyRunnable("老张", 600,lock); Runnable t5 = new MyRunnable("老牛", 1300,lock); Runnable t6 = new MyRunnable("胖子", 800,lock); //执行各个线程 pool.execute(t1); pool.execute(t2); pool.execute(t3); pool.execute(t4); pool.execute(t5); pool.execute(t6); //关闭线程池 pool.shutdown(); } } class MyRunnable implements Runnable { private static AtomicLong aLong = new AtomicLong(10000); //原子量,每个线程都可以自由操作 private String name; //操作人 private int x; //操作数额 private Lock lock; MyRunnable(String name, int x,Lock lock) { this.name = name; this.x = x; this.lock=lock; } public void run() { lock.lock(); System.out.println(name + "执行了" + x + ",当前余额:" + aLong.addAndGet(x)); lock.unlock(); } }
运行结果:
张三执行了2000,当前余额:12000 李四执行了3600,当前余额:15600 王五执行了2700,当前余额:18300 老张执行了600,当前余额:18900 老牛执行了1300,当前余额:20200 胖子执行了800,当前余额:21000
九、障碍器
Java5中,添加了障碍器类,为了适应一种新的设计需求,比如一个大型的任务,常常需要分配好多子任务去执行,只有当所有子任务都执行完成时候,才能执行主任务,这时候,就可以选择障碍器了。障碍器是多线程并发控制的一种手段,用法很简单。
下面给个例子:
package MultiThread; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; /** * Java线程:新特征-障碍器 * * */ public class Test { public static void main(String[] args) { //创建障碍器,并设置MainTask为所有定数量的线程都达到障碍点时候所要执行的任务(Runnable) CyclicBarrier cb = new CyclicBarrier(7, new MainTask()); new SubTask("A", cb).start(); new SubTask("B", cb).start(); new SubTask("C", cb).start(); new SubTask("D", cb).start(); new SubTask("E", cb).start(); new SubTask("F", cb).start(); new SubTask("G", cb).start(); } } /** * 主任务 */ class MainTask implements Runnable { public void run() { System.out.println(">>>>主任务执行了!<<<<"); } } /** * 子任务 */ class SubTask extends Thread { private String name; private CyclicBarrier cb; SubTask(String name, CyclicBarrier cb) { this.name = name; this.cb = cb; } public void run() { System.out.println("[子任务" + name + "]开始执行了!"); for (int i = 0; i < 999999; i++) ; //模拟耗时的任务 System.out.println("[子任务" + name + "]开始执行完成了,并通知障碍器已经完成!"); try { //通知障碍器已经完成,让出锁(并使得,跳跃的障碍数目-1) cb.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } }
运行结果:
[子任务B]开始执行了!
[子任务E]开始执行了!
[子任务C]开始执行了!
[子任务D]开始执行了!
[子任务A]开始执行了!
[子任务E]开始执行完成了,并通知障碍器已经完成!
[子任务B]开始执行完成了,并通知障碍器已经完成!
[子任务A]开始执行完成了,并通知障碍器已经完成!
[子任务C]开始执行完成了,并通知障碍器已经完成!
[子任务D]开始执行完成了,并通知障碍器已经完成!
[子任务F]开始执行了!
[子任务F]开始执行完成了,并通知障碍器已经完成!
[子任务G]开始执行了!
[子任务G]开始执行完成了,并通知障碍器已经完成!
>>>>主任务执行了!<<<<
从执行结果可以看出,所有子任务完成的时候,主任务执行了,达到了控制的目标
总结:
原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/16452.html