Java使用阻塞队列BlockingQueue实现生产者消费者详解编程语言

什么是阻塞队列

阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作支持阻塞的插入和移除方法。

  • 1、支持阻塞的插入方法:意思是当队列满时,队列会阻塞插入元素的线程,直到队列不满。
  • 2、支持阻塞的移除方法:意思是在队列为空时,获取元素的线程会等待队列变为非空。

阻塞队列常用于生产者和消费者的场景,生产者是向队列里添加元素的线程,消费者是从队列里取元素的线程。
阻塞队列就是生产者用来存放元素、消费者用来获取元素的容器。

Java中提供了几个对BlockingQueue的实现类,如: ArrayBlockingQueue, LinkedBlockingQueue, PriorityBlockingQueue, SynchronousQueue 等

在处理生产者/消费者问题上 我们将会使用ArrayBlockingQueue来实现,如下是我们需知道的重要方法:

  • put(E e): 这个方法用于向队列中插入元素,如果队列已满,需要等待可用的这间。
  • E take(): 这个方法用于从队列头部获取或者移除元素,如果队列为空则需要等待可用的元素。

使用BlockingQueue来解决生产者/消费者 示例

Mantou类

Producer产生的普通Java对象,并添加到队列中。

/** 
 * Producer产生的馒头类 
 * @author blog.ytso.com 
 *  
 */ 
public class Mantou { 
    private String mantou; 
 
    public Mantou(String mantou) { 
        this.mantou = mantou; 
    } 
 
    public String getMantou() { 
        return mantou; 
    } 
 
    public void setMantou(String mantou) { 
        this.mantou = mantou; 
    } 
}

Producer生产者类

Producer这个类会产生消息并将其放入队列中。

import java.util.concurrent.BlockingQueue; 
 
public class Producer implements Runnable { 
 
    BlockingQueue<Mantou> queue; 
 
    public Producer(BlockingQueue<Mantou> queue) { 
        this.queue = queue; 
    } 
 
    @Override 
    public void run() { 
        // 生产馒头 
        for (int i = 0; i < 100; i++) { 
            Mantou mt = new Mantou("" + i); 
            try { 
                Thread.sleep(100); 
                queue.put(mt); 
                System.out.println("生产馒头: " + mt.getMantou()); 
            } catch (InterruptedException e) { 
                e.printStackTrace(); 
            } 
        } 
 
        // 添加退出消息 
        Mantou msg = new Mantou("exit"); 
        try { 
            queue.put(msg); 
        } catch (InterruptedException e) { 
            e.printStackTrace(); 
        } 
    } 
 
}

Consumer消费者类

Consumer类会从队列获取消息进行处理。如果获取的是退出消息则结束。

import java.util.concurrent.BlockingQueue; 
 
public class Consumer implements Runnable { 
    BlockingQueue<Mantou> queue; 
 
    public Consumer(BlockingQueue<Mantou> queue) { 
        this.queue = queue; 
    } 
 
    @Override 
    public void run() { 
        try { 
            Mantou mantou; 
            // 获取并处理消息直到接收到“exit”消息 
            while (!(mantou = queue.take()).getMantou().equals("exit")) { 
                Thread.sleep(100); 
                System.out.println("消费馒头: " + mantou.getMantou()); 
            } 
 
        } catch (InterruptedException e) { 
            e.printStackTrace(); 
        } 
 
    } 
 
}

ProducerConsumerService

生产者/消费者的服务类将会产生固定大小的BlockingQueue,生产者和消费者同时共享该BlockingQueue,该服务类会起启动生产者和消费者线程。

import java.util.concurrent.ArrayBlockingQueue; 
import java.util.concurrent.BlockingQueue; 
 
/** 
 * @author blog.ytso.com 
 * 
 */ 
public class ProducerConsumerService { 
 
    public static void main(String[] args) { 
        // 创建大小为10的 BlockingQueue 
        BlockingQueue<Mantou> queue = new ArrayBlockingQueue<Mantou>(10); 
        Producer producer = new Producer(queue); 
        Consumer consumer = new Consumer(queue); 
        // 开启 producer线程向队列中生产消息 
        new Thread(producer).start(); 
        // 开启 consumer线程 中队列中消费消息 
        new Thread(consumer).start(); 
        System.out.println("Producer and Consumer has been started"); 
 
    } 
}

程序运行结果:

Producer and Consumer has been started 
生产馒头: 0 
生产馒头: 1 
消费馒头: 0 
消费馒头: 1 
生产馒头: 2 
消费馒头: 2 
生产馒头: 3 
消费馒头: 3 
生产馒头: 4 
消费馒头: 4 
生产馒头: 5 
消费馒头: 5 
生产馒头: 6 
消费馒头: 6 
......

参考

[1]: Java并发编程的艺术
[2]: http://www.cnblogs.com/tonyspark/p/3722013.html

原创文章,作者:奋斗,如若转载,请注明出处:https://blog.ytso.com/14184.html

(0)
上一篇 2021年7月19日
下一篇 2021年7月19日

相关推荐

发表回复

登录后才能评论