由人民邮电出版社出版的《Java 7并发编程实战手册》终于出版了,译者是俞黎敏和申绍勇,该书将于近期上架。之前并发编程网组织翻译过此书,由于邮电出版社在并发网联系他们之前就找到了译者,所以没有采用并发网的译稿,但邮电出版社将于并发网展开合作,发布该书的样章(样章由并发网挑选,你也可以回帖告诉我们你想看哪一章的样章),并组织赠书活动回馈给活跃读者。活动详情请时刻关注并发网的微博和微信(微信号:ifeves),最后祝各位用餐愉快!:)
本章将介绍下列内容:
- 使用非阻塞式线程安全列表
- 使用阻塞式线程安全列表
- 使用按优先级排序的阻塞式线程安全列表
- 使用带有延迟元素的线程安全列表
- 使用线程安全可遍历映射
- 生成并发随机数
- 使用原子变量
- 使用原子数组
6.1 简介
数据结构(Data Structure)是编程中的基本元素,几乎每个程序都使用一种或多种数据结构来存储和管理数据。Java API提供了包含接口、类和算法的Java集合框架(Java Collection Framework),它实现了可用在程序中的大量数据结构。
当需要在并发程序中使用数据集合时,必须要谨慎地选择相应的实现方式。大多数集合类不能直接用于并发应用,因为它们没有对本身数据的并发访问进行控制。如果一些并发任务共享了一个不适用于并发任务的数据结构,将会遇到数据不一致的错误,并将影响程序的准确运行。这类数据结构的一个例子是ArrayList类。
Java提供了一些可以用于并发程序中的数据集合,它们不会引起任何问题。一般来说,Java提供了两类适用于并发应用的集合。
- 阻塞式集合(Blocking Collection):这类集合包括添加和移除数据的方法。当集合已满或为空时,被调用的添加或者移除方法就不能立即被执行,那么调用这个方法的线程将被阻塞,一直到该方法可以被成功执行。
- 非阻塞式集合(Non-Blocking Collection):这类集合也包括添加和移除数据的方法。如果方法不能立即被执行,则返回null或抛出异常,但是调用这个方法的线程不会被阻塞。
通过本章的各个小节,你将学会如何在并发应用中使用一些Java集合。
- 非阻塞式列表对应的实现类:ConcurrentLinkedDeque类;
- 阻塞式列表对应的实现类:LinkedBlockingDeque 类;
- 用于数据生成或消费的阻塞式列表对应的实现类:LinkedTransferQueue类;
- 按优先级排序列表元素的阻塞式列表对应的实现类:PriorityBlockingQueue类;
- 带有延迟列表元素的阻塞式列表对应的实现类:DelayQueue类;
- 非阻塞式可遍历映射对应的实现类:ConcurrentSkipListMap类;
- 随机数字对应的实现类:ThreadLocalRandom类;
- 原子变量对应的实现类:AtomicLong和AtomicIntegerArray类。
6.2 使用非阻塞式线程安全列表
最基本的集合类型是列表(List)。一个列表包含的元素数量不定,可以在任何位置添加、读取或移除元素。并发列表允许不同的线程在同一时间添加或移除列表中的元素,而不会造成数据不一致。
在本节,将会学到如何在并发程序中使用非阻塞式列表。非阻塞式列表提供了一些操作,如果被执行的操作不能够立即运行(例如,在列表为空时,从列表取出一个元素),方法会抛出异常或返回null。Java 7引入了ConcurrentLinkedDeque类来实现非阻塞式并发列表。
将要实现的范例包括以下两个不同的任务:
- 添加大量的数据到一个列表中;
- 从同一个列表中移除大量的数据。
准备工作
本节的范例是在Eclipse IDE里完成的。无论你使用Eclipse还是其他的IDE(比如NetBeans),都可以打开这个IDE并且创建一个新的Java工程。
范例实现
按照接下来的步骤实现本节的范例。
1.创建一个名为AddTask的类,实现Runnable接口。
public class AddTask implements Runnable {
2.声明一个私有的ConcurrentLinkedDeque属性list,并指定它的泛型参数是String型的。
private ConcurrentLinkedDeque list;
3.实现类的构造器来初始化属性。
public AddTask(ConcurrentLinkedDeque list) { this.list=list; }
4.实现run()方法。这个方法将10,000个字符串存放到列表中,这些字符串由当前执行任务的线程的名称和数字组成。
@Override public void run() { String name=Thread.currentThread().getName(); for (int i=0; i<10000; i++){ list.add(name+": Element "+i); } }
5.创建名为PollTask的类,并实现Runnable接口。
public class PollTask implements Runnable {
6.声明一个私有的ConcurrentLinkedDeque属性list,并指定它的泛型参数是String型的。
private ConcurrentLinkedDeque list;
7.实现类的构造器来初始化属性。
public PollTask(ConcurrentLinkedDeque list) { this.list=list; }
8.实现run()方法。这个方法将列表中的10,000个字符串取出,总共取5,000次,每次取两个元素。
@Override public void run() { for (int i=0; i<5000; i++) { list.pollFirst(); list.pollLast(); } }
9.创建范例的主类Main,并添加main()方法。
public class Main { public static void main(String[] args) {
10.创建ConcurrentLinkedDeque对象,并指定它的泛型参数是String型的。
ConcurrentLinkedDeque list=new ConcurrentLinkedDeque<>();
11.创建线程数组threads,它包含100个线程。
Thread threads[]=new Thread[100];
12.创建100个AddTask对象及其对应的运行线程。将每个线程存放到上一步创建的数组中,然后启动线程。
for (int i=0; i AddTask task=new AddTask(list); threads[i]=new Thread(task); threads[i].start(); } System.out.printf("Main: %d AddTask threads have been launched/n",threads.length);
13.使用join()方法等待线程完成。
for (int i=0; i<threads.length; i++) { try { threads[i].join(); } catch (InterruptedException e) { e.printStackTrace(); } }
14.将列表的元素数量打印到控制台。
System.out.printf("Main: Size of the List: %d/n",list.size());
15.创建100个PollTask对象及其对应的运行线程。将每个线程存放到上一步创建的数组中,然后启动线程。
for (int i=0; i< threads.length; i++){ PollTask task=new PollTask(list); threads[i]=new Thread(task); threads[i].start(); } System.out.printf("Main: %d PollTask threads have been launched/n",threads.length);
16.使用join()方法等待线程完成。
for (int i=0; i<threads.length; i++) { try { threads[i].join(); } catch (InterruptedException e) { e.printStackTrace(); } }
17.将列表的元素数量打印到控制台。
System.out.printf("Main: Size of the List: %d/n",list.size());
工作原理
本节使用的泛型参数是String类的ConcurrentLinkedDeque对象,用来实现一个非阻塞式并发数据列表。下面的截屏显示了程序的运行结果。
首先,执行100个AddTask任务将元素添加到ConcurrentLinkedDeque对象list中。每个任务使用add()方法向这个列表中插入10,000个元素。add()方法将新元素添加到列表尾部。当所有任务运行完毕,列表中的元素数量将被打印到控制台。在这一刻,列表中有1,000,000个元素。
接下来,执行100个PollTask任务将元素从列表中移除。每个任务使用pollFirst()和pollLast()方法从列表中移除10,000个元素。pollFirst()方法返回并移除列表中的第一个元素,pollLast()方法返回并移除列表中的最后一个元素。如果列表为空,这些方法返回null。当所有任务运行完毕,列表中的元素数量将被打印到控制台。在这一刻,列表中有0个元素。
使用size()方法输出列表中的元素数量。需要注意的是,这个方法返回的值可能不是真实的,尤其当有线程在添加数据或移除数据时,这个方法需要遍历整个列表来计算元素数量,而遍历过的数据可能已经改变。仅当没有任何线程修改列表时,才能保证返回的结果是准确的。
更多信息
ConcurrentLinkedDeque类提供了其他从列表中读取数据的方法。
- getFirst()和getLast():分别返回列表中第一个和最后一个元素,返回的元素不会从列表中移除。如果列表为空,这两个方法抛出NoSuchElementExcpetion异常。
- peek()、peekFirst()和peekLast():分别返回列表中第一个和最后一个元素,返回的元素不会从列表中移除。如果列表为空,这些方法返回null。
- remove()、removeFirst()和removeLast():分别返回列表中第一个和最后一个元素,返回的元素将会从列表中移除。如果列表为空,这些方法抛出NoSuchElementExcpetion异常。
6.3 使用阻塞式线程安全列表
最基本的集合类型是列表。一个列表包含的元素数量不定,可以在任何位置添加、读取或移除元素。并发列表允许不同的线程在同一时间添加或移除列表中的元素,而不会造成数据不一致。
在本节,你会学到如何在并发程序中使用阻塞式列表。阻塞式列表与非阻塞式列表的主要差别是:阻塞式列表在插入和删除操作时,如果列表已满或为空,操作不会被立即执行,而是将调用这个操作的线程阻塞队列直到操作可以执行成功。Java引入了LinkedBlocking Deque类来实现阻塞式列表。
将要实现的范例包括以下两个不同的任务:
- 添加大量的数据到一个列表中;
- 从同一个列表中移除大量的数据。
准备工作
本节的范例是在Eclipse IDE里完成的。无论你使用Eclipse还是其他的IDE(比如NetBeans),都可以打开这个IDE并且创建一个新的Java工程。
范例实现
按照接下来的步骤实现本节的范例。
1.创建名为Client的类,并实现Runnable接口。
public class Client implements Runnable{
2.声明一个私有的LinkedBlockingDeque属性requestList,并指定它的泛型参数是String型的。
private LinkedBlockingDeque requestList;
3.实现类的构造器来初始化属性。
public Client (LinkedBlockingDeque requestList) { this.requestList=requestList; }
4.实现run()方法。使用requestList对象的put()方法,每两秒向列表requestList中插入5个字符串。重复3次。
@Override public void run() { for (int i=0; i<3; i++) { for (int j=0; j<5; j++) { StringBuilder request=new StringBuilder(); request.append(i); request.append(":"); request.append(j); try { requestList.put(request.toString()); } catch (InterruptedException e) { e.printStackTrace(); } System.out.printf("Client: %s at %s./n",request,new Date()); } try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.printf("Client: End./n"); }
5. 创建范例的主类Main,并添加main()方法。
public class Main { public static void main(String[] args) throws Exception {
6. 声明并创建LinkedBlockingDeque属性list,并指定它的泛型参数是String型的。
LinkedBlockingDeque list=new LinkedBlockingDeque<>(3);
7.将client作为传入参数创建线程Thread并启动。
Client client=new Client(list); Thread thread=new Thread(client); thread.start();
8.使用list对象的take()方法,每300毫秒从列表中取出3个字符串对象,重复5次。在控制台输出字符串。
for (int i=0; i for (int j=0; j<3; j++) { String request=list.take(); System.out.printf("Main: Request: %s at %s. Size: %d/n",request,new Date(),list.size()); } TimeUnit.MILLISECONDS.sleep(300); }
9.输出一条表示程序结束的消息。
System.out.printf("Main: End of the program./n");
工作原理
本节使用的泛型参数是String的LinkedBlockingDeque对象,用来实现一个阻塞式并发数据列表。
Client类使用put()方法将字符串插入到列表中。如果列表已满(列表生成时指定了固定的容量),调用这个方法的线程将被阻塞直到列表中有了可用的空间。
Main类使用take()方法从列表中取字符串。如果列表为空,调用这个方法的线程将被阻塞直到列表不为空(即有可用的元素)。
这个例子中使用了LinkedBlockingDeque对象的两个方法,调用它们的线程可能会被阻塞,在阻塞时如果线程被中断,方法会抛出InterruptedException异常,所以必须捕获和处理这个异常。
更多信息
LinkedBlockingDeque类也提供了其他存取元素的方法,这些方法不会引起阻塞,而是抛出异常或返回null。
- takeFirst()和takeLast():分别返回列表中第一个和最后一个元素,返回的元素会从列表中移除。如果列表为空,调用方法的线程将被阻塞直到列表中有可用的元素出现。
- getFirst()和getLast():分别返回列表中第一个和最后一个元素,返回的元素不会从列表中移除。如果列表为空,则抛出NoSuchElementExcpetinon异常。
- peek()、peekFirst()和peekLast():分别返回列表中第一个和最后一个元素,返回的元素不会从列表中移除。如果列表为空,返回null。
- poll()、pollFirst()和pollLast():分别返回列表中第一个和最后一个元素,返回的元素将会从列表中移除。如果列表为空,返回null。
- add()、addFirst()和addLast(): 分别将元素添加到列表中第一位和最后一位。如果列表已满(列表生成时指定了固定的容量),这些方法将抛出IllegalStateException异常。
参见
参见6.3节。
原创文章,作者:kepupublish,如若转载,请注明出处:https://blog.ytso.com/140804.html