java阻塞队列详解编程语言

对消息的处理有些麻烦,要保证各种确认。为了确保消息的100%发送成功,笔者在之前的基础上做了一些改进。其中要用到多线程,用于重复发送信息。

所以查了很多关于线程安全的东西,也看到了阻塞队列,发现这个模式很不错,可惜我目前用不到。

关于这个的讲解已经很多了,阻塞这个,就是当队列中没有数据的时候,线程读取的话会等待。当队列中的数据满的时候,线程添加数据的时候,也会等待。

有个例子很生动形象,往盘子里面放鸡蛋,只能放固定数目的。盘子里面没有鸡蛋,无法从中拿出来。当盘子里满了,也放不进去。直到被拿出去才能在放。

代码如下,这里设置的是一个盘子最多放10个鸡蛋:

package com.thread.two; 
 
import java.util.ArrayList; 
import java.util.List; 
 
public class Plate { 
    List<Object> eggs=new ArrayList<Object>();public synchronized Object getEgg(){while(eggs.size()==0){ 
            try { 
                wait(); 
            } catch (InterruptedException e) { 
                // TODO Auto-generated catch block 
                e.printStackTrace(); 
            } 
        } 
        Object egg=null; 
        for (int i = 0; i < 10; i++) { 
            egg=eggs.get(i); 
            System.out.println("拿到鸡蛋........."); 
        } 
        //Object egg=eggs.get(0); 
        eggs.clear(); 
        notify(); 
        //System.out.println("拿到鸡蛋........."); 
        return egg; 
    } 
     
    public synchronized void putEgg(Object egg){ 
while(eggs.size()>9){ 
            try { 
                wait(); 
            } catch (InterruptedException e) { 
                // TODO Auto-generated catch block 
                e.printStackTrace(); 
            } 
        } 
        eggs.add(egg); 
        notify(); 
        System.out.println("放入鸡蛋........."); 
    } 
     
    static class AddThread extends Thread{ 
        private Plate plate; 
        private Object egg=new Object(); 
        public AddThread(Plate plate){ 
            this.plate=plate; 
        } 
        public void run(){ 
            for (int i = 0; i < 1000; i++) { 
                plate.putEgg(egg); 
            } 
        } 
    } 
     
    static class GetThread extends Thread{ 
        private Plate plate; 
        public GetThread(Plate plate){ 
            this.plate=plate; 
        } 
        public void run(){ 
            for (int i = 0; i < 1000; i++) { 
                plate.getEgg(); 
            } 
        } 
    } 
     
    public static void main(String[] args) throws InterruptedException { 
        Plate plate=new Plate(); 
        Thread add=new Thread(new AddThread(plate)); 
        Thread get=new Thread(new GetThread(plate)); 
        add.start(); 
        get.start(); 
        add.join(); 
        get.join(); 
        System.out.println("测试结束"); 
    } 
     
}

这个例子很形象,用线程实现了上面所说的。

java现在有concurrent包,里面有很多现成的可以用的类,很多是线程安全的,这样,像上面写的put或者get,都不需要自己写同步方法了,这些类已经包装好了。

这里有一个ArrayBlockingQueue的例子,和上面实现的差不多。

首先是两个线程,分别是put和get。

ThreadPut:

package com.thread.three; 
 
import java.util.concurrent.ArrayBlockingQueue; 
 
public class ThreadPut implements Runnable{ 
    private ArrayBlockingQueue<String> abq=null; 
    public ThreadPut(ArrayBlockingQueue<String> abq){ 
        this.abq=abq; 
    } 
    public void run() { 
        // TODO Auto-generated method stub 
        while(true){ 
            System.out.println("要向队列中存数据了"); 
            try { 
                Thread.sleep(1000); 
                abq.put("hi"); 
                System.out.println("存入后,数据一共为:"+abq.size()); 
            } catch (InterruptedException e) { 
                // TODO Auto-generated catch block 
                e.printStackTrace(); 
            } 
        } 
    } 
}

ThreadGet:

package com.thread.three; 
 
import java.util.concurrent.ArrayBlockingQueue; 
 
public class ThreadGet extends Thread { 
    ArrayBlockingQueue<String> abq=null; 
    public ThreadGet(ArrayBlockingQueue<String> abq){ 
        this.abq=abq; 
    } 
    @Override 
    public void run() { 
        // TODO Auto-generated method stub 
        while(true){ 
            try { 
                Thread.sleep(2000); 
            } catch (InterruptedException e) { 
                // TODO Auto-generated catch block 
                e.printStackTrace(); 
            } 
            System.out.println("我要从队列中取数据了"); 
            String msg=null; 
            if (abq.size()>0) { 
                msg=abq.remove(); 
            } 
            System.out.println("队列中取得的数据为:"+msg+",队列中还有一共:"+abq.size()); 
        } 
    } 
}

测试类:

public class ArrayBlockQueueApp { 
     
    public static void main(String[] args) { 
        ExecutorService  es=Executors.newCachedThreadPool(); 
        ArrayBlockingQueue<String> abq=new ArrayBlockingQueue<String>(10); 
        ThreadGet tGet=new ThreadGet(abq); 
        Thread tPut=new Thread(new ThreadPut(abq)); 
        es.execute(tGet); 
        es.execute(tPut); 
    } 
}

这些队列放消息的话挺不错的。

 

原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/11546.html

(0)
上一篇 2021年7月19日
下一篇 2021年7月19日

相关推荐

发表回复

登录后才能评论