前言背景
在做新项目,作为中间件的项目,主要做数据服务。这次想把项目做的简洁一些,之前用的什么ActiveMq等中间件产品,这次全部不用,能自己实现就自己实现。自己用BlockingQueue阻塞队列,按照自己的数据量,1G内存也能存上两千多万数据。设计上,需要一个线程去阻塞队列中拿数据,必须是系统启动的时候就去取。没有则阻塞,直到有数据来。
首先一个问题是,在spring项目中,自定义的New对象和线程,是不受spring管理的。所以在以前的处理中,经常是写一个单例,获取ApplicationContext,这种方式是可行的。但是在用注解的时候,使用这种方式,看上去会比较丑陋。所以,我们还是希望,在使用线程的时候,可以 像以前那样,可以自动的注入我们需要的bean。
开始动手
还是用实际的项目来举个栗子。
作为数据服务,入口是一个webservices的接口,这里因为历史原因,还是采用soap方式。
入口如下:
@WebMethod public FeedResult send(NocPacket nocPacket) { logger.info("Receive data"); QueueManager.getInstance().put(nocPacket,Const.nocQueue); return "successful"; }
这里采用生产者和消费者的模式,其中的Const.nocQueue,是我们的BlockingQueue,作为数据缓冲区。
既然如此,还应该有一个消费者。
之前说过,我们要在系统启动的时候,就去queue中取我们的数据,有则拿出来,作为业务处理,没有则阻塞,等待数据到来。同时,拿到数据后,将这些数据入库。
这里需要两个类,一个是随系统一起启动的,暂且叫守护程序吧。另一个是做主业务的。
守护程序代码:
/** * Created by Wl on 15-6-17. * 业务消费者 */ @Component public class Business { private Logger logger = Logger.getLogger(Business.class); @Autowired private FlowDataService<FlowData> flowDataService; public void doBusi() { logger.info("业务线程开始"); ExecutorService executor = Executors.newCachedThreadPool(); BusiTask task = new BusiTask(Const.nocQueue,flowDataService); executor.submit(task); FlowData data = flowDataService.queryById(1); logger.info("Get Data :"+data.getPlateNo()); } }
处理业务的类:
/** * Created by Wl on 15-6-17. * 业务处理主方法 */ public class BusiTask implements Callable<BlockingQueue<NocPacket>> { private Logger logger = Logger.getLogger(BusiTask.class); private BlockingQueue<NocPacket> queue; private FlowDataService<FlowData> flowDataService; public BusiTask(BlockingQueue<NocPacket> queue,FlowDataService<FlowData> flowDataService) { this.queue = queue; this.flowDataService = flowDataService; } public BlockingQueue<NocPacket> call() throws Exception { NocPacket nocPacket = queue.take(); logger.info("Data:"+nocPacket.getGWName()); Payload sc = NocUtil.getPayload(nocPacket); FlowData data = new FlowData(); flowDataService.add(data); logger.info("成功插入数据"); return null; } }
此处,守护程序是随系统一起启动的,这一点可以用spring配置。这个守护程序不能有特殊性,它不能是我们自建的一个线程,也不能new一个方法,再去调用doBusi,因为这样就不受spring管理了。它的第一个目的是拿到注入的service。
这两个类,采用Future和Callable的方式进行异步线程处理的。如果直接去处理BusiTask中的任务,就会阻塞到queue的take上。同时,通过守护程序,BusiTask也可以拿到services,进行入库等操作。
结尾
spring是可以进行多线程开发的,目前没使用过,以后有时间好好研究下。
原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/tech/pnotes/11544.html