分布式消息队列RocketMQ部署详解编程语言

一、RocketMQ简介:

RocketMQ是一款分布式、队列模型的消息中间件,具有以下特点:

1、支持严格的消息顺序;

2、支持Topic与Queue两种模式;

3、亿级消息堆积能力;

4、比较友好的分布式特性;

5、同时支持Push与Pull方式消费消息;

官网链接:

rocketmq下载地址:  https://github.com/alibaba/RocketMQ/releases

rocketmq github:     https://github.com/alibaba/RocketMQ

RocketMQ集群安装: 

RocketMQ有多种集群的方式,这里是双master的集群.

优点:优点:配置简单,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢)性能最高

一、前期准备

主机(两台)为centOs6.5系统,ip对应主机名如下:

10.10.10.23 rocketmq-master1   #ip和主机名 
10.10.10.24 rocketmq-master2

所需软件:

jdk1.6.0_45.tar.gz          #jdk软件 
alibaba-rocketmq-3.2.6.tar.gz      #mq软件包

hosts信息添加:(两台主机均的添加hosts)

# vim /etc/hosts 
10.10.10.23     mqnameserver1      #ip对应brokerName 
10.10.10.24     mqnameserver2 
10.10.10.23     rocketmq-master1        #ip对应主机名 
10.10.10.24     rocketmq-master2        #ip对应主机名

二、安装

1.>安装JDK

 1 jdk1.6.0_45.tar.gz     #软件包 
 2 #tar -zxvf jdk1.6.0_45.tar.gz -C /apps/product/ 
 3 #ln -s /apps/product/jdk1.6.0_45 java 
 4  
 5 #环境变量配置 
 6 #vim /etc/profile 
 7 JAVA_HOME=/apps/product/java 
 8 CLASSPATH=.:$JAVA_HOME/lib/tools.jar:$JAVA_HOME/lib/dt.jar 
 9 PATH=$JAVA_HOME/bin:$PATH 
10 export JAVA_HOME CLASSPATH PATH  
11  
12 source /etc/profile 
13 查看jdk版本:java -version

2.>RocketMQ安装: (mq版本:3.2.6)

 1 >>tar -zxvf alibaba-rocketmq-3.2.6.tar.gz -C /apps/product/ 
 2 >>cd /apps/product/ 
 3 >>mv alibaba-rocketmq-3.2.6  rocketmq 
 4  
 5 环境变量设置: 
 6 >>vim /etc/profile 
 7 JAVA_HOME=/apps/product/java 
 8 CLASSPATH=.:$JAVA_HOME/lib/tools.jar:$JAVA_HOME/lib/dt.jar 
 9 PATH=$JAVA_HOME/bin:$PATH 
10 ROCKETMQ_HOME=/apps/product/rocketmq 
11 export JAVA_HOME CLASSPATH PATH ROCKETMQ_HOME 
12  
13 >>source /etc/profile 
14  
15 >>useradd admin 
16 >>cd /apps/product/rocketmq/bin && sh os.sh

3.RocketMQ配置:

部署Broker:消息中转角色,负责存储消息,转发消息

Broker配置参数:

1.>获取Broker的默认配置
sh mqbroker -m
2.>Broker启动时,如何加载配置
生成Broker默认配置模版
sh mqbroker -m >broker.p

3.>修改配置文件.
broker.p

4.>加载修改过的配置文件
nohup sh mqbroker -c broker.p  #将broker.p文件复制到broker-a.properties文件,并可以添加其他配置.

(1)、Master1服务器(10.10.10.23)

分布式消息队列RocketMQ部署详解编程语言

 1 vim /apps/product/rocketmq/conf/2m-noslave/broker-a.properties 
 2  
 3 rocketmqHome=/apps/product/rocketmq 
 4 namesrvAddr=10.10.10.23:9876;10.10.10.24:9876 
 5 brokerIP1=10.10.10.23 
 6 brokerName=rocketmq-master1 
 7 brokerClusterName=DefaultCluster 
 8 brokerId=0 
 9 autoCreateTopicEnable=false 
