定制并发类(十)实现一个基于优先级的传输队列

声明:本文是《 Java 7 Concurrency Cookbook 》的第七章, 作者: Javier Fernández González 译者:郑玉婷

实现一个基于优先级的传输队列

Java 7 API 提供几种与并发应用相关的数据类型。从这里面,我们想来重点介绍以下2种数据类型:

  • LinkedTransferQueue:这个数据类型支持那些有生产者和消费者结构的程序。 在那些应用,你有一个或者多个数据生产者,一个或多个数据消费者和一个被生产者和消费者共享的数据类型。生产者把数据放入数据结构内,然后消费者从数据结构内提取数据。如果数据结构为空,消费者会被阻塞直到有数据可以消费。如果数据结构满了,生产者就会被阻塞直到有空位来放数据。
  • PriorityBlockingQueue:在这个数据结构,元素是按照顺序储存的。元素们必须实现 带有 compareTo() 方法的 Comparable 接口。当你在结构中插入数据时,它会与数据元素对比直到找到它的位置。

LinkedTransferQueue 的元素是按照抵达顺序储存的,所以越早到的越先被消耗。你有可能需要开发 producer/ consumer 程序,它的消耗顺序是由优先级决定的而不是抵达时间。在这个指南,你将学习如何实现在 producer/ consumer 问题中使用的数据结构,这些元素将被按照他们的优先级排序,级别高的会先被消耗。

准备

指南中的例子是使用Eclipse IDE 来实现的。如果你使用Eclipse 或者其他的IDE,例如NetBeans,打开并创建一个新的java任务。

怎么做呢…

按照这些步骤来实现下面的例子::

