这篇文章给大家分享的是有关ActiveMQ中Network的示例分析的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。
一、在一台服务器上启动多个Broker
-
步骤如下(为集群做准备):
1:把整个conf文件夹复制一份,比如叫做conf2
2:修改里面的activemq.xml文件
(1)里面的brokerName不能跟原来的重复
(2)数据存放的文件名称不能重复,比如:
<hahaDB directory=”${activemq.data}/kahadb_2”/>
(3)所有设计的transportConnectors的端口,都要跟前面的不一样
3:修改jetty.xml,主要就是修改端口,比如:
<property name=”port” value=”8181”/>端口必须和前面的不一样
4:到bin下面,复制一个activemq,比如叫做activemq2;
(1)修改程序的id,不能和前面的重复
ACTIVEMQ_PIDFILE=”$ACTIVEMQ_DATA/activemq2-hostname.pid”
(2)修改配置文件路径
ACTIVEMQ_CONF=”$ACTIVEMQ_BASE/conf2”
(3)修改端口,里面有个tcp的61616的端口,要改成不一样的,最好跟 activemq.xml里面的ctp的端口一致
(4)然后就可以执行了。
二、ActiveMQ的静态网络链接
-
ActiveMQ的networkConnector是什么
在某些场景下,需要多个ActiveMQ的Broker做集群,那么久涉及到Broker的通信,这个被称为ActiveMQ的networkConnector。
ActiveMQ的networkConnector默认是单向的,一个Broker在一端发送消息,另一个Broker在另一端接收消息。这就是所谓的“桥接”。ActiveMQ也支持双向链接,创建一个双向的通道对于两个Broker,不仅发送消息而且也能从相同的通道来接收消息,通常作为duplex connector来映射,如下: -
“discovery”的概念
一般情况下,discovery是被用来发现远程的服务,客户端通常想去发现所有可利用的brokers;另一层意思,它是基于现有的网络Broker去发现其他可用的Brokers。
有两种配置Client到Broker的链接方式,一种方式:Client通过Statically配置的方式去连接Broker,另一种方式:Client通过discovery agents来dynamically发现Brokers -
Static networks
Static networkConnector是用于创建一个静态的配置对于网络中的多个Broker。这种协议用于复合url,一个复合url包括多个url地址。格式如下:
static:(uri1,uri2,uri3,…)?Key=value
1:配置示例如下(activemq.xml–注释掉persistenceFactory节点):<networkConnectors> <networkConnector name="local network" uri="static://(tcp://remotehost1:61616,tcp://remotehost2:61616)"/> </networkConnectors>
-
Static networkConnector的基本原来示意图:
上图中,两个Brokers是通过一个static的协议来网络链接的。一个Consumer链接到BrokerB的一个地址上,当Producer在BrokerA上以相同的地址发送消息时,此时它将被转移到BrokerB上。也就是,BrokerA会转发消息到BrokerB上。 -
networkConnector配置的可用属性
1、name:默认是bridge
2、dynamicOnly:默认是false,如果为true,持久订阅被激活时才创建对应的网络持久订阅。默认是启动时激活
3、decreaseNetworkConsumerPriority:默认是false。设定消费者优先权,如果为true,网络的消费者优先级降低为-5。如果为false,则默认跟本地消费者一样为0
4、networkTTL:默认是1,网络中用于消息和订阅消费的broker数量
5、messageTTL:默认是1,网络中用于消息的broker数量
6、consumerTTL:默认是1,网络中用于消费的broker数量
7、conduitSubscriptions:默认true,是否把同一个broker的多个consumer当做一个来处理
8、dynamicallyIncludedDestinations:默认为空,要包括的动态消息地址,类似于excludedDestinations,如<dynamicallyIncludedDestinations> <queue physicalName="include.test.foo"/> <topic physicalName="include.test.bar"/> </dynamicallyIncludedDestinations>
9、staticallyIncludedDestinations:默认为空,要包括的静态消息地址。类似于excludedDestinations,如:
<staticallyIncludedDestinations> <queue physicalName="always.include.queue"/> </staticallyIncludedDestinations>
10、excludedDestinations:默认为空,指定排除的地址,示例如下:
11、duplex:默认false,设置是否能双向通信
12、prefetchSize:默认是1000,持有的未确认的最大消息数量,必须大于0,因为网络消费者不能自己轮询消息
13、suppressDuplicateQueueSubscriptions:默认false,如果为true,重复的订阅关系一产生即被阻止
14、bridgeTempDestinations:默认true,是否广播advisory messages来创建临时destination
15、alwaysSyncSend:默认false,如果为true,非持久化消息也将使用request/reply方式代替oneway方式发送到远程broker
16、staticBridge:默认false,如果为true,只有staticallyIncludedDestinations中配置的destination可以被处理
三、“丢失”的消息
存在这样的场景,broker1和broker2通过networkConnector连接,一些consumers连接到broker1,消费broker2上的消息。消息先被broker1从broke2上消费掉,然后转发给这些consumers。不幸的是转发部分消息的时候broker1重启了,这些consumers发现broker1连接失败,通过failover连接到broker2上去了,但是又一部分他们还没有消费的消息被broker2已经分发到broker1上去了。这些消息,就好像是消失了,除非有消费者重新连接到broker1上来消费。
从5.6版起,在destinationPolicy上新增的选项replayWhenNoConsumers。这个选项使得broker1上有需要转发的消息但是没有消费者时,把消息回流到它原始的broker。同时把enableAudit设置为false,为了防止消息回流后被当做重复消息而不被分发,示例如下:
<destinationPolicy> <policyMap> <policyEntry queue=">" enableAudit="false"> <networkBridgeFilterFactory> <conditionalNetworkBridgeFilterFactory replayWhenNoConsumers="true"/> </networkBridgeFilterFactory> </policyEntry> </policyMap> </destinationPolicy>
四、容错的连接
-
Failover Protocol
之前的都是Client配置链接到指定的broker上。但是,如果Broker的链接失败怎么办?此时,Client有两个选项:要么立刻死掉,要么去连接到其它的broker上。
Failover协议实现了自动重新链接的逻辑。这里有两种方式提供了稳定的brokers列表对于Client链接。
第一种:提供一个static的可用的Brokers列表
第二种:提供一个dynamic发现的可用Brokers -
Failover Protocol的默认方式
failover:(uri1,…,uriN)?key=value或者failover:uri1,…,uriN
& Failover Protocol的默认配置
默认情况下,这种协议用于随机的去选择一个链接去链接,如果链接失败了,那么会链接到其它的Broker上。默认的配置定义了延迟重新链接,意味着传输将会在10秒后自动的去重新链接可用的broker。所有的重新链接参数都是可以根据应用的需要而配置的。 -
Failover Protocol的使用示例,在客户端程序里面
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory ("failover:(tcp://localhost:61616,tcp://localhost:61617) ?randomize=false");
-
Failover Protocol可用的配置参数:
1、initialReconnectDelay:第一次尝试重连之前等待的时间(毫秒)默认10
2、maxReconnectDelay:最长重连的时间间隔(毫秒),默认30000
3、useExponentialBackOff:重连时间间隔是否以指数形式增长,默认true
4、backOffMultiplier:递增倍数,默认2.0
5、maxReconnectAttempts:默认-1|0,自版本5.6起,-1为默认值,代表不限重连次数;0代表从不重试(只尝试连接一次,并不重连),5.6版本之前,0为默认值,代表不限重试次数;大于0的数,代表最大重试次数。
6、startupMaxReconnectAttempts:初始化时的最大重连次数。一旦连接上,将使用maxReconnectAttempts的配置,默认0
7、Randomize:使用随机连接,以达到负载均衡的目的,默认true
8、Backup:提前初始化一个未使用连接,以便进行快速失败转移,默认false
9、timeout:设置发送操作的超时时间(毫秒),默认-1
10、trackMessages:设置是否缓存[故障发生时]尚未传送完成的消息,当broker一旦重新连接成功,便将这些缓存中的消息刷新到新连接的代理中,使得消息可以在broker切换前后顺序传送,默认false
11、maxCacheSize:当trackMessages启用时,缓存的最大字节,默认为128*1024 字节
12、updateURIsSupported:设定是否可以动态修改broker rui(自版本5.4起),默认true
五、动态网络连接(纯理论)
多播协议multicast
ActiveMQ使用Multicast协议将一个Service和其他的Broker的Service连接起来。IP multicast是一个被用于网络中传输数据到其他一组接收者的技术。
IP multicast传统的概念成为组地址。组地址是ip地址在224.0.0.0到239.255.255.255之间的ip地址。ActiveMQ broker使用multicast协议去建立服务与远程的broker的服务的网络连接。
-
基本的格式配置
multicast://ip address:port?transportOptions
1、group:表示唯一的组名称,缺省值default
2、minmumWireFormatVersion:被允许的最小的wireformat版本,缺省为0
3、trace:是否追踪记录日志,默认false
4、useLocalHost:表示本地机器的名称是否为localhost,默认true
5、datagramSize:特定的数据大小,默认值41024
6、timeToLive:消息的声明周期值,默认值-1
7、loopBackMode:是否启用loopback模式,默认false
8、wireFormat:默认用wireFormat命名
9、wireFormat.:前缀是wireFormat -
配置示例
1:默认配置,默认情况下是不可靠的多播,数据包可能会丢失
multicast://default
2:特定的ip和端口
multicast://224.1.2.3:6255
3:特定的ip和端口以及组名
multicast://224.1.2.3:6255?group=mygroupname -
ActiveMQ使用multicast协议的配置格式如下:
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="multicast" dataDirectory="${activemq.data}/data"> <networkConnectors> <networkConnector name="default-nc" uri="multicast://default"/> </networkConnectors> <transportConnectors> <transportConnector name="openwire" uri="tcp://localhost:61616" discoveryUri="multicast://default"/> </transportConnectors> </broker>
说明:
1:uri="multicast://default"中的default是activemq默认的ip,默认动态 的寻找地址
2:"discoveryUri"是指在transport中用multicast的default的地址传递
3:"uri"指动态寻找可利用的地址
4:如果防止自动的寻找地址?
(1)名称为openwire的transport,移除discoveryUri="multicast:
//default"即可。传输连接用默认的名称openwire来配置broker的tcp多点连接,这将允许其他broker能够自动发现和连接到可用的broker中。
(2)名称为"default-nc"的networkConnector,注释掉或者删除即可。
ActiveMQ默认的networkConnector基于multicast协议的连接的默认名称是default-nc,而且自动的去发现其他broker。去停止这种行为,只需要注销或者删除掉default-nc网络连接。
(3)使brokerName的名字唯一,可以唯一识别Broker的实例,默认是localhost -
multicast协议和普通的tcp协议
它们是差不多的,不同的是multicast能够自动的发现其他broker,从而替代了使用static功能列表brokers。用multicast协议可以在网络中频繁的添加和删除ip不会有影响。
multicast协议
好处:能够适应动态变化的地址。
缺点:自动连接地址会过渡的消耗网络资源
Discovery协议
Discovery是在multicast协议的功能上定义的。功能类似与Failover功能。它将动态的发现multicast协议的broker的连接并且随机的连接其中一个broker。
-
基本配置如下:
discovery:(discoveryAgentURI)?transportOptions
1、reconnectDelay:再次寻址等待时间,缺省值10
2、initialReconnectDelay:初始化设定再次寻址等待时间,缺省值10
3、maxReconnectDelay:最大寻址等待时间,缺省值30000
4、useExponentialBackOff:是否尝试BackOff重连接,默认是true
5、backOffMultiplier:尝试Backoff的次数,默认是2
6、maxReconnectAttempts:如果异常,最大的重新连接个数,默认是0
7、Group:组唯一的地址,默认是default
示例:
discovery:(multicast://default)?initialReconnectDelay=100 -
Discovery协议的配置示例
<broker name="foo"> <transportConnectors> <transportConnector uri="tcp://localhost:0" discoveryUri="multicast://default"/> </transportConnectors> </broker>
Peer协议
ActiveMQ提供了peer transport connector提供了更加容易的去嵌入broker网络中。它创建一个优于vm连接的p2p网络连接。默认格式如下:
peer://peergroup/brokerName?Key=value
-
Peer协议的基本使用
当我们启用了peer协议时,应用将自动的启动内嵌broker,也将会自动的去配置其它broker来建立连接,前提是必须要有一个组。配置如下:
peer://groupa/broker1?Persistent=false
另外,生产者和消费者都各自连接到嵌入到自己应用的broker,并且在本地的同一个组名中相互访问数据。 -
Peer协议的基本原理
在本地机器断网的情况下,本地的client访问本地brokerA将依然正常。
在断网的情况下发送消息到本地brokerA,然后网路连接正常后,所有的消息将重新发送并连接到brokerB。
Fanout协议
Fanout协议是同时连接到多个broker,默认的格式如下:
fanout:(fanoutURI)?key=value
示例:fanout:(static:(tcp://host1:61616,tcp://host2:61616))
表示client将试图连接到两个static列表中定义的三个URI
-
Fanout协议的配置参数如下:
1、initialReconnectDelay:重新连接的等待时间,默认是10
2、maxReconnectDelay:最大重新连接的等待时间,默认是30000
3、useExponentialBackOff:是否尝试BackOff重连接,默认是true
4、backOffMultiplier:尝试Backoff的次数,默认是2
5、maxReconnectAttempts:如果异常,最大的重新连接个数,默认是0
6、fanOutQueues:是否将topic消息转换queue消息,默认false
7、minAckCount:Broker连接的最小数,默认是2
配置示例:
fanout:(static:(tcp://localhost:61616,tcp://remotehost:61616))?
initialReconnectDelay=100
注意:
ActiveMQ并不推荐Consumer使用fanout协议。当Provider发送消息到多个broker中,测试Consumer可能受到重复的消息。
感谢各位的阅读!关于“ActiveMQ中Network的示例分析”这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,让大家可以学到更多知识,如果觉得文章不错,可以把它分享出去让更多的人看到吧!
原创文章,作者:Maggie-Hunter,如若转载,请注明出处:https://blog.ytso.com/208049.html