10 autoCreateSubscriptionGroup=true 
11 rejectTransactionMessage=false 
12 fetchNamesrvAddrByAddressServer=false 
13 storePathRootDir=/root/store 
14 storePathCommitLog=/root/store/commitlog 
15 flushIntervalCommitLog=1000 
16 flushCommitLogTimed=false 
17 deleteWhen=04 
18 fileReservedTime=72 
19 diskMaxUsedSpaceRatio=88 
20 maxTransferBytesOnMessageInMemory=262144 
21 maxTransferCountOnMessageInMemory=32 
22 maxTransferBytesOnMessageInDisk=65536 
23 maxTransferCountOnMessageInDisk=8 
24 accessMessageInMemoryMaxRatio=40 
25 messageIndexEnable=true 
26 messageIndexSafe=false 
27 haMasterAddress= 
28 brokerRole=ASYNC_MASTER 
29 flushDiskType=ASYNC_FLUSH 
30 cleanFileForciblyEnable=true 
31 sendMessageThreadPoolNums=128 
32 pullMessageThreadPoolNums=128 
33 brokerPermission=6 
34 defaultTopicQueueNums=8 
35 clusterTopicEnable=true 
36 brokerTopicEnable=true 
37 adminBrokerThreadPoolNums=16 
38 clientManageThreadPoolNums=16 
39 flushConsumerOffsetInterval=5000 
40 flushConsumerOffsetHistoryInterval=60000 
41 sendThreadPoolQueueCapacity=100000 
42 pullThreadPoolQueueCapacity=100000 
43 filterServerNums=0 
44 longPollingEnable=true 
45 shortPollingTimeMills=1000 
46 notifyConsumerIdsChangedEnable=true 
47 offsetCheckInSlave=false 
48 listenPort=10911 
49 serverWorkerThreads=8 
50 serverCallbackExecutorThreads=0 
51 serverSelectorThreads=3 
52 serverOnewaySemaphoreValue=256 
53 serverAsyncSemaphoreValue=64 
54 serverChannelMaxIdleTimeSeconds=120 
55 serverSocketSndBufSize=131072 
56 serverSocketRcvBufSize=131072 
57 serverPooledByteBufAllocatorEnable=false 
58 clientWorkerThreads=4 
59 clientCallbackExecutorThreads=2 
60 clientOnewaySemaphoreValue=2048 
61 clientAsyncSemaphoreValue=2048 
62 connectTimeoutMillis=3000 
63 channelNotActiveInterval=60000 
64 clientChannelMaxIdleTimeSeconds=120 
65 clientSocketSndBufSize=131072 
66 clientSocketRcvBufSize=131072 
67 clientPooledByteBufAllocatorEnable=false 
68 mapedFileSizeCommitLog=1073741824 
69 mapedFileSizeConsumeQueue=6000000 
70 flushIntervalConsumeQueue=1000 
71 cleanResourceInterval=10000 
72 deleteCommitLogFilesInterval=100 
73 deleteConsumeQueueFilesInterval=100 
74 destroyMapedFileIntervalForcibly=120000 
75 redeleteHangedFileInterval=120000 
76 diskMaxUsedSpaceRatio=75 
77 putMsgIndexHightWater=600000 
78 maxMessageSize=524288 
79 checkCRCOnRecover=true 
80 flushCommitLogLeastPages=4 
81 flushConsumeQueueLeastPages=2 
82 flushCommitLogThoroughInterval=10000 
83 flushConsumeQueueThoroughInterval=60000 
84 maxHashSlotNum=5000000 
85 maxIndexNum=20000000 
86 maxMsgsNumBatch=64 
87 haListenPort=10912 
88 haSendHeartbeatInterval=5000 
89 haHousekeepingInterval=20000 
90 haTransferBatchSize=32768 
91 haSlaveFallbehindMax=268435456 
92 syncFlushTimeout=5000 
93 messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 
94 flushDelayOffsetInterval=10000

broker-a.properties

(2)、Master2服务器(10.10.10.24)