//1.   创建一个类,名为 MyPriorityTransferQueue,扩展 PriorityBlockingQueue 类并实现 TransferQueue 接口。
public class MyPriorityTransferQueue<E> extends PriorityBlockingQueue<E> implements TransferQueue<E> {
//2.   声明一个私有 AtomicInteger 属性,名为 counter,用来储存正在等待元素的消费者的数量。
private AtomicInteger counter;
//3.   声明一个私有 LinkedBlockingQueue 属性,名为 transferred。
private LinkedBlockingQueue<E> transfered;
//4.   声明一个私有 ReentrantLock 属性,名为 lock。
private ReentrantLock lock;
//5.   实现类的构造函数,初始化它的属性值。
public MyPriorityTransferQueue() {
counter=new AtomicInteger(0);
lock=new ReentrantLock();
transfered=new LinkedBlockingQueue<E>();
}
//6.   实现 tryTransfer() 方法。此方法尝试立刻发送元素给正在等待的消费者(如果可能)。如果没有任何消费者在等待,此方法返回 false 值。
@Override
public boolean tryTransfer(E e) {
lock.lock();
boolean value;
if (counter.get()==0) {
value=false;
} else {
put(e);
value=true;
}
lock.unlock();
return value;
}
//7.	实现 transfer() 方法。此方法尝试立刻发送元素给正在等待的消费者(如果可能)。如果没有任何消费者在等待,
此方法把元素存入一个特殊queue,为了发送给第一个尝试获取一个元素的消费者并阻塞线程直到元素被消耗。
@Override
public void transfer(E e) throws InterruptedException {
lock.lock();
if (counter.get()!=0) {
put(e);
lock.unlock();
} else {
transfered.add(e);
lock.unlock();
synchronized (e) {
e.wait();
}
}
}
//8.   实现 tryTransfer() 方法,它接收3个参数: 元素,和需要等待消费者的时间(如果没有消费者的话),和用来注明时间的单位。如果有消费者在等待,立刻发送元素。否则,转化时间到毫秒并使用 wait() 方法让线程进入休眠。当消费者取走元素时,如果线程在 wait() 方法里休眠,你将使用 notify() 方法唤醒它。
@Override
public boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException {
lock.lock();
if (counter.get()!=0) {
put(e);
lock.unlock();
return true;
} else {
transfered.add(e);
long newTimeout= TimeUnit.MILLISECONDS.convert(timeout, unit);
lock.unlock();
e.wait(newTimeout);
lock.lock();
if (transfered.contains(e)) {
transfered.remove(e);
lock.unlock();
return false;
} else {
lock.unlock();
return true;
}
}
}
//9.   实现 hasWaitingConsumer() 方法。使用 counter 属性值来计算此方法的返回值。如果counter 的值大于0,放回 true。不然,返回 false。
@Override
public boolean hasWaitingConsumer() {
return (counter.get()!=0);
}
//10. 实现 getWaitingConsumerCount() 方法。返回counter 属性值。
@Override
public int getWaitingConsumerCount() {
return counter.get();
}
//11.实现 take() 方法。此方法是当消费者需要元素时被消费者调用的。首先,获取之前定义的锁并增加在等待的消费者数量。
@Override
public E take() throws InterruptedException {
lock.lock();
counter.incrementAndGet();
//12.如果在 transferred queue 中无任何元素。释放锁并使用 take() 方法尝试从queue中获取元素,此方法将让线程进入睡眠直到有元素可以消耗。
E value=transfered.poll();
if (value==null) {
lock.unlock();
value=super.take();
lock.lock();
//13. 否则,从transferred queue 中取走元素并唤醒正在等待要消耗元素的线程(如果有的话)。
} else {
synchronized (value) {
value.notify();
}
}
//14. 最后,增加正在等待的消费者的数量并释放锁。
counter.decrementAndGet();
lock.unlock();
return value;
}
//15. 实现一个类,名为 Event,扩展 Comparable 接口,把 Event 类参数化。
public class Event implements Comparable<Event> {
//16. 声明一个私有 String 属性,名为 thread,用来储存创建事件的线程的名字。
private String thread;
//17.  声明一个私有 int 属性,名为 priority,用来储存事件的优先级。
private int priority;
//18. 实现类的构造函数,初始化它的属性值。
public Event(String thread, int priority){
this.thread=thread;
this.priority=priority;
}
//19. 实现一个方法,返回 thread 属性值。
public String getThread() {
return thread;
}
//20. 实现一个方法,返回 priority  属性值。
public int getPriority() {
return priority;
}
//21. 实现 compareTo() 方法。此方法把当前事件与接收到的参数事件进行对比。返回 -1,如果当前事件的优先级的级别高于参数;返回 1,如果当前事件的优先级低于参数;如果相等,则返回 0。你将获得一个按优先级递减顺序排列的list。有高等级的事件就会被排到queue的最前面。
public int compareTo(Event e) {
if (this.priority>e.getPriority()) {
return -1;
} else if (this.priority<e.getPriority()) {
return 1;
} else {
return 0;
}
}
//22. 实现一个类,名为 Producer,它实现 Runnable 接口。
public class Producer implements Runnable {
//23. 声明一个私有 MyPriorityTransferQueue 属性,接收参数化的 Event  类属性,名为 buffer,用来储存这个生产者生成的事件。
private MyPriorityTransferQueue<Event> buffer;
//24. 实现类的构造函数,初始化它的属性值。
public Producer(MyPriorityTransferQueue<Event> buffer) {
this.buffer=buffer;
}
//25. 这个类的实现 run() 方法。创建 100 个 Event 对象,用他们被创建的顺序决定优先级(越先创建的优先级越高)并使用 put() 方法把他们插入queue中。
public void run() {
for (int i=0; i<100; i++) {
Event event=new Event(Thread.currentThread().getName(),i);
buffer.put(event);
}
}
//26. 实现一个类,名为 Consumer,它要实现 Runnable 接口。
public class Consumer implements Runnable {
//27.  声明一个私有 MyPriorityTransferQueue 属性,参数化 Event 类属性,名为 buffer,用来获取这个类的事件消费者。
private MyPriorityTransferQueue<Event> buffer;
//28. 实现类的构造函数,初始化它的属性值。
public Consumer(MyPriorityTransferQueue<Event> buffer) {
this.buffer=buffer;
}
//29. 实现 run() 方法。它使用 take() 方法消耗1002 Events (这个例子实现的全部事件)并把生成事件的线程数量和它的优先级别写入操控台。
@Override
public void run() {
for (int i=0; i<1002; i++) {
try {
Event value=buffer.take();
System.out.printf("Consumer: %s: %d/n",value. getThread(),value.getPriority());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
//30. 创建例子的主类通过创建一个类,名为 Main 并添加 main()方法。
public class Main {
public static void main(String[] args) throws Exception {
//31. 创建一个 MyPriorityTransferQueue 对象,名为 buffer。
MyPriorityTransferQueue<Event> buffer=new MyPriorityTransferQu eue<Event>();
//32. 创建一个 Producer 任务并运行 10 线程来执行任务。
Producer producer=new Producer(buffer);
Thread producerThreads[]=new Thread[10];
for (int i=0; i<producerThreads.length; i++) {
producerThreads[i]=new Thread(producer);
producerThreads[i].start();
}
//33.创建并运行一个 Consumer 任务。
Consumer consumer=new Consumer(buffer);
Thread consumerThread=new Thread(consumer);
consumerThread.start();
//34. 写入当前的消费者数量。
System.out.printf("Main: Buffer: Consumer count: %d/n",buffer. getWaitingConsumerCount());
//35. 使用 transfer() 方法传输一个事件给消费者。
Event myEvent=new Event("Core Event",0);
buffer.transfer(myEvent);
System.out.printf("Main: My Event has ben transfered./n");
//36. 使用 join() 方法等待生产者的完结。
for (int i=0; i<producerThreads.length; i++) {
try {
producerThreads[i].join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//37.  让线程休眠1秒。
TimeUnit.SECONDS.sleep(1);
//38.写入当前的消费者数量。
System.out.printf("Main: Buffer: Consumer count: %d/n",buffer. getWaitingConsumerCount());
//39. 使用 transfer() 方法传输另一个事件。
myEvent=new Event("Core Event 2",0);
buffer.transfer(myEvent);
//40. 使用 join() 方法等待消费者完结。
consumerThread.join();
//41. 写信息表明程序结束。
System.out.printf("Main: End of the program/n");

它是怎么工作的…

在这个指南,你已经实现了 MyPriorityTransferQueue 数据结构。这个数据类型是在 producer/consumer 问题中使用的,它的元素是按照优先级排列的。由于 Java 不支持多个继承,所以你首先要决定的是 MyPriorityTransferQueue 类的基类。你扩展了 PriorityBlockingQueue 类,来实现在结构中插入数据按照优先级排序。你也实现了 TransferQueue 接口,添加了与 producer/consumer 相关的3个方法。

MyPriortyTransferQueue 类有以下2个属性:

在 MyPriorityTransferQueue 中,你实现了一些方法。全部方法都在 TransferQueue 接口中声明了和在PriorityBlockingQueue 接口实现的 take() 方法。在之前已经描述了2个方法了。来看看剩下的方法的描述:

否则,把元素储存到已传输的元素list 并阻塞线程直到元素被消耗。当线程进入休眠时,你要释放锁,如果不的话,你就阻塞了queue。

  • tryTransfer(E e, long timeout, TimeUnit unit): 此方法与 transfer() 方法相似,只是它的线程被阻塞的时间段是由参数决定的。当线程进入休眠时,你要释放锁,如果不的话,你就阻塞了queue。
  • take(): 此方法返回下一个要被消耗的元素。如果在 transferred 元素list中有元素,就从list中取走元素。否则,就从 priority queue 中取元素。

一旦你实现了数据类型,你就实现了 Event 类。它就是在数据类型里储存的元素构成的类。Event 类有2个属性用来储存生产者的ID和事件的优先级,并实现了 Comparable 接口,为了满足你的数据类型的需要。

接着,你实现了 Producer 和 Consumer 类。在这个例子中,你有 10 个生产者和一个消费者,他们共享同一个 buffer。每个生产者生成100个事件,他们的优先级是递增的, 所以有高优先级的事件在越后面才生成。

例子的主类创建了一个 MyPriorityTransferQueue 对象,10个生产者,和一个消费者,然后使用MyPriorityTransferQueue buffer 的 transfer() 方法来传输2个事件到 buffer。

以下截图是程序运行的部分输出:

7-10

你可以发现有着高级别的事件如何先被消费,和一个消费者如何消费传输的事件。

参见
第六章,并发集:Using blocking thread-safe lists ordered by priority
第六章,并发集:Using blocking thread-safe lists

原创文章,作者:3628473679,如若转载,请注明出处:https://blog.ytso.com/140856.html

(0)
上一篇 2021年9月5日
下一篇 2021年9月5日

相关推荐

发表回复

登录后才能评论