Java消息队列–ActiveMq详解编程语言

1.下载安装 ActiveMq 


 

  activemq的官方下载地址:http://activemq.apache.org/download.html

  Java消息队列--ActiveMq详解编程语言

  本次选择apache-activemq-5.15.4-bin(需要JDK1.8)版本下载,还提供了Windows 和Linux、Unix 等几个版本本次选择了Linux 版本

 Java消息队列--ActiveMq详解编程语言

  下载解压后,apache-activemq-5.15.4目录下的内容:

  Java消息队列--ActiveMq详解编程语言

  目录结构:

    bin存放的是脚本文件

    conf存放的是基本配置文件

    data存放的是日志文件

    docs存放的是说明文档

    examples存放的是简单的实例

    lib存放的是activemq所需jar包

    webapps用于存放项目的目录

2.启动activemq


  进入到apache-activemq-5.15.4安装目录的bin目录,linux 下输入 ./activemq start 启动activemq 服务

   输入命令之后,会提示我们创建了一个进程IP 号,这时候说明服务已经成功启动了

  Java消息队列--ActiveMq详解编程语言

  activemq 默认启动时,启动了内置的jetty服务器,提供一个用于监控activemq 的admin应用。 
  admin:IT虾米网

  如果用浏览器访问链接,显示无法访问网站,可能是端口的防火墙没开,需要开通下activemq的8161 和61616端口的防火墙,再进行访问就可以了

  账号/密码:admin/admin  Java消息队列--ActiveMq详解编程语言

 如上图,,activemq服务端就启动成功了

 activemq在linux 下的终止命令是 ./activemq stop

3.创建activemq工程


  本次创建的maven项目结构:

  Java消息队列--ActiveMq详解编程语言

  在官网下载activemq的时候,在目录下有一个jar包:

  Java消息队列--ActiveMq详解编程语言

  这个jar 是项目中进行开发中使用到的相关依赖,pom如下:

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
</plugins>

</build>
<dependencies>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.4</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.9</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-core</artifactId>
<version>1.3</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.6.6</version>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.6.6</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>commons-httpclient</groupId>
<artifactId>commons-httpclient</artifactId>
<version>3.1</version>
</dependency>
</dependencies>
  创建生产者:
package com.activemq; 
import org.apache.activemq.ActiveMQConnection; 
import org.apache.activemq.ActiveMQConnectionFactory; 
import javax.jms.*; 
import java.util.concurrent.atomic.AtomicInteger; 
/** 
 * @author ceshi 
 * @Title: Producer 
 * @ProjectName activemq 
 * @Description: 生产者 
 * @date 2018/6/2214:51 
 */ 
public class Producer { 
 
    //ActiveMq 的默认用户名 
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; 
    //ActiveMq 的默认登录密码 
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; 
    //ActiveMQ 的链接地址 
    private static final String BROKEN_URL = ActiveMQConnection.DEFAULT_BROKER_URL; 
 
    AtomicInteger count = new AtomicInteger(0); 
    //链接工厂 
    ConnectionFactory connectionFactory; 
    //链接对象 
    Connection connection; 
    //事务管理 
    Session session; 
    ThreadLocal<MessageProducer> threadLocal = new ThreadLocal<>(); 
 
    public void init(){ 
        try { 
            //创建一个链接工厂 
            connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL); 
            //从工厂中创建一个链接 
            connection  = connectionFactory.createConnection(); 
            //开启链接 
            connection.start(); 
            //创建一个事务(这里通过参数可以设置事务的级别) 
            session = connection.createSession(true,Session.SESSION_TRANSACTED); 
        } catch (Exception e) { 
            e.printStackTrace(); 
        } 
    } 
 
    public void sendMessage(String disname){ 
        try { 
            //创建一个消息队列 
            Queue queue = session.createQueue(disname); 
            //消息生产者 
            MessageProducer messageProducer = null; 
            if(threadLocal.get()!=null){ 
                messageProducer = threadLocal.get(); 
            }else{ 
                messageProducer = session.createProducer(queue); 
                threadLocal.set(messageProducer); 
            } 
            while(true){ 
                Thread.sleep(1000); 
                int num = count.getAndIncrement(); 
                //创建一条消息 
                TextMessage msg = session.createTextMessage(Thread.currentThread().getName()+ "productor:我是生产者,我现在正在生产东西!,count:"+num); 
                System.out.println(Thread.currentThread().getName()+ "productor:我是生产者,我现在正在生产东西!,count:"+num); 
                //发送消息 
                messageProducer.send(msg); 
                //提交事务 
                session.commit(); 
            } 
        } catch (JMSException e) { 
            e.printStackTrace(); 
        } catch (InterruptedException e) { 
            e.printStackTrace(); 
        } 
    } 
}
  创建消费者:

  

package com.activemq; 
import org.apache.activemq.ActiveMQConnection; 
import org.apache.activemq.ActiveMQConnectionFactory; 
import javax.jms.*; 
import java.util.concurrent.atomic.AtomicInteger; 
/** 
 * @author ceshi 
 * @Title: Consumer 
 * @ProjectName activemq 
 * @Description: 消费者 
 * @date 2018/6/2214:51 
 */ 
public class Consumer { 
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; 
 
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; 
 
    private static final String BROKEN_URL = ActiveMQConnection.DEFAULT_BROKER_URL; 
 
    ConnectionFactory connectionFactory; 
 
