Spring与多线程详解编程语言

前言背景

在做新项目,作为中间件的项目,主要做数据服务。这次想把项目做的简洁一些,之前用的什么ActiveMq等中间件产品,这次全部不用,能自己实现就自己实现。自己用BlockingQueue阻塞队列,按照自己的数据量,1G内存也能存上两千多万数据。设计上,需要一个线程去阻塞队列中拿数据,必须是系统启动的时候就去取。没有则阻塞,直到有数据来。
首先一个问题是,在spring项目中,自定义的New对象和线程,是不受spring管理的。所以在以前的处理中,经常是写一个单例,获取ApplicationContext,这种方式是可行的。但是在用注解的时候,使用这种方式,看上去会比较丑陋。所以,我们还是希望,在使用线程的时候,可以 像以前那样,可以自动的注入我们需要的bean。

开始动手

还是用实际的项目来举个栗子。
作为数据服务,入口是一个webservices的接口,这里因为历史原因,还是采用soap方式。
入口如下:

      @WebMethod 
        public FeedResult send(NocPacket nocPacket) { 
                logger.info("Receive data"); 
                QueueManager.getInstance().put(nocPacket,Const.nocQueue); 
                return "successful"; 
        }

这里采用生产者和消费者的模式,其中的Const.nocQueue,是我们的BlockingQueue,作为数据缓冲区。
既然如此,还应该有一个消费者。
之前说过,我们要在系统启动的时候,就去queue中取我们的数据,有则拿出来,作为业务处理,没有则阻塞,等待数据到来。同时,拿到数据后,将这些数据入库。
这里需要两个类,一个是随系统一起启动的,暂且叫守护程序吧。另一个是做主业务的。
守护程序代码:

/** 
 * Created by Wl on 15-6-17. 
 * 业务消费者 
 */ 
 
@Component 
public class Business  { 
        private Logger logger = Logger.getLogger(Business.class); 
 
        @Autowired 
        private FlowDataService<FlowData> flowDataService; 
 
        public void doBusi() { 
                logger.info("业务线程开始"); 
                ExecutorService executor = Executors.newCachedThreadPool(); 
                BusiTask task = new BusiTask(Const.nocQueue,flowDataService); 
                executor.submit(task); 
                FlowData data = flowDataService.queryById(1); 
                logger.info("Get Data :"+data.getPlateNo()); 
        } 
}

处理业务的类:

/** 
 * Created by Wl on 15-6-17. 
 * 业务处理主方法 
 */ 
public class BusiTask implements Callable<BlockingQueue<NocPacket>> { 
        private Logger logger = Logger.getLogger(BusiTask.class); 
        private BlockingQueue<NocPacket> queue; 
        private FlowDataService<FlowData> flowDataService; 
 
        public BusiTask(BlockingQueue<NocPacket> queue,FlowDataService<FlowData> flowDataService) { 
                this.queue = queue; 
                this.flowDataService = flowDataService; 
        } 
 
        public BlockingQueue<NocPacket> call() throws Exception { 
                NocPacket nocPacket = queue.take(); 
                logger.info("Data:"+nocPacket.getGWName()); 
                Payload sc = NocUtil.getPayload(nocPacket); 
                FlowData data = new FlowData(); 
                flowDataService.add(data); 
                logger.info("成功插入数据"); 
                return null; 
        } 
}

此处,守护程序是随系统一起启动的,这一点可以用spring配置。这个守护程序不能有特殊性,它不能是我们自建的一个线程,也不能new一个方法,再去调用doBusi,因为这样就不受spring管理了。它的第一个目的是拿到注入的service。
这两个类,采用Future和Callable的方式进行异步线程处理的。如果直接去处理BusiTask中的任务,就会阻塞到queue的take上。同时,通过守护程序,BusiTask也可以拿到services,进行入库等操作。

结尾

spring是可以进行多线程开发的,目前没使用过,以后有时间好好研究下。

原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/tech/pnotes/11544.html

(0)
上一篇 2021年7月19日 11:23
下一篇 2021年7月19日 11:23

相关推荐

发表回复

登录后才能评论