分布式消息队列RocketMQ部署详解编程语言

 1 vim /apps/product/rocketmq/conf/2m-noslave/broker-b.properties  
 2  
 3 rocketmqHome=/apps/product/rocketmq 
 4 namesrvAddr=10.10.10.24:9876;10.10.10.23:9876 
 5 brokerIP1=10.10.10.24 
 6 brokerName=rocketmq-master2 
 7 brokerClusterName=DefaultCluster 
 8 brokerId=0 
 9 autoCreateTopicEnable=false 
10 autoCreateSubscriptionGroup=true 
11 rejectTransactionMessage=false 
12 fetchNamesrvAddrByAddressServer=false 
13 storePathRootDir=/root/store 
14 storePathCommitLog=/root/store/commitlog 
15 flushIntervalCommitLog=1000 
16 flushCommitLogTimed=false 
17 deleteWhen=04 
18 fileReservedTime=72 
19 diskMaxUsedSpaceRatio=88 
20 maxTransferBytesOnMessageInMemory=262144 
21 maxTransferCountOnMessageInMemory=32 
22 maxTransferBytesOnMessageInDisk=65536 
23 maxTransferCountOnMessageInDisk=8 
24 accessMessageInMemoryMaxRatio=40 
25 messageIndexEnable=true 
26 messageIndexSafe=false 
27 haMasterAddress= 
28 brokerRole=ASYNC_MASTER 
29 flushDiskType=ASYNC_FLUSH 
30 cleanFileForciblyEnable=true 
31 sendMessageThreadPoolNums=128 
32 pullMessageThreadPoolNums=128 
33 brokerPermission=6 
34 defaultTopicQueueNums=8 
35 clusterTopicEnable=true 
36 brokerTopicEnable=true 
37 adminBrokerThreadPoolNums=16 
38 clientManageThreadPoolNums=16 
39 flushConsumerOffsetInterval=5000 
40 flushConsumerOffsetHistoryInterval=60000 
41 sendThreadPoolQueueCapacity=100000 
42 pullThreadPoolQueueCapacity=100000 
43 filterServerNums=0 
44 longPollingEnable=true 
45 shortPollingTimeMills=1000 
46 notifyConsumerIdsChangedEnable=true 
47 offsetCheckInSlave=false 
48 listenPort=10911 
49 serverWorkerThreads=8 
50 serverCallbackExecutorThreads=0 
51 serverSelectorThreads=3 
52 serverOnewaySemaphoreValue=256 
53 serverAsyncSemaphoreValue=64 
54 serverChannelMaxIdleTimeSeconds=120 
55 serverSocketSndBufSize=131072 
56 serverSocketRcvBufSize=131072 
57 serverPooledByteBufAllocatorEnable=false 
58 clientWorkerThreads=4 
59 clientCallbackExecutorThreads=2 
60 clientOnewaySemaphoreValue=2048 
61 clientAsyncSemaphoreValue=2048 
62 connectTimeoutMillis=3000 
63 channelNotActiveInterval=60000 
64 clientChannelMaxIdleTimeSeconds=120 
65 clientSocketSndBufSize=131072 
66 clientSocketRcvBufSize=131072 
67 clientPooledByteBufAllocatorEnable=false 
68 mapedFileSizeCommitLog=1073741824 
69 mapedFileSizeConsumeQueue=6000000 
70 flushIntervalConsumeQueue=1000 
71 cleanResourceInterval=10000 
72 deleteCommitLogFilesInterval=100 
73 deleteConsumeQueueFilesInterval=100 
74 destroyMapedFileIntervalForcibly=120000 
75 redeleteHangedFileInterval=120000 
76 diskMaxUsedSpaceRatio=75 
77 putMsgIndexHightWater=600000 
78 maxMessageSize=524288 
79 checkCRCOnRecover=true 
80 flushCommitLogLeastPages=4 
81 flushConsumeQueueLeastPages=2 
82 flushCommitLogThoroughInterval=10000 
83 flushConsumeQueueThoroughInterval=60000 
84 maxHashSlotNum=5000000 
85 maxIndexNum=20000000 
86 maxMsgsNumBatch=64 
87 haListenPort=10912 
88 haSendHeartbeatInterval=5000 
89 haHousekeepingInterval=20000 
90 haTransferBatchSize=32768 
91 haSlaveFallbehindMax=268435456 
92 syncFlushTimeout=5000 
93 messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 
94 flushDelayOffsetInterval=10000

