JMS学习之理论基础详解编程语言

本文代码使用ActiveMq5.6

一、什么是JMS

JMS(Java Message Service,Java消息服务)是一组Java应用程序接口(Java API),它提供创建、发送、接收、读取消息的服务。它给消息中间件生产商提供了一个统一API的标准。
第一个版本1998年,目前最新的2.0版本(2013年第一季度发布)
JMS规范地址:
http://www.oracle.com/technetwork/java/docs-136352.html(1.1版本)
http://download.oracle.com/otndocs/jcp/jms-2_0-fr-eval-spec/index.html(2.0版本)

中间件(MOM):中间件是一种独立的系统软件或服务程序,分布式应用软件借助这种软件在不同的技术之间共享资源。也就是应用程序A与应用程序B之间不是直接交互,而是通过中间件进行资源共享。

JMS学习之理论基础详解编程语言

Active MQ的主要功能就是实现JMS Provider,用来实现高可用、高性能、可伸缩、易用、安全的企业级面向消息服务的系统。

消息中间件的功能:将信息按照消息的形式,从一个应用程序传输到另一个或多个应用程序。

消息中间件的特点:

  •   消息的接受是异步,犹如发短信,发送消息的人不必等待接受消息人的响应,以此减少多系统之间的耦合度。
  •        消息的可靠性,为确保消息在中间节可靠保存,只有在接受方收到消息后,才删除消息,多个消息也可以组册原子事务。

应用场景:多个系统间通讯的时候,一般需要保证:

  •   可靠传输,数据不可以丢失,有的时候,也会保证数据不可以重复传输。
  •        异步传输,多个系统同步调用会因为互相等待而造成系统瓶颈。

二、JMS消息模型

1、JMS元素

  • JMS provider:An implementation of the JMS interface for a Message Oriented Middleware (MOM)  面向消息中间件的JMS接口的实现

  • JMS client:An application or process that produces and/or receives messages. 产生和/或接收消息的应用程序或过程

  • JMS producer/publisher:A JMS client that creates and sends messages. 创建和发送消息的JMS客户端。

  • JMS consumer/subscriber:A JMS client that receives messages. 接收消息的JMS客户机。

  • JMS message(header/payload):An object that contains the data being transferred between JMS clients. 包含JMS客户机之间传输数据的对象

  • JMS queue:A staging area that contains messages that have been sent and are waiting to be read. A JMS queue only guarantees that each message is processed only once.包含已发送并等待被读取的消息的暂存区域。JMS队列只保证每个消息只处理一次。

  • JMS topic:A distribution mechanism for publishing messages that are delivered to multiple subscribers. 发布消息传递给多个订阅服务器的分发机制。

2、JMS消息模型

a、点对点的消息模型: 每个消息只能有一个消费者。

 JMS学习之理论基础详解编程语言

消息的生产者和消费者之间没有时间上的相关性。无论消费者在生产者发送消息的时候是否处于运行状态,它都可以提取消息。

b、发布订阅消息模型   每个消息可以有多个消费者

JMS学习之理论基础详解编程语言

生产者和消费者之间有时间上的相关性。订阅一个主题的消费者只能消费自它订阅之后发布的消息。JMS规范允许客户创建持久订阅,这在一定程度上放松了时间上的相关性要求。持久订阅允许消费者消费它在未处于激活状态时发送的消息。

三、JMS消息接口

JMS支持两种消息类型P -to- P 和 Pub-Sub 这两种消息类型都继承统一接口JMS Parent,主要接口是

JMS Parent

P -to -P

Pub – Sub

ConnectionFactory

QueueConnectionFactory

TopicConnectionFactory

Connection

QueueConnection

TopicConnection

Destination

QueueDestination

TopicDestination

Session

QueueSession

TopicSession

MessageProducer

QueueSender

TopicPublisher

MessageConsumer

QueueReceiver

TopicSubscriber

1、接口描述:

ConnectionFactory:连接工厂,JMS创建连接的方式

Connection:JMS客户端与JMS Provider的连接(通过ConnectionFactory创建的)。

Destination:消息的目的地

Session:一个接收或发送消息的一次回话

MessageProducer:由session对象创建的用于发送消息的对象

MessageConsumer:由session对象创建的用来接收消息的对象

2、JMS的创建流程

JMS学习之理论基础详解编程语言

  • 创建ConnectionFactory工厂
  • 通过工厂创建Connection连接
  • 通过连接创建一个连接回话Session
  • 通过Session创建消息的生产者MessageProducer和消息的消费者MessageConsumer;同时Session也创建一个消息Mesage。
  • 消息的生产者MessageProducer将该消息发送到目的地中Destination;消费者同时监听该消息目的地Destination。

P -to- P模型:

JMS学习之理论基础详解编程语言

生产者:

package com.jalja.org.base.JMS; 
import javax.jms.Connection; 
import javax.jms.ConnectionFactory; 
import javax.jms.Destination; 
import javax.jms.JMSException; 
import javax.jms.MessageProducer; 
import javax.jms.Session; 
import javax.jms.TextMessage; 
import org.apache.activemq.ActiveMQConnectionFactory; 
public class ProducerJMS { 
    private static String url="http://localhost:61616";//ActiveMq的地址 
    private static String queueName="queue-ch2";//创建一个队列 
    public static void main(String[] args) throws JMSException { 
        // 
        ConnectionFactory connFactory = new ActiveMQConnectionFactory(); 
        Connection conn = connFactory.createConnection(); 
        conn.start();//开启连接 
        //创建Session 
        Session session = conn.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); 
        //创建一个p -to -P 类型的消息队列(消息目的地) 
        Destination des = session.createQueue(queueName); 
        //将消息交给消息发送者 
        MessageProducer producer = session.createProducer(des); 
        //创建一个text类型的消息 
        TextMessage msg  = session.createTextMessage(); 
        msg.setText("Hello World!"); 
        //将消息发送到消息队列中 
        producer.send(msg); 
        //关闭资源 
        session.close(); 
        conn.close(); 
    } 
}

