对消息的处理有些麻烦,要保证各种确认。为了确保消息的100%发送成功,笔者在之前的基础上做了一些改进。其中要用到多线程,用于重复发送信息。
所以查了很多关于线程安全的东西,也看到了阻塞队列,发现这个模式很不错,可惜我目前用不到。
关于这个的讲解已经很多了,阻塞这个,就是当队列中没有数据的时候,线程读取的话会等待。当队列中的数据满的时候,线程添加数据的时候,也会等待。
有个例子很生动形象,往盘子里面放鸡蛋,只能放固定数目的。盘子里面没有鸡蛋,无法从中拿出来。当盘子里满了,也放不进去。直到被拿出去才能在放。
代码如下,这里设置的是一个盘子最多放10个鸡蛋:
package com.thread.two; import java.util.ArrayList; import java.util.List; public class Plate { List<Object> eggs=new ArrayList<Object>();public synchronized Object getEgg(){while(eggs.size()==0){ try { wait(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } Object egg=null; for (int i = 0; i < 10; i++) { egg=eggs.get(i); System.out.println("拿到鸡蛋........."); } //Object egg=eggs.get(0); eggs.clear(); notify(); //System.out.println("拿到鸡蛋........."); return egg; } public synchronized void putEgg(Object egg){ while(eggs.size()>9){ try { wait(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } eggs.add(egg); notify(); System.out.println("放入鸡蛋........."); } static class AddThread extends Thread{ private Plate plate; private Object egg=new Object(); public AddThread(Plate plate){ this.plate=plate; } public void run(){ for (int i = 0; i < 1000; i++) { plate.putEgg(egg); } } } static class GetThread extends Thread{ private Plate plate; public GetThread(Plate plate){ this.plate=plate; } public void run(){ for (int i = 0; i < 1000; i++) { plate.getEgg(); } } } public static void main(String[] args) throws InterruptedException { Plate plate=new Plate(); Thread add=new Thread(new AddThread(plate)); Thread get=new Thread(new GetThread(plate)); add.start(); get.start(); add.join(); get.join(); System.out.println("测试结束"); } }
这个例子很形象,用线程实现了上面所说的。
java现在有concurrent包,里面有很多现成的可以用的类,很多是线程安全的,这样,像上面写的put或者get,都不需要自己写同步方法了,这些类已经包装好了。
这里有一个ArrayBlockingQueue的例子,和上面实现的差不多。
首先是两个线程,分别是put和get。
ThreadPut:
package com.thread.three; import java.util.concurrent.ArrayBlockingQueue; public class ThreadPut implements Runnable{ private ArrayBlockingQueue<String> abq=null; public ThreadPut(ArrayBlockingQueue<String> abq){ this.abq=abq; } public void run() { // TODO Auto-generated method stub while(true){ System.out.println("要向队列中存数据了"); try { Thread.sleep(1000); abq.put("hi"); System.out.println("存入后,数据一共为:"+abq.size()); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } }
ThreadGet:
package com.thread.three; import java.util.concurrent.ArrayBlockingQueue; public class ThreadGet extends Thread { ArrayBlockingQueue<String> abq=null; public ThreadGet(ArrayBlockingQueue<String> abq){ this.abq=abq; } @Override public void run() { // TODO Auto-generated method stub while(true){ try { Thread.sleep(2000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println("我要从队列中取数据了"); String msg=null; if (abq.size()>0) { msg=abq.remove(); } System.out.println("队列中取得的数据为:"+msg+",队列中还有一共:"+abq.size()); } } }
测试类:
public class ArrayBlockQueueApp { public static void main(String[] args) { ExecutorService es=Executors.newCachedThreadPool(); ArrayBlockingQueue<String> abq=new ArrayBlockingQueue<String>(10); ThreadGet tGet=new ThreadGet(abq); Thread tPut=new Thread(new ThreadPut(abq)); es.execute(tGet); es.execute(tPut); } }
这些队列放消息的话挺不错的。
原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/11546.html