    Connection connection; 
 
    Session session; 
 
    ThreadLocal<MessageConsumer> threadLocal = new ThreadLocal<MessageConsumer>(); 
    AtomicInteger count = new AtomicInteger(); 
 
    public void init(){ 
        try { 
            connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL); 
            connection  = connectionFactory.createConnection(); 
            connection.start(); 
            session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE); 
        } catch (JMSException e) { 
            e.printStackTrace(); 
        } 
    } 
 
 
    public void getMessage(String disname){ 
        try { 
            Queue queue = session.createQueue(disname); 
            MessageConsumer consumer = null; 
 
            if(threadLocal.get()!=null){ 
                consumer = threadLocal.get(); 
            }else{ 
                consumer = session.createConsumer(queue); 
                threadLocal.set(consumer); 
            } 
            while(true){ 
                Thread.sleep(1000); 
                TextMessage msg = (TextMessage) consumer.receive(); 
                if(msg!=null) { 
                    msg.acknowledge(); 
                    System.out.println(Thread.currentThread().getName()+": Consumer:我是消费者,我正在消费Msg"+msg.getText()+"--->"+count.getAndIncrement()); 
                }else { 
                    break; 
                } 
            } 
        } catch (JMSException e) { 
            e.printStackTrace(); 
        } catch (InterruptedException e) { 
            e.printStackTrace(); 
        } 
    } 
}
 activemq测试类

  

package com.activemq; 
import org.junit.Test; 
/** 
 * @author Ceshi 
 * @Title: JunitMq 
 * @ProjectName activemq 
 * @Description: Junit测试 
 * @date 2018/6/2214:53 
 */ 
public class JunitMq { 
 
    /** 
     * 测试生产者 
     */ 
    @Test 
    public void TestProducter(){ 
        Producer producter = new Producer(); 
        producter.init(); 
        JunitMq testMq = new JunitMq(); 
        try { 
            Thread.sleep(1000); 
        } catch (InterruptedException e) { 
            e.printStackTrace(); 
        } 
        //Thread 1 
        new Thread(testMq.new ProductorMq(producter)).start(); 
        //Thread 2 
        new Thread(testMq.new ProductorMq(producter)).start(); 
        //Thread 3 
        new Thread(testMq.new ProductorMq(producter)).start(); 
        //Thread 4 
        new Thread(testMq.new ProductorMq(producter)).start(); 
        //Thread 5 
        new Thread(testMq.new ProductorMq(producter)).start(); 
    } 
 
    /** 
     * 测试消费者 
     */ 
    @Test 
    public void TestConsumerMq(){ 
        Consumer comsumer = new Consumer(); 
        comsumer.init(); 
        JunitMq testConsumer = new JunitMq(); 
        new Thread(testConsumer.new ConsumerMq(comsumer)).start(); 
        new Thread(testConsumer.new ConsumerMq(comsumer)).start(); 
        new Thread(testConsumer.new ConsumerMq(comsumer)).start(); 
        new Thread(testConsumer.new ConsumerMq(comsumer)).start(); 
    } 
 
    /** 
     * 消费者 
     */ 
    private class ConsumerMq implements Runnable{ 
        Consumer comsumer; 
        public ConsumerMq(Consumer comsumer){ 
            this.comsumer = comsumer; 
        } 
        @Override 
        public void run() { 
            while(true){ 
                try { 
                    comsumer.getMessage("Jaycekon-MQ"); 
                    Thread.sleep(10000); 
                } catch (InterruptedException e) { 
                    e.printStackTrace(); 
                } 
            } 
        } 
    } 
 
    private class ProductorMq implements Runnable{ 
        Producer producter; 
        public ProductorMq(Producer producter){ 
            this.producter = producter; 
        } 
        @Override 
        public void run() { 
            while(true){ 
                try { 
                    producter.sendMessage("Jaycekon-MQ"); 
                    Thread.sleep(10000); 
                } catch (InterruptedException e) { 
                    e.printStackTrace(); 
                } 
            } 
        } 
    } 
}

  生产者开始生产消息

  Java消息队列--ActiveMq详解编程语言

Java消息队列--ActiveMq详解编程语言

  消费者开始消费消息

  Java消息队列--ActiveMq详解编程语言

查看运行结果,访问activemq 服务端:IT虾米网 里面的Queues 中查看生产的消息。

Java消息队列--ActiveMq详解编程语言

 4.activemq特性


 activemq的特性 

  支持多种语言和协议编写客户端。语言: Java, C, C++, C#, Ruby, Perl, Python, PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP

  完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)

  对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性

  通过了常见J2EE服务器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上
  支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
  支持通过JDBC和journal提供高速的消息持久化
  从设计上保证了高性能的集群,客户端-服务器,点对点
  支持Ajax
  支持与Axis的整合

  自动重连
  可以很容易得调用内嵌JMS provider,进行测试
 
activemq使用场景

  多个项目之间集成
  (1) 跨平台
  (2) 多语言
  (3) 多项目
    降低系统间模块的耦合度,解耦
    (1) 软件扩展性
    系统前后端隔离
    (1) 前后端隔离,屏蔽高安全区

  

  

原创文章,作者:Maggie-Hunter,如若转载,请注明出处:https://blog.ytso.com/18543.html

(0)
上一篇 2021年7月19日
下一篇 2021年7月19日

相关推荐

发表回复

登录后才能评论