消费者

package com.jalja.org.base.JMS; 
import javax.jms.Connection; 
import javax.jms.ConnectionFactory; 
import javax.jms.Destination; 
import javax.jms.MessageConsumer; 
import javax.jms.Session; 
import org.apache.activemq.ActiveMQConnectionFactory; 
public class ConsumerA { 
    private static String url = "tcp://localhost:61616"; 
    private static String queueName = "queue-ch2"; 
    public static void main(String[] args) throws Exception { 
        ConnectionFactory connFactory = new ActiveMQConnectionFactory(); 
        Connection conn = connFactory.createConnection(); 
        conn.start();//启动连接 
        //创建Session 
        Session session = conn.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); 
        //创建一个p -to -P 类型的消息队列(消息目的地) 
        Destination des = session.createQueue(queueName); 
        //创建一个消息消费者 并指定其消费信息的目的地 
        MessageConsumer consumer  = session.createConsumer(des); 
        //监听消息队列 
        Listener listener = new Listener(); 
        listener.setForm("ConsumerA"); 
        consumer.setMessageListener(listener); 
         
    } 
}

消息监听器:

package com.jalja.org.base.JMS; 
import javax.jms.JMSException; 
import javax.jms.Message; 
import javax.jms.MessageListener; 
import javax.jms.TextMessage; 
public class Listener implements MessageListener { 
    private String form = null; 
    /** 
     * 监听消息队列,如果该消息队列有消息,该监听器就可以获取消息 
     */ 
    @Override 
    public void onMessage(Message message) { 
        TextMessage msg  = (TextMessage)message; 
        try { 
            System.out.println(getForm() + ":" + msg.getText()); 
        } catch (JMSException e) { 
            e.printStackTrace(); 
        } 
    } 
    public String getForm() { 
        return form; 
    } 
    public void setForm(String form) { 
        this.form = form; 
    } 
 
}

 

 Pub – Sub 模型

JMS学习之理论基础详解编程语言

 

生产者:

package ch02.pubsub; 
 
import javax.jms.Connection; 
import javax.jms.ConnectionFactory; 
import javax.jms.Destination; 
import javax.jms.JMSException; 
import javax.jms.MessageProducer; 
import javax.jms.Session; 
import javax.jms.TextMessage; 
import org.apache.activemq.ActiveMQConnectionFactory; 
 
/* 
 * pubsub MQ生产者 
 */ 
public class Publisher { 
    private static String url = "tcp://localhost:8161"; 
    private static String topicName = "topic.ch02"; 
    public static void main(String[] args) throws Exception { 
        // TODO Auto-generated method stub 
        ConnectionFactory connFactory = new ActiveMQConnectionFactory(); 
        Connection conn = connFactory.createConnection(); 
        conn.start(); 
        Session session = conn.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); 
        Destination des = session.createTopic(topicName); 
        MessageProducer publisher = session.createProducer(des); 
        TextMessage msg  = session.createTextMessage(); 
        msg.setText("Hello World!"); 
        publisher.send(msg); 
        session.close(); 
        conn.close(); 
    } 
}

消费者:

package ch02.pubsub; 
 
import javax.jms.Connection; 
import javax.jms.ConnectionFactory; 
import javax.jms.Destination; 
import javax.jms.JMSException; 
import javax.jms.MessageConsumer; 
import javax.jms.Session; 
 
import org.apache.activemq.ActiveMQConnectionFactory; 
 
/** 
 *  
 * PTP 消费者A 
 * 
 */ 
public class SubscriberA { 
     
    private static String url = "tcp://localhost:8161"; 
    private static String topicName = "topic.ch02"; 
     
    /** 
     * @param args 
     * @throws Exception  
     */ 
    public static void main(String[] args) throws Exception { 
        // TODO Auto-generated method stub 
        ConnectionFactory connFactory = new ActiveMQConnectionFactory(); 
        Connection conn = connFactory.createConnection(); 
        conn.start(); 
        Session session = conn.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); 
        Destination des = session.createTopic(topicName); 
        MessageConsumer consumer  = session.createConsumer(des); 
        Listener listener = new Listener(); 
        listener.setForm("SubscriberA"); 
        consumer.setMessageListener(listener); 
         
    } 
 
}

消息监听器:

package ch02.pubsub; 
 
import javax.jms.JMSException; 
import javax.jms.Message; 
import javax.jms.MessageListener; 
import javax.jms.TextMessage; 
 
public class Listener implements MessageListener { 
     
    private String form = null; 
 
    @Override 
    public void onMessage(Message message) { 
        TextMessage msg  = (TextMessage)message; 
        try { 
            System.out.println(getForm() + ":" + msg.getText()); 
        } catch (JMSException e) { 
            // TODO Auto-generated catch block 
            e.printStackTrace(); 
        } 
    } 
 
    public String getForm() { 
        return form; 
    } 
 
    public void setForm(String form) { 
        this.form = form; 
    } 
 
}

 

原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/16311.html

(0)
上一篇 2021年7月19日
下一篇 2021年7月19日

相关推荐

发表回复

登录后才能评论