broker-b.properties

配置解析注释文件:

分布式消息队列RocketMQ部署详解编程语言

  1 ## Broker对外服务的监听端口  
  2 listenPort = 10911  
  3 ## Name Server地址  
  4 namesrvAddr=192.168.3.229:9876              
  5 ##本机IP地址,默认系统自动识别,但是某些多网卡机器会存在识别错误的情况,这种情况下可以人工配置 
  6 brokerIP1=192.168.3.229 
  7 ##本机主机名 
  8 brokerName=dev10.com 
  9 ##Broker所属哪个集群,其中缺省值为DefaultCluster 
 10 brokerClusterName=DefaultCluster 
 11 ## BrokerId,必须是大等于0的整数,0表示Master,>0表示Slave,一个Master可以挂多个Slave,Master与Slave通过BrokerName来配对 
 12 brokerId=0 
 13 ##是否允许Broker自动创建Topic,建议线下开启,线上关闭 
 14 autoCreateTopicEnable=true   //线上改为false 
 15 ##是否允许Broker自动创建订阅组,建议线下开启,线上关闭  
 16 autoCreateSubscriptionGroup=true  //线上改为false 
 17 ##是否拒绝事务消息接入  
 18 rejectTransactionMessage=false 
 19 ##是否从web服务器获取Name Server地址,针对大规模的Broker集群建议使用这种方式  
 20 fetchNamesrvAddrByAddressServer=false 
 21 ## commitLog存储路径 $HOME/store/commitlog  
 22 storePathCommitLog=/root/store/commitlog 
 23 ##消费队列存储路径  $HOME/store/consumequeue  
 24 storePathConsumeQueue=/root/store/consumequeue 
 25 ##消息索引存储路径 $HOME/store/index  
 26 storePathIndex=/root/store/index 
 27 ## checkpoint文件存储路径 $HOME/store/checkpoint  
 28 storeCheckpoint=/root/store/checkpoint 
 29 ##异常退出产生的文件-//abort文件存储路径 $HOME/store/abort  
 30 abortFile=/root/store/abort 
 31 ##删除文件时间点,默认凌晨4点  
 32 deleteWhen=04 
 33 ##文件保留时间,默认48小时  
 34 fileReservedTime=72 
 35 ##单次Pull消息(内存)传输的最大字节数  
 36 maxTransferBytesOnMessageInMemory=262144 
 37 ##单次Pull消息(内存)传输的最大条数  
 38 maxTransferCountOnMessageInMemory=32 
 39 ##单次Pull消息(磁盘)传输的最大字节数  
 40 maxTransferBytesOnMessageInDisk=65536     
 41 ##单次Pull消息(磁盘)传输的最大条数  
 42 maxTransferCountOnMessageInDisk=8 
 43 ##命中消息在内存的最大比例 
 44 accessMessageInMemoryMaxRatio=40 
 45 ##是否开启消息索引功能  
 46 messageIndexEnable=true 
 47 ##是否提供安全的消息索引机制,索引保证不丢  
 48 messageIndexSafe=false ⇒ 线上改为true 
 49 ##在Slave上直接设置Master地址,默认从Name Server上自动获取,也可以手工强制配置 
 50 haMasterAddress= 
 51 ## Broker的角色 - ASYNC_MASTER异步复制Master-SYNC_MASTER同步双写Master-SLAVE 
 52 brokerRole=ASYNC_MASTER 
 53 ##刷盘方式 - ASYNC_FLUSH 异步刷盘 - SYNC_FLUSH 同步刷盘 
 54 flushDiskType=ASYNC_FLUSH 
 55 ##磁盘满、且无过期文件情况下 TRUE 表示强制删除文件,优先保证服务可用 FALSE 标记服务不可用,文件不删除  
 56 cleanFileForciblyEnable=true 
 57 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~非默认配置~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 
 58 ##服务器服务权限 默认值6 6 读写 4 只读 2只写  
 59 brokerPermission=6 
 60 ##默认8 ,topic的queue数  
 61 defaultTopicQueueNums=4 
 62 ##默认 true 自动创建以集群名字命名的Topic功能 
 63 clusterTopicEnable=true 
 64 ##发送消息线程数 32+cpu*4 
 65 sendMessageThreadPoolNums=16 
 66 ##拉消息处理线程数 32+cpu*4 
 67 pullMessageThreadPoolNums=16 
 68 ##管控命令处理线程数 8 
 69 adminBrokerThreadPoolNums=8 
 70 ##Topic持久化路径 $HOME/store/config/topics.json 
 71 topicConfigPath=/root/store/config/topics.json 
 72 ##ConsumerOffset  $HOME /store/config/topics.json 
 73 consumerOffsetPath=/root/store/config/consumerOffset.json 
 74 ##Broker 配置文件路径 $HOME /store/config/broker.properties 
 75 #brokerConfigPath=/root/store/config /broker.properties 
 76 ##subscriptionGroup 持久化路径 $HOME /store/config/topics.json 
 77 subscriptionGroupPath=/root/store/config /subscriptionGroup.json 
 78 ##刷消费进度 时间 默认5秒 
 79 flushConsumerOffsetInterval=5000 
 80 ##消费进度历史统计 60秒 
 81 flushConsumerOffsetHistoryInterval=60000 
 82 ##查询消息最大时间跨度,单位小时 默认3小时 
 83 queryMessageMaxTimeSpan=3 
 84 =================== NettyServerConfig=================== 
 85 ##netty worker线程数默认 32 
 86 serverWorkerThreads=32 
 87 ##CallbackExecutor 默认是0 ,0表示系统值 
 88 serverCallbackExecutorThreads=0 
 89 ##netty selector 线程 默认 8 
 90 serverSelectorThreads=8 
 91 ##Oneway方式处理线程 
 92 serverOnewaySemaphoreValue=32 
 93 ##Async方式处理线程 
 94 serverAsyncSemaphoreValue=64 
 95 ##通信层最大空闲时间 默认120秒     
 96 serverChannelMaxIdleTimeSeconds=120 
 97 ==================== NettyClientConfig===================== 
 98 ##clientWorker 线程 4 
 99 clientWorkerThreads=4 
