ActiveMQ and ActiveMQ-CPP Explained

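After a full day of work I finally figured out how to use the C++ interface to ActiveMQ. Most of that time went into discovering that activemqcpp is only a client library and has to be used together with a running ActiveMQ broker.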

The sections below cover installing and using ActiveMQ and activemqcpp in turn.

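I. Installing ActiveMQ

      1. Download the package: http://activemq.apache.org/download-archives.html

       2. tar zxvf  apache-activemq-xxx.bin.tar.gz -C /usr/local
       3. cd /usr/local/apache-activemq-xxx/bin

       4. chmod 755 activemq

       5. Start the broker:   ./activemq start

       Once the broker is running, the web console is available at http://localhost:8161/admin. The default user name and password are both admin.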

 II. Installing ActiveMQ-CPP

          ActiveMQ-CPP has quite a few dependencies. The steps below follow http://blog.csdn.net/lgh1700/article/details/51055784

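(1) cppunit

The page http://activemq.apache.org/cms/building.html lists the libraries that the CMS build depends on.
cppunit download page:
https://sourceforge.net/projects/cppunit/files/cppunit/1.12.1/
Version 1.12.1 is used here. Once you have the download URL, fetch it directly on Linux with wget, or download it elsewhere and copy it to the Linux machine.

Extract the tarball, enter the directory, and run the usual three steps: configure, make, make install (install needs root privileges):
./configure --prefix=/usr/local/cppunit/
make
make install
Afterwards the headers and libraries are under /usr/local/cppunit/.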
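(2) apr

APR stands for Apache Portable Runtime; it is one of the many open source projects under the Apache umbrella.

APR download page:
http://apr.apache.org/download.cgi
APR 1.5.2, the latest version at the time of writing, is used here:
http://mirrors.hust.edu.cn/apache//apr/apr-1.5.2.tar.gz

As before, extract the tarball, enter the directory, and run the three steps:
./configure --prefix=/usr/local/apr/
make
make install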
(3) apr-util

APR-util 1.5.4, the latest version at the time of writing, is used here:
http://mirrors.hust.edu.cn/apache//apr/apr-util-1.5.4.tar.gz

Extract and build:
./configure --prefix=/usr/local/aprutil --with-apr=/usr/local/apr/
make
make install
(4) apr-iconv

APR iconv 1.2.1, the latest version at the time of writing, is used here:
http://mirrors.hust.edu.cn/apache//apr/apr-iconv-1.2.1.tar.gz

Extract and build:
./configure --prefix=/usr/local/apr-iconv/ --with-apr=/usr/local/apr/
make
make install
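(5) openssl

openssl 1.0.0a is used here:
http://www.openssl.org/source/openssl-1.0.0a.tar.gz

Extract and build:
./config --prefix=/usr/local/openssl/
make
make install

If the install fails with an error such as
cms.pod around line 457: Expected text after =item, not a number
run rm -f /usr/bin/pod2man as root, then run make install again. This removes the system-wide pod2man, so consider renaming the file instead of deleting it if other software on the machine needs it.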
(6) ActiveMQ-CPP

ActiveMQ-CPP 3.9.3, the latest version at the time of writing, is used here. Release page:
http://activemq.apache.org/cms/activemq-cpp-393-release.html

Extract and build:
./configure --prefix=/usr/local/ActiveMQ-CPP --with-apr=/usr/local/apr/ --with-apr-util=/usr/local/aprutil --with-cppunit=/usr/local/cppunit --with-openssl=/usr/local/openssl
make
make install
This creates the directory /usr/local/ActiveMQ-CPP, which contains the activemqcpp headers and libraries.

III. Example

        Start ActiveMQ first, then run the activemqcpp program, and check the queues and topics at http://localhost:8161/admin.

       The test program is adapted from someone else's code:

/* 
* Licensed to the Apache Software Foundation (ASF) under one or more 
* contributor license agreements.  See the NOTICE file distributed with 
* this work for additional information regarding copyright ownership. 
* The ASF licenses this file to You under the Apache License, Version 2.0 
* (the "License"); you may not use this file except in compliance with 
* the License.  You may obtain a copy of the License at 
* 
*	 http://www.apache.org/licenses/LICENSE-2.0 
* 
* Unless required by applicable law or agreed to in writing, software 
* distributed under the License is distributed on an "AS IS" BASIS, 
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
* See the License for the specific language governing permissions and 
* limitations under the License. 
*/ 
// START SNIPPET: demo 
#include <activemq/library/ActiveMQCPP.h> 
#include <decaf/lang/Thread.h> 
#include <decaf/lang/Runnable.h> 
#include <decaf/util/concurrent/CountDownLatch.h> 
#include <decaf/lang/Integer.h> 
#include <decaf/lang/Long.h> 
#include <decaf/lang/System.h> 
#include <activemq/core/ActiveMQConnectionFactory.h> 
#include <activemq/util/Config.h> 
#include <cms/Connection.h> 
#include <cms/Session.h> 
#include <cms/TextMessage.h> 
#include <cms/BytesMessage.h> 
#include <cms/MapMessage.h> 
#include <cms/ExceptionListener.h> 
#include <cms/MessageListener.h> 
#include <stdlib.h> 
#include <stdio.h> 
#include <iostream> 
#include <memory> 
#include <decaf/util/Random.h> 
using namespace activemq::core; 
using namespace decaf::util::concurrent; 
using namespace decaf::util; 
using namespace decaf::lang; 
using namespace cms; 
using namespace std; 
#define  QUEUE_NAME	"eventQueue" 
#define NAME_BYTE_LEN		16 
#define USE_PRODUCER 1 
class HelloWorldProducer : public ExceptionListener, 
public MessageListener, 
public Runnable { 
private: 
CountDownLatch latch; 
CountDownLatch doneLatch; 
Connection* connection; 
Session* session; 
Destination* destination; 
MessageProducer* producer; 
int numMessages; 
bool useTopic; 
bool sessionTransacted; 
std::string brokerURI; 
bool bReciveMessage; 
long waitMillis; 
private: 
HelloWorldProducer(const HelloWorldProducer&); 
HelloWorldProducer& operator=(const HelloWorldProducer&); 
public: 
HelloWorldProducer(const std::string& brokerURI, int numMessages, bool useTopic = false, bool sessionTransacted = false, 
long waitMillis=3000) : 
latch(1), 
doneLatch(numMessages),   
connection(NULL), 
session(NULL), 
destination(NULL), 
producer(NULL), 
numMessages(numMessages), 
useTopic(useTopic), 
sessionTransacted(sessionTransacted), 
brokerURI(brokerURI) , 
bReciveMessage(false), 
waitMillis(waitMillis) 
{ } 
virtual ~HelloWorldProducer(){ 
cleanup(); 
} 
void close() { 
this->cleanup(); 
} 
void waitUntilReady() { 
latch.await(); 
} 
virtual void run() { 
try { 
cout<<"producer run"<<endl; 
// Create a ConnectionFactory 
auto_ptr<ConnectionFactory> connectionFactory( 
ConnectionFactory::createCMSConnectionFactory(brokerURI)); 
cout<<"98"<<endl; 
cout<<"brokerURI = "<<brokerURI<<endl; 
// Create a Connection 
connection = connectionFactory->createConnection(); 
cout<<"101"<<endl; 
connection->start(); 
cout<<"103"<<endl; 
// Create a Session (transacted or auto-acknowledged, as configured) 
if (this->sessionTransacted) { 
session = connection->createSession(Session::SESSION_TRANSACTED); 
} else { 
session = connection->createSession(Session::AUTO_ACKNOWLEDGE); 
} 
cout<<"session"<<endl; 
// Create the destination (Topic or Queue) 
if (useTopic) { 
destination = session->createTopic(QUEUE_NAME); 
} else { 
destination = session->createQueue(QUEUE_NAME); 
} 
cout<<"topic or queue created"<<endl; 
// Create a MessageProducer from the Session to the Topic or Queue 
producer = session->createProducer(destination); 
producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT); 
cout<<"producer created"<<endl; 
// Create the Thread Id String 
string threadIdStr = Long::toString(Thread::currentThread()->getId()); 
// Create a messages 
string text = (string) "Hello world! from thread " + threadIdStr; 
cout<<text<<endl; 
cout<<numMessages<<endl; 
for (int ix = 0; ix < numMessages; ++ix) { 
std::auto_ptr<TextMessage> message(session->createTextMessage(text)); 
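// Key step: create a temporary queue for the reply and listen on it. 
std::auto_ptr<Destination> tempDest(session->createTemporaryQueue()); 
//cms::Destination tempDest=session->createTemporaryTopic() ;  
// Hold the reply consumer in an auto_ptr so it is released at the end of each iteration. 
std::auto_ptr<MessageConsumer> responseConsumer(session->createConsumer(tempDest.get()));   
responseConsumer->setMessageListener(this); // listen for the reply 
message->setCMSReplyTo(tempDest.get()); 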
Random random; 
char buffer[NAME_BYTE_LEN]={0}; 
random.nextBytes((unsigned char *)buffer,NAME_BYTE_LEN); 
string correlationId=""; 
for(int i=0;i<NAME_BYTE_LEN;++i) 
{ 
char ch[NAME_BYTE_LEN*2]={0}; 
sprintf(ch,"%02X",(unsigned char)buffer[i]); 
string str(ch); 
correlationId+=str; 
} 
message->setCMSCorrelationID(correlationId); 
message->setIntProperty("Integer", ix); 
printf("Producer Sent message #%d from thread %s\n", ix + 1, threadIdStr.c_str()); 
producer->send(message.get()); 
// Indicate we are ready for messages. 
latch.countDown(); 
// Wait while asynchronous messages come in. 
doneLatch.await(waitMillis); 
}  
}	 
catch (CMSException& e) { 
printf("Producer run() CMSException \n" ); 
// Indicate we are ready for messages. 
latch.countDown(); 
e.printStackTrace(); 
} 
} 
// Called from the Producer since this class is a registered MessageListener. 
virtual void onMessage(const Message* message) { 
static int count = 0; 
try { 
count++; 
const TextMessage* textMessage = dynamic_cast<const TextMessage*> (message); 
//ActiveMQMessageTransformation 
//std::auto_ptr<TextMessage> responsemessage(session->createTextMessage()); 
//responsemessage->setCMSCorrelationID(textMessage->getCMSCorrelationID()); 
//responsemessage->getCMSReplyTo() 
string text = ""; 
if (textMessage != NULL) { 
text = textMessage->getText(); 
} else { 
text = "NOT A TEXTMESSAGE!"; 
} 
printf("Producer Message #%d Received: %s\n", count, text.c_str()); 
//producer.send 
} catch (CMSException& e) { 
printf("Producer onMessage() CMSException \n" ); 
e.printStackTrace(); 
} 
// Commit all messages. 
if (this->sessionTransacted) { 
session->commit(); 
} 
// No matter what, tag the count down latch until done. 
doneLatch.countDown(); 
} 
// If something bad happens you see it here as this class is also been 
// registered as an ExceptionListener with the connection. 
virtual void onException(const CMSException& ex AMQCPP_UNUSED) { 
printf("Producer onException() CMS Exception occurred.  Shutting down client. \n" ); 
ex.printStackTrace(); 
exit(1); 
} 
private: 
void cleanup() { 
if (connection != NULL) { 
try { 
connection->close(); 
} catch (cms::CMSException& ex) { 
ex.printStackTrace(); 
} 
} 
// Destroy resources. 
try { 
delete destination; 
destination = NULL; 
delete producer; 
producer = NULL; 
delete session; 
session = NULL; 
delete connection; 
connection = NULL; 
} catch (CMSException& e) { 
e.printStackTrace(); 
} 
} 
}; 
class HelloWorldConsumer : public ExceptionListener, 
public MessageListener, 
public Runnable { 
private: 
CountDownLatch latch; 
CountDownLatch doneLatch; 
Connection* connection; 
Session* session; 
Destination* destination; 
MessageConsumer* consumer; 
MessageProducer *producer; 
long waitMillis; 
bool useTopic; 
bool sessionTransacted; 
std::string brokerURI; 
private: 
HelloWorldConsumer(const HelloWorldConsumer&); 
HelloWorldConsumer& operator=(const HelloWorldConsumer&); 
public: 
HelloWorldConsumer(const std::string& brokerURI, int numMessages, bool useTopic = false, bool sessionTransacted = false, int waitMillis = 30000) : 
latch(1), 
doneLatch(numMessages), 
connection(NULL), 
session(NULL), 
destination(NULL), 
consumer(NULL), 
producer(NULL), 
waitMillis(waitMillis), 
useTopic(useTopic), 
sessionTransacted(sessionTransacted), 
brokerURI(brokerURI) { 
} 
virtual ~HelloWorldConsumer() { 
cleanup(); 
} 
void close() { 
this->cleanup(); 
} 
void waitUntilReady() { 
latch.await(); 
} 
virtual void run() { 
try { 
cout<<"consumer run"<<endl; 
// Create a ConnectionFactory 
auto_ptr<ConnectionFactory> connectionFactory( 
ConnectionFactory::createCMSConnectionFactory(brokerURI)); 
// Create a Connection 
connection = connectionFactory->createConnection(); 
connection->start(); 
connection->setExceptionListener(this); 
// Create a Session 
if (this->sessionTransacted == true) { 
session = connection->createSession(Session::SESSION_TRANSACTED); 
} else { 
session = connection->createSession(Session::AUTO_ACKNOWLEDGE); 
} 
// Create the destination (Topic or Queue) 
if (useTopic) { 
destination = session->createTopic(QUEUE_NAME); 
} else { 
destination = session->createQueue(QUEUE_NAME); 
} 
producer = session->createProducer(); 
producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT); 
// Create a MessageConsumer from the Session to the Topic or Queue 
consumer = session->createConsumer(destination); 
consumer->setMessageListener(this); 
std::cout.flush(); 
std::cerr.flush(); 
// Indicate we are ready for messages. 
latch.countDown(); 
// Wait while asynchronous messages come in. 
doneLatch.await(); 
} catch (CMSException& e) { 
printf("Consumer run() CMSException \n" ); 
// Indicate we are ready for messages. 
latch.countDown(); 
e.printStackTrace(); 
} 
} 
// Called from the consumer since this class is a registered MessageListener. 
virtual void onMessage(const Message* message) { 
static int count = 0; 
try { 
count++; 
// Create the Thread Id String 
string threadIdStr = Long::toString(Thread::currentThread()->getId()); 
static bool bPrintf=true; 
if(bPrintf) 
{ 
bPrintf=false; 
printf("consumer Message threadid: %s\n",  threadIdStr.c_str()); 
} 
string strReply="consumer return  xxx,ThreadID="+threadIdStr; 
const TextMessage* textMessage = dynamic_cast<const TextMessage*> (message); 
if(NULL==textMessage) 
{ 
printf("Received a non-text message, CMSType=%s\n", message->getCMSType().c_str()); 
//const cms::MapMessage* mapMsg = dynamic_cast<const cms::MapMessage*>(message); 
//if(mapMsg) 
//{ 
//	 
//	std::vector<std::string> elements = mapMsg->getMapNames(); 
//	std::vector<std::string>::iterator iter = elements.begin(); 
//	for(; iter != elements.end() ; ++iter)  
//	{ 
//		std::string key = *iter; 
//		cms::Message::ValueType elementType = mapMsg->getValueType(key); 
//		string strxxx; 
//		int cc=0; 
//		switch(elementType) { 
//	case cms::Message::BOOLEAN_TYPE: 
//		//msg->setBoolean(key, mapMsg->getBoolean(key)); 
//		break; 
//	case cms::Message::BYTE_TYPE: 
//		//msg->setByte(key, mapMsg->getByte(key)); 
//		break; 
//	case cms::Message::BYTE_ARRAY_TYPE: 
//		//msg->setBytes(key, mapMsg->getBytes(key)); 
//		break; 
//	case cms::Message::CHAR_TYPE: 
//		//msg->setChar(key, mapMsg->getChar(key)); 
//		break; 
//	case cms::Message::SHORT_TYPE: 
//		//msg->setShort(key, mapMsg->getShort(key)); 
//		break; 
//	case cms::Message::INTEGER_TYPE: 
//		//msg->setInt(key, mapMsg->getInt(key)); 
//		break; 
//	case cms::Message::LONG_TYPE: 
//		//msg->setLong(key, mapMsg->getLong(key)); 
//		break; 
//	case cms::Message::FLOAT_TYPE: 
//		//msg->setFloat(key, mapMsg->getFloat(key)); 
//		break; 
//	case cms::Message::DOUBLE_TYPE: 
//		//msg->setDouble(key, mapMsg->getDouble(key)); 
//		break; 
//	case cms::Message::STRING_TYPE: 
//		//msg->setString(key, mapMsg->getString(key)); 
//		strxxx=mapMsg->getString(key); 
//		cc=1; 
//		break; 
//	default: 
//		break; 
//		} 
//	} 
//} 
return; 
} 
std::auto_ptr<TextMessage> responsemessage(session->createTextMessage(strReply)); 
responsemessage->setCMSCorrelationID(textMessage->getCMSCorrelationID()); 
string text = ""; 
if (textMessage != NULL) { 
text = textMessage->getText(); 
} else { 
text = "NOT A TEXTMESSAGE!"; 
} 
int nProPerty=textMessage->getIntProperty("Integer"); 
printf("consumer Message #%d Received: %s,nProPerty[%d]\n", count, text.c_str(),nProPerty); 
const cms::Destination* destSend=textMessage->getCMSReplyTo(); 
if(destSend) 
{ 
this->producer->send(destSend,responsemessage.get()); 
printf("consumer Message #%d send: %s\n", count, strReply.c_str()); 
} 
} catch (CMSException& e) { 
printf("Consumer onMessage() CMS Exception occurred.  Shutting down client. \n" ); 
e.printStackTrace(); 
} 
// Commit all messages. 
if (this->sessionTransacted) { 
session->commit(); 
} 
// No matter what, tag the count down latch until done. 
//doneLatch.countDown(); 
} 
// If something bad happens you see it here as this class is also been 
// registered as an ExceptionListener with the connection. 
virtual void onException(const CMSException& ex AMQCPP_UNUSED) { 
printf("Consumer onException() CMS Exception occurred.  Shutting down client. \n" ); 
//printf("CMS Exception occurred.  Shutting down client./n"); 
ex.printStackTrace(); 
exit(1); 
} 
private: 
void cleanup() { 
if (connection != NULL) { 
try { 
connection->close(); 
} catch (cms::CMSException& ex) { 
ex.printStackTrace(); 
} 
} 
// Destroy resources. 
try { 
delete destination; 
destination = NULL; 
delete consumer; 
consumer = NULL; 
// The reply producer created in run() must be released as well. 
delete producer; 
producer = NULL; 
delete session; 
session = NULL; 
delete connection; 
connection = NULL; 
} catch (CMSException& e) { 
e.printStackTrace(); 
} 
} 
}; 
int main(int argc AMQCPP_UNUSED, char* argv[] AMQCPP_UNUSED) { 
//if(argc<2) 
//{ 
//	printf("argc<2/r/n"); 
//	return 0; 
//} 
activemq::library::ActiveMQCPP::initializeLibrary(); 
{ 
std::cout << "=====================================================\n"; 
std::cout << "Starting the example:" << std::endl; 
std::cout << "-----------------------------------------------------\n"; 
// Set the URI to point to the IP Address of your broker. 
// add any optional params to the url to enable things like 
// tightMarshalling or tcp logging etc.  See the CMS web site for 
// a full list of configuration options. 
// 
//  http://activemq.apache.org/cms/ 
// 
// Wire Format Options: 
// ========================= 
// Use either stomp or openwire, the default ports are different for each 
// 
// Examples: 
//	tcp://127.0.0.1:61616					  default to openwire 
//	tcp://127.0.0.1:61616?wireFormat=openwire  same as above 
//	tcp://127.0.0.1:61613?wireFormat=stomp	 use stomp instead 
// 
// SSL: 
// ========================= 
// To use SSL you need to specify the location of the trusted Root CA or the 
// certificate for the broker you want to connect to.  Using the Root CA allows 
// you to use failover with multiple servers all using certificates signed by 
// the trusted root.  If using client authentication you also need to specify 
// the location of the client Certificate. 
// 
//	 System::setProperty( "decaf.net.ssl.keyStore", "<path>/client.pem" ); 
//	 System::setProperty( "decaf.net.ssl.keyStorePassword", "password" ); 
//	 System::setProperty( "decaf.net.ssl.trustStore", "<path>/rootCA.pem" ); 
// 
// Then you just specify the ssl transport in the URI, for example: 
// 
//	 ssl://localhost:61617 
// 
std::string brokerURI = 
"failover:(tcp://localhost:61616" 
//		"?wireFormat=openwire" 
//		"&transport.useInactivityMonitor=false" 
//		"&connection.alwaysSyncSend=true" 
//		"&connection.useAsyncSend=true" 
//		"?transport.commandTracingEnabled=true" 
//		"&transport.tcpTracingEnabled=true" 
//		"&wireFormat.tightEncodingEnabled=true" 
")"; 
//============================================================ 
// set to true to use topics instead of queues 
// Note in the code above that this causes createTopic or 
// createQueue to be used in both consumer and producer. 
//============================================================ 
bool useTopics = false; 
bool sessionTransacted = true; 
int numMessages = 1; 
bool useConsumer=true; 
bool useProducer=true; 
//int nSet=atoi(argv[1]); 
//if(1==nSet) 
//{ 
//#define USE_COMSUMER 
//} 
//else 
//{ 
//#define USE_PRODUCER 
// 
//} 
long long startTime = System::currentTimeMillis(); 
#ifdef USE_PRODUCER 
printf("Current mode: USE_PRODUCER \r\n"); 
int numProducerMessages = 30; 
int nThreadNumber=10; 
vector<HelloWorldProducer *> vHelloWorldProducer; 
for(int i=0;i<nThreadNumber;++i) 
{ 
HelloWorldProducer * producerTemp=new HelloWorldProducer(brokerURI, numProducerMessages, useTopics); 
vHelloWorldProducer.push_back(producerTemp); 
} 
#endif 
#ifdef USE_COMSUMER 
printf("Current mode: USE_COMSUMER \r\n"); 
HelloWorldConsumer consumer(brokerURI, numMessages, useTopics, sessionTransacted); 
// Start the consumer thread. 
Thread consumerThread(&consumer); 
consumerThread.start(); 
// Wait for the consumer to indicate that its ready to go. 
consumer.waitUntilReady(); 
#endif 
#ifdef USE_PRODUCER 
// Start the producer thread. 
vector<Thread *> vThread; 
for(int i=0;i<nThreadNumber;++i) 
{ 
HelloWorldProducer & ProducerTemp=*vHelloWorldProducer[i]; 
Thread * threadTemp=new Thread(&ProducerTemp); 
vThread.push_back(threadTemp); 
threadTemp->start(); 
ProducerTemp.waitUntilReady(); 
} 
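// Wait for every producer thread to finish, then release it, 
// so that the cleanup code below is actually reached. 
for(size_t i=0;i<vThread.size();++i) 
{ 
Thread * threadTemp=vThread[i]; 
threadTemp->join(); 
delete threadTemp; 
} 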
//Thread producerThread1(&producer1); 
//producerThread1.start(); 
//producer1.waitUntilReady(); 
//Thread producerThread2(&producer2); 
//producerThread2.start(); 
//producer2.waitUntilReady(); 
//Thread producerThread3(&producer3); 
//producerThread3.start(); 
//producer3.waitUntilReady(); 
#endif 
#ifdef USE_PRODUCER 
// Wait for the threads to complete. 
//producerThread1.join(); 
//producerThread2.join(); 
//producerThread3.join(); 
#endif 
#ifdef USE_COMSUMER 
consumerThread.join(); 
#endif 
long long endTime = System::currentTimeMillis(); 
double totalTime = (double)(endTime - startTime) / 1000.0; 
#ifdef USE_PRODUCER 
//producer1.close(); 
//producer2.close(); 
//producer3.close(); 
for(int i=0;i<vHelloWorldProducer.size();++i) 
{ 
HelloWorldProducer * ProducerTemp=vHelloWorldProducer[i]; 
ProducerTemp->close(); 
if(ProducerTemp) 
{ 
delete ProducerTemp; 
ProducerTemp=NULL; 
} 
} 
#endif 
#ifdef USE_COMSUMER 
consumer.close(); 
#endif 
std::cout << "Time to completion = " << totalTime << " seconds." << std::endl; 
std::cout << "-----------------------------------------------------\n"; 
std::cout << "Finished with the example." << std::endl; 
std::cout << "=====================================================\n"; 
} 
activemq::library::ActiveMQCPP::shutdownLibrary(); 
return 0; 
} 
// END SNIPPET: demo 
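
The producer in the listing builds each correlation ID by drawing 16 random bytes and hex-encoding them with sprintf. The same idea can be sketched with only the C++11 standard library, which makes it easy to try without a broker. This is a minimal sketch under stated assumptions: `toHex` and `makeCorrelationId` are hypothetical helper names, not ActiveMQ-CPP functions, and `std::random_device` stands in for `decaf::util::Random`.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdio>
#include <random>
#include <string>
#include <vector>

// Hex-encode a byte buffer as uppercase two-digit pairs, matching the
// "%02X" formatting used by the producer in the listing.
std::string toHex(const std::vector<unsigned char>& bytes) {
    std::string out;
    out.reserve(bytes.size() * 2);
    char pair[3] = {0};  // two hex digits plus the terminating NUL
    for (std::size_t i = 0; i < bytes.size(); ++i) {
        std::snprintf(pair, sizeof(pair), "%02X",
                      static_cast<unsigned>(bytes[i]));
        out += pair;
    }
    return out;
}

// Hypothetical helper (not part of ActiveMQ-CPP): build a correlation ID
// from `length` random bytes. The default of 16 mirrors NAME_BYTE_LEN.
std::string makeCorrelationId(std::size_t length = 16) {
    assert(length > 0);
    std::random_device rd;  // assumption: stands in for decaf::util::Random
    std::vector<unsigned char> bytes(length);
    for (std::size_t i = 0; i < length; ++i) {
        bytes[i] = static_cast<unsigned char>(rd() & 0xFF);
    }
    return toHex(bytes);
}
```

The returned string could then be passed to `message->setCMSCorrelationID(...)` in place of the hand-written loop; the reply is matched on the other side because the consumer copies the ID back with `setCMSCorrelationID(textMessage->getCMSCorrelationID())`.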

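The code above contains both the producer and the consumer, separated with #ifdef so that each can be compiled on its own. Make two copies of the source and build one as the producer and one as the consumer. As listed, USE_PRODUCER is defined near the top; in the consumer copy, define USE_COMSUMER instead (the macro is spelled that way throughout the code).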
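
As a rough illustration of how the #ifdef switch separates the two builds, the sketch below reduces the listing to the macro checks alone. `buildRole` and `requireRole` are hypothetical names introduced here, and falling back to USE_PRODUCER when no macro is given is an assumption made so the sketch compiles on its own.

```cpp
#include <cassert>
#include <string>

// Assumption for this sketch: if neither macro is supplied (for example via
// -DUSE_COMSUMER on the compiler command line), default to the producer role.
#if !defined(USE_PRODUCER) && !defined(USE_COMSUMER)
#define USE_PRODUCER 1
#endif

// Hypothetical helper: report which role(s) this build was compiled with.
// USE_COMSUMER keeps the spelling used in the listing.
std::string buildRole() {
#if defined(USE_PRODUCER) && defined(USE_COMSUMER)
    return "producer+consumer";
#elif defined(USE_PRODUCER)
    return "producer";
#else
    return "consumer";
#endif
}

// Hypothetical startup check: fail fast if the build has an unexpected role.
void requireRole(const std::string& expected) {
    assert(buildRole() == expected);
}
```

Selecting the role on the compiler command line, rather than editing the #define in each copy, would let one source file serve both builds.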

Start the producer in terminal 1. The Queues page at http://localhost:8161/admin then shows eventQueue, with messages being enqueued continuously;

Start the consumer in another terminal and you can watch the messages in the queue being consumed.

Original article by 奋斗. If you repost it, please credit the source: https://blog.ytso.com/7469.html