100 ##CallbackExecutor 
101 clientCallbackExecutorThreads=4 
102 ###clientSelector 1 
103 clientSelectorThreads=1 
104 ##clientOneway 默认 256 事务会查时可以多些 
105 clientOnewaySemaphoreValue=56 
106 ##clientAsync 默认 128 暂时无太多用途 
107 clientAsyncSemaphoreValue=28 
108 ##连接超时时间 3秒 
109 connectTimeoutMillis=3000 
110 ##channel 不活动时间 无心跳时间 6秒 
111 channelNotActiveInterval=60000 
112 ##channel 空闲时间 120秒 
113 clientChannelMaxIdleTimeSeconds=120 
114  
115 =============== MessageStoreConfig=========================== 
116 ##CommitLog每个文件大小 1G 
117 mapedFileSizeCommitLog=1073741824 
118 ##ConsumeQueue每个文件大小 默认存储50W条消息*8 
119 mapedFileSizeConsumeQueue=10000000 
120 ##CommitLog刷盘间隔时间(单位毫秒) 
121 flushIntervalCommitLog=1000 
122 ##ConsumeQueue刷盘间隔时间(单位毫秒) 
123 flushIntervalConsumeQueue=1000 
124 ##清理资源间隔时间(单位毫秒)10秒 
125 cleanResourceInterval=10000 
126 ##删除多个CommitLog文件的间隔时间(单位毫秒) 
127 deleteCommitLogFilesInterval=100 
128 ##删除多个ConsumeQueue文件的间隔时间(单位毫秒) 
129 deleteConsumeQueueFilesInterval=100 
130 ##强制删除文件间隔时间(单位毫秒)120秒 
131 destroyMapedFileIntervalForcibly=120000 
132 ##定期检查Hanged文件间隔时间(单位毫秒)120秒 
133 redeleteHangedFileInterval=120000 
134 ##磁盘空间最大使用率75% 
135 diskMaxUsedSpaceRatio=75 
136 ##写消息索引到ConsumeQueue,缓冲区高水位,超过则开始流控 600000 
137 putMsgIndexHightWater=600000 
138 ##最大消息大小,默认512K 1M 
139 maxMessageSize=1050000 
140 ##重启时,是否校验CRC 默认是true 
141 checkCRCOnRecover=true 
142 ##刷CommitLog,至少刷几个PAGE 
143 flushCommitLogLeastPages=4 
144 ##刷ConsumeQueue,至少刷几个PAGE 
145 flushConsumeQueueLeastPages=2 
146 ##刷CommitLog,彻底刷盘间隔时间 10秒 
147 flushCommitLogThoroughInterval=10000 
148 ##刷ConsumeQueue,彻底刷盘间隔时间 60秒 
149 flushConsumeQueueThoroughInterval=60000 
150 ##最大被拉取的消息个数,消息在内存 ? 
151 maxTransferCountOnMessageInMemory=32 
152 maxHashSlotNum=5000000 
153 maxIndexNum=20000000 
154 最大key查找拉取条数 
155 maxMsgsNumBatch=32 
156 ========================集群配置部分,非集群不添加==================== 
157 ###HA功能 端口号----非集群不加该配置 
158 haListenPort=10912 
159 ##HA心跳间隔 5秒 
160 haSendHeartbeatInterval=5000 
161 ###HA保持时间20秒 
162 haHousekeepingInterval=20000 
163 ###HA最大一次传输 32k 
164 haTransferBatchSize=32768 
165 ###如果不设置,则从NameServer获取Master HA服务地址 
166 #haMasterAddress= 
167 ###Slave落后Master超过此值,则认为存在异常 1024 * 1024 * 256 256M 
168 #haSlaveFallbehindMax=268435456

Broker参数解析

(3)、runbroker.sh参数调整

vim /apps/product/rocketmq/bin/runbroker.sh 
runbroker.sh需要根据内存大小进行适当地调整 
 
JAVA_OPT_1="-server-Xms8g -Xmx8g -Xmn2g -XX:PermSize=1g -XX:MaxPermSize=1g"

服务启动:

mkdir -p /data/rocketmq/store/commitlog /data/logs    #创建日志存放目录 
 
>>cd /apps/product/rocketmq/conf && sed -i 's#${user.home}#/data#g' *.xml    #更改为data目录

(1)、启动NameServer【两台启动方式相同】

> cd /apps/product/rocketmq/bin 
> nohup sh mqnamesrv &                 或者 nohup sh mqnamesrv > nohup.out 2>&1 &

(2)、启动BrokerServer A【10.10.10.23主机】

cd /apps/product/rocketmq/bin 
nohup sh mqbroker -c ../conf/2m-noslave/broker-a.properties >/dev/null 2>&1 &     或者 
 
#nohup sh mqbroker -n 10.10.10.23:9876 -c ../conf/2m-noslave/broker-a.properties >/dev/null 2>&1 &

(3)、启动BrokerServer B【10.10.10.24主机】

 > cd /apps/product/rocketmq/bin 
 nohup sh mqbroker -c ../conf/2m-noslave/broker-b.properties  >/dev/null 2>&1 & 
  
# nohup sh mqbroker -n 10.10.10.24:9876 -c ../conf/2m-noslave/broker-b.properties  >/dev/null 2>&1 & 

(4)、测试:

两台机器分别查看日志信息:  tail -100f /data/logs/rocketmqlogs/namesrv.log

               tail -100f /data/logs/rocketmqlogs/broker.log 

 

#netstat -ntlp   #查看端口号

# jps       #查看服务

 后台启动并生产日志:  nohup sh mqnamesrv >& /var/log/ns.log

原创文章,作者:奋斗,如若转载,请注明出处:https://blog.ytso.com/tech/pnotes/11682.html

(0)
上一篇 2021年7月19日 11:44
下一篇 2021年7月19日 11:44

相关推荐

发表回复

登录